From ebb2363d9d5982cc6a06df7439ff316216d9cc1a Mon Sep 17 00:00:00 2001 From: heaven Date: Mon, 1 Jun 2026 01:30:30 +0300 Subject: [PATCH] fix(ai-bot): ack transactions instantly with async per-room processing to stop slow-call freezes, and signal system states via emoji reactions --- apps/ai-bot/appservice.go | 18 +- apps/ai-bot/appservice_test.go | 47 ++- apps/ai-bot/bot.go | 522 +++++++++++++++++++++++--------- apps/ai-bot/concurrency_test.go | 82 +++++ apps/ai-bot/messages.go | 22 +- apps/ai-bot/store.go | 22 +- apps/ai-bot/util.go | 11 + apps/ai-bot/xai.go | 8 +- 8 files changed, 539 insertions(+), 193 deletions(-) create mode 100644 apps/ai-bot/concurrency_test.go diff --git a/apps/ai-bot/appservice.go b/apps/ai-bot/appservice.go index 447dc8cd..870fcee5 100644 --- a/apps/ai-bot/appservice.go +++ b/apps/ai-bot/appservice.go @@ -103,15 +103,23 @@ func (a *AppService) handleTransaction(w http.ResponseWriter, r *http.Request) { writeError(w, http.StatusBadRequest, "M_NOT_JSON", "invalid transaction body") return } - a.log.Debug("transaction received", "txn", txnID, "events", len(txn.Events)) - - // Process with the bot's long-lived context (not the request context) so a - // homeserver-side timeout can't cancel an in-flight reply mid-send. - a.handler(a.baseCtx, txn.Events) + a.log.Debug("transaction accepted", "txn", txnID, "events", len(txn.Events)) + // Mark the transaction done BEFORE processing and ack 200 immediately. The bot + // must answer Synapse fast: Synapse delivers transactions serially and waits for + // the 200, and if it's late (the handler used to block on the ~180s xAI call) it + // marks the AS down and replays with growing backoff — the "bot silent for + // minutes" symptom. Per-event durable dedup (Store.SeenEvent) is the real guard + // against double-answers, so acking before the work finishes is safe (at-most-once: + // a hard crash mid-processing drops the message rather than answering it twice). if err := a.store.MarkTxn(txnID); err != nil { a.log.Error("txn mark failed", "txn", txnID, "err", err) } + // Process off the request path with the bot's long-lived context (not the request + // context) so the work — and the eventual reply — survives the homeserver dropping + // the connection. + go a.handler(a.baseCtx, txn.Events) + writeJSON(w, http.StatusOK, struct{}{}) } diff --git a/apps/ai-bot/appservice_test.go b/apps/ai-bot/appservice_test.go index 2345af94..d915221b 100644 --- a/apps/ai-bot/appservice_test.go +++ b/apps/ai-bot/appservice_test.go @@ -9,22 +9,39 @@ import ( "path/filepath" "strings" "testing" + "time" ) -func newTestAS(t *testing.T, dispatched *[][]Event) (*AppService, *Store) { +// newTestAS wires an AppService whose handler pushes each dispatched batch onto a +// channel. Transactions are now processed asynchronously (the 200 is returned before +// the handler runs), so tests read from the channel with a timeout instead of +// inspecting a slice immediately after the call. +func newTestAS(t *testing.T) (*AppService, *Store, chan []Event) { t.Helper() st, err := OpenStore(filepath.Join(t.TempDir(), "t.db")) if err != nil { t.Fatalf("open store: %v", err) } + dispatched := make(chan []Event, 8) as := NewAppService( &Config{HSToken: "secret", BotMXID: "@ai:vojo.chat"}, slog.New(slog.NewTextHandler(io.Discard, nil)), st, - func(_ context.Context, ev []Event) { *dispatched = append(*dispatched, ev) }, + func(_ context.Context, ev []Event) { dispatched <- ev }, ) as.baseCtx = context.Background() - return as, st + return as, st, dispatched +} + +// waitDispatch returns the next dispatched batch, or (nil,false) if none arrives +// within the timeout. +func waitDispatch(ch chan []Event, timeout time.Duration) ([]Event, bool) { + select { + case ev := <-ch: + return ev, true + case <-time.After(timeout): + return nil, false + } } func txnReq(txnID, auth, body string) *http.Request { @@ -37,8 +54,7 @@ func txnReq(txnID, auth, body string) *http.Request { } func TestTransactionAuthAndIdempotency(t *testing.T) { - var dispatched [][]Event - as, st := newTestAS(t, &dispatched) + as, st, dispatched := newTestAS(t) defer st.Close() body := `{"events":[{"type":"m.room.message","room_id":"!r:vojo.chat","event_id":"$1","sender":"@u:vojo.chat"}]}` @@ -48,18 +64,19 @@ func TestTransactionAuthAndIdempotency(t *testing.T) { if w.Code != http.StatusForbidden { t.Fatalf("bad token: got %d, want 403", w.Code) } - if len(dispatched) != 0 { - t.Fatalf("bad token must not dispatch, got %d", len(dispatched)) + if _, ok := waitDispatch(dispatched, 100*time.Millisecond); ok { + t.Fatalf("bad token must not dispatch") } - // Good hs_token → 200, one batch dispatched. + // Good hs_token → 200, one batch dispatched (asynchronously). w = httptest.NewRecorder() as.handleTransaction(w, txnReq("txn1", "secret", body)) if w.Code != http.StatusOK { t.Fatalf("good token: got %d, want 200", w.Code) } - if len(dispatched) != 1 || len(dispatched[0]) != 1 { - t.Fatalf("expected one dispatched batch of one event, got %v", dispatched) + batch, ok := waitDispatch(dispatched, time.Second) + if !ok || len(batch) != 1 { + t.Fatalf("expected one dispatched batch of one event, got %v ok=%v", batch, ok) } // Same txnId again → idempotent no-op (still 200, no re-dispatch). @@ -68,14 +85,13 @@ func TestTransactionAuthAndIdempotency(t *testing.T) { if w.Code != http.StatusOK { t.Fatalf("retry: got %d, want 200", w.Code) } - if len(dispatched) != 1 { - t.Fatalf("retried transaction must not re-dispatch, got %d batches", len(dispatched)) + if _, ok := waitDispatch(dispatched, 100*time.Millisecond); ok { + t.Fatalf("retried transaction must not re-dispatch") } } func TestTransactionLegacyQueryTokenAccepted(t *testing.T) { - var dispatched [][]Event - as, st := newTestAS(t, &dispatched) + as, st, _ := newTestAS(t) defer st.Close() r := httptest.NewRequest(http.MethodPut, "/transactions/txnX?access_token=secret", strings.NewReader(`{"events":[]}`)) @@ -88,8 +104,7 @@ func TestTransactionLegacyQueryTokenAccepted(t *testing.T) { } func TestUserQuery(t *testing.T) { - var dispatched [][]Event - as, st := newTestAS(t, &dispatched) + as, st, _ := newTestAS(t) defer st.Close() mk := func(uid string) *http.Request { diff --git a/apps/ai-bot/bot.go b/apps/ai-bot/bot.go index 280b31ee..2e424951 100644 --- a/apps/ai-bot/bot.go +++ b/apps/ai-bot/bot.go @@ -5,6 +5,7 @@ import ( "fmt" "log/slog" "sync" + "time" ) // roomMeta caches per-room classification we need to handle a message: member @@ -13,7 +14,9 @@ import ( // (appservice transactions carry no room summary) and INVALIDATED whenever a // third party's membership changes, so a 1:1 that gains a member is reclassified // out of DM mode (no DM-mode third-party leak) and a newly added foreign member -// is caught. +// is caught. All fields are guarded by Bot.mu — never read or written without it, +// because the slow generation and the lazy CS-API probes run in concurrent per-room +// goroutines. type roomMeta struct { joined, invited int countsKnown bool @@ -30,14 +33,18 @@ type Bot struct { xai *XAIClient st *Store - // Transactions are delivered one at a time by Synapse, but guard the shared - // maps/sets anyway so an unexpected concurrent call can't corrupt them. - mu sync.Mutex - seen *lruSet // event ids already handled (dedup within a session) - botSent *lruSet // event ids the bot itself sent (reply-parent detection) - meta map[string]*roomMeta - buf map[string][]bufferedMsg - globalNote map[string]string // roomID → UTC date we last sent the global daily-limit notice + // mu guards the in-memory maps/sets below. Each transaction is acked to Synapse + // immediately (appservice.go) and its events are processed in transaction order, + // but the slow xAI generation runs in a per-room goroutine and the lazy probes run + // off-lock, so several goroutines touch this shared state at once. mu is held only + // for short map operations and is NEVER held across a network or xAI call — that + // head-of-line hold was the root cause of the multi-minute silence. + mu sync.Mutex + seen *lruSet // event ids already handled (dedup within a session; self-locking) + botSent *lruSet // event ids the bot itself sent (reply-parent detection; self-locking) + meta map[string]*roomMeta + buf map[string][]bufferedMsg + inflight map[string]bool // roomID currently generating a reply (per-room single-flight) } func NewBot(ctx context.Context, cfg *Config, logger *slog.Logger) (*Bot, error) { @@ -50,16 +57,16 @@ func NewBot(ctx context.Context, cfg *Config, logger *slog.Logger) (*Bot, error) } b := &Bot{ - cfg: cfg, - log: logger, - mx: mx, - xai: xai, - st: st, - seen: newLRUSet(5000), - botSent: newLRUSet(5000), - meta: make(map[string]*roomMeta), - buf: make(map[string][]bufferedMsg), - globalNote: make(map[string]string), + cfg: cfg, + log: logger, + mx: mx, + xai: xai, + st: st, + seen: newLRUSet(5000), + botSent: newLRUSet(5000), + meta: make(map[string]*roomMeta), + buf: make(map[string][]bufferedMsg), + inflight: make(map[string]bool), } // Confirm the as_token + user_id resolves to BOT_MXID before serving. @@ -98,52 +105,83 @@ func (b *Bot) Run(ctx context.Context) error { return as.Serve(ctx) } -// handleTransaction processes one pushed transaction's events in order. +// handleTransaction processes one already-acked transaction's events. It runs in a +// background goroutine (the 200 has already been returned to Synapse). Events are +// processed IN ORDER (dedup + classification are synchronous) so the per-room +// single-flight claim is taken in arrival order; only the slow xAI generation is +// spawned per room, so different rooms answer concurrently and a slow call never +// blocks the next event or another room. Ordering within a room is therefore kept, +// while the head-of-line freeze is gone. func (b *Bot) handleTransaction(ctx context.Context, events []Event) { - b.mu.Lock() - defer b.mu.Unlock() for i := range events { b.handleEvent(ctx, &events[i]) } } +// safego runs fn in a goroutine with panic recovery. The slow per-room work is +// detached from the HTTP handler, so an unrecovered panic there would crash the +// whole process and silence the bot for EVERY room — recover + log instead, so one +// malformed event can never take the bot down. +func (b *Bot) safego(what string, fn func()) { + go func() { + defer func() { + if r := recover(); r != nil { + b.log.Error("recovered panic in handler goroutine", "what", what, "panic", r) + } + }() + fn() + }() +} + func (b *Bot) handleEvent(ctx context.Context, ev *Event) { if ev.EventID == "" || ev.RoomID == "" { return } - if !b.seen.Add(ev.EventID) { - return // already handled this session (fast in-memory path) - } - // Durable dedup across restarts: if a previous run already handled this event - // but crashed before its transaction was acked, Synapse re-pushes it — don't - // reprocess (no dup answer / double-bill). On a DB error, fall through; the - // in-memory set still guards this session. - if isNew, err := b.st.SeenEvent(ev.EventID); err != nil { - b.log.Error("durable dedup check failed", "id", ev.EventID, "err", err) - } else if !isNew { - return + if !b.markSeen(ev.EventID) { + return // already handled (in-memory or durable dedup) } b.log.Debug("event", "type", ev.Type, "room", ev.RoomID, "sender", ev.Sender, "id", ev.EventID) switch ev.Type { case "m.room.member": if ev.StateKey != nil && *ev.StateKey == b.cfg.BotMXID { - b.handleSelfMembership(ctx, ev) - } else if m := b.meta[ev.RoomID]; m != nil { - // A third party's membership changed: counts + foreign flag are now - // stale. Re-probe on the next message so a 1:1 that gains a member drops - // out of DM mode (no third-party leak) and a new foreign member is caught. - m.countsKnown = false + b.safego("self-membership", func() { b.handleSelfMembership(ctx, ev) }) + } else { + // A third party's membership changed: counts + foreign flag are now stale. + // Re-probe on the next message so a 1:1 that gains a member drops out of DM + // mode (no third-party leak) and a new foreign member is caught. + b.invalidateCounts(ev.RoomID) } case "m.room.encryption": - m := b.getMeta(ev.RoomID) - m.encrypted, m.encKnown = true, true + b.setEncrypted(ev.RoomID) case "m.room.message": + // Synchronous (in transaction order) up to the single-flight claim; only the + // slow generation inside handleMessage is spawned as a goroutine. This keeps + // per-room event order — the earlier message wins the claim — while different + // rooms still run concurrently. b.handleMessage(ctx, ev) } } +// markSeen records an event id in both the in-memory set and the durable store and +// reports whether it is NEW (first time). The in-memory Add is atomic, and SeenEvent +// is an atomic INSERT OR IGNORE, so two racing goroutines for the same event can never +// both proceed. On a durable-store error we fall through (the in-memory set still +// guards this session). +func (b *Bot) markSeen(eventID string) bool { + if !b.seen.Add(eventID) { + return false + } + isNew, err := b.st.SeenEvent(eventID) + if err != nil { + b.log.Error("durable dedup check failed", "id", eventID, "err", err) + return true + } + return isNew +} + // handleSelfMembership reacts to membership changes for the bot user: auto-join -// invites from allowed servers (F11), reject others, forget rooms we leave. +// invites from allowed servers (F11), reject others, forget rooms we leave. Runs in +// its own goroutine because JoinRoom/LeaveRoom are network calls. func (b *Bot) handleSelfMembership(ctx context.Context, ev *Event) { switch ev.membershipOf() { case "invite": @@ -161,14 +199,11 @@ func (b *Bot) handleSelfMembership(ctx context.Context, ev *Event) { } // Fully-on-allowed-servers gate: a vojo.chat inviter can still pull the bot // into a room that already holds federated third parties — leave at once. - m := b.getMeta(ev.RoomID) - b.ensureCounts(ctx, ev.RoomID, m) - if m.countsKnown && m.foreign { + if _, _, foreign := b.ensureCounts(ctx, ev.RoomID); foreign { b.leaveForeign(ctx, ev.RoomID) } case "leave", "ban": - delete(b.meta, ev.RoomID) - delete(b.buf, ev.RoomID) + b.forgetRoom(ev.RoomID) } } @@ -183,116 +218,148 @@ func (b *Bot) leaveForeign(ctx context.Context, roomID string) { func (b *Bot) handleMessage(ctx context.Context, ev *Event) { roomID := ev.RoomID - m := b.getMeta(roomID) - // A9/F15: re-check encryption; if (or once) encrypted, warn once and skip — - // the bot can't read it. - b.ensureEncryption(ctx, roomID, m) - if m.encrypted { + // A9/F15: re-check encryption; if (or once) encrypted, react once and skip — the + // bot can't read it. The probe runs without the lock. + if b.ensureEncryption(ctx, roomID) { b.log.Debug("skip: encrypted room", "room", roomID) - b.warnEncryptedOnce(ctx, roomID) + b.reactEncryptedOnce(ctx, roomID, ev.EventID) return } mc, ok := ev.DecodeMessage() if !ok { + // A message addressed to the bot that we can't decode shouldn't vanish without + // a trace (no silent drops): log at WARN so it's visible at the default level. + b.log.Warn("skip: undecodable message content", "room", roomID, "sender", ev.Sender, "id", ev.EventID) return } // Edits re-carry m.mentions; never re-trigger or replay them (F16). if mc.IsReplace() { return } - // Only plain text ever reaches xAI: drop media (m.image/m.file/m.audio/…) and - // other custom msgtypes outright — the bot doesn't fetch or forward media, and - // a caption/filename is third-party content we keep out of xAI. m.notice falls - // through to its existing anti-loop handling below. - if mc.MsgType != "m.text" && mc.MsgType != "m.emote" && mc.MsgType != "m.notice" { - b.log.Debug("skip: non-text msgtype", "room", roomID, "sender", ev.Sender, "msgtype", mc.MsgType) - return - } - - // Buffer prior context BEFORE classifying so buildContext sees history only. - history := b.buf[roomID] - b.appendBuf(roomID, bufferedMsg{sender: ev.Sender, body: mc.Body, isBot: ev.Sender == b.cfg.BotMXID}) - if ev.Sender == b.cfg.BotMXID { - return // our own message (also tracked via botSent) + return // our own message (the reply is buffered when we send it, not on echo-back) } if mc.MsgType == "m.notice" { return // anti-loop: ignore notices (ours and other bots') } - b.ensureCounts(ctx, roomID, m) + // Media / non-text is handled only once we know the message is addressed (below), + // so a stray image in a group the bot isn't mentioned in stays silent (correct), + // while media in a 1:1 or an @-mention gets a clear "text only" reaction. + isMedia := mc.MsgType != "m.text" && mc.MsgType != "m.emote" + + countsKnown, isDM, foreign := b.ensureCounts(ctx, roomID) // Stay only in rooms hosted entirely on allowed servers — never operate in (or // "leak" the bot into) a federated room with non-consenting third parties. - if m.countsKnown && m.foreign { + if foreign { b.leaveForeign(ctx, roomID) return } + replyParentIsBot := mc.RelatesTo != nil && mc.RelatesTo.InReplyTo != nil && b.botSent.Has(mc.RelatesTo.InReplyTo.EventID) - mentioned := mentionsBot(mc, b.cfg.BotMXID, replyParentIsBot) - if !(m.isDM() || mentioned) { - b.log.Debug("skip: not addressed (group without mention)", "room", roomID, "sender", ev.Sender, - "dm", m.isDM(), "joined", m.joined, "invited", m.invited, "countsKnown", m.countsKnown, "mentioned", mentioned) + + if !(isDM || mentioned) { + if !countsKnown { + // We couldn't classify the room (member probe failed) and the message isn't + // an explicit mention, so we can't tell a 1:1 (answer everything) from a + // group (answer only on mention). Log at WARN — not a silent Debug drop — + // so it's visible; we don't react because reacting in a group the bot isn't + // addressed in would be wrong. Re-probed on the next message. + b.log.Warn("skip: room unclassified (member probe failed), message not an explicit mention", + "room", roomID, "sender", ev.Sender) + } else { + b.log.Debug("skip: not addressed (group without mention)", "room", roomID, "sender", ev.Sender, + "dm", isDM, "mentioned", mentioned) + } return } - b.respond(ctx, roomID, m, ev, mc, history) + + // Addressed but not text: react "text only" (no silent drop). + if isMedia { + b.log.Debug("skip: non-text msgtype (reacted)", "room", roomID, "sender", ev.Sender, "msgtype", mc.MsgType) + b.react(ctx, roomID, ev.EventID, reactMedia) + return + } + + // Per-room single-flight: while a generation is in flight for this room, drop + // further messages (like a chat — no queue, no new session, no token burn). No + // reaction here: the typing indicator is already showing for this room, which is + // the language-free "I'm busy" signal. The claim is taken here, synchronously and + // in transaction order, so the FIRST message for a room wins and later ones are + // dropped until release — never the reverse. + if !b.tryClaim(roomID) { + b.log.Debug("drop: room busy generating", "room", roomID, "sender", ev.Sender) + return + } + + // Snapshot the room history (excludes this trigger) under the claim, then run the + // slow generation in its own goroutine so this transaction's remaining events and + // other rooms are not blocked by the xAI call. respond appends the trigger+answer + // to the buffer itself, only on success (see sendReply), and releases the claim. + history := b.snapshotBuf(roomID) + b.safego("respond", func() { + defer b.release(roomID) + b.respond(ctx, roomID, isDM, ev, mc, history) + }) } // unlimitedCap is the effective per-user cap for UNLIMITED_USERS — high enough to // never trip the per-user gate, while the global DAILY_USD_CEILING still applies. const unlimitedCap = 1 << 30 -func (b *Bot) respond(ctx context.Context, roomID string, m *roomMeta, ev *Event, mc *MessageContent, history []bufferedMsg) { +func (b *Bot) respond(ctx context.Context, roomID string, isDM bool, ev *Event, mc *MessageContent, history []bufferedMsg) { perUserCap := b.cfg.PerUserDailyCap if b.cfg.UnlimitedUsers[ev.Sender] { perUserCap = unlimitedCap } switch res, err := b.st.Reserve(ev.Sender, perUserCap, b.cfg.DailyUSDCeiling); { case err != nil: + // A limiter failure is on our side — don't leave the user wondering. b.log.Error("limiter reserve failed", "sender", ev.Sender, "err", err) + b.react(ctx, roomID, ev.EventID, reactError) return case res == reserveDeniedUser: - // Per-user cap (anti-abuse, F24): stop answering, but always tell the user - // their request hit the limit — no message addressed to the bot is left - // silent. (m.notice → the anti-loop skip keeps this from re-triggering.) - b.log.Info("per-user daily cap reached; notifying", "sender", ev.Sender) - b.sendNotice(ctx, roomID, ev, mc, noticeUserLimit) + // Per-user cap (anti-abuse, F24): stop answering, but always signal the limit — + // no message addressed to the bot is left without feedback. + b.log.Info("per-user daily cap reached; reacting", "sender", ev.Sender) + b.react(ctx, roomID, ev.EventID, reactRateLimit) return case res == reserveDeniedGlobal: - // Global USD ceiling — notice once per room per day, then stay quiet. + // Global USD ceiling. A reaction is cheap and non-intrusive (unlike the old + // once-per-day text notice), so signal every affected message rather than + // going silent after the first. b.log.Warn("global daily USD ceiling reached", "room", roomID, "sender", ev.Sender) - if b.globalNote[roomID] != todayUTC() { - b.globalNote[roomID] = todayUTC() - b.sendNotice(ctx, roomID, ev, mc, noticeDailyLimit) - } + b.react(ctx, roomID, ev.EventID, reactRateLimit) return } - // Show "Vojo AI печатает…" while we build the answer; the deferred clear fires - // on every exit (success or failure). Pure UX — typing failures are best-effort. - b.setTyping(ctx, roomID, true) - defer b.setTyping(ctx, roomID, false) + // Show "Vojo AI печатает…" for the whole generation. The keepalive refreshes the + // typing notification every 20s (the server expires it after 30s) so the indicator + // never lapses on a slow/retried answer, and the deferred stop clears it on exit. + stopTyping := b.startTypingKeepalive(ctx, roomID) + defer stopTyping() - msgs := buildContext(b.cfg.SystemPrompt, history, m.isDM(), mc.Body, b.cfg.MaxCtxEvent, 8000) + msgs := buildContext(b.cfg.SystemPrompt, history, isDM, mc.Body, b.cfg.MaxCtxEvent, 8000) resp, err := b.xai.Complete(ctx, b.cfg.XAIModel, msgs, b.cfg.MaxOutTok, b.cfg.XAITemp) if err != nil { - // at-most-once already retried transient failures inside Complete; refund - // the reserved request so an xAI outage doesn't burn the user's daily cap, - // and tell the user we couldn't answer (notice → no anti-loop re-trigger). + // at-most-once already retried transient failures inside Complete; refund the + // reserved request so an xAI outage doesn't burn the user's daily cap, and + // signal the failure (react → no anti-loop, no language). b.log.Error("xai completion failed", "sender", ev.Sender, "err", err) if rerr := b.st.RefundRequest(ev.Sender); rerr != nil { b.log.Error("refund failed", "sender", ev.Sender, "err", rerr) } - b.sendNotice(ctx, roomID, ev, mc, noticeError) + b.react(ctx, roomID, ev.EventID, reactError) return } - // A 2xx from xAI is billed even if the text came back empty — always book the - // real cost so both caps see it (the old refund-without-reconcile on an empty - // 200 let such calls bypass the per-user cap and the global ceiling). + // A 2xx from xAI is billed even if the text came back empty — always book the real + // cost so both caps see it (an empty 200 must not bypass the per-user cap and the + // global ceiling). usd := computeUSD(resp.Usage, b.cfg) if err := b.st.Reconcile(ev.Sender, usd); err != nil { b.log.Error("reconcile spend failed", "sender", ev.Sender, "err", err) @@ -300,12 +367,15 @@ func (b *Bot) respond(ctx context.Context, roomID string, m *roomMeta, ev *Event text := resp.Text() if text == "" { - b.log.Warn("xai returned empty completion (billed, nothing to send)", "sender", ev.Sender, "usd", usd) + // Billed but no usable text (content filter / length cap / empty choices). Never + // leave a billed request without feedback — react "couldn't answer". + b.log.Warn("xai returned empty completion (billed, reacting)", "sender", ev.Sender, "usd", usd) + b.react(ctx, roomID, ev.EventID, reactError) return } - b.log.Info("answered", "room", roomID, "sender", ev.Sender, "dm", m.isDM(), + b.log.Info("answered", "room", roomID, "sender", ev.Sender, "dm", isDM, "usd", usd, "prompt_tokens", resp.Usage.PromptTokens, "completion_tokens", resp.Usage.CompletionTokens) - b.sendNotice(ctx, roomID, ev, mc, text) + b.sendReply(ctx, roomID, ev, mc, text) } // computeUSD prices the call from the API-returned token usage (authoritative @@ -322,30 +392,30 @@ func computeUSD(u xaiUsage, cfg *Config) float64 { float64(u.CompletionTokens)/1e6*cfg.PriceOutputPerM } -func (b *Bot) sendNotice(ctx context.Context, roomID string, trigger *Event, triggerMC *MessageContent, body string) { - content := buildNoticeContent(trigger.EventID, trigger.Sender, triggerMC.RelatesTo, body) - id, err := b.mx.SendEvent(ctx, roomID, "m.room.message", content) - if err != nil { - b.log.Error("send notice failed", "room", roomID, "err", err) - return +// react adds an emoji m.reaction to the triggering event — the bot's language-free +// way to signal a system state (error / rate limit / encrypted / media) it can't +// express as a model-generated answer. Best-effort: a failed reaction is logged, not +// retried. Reactions are m.reaction (not m.room.message), so they never re-enter +// handleMessage and need no anti-loop tracking. +func (b *Bot) react(ctx context.Context, roomID, eventID, emoji string) { + content := map[string]any{ + "m.relates_to": map[string]any{ + "rel_type": "m.annotation", + "event_id": eventID, + "key": emoji, + }, } - // Track our own reply so a future reply-to-it is recognised as addressing us, - // and add it to the room buffer as an assistant turn for context continuity. - b.botSent.Add(id) - b.appendBuf(roomID, bufferedMsg{sender: b.cfg.BotMXID, body: body, isBot: true}) -} - -// setTyping sets/clears the bot's typing indicator (best-effort UX; failures are -// non-fatal). The 30s timeout comfortably covers a normal completion, and -// respond() defers a clear so the indicator ends the moment the answer is sent -// or fails. -func (b *Bot) setTyping(ctx context.Context, roomID string, typing bool) { - if err := b.mx.SendTyping(ctx, roomID, typing, 30000); err != nil { - b.log.Debug("set typing failed", "room", roomID, "typing", typing, "err", err) + if _, err := b.mx.SendEvent(ctx, roomID, "m.reaction", content); err != nil { + b.log.Error("react failed", "room", roomID, "emoji", emoji, "err", err) } } -func (b *Bot) warnEncryptedOnce(ctx context.Context, roomID string) { +// reactEncryptedOnce reacts 🔒 to the first message seen in an encrypted room and +// records a durable flag so a restart doesn't re-react (F5). Vojo disables E2EE by +// default, so this is a near-dead safety path; the reaction is far less intrusive +// than the old text notice, but the once-gate keeps it from annotating every message +// in the rare encrypted room. +func (b *Bot) reactEncryptedOnce(ctx context.Context, roomID, eventID string) { warned, err := b.st.HasWarnedEncrypted(roomID) if err != nil { b.log.Error("warned-flag read failed", "room", roomID, "err", err) @@ -354,16 +424,79 @@ func (b *Bot) warnEncryptedOnce(ctx context.Context, roomID string) { if warned { return } - content := map[string]any{"msgtype": "m.notice", "body": noticeEncryptedUnsupported} - if _, err := b.mx.SendEvent(ctx, roomID, "m.room.message", content); err != nil { - b.log.Error("encrypted-notice failed", "room", roomID, "err", err) - return - } + b.react(ctx, roomID, eventID, reactEncrypted) if err := b.st.SetWarnedEncrypted(roomID); err != nil { b.log.Error("persist warned-flag failed", "room", roomID, "err", err) } } +// sendReply sends the model's actual answer and records the completed exchange in the +// conversation buffer so the next turn has context. +func (b *Bot) sendReply(ctx context.Context, roomID string, trigger *Event, triggerMC *MessageContent, body string) { + id := b.sendMessage(ctx, roomID, trigger, triggerMC, body) + if id == "" { + return + } + // Record the user trigger AND the assistant answer together, only AFTER the answer + // was sent, so a failed or empty generation never leaves a dangling user turn (a + // question with no reply) in the buffer — which would skew later completions. + // Single-flight guarantees no other turn for this room interleaves between the two. + b.appendBuf(roomID, bufferedMsg{sender: trigger.Sender, body: triggerMC.Body, isBot: false}) + b.appendBuf(roomID, bufferedMsg{sender: b.cfg.BotMXID, body: body, isBot: true}) +} + +// sendMessage builds and sends an m.notice reply, tracks our own event id, and returns +// the new event id ("" on failure). +func (b *Bot) sendMessage(ctx context.Context, roomID string, trigger *Event, triggerMC *MessageContent, body string) string { + content := buildNoticeContent(trigger.EventID, trigger.Sender, triggerMC.RelatesTo, body) + id, err := b.mx.SendEvent(ctx, roomID, "m.room.message", content) + if err != nil { + b.log.Error("send failed", "room", roomID, "err", err) + return "" + } + // Track our own reply so a future reply-to-it is recognised as addressing us. + b.botSent.Add(id) + return id +} + +// startTypingKeepalive starts the typing indicator and keeps it alive for the whole +// generation (the CS-API server-side typing notification expires after the 30s we +// pass, so we refresh every 20s). The returned stop clears the indicator and is safe +// to call once via defer. Typing is best-effort UX — failures are non-fatal. +func (b *Bot) startTypingKeepalive(ctx context.Context, roomID string) func() { + b.setTyping(ctx, roomID, true) + done := make(chan struct{}) + go func() { + t := time.NewTicker(20 * time.Second) + defer t.Stop() + for { + select { + case <-ctx.Done(): + return + case <-done: + return + case <-t.C: + b.setTyping(ctx, roomID, true) + } + } + }() + var once sync.Once + return func() { + once.Do(func() { + close(done) + b.setTyping(ctx, roomID, false) + }) + } +} + +// setTyping sets/clears the bot's typing indicator (best-effort UX; failures are +// non-fatal). The 30s server-side timeout is refreshed by startTypingKeepalive. +func (b *Bot) setTyping(ctx context.Context, roomID string, typing bool) { + if err := b.mx.SendTyping(ctx, roomID, typing, 30000); err != nil { + b.log.Debug("set typing failed", "room", roomID, "typing", typing, "err", err) + } +} + // buildNoticeContent builds the reply. m.notice (not m.text) so the anti-loop // skip catches our own output. Thread-aware (F27): a trigger from a thread gets a // thread relation so the answer lands in the thread, not the main timeline. @@ -394,9 +527,30 @@ func buildNoticeContent(replyTo, sender string, triggerRelates *RelatesTo, body return content } -// --- per-room metadata helpers ------------------------------------------------- +// --- per-room single-flight ---------------------------------------------------- -func (b *Bot) getMeta(roomID string) *roomMeta { +// tryClaim marks a room as generating and returns true if the caller won the claim +// (no generation was already in flight). The loser must drop its message. +func (b *Bot) tryClaim(roomID string) bool { + b.mu.Lock() + defer b.mu.Unlock() + if b.inflight[roomID] { + return false + } + b.inflight[roomID] = true + return true +} + +func (b *Bot) release(roomID string) { + b.mu.Lock() + defer b.mu.Unlock() + delete(b.inflight, roomID) +} + +// --- per-room metadata helpers (all guarded by b.mu; probes run outside it) ----- + +// getMetaLocked returns (creating if needed) the room's meta. Caller MUST hold b.mu. +func (b *Bot) getMetaLocked(roomID string) *roomMeta { m := b.meta[roomID] if m == nil { m = &roomMeta{} @@ -405,38 +559,112 @@ func (b *Bot) getMeta(roomID string) *roomMeta { return m } -func (b *Bot) ensureEncryption(ctx context.Context, roomID string, m *roomMeta) { - if m.encKnown { - return +func (b *Bot) invalidateCounts(roomID string) { + b.mu.Lock() + defer b.mu.Unlock() + if m := b.meta[roomID]; m != nil { + m.countsKnown = false } +} + +func (b *Bot) setEncrypted(roomID string) { + b.mu.Lock() + defer b.mu.Unlock() + m := b.getMetaLocked(roomID) + m.encrypted, m.encKnown = true, true +} + +func (b *Bot) forgetRoom(roomID string) { + b.mu.Lock() + defer b.mu.Unlock() + delete(b.meta, roomID) + delete(b.buf, roomID) + delete(b.inflight, roomID) +} + +// ensureEncryption returns whether the room is encrypted, probing the CS-API once +// (without holding the lock) and caching the result. On probe error it returns false +// (treated as not-encrypted this round) and leaves the state unknown for a re-probe. +func (b *Bot) ensureEncryption(ctx context.Context, roomID string) bool { + b.mu.Lock() + if m := b.getMetaLocked(roomID); m.encKnown { + enc := m.encrypted + b.mu.Unlock() + return enc + } + b.mu.Unlock() + enc, err := b.mx.RoomEncrypted(ctx, roomID) if err != nil { b.log.Warn("encryption probe failed", "room", roomID, "err", err) - return // leave unknown; re-probed on the next message + return false // leave unknown; re-probed on the next message } - m.encrypted, m.encKnown = enc, true + // Re-fetch under the lock instead of writing to the pointer captured before the + // unlocked probe: if the room was forgotten (leave/ban) mid-probe its meta was + // deleted, and writing to the captured pointer would resurrect a dead room. + b.mu.Lock() + if m := b.meta[roomID]; m != nil { + m.encrypted, m.encKnown = enc, true + } + b.mu.Unlock() + return enc } -func (b *Bot) ensureCounts(ctx context.Context, roomID string, m *roomMeta) { - if m.countsKnown { - return - } - joined, invited, servers, err := b.mx.RoomMembership(ctx, roomID) - if err != nil { - b.log.Warn("member probe failed", "room", roomID, "err", err) - return - } - foreign := false - for s := range servers { - if !b.cfg.AllowedServers[s] { - foreign = true - break +// ensureCounts returns (countsKnown, isDM, foreign), probing /members once (without +// holding the lock) and caching the result. On probe error it returns +// (false, false, false): the caller treats an unclassified room conservatively and +// logs a visible WARN rather than silently dropping. +func (b *Bot) ensureCounts(ctx context.Context, roomID string) (countsKnown, isDM, foreign bool) { + b.mu.Lock() + known := b.getMetaLocked(roomID).countsKnown + b.mu.Unlock() + + if !known { + joined, invited, servers, err := b.mx.RoomMembership(ctx, roomID) + if err != nil { + b.log.Warn("member probe failed", "room", roomID, "err", err) + return false, false, false } + isForeign := false + for s := range servers { + if !b.cfg.AllowedServers[s] { + isForeign = true + break + } + } + // Re-fetch under the lock rather than writing a pointer captured before the + // unlocked /members probe (see ensureEncryption): a leave/ban mid-probe must + // not be undone by resurrecting the room's meta. + b.mu.Lock() + if m := b.meta[roomID]; m != nil { + m.joined, m.invited, m.foreign, m.countsKnown = joined, invited, isForeign, true + } + b.mu.Unlock() } - m.joined, m.invited, m.foreign, m.countsKnown = joined, invited, foreign, true + + b.mu.Lock() + defer b.mu.Unlock() + if m := b.meta[roomID]; m != nil { + return m.countsKnown, m.isDM(), m.foreign + } + return false, false, false +} + +func (b *Bot) snapshotBuf(roomID string) []bufferedMsg { + b.mu.Lock() + defer b.mu.Unlock() + src := b.buf[roomID] + if len(src) == 0 { + return nil + } + out := make([]bufferedMsg, len(src)) + copy(out, src) + return out } func (b *Bot) appendBuf(roomID string, msg bufferedMsg) { + b.mu.Lock() + defer b.mu.Unlock() limit := b.cfg.MaxCtxEvent * 2 if limit < 8 { limit = 8 diff --git a/apps/ai-bot/concurrency_test.go b/apps/ai-bot/concurrency_test.go new file mode 100644 index 00000000..dde04dbe --- /dev/null +++ b/apps/ai-bot/concurrency_test.go @@ -0,0 +1,82 @@ +package main + +import ( + "sync" + "testing" +) + +// TestSingleFlightClaim documents the per-room single-flight invariant the async +// refactor relies on: at most one generation per room at a time, the claim is +// independent per room, and a release re-arms the room. handleEvent takes this claim +// synchronously in transaction order, so the FIRST message for a room wins and later +// ones are dropped until release (never the reverse). +func TestSingleFlightClaim(t *testing.T) { + b := &Bot{inflight: make(map[string]bool)} + + if !b.tryClaim("!a") { + t.Fatal("first claim on !a should win") + } + if b.tryClaim("!a") { + t.Fatal("second claim on !a must fail while in flight") + } + if !b.tryClaim("!b") { + t.Fatal("a different room must claim independently") + } + b.release("!a") + if !b.tryClaim("!a") { + t.Fatal("after release !a must be claimable again") + } +} + +// TestSingleFlightClaimExactlyOneWinner runs many goroutines racing for the same +// room and asserts EXACTLY ONE wins the claim — the property that prevents two +// concurrent generations (double xAI spend) for one room. Run under -race. +func TestSingleFlightClaimExactlyOneWinner(t *testing.T) { + b := &Bot{inflight: make(map[string]bool)} + const n = 64 + var wins int64 + var mu sync.Mutex + var wg sync.WaitGroup + wg.Add(n) + for i := 0; i < n; i++ { + go func() { + defer wg.Done() + if b.tryClaim("!room") { + mu.Lock() + wins++ + mu.Unlock() + } + }() + } + wg.Wait() + if wins != 1 { + t.Fatalf("exactly one goroutine must win the claim, got %d", wins) + } +} + +// TestLRUSetConcurrentAddOnce asserts the dedup set's check-and-insert is atomic: +// with many goroutines racing on the same id, Add returns true exactly once. This is +// the in-memory half of markSeen, now called from concurrent per-room goroutines. +// Run under -race. +func TestLRUSetConcurrentAddOnce(t *testing.T) { + s := newLRUSet(1000) + const n = 64 + var trues int64 + var mu sync.Mutex + var wg sync.WaitGroup + wg.Add(n) + for i := 0; i < n; i++ { + go func() { + defer wg.Done() + if s.Add("$evt") { + mu.Lock() + trues++ + mu.Unlock() + } + }() + } + wg.Wait() + if trues != 1 { + t.Fatalf("Add must return true exactly once for one id, got %d", trues) + } +} diff --git a/apps/ai-bot/messages.go b/apps/ai-bot/messages.go index 98c41a04..19fd1858 100644 --- a/apps/ai-bot/messages.go +++ b/apps/ai-bot/messages.go @@ -1,15 +1,15 @@ package main -// Bot-authored, user-facing notices. Vojo is a Russian-market product, so these -// are RU. They are NOT the i18n bundle of the cinny client — this is a separate -// service; keep the few strings here. +// Language-free status signals. The bot answers questions in the user's own +// language — the model handles that — but it cannot localize system states like +// "rate limited" or "xAI is down": an appservice transaction carries no per-user +// locale, so there is no reliable language to pick, and the bot serves a mixed +// RU/EN audience. Rather than hardcode prose in one language (and rather than drop +// silently), the bot REACTS to the triggering message with a self-evident emoji. +// These are symbols, not translatable copy — edit the glyphs freely. const ( - noticeEncryptedUnsupported = "Я не читаю зашифрованные комнаты, поэтому не отвечаю здесь. " + - "Напишите мне в обычном (незашифрованном) чате." - - noticeDailyLimit = "Достигнут дневной лимит обращений к ИИ в этом сервисе. Попробуйте позже." - - noticeUserLimit = "Вы исчерпали свой дневной лимит обращений к ИИ. Попробуйте позже." - - noticeError = "⚠️ Не удалось получить ответ от ИИ. Попробуйте ещё раз чуть позже." + reactError = "⚠️" // couldn't answer — xAI failed or returned nothing usable + reactRateLimit = "⏳" // daily limit reached (per-user or global) — try later + reactEncrypted = "🔒" // encrypted room — the bot can't read it + reactMedia = "🚫" // non-text message — the bot only reads text ) diff --git a/apps/ai-bot/store.go b/apps/ai-bot/store.go index 4d63945f..a797fabd 100644 --- a/apps/ai-bot/store.go +++ b/apps/ai-bot/store.go @@ -13,13 +13,13 @@ type reserveResult int const ( reserveOK reserveResult = iota - reserveDeniedUser // per-user daily request cap hit (silent drop, F24) - reserveDeniedGlobal // global daily USD ceiling hit (notice, F24) + reserveDeniedUser // per-user daily request cap hit (⏳ rate-limit reaction, F24) + reserveDeniedGlobal // global daily USD ceiling hit (⏳ rate-limit reaction, F24) ) -// Store is the durable bot state: /sync cursor, daily spend ledger, and the -// encrypted-room warned set. Pure-Go SQLite (no cgo) so the binary stays static -// for a distroless/scratch image. +// Store is the durable bot state: transaction + event dedup, the daily spend +// ledger, and the encrypted-room warned set. Pure-Go SQLite (no cgo) so the binary +// stays static for a distroless/scratch image. type Store struct { db *sql.DB } @@ -29,7 +29,9 @@ func OpenStore(path string) (*Store, error) { if err != nil { return nil, err } - // Single writer — the loop is serial; one connection avoids lock churn. + // One connection: database/sql serializes all callers onto it, which keeps the + // now-concurrent handler goroutines from contending on SQLite write locks. All + // statements here are short, so callers block only briefly and never deadlock. db.SetMaxOpenConns(1) schema := ` @@ -163,10 +165,10 @@ func (s *Store) Reconcile(mxid string, usd float64) error { return err } -// HasWarnedEncrypted / SetWarnedEncrypted persist the one-shot "told this room I -// can't read encryption" flag so a restart doesn't re-spam the notice (F5) — the -// bot never sees its own notice (the sync filter drops nothing, but the loop skips -// m.notice and self). +// HasWarnedEncrypted / SetWarnedEncrypted persist the one-shot "reacted 🔒 to this +// room because I can't read encryption" flag so a restart doesn't re-react on every +// message (F5). The bot never reacts to its own events: m.reaction is not an +// m.room.message, so it never re-enters handleMessage. func (s *Store) HasWarnedEncrypted(roomID string) (bool, error) { var one int err := s.db.QueryRow(`SELECT 1 FROM warned_encrypted WHERE room_id = ?`, roomID).Scan(&one) diff --git a/apps/ai-bot/util.go b/apps/ai-bot/util.go index d8e83523..b3ee5671 100644 --- a/apps/ai-bot/util.go +++ b/apps/ai-bot/util.go @@ -1,8 +1,13 @@ package main +import "sync" + // lruSet is a bounded insertion-ordered string set used for event-id dedup and // tracking our own sent event ids. Oldest entries evict once cap is reached. +// Self-locking: events are now processed in concurrent per-message goroutines, so +// Add/Has must be safe to call from several goroutines at once. type lruSet struct { + mu sync.Mutex cap int set map[string]struct{} order []string @@ -13,12 +18,18 @@ func newLRUSet(cap int) *lruSet { } func (l *lruSet) Has(k string) bool { + l.mu.Lock() + defer l.mu.Unlock() _, ok := l.set[k] return ok } // Add inserts k and returns true if it was newly added (false if already present). +// The check-and-insert is atomic, so two goroutines racing on the same id can +// never both get true — the in-memory dedup stays correct under concurrency. func (l *lruSet) Add(k string) bool { + l.mu.Lock() + defer l.mu.Unlock() if _, ok := l.set[k]; ok { return false } diff --git a/apps/ai-bot/xai.go b/apps/ai-bot/xai.go index 783ee8cc..c55f832b 100644 --- a/apps/ai-bot/xai.go +++ b/apps/ai-bot/xai.go @@ -70,10 +70,10 @@ func (r *xaiResponse) Text() string { return r.Choices[0].Message.Content } -// Complete calls Chat Completions with at-most-once retry on transient failures -// (429 / 5xx / network timeout, exponential backoff + jitter). Non-retryable 4xx -// fail immediately. The caller advances since/seen only AFTER this returns so a -// transient failure isn't silently swallowed by a moved cursor (F6). +// Complete calls Chat Completions with retry on transient failures (429 / 5xx / +// network timeout, exponential backoff + jitter). Non-retryable 4xx fail +// immediately. On exhaustion the caller refunds the reserved request and notifies +// the user, so a transient failure is never silently swallowed (F6). func (x *XAIClient) Complete(ctx context.Context, model string, msgs []xaiMessage, maxTokens int, temp float64) (*xaiResponse, error) { reqBody := xaiRequest{ Model: model,