package main import ( "context" "fmt" "log/slog" "sync" "sync/atomic" "time" ) // roomMeta caches per-room classification we need to handle a message: member // counts (for the 1:1 test, F3), whether any member is outside ALLOWED_SERVERS, // and encryption state (F15). Lazily fetched from the CS-API on first need // (appservice transactions carry no room summary) and INVALIDATED whenever a // third party's membership changes, so a 1:1 that gains a member is reclassified // out of DM mode (no DM-mode third-party leak) and a newly added foreign member // is caught. All fields are guarded by Bot.mu — never read or written without it, // because the slow generation and the lazy CS-API probes run in concurrent per-room // goroutines. type roomMeta struct { joined, invited int countsKnown bool foreign bool // a joined/invited member is outside ALLOWED_SERVERS encrypted, encKnown bool } func (m *roomMeta) isDM() bool { return m.countsKnown && m.joined+m.invited == 2 } type Bot struct { cfg *Config log *slog.Logger mx *MatrixClient llm LLMClient st *Store // gemini is the cheap chat backend for the trivial route and the Layer-1 classifier // (an LLMClient so tests can fake it); nil unless a layer that uses it is enabled. // web is the web-freshness provider, built only when WEB_ENABLED. Both nil → the // cascade can only ever produce grok_direct. gemini LLMClient web WebProvider // promptVersion is a short stable hash of the system prompt, logged with each // request so prompt changes are visible in the analytics (A/B + regressions). promptVersion string // telemetryWrites paces the retention trim (every telemetryTrimEvery writes). telemetryWrites atomic.Uint64 // mu guards the in-memory maps/sets below. Each transaction is acked to Synapse // immediately (appservice.go) and its events are processed in transaction order, // but the slow xAI generation runs in a per-room goroutine and the lazy probes run // off-lock, so several goroutines touch this shared state at once. mu is held only // for short map operations and is NEVER held across a network or xAI call — that // head-of-line hold was the root cause of the multi-minute silence. mu sync.Mutex seen *lruSet // event ids already handled (dedup within a session; self-locking) botSent *lruSet // event ids the bot itself sent (reply-parent detection; self-locking) meta map[string]*roomMeta // buf and inflight are keyed by roomID THEN by thread root ("" = main timeline), so // each conversation (a thread) keeps an isolated context window and an independent // single-flight claim. Two messages in different threads of one room no longer block // each other or pollute each other's history. roomMeta stays room-level (membership // /encryption are room facts). forgetRoom drops a room's whole subtree in one delete. buf map[string]map[string]*convBuf inflight map[string]map[string]bool // bufSeq is a monotonic counter stamped onto a convBuf on every append (guarded by mu); // it orders conversations by last activity so appendBuf can LRU-evict the coldest one // when a room exceeds maxConvBuffersPerRoom. bufSeq uint64 // typingRefs counts in-flight generations per ROOM that are currently showing the // "typing…" indicator. Matrix typing notifications are room-scoped (no thread_id in // the CS-API), so the indicator can't be split per thread: it goes on when the first // generation in a room starts and off only when the last one finishes (refcount). typingRefs map[string]int } func NewBot(ctx context.Context, cfg *Config, logger *slog.Logger) (*Bot, error) { mx := NewMatrixClient(cfg.HomeserverURL, cfg.ASToken, cfg.BotMXID) llm := NewXAIClient(cfg.XAIBaseURL, cfg.XAIAPIKey, logger) st, err := OpenStore(cfg.DatabaseURL) if err != nil { return nil, err } b := &Bot{ cfg: cfg, log: logger, mx: mx, llm: llm, st: st, promptVersion: fmt.Sprintf("%08x", hashString(cfg.SystemPrompt)), seen: newLRUSet(5000), botSent: newLRUSet(5000), meta: make(map[string]*roomMeta), buf: make(map[string]map[string]*convBuf), inflight: make(map[string]map[string]bool), typingRefs: make(map[string]int), } // Build the cascade backends only for enabled layers (config already fail-fast // validated that the keys exist). With every cascade flag off these stay nil and // generate() can only produce grok_direct — today's bot. The grounding web provider // needs the concrete client (for the native generateContent call), so keep a typed // handle alongside the LLMClient face. var gc *geminiClient if cfg.needsGemini() { gc = NewGeminiClient(cfg.GeminiBaseURL, cfg.GeminiAPIKey, cfg.GeminiModel, logger) b.gemini = gc } if cfg.WebEnabled { if cfg.WebProvider == webProviderGeminiGrounding { b.web = &geminiGrounding{gem: gc, st: st, cfg: cfg} } else { b.web = newGrokWebSearch(cfg, logger) } } // Confirm the as_token + user_id resolves to BOT_MXID before serving. if err := b.verifyIdentity(ctx); err != nil { st.Close() return nil, err } // F23: ensure the profile has a display name (best-effort, idempotent). if err := mx.SetDisplayName(ctx, cfg.BotDisplayName); err != nil { logger.Warn("set display name failed (non-fatal)", "err", err) } return b, nil } func (b *Bot) Close() { if b.st != nil { _ = b.st.Close() } } func (b *Bot) verifyIdentity(ctx context.Context) error { who, err := b.mx.Whoami(ctx) if err != nil { return err } if who != b.cfg.BotMXID { return fmt.Errorf("as_token resolves to %q but BOT_MXID is %q", who, b.cfg.BotMXID) } b.log.Info("authenticated", "mxid", who) return nil } // Run starts the appservice transaction server and blocks until ctx is cancelled. func (b *Bot) Run(ctx context.Context) error { as := NewAppService(b.cfg, b.log, b.st, b.handleTransaction) return as.Serve(ctx) } // handleTransaction processes one already-acked transaction's events. It runs in a // background goroutine (the 200 has already been returned to Synapse). Events are // processed IN ORDER (dedup + classification are synchronous) so the per-room // single-flight claim is taken in arrival order; only the slow xAI generation is // spawned per room, so different rooms answer concurrently and a slow call never // blocks the next event or another room. Ordering within a room is therefore kept, // while the head-of-line freeze is gone. func (b *Bot) handleTransaction(ctx context.Context, events []Event) { for i := range events { b.handleEvent(ctx, &events[i]) } } // safego runs fn in a goroutine with panic recovery. The slow per-room work is // detached from the HTTP handler, so an unrecovered panic there would crash the // whole process and silence the bot for EVERY room — recover + log instead, so one // malformed event can never take the bot down. func (b *Bot) safego(ctx context.Context, what string, fn func()) { go func() { defer func() { if r := recover(); r != nil { b.log.ErrorContext(ctx, "recovered panic in handler goroutine", "what", what, "panic", r) } }() fn() }() } func (b *Bot) handleEvent(ctx context.Context, ev *Event) { if ev.EventID == "" || ev.RoomID == "" { return } // Mint a fresh trace id for this event and stamp it (plus sender + the per-user // body-logging decision) onto ctx. Every log line below — through the per-room // goroutine and down to the model call — carries this trace_id automatically. ctx = withRequestTrace(ctx, newTraceID(), ev.Sender, b.cfg.LogBodiesUsers[ev.Sender]) if !b.markSeen(ctx, ev.EventID) { return // already handled (in-memory or durable dedup) } b.log.DebugContext(ctx, "event", "type", ev.Type, "room", ev.RoomID, "sender", ev.Sender, "id", ev.EventID) switch ev.Type { case "m.room.member": if ev.StateKey != nil && *ev.StateKey == b.cfg.BotMXID { b.safego(ctx, "self-membership", func() { b.handleSelfMembership(ctx, ev) }) } else { // A third party's membership changed: counts + foreign flag are now stale. // Re-probe on the next message so a 1:1 that gains a member drops out of DM // mode (no third-party leak) and a new foreign member is caught. b.invalidateCounts(ev.RoomID) } case "m.room.encryption": b.setEncrypted(ev.RoomID) case "m.room.message": // Synchronous (in transaction order) up to the single-flight claim; only the // slow generation inside handleMessage is spawned as a goroutine. This keeps // per-room event order — the earlier message wins the claim — while different // rooms still run concurrently. b.handleMessage(ctx, ev) } } // markSeen records an event id in both the in-memory set and the durable store and // reports whether it is NEW (first time). The in-memory Add is atomic, and SeenEvent // is an atomic INSERT … ON CONFLICT DO NOTHING, so two racing goroutines for the same // event can never both proceed. On a durable-store error we fall through (the in-memory set still // guards this session). func (b *Bot) markSeen(ctx context.Context, eventID string) bool { if !b.seen.Add(eventID) { return false } isNew, err := b.st.SeenEvent(eventID) if err != nil { b.log.ErrorContext(ctx, "durable dedup check failed", "id", eventID, "err", err) return true } return isNew } // handleSelfMembership reacts to membership changes for the bot user: auto-join // invites from allowed servers (F11), reject others, forget rooms we leave. Runs in // its own goroutine because JoinRoom/LeaveRoom are network calls. func (b *Bot) handleSelfMembership(ctx context.Context, ev *Event) { switch ev.membershipOf() { case "invite": if !b.cfg.AllowedServers[serverOf(ev.Sender)] { b.log.WarnContext(ctx, "rejecting invite (server not allowed)", "room", ev.RoomID, "sender", ev.Sender) if err := b.mx.LeaveRoom(ctx, ev.RoomID); err != nil { b.log.ErrorContext(ctx, "leave (reject) failed", "room", ev.RoomID, "err", err) } return } b.log.InfoContext(ctx, "accepting invite", "room", ev.RoomID, "sender", ev.Sender) if err := b.mx.JoinRoom(ctx, ev.RoomID); err != nil { b.log.ErrorContext(ctx, "join failed", "room", ev.RoomID, "err", err) return } // Fully-on-allowed-servers gate: a vojo.chat inviter can still pull the bot // into a room that already holds federated third parties — leave at once. if _, _, foreign := b.ensureCounts(ctx, ev.RoomID); foreign { b.leaveForeign(ctx, ev.RoomID) } case "leave", "ban": b.forgetRoom(ev.RoomID) } } // leaveForeign leaves a room that contains a member outside ALLOWED_SERVERS, so // the bot only ever operates in rooms hosted entirely on allowed homeservers. func (b *Bot) leaveForeign(ctx context.Context, roomID string) { b.log.WarnContext(ctx, "leaving room — a member is outside ALLOWED_SERVERS", "room", roomID) if err := b.mx.LeaveRoom(ctx, roomID); err != nil { b.log.ErrorContext(ctx, "leave (foreign) failed", "room", roomID, "err", err) } } func (b *Bot) handleMessage(ctx context.Context, ev *Event) { roomID := ev.RoomID // A9/F15: re-check encryption; if (or once) encrypted, react once and skip — the // bot can't read it. The probe runs without the lock. if b.ensureEncryption(ctx, roomID) { b.log.DebugContext(ctx, "skip: encrypted room", "room", roomID) // Log the skip only when we actually react (once per room), so an encrypted room // the bot can't read doesn't flood request_log with one row per message. if b.reactEncryptedOnce(ctx, roomID, ev.EventID) { b.recordSkip(ctx, ev, degradeEncrypted) } return } mc, ok := ev.DecodeMessage() if !ok { // A message addressed to the bot that we can't decode shouldn't vanish without // a trace (no silent drops): log at WARN so it's visible at the default level. b.log.WarnContext(ctx, "skip: undecodable message content", "room", roomID, "sender", ev.Sender, "id", ev.EventID) return } // Edits re-carry m.mentions; never re-trigger or replay them (F16). if mc.IsReplace() { return } if ev.Sender == b.cfg.BotMXID { return // our own message (the reply is buffered when we send it, not on echo-back) } if mc.MsgType == "m.notice" { return // anti-loop: ignore notices (ours and other bots') } // Media / non-text is handled only once we know the message is addressed (below), // so a stray image in a group the bot isn't mentioned in stays silent (correct), // while media in a 1:1 or an @-mention gets a clear "text only" reaction. isMedia := mc.MsgType != "m.text" && mc.MsgType != "m.emote" countsKnown, isDM, foreign := b.ensureCounts(ctx, roomID) // Stay only in rooms hosted entirely on allowed servers — never operate in (or // "leak" the bot into) a federated room with non-consenting third parties. if foreign { b.leaveForeign(ctx, roomID) b.recordSkip(ctx, ev, degradeForeign) return } replyParentIsBot := mc.RelatesTo != nil && mc.RelatesTo.InReplyTo != nil && b.botSent.Has(mc.RelatesTo.InReplyTo.EventID) mentioned := mentionsBot(mc, b.cfg.BotMXID, replyParentIsBot) if !(isDM || mentioned) { if !countsKnown { // We couldn't classify the room (member probe failed) and the message isn't // an explicit mention, so we can't tell a 1:1 (answer everything) from a // group (answer only on mention). Log at WARN — not a silent Debug drop — // so it's visible; we don't react because reacting in a group the bot isn't // addressed in would be wrong. Re-probed on the next message. b.log.WarnContext(ctx, "skip: room unclassified (member probe failed), message not an explicit mention", "room", roomID, "sender", ev.Sender) } else { b.log.DebugContext(ctx, "skip: not addressed (group without mention)", "room", roomID, "sender", ev.Sender, "dm", isDM, "mentioned", mentioned) } return } // Addressed but not text: react "text only" (no silent drop). if isMedia { b.log.DebugContext(ctx, "skip: non-text msgtype (reacted)", "room", roomID, "sender", ev.Sender, "msgtype", mc.MsgType) b.react(ctx, roomID, ev.EventID, reactMedia) b.recordSkip(ctx, ev, degradeMedia) return } // Per-room single-flight: while a generation is in flight for this room, drop // further messages (like a chat — no queue, no new session, no token burn). No // reaction here: the typing indicator is already showing for this room, which is // the language-free "I'm busy" signal. The claim is taken here, synchronously and // in transaction order, so the FIRST message for a room wins and later ones are // dropped until release — never the reverse. // Resolve which conversation this turn belongs to: an existing thread (continue it), // a freshly rooted DM thread (auto-thread, DM-only), or the main timeline (""). // Groups never auto-thread; see resolveThreadRoot. threadRoot := b.resolveThreadRoot(isDM, ev, mc) // Per-(room,thread) single-flight: a slow answer in one conversation no longer blocks // another thread in the same room, and two messages in the SAME conversation still see // exactly one winner. The claim is taken synchronously in transaction order. if !b.tryClaim(roomID, threadRoot) { b.log.DebugContext(ctx, "drop: conversation busy generating", "room", roomID, "thread", threadRoot, "sender", ev.Sender) return } // Snapshot THIS conversation's history (excludes this trigger) under the claim, then // run the slow generation in its own goroutine so this transaction's remaining events // and other rooms/threads are not blocked by the xAI call. respond appends the // trigger+answer to the same per-thread buffer on success (see sendReply) and releases // the claim. history := b.snapshotBuf(roomID, threadRoot) b.safego(ctx, "respond", func() { defer b.release(roomID, threadRoot) b.respond(ctx, roomID, threadRoot, isDM, ev, mc, history) }) } // unlimitedCap is the effective per-user cap for UNLIMITED_USERS — high enough to // never trip the per-user gate, while the global DAILY_USD_CEILING still applies. const unlimitedCap = 1 << 30 func (b *Bot) respond(ctx context.Context, roomID, threadRoot string, isDM bool, ev *Event, mc *MessageContent, history []bufferedMsg) { started := time.Now() // One telemetry row per request, populated as the flow decides its outcome and // emitted once via defer — so every exit (deny, error, empty, paid silence, success) // is recorded without scattering writes (F-FUNC-5). It starts as route=none/ok=false; // proceeding to the model sets the route, success sets ok=true. rl := RequestLog{ ID: ev.EventID, RoomID: roomID, Sender: ev.Sender, Route: routeNone, RouterSource: "default", PromptVersion: b.promptVersion, QueryText: mc.Body, Models: map[string]string{"final": b.cfg.XAIModel}, } defer func() { rl.LatencyMS = int(time.Since(started).Milliseconds()) b.recordTelemetry(ctx, rl) }() perUserCap := b.cfg.PerUserDailyCap perUserUSD := b.cfg.PerUserDailyUSD if b.cfg.UnlimitedUsers[ev.Sender] { perUserCap = unlimitedCap perUserUSD = 0 // exempt from both per-user gates; the global ceiling still applies } // Reserve the route's estimated max-cost (not $0) so the global ceiling counts // this in-flight call BEFORE it returns — the TOCTOU fix (§8.1). The envelope covers // the most expensive ENABLED route, so whichever the router picks is admitted within // the reservation; with the cascade off it is exactly grok_direct's estimate. estimate := b.reserveEstimate() switch res, err := b.st.Reserve(ev.Sender, perUserCap, perUserUSD, b.cfg.DailyUSDCeiling, estimate); { case err != nil: // A limiter failure is on our side — don't leave the user wondering. b.log.ErrorContext(ctx, "limiter reserve failed", "sender", ev.Sender, "err", err) rl.Degraded, rl.Err = degradeReserveErr, err.Error() b.react(ctx, roomID, ev.EventID, reactError) return case res == reserveDeniedUser: // Per-user cap (anti-abuse, F24): stop answering, but always signal the limit — // no message addressed to the bot is left without feedback. b.log.InfoContext(ctx, "per-user daily cap reached; reacting", "sender", ev.Sender) rl.PerUserCapHit = true b.react(ctx, roomID, ev.EventID, reactRateLimit) return case res == reserveDeniedGlobal: // Global USD ceiling. A reaction is cheap and non-intrusive (unlike the old // once-per-day text notice), so signal every affected message rather than // going silent after the first. b.log.WarnContext(ctx, "global daily USD ceiling reached", "room", roomID, "sender", ev.Sender) rl.CeilingHit = true b.react(ctx, roomID, ev.EventID, reactRateLimit) return } // Past admission, a reservation + request slot are held. Guarantee they're freed on // ANY exit that didn't settle — including a panic in generate() (recovered by safego) // — so a leaked reservation can never permanently drift the global ceiling down // (§8.1c). The normal paths set settled=true at Settle, so this defer then no-ops; it // fires only on the panic/unexpected-return path, where it also reacts so the failure // isn't silent. settled := false defer func() { if settled { return } rl.Degraded, rl.Err = "panic", "generation panicked or returned without settling" if rerr := b.st.ReleaseReservation(ev.Sender, estimate); rerr != nil { b.log.ErrorContext(ctx, "release reservation (unsettled) failed", "sender", ev.Sender, "err", rerr) } if rerr := b.st.RefundRequest(ev.Sender); rerr != nil { b.log.ErrorContext(ctx, "refund (unsettled) failed", "sender", ev.Sender, "err", rerr) } b.react(ctx, roomID, ev.EventID, reactError) }() // Show "Vojo AI печатает…" for the whole generation. The keepalive refreshes the // typing notification every 20s (the server expires it after 30s) so the indicator // never lapses on a slow/retried answer, and the deferred stop clears it on exit. stopTyping := b.startTypingKeepalive(ctx, roomID) defer stopTyping() // Overall per-request deadline (§8.2.2): every model call in the cascade shares this // single budget (genCtx), so a multi-stage route can't accrete minutes the way // per-stage 3×60s retries would. react/send/store ops use the live room ctx, NOT // genCtx, so a budget timeout still surfaces as a ⚠️ react, never silence. genCtx, cancel := context.WithTimeout(ctx, b.cfg.RequestBudget) defer cancel() msgs := buildContext(b.cfg.SystemPrompt, history, isDM, mc.Body, b.cfg.MaxCtxEvent, maxPromptTokens) res, err := b.generate(genCtx, mc.Body, msgs, b.convID(roomID, threadRoot)) // Record what the routing + generation actually did, whatever the outcome. rl.Route = res.route rl.RouterSource = res.decision.Source rl.RouterConfidence = res.decision.Confidence rl.FallbackFired = res.fallback rl.Escalated = res.route == routeReason rl.Cost = res.cost if res.stageMS != nil { rl.StageMS = res.stageMS } if res.finalModel != "" { rl.Models["final"] = res.finalModel } if res.decision.Source == "classifier" { rl.Models["router"] = b.cfg.GeminiModel } if res.degraded != "" { rl.Degraded = res.degraded } // The full routing/generation picture for one request, in one line: which route ran, // whether it was a fallback, the degrade reason (if any), the per-stage timings and // the spend. At DEBUG so prod stays quiet; turn LOG_LEVEL=debug on to trace routing. b.log.DebugContext(ctx, "generation outcome", "route", res.route, "router_source", res.decision.Source, "router_confidence", res.decision.Confidence, "fallback", res.fallback, "degraded", res.degraded, "stage_ms", res.stageMS, "usd", res.cost.Total()) if err != nil { // Terminal: even grok_direct failed. Settle whatever the cascade ACTUALLY spent // (e.g. a paid web fetch before the failure) and release the rest of the // reservation in one step, then refund the request slot so an outage doesn't burn // the cap, and react (never silent). Settle with an all-zero cost is just a // release, so a pure grok_direct failure books nothing — exactly as before. b.log.ErrorContext(ctx, "generation failed", "sender", ev.Sender, "route", res.route, "err", err) rl.Err = err.Error() if serr := b.st.Settle(ev.Sender, estimate, res.cost); serr != nil { b.log.ErrorContext(ctx, "settle (failed request) failed", "sender", ev.Sender, "err", serr) } settled = true if rerr := b.st.RefundRequest(ev.Sender); rerr != nil { b.log.ErrorContext(ctx, "refund failed", "sender", ev.Sender, "err", rerr) } b.react(ctx, roomID, ev.EventID, reactError) return } // Success from some route. Settle: release the reservation and book the real // per-component cost, so both caps see grounding/tool fees too — not just tokens. if err := b.st.Settle(ev.Sender, estimate, res.cost); err != nil { b.log.ErrorContext(ctx, "settle spend failed", "sender", ev.Sender, "err", err) } settled = true rl.PromptTokens, rl.CachedTokens, rl.CompletionTokens = res.usage.PromptTokens, res.usage.CachedTokens, res.usage.CompletionTokens rl.CacheHit = res.usage.CachedTokens > 0 rl.ProviderRequestID = res.providerID text := res.text if text == "" { // Billed but no usable text (content filter / length cap / empty choices). Never // leave a billed request without feedback — react "couldn't answer". The slot // stays consumed (the 2xx was real); no refund, or an empty reply could be forced // to dodge the cap. b.log.WarnContext(ctx, "empty completion (billed, reacting)", "sender", ev.Sender, "usd", res.cost.Total()) rl.Degraded = degradeEmpty b.react(ctx, roomID, ev.EventID, reactError) return } b.log.InfoContext(ctx, "answered", "room", roomID, "sender", ev.Sender, "dm", isDM, "route", res.route, "usd", res.cost.Total(), "prompt_tokens", res.usage.PromptTokens, "completion_tokens", res.usage.CompletionTokens) if err := b.sendReply(ctx, roomID, threadRoot, ev, mc, text); err != nil { // Paid silence (§8.1): the spend is real (USD is kept — refunding it would // under-count the ceiling), but the reply never landed. Refund the request SLOT // so the user can retry, and react ⚠️ so the failure isn't silent. b.log.ErrorContext(ctx, "send reply failed after billing; refunding slot + reacting", "sender", ev.Sender, "err", err) rl.Degraded, rl.Err = degradeSendFailed, err.Error() if rerr := b.st.RefundRequest(ev.Sender); rerr != nil { b.log.ErrorContext(ctx, "refund failed", "sender", ev.Sender, "err", rerr) } b.react(ctx, roomID, ev.EventID, reactError) return } rl.OK = true } // maxPromptTokens bounds the assembled prompt (history is trimmed to fit) and feeds // the reservation estimate, so the two never disagree about a request's size. const maxPromptTokens = 8000 // estimateUSD is the conservative max-cost reserved for a route before the call, so // the global ceiling can count an in-flight request (§8.1). It prices a full prompt // (maxPromptTokens) plus the max output at the model's non-cached rates — an upper-ish // bound, since real calls send fewer tokens and get the cheaper cached rate. Settle // later books the authoritative actual cost regardless, so a slightly-off estimate // only nudges admission, never the final accounting. func (b *Bot) estimateUSD(model string) float64 { p := b.cfg.priceFor(model) return float64(maxPromptTokens)/1e6*p.InputPerM + float64(b.cfg.MaxOutTok)/1e6*p.OutputPerM } // convID returns the prompt-cache routing hint sent as x-grok-conv-id, or "" when // GROK_PROMPT_CACHE is off. Grok caches prompt prefixes automatically; the header // only pins a conversation to the same backend to raise the hit rate (docs.x.ai). The // right unit is the (room,thread) pair, not the room: once conversations are threaded, // each thread has its OWN system+history prefix, so pinning all of a room's threads to // one id would thrash the cache between divergent prefixes. The main timeline keys on // roomID alone (threadRoot ""), preserving the previous value for the flat case. Carries // no PII (ids are opaque) and is hashed to stay compact and non-identifying. func (b *Bot) convID(roomID, threadRoot string) string { if !b.cfg.GrokPromptCache { return "" } key := roomID if threadRoot != "" { key = roomID + "|" + threadRoot } return fmt.Sprintf("vojo-%08x", hashString(key)) } // computeUSD prices a call from the API-returned token usage (authoritative // counts) and the per-model price table — so the hard ceiling tracks real usage // even if the model/price changes (only the price table needs updating), and a // call books at the price of the model that actually served it. func computeUSD(model string, u Usage, cfg *Config) float64 { p := cfg.priceFor(model) nonCached := u.PromptTokens - u.CachedTokens if nonCached < 0 { nonCached = 0 } return float64(nonCached)/1e6*p.InputPerM + float64(u.CachedTokens)/1e6*p.CachedPerM + float64(u.CompletionTokens)/1e6*p.OutputPerM } // react adds an emoji m.reaction to the triggering event — the bot's language-free // way to signal a system state (error / rate limit / encrypted / media) it can't // express as a model-generated answer. Best-effort: a failed reaction is logged, not // retried. Reactions are m.reaction (not m.room.message), so they never re-enter // handleMessage and need no anti-loop tracking. func (b *Bot) react(ctx context.Context, roomID, eventID, emoji string) { content := map[string]any{ "m.relates_to": map[string]any{ "rel_type": "m.annotation", "event_id": eventID, "key": emoji, }, } if _, err := b.mx.SendEvent(ctx, roomID, "m.reaction", content); err != nil { b.log.ErrorContext(ctx, "react failed", "room", roomID, "emoji", emoji, "err", err) } } // reactEncryptedOnce reacts 🔒 to the first message seen in an encrypted room and // records a durable flag so a restart doesn't re-react (F5). Vojo disables E2EE by // default, so this is a near-dead safety path; the reaction is far less intrusive // than the old text notice, but the once-gate keeps it from annotating every message // in the rare encrypted room. // reactEncryptedOnce returns whether it reacted this call (true only the first time // for a room), so the caller can log the skip exactly once too. func (b *Bot) reactEncryptedOnce(ctx context.Context, roomID, eventID string) bool { warned, err := b.st.HasWarnedEncrypted(roomID) if err != nil { b.log.ErrorContext(ctx, "warned-flag read failed", "room", roomID, "err", err) return false } if warned { return false } b.react(ctx, roomID, eventID, reactEncrypted) if err := b.st.SetWarnedEncrypted(roomID); err != nil { b.log.ErrorContext(ctx, "persist warned-flag failed", "room", roomID, "err", err) } return true } // sendReply sends the model's actual answer and records the completed exchange in the // conversation buffer so the next turn has context. It RETURNS the send error so the // caller can handle paid silence (§8.1): a billed answer that failed to deliver must // refund the slot and react, not vanish. func (b *Bot) sendReply(ctx context.Context, roomID, threadRoot string, trigger *Event, triggerMC *MessageContent, body string) error { if err := b.sendMessage(ctx, roomID, threadRoot, trigger, triggerMC, body); err != nil { return err } // Record the user trigger AND the assistant answer together, only AFTER the answer // was sent, so a failed or empty generation never leaves a dangling user turn (a // question with no reply) in the buffer — which would skew later completions. Both go // to THIS conversation's buffer (roomID,threadRoot). Per-(room,thread) single-flight // guarantees no other turn for this conversation interleaves between the two. b.appendBuf(roomID, threadRoot, bufferedMsg{sender: trigger.Sender, body: triggerMC.Body, isBot: false}) b.appendBuf(roomID, threadRoot, bufferedMsg{sender: b.cfg.BotMXID, body: body, isBot: true}) return nil } // sendMessage builds and sends an m.notice reply and tracks our own event id. Returns // the send error (nil on success) so the caller can detect a failed delivery. func (b *Bot) sendMessage(ctx context.Context, roomID, threadRoot string, trigger *Event, triggerMC *MessageContent, body string) error { content := buildNoticeContent(trigger.EventID, trigger.Sender, threadRoot, body) id, err := b.mx.SendEvent(ctx, roomID, "m.room.message", content) if err != nil { b.log.ErrorContext(ctx, "send failed", "room", roomID, "err", err) return err } // Track our own reply so a future reply-to-it is recognised as addressing us. b.botSent.Add(id) return nil } // startTypingKeepalive shows the room-level "typing…" indicator for the whole generation // and keeps it alive (the CS-API notification expires after the 30s we pass, so we refresh // every 20s). The returned stop is safe to call once via defer. Typing is ROOM-scoped in // Matrix — there is no per-thread typing — so with per-(room,thread) concurrency several // generations can run for one room at once. A per-room refcount keeps the indicator on // until the LAST of them finishes, rather than letting whichever finishes first clear it // out from under the others. Best-effort UX — failures are non-fatal. func (b *Bot) startTypingKeepalive(ctx context.Context, roomID string) func() { b.typingAcquire(roomID) b.setTyping(ctx, roomID, true) done := make(chan struct{}) go func() { t := time.NewTicker(20 * time.Second) defer t.Stop() for { select { case <-ctx.Done(): return case <-done: return case <-t.C: b.setTyping(ctx, roomID, true) } } }() var once sync.Once return func() { once.Do(func() { close(done) // Only the last in-flight generation for the room clears the indicator; the // others merely stop their own keepalive loop (a forgetRoom mid-flight can drop // the counter to <=0, which the guard treats as "last" — still correct). if b.typingRelease(roomID) { b.setTyping(ctx, roomID, false) } }) } } // typingAcquire registers one in-flight generation against the room-level typing indicator. // The increment is the only state change; the caller (re)asserts the indicator on regardless, // since re-sending typing=true is idempotent and refreshes the server-side 30s timeout. func (b *Bot) typingAcquire(roomID string) { b.mu.Lock() b.typingRefs[roomID]++ b.mu.Unlock() } // typingRelease drops one in-flight generation and reports whether it was the LAST one for // the room (refcount fell to <=0), so the caller clears the indicator only then. A forgetRoom // that already deleted the key leaves a missing entry: the decrement reads 0 and lands at -1 // (<=0), so the guard still treats it as "last" and the negative entry is deleted — no leak. func (b *Bot) typingRelease(roomID string) (last bool) { b.mu.Lock() defer b.mu.Unlock() b.typingRefs[roomID]-- last = b.typingRefs[roomID] <= 0 if last { delete(b.typingRefs, roomID) } return last } // setTyping sets/clears the bot's typing indicator (best-effort UX; failures are // non-fatal). The 30s server-side timeout is refreshed by startTypingKeepalive. func (b *Bot) setTyping(ctx context.Context, roomID string, typing bool) { if err := b.mx.SendTyping(ctx, roomID, typing, 30000); err != nil { b.log.DebugContext(ctx, "set typing failed", "room", roomID, "typing", typing, "err", err) } } // buildNoticeContent builds the reply. m.notice (not m.text) so the anti-loop // skip catches our own output. threadRoot decides where the answer lands: when non-empty // the answer carries an m.thread relation rooted there (F27) — either replying inside an // existing thread or auto-rooting a NEW DM conversation on the trigger. The caller's // resolveThreadRoot makes that choice and is DM-gated, so a group answer never gets a // thread relation it didn't already have. When empty the answer is a plain top-level // reply (groups, and DMs with conversations off). func buildNoticeContent(replyTo, sender, threadRoot, body string) map[string]any { relates := map[string]any{} if threadRoot != "" { relates["rel_type"] = "m.thread" relates["event_id"] = threadRoot relates["is_falling_back"] = true relates["m.in_reply_to"] = map[string]any{"event_id": replyTo} } else { relates["m.in_reply_to"] = map[string]any{"event_id": replyTo} } content := map[string]any{ "msgtype": "m.notice", "body": body, "m.mentions": map[string]any{"user_ids": []string{sender}}, "m.relates_to": relates, } // The model answers in markdown; render it to org.matrix.custom.html so clients // show formatting instead of raw `**`, `#`, lists, code fences. Only attach // formatted_body when there's actual formatting — a plain answer keeps rendering // from `body` exactly as before. if html, formatted := markdownToHTML(body); formatted { content["format"] = matrixHTMLFormat content["formatted_body"] = html } return content } // --- per-(room,thread) single-flight --------------------------------------------- // resolveThreadRoot decides which conversation a trigger belongs to, returning the thread // root event id, or "" for the main timeline. Order: (1) a trigger already inside a thread // continues that thread; (2) in a 1:1 DM, a top-level message roots a NEW conversation on // itself (ChatGPT-style "new chat"); (3) otherwise — EVERY group message — the main timeline // (""). This is the single place auto-threading is decided and it is hard-gated on isDM, so // a group is NEVER auto-threaded (the group gate is structural, not a flag). buildNoticeContent // emits the matching m.thread relation for the same value, so the conversation we serialize on // is always the one the answer lands in. func (b *Bot) resolveThreadRoot(isDM bool, ev *Event, mc *MessageContent) string { if mc.RelatesTo != nil && mc.RelatesTo.RelType == "m.thread" && mc.RelatesTo.EventID != "" { return mc.RelatesTo.EventID } if isDM { return ev.EventID } return "" } // tryClaim marks a (room,thread) conversation as generating and returns true if the caller // won the claim (nothing was already in flight for that exact conversation). The loser // drops its message. Different threads of one room claim independently — a slow answer in // one conversation never blocks another. The check-and-set is atomic under b.mu (one map, // no per-thread mutex), so there is no lazy-lock TOCTOU. func (b *Bot) tryClaim(roomID, threadRoot string) bool { b.mu.Lock() defer b.mu.Unlock() m := b.inflight[roomID] if m == nil { m = make(map[string]bool) b.inflight[roomID] = m } if m[threadRoot] { return false } m[threadRoot] = true return true } func (b *Bot) release(roomID, threadRoot string) { b.mu.Lock() defer b.mu.Unlock() if m := b.inflight[roomID]; m != nil { delete(m, threadRoot) if len(m) == 0 { delete(b.inflight, roomID) } } } // --- per-room metadata helpers (all guarded by b.mu; probes run outside it) ----- // getMetaLocked returns (creating if needed) the room's meta. Caller MUST hold b.mu. func (b *Bot) getMetaLocked(roomID string) *roomMeta { m := b.meta[roomID] if m == nil { m = &roomMeta{} b.meta[roomID] = m } return m } func (b *Bot) invalidateCounts(roomID string) { b.mu.Lock() defer b.mu.Unlock() if m := b.meta[roomID]; m != nil { m.countsKnown = false } } func (b *Bot) setEncrypted(roomID string) { b.mu.Lock() defer b.mu.Unlock() m := b.getMetaLocked(roomID) m.encrypted, m.encKnown = true, true } func (b *Bot) forgetRoom(roomID string) { b.mu.Lock() defer b.mu.Unlock() delete(b.meta, roomID) delete(b.buf, roomID) // drops the room's whole per-thread subtree in one delete delete(b.inflight, roomID) // (nested maps keyed by roomID keep forgetRoom O(1)) delete(b.typingRefs, roomID) } // ensureEncryption returns whether the room is encrypted, probing the CS-API once // (without holding the lock) and caching the result. On probe error it returns false // (treated as not-encrypted this round) and leaves the state unknown for a re-probe. func (b *Bot) ensureEncryption(ctx context.Context, roomID string) bool { b.mu.Lock() if m := b.getMetaLocked(roomID); m.encKnown { enc := m.encrypted b.mu.Unlock() return enc } b.mu.Unlock() enc, err := b.mx.RoomEncrypted(ctx, roomID) if err != nil { b.log.WarnContext(ctx, "encryption probe failed", "room", roomID, "err", err) return false // leave unknown; re-probed on the next message } // Re-fetch under the lock instead of writing to the pointer captured before the // unlocked probe: if the room was forgotten (leave/ban) mid-probe its meta was // deleted, and writing to the captured pointer would resurrect a dead room. b.mu.Lock() if m := b.meta[roomID]; m != nil { m.encrypted, m.encKnown = enc, true } b.mu.Unlock() return enc } // ensureCounts returns (countsKnown, isDM, foreign), probing /members once (without // holding the lock) and caching the result. On probe error it returns // (false, false, false): the caller treats an unclassified room conservatively and // logs a visible WARN rather than silently dropping. func (b *Bot) ensureCounts(ctx context.Context, roomID string) (countsKnown, isDM, foreign bool) { b.mu.Lock() known := b.getMetaLocked(roomID).countsKnown b.mu.Unlock() if !known { joined, invited, servers, err := b.mx.RoomMembership(ctx, roomID) if err != nil { b.log.WarnContext(ctx, "member probe failed", "room", roomID, "err", err) return false, false, false } isForeign := false for s := range servers { if !b.cfg.AllowedServers[s] { isForeign = true break } } // Re-fetch under the lock rather than writing a pointer captured before the // unlocked /members probe (see ensureEncryption): a leave/ban mid-probe must // not be undone by resurrecting the room's meta. b.mu.Lock() if m := b.meta[roomID]; m != nil { m.joined, m.invited, m.foreign, m.countsKnown = joined, invited, isForeign, true } b.mu.Unlock() } b.mu.Lock() defer b.mu.Unlock() if m := b.meta[roomID]; m != nil { return m.countsKnown, m.isDM(), m.foreign } return false, false, false } // convBuf is one conversation's (one thread's) rolling context window plus a last-touch // stamp used for LRU eviction. Without eviction, a long-lived control DM that spawns many // ChatGPT-style conversations would accumulate one buffer per conversation for the whole // process lifetime (forgetRoom only frees them on room leave/ban). type convBuf struct { msgs []bufferedMsg touched uint64 // b.bufSeq at last append; higher = more recent } // maxConvBuffersPerRoom bounds how many conversation buffers a single room retains. A // human DM rarely keeps this many live conversations at once; an evicted cold conversation // just rebuilds its window from scratch on its next turn (same as after a restart), so the // only cost of eviction is a one-turn cold start, never a wrong answer. const maxConvBuffersPerRoom = 64 func (b *Bot) snapshotBuf(roomID, threadRoot string) []bufferedMsg { b.mu.Lock() defer b.mu.Unlock() // A never-seen conversation has no buffer (nil inner map / nil convBuf) → nil history, // exactly what a fresh "new chat" wants (context = system + trigger). cb := b.buf[roomID][threadRoot] if cb == nil || len(cb.msgs) == 0 { return nil } out := make([]bufferedMsg, len(cb.msgs)) copy(out, cb.msgs) return out } func (b *Bot) appendBuf(roomID, threadRoot string, msg bufferedMsg) { b.mu.Lock() defer b.mu.Unlock() limit := b.cfg.MaxCtxEvent * 2 if limit < 8 { limit = 8 } m := b.buf[roomID] if m == nil { m = make(map[string]*convBuf) b.buf[roomID] = m } cb := m[threadRoot] if cb == nil { cb = &convBuf{} m[threadRoot] = cb } cb.msgs = append(cb.msgs, msg) if len(cb.msgs) > limit { cb.msgs = cb.msgs[len(cb.msgs)-limit:] } b.bufSeq++ cb.touched = b.bufSeq // LRU-evict the least-recently-touched conversation once the room exceeds the cap. The // just-touched conversation has the highest `touched`, so it is never the victim. if len(m) > maxConvBuffersPerRoom { var victim string var oldest uint64 for k, v := range m { if victim == "" || v.touched < oldest { victim, oldest = k, v.touched } } delete(m, victim) } }