package main import ( "context" "log" "sync" ) // roomMeta caches per-room classification we need to handle a message: member // counts (for the 1:1 test, F3) and encryption state (F15). Rebuilt per process; // unknown fields are lazily fetched from the CS-API on first need — appservice // transactions carry no room summary. type roomMeta struct { joined, invited int countsKnown bool encrypted, encKnown bool } func (m *roomMeta) isDM() bool { return m.countsKnown && m.joined+m.invited == 2 } type Bot struct { cfg *Config log *log.Logger mx *MatrixClient xai *XAIClient st *Store // Transactions are delivered one at a time by Synapse, but guard the shared // maps/sets anyway so an unexpected concurrent call can't corrupt them. mu sync.Mutex seen *lruSet // event ids already handled (dedup within a session) botSent *lruSet // event ids the bot itself sent (reply-parent detection) meta map[string]*roomMeta buf map[string][]bufferedMsg globalNote map[string]string // roomID → UTC date we last sent the daily-limit notice } func NewBot(ctx context.Context, cfg *Config, logger *log.Logger) (*Bot, error) { mx := NewMatrixClient(cfg.HomeserverURL, cfg.ASToken, cfg.BotMXID) xai := NewXAIClient(cfg.XAIBaseURL, cfg.XAIAPIKey) st, err := OpenStore(cfg.statePath("ai-bot.db")) if err != nil { return nil, err } b := &Bot{ cfg: cfg, log: logger, mx: mx, xai: xai, st: st, seen: newLRUSet(5000), botSent: newLRUSet(5000), meta: make(map[string]*roomMeta), buf: make(map[string][]bufferedMsg), globalNote: make(map[string]string), } // Confirm the as_token + user_id resolves to BOT_MXID before serving. if err := b.verifyIdentity(ctx); err != nil { st.Close() return nil, err } // F23: ensure the profile has a display name (best-effort, idempotent). if err := mx.SetDisplayName(ctx, cfg.BotDisplayName); err != nil { logger.Printf("set display name failed (non-fatal): %v", err) } return b, nil } func (b *Bot) Close() { if b.st != nil { _ = b.st.Close() } } func (b *Bot) verifyIdentity(ctx context.Context) error { who, err := b.mx.Whoami(ctx) if err != nil { return err } if who != b.cfg.BotMXID { b.log.Fatalf("as_token resolves to %q but BOT_MXID is %q", who, b.cfg.BotMXID) } b.log.Printf("authenticated as %s", who) return nil } // Run starts the appservice transaction server and blocks until ctx is cancelled. func (b *Bot) Run(ctx context.Context) error { as := NewAppService(b.cfg, b.log, b.st, b.handleTransaction) return as.Serve(ctx) } // handleTransaction processes one pushed transaction's events in order. func (b *Bot) handleTransaction(ctx context.Context, events []Event) { b.mu.Lock() defer b.mu.Unlock() for i := range events { b.handleEvent(ctx, &events[i]) } } func (b *Bot) handleEvent(ctx context.Context, ev *Event) { if ev.EventID == "" || ev.RoomID == "" { return } if !b.seen.Add(ev.EventID) { return } switch ev.Type { case "m.room.member": if ev.StateKey != nil && *ev.StateKey == b.cfg.BotMXID { b.handleSelfMembership(ctx, ev) } case "m.room.encryption": m := b.getMeta(ev.RoomID) m.encrypted, m.encKnown = true, true case "m.room.message": b.handleMessage(ctx, ev) } } // handleSelfMembership reacts to membership changes for the bot user: auto-join // invites from allowed servers (F11), reject others, forget rooms we leave. func (b *Bot) handleSelfMembership(ctx context.Context, ev *Event) { switch ev.membershipOf() { case "invite": if b.cfg.AllowedServers[serverOf(ev.Sender)] { b.log.Printf("accepting invite to %s from %s", ev.RoomID, ev.Sender) if err := b.mx.JoinRoom(ctx, ev.RoomID); err != nil { b.log.Printf("join %s failed: %v", ev.RoomID, err) } } else { b.log.Printf("rejecting invite to %s from %q (server not allowed)", ev.RoomID, ev.Sender) if err := b.mx.LeaveRoom(ctx, ev.RoomID); err != nil { b.log.Printf("leave (reject) %s failed: %v", ev.RoomID, err) } } case "leave", "ban": delete(b.meta, ev.RoomID) delete(b.buf, ev.RoomID) } } func (b *Bot) handleMessage(ctx context.Context, ev *Event) { roomID := ev.RoomID m := b.getMeta(roomID) // A9/F15: re-check encryption; if (or once) encrypted, warn once and skip — // the bot can't read it. b.ensureEncryption(ctx, roomID, m) if m.encrypted { b.warnEncryptedOnce(ctx, roomID) return } mc, ok := ev.DecodeMessage() if !ok { return } // Edits re-carry m.mentions; never re-trigger or replay them (F16). if mc.IsReplace() { return } // Buffer prior context BEFORE classifying so buildContext sees history only. history := b.buf[roomID] b.appendBuf(roomID, bufferedMsg{sender: ev.Sender, body: mc.Body, isBot: ev.Sender == b.cfg.BotMXID}) if ev.Sender == b.cfg.BotMXID { return // our own message (also tracked via botSent) } if mc.MsgType == "m.notice" { return // anti-loop: ignore notices (ours and other bots') } b.ensureCounts(ctx, roomID, m) replyParentIsBot := mc.RelatesTo != nil && mc.RelatesTo.InReplyTo != nil && b.botSent.Has(mc.RelatesTo.InReplyTo.EventID) if !(m.isDM() || mentionsBot(mc, b.cfg.BotMXID, replyParentIsBot)) { return } b.respond(ctx, roomID, m, ev, mc, history) } func (b *Bot) respond(ctx context.Context, roomID string, m *roomMeta, ev *Event, mc *MessageContent, history []bufferedMsg) { switch res, err := b.st.Reserve(ev.Sender, b.cfg.PerUserDailyCap, b.cfg.DailyUSDCeiling); { case err != nil: b.log.Printf("limiter reserve failed: %v", err) return case res == reserveDeniedUser: // Silent drop — per-user cap is anti-abuse (F24). return case res == reserveDeniedGlobal: // Global USD ceiling — notice once per room per day, then stay quiet. if b.globalNote[roomID] != todayUTC() { b.globalNote[roomID] = todayUTC() b.sendNotice(ctx, roomID, ev, mc, noticeDailyLimit) } return } msgs := buildContext(b.cfg.SystemPrompt, history, m.isDM(), mc.Body, b.cfg.MaxCtxEvent, 8000) resp, err := b.xai.Complete(ctx, b.cfg.XAIModel, msgs, b.cfg.MaxOutTok, b.cfg.XAITemp) if err != nil { // at-most-once already retried transient failures inside Complete; refund // the reserved request so an xAI outage doesn't burn the user's daily cap. b.log.Printf("xai completion failed for %s: %v", ev.Sender, err) if rerr := b.st.RefundRequest(ev.Sender); rerr != nil { b.log.Printf("refund failed: %v", rerr) } return } usd := computeUSD(resp.Usage, b.cfg) if err := b.st.Reconcile(ev.Sender, usd); err != nil { b.log.Printf("reconcile spend failed: %v", err) } b.sendNotice(ctx, roomID, ev, mc, resp.Text()) } // computeUSD prices the call from the API-returned token usage (authoritative // counts) and the configured per-1M prices — so the hard ceiling tracks real // usage even if the model/price changes (only the constants need updating). func computeUSD(u xaiUsage, cfg *Config) float64 { cached := u.PromptTokensDetails.CachedTokens nonCached := u.PromptTokens - cached if nonCached < 0 { nonCached = 0 } return float64(nonCached)/1e6*cfg.PriceInputPerM + float64(cached)/1e6*cfg.PriceCachedPerM + float64(u.CompletionTokens)/1e6*cfg.PriceOutputPerM } func (b *Bot) sendNotice(ctx context.Context, roomID string, trigger *Event, triggerMC *MessageContent, body string) { content := buildNoticeContent(trigger.EventID, trigger.Sender, triggerMC.RelatesTo, body) id, err := b.mx.SendEvent(ctx, roomID, "m.room.message", content) if err != nil { b.log.Printf("send notice to %s failed: %v", roomID, err) return } // Track our own reply so a future reply-to-it is recognised as addressing us, // and add it to the room buffer as an assistant turn for context continuity. b.botSent.Add(id) b.appendBuf(roomID, bufferedMsg{sender: b.cfg.BotMXID, body: body, isBot: true}) } func (b *Bot) warnEncryptedOnce(ctx context.Context, roomID string) { warned, err := b.st.HasWarnedEncrypted(roomID) if err != nil { b.log.Printf("warned-flag read failed: %v", err) return } if warned { return } content := map[string]any{"msgtype": "m.notice", "body": noticeEncryptedUnsupported} if _, err := b.mx.SendEvent(ctx, roomID, "m.room.message", content); err != nil { b.log.Printf("encrypted-notice to %s failed: %v", roomID, err) return } if err := b.st.SetWarnedEncrypted(roomID); err != nil { b.log.Printf("persist warned-flag failed: %v", err) } } // buildNoticeContent builds the reply. m.notice (not m.text) so the anti-loop // skip catches our own output. Thread-aware (F27): a trigger from a thread gets a // thread relation so the answer lands in the thread, not the main timeline. func buildNoticeContent(replyTo, sender string, triggerRelates *RelatesTo, body string) map[string]any { relates := map[string]any{} if triggerRelates != nil && triggerRelates.RelType == "m.thread" && triggerRelates.EventID != "" { relates["rel_type"] = "m.thread" relates["event_id"] = triggerRelates.EventID relates["is_falling_back"] = true relates["m.in_reply_to"] = map[string]any{"event_id": replyTo} } else { relates["m.in_reply_to"] = map[string]any{"event_id": replyTo} } return map[string]any{ "msgtype": "m.notice", "body": body, "m.mentions": map[string]any{"user_ids": []string{sender}}, "m.relates_to": relates, } } // --- per-room metadata helpers ------------------------------------------------- func (b *Bot) getMeta(roomID string) *roomMeta { m := b.meta[roomID] if m == nil { m = &roomMeta{} b.meta[roomID] = m } return m } func (b *Bot) ensureEncryption(ctx context.Context, roomID string, m *roomMeta) { if m.encKnown { return } enc, err := b.mx.RoomEncrypted(ctx, roomID) if err != nil { b.log.Printf("encryption probe %s failed: %v", roomID, err) return // leave unknown; re-probed on the next message } m.encrypted, m.encKnown = enc, true } func (b *Bot) ensureCounts(ctx context.Context, roomID string, m *roomMeta) { if m.countsKnown { return } joined, invited, err := b.mx.MemberCounts(ctx, roomID) if err != nil { b.log.Printf("member-count probe %s failed: %v", roomID, err) return } m.joined, m.invited, m.countsKnown = joined, invited, true } func (b *Bot) appendBuf(roomID string, msg bufferedMsg) { limit := b.cfg.MaxCtxEvent * 2 if limit < 8 { limit = 8 } buf := append(b.buf[roomID], msg) if len(buf) > limit { buf = buf[len(buf)-limit:] } b.buf[roomID] = buf }