vojo/apps/ai-bot/bot.go

993 lines
40 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package main
import (
"context"
"fmt"
"log/slog"
"sync"
"sync/atomic"
"time"
)
// roomMeta is the cached per-room classification consulted while handling a
// message: member counts (the 1:1 test, F3), whether any member is outside
// ALLOWED_SERVERS, and the encryption state (F15). It is fetched lazily from the
// CS-API on first need (appservice transactions carry no room summary) and is
// invalidated whenever a third party's membership changes, so a 1:1 that gains a
// member is reclassified out of DM mode and a newly added foreign member is caught.
//
// Every field is guarded by Bot.mu: generation and the lazy CS-API probes run in
// concurrent per-room goroutines, so never read or write a field without the lock.
type roomMeta struct {
	joined      int  // joined member count
	invited     int  // invited member count
	countsKnown bool // the counts (and foreign) are populated
	foreign     bool // a joined/invited member is outside ALLOWED_SERVERS
	encrypted   bool // the room is encrypted
	encKnown    bool // encrypted is populated
}

// isDM reports whether the room is a 1:1: the counts must be known and the joined
// plus invited members must total exactly two.
func (m *roomMeta) isDM() bool {
	if !m.countsKnown {
		return false
	}
	return m.joined+m.invited == 2
}
// Bot is the appservice bot: it holds the configuration, the Matrix / model / store
// clients, and the in-memory per-room and per-conversation state used while handling
// events. A Bot contains a sync.Mutex and atomic counter, so it is always used through
// a pointer (NewBot returns *Bot) and must not be copied.
type Bot struct {
// cfg, log, mx, llm and st are the dependencies wired by NewBot: the configuration,
// the logger, the Matrix client, the primary model client (NewBot builds it with
// NewXAIClient) and the store opened from cfg.DatabaseURL.
cfg *Config
log *slog.Logger
mx *MatrixClient
llm LLMClient
st *Store
// gemini is the cheap chat backend for the trivial route and the Layer-1 classifier
// (an LLMClient so tests can fake it); nil unless a layer that uses it is enabled.
// web is the web-freshness provider, built only when WEB_ENABLED. Both nil → the
// cascade can only ever produce grok_direct.
gemini LLMClient
web WebProvider
// promptVersion is a short stable hash of the system prompt, logged with each
// request so prompt changes are visible in the analytics (A/B + regressions).
promptVersion string
// telemetryWrites paces the retention trim (every telemetryTrimEvery writes).
telemetryWrites atomic.Uint64
// mu guards the in-memory maps/sets below. Each transaction is acked to Synapse
// immediately (appservice.go) and its events are processed in transaction order,
// but the slow xAI generation runs in a per-room goroutine and the lazy probes run
// off-lock, so several goroutines touch this shared state at once. mu is held only
// for short map operations and is NEVER held across a network or xAI call — that
// head-of-line hold was the root cause of the multi-minute silence.
mu sync.Mutex
seen *lruSet // event ids already handled (dedup within a session; self-locking)
botSent *lruSet // event ids the bot itself sent (reply-parent detection; self-locking)
// meta is the per-room classification cache, keyed by room id (see roomMeta).
meta map[string]*roomMeta
// buf and inflight are keyed by roomID THEN by thread root ("" = main timeline), so
// each conversation (a thread) keeps an isolated context window and an independent
// single-flight claim. Two messages in different threads of one room no longer block
// each other or pollute each other's history. roomMeta stays room-level (membership
// /encryption are room facts). forgetRoom drops a room's whole subtree in one delete.
buf map[string]map[string]*convBuf
inflight map[string]map[string]bool
// bufSeq is a monotonic counter stamped onto a convBuf on every append (guarded by mu);
// it orders conversations by last activity so appendBuf can LRU-evict the coldest one
// when a room exceeds maxConvBuffersPerRoom.
bufSeq uint64
// typingRefs counts in-flight generations per ROOM that are currently showing the
// "typing…" indicator. Matrix typing notifications are room-scoped (no thread_id in
// the CS-API), so the indicator can't be split per thread: it goes on when the first
// generation in a room starts and off only when the last one finishes (refcount).
typingRefs map[string]int
}
// NewBot builds a Bot from cfg: it creates the Matrix and xAI clients, opens the
// store, wires the optional cascade backends, and verifies the bot identity before
// returning. On an identity failure the freshly opened store is closed and the error
// is returned. Setting the display name is best-effort: a failure is logged, not fatal.
func NewBot(ctx context.Context, cfg *Config, logger *slog.Logger) (*Bot, error) {
	matrix := NewMatrixClient(cfg.HomeserverURL, cfg.ASToken, cfg.BotMXID)
	xai := NewXAIClient(cfg.XAIBaseURL, cfg.XAIAPIKey, logger)
	store, err := OpenStore(cfg.DatabaseURL)
	if err != nil {
		return nil, err
	}

	bot := &Bot{
		cfg:           cfg,
		log:           logger,
		mx:            matrix,
		llm:           xai,
		st:            store,
		promptVersion: fmt.Sprintf("%08x", hashString(cfg.SystemPrompt)),
		seen:          newLRUSet(5000),
		botSent:       newLRUSet(5000),
		meta:          make(map[string]*roomMeta),
		buf:           make(map[string]map[string]*convBuf),
		inflight:      make(map[string]map[string]bool),
		typingRefs:    make(map[string]int),
	}

	// Cascade backends exist only for enabled layers (the config has already been
	// fail-fast validated for the required keys). With every cascade flag off both
	// stay nil and generate() can only produce grok_direct. The grounding web provider
	// needs the concrete Gemini client (for the native generateContent call), so a
	// typed handle is kept next to the LLMClient-typed field.
	var gemClient *geminiClient
	if cfg.needsGemini() {
		gemClient = NewGeminiClient(cfg.GeminiBaseURL, cfg.GeminiAPIKey, cfg.GeminiModel, logger)
		bot.gemini = gemClient
	}
	switch {
	case !cfg.WebEnabled:
		// Web freshness is off: bot.web stays nil.
	case cfg.WebProvider == webProviderGeminiGrounding:
		bot.web = &geminiGrounding{gem: gemClient, st: store, cfg: cfg}
	default:
		bot.web = newGrokWebSearch(cfg, logger)
	}

	// Confirm the as_token + user_id resolves to BOT_MXID before serving.
	if verr := bot.verifyIdentity(ctx); verr != nil {
		_ = store.Close()
		return nil, verr
	}

	// F23: ensure the profile has a display name (best-effort, idempotent).
	if derr := matrix.SetDisplayName(ctx, cfg.BotDisplayName); derr != nil {
		logger.Warn("set display name failed (non-fatal)", "err", derr)
	}
	return bot, nil
}
// Close closes the bot's store if one is attached. The close error is deliberately
// discarded: there is nothing further to do with it at shutdown.
func (b *Bot) Close() {
	if b.st == nil {
		return
	}
	_ = b.st.Close()
}
// verifyIdentity asks the homeserver who the configured credentials belong to and
// returns an error unless the answer is exactly cfg.BotMXID. A Whoami error is
// returned unchanged. On success the resolved mxid is logged.
func (b *Bot) verifyIdentity(ctx context.Context) error {
	resolved, err := b.mx.Whoami(ctx)
	switch {
	case err != nil:
		return err
	case resolved != b.cfg.BotMXID:
		return fmt.Errorf("as_token resolves to %q but BOT_MXID is %q", resolved, b.cfg.BotMXID)
	}
	b.log.Info("authenticated", "mxid", resolved)
	return nil
}
// Run builds the appservice transaction server around handleTransaction and serves
// it, blocking until ctx is cancelled. It returns whatever Serve returns.
func (b *Bot) Run(ctx context.Context) error {
	server := NewAppService(b.cfg, b.log, b.st, b.handleTransaction)
	return server.Serve(ctx)
}
// handleTransaction walks one already-acked transaction's events in order. It runs
// in a background goroutine (the 200 has already been returned to Synapse). Dedup and
// classification happen synchronously per event, so the single-flight claim is taken
// in arrival order; only the slow generation is spawned off, so a slow call never
// blocks the next event or another room while ordering within a room is kept.
func (b *Bot) handleTransaction(ctx context.Context, events []Event) {
	// Index into the slice so each handler receives a pointer to the stored event
	// rather than to a per-iteration copy.
	for idx := 0; idx < len(events); idx++ {
		b.handleEvent(ctx, &events[idx])
	}
}
// safego runs fn on its own goroutine behind a recover. The slow per-room work is
// detached from the HTTP handler, so an unrecovered panic there would take down the
// whole process and silence the bot for every room; instead the panic is logged
// (tagged with what) and the goroutine simply ends.
func (b *Bot) safego(what string, fn func()) {
	guarded := func() {
		// recover only works when called directly from the deferred function.
		defer func() {
			if cause := recover(); cause != nil {
				b.log.Error("recovered panic in handler goroutine", "what", what, "panic", cause)
			}
		}()
		fn()
	}
	go guarded()
}
// handleEvent dedups one event and dispatches it by type. Events without an event id
// or room id, and events already seen, are ignored. Event types other than the three
// handled below fall through without action.
func (b *Bot) handleEvent(ctx context.Context, ev *Event) {
	if ev.EventID == "" || ev.RoomID == "" {
		return
	}
	if isNew := b.markSeen(ev.EventID); !isNew {
		return // already handled (in-memory or durable dedup)
	}
	b.log.Debug("event", "type", ev.Type, "room", ev.RoomID, "sender", ev.Sender, "id", ev.EventID)

	switch ev.Type {
	case "m.room.message":
		// Synchronous (in transaction order) up to the single-flight claim; only the
		// slow generation inside handleMessage is spawned as a goroutine, so the
		// earlier message wins the claim while different rooms still run concurrently.
		b.handleMessage(ctx, ev)
	case "m.room.encryption":
		b.setEncrypted(ev.RoomID)
	case "m.room.member":
		aboutBot := ev.StateKey != nil && *ev.StateKey == b.cfg.BotMXID
		if !aboutBot {
			// A third party's membership changed: counts + foreign flag are stale.
			// Re-probe on the next message so a 1:1 that gains a member drops out of
			// DM mode and a new foreign member is caught.
			b.invalidateCounts(ev.RoomID)
			return
		}
		b.safego("self-membership", func() { b.handleSelfMembership(ctx, ev) })
	}
}
// markSeen records eventID in the in-memory set and then in the durable store, and
// reports whether the event is new. An id already in the in-memory set is rejected
// without touching the store. If the durable check errors, the error is logged and
// the event is treated as new — the in-memory set still guards this session.
func (b *Bot) markSeen(eventID string) bool {
	if firstInMemory := b.seen.Add(eventID); !firstInMemory {
		return false
	}
	firstDurable, err := b.st.SeenEvent(eventID)
	if err == nil {
		return firstDurable
	}
	b.log.Error("durable dedup check failed", "id", eventID, "err", err)
	return true
}
// handleSelfMembership reacts to a membership event whose target is the bot itself:
// "leave"/"ban" forgets the room; "invite" is accepted only when the inviter's server
// is in AllowedServers, otherwise the invite is rejected by leaving. After a
// successful join the room is probed and left again if it holds a foreign member
// (F11). Any other membership value is ignored. It is run on its own goroutine by
// handleEvent because join/leave are network calls.
func (b *Bot) handleSelfMembership(ctx context.Context, ev *Event) {
	membership := ev.membershipOf()
	if membership == "leave" || membership == "ban" {
		b.forgetRoom(ev.RoomID)
		return
	}
	if membership != "invite" {
		return
	}

	if allowed := b.cfg.AllowedServers[serverOf(ev.Sender)]; !allowed {
		b.log.Warn("rejecting invite (server not allowed)", "room", ev.RoomID, "sender", ev.Sender)
		if lerr := b.mx.LeaveRoom(ctx, ev.RoomID); lerr != nil {
			b.log.Error("leave (reject) failed", "room", ev.RoomID, "err", lerr)
		}
		return
	}

	b.log.Info("accepting invite", "room", ev.RoomID, "sender", ev.Sender)
	if jerr := b.mx.JoinRoom(ctx, ev.RoomID); jerr != nil {
		b.log.Error("join failed", "room", ev.RoomID, "err", jerr)
		return
	}

	// Fully-on-allowed-servers gate: an allowed inviter can still pull the bot into a
	// room that already holds federated third parties — leave at once.
	_, _, foreign := b.ensureCounts(ctx, ev.RoomID)
	if foreign {
		b.leaveForeign(ctx, ev.RoomID)
	}
}
// leaveForeign logs a warning and leaves roomID; callers invoke it when the room
// contains a member outside ALLOWED_SERVERS. A failed leave is logged, not returned.
func (b *Bot) leaveForeign(ctx context.Context, roomID string) {
	b.log.Warn("leaving room — a member is outside ALLOWED_SERVERS", "room", roomID)
	err := b.mx.LeaveRoom(ctx, roomID)
	if err == nil {
		return
	}
	b.log.Error("leave (foreign) failed", "room", roomID, "err", err)
}
// handleMessage filters one m.room.message event and, when it is addressed to the bot,
// claims its conversation and spawns the generation. It runs synchronously in
// transaction order up to and including the single-flight claim; only respond runs in
// a separate goroutine. The filters, in order: encrypted room, undecodable content,
// edits, the bot's own messages, notices, rooms with a foreign member, messages not
// addressed to the bot (neither a DM nor a mention), and addressed non-text messages.
func (b *Bot) handleMessage(ctx context.Context, ev *Event) {
roomID := ev.RoomID
// A9/F15: re-check encryption; if (or once) encrypted, react once and skip — the
// bot can't read it. The probe runs without the lock.
if b.ensureEncryption(ctx, roomID) {
b.log.Debug("skip: encrypted room", "room", roomID)
// Log the skip only when we actually react (once per room), so an encrypted room
// the bot can't read doesn't flood request_log with one row per message.
if b.reactEncryptedOnce(ctx, roomID, ev.EventID) {
b.recordSkip(ev, degradeEncrypted)
}
return
}
mc, ok := ev.DecodeMessage()
if !ok {
// A message addressed to the bot that we can't decode shouldn't vanish without
// a trace (no silent drops): log at WARN so it's visible at the default level.
b.log.Warn("skip: undecodable message content", "room", roomID, "sender", ev.Sender, "id", ev.EventID)
return
}
// Edits re-carry m.mentions; never re-trigger or replay them (F16).
if mc.IsReplace() {
return
}
if ev.Sender == b.cfg.BotMXID {
return // our own message (the reply is buffered when we send it, not on echo-back)
}
if mc.MsgType == "m.notice" {
return // anti-loop: ignore notices (ours and other bots')
}
// Media / non-text is handled only once we know the message is addressed (below),
// so a stray image in a group the bot isn't mentioned in stays silent (correct),
// while media in a 1:1 or an @-mention gets a clear "text only" reaction.
isMedia := mc.MsgType != "m.text" && mc.MsgType != "m.emote"
countsKnown, isDM, foreign := b.ensureCounts(ctx, roomID)
// Stay only in rooms hosted entirely on allowed servers — never operate in (or
// "leak" the bot into) a federated room with non-consenting third parties.
if foreign {
b.leaveForeign(ctx, roomID)
b.recordSkip(ev, degradeForeign)
return
}
// A reply whose parent is an event the bot sent is passed to mentionsBot as a signal
// that the message addresses the bot.
replyParentIsBot := mc.RelatesTo != nil && mc.RelatesTo.InReplyTo != nil &&
b.botSent.Has(mc.RelatesTo.InReplyTo.EventID)
mentioned := mentionsBot(mc, b.cfg.BotMXID, replyParentIsBot)
if !(isDM || mentioned) {
if !countsKnown {
// We couldn't classify the room (member probe failed) and the message isn't
// an explicit mention, so we can't tell a 1:1 (answer everything) from a
// group (answer only on mention). Log at WARN — not a silent Debug drop —
// so it's visible; we don't react because reacting in a group the bot isn't
// addressed in would be wrong. Re-probed on the next message.
b.log.Warn("skip: room unclassified (member probe failed), message not an explicit mention",
"room", roomID, "sender", ev.Sender)
} else {
b.log.Debug("skip: not addressed (group without mention)", "room", roomID, "sender", ev.Sender,
"dm", isDM, "mentioned", mentioned)
}
return
}
// Addressed but not text: react "text only" (no silent drop).
if isMedia {
b.log.Debug("skip: non-text msgtype (reacted)", "room", roomID, "sender", ev.Sender, "msgtype", mc.MsgType)
b.react(ctx, roomID, ev.EventID, reactMedia)
b.recordSkip(ev, degradeMedia)
return
}
// Resolve which conversation this turn belongs to: an existing thread (continue it),
// a freshly rooted DM thread (auto-thread, DM-only), or the main timeline ("").
// Groups never auto-thread; see resolveThreadRoot.
threadRoot := b.resolveThreadRoot(isDM, ev, mc)
// Per-(room,thread) single-flight: while a generation is in flight for THIS
// conversation, further messages in it are dropped (like a chat — no queue, no new
// session, no token burn), while another thread in the same room is not blocked.
// No reaction on a drop: the room's typing indicator is already showing, which is the
// language-free "I'm busy" signal. The claim is taken here, synchronously and in
// transaction order, so the FIRST message of a conversation wins and later ones are
// dropped until release — never the reverse.
if !b.tryClaim(roomID, threadRoot) {
b.log.Debug("drop: conversation busy generating", "room", roomID, "thread", threadRoot, "sender", ev.Sender)
return
}
// Snapshot THIS conversation's history (excludes this trigger) under the claim, then
// run the slow generation in its own goroutine so this transaction's remaining events
// and other rooms/threads are not blocked by the xAI call. respond appends the
// trigger+answer to the same per-thread buffer on success (see sendReply); the
// deferred release below frees the claim when respond returns or panics.
history := b.snapshotBuf(roomID, threadRoot)
b.safego("respond", func() {
defer b.release(roomID, threadRoot)
b.respond(ctx, roomID, threadRoot, isDM, ev, mc, history)
})
}
// unlimitedCap is the effective per-user request cap applied to senders listed in
// UNLIMITED_USERS (see respond) — high enough (2^30) to never trip the per-user gate,
// while the global DAILY_USD_CEILING still applies.
const unlimitedCap = 1 << 30
// respond handles one addressed message end to end for an already-claimed
// conversation: it reserves budget, runs the generation under a per-request deadline,
// settles the cost, and delivers the answer — or reacts to the trigger when it cannot.
// history is the conversation snapshot the caller took before spawning this call.
// Exactly one telemetry row is recorded per call, by the first deferred function.
// Deferred functions run in reverse order on exit: cancel the generation context, stop
// the typing keepalive, release an unsettled reservation, then record telemetry.
func (b *Bot) respond(ctx context.Context, roomID, threadRoot string, isDM bool, ev *Event, mc *MessageContent, history []bufferedMsg) {
started := time.Now()
// One telemetry row per request, populated as the flow decides its outcome and
// emitted once via defer — so every exit (deny, error, empty, paid silence, success)
// is recorded without scattering writes (F-FUNC-5). It starts as route=none/ok=false;
// proceeding to the model sets the route, success sets ok=true.
rl := RequestLog{
ID: ev.EventID, RoomID: roomID, Sender: ev.Sender,
Route: routeNone, RouterSource: "default",
PromptVersion: b.promptVersion,
QueryText: mc.Body,
Models: map[string]string{"final": b.cfg.XAIModel},
}
defer func() {
rl.LatencyMS = int(time.Since(started).Milliseconds())
b.recordTelemetry(rl)
}()
perUserCap := b.cfg.PerUserDailyCap
perUserUSD := b.cfg.PerUserDailyUSD
if b.cfg.UnlimitedUsers[ev.Sender] {
perUserCap = unlimitedCap
perUserUSD = 0 // exempt from both per-user gates; the global ceiling still applies
}
// Reserve the route's estimated max-cost (not $0) so the global ceiling counts
// this in-flight call BEFORE it returns — the TOCTOU fix (§8.1). The envelope covers
// the most expensive ENABLED route, so whichever the router picks is admitted within
// the reservation; with the cascade off it is exactly grok_direct's estimate.
estimate := b.reserveEstimate()
switch res, err := b.st.Reserve(ev.Sender, perUserCap, perUserUSD, b.cfg.DailyUSDCeiling, estimate); {
case err != nil:
// A limiter failure is on our side — don't leave the user wondering.
b.log.Error("limiter reserve failed", "sender", ev.Sender, "err", err)
rl.Degraded, rl.Err = degradeReserveErr, err.Error()
b.react(ctx, roomID, ev.EventID, reactError)
return
case res == reserveDeniedUser:
// Per-user cap (anti-abuse, F24): stop answering, but always signal the limit —
// no message addressed to the bot is left without feedback.
b.log.Info("per-user daily cap reached; reacting", "sender", ev.Sender)
rl.PerUserCapHit = true
b.react(ctx, roomID, ev.EventID, reactRateLimit)
return
case res == reserveDeniedGlobal:
// Global USD ceiling. A reaction is cheap and non-intrusive (unlike the old
// once-per-day text notice), so signal every affected message rather than
// going silent after the first.
b.log.Warn("global daily USD ceiling reached", "room", roomID, "sender", ev.Sender)
rl.CeilingHit = true
b.react(ctx, roomID, ev.EventID, reactRateLimit)
return
}
// Past admission, a reservation + request slot are held. Guarantee they're freed on
// ANY exit that didn't settle — including a panic in generate() (recovered by safego)
// — so a leaked reservation can never permanently drift the global ceiling down
// (§8.1c). The normal paths set settled=true at Settle, so this defer then no-ops; it
// fires only on the panic/unexpected-return path, where it also reacts so the failure
// isn't silent.
settled := false
defer func() {
if settled {
return
}
rl.Degraded, rl.Err = "panic", "generation panicked or returned without settling"
if rerr := b.st.ReleaseReservation(ev.Sender, estimate); rerr != nil {
b.log.Error("release reservation (unsettled) failed", "sender", ev.Sender, "err", rerr)
}
if rerr := b.st.RefundRequest(ev.Sender); rerr != nil {
b.log.Error("refund (unsettled) failed", "sender", ev.Sender, "err", rerr)
}
b.react(ctx, roomID, ev.EventID, reactError)
}()
// Show the bot's "typing…" indicator for the whole generation. The keepalive
// refreshes the typing notification every 20s (the server expires it after 30s) so the
// indicator never lapses on a slow/retried answer, and the deferred stop clears it on
// exit.
stopTyping := b.startTypingKeepalive(ctx, roomID)
defer stopTyping()
// Overall per-request deadline (§8.2.2): every model call in the cascade shares this
// single budget (genCtx), so a multi-stage route can't accrete minutes the way
// per-stage 3×60s retries would. react/send/store ops use the live room ctx, NOT
// genCtx, so a budget timeout still surfaces as a ⚠️ react, never silence.
genCtx, cancel := context.WithTimeout(ctx, b.cfg.RequestBudget)
defer cancel()
msgs := buildContext(b.cfg.SystemPrompt, history, isDM, mc.Body, b.cfg.MaxCtxEvent, maxPromptTokens)
res, err := b.generate(genCtx, mc.Body, msgs, b.convID(roomID, threadRoot))
// Record what the routing + generation actually did, whatever the outcome.
rl.Route = res.route
rl.RouterSource = res.decision.Source
rl.RouterConfidence = res.decision.Confidence
rl.FallbackFired = res.fallback
rl.Escalated = res.route == routeReason
rl.Cost = res.cost
if res.stageMS != nil {
rl.StageMS = res.stageMS
}
if res.finalModel != "" {
rl.Models["final"] = res.finalModel
}
if res.decision.Source == "classifier" {
rl.Models["router"] = b.cfg.GeminiModel
}
if res.degraded != "" {
rl.Degraded = res.degraded
}
if err != nil {
// Terminal: even grok_direct failed. Settle whatever the cascade ACTUALLY spent
// (e.g. a paid web fetch before the failure) and release the rest of the
// reservation in one step, then refund the request slot so an outage doesn't burn
// the cap, and react (never silent). Settle with an all-zero cost is just a
// release, so a pure grok_direct failure books nothing — exactly as before.
b.log.Error("generation failed", "sender", ev.Sender, "route", res.route, "err", err)
rl.Err = err.Error()
if serr := b.st.Settle(ev.Sender, estimate, res.cost); serr != nil {
b.log.Error("settle (failed request) failed", "sender", ev.Sender, "err", serr)
}
// NOTE(review): settled is set even when Settle returned an error, so the deferred
// release above will not run in that case — confirm Settle cannot fail while leaving
// the reservation held.
settled = true
if rerr := b.st.RefundRequest(ev.Sender); rerr != nil {
b.log.Error("refund failed", "sender", ev.Sender, "err", rerr)
}
b.react(ctx, roomID, ev.EventID, reactError)
return
}
// Success from some route. Settle: release the reservation and book the real
// per-component cost, so both caps see grounding/tool fees too — not just tokens.
if err := b.st.Settle(ev.Sender, estimate, res.cost); err != nil {
b.log.Error("settle spend failed", "sender", ev.Sender, "err", err)
}
// NOTE(review): as on the failure path, settled is set regardless of the Settle
// result — confirm that is intended.
settled = true
rl.PromptTokens, rl.CachedTokens, rl.CompletionTokens =
res.usage.PromptTokens, res.usage.CachedTokens, res.usage.CompletionTokens
rl.CacheHit = res.usage.CachedTokens > 0
rl.ProviderRequestID = res.providerID
text := res.text
if text == "" {
// Billed but no usable text (content filter / length cap / empty choices). Never
// leave a billed request without feedback — react "couldn't answer". The slot
// stays consumed (the 2xx was real); no refund, or an empty reply could be forced
// to dodge the cap.
b.log.Warn("empty completion (billed, reacting)", "sender", ev.Sender, "usd", res.cost.Total())
rl.Degraded = degradeEmpty
b.react(ctx, roomID, ev.EventID, reactError)
return
}
b.log.Info("answered", "room", roomID, "sender", ev.Sender, "dm", isDM, "route", res.route,
"usd", res.cost.Total(), "prompt_tokens", res.usage.PromptTokens, "completion_tokens", res.usage.CompletionTokens)
if err := b.sendReply(ctx, roomID, threadRoot, ev, mc, text); err != nil {
// Paid silence (§8.1): the spend is real (USD is kept — refunding it would
// under-count the ceiling), but the reply never landed. Refund the request SLOT
// so the user can retry, and react ⚠️ so the failure isn't silent.
b.log.Error("send reply failed after billing; refunding slot + reacting", "sender", ev.Sender, "err", err)
rl.Degraded, rl.Err = degradeSendFailed, err.Error()
if rerr := b.st.RefundRequest(ev.Sender); rerr != nil {
b.log.Error("refund failed", "sender", ev.Sender, "err", rerr)
}
b.react(ctx, roomID, ev.EventID, reactError)
return
}
rl.OK = true
}
// maxPromptTokens is the prompt-size bound, in tokens. It is passed to buildContext
// when the prompt is assembled (history is trimmed to fit) and is also the prompt size
// priced by estimateUSD, so the two never disagree about a request's size.
const maxPromptTokens = 8000
// estimateUSD returns the conservative max-cost reserved for a call to model before
// it is made, so the global ceiling can count an in-flight request (§8.1). It prices
// a full prompt (maxPromptTokens) at the model's input rate plus the configured
// maximum output (MaxOutTok) at its output rate; the cached-input rate is not used,
// which makes this an upper-ish bound. Settle later books the actual cost, so a
// slightly-off estimate only nudges admission, never the final accounting.
func (b *Bot) estimateUSD(model string) float64 {
	price := b.cfg.priceFor(model)
	promptUSD := float64(maxPromptTokens) / 1e6 * price.InputPerM
	outputUSD := float64(b.cfg.MaxOutTok) / 1e6 * price.OutputPerM
	return promptUSD + outputUSD
}
// convID returns the prompt-cache routing hint sent as x-grok-conv-id, or "" when
// GROK_PROMPT_CACHE is off. The hint is "vojo-" followed by the hex hash of the
// conversation key. The key is the (room,thread) pair — each thread has its own
// system+history prefix, so pinning every thread of a room to one id would thrash the
// cache between divergent prefixes. The main timeline (threadRoot "") keys on roomID
// alone, which preserves the value used before conversations were threaded. The ids
// are opaque and hashed, so the hint stays compact and non-identifying.
func (b *Bot) convID(roomID, threadRoot string) string {
	if !b.cfg.GrokPromptCache {
		return ""
	}
	if threadRoot == "" {
		return fmt.Sprintf("vojo-%08x", hashString(roomID))
	}
	return fmt.Sprintf("vojo-%08x", hashString(roomID+"|"+threadRoot))
}
// computeUSD prices a call from the token usage u and the price-table entry for
// model. Prompt tokens that were not served from cache are charged at the input rate,
// cached tokens at the cached rate, and completion tokens at the output rate (all
// rates are per million tokens). If the cached count exceeds the prompt count the
// non-cached portion is clamped to zero rather than going negative.
func computeUSD(model string, u Usage, cfg *Config) float64 {
	price := cfg.priceFor(model)

	uncached := u.PromptTokens - u.CachedTokens
	if uncached < 0 {
		uncached = 0
	}

	uncachedM := float64(uncached) / 1e6
	cachedM := float64(u.CachedTokens) / 1e6
	completionM := float64(u.CompletionTokens) / 1e6
	return uncachedM*price.InputPerM + cachedM*price.CachedPerM + completionM*price.OutputPerM
}
// react sends an m.reaction event annotating eventID with emoji — the bot's
// language-free way to signal a system state (error / rate limit / encrypted / media)
// that it can't express as a model-generated answer. Best-effort: a failed send is
// logged, not retried or returned. Reactions are m.reaction events, not
// m.room.message, so they are never routed to handleMessage.
func (b *Bot) react(ctx context.Context, roomID, eventID, emoji string) {
	annotation := map[string]any{
		"rel_type": "m.annotation",
		"event_id": eventID,
		"key":      emoji,
	}
	_, err := b.mx.SendEvent(ctx, roomID, "m.reaction", map[string]any{"m.relates_to": annotation})
	if err != nil {
		b.log.Error("react failed", "room", roomID, "emoji", emoji, "err", err)
	}
}
// reactEncryptedOnce reacts with the "encrypted" emoji to eventID unless the store
// already records that roomID was warned, then persists the warned flag so the
// reaction is not repeated for the room (F5). It returns true only on the call that
// reacted, so the caller can log the skip exactly once too. If the flag cannot be
// read, it logs and returns false without reacting; if the flag cannot be persisted,
// it logs and still returns true.
func (b *Bot) reactEncryptedOnce(ctx context.Context, roomID, eventID string) bool {
	alreadyWarned, err := b.st.HasWarnedEncrypted(roomID)
	switch {
	case err != nil:
		b.log.Error("warned-flag read failed", "room", roomID, "err", err)
		return false
	case alreadyWarned:
		return false
	}

	b.react(ctx, roomID, eventID, reactEncrypted)
	if perr := b.st.SetWarnedEncrypted(roomID); perr != nil {
		b.log.Error("persist warned-flag failed", "room", roomID, "err", perr)
	}
	return true
}
// sendReply delivers the model's answer and, only after the send succeeded, records
// the completed exchange in the (roomID,threadRoot) conversation buffer so the next
// turn has context. The send error is returned so the caller can handle paid silence
// (§8.1); in that case nothing is buffered.
func (b *Bot) sendReply(ctx context.Context, roomID, threadRoot string, trigger *Event, triggerMC *MessageContent, body string) error {
	err := b.sendMessage(ctx, roomID, threadRoot, trigger, triggerMC, body)
	if err != nil {
		return err
	}
	// The user trigger and the assistant answer are appended together, so a failed or
	// empty generation never leaves a dangling user turn (a question with no reply)
	// in the buffer. Per-(room,thread) single-flight guarantees no other turn for this
	// conversation interleaves between the two appends.
	exchange := [2]bufferedMsg{
		{sender: trigger.Sender, body: triggerMC.Body, isBot: false},
		{sender: b.cfg.BotMXID, body: body, isBot: true},
	}
	for _, turn := range exchange {
		b.appendBuf(roomID, threadRoot, turn)
	}
	return nil
}
// sendMessage builds the notice content for body (see buildNoticeContent) and sends
// it as an m.room.message. On success the new event id is added to botSent, so a
// later reply to it is recognised as addressing the bot, and nil is returned. On
// failure the error is logged and returned so the caller can detect the failed
// delivery. triggerMC is accepted for signature symmetry and is not used here.
func (b *Bot) sendMessage(ctx context.Context, roomID, threadRoot string, trigger *Event, triggerMC *MessageContent, body string) error {
	notice := buildNoticeContent(trigger.EventID, trigger.Sender, threadRoot, body)
	sentID, err := b.mx.SendEvent(ctx, roomID, "m.room.message", notice)
	if err == nil {
		b.botSent.Add(sentID)
		return nil
	}
	b.log.Error("send failed", "room", roomID, "err", err)
	return err
}
// startTypingKeepalive turns on the room-level "typing…" indicator and keeps it alive
// for the whole generation: each notification is sent with a 30s timeout (setTyping)
// and refreshed every 20s. It returns a stop function that ends the refresh loop and,
// if this was the room's last in-flight generation, clears the indicator. stop is
// idempotent (guarded by sync.Once), so it is safe to defer and to call again.
//
// Typing is ROOM-scoped in Matrix — there is no per-thread typing — so with
// per-(room,thread) concurrency several generations can run for one room at once. A
// per-room refcount keeps the indicator on until the LAST of them finishes, rather
// than letting whichever finishes first clear it out from under the others.
// Best-effort UX — failures are non-fatal.
//
// The refresh loop runs through safego: it is a detached goroutine, and an
// unrecovered panic in a detached goroutine terminates the whole process. The loop
// exits when stop is called or when ctx is cancelled.
func (b *Bot) startTypingKeepalive(ctx context.Context, roomID string) func() {
	b.typingAcquire(roomID)
	b.setTyping(ctx, roomID, true)
	done := make(chan struct{})
	b.safego("typing-keepalive", func() {
		t := time.NewTicker(20 * time.Second)
		defer t.Stop()
		for {
			select {
			case <-ctx.Done():
				return
			case <-done:
				return
			case <-t.C:
				b.setTyping(ctx, roomID, true)
			}
		}
	})
	var once sync.Once
	return func() {
		once.Do(func() {
			close(done)
			// Only the last in-flight generation for the room clears the indicator; the
			// others merely stop their own keepalive loop (a forgetRoom mid-flight can drop
			// the counter to <=0, which the guard treats as "last" — still correct).
			if b.typingRelease(roomID) {
				b.setTyping(ctx, roomID, false)
			}
		})
	}
}
// typingAcquire counts one more in-flight generation against roomID's typing
// indicator. Incrementing the refcount under mu is its only effect; the caller
// (re)asserts the indicator itself, since re-sending typing=true is idempotent and
// refreshes the server-side timeout.
func (b *Bot) typingAcquire(roomID string) {
	b.mu.Lock()
	defer b.mu.Unlock()
	b.typingRefs[roomID]++
}
// typingRelease drops one in-flight generation from roomID's refcount and reports
// whether it was the LAST one (the count fell to <=0), so the caller clears the
// indicator only then. When it is the last, the map entry is removed. If the key is
// already missing (e.g. forgetRoom deleted it), the count reads as 0, the decrement
// yields -1, and the call is still treated as "last" with no entry left behind.
func (b *Bot) typingRelease(roomID string) (last bool) {
	b.mu.Lock()
	defer b.mu.Unlock()

	remaining := b.typingRefs[roomID] - 1
	if remaining <= 0 {
		delete(b.typingRefs, roomID)
		return true
	}
	b.typingRefs[roomID] = remaining
	return false
}
// setTyping sets (typing=true) or clears (typing=false) the bot's typing indicator
// in roomID, passing a 30000 ms timeout. Best-effort UX: a failure is logged at Debug
// and otherwise ignored. startTypingKeepalive re-sends it before the timeout lapses.
func (b *Bot) setTyping(ctx context.Context, roomID string, typing bool) {
	const timeoutMS = 30000
	err := b.mx.SendTyping(ctx, roomID, typing, timeoutMS)
	if err == nil {
		return
	}
	b.log.Debug("set typing failed", "room", roomID, "typing", typing, "err", err)
}
// buildNoticeContent builds the content of the bot's reply: an m.notice (not m.text,
// so the anti-loop skip catches the bot's own output) that mentions sender and
// replies to replyTo. When threadRoot is non-empty the relation additionally carries
// an m.thread relation rooted there with is_falling_back set (F27); when it is empty
// the answer is a plain reply. Choosing threadRoot is the caller's job (see
// resolveThreadRoot). If markdownToHTML reports formatting, the rendered HTML is
// attached as formatted_body; otherwise only the plain body is sent.
func buildNoticeContent(replyTo, sender, threadRoot, body string) map[string]any {
	// The reply pointer is present in both the threaded and the plain shape.
	relation := map[string]any{
		"m.in_reply_to": map[string]any{"event_id": replyTo},
	}
	if threadRoot != "" {
		relation["rel_type"] = "m.thread"
		relation["event_id"] = threadRoot
		relation["is_falling_back"] = true
	}

	notice := map[string]any{
		"msgtype":      "m.notice",
		"body":         body,
		"m.mentions":   map[string]any{"user_ids": []string{sender}},
		"m.relates_to": relation,
	}

	// The model answers in markdown; render it so clients show formatting instead of
	// raw markup. A plain answer keeps rendering from body alone.
	rendered, hasFormatting := markdownToHTML(body)
	if hasFormatting {
		notice["format"] = matrixHTMLFormat
		notice["formatted_body"] = rendered
	}
	return notice
}
// --- per-(room,thread) single-flight ---------------------------------------------
// resolveThreadRoot picks the conversation a trigger belongs to and returns its thread
// root event id, or "" for the main timeline. Rules, in priority order:
//
//  1. a trigger that is already inside a thread (m.thread relation with a non-empty
//     root) continues that thread;
//  2. in a 1:1 DM, a top-level message roots a NEW conversation on itself
//     (ChatGPT-style "new chat");
//  3. anything else — EVERY group message — stays on the main timeline ("").
//
// This is the single place auto-threading is decided and it is hard-gated on isDM, so a
// group is NEVER auto-threaded (the gate is structural, not a flag). buildNoticeContent
// emits the matching m.thread relation for the same value, so the conversation we
// serialize on is always the one the answer lands in.
func (b *Bot) resolveThreadRoot(isDM bool, ev *Event, mc *MessageContent) string {
	rel := mc.RelatesTo
	switch {
	case rel != nil && rel.RelType == "m.thread" && rel.EventID != "":
		return rel.EventID
	case isDM:
		return ev.EventID
	default:
		return ""
	}
}
// tryClaim tries to mark the (room, thread) conversation as generating. It returns true
// when the caller won the claim — nothing was in flight for that exact conversation — and
// false otherwise, in which case the loser drops its message. Threads of one room claim
// independently, so a slow answer in one conversation never blocks another. The whole
// check-and-set happens under b.mu on a single nested map (no per-thread mutex), so
// there is no lazy-lock TOCTOU.
func (b *Bot) tryClaim(roomID, threadRoot string) bool {
	b.mu.Lock()
	defer b.mu.Unlock()

	// Reading through a nil inner map is safe and yields false.
	if b.inflight[roomID][threadRoot] {
		return false
	}
	if b.inflight[roomID] == nil {
		b.inflight[roomID] = make(map[string]bool)
	}
	b.inflight[roomID][threadRoot] = true
	return true
}
// release drops the in-flight claim on the (room, thread) conversation taken by
// tryClaim. When that leaves the room with no claims, the room's inner map is removed
// too. Releasing in a room with no recorded claims is a no-op.
func (b *Bot) release(roomID, threadRoot string) {
	b.mu.Lock()
	defer b.mu.Unlock()

	claims := b.inflight[roomID]
	if claims == nil {
		return
	}
	delete(claims, threadRoot)
	if len(claims) == 0 {
		delete(b.inflight, roomID)
	}
}
// --- per-room metadata helpers (all guarded by b.mu; probes run outside it) -----
// getMetaLocked returns the room's cached meta, storing and returning a fresh zero-value
// entry when the room has none yet. It does no locking of its own: the caller MUST
// already hold b.mu.
func (b *Bot) getMetaLocked(roomID string) *roomMeta {
	if existing := b.meta[roomID]; existing != nil {
		return existing
	}
	fresh := &roomMeta{}
	b.meta[roomID] = fresh
	return fresh
}
// invalidateCounts marks the room's cached member counts as unknown, so the next
// ensureCounts call probes membership again. The other cached fields are left alone, and
// a room without cached meta is not given one.
func (b *Bot) invalidateCounts(roomID string) {
	b.mu.Lock()
	defer b.mu.Unlock()

	meta := b.meta[roomID]
	if meta == nil {
		return
	}
	meta.countsKnown = false
}
// setEncrypted records that the room is encrypted and that its encryption state is
// known, creating the room's meta entry if it has none, so ensureEncryption can answer
// from the cache without probing.
func (b *Bot) setEncrypted(roomID string) {
	b.mu.Lock()
	defer b.mu.Unlock()

	meta := b.getMetaLocked(roomID)
	meta.encrypted = true
	meta.encKnown = true
}
// forgetRoom discards every piece of in-memory state kept for the room: the typing
// refcount, the in-flight claims, the conversation buffers and the cached meta. Each is
// a single delete on a map keyed by roomID — the nested per-thread maps go with their
// room entry — so no per-thread walk is needed.
func (b *Bot) forgetRoom(roomID string) {
	b.mu.Lock()
	defer b.mu.Unlock()

	delete(b.typingRefs, roomID)
	delete(b.inflight, roomID) // all (room, thread) claims
	delete(b.buf, roomID)      // the room's whole per-thread buffer subtree
	delete(b.meta, roomID)
}
// ensureEncryption reports whether the room is encrypted. A cached answer is returned
// directly; otherwise the CS-API is probed once — with b.mu released for the duration —
// and the result is cached. If the probe fails it logs a warning and returns false
// (treated as not-encrypted this round), leaving the state unknown so the next message
// probes again.
func (b *Bot) ensureEncryption(ctx context.Context, roomID string) bool {
	b.mu.Lock()
	meta := b.getMetaLocked(roomID)
	known, cached := meta.encKnown, meta.encrypted
	b.mu.Unlock()
	if known {
		return cached
	}

	encrypted, err := b.mx.RoomEncrypted(ctx, roomID)
	if err != nil {
		b.log.Warn("encryption probe failed", "room", roomID, "err", err)
		return false // leave unknown; re-probed on the next message
	}

	// Look the meta up again under the lock rather than reusing the pointer read before
	// the unlocked probe: if the room was forgotten (leave/ban) mid-probe its meta is
	// gone, and writing through the stale pointer's map slot would resurrect a dead room.
	b.mu.Lock()
	defer b.mu.Unlock()
	if current := b.meta[roomID]; current != nil {
		current.encrypted, current.encKnown = encrypted, true
	}
	return encrypted
}
// ensureCounts returns (countsKnown, isDM, foreign) for the room. When the counts are
// not cached it probes membership once — with b.mu released for the duration — marks the
// room foreign if any reported server is missing from cfg.AllowedServers, and caches the
// result. If the probe fails it logs a WARN and returns (false, false, false): the
// caller treats an unclassified room conservatively rather than silently dropping. The
// same triple is returned when the room's meta disappeared while the probe was running.
func (b *Bot) ensureCounts(ctx context.Context, roomID string) (countsKnown, isDM, foreign bool) {
	b.mu.Lock()
	cached := b.getMetaLocked(roomID).countsKnown
	b.mu.Unlock()

	if !cached {
		joined, invited, servers, err := b.mx.RoomMembership(ctx, roomID)
		if err != nil {
			b.log.Warn("member probe failed", "room", roomID, "err", err)
			return false, false, false
		}

		outsider := false
		for server := range servers {
			if b.cfg.AllowedServers[server] {
				continue
			}
			outsider = true
			break
		}

		// Look the meta up again under the lock instead of keeping a pointer from
		// before the unlocked probe: a leave/ban that forgot the room mid-probe must
		// not be undone by resurrecting its meta.
		b.mu.Lock()
		if meta := b.meta[roomID]; meta != nil {
			meta.joined, meta.invited = joined, invited
			meta.foreign, meta.countsKnown = outsider, true
		}
		b.mu.Unlock()
	}

	b.mu.Lock()
	defer b.mu.Unlock()
	meta := b.meta[roomID]
	if meta == nil {
		return false, false, false
	}
	return meta.countsKnown, meta.isDM(), meta.foreign
}
// convBuf is one conversation's (one thread's) rolling context window plus a last-touch
// stamp used for LRU eviction. Without eviction, a long-lived control DM that spawns many
// ChatGPT-style conversations would accumulate one buffer per conversation for the whole
// process lifetime (forgetRoom only frees them on room leave/ban).
type convBuf struct {
// msgs is the window itself, oldest first; appendBuf appends to it and trims the
// front once it grows past the configured limit.
msgs []bufferedMsg
// touched orders conversations for eviction: appendBuf evicts the room's buffer with
// the smallest stamp once the room holds more than maxConvBuffersPerRoom of them.
touched uint64 // b.bufSeq at last append; higher = more recent
}
// maxConvBuffersPerRoom bounds how many conversation buffers a single room retains:
// appendBuf evicts the least-recently-touched buffer whenever an append leaves the room
// holding more than this many. A human DM rarely keeps this many live conversations at
// once; an evicted cold conversation just rebuilds its window from scratch on its next
// turn (same as after a restart), so the only cost of eviction is a one-turn cold start,
// never a wrong answer.
const maxConvBuffersPerRoom = 64
// snapshotBuf returns a private copy of the (room, thread) conversation's buffered
// messages, so the caller can use it after b.mu is released without racing appendBuf.
// A conversation that was never seen, or whose buffer is empty, yields nil — exactly
// what a fresh "new chat" wants (context = system + trigger).
func (b *Bot) snapshotBuf(roomID, threadRoot string) []bufferedMsg {
	b.mu.Lock()
	defer b.mu.Unlock()

	// Indexing through a missing room (nil inner map) is safe and yields a nil *convBuf.
	conv := b.buf[roomID][threadRoot]
	if conv == nil {
		return nil
	}
	size := len(conv.msgs)
	if size == 0 {
		return nil
	}
	history := make([]bufferedMsg, size)
	copy(history, conv.msgs)
	return history
}
// appendBuf appends msg to the (roomID, threadRoot) conversation buffer, creating the
// room's per-thread map and the conversation's buffer on first use.
//
// The buffer is a rolling window: once it holds more than 2*cfg.MaxCtxEvent messages
// (with a floor of 8), only the newest that many are kept. Every append stamps the
// conversation with the next b.bufSeq value; if the room then holds more than
// maxConvBuffersPerRoom conversations, the one with the lowest stamp (least recently
// appended to) is evicted. threadRoot "" (the main timeline) is an ordinary key here and
// is evictable like any other conversation.
func (b *Bot) appendBuf(roomID, threadRoot string, msg bufferedMsg) {
	b.mu.Lock()
	defer b.mu.Unlock()

	limit := b.cfg.MaxCtxEvent * 2
	if limit < 8 {
		limit = 8
	}

	m := b.buf[roomID]
	if m == nil {
		m = make(map[string]*convBuf)
		b.buf[roomID] = m
	}
	cb := m[threadRoot]
	if cb == nil {
		cb = &convBuf{}
		m[threadRoot] = cb
	}

	cb.msgs = append(cb.msgs, msg)
	if len(cb.msgs) > limit {
		cb.msgs = cb.msgs[len(cb.msgs)-limit:]
	}
	b.bufSeq++
	cb.touched = b.bufSeq

	// LRU-evict the least-recently-touched conversation once the room exceeds the cap.
	// The just-touched conversation has the highest `touched`, so it is never the victim.
	//
	// "No candidate yet" is tracked with an explicit flag, NOT with victim == "": the
	// empty string is itself a valid key (the main timeline). Used as a sentinel, it made
	// whichever entry was visited right after "" replace the candidate unconditionally,
	// so depending on map iteration order the wrong conversation could be evicted.
	if len(m) > maxConvBuffersPerRoom {
		var (
			victim string
			oldest uint64
			found  bool
		)
		for k, v := range m {
			if !found || v.touched < oldest {
				victim, oldest, found = k, v.touched, true
			}
		}
		delete(m, victim)
	}
}