449 lines
16 KiB
Go
449 lines
16 KiB
Go
package main
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"log/slog"
|
|
"sync"
|
|
)
|
|
|
|
// roomMeta is the cached per-room classification consulted when handling a
// message: the joined/invited member counts (used for the 1:1 test, F3),
// whether any member lives outside ALLOWED_SERVERS, and the encryption state
// (F15).
//
// Appservice transactions carry no room summary, so these values are fetched
// lazily from the CS-API on first need. The counts are invalidated whenever a
// third party's membership changes, so a 1:1 that gains a member is reclassified
// out of DM mode (no DM-mode third-party leak) and a newly added foreign member
// is caught.
type roomMeta struct {
	joined      int  // joined member count; meaningful only when countsKnown
	invited     int  // invited member count; meaningful only when countsKnown
	countsKnown bool // joined/invited/foreign reflect a successful probe
	foreign     bool // a joined/invited member is outside ALLOWED_SERVERS
	encrypted   bool // room is encrypted; meaningful only when encKnown
	encKnown    bool // encrypted reflects a probe or an m.room.encryption event
}

// isDM reports whether the room is a 1:1 conversation: the member counts must be
// known and joined plus invited must total exactly two.
func (m *roomMeta) isDM() bool {
	if !m.countsKnown {
		return false
	}
	return m.joined+m.invited == 2
}
|
|
|
|
type Bot struct {
|
|
cfg *Config
|
|
log *slog.Logger
|
|
mx *MatrixClient
|
|
xai *XAIClient
|
|
st *Store
|
|
|
|
// Transactions are delivered one at a time by Synapse, but guard the shared
|
|
// maps/sets anyway so an unexpected concurrent call can't corrupt them.
|
|
mu sync.Mutex
|
|
seen *lruSet // event ids already handled (dedup within a session)
|
|
botSent *lruSet // event ids the bot itself sent (reply-parent detection)
|
|
meta map[string]*roomMeta
|
|
buf map[string][]bufferedMsg
|
|
globalNote map[string]string // roomID → UTC date we last sent the global daily-limit notice
|
|
}
|
|
|
|
func NewBot(ctx context.Context, cfg *Config, logger *slog.Logger) (*Bot, error) {
|
|
mx := NewMatrixClient(cfg.HomeserverURL, cfg.ASToken, cfg.BotMXID)
|
|
xai := NewXAIClient(cfg.XAIBaseURL, cfg.XAIAPIKey, logger)
|
|
|
|
st, err := OpenStore(cfg.statePath("ai-bot.db"))
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
b := &Bot{
|
|
cfg: cfg,
|
|
log: logger,
|
|
mx: mx,
|
|
xai: xai,
|
|
st: st,
|
|
seen: newLRUSet(5000),
|
|
botSent: newLRUSet(5000),
|
|
meta: make(map[string]*roomMeta),
|
|
buf: make(map[string][]bufferedMsg),
|
|
globalNote: make(map[string]string),
|
|
}
|
|
|
|
// Confirm the as_token + user_id resolves to BOT_MXID before serving.
|
|
if err := b.verifyIdentity(ctx); err != nil {
|
|
st.Close()
|
|
return nil, err
|
|
}
|
|
// F23: ensure the profile has a display name (best-effort, idempotent).
|
|
if err := mx.SetDisplayName(ctx, cfg.BotDisplayName); err != nil {
|
|
logger.Warn("set display name failed (non-fatal)", "err", err)
|
|
}
|
|
return b, nil
|
|
}
|
|
|
|
func (b *Bot) Close() {
|
|
if b.st != nil {
|
|
_ = b.st.Close()
|
|
}
|
|
}
|
|
|
|
func (b *Bot) verifyIdentity(ctx context.Context) error {
|
|
who, err := b.mx.Whoami(ctx)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if who != b.cfg.BotMXID {
|
|
return fmt.Errorf("as_token resolves to %q but BOT_MXID is %q", who, b.cfg.BotMXID)
|
|
}
|
|
b.log.Info("authenticated", "mxid", who)
|
|
return nil
|
|
}
|
|
|
|
// Run starts the appservice transaction server and blocks until ctx is cancelled.
|
|
func (b *Bot) Run(ctx context.Context) error {
|
|
as := NewAppService(b.cfg, b.log, b.st, b.handleTransaction)
|
|
return as.Serve(ctx)
|
|
}
|
|
|
|
// handleTransaction processes one pushed transaction's events in order.
|
|
func (b *Bot) handleTransaction(ctx context.Context, events []Event) {
|
|
b.mu.Lock()
|
|
defer b.mu.Unlock()
|
|
for i := range events {
|
|
b.handleEvent(ctx, &events[i])
|
|
}
|
|
}
|
|
|
|
func (b *Bot) handleEvent(ctx context.Context, ev *Event) {
|
|
if ev.EventID == "" || ev.RoomID == "" {
|
|
return
|
|
}
|
|
if !b.seen.Add(ev.EventID) {
|
|
return // already handled this session (fast in-memory path)
|
|
}
|
|
// Durable dedup across restarts: if a previous run already handled this event
|
|
// but crashed before its transaction was acked, Synapse re-pushes it — don't
|
|
// reprocess (no dup answer / double-bill). On a DB error, fall through; the
|
|
// in-memory set still guards this session.
|
|
if isNew, err := b.st.SeenEvent(ev.EventID); err != nil {
|
|
b.log.Error("durable dedup check failed", "id", ev.EventID, "err", err)
|
|
} else if !isNew {
|
|
return
|
|
}
|
|
b.log.Debug("event", "type", ev.Type, "room", ev.RoomID, "sender", ev.Sender, "id", ev.EventID)
|
|
switch ev.Type {
|
|
case "m.room.member":
|
|
if ev.StateKey != nil && *ev.StateKey == b.cfg.BotMXID {
|
|
b.handleSelfMembership(ctx, ev)
|
|
} else if m := b.meta[ev.RoomID]; m != nil {
|
|
// A third party's membership changed: counts + foreign flag are now
|
|
// stale. Re-probe on the next message so a 1:1 that gains a member drops
|
|
// out of DM mode (no third-party leak) and a new foreign member is caught.
|
|
m.countsKnown = false
|
|
}
|
|
case "m.room.encryption":
|
|
m := b.getMeta(ev.RoomID)
|
|
m.encrypted, m.encKnown = true, true
|
|
case "m.room.message":
|
|
b.handleMessage(ctx, ev)
|
|
}
|
|
}
|
|
|
|
// handleSelfMembership reacts to membership changes for the bot user: auto-join
|
|
// invites from allowed servers (F11), reject others, forget rooms we leave.
|
|
func (b *Bot) handleSelfMembership(ctx context.Context, ev *Event) {
|
|
switch ev.membershipOf() {
|
|
case "invite":
|
|
if !b.cfg.AllowedServers[serverOf(ev.Sender)] {
|
|
b.log.Warn("rejecting invite (server not allowed)", "room", ev.RoomID, "sender", ev.Sender)
|
|
if err := b.mx.LeaveRoom(ctx, ev.RoomID); err != nil {
|
|
b.log.Error("leave (reject) failed", "room", ev.RoomID, "err", err)
|
|
}
|
|
return
|
|
}
|
|
b.log.Info("accepting invite", "room", ev.RoomID, "sender", ev.Sender)
|
|
if err := b.mx.JoinRoom(ctx, ev.RoomID); err != nil {
|
|
b.log.Error("join failed", "room", ev.RoomID, "err", err)
|
|
return
|
|
}
|
|
// Fully-on-allowed-servers gate: a vojo.chat inviter can still pull the bot
|
|
// into a room that already holds federated third parties — leave at once.
|
|
m := b.getMeta(ev.RoomID)
|
|
b.ensureCounts(ctx, ev.RoomID, m)
|
|
if m.countsKnown && m.foreign {
|
|
b.leaveForeign(ctx, ev.RoomID)
|
|
}
|
|
case "leave", "ban":
|
|
delete(b.meta, ev.RoomID)
|
|
delete(b.buf, ev.RoomID)
|
|
}
|
|
}
|
|
|
|
// leaveForeign leaves a room that contains a member outside ALLOWED_SERVERS, so
|
|
// the bot only ever operates in rooms hosted entirely on allowed homeservers.
|
|
func (b *Bot) leaveForeign(ctx context.Context, roomID string) {
|
|
b.log.Warn("leaving room — a member is outside ALLOWED_SERVERS", "room", roomID)
|
|
if err := b.mx.LeaveRoom(ctx, roomID); err != nil {
|
|
b.log.Error("leave (foreign) failed", "room", roomID, "err", err)
|
|
}
|
|
}
|
|
|
|
func (b *Bot) handleMessage(ctx context.Context, ev *Event) {
|
|
roomID := ev.RoomID
|
|
m := b.getMeta(roomID)
|
|
|
|
// A9/F15: re-check encryption; if (or once) encrypted, warn once and skip —
|
|
// the bot can't read it.
|
|
b.ensureEncryption(ctx, roomID, m)
|
|
if m.encrypted {
|
|
b.log.Debug("skip: encrypted room", "room", roomID)
|
|
b.warnEncryptedOnce(ctx, roomID)
|
|
return
|
|
}
|
|
|
|
mc, ok := ev.DecodeMessage()
|
|
if !ok {
|
|
return
|
|
}
|
|
// Edits re-carry m.mentions; never re-trigger or replay them (F16).
|
|
if mc.IsReplace() {
|
|
return
|
|
}
|
|
// Only plain text ever reaches xAI: drop media (m.image/m.file/m.audio/…) and
|
|
// other custom msgtypes outright — the bot doesn't fetch or forward media, and
|
|
// a caption/filename is third-party content we keep out of xAI. m.notice falls
|
|
// through to its existing anti-loop handling below.
|
|
if mc.MsgType != "m.text" && mc.MsgType != "m.emote" && mc.MsgType != "m.notice" {
|
|
b.log.Debug("skip: non-text msgtype", "room", roomID, "sender", ev.Sender, "msgtype", mc.MsgType)
|
|
return
|
|
}
|
|
|
|
// Buffer prior context BEFORE classifying so buildContext sees history only.
|
|
history := b.buf[roomID]
|
|
b.appendBuf(roomID, bufferedMsg{sender: ev.Sender, body: mc.Body, isBot: ev.Sender == b.cfg.BotMXID})
|
|
|
|
if ev.Sender == b.cfg.BotMXID {
|
|
return // our own message (also tracked via botSent)
|
|
}
|
|
if mc.MsgType == "m.notice" {
|
|
return // anti-loop: ignore notices (ours and other bots')
|
|
}
|
|
|
|
b.ensureCounts(ctx, roomID, m)
|
|
// Stay only in rooms hosted entirely on allowed servers — never operate in (or
|
|
// "leak" the bot into) a federated room with non-consenting third parties.
|
|
if m.countsKnown && m.foreign {
|
|
b.leaveForeign(ctx, roomID)
|
|
return
|
|
}
|
|
replyParentIsBot := mc.RelatesTo != nil && mc.RelatesTo.InReplyTo != nil &&
|
|
b.botSent.Has(mc.RelatesTo.InReplyTo.EventID)
|
|
|
|
mentioned := mentionsBot(mc, b.cfg.BotMXID, replyParentIsBot)
|
|
if !(m.isDM() || mentioned) {
|
|
b.log.Debug("skip: not addressed (group without mention)", "room", roomID, "sender", ev.Sender,
|
|
"dm", m.isDM(), "joined", m.joined, "invited", m.invited, "countsKnown", m.countsKnown, "mentioned", mentioned)
|
|
return
|
|
}
|
|
b.respond(ctx, roomID, m, ev, mc, history)
|
|
}
|
|
|
|
const (
	// unlimitedCap is the per-user cap applied to UNLIMITED_USERS. It is large
	// enough (2^30) that the per-user gate never trips for them, while the global
	// DAILY_USD_CEILING still applies.
	unlimitedCap = 1 << 30
)
|
|
|
|
func (b *Bot) respond(ctx context.Context, roomID string, m *roomMeta, ev *Event, mc *MessageContent, history []bufferedMsg) {
|
|
perUserCap := b.cfg.PerUserDailyCap
|
|
if b.cfg.UnlimitedUsers[ev.Sender] {
|
|
perUserCap = unlimitedCap
|
|
}
|
|
switch res, err := b.st.Reserve(ev.Sender, perUserCap, b.cfg.DailyUSDCeiling); {
|
|
case err != nil:
|
|
b.log.Error("limiter reserve failed", "sender", ev.Sender, "err", err)
|
|
return
|
|
case res == reserveDeniedUser:
|
|
// Per-user cap (anti-abuse, F24): stop answering, but always tell the user
|
|
// their request hit the limit — no message addressed to the bot is left
|
|
// silent. (m.notice → the anti-loop skip keeps this from re-triggering.)
|
|
b.log.Info("per-user daily cap reached; notifying", "sender", ev.Sender)
|
|
b.sendNotice(ctx, roomID, ev, mc, noticeUserLimit)
|
|
return
|
|
case res == reserveDeniedGlobal:
|
|
// Global USD ceiling — notice once per room per day, then stay quiet.
|
|
b.log.Warn("global daily USD ceiling reached", "room", roomID, "sender", ev.Sender)
|
|
if b.globalNote[roomID] != todayUTC() {
|
|
b.globalNote[roomID] = todayUTC()
|
|
b.sendNotice(ctx, roomID, ev, mc, noticeDailyLimit)
|
|
}
|
|
return
|
|
}
|
|
|
|
// Show "Vojo AI печатает…" while we build the answer; the deferred clear fires
|
|
// on every exit (success or failure). Pure UX — typing failures are best-effort.
|
|
b.setTyping(ctx, roomID, true)
|
|
defer b.setTyping(ctx, roomID, false)
|
|
|
|
msgs := buildContext(b.cfg.SystemPrompt, history, m.isDM(), mc.Body, b.cfg.MaxCtxEvent, 8000)
|
|
resp, err := b.xai.Complete(ctx, b.cfg.XAIModel, msgs, b.cfg.MaxOutTok, b.cfg.XAITemp)
|
|
if err != nil {
|
|
// at-most-once already retried transient failures inside Complete; refund
|
|
// the reserved request so an xAI outage doesn't burn the user's daily cap,
|
|
// and tell the user we couldn't answer (notice → no anti-loop re-trigger).
|
|
b.log.Error("xai completion failed", "sender", ev.Sender, "err", err)
|
|
if rerr := b.st.RefundRequest(ev.Sender); rerr != nil {
|
|
b.log.Error("refund failed", "sender", ev.Sender, "err", rerr)
|
|
}
|
|
b.sendNotice(ctx, roomID, ev, mc, noticeError)
|
|
return
|
|
}
|
|
|
|
// A 2xx from xAI is billed even if the text came back empty — always book the
|
|
// real cost so both caps see it (the old refund-without-reconcile on an empty
|
|
// 200 let such calls bypass the per-user cap and the global ceiling).
|
|
usd := computeUSD(resp.Usage, b.cfg)
|
|
if err := b.st.Reconcile(ev.Sender, usd); err != nil {
|
|
b.log.Error("reconcile spend failed", "sender", ev.Sender, "err", err)
|
|
}
|
|
|
|
text := resp.Text()
|
|
if text == "" {
|
|
b.log.Warn("xai returned empty completion (billed, nothing to send)", "sender", ev.Sender, "usd", usd)
|
|
return
|
|
}
|
|
b.log.Info("answered", "room", roomID, "sender", ev.Sender, "dm", m.isDM(),
|
|
"usd", usd, "prompt_tokens", resp.Usage.PromptTokens, "completion_tokens", resp.Usage.CompletionTokens)
|
|
b.sendNotice(ctx, roomID, ev, mc, text)
|
|
}
|
|
|
|
// computeUSD prices the call from the API-returned token usage (authoritative
|
|
// counts) and the configured per-1M prices — so the hard ceiling tracks real
|
|
// usage even if the model/price changes (only the constants need updating).
|
|
func computeUSD(u xaiUsage, cfg *Config) float64 {
|
|
cached := u.PromptTokensDetails.CachedTokens
|
|
nonCached := u.PromptTokens - cached
|
|
if nonCached < 0 {
|
|
nonCached = 0
|
|
}
|
|
return float64(nonCached)/1e6*cfg.PriceInputPerM +
|
|
float64(cached)/1e6*cfg.PriceCachedPerM +
|
|
float64(u.CompletionTokens)/1e6*cfg.PriceOutputPerM
|
|
}
|
|
|
|
func (b *Bot) sendNotice(ctx context.Context, roomID string, trigger *Event, triggerMC *MessageContent, body string) {
|
|
content := buildNoticeContent(trigger.EventID, trigger.Sender, triggerMC.RelatesTo, body)
|
|
id, err := b.mx.SendEvent(ctx, roomID, "m.room.message", content)
|
|
if err != nil {
|
|
b.log.Error("send notice failed", "room", roomID, "err", err)
|
|
return
|
|
}
|
|
// Track our own reply so a future reply-to-it is recognised as addressing us,
|
|
// and add it to the room buffer as an assistant turn for context continuity.
|
|
b.botSent.Add(id)
|
|
b.appendBuf(roomID, bufferedMsg{sender: b.cfg.BotMXID, body: body, isBot: true})
|
|
}
|
|
|
|
// setTyping sets/clears the bot's typing indicator (best-effort UX; failures are
|
|
// non-fatal). The 30s timeout comfortably covers a normal completion, and
|
|
// respond() defers a clear so the indicator ends the moment the answer is sent
|
|
// or fails.
|
|
func (b *Bot) setTyping(ctx context.Context, roomID string, typing bool) {
|
|
if err := b.mx.SendTyping(ctx, roomID, typing, 30000); err != nil {
|
|
b.log.Debug("set typing failed", "room", roomID, "typing", typing, "err", err)
|
|
}
|
|
}
|
|
|
|
func (b *Bot) warnEncryptedOnce(ctx context.Context, roomID string) {
|
|
warned, err := b.st.HasWarnedEncrypted(roomID)
|
|
if err != nil {
|
|
b.log.Error("warned-flag read failed", "room", roomID, "err", err)
|
|
return
|
|
}
|
|
if warned {
|
|
return
|
|
}
|
|
content := map[string]any{"msgtype": "m.notice", "body": noticeEncryptedUnsupported}
|
|
if _, err := b.mx.SendEvent(ctx, roomID, "m.room.message", content); err != nil {
|
|
b.log.Error("encrypted-notice failed", "room", roomID, "err", err)
|
|
return
|
|
}
|
|
if err := b.st.SetWarnedEncrypted(roomID); err != nil {
|
|
b.log.Error("persist warned-flag failed", "room", roomID, "err", err)
|
|
}
|
|
}
|
|
|
|
// buildNoticeContent builds the reply. m.notice (not m.text) so the anti-loop
|
|
// skip catches our own output. Thread-aware (F27): a trigger from a thread gets a
|
|
// thread relation so the answer lands in the thread, not the main timeline.
|
|
func buildNoticeContent(replyTo, sender string, triggerRelates *RelatesTo, body string) map[string]any {
|
|
relates := map[string]any{}
|
|
if triggerRelates != nil && triggerRelates.RelType == "m.thread" && triggerRelates.EventID != "" {
|
|
relates["rel_type"] = "m.thread"
|
|
relates["event_id"] = triggerRelates.EventID
|
|
relates["is_falling_back"] = true
|
|
relates["m.in_reply_to"] = map[string]any{"event_id": replyTo}
|
|
} else {
|
|
relates["m.in_reply_to"] = map[string]any{"event_id": replyTo}
|
|
}
|
|
content := map[string]any{
|
|
"msgtype": "m.notice",
|
|
"body": body,
|
|
"m.mentions": map[string]any{"user_ids": []string{sender}},
|
|
"m.relates_to": relates,
|
|
}
|
|
// The model answers in markdown; render it to org.matrix.custom.html so clients
|
|
// show formatting instead of raw `**`, `#`, lists, code fences. Only attach
|
|
// formatted_body when there's actual formatting — a plain answer keeps rendering
|
|
// from `body` exactly as before.
|
|
if html, formatted := markdownToHTML(body); formatted {
|
|
content["format"] = matrixHTMLFormat
|
|
content["formatted_body"] = html
|
|
}
|
|
return content
|
|
}
|
|
|
|
// --- per-room metadata helpers -------------------------------------------------
|
|
|
|
func (b *Bot) getMeta(roomID string) *roomMeta {
|
|
m := b.meta[roomID]
|
|
if m == nil {
|
|
m = &roomMeta{}
|
|
b.meta[roomID] = m
|
|
}
|
|
return m
|
|
}
|
|
|
|
func (b *Bot) ensureEncryption(ctx context.Context, roomID string, m *roomMeta) {
|
|
if m.encKnown {
|
|
return
|
|
}
|
|
enc, err := b.mx.RoomEncrypted(ctx, roomID)
|
|
if err != nil {
|
|
b.log.Warn("encryption probe failed", "room", roomID, "err", err)
|
|
return // leave unknown; re-probed on the next message
|
|
}
|
|
m.encrypted, m.encKnown = enc, true
|
|
}
|
|
|
|
func (b *Bot) ensureCounts(ctx context.Context, roomID string, m *roomMeta) {
|
|
if m.countsKnown {
|
|
return
|
|
}
|
|
joined, invited, servers, err := b.mx.RoomMembership(ctx, roomID)
|
|
if err != nil {
|
|
b.log.Warn("member probe failed", "room", roomID, "err", err)
|
|
return
|
|
}
|
|
foreign := false
|
|
for s := range servers {
|
|
if !b.cfg.AllowedServers[s] {
|
|
foreign = true
|
|
break
|
|
}
|
|
}
|
|
m.joined, m.invited, m.foreign, m.countsKnown = joined, invited, foreign, true
|
|
}
|
|
|
|
func (b *Bot) appendBuf(roomID string, msg bufferedMsg) {
|
|
limit := b.cfg.MaxCtxEvent * 2
|
|
if limit < 8 {
|
|
limit = 8
|
|
}
|
|
buf := append(b.buf[roomID], msg)
|
|
if len(buf) > limit {
|
|
buf = buf[len(buf)-limit:]
|
|
}
|
|
b.buf[roomID] = buf
|
|
}
|