1040 lines
43 KiB
Go
1040 lines
43 KiB
Go
package main
|
||
|
||
import (
	"context"
	"fmt"
	"log/slog"
	"runtime/debug"
	"sync"
	"sync/atomic"
	"time"
)
|
||
|
||
// roomMeta holds the per-room facts needed to handle a message: the joined and
// invited member counts (used by the 1:1 test, F3), whether any joined/invited
// member lives outside ALLOWED_SERVERS, and the room's encryption state (F15).
//
// Appservice transactions carry no room summary, so these facts are fetched lazily
// from the CS-API the first time they are needed. The counts are INVALIDATED whenever
// a third party's membership changes. That way a 1:1 that gains a member is
// reclassified out of DM mode (no DM-mode leak to the newcomer), and a newly added
// foreign member is detected.
//
// Every field is guarded by Bot.mu and must never be read or written without it: the
// slow generation and the lazy CS-API probes run in concurrent per-room goroutines.
type roomMeta struct {
	joined, invited int
	countsKnown     bool
	foreign         bool // a joined/invited member is outside ALLOWED_SERVERS
	encrypted       bool
	encKnown        bool
}

// isDM reports whether the room is a known 1:1. The counts must have been fetched,
// and exactly two users must be joined or invited. Unknown counts are never a DM.
func (m *roomMeta) isDM() bool {
	if !m.countsKnown {
		return false
	}
	return m.joined+m.invited == 2
}
|
||
|
||
type Bot struct {
|
||
cfg *Config
|
||
log *slog.Logger
|
||
mx *MatrixClient
|
||
llm LLMClient
|
||
st *Store
|
||
|
||
// gemini is the cheap chat backend for the trivial route and the Layer-1 classifier
|
||
// (an LLMClient so tests can fake it); nil unless a layer that uses it is enabled.
|
||
// web is the web-freshness provider, built only when WEB_ENABLED. Both nil → the
|
||
// cascade can only ever produce grok_direct.
|
||
gemini LLMClient
|
||
web WebProvider
|
||
|
||
// promptVersion is a short stable hash of the system prompt, logged with each
|
||
// request so prompt changes are visible in the analytics (A/B + regressions).
|
||
promptVersion string
|
||
// telemetryWrites paces the retention trim (every telemetryTrimEvery writes).
|
||
telemetryWrites atomic.Uint64
|
||
|
||
// mu guards the in-memory maps/sets below. Each transaction is acked to Synapse
|
||
// immediately (appservice.go) and its events are processed in transaction order,
|
||
// but the slow xAI generation runs in a per-room goroutine and the lazy probes run
|
||
// off-lock, so several goroutines touch this shared state at once. mu is held only
|
||
// for short map operations and is NEVER held across a network or xAI call — that
|
||
// head-of-line hold was the root cause of the multi-minute silence.
|
||
mu sync.Mutex
|
||
seen *lruSet // event ids already handled (dedup within a session; self-locking)
|
||
botSent *lruSet // event ids the bot itself sent (reply-parent detection; self-locking)
|
||
meta map[string]*roomMeta
|
||
// buf and inflight are keyed by roomID THEN by thread root ("" = main timeline), so
|
||
// each conversation (a thread) keeps an isolated context window and an independent
|
||
// single-flight claim. Two messages in different threads of one room no longer block
|
||
// each other or pollute each other's history. roomMeta stays room-level (membership
|
||
// /encryption are room facts). forgetRoom drops a room's whole subtree in one delete.
|
||
buf map[string]map[string]*convBuf
|
||
inflight map[string]map[string]bool
|
||
// bufSeq is a monotonic counter stamped onto a convBuf on every append (guarded by mu);
|
||
// it orders conversations by last activity so appendBuf can LRU-evict the coldest one
|
||
// when a room exceeds maxConvBuffersPerRoom.
|
||
bufSeq uint64
|
||
// typingRefs counts in-flight generations per ROOM that are currently showing the
|
||
// "typing…" indicator. Matrix typing notifications are room-scoped (no thread_id in
|
||
// the CS-API), so the indicator can't be split per thread: it goes on when the first
|
||
// generation in a room starts and off only when the last one finishes (refcount).
|
||
typingRefs map[string]int
|
||
}
|
||
|
||
func NewBot(ctx context.Context, cfg *Config, logger *slog.Logger) (*Bot, error) {
|
||
mx := NewMatrixClient(cfg.HomeserverURL, cfg.ASToken, cfg.BotMXID)
|
||
llm := NewXAIClient(cfg.XAIBaseURL, cfg.XAIAPIKey, logger)
|
||
|
||
st, err := OpenStore(cfg.DatabaseURL)
|
||
if err != nil {
|
||
return nil, err
|
||
}
|
||
|
||
// prompt_version is a stable hash logged with each request so prompt changes show in
|
||
// telemetry. Fold the project KB in WHEN PRESENT so a KB revision is visible too; with the
|
||
// route off (KB ""), promptForVersion is exactly cfg.SystemPrompt, so the hash is unchanged
|
||
// from before this feature and flags-off telemetry is byte-identical.
|
||
promptForVersion := cfg.SystemPrompt
|
||
if cfg.ProjectKB != "" {
|
||
promptForVersion += "\x00" + cfg.ProjectKB
|
||
}
|
||
|
||
b := &Bot{
|
||
cfg: cfg,
|
||
log: logger,
|
||
mx: mx,
|
||
llm: llm,
|
||
st: st,
|
||
promptVersion: fmt.Sprintf("%08x", hashString(promptForVersion)),
|
||
seen: newLRUSet(5000),
|
||
botSent: newLRUSet(5000),
|
||
meta: make(map[string]*roomMeta),
|
||
buf: make(map[string]map[string]*convBuf),
|
||
inflight: make(map[string]map[string]bool),
|
||
typingRefs: make(map[string]int),
|
||
}
|
||
|
||
// Build the cascade backends only for enabled layers (config already fail-fast
|
||
// validated that the keys exist). With every cascade flag off these stay nil and
|
||
// generate() can only produce grok_direct — today's bot. The grounding web provider
|
||
// needs the concrete client (for the native generateContent call), so keep a typed
|
||
// handle alongside the LLMClient face.
|
||
var gc *geminiClient
|
||
if cfg.needsGemini() {
|
||
gc = NewGeminiClient(cfg.GeminiBaseURL, cfg.GeminiAPIKey, cfg.GeminiModel, logger)
|
||
b.gemini = gc
|
||
}
|
||
if cfg.WebEnabled {
|
||
if cfg.WebProvider == webProviderGeminiGrounding {
|
||
b.web = &geminiGrounding{gem: gc, st: st, cfg: cfg, logger: logger}
|
||
} else {
|
||
b.web = newGrokWebSearch(cfg, logger)
|
||
}
|
||
}
|
||
|
||
// Confirm the as_token + user_id resolves to BOT_MXID before serving.
|
||
if err := b.verifyIdentity(ctx); err != nil {
|
||
st.Close()
|
||
return nil, err
|
||
}
|
||
// F23: ensure the profile has a display name (best-effort, idempotent).
|
||
if err := mx.SetDisplayName(ctx, cfg.BotDisplayName); err != nil {
|
||
logger.Warn("set display name failed (non-fatal)", "err", err)
|
||
}
|
||
return b, nil
|
||
}
|
||
|
||
func (b *Bot) Close() {
|
||
if b.st != nil {
|
||
_ = b.st.Close()
|
||
}
|
||
}
|
||
|
||
func (b *Bot) verifyIdentity(ctx context.Context) error {
|
||
who, err := b.mx.Whoami(ctx)
|
||
if err != nil {
|
||
return err
|
||
}
|
||
if who != b.cfg.BotMXID {
|
||
return fmt.Errorf("as_token resolves to %q but BOT_MXID is %q", who, b.cfg.BotMXID)
|
||
}
|
||
b.log.Info("authenticated", "mxid", who)
|
||
return nil
|
||
}
|
||
|
||
// Run starts the appservice transaction server and blocks until ctx is cancelled.
|
||
func (b *Bot) Run(ctx context.Context) error {
|
||
as := NewAppService(b.cfg, b.log, b.st, b.handleTransaction)
|
||
return as.Serve(ctx)
|
||
}
|
||
|
||
// handleTransaction processes one already-acked transaction's events. It runs in a
|
||
// background goroutine (the 200 has already been returned to Synapse). Events are
|
||
// processed IN ORDER (dedup + classification are synchronous) so the per-room
|
||
// single-flight claim is taken in arrival order; only the slow xAI generation is
|
||
// spawned per room, so different rooms answer concurrently and a slow call never
|
||
// blocks the next event or another room. Ordering within a room is therefore kept,
|
||
// while the head-of-line freeze is gone.
|
||
func (b *Bot) handleTransaction(ctx context.Context, events []Event) {
|
||
for i := range events {
|
||
b.handleEvent(ctx, &events[i])
|
||
}
|
||
}
|
||
|
||
// safego runs fn in a goroutine with panic recovery. The slow per-room work is
|
||
// detached from the HTTP handler, so an unrecovered panic there would crash the
|
||
// whole process and silence the bot for EVERY room — recover + log instead, so one
|
||
// malformed event can never take the bot down.
|
||
func (b *Bot) safego(ctx context.Context, what string, fn func()) {
|
||
go func() {
|
||
defer func() {
|
||
if r := recover(); r != nil {
|
||
b.log.ErrorContext(ctx, "recovered panic in handler goroutine", "what", what, "panic", r)
|
||
}
|
||
}()
|
||
fn()
|
||
}()
|
||
}
|
||
|
||
func (b *Bot) handleEvent(ctx context.Context, ev *Event) {
|
||
if ev.EventID == "" || ev.RoomID == "" {
|
||
return
|
||
}
|
||
// Mint a fresh trace id for this event and stamp it (plus sender + the per-user
|
||
// body-logging decision) onto ctx. Every log line below — through the per-room
|
||
// goroutine and down to the model call — carries this trace_id automatically.
|
||
ctx = withRequestTrace(ctx, newTraceID(), ev.Sender, b.cfg.LogBodiesUsers[ev.Sender])
|
||
if !b.markSeen(ctx, ev.EventID) {
|
||
return // already handled (in-memory or durable dedup)
|
||
}
|
||
b.log.DebugContext(ctx, "event", "type", ev.Type, "room", ev.RoomID, "sender", ev.Sender, "id", ev.EventID)
|
||
switch ev.Type {
|
||
case "m.room.member":
|
||
if ev.StateKey != nil && *ev.StateKey == b.cfg.BotMXID {
|
||
b.safego(ctx, "self-membership", func() { b.handleSelfMembership(ctx, ev) })
|
||
} else {
|
||
// A third party's membership changed: counts + foreign flag are now stale.
|
||
// Re-probe on the next message so a 1:1 that gains a member drops out of DM
|
||
// mode (no third-party leak) and a new foreign member is caught.
|
||
b.invalidateCounts(ev.RoomID)
|
||
}
|
||
case "m.room.encryption":
|
||
b.setEncrypted(ev.RoomID)
|
||
case "m.room.message":
|
||
// Synchronous (in transaction order) up to the single-flight claim; only the
|
||
// slow generation inside handleMessage is spawned as a goroutine. This keeps
|
||
// per-room event order — the earlier message wins the claim — while different
|
||
// rooms still run concurrently.
|
||
b.handleMessage(ctx, ev)
|
||
}
|
||
}
|
||
|
||
// markSeen records an event id in both the in-memory set and the durable store and
|
||
// reports whether it is NEW (first time). The in-memory Add is atomic, and SeenEvent
|
||
// is an atomic INSERT … ON CONFLICT DO NOTHING, so two racing goroutines for the same
|
||
// event can never both proceed. On a durable-store error we fall through (the in-memory set still
|
||
// guards this session).
|
||
func (b *Bot) markSeen(ctx context.Context, eventID string) bool {
|
||
if !b.seen.Add(eventID) {
|
||
return false
|
||
}
|
||
isNew, err := b.st.SeenEvent(eventID)
|
||
if err != nil {
|
||
b.log.ErrorContext(ctx, "durable dedup check failed", "id", eventID, "err", err)
|
||
return true
|
||
}
|
||
return isNew
|
||
}
|
||
|
||
// handleSelfMembership reacts to membership changes for the bot user: auto-join
|
||
// invites from allowed servers (F11), reject others, forget rooms we leave. Runs in
|
||
// its own goroutine because JoinRoom/LeaveRoom are network calls.
|
||
func (b *Bot) handleSelfMembership(ctx context.Context, ev *Event) {
|
||
switch ev.membershipOf() {
|
||
case "invite":
|
||
if !b.cfg.AllowedServers[serverOf(ev.Sender)] {
|
||
b.log.WarnContext(ctx, "rejecting invite (server not allowed)", "room", ev.RoomID, "sender", ev.Sender)
|
||
if err := b.mx.LeaveRoom(ctx, ev.RoomID); err != nil {
|
||
b.log.ErrorContext(ctx, "leave (reject) failed", "room", ev.RoomID, "err", err)
|
||
}
|
||
return
|
||
}
|
||
b.log.InfoContext(ctx, "accepting invite", "room", ev.RoomID, "sender", ev.Sender)
|
||
if err := b.mx.JoinRoom(ctx, ev.RoomID); err != nil {
|
||
b.log.ErrorContext(ctx, "join failed", "room", ev.RoomID, "err", err)
|
||
return
|
||
}
|
||
// Fully-on-allowed-servers gate: a vojo.chat inviter can still pull the bot
|
||
// into a room that already holds federated third parties — leave at once.
|
||
if _, _, foreign := b.ensureCounts(ctx, ev.RoomID); foreign {
|
||
b.leaveForeign(ctx, ev.RoomID)
|
||
}
|
||
case "leave", "ban":
|
||
b.forgetRoom(ev.RoomID)
|
||
}
|
||
}
|
||
|
||
// leaveForeign leaves a room that contains a member outside ALLOWED_SERVERS, so
|
||
// the bot only ever operates in rooms hosted entirely on allowed homeservers.
|
||
func (b *Bot) leaveForeign(ctx context.Context, roomID string) {
|
||
b.log.WarnContext(ctx, "leaving room — a member is outside ALLOWED_SERVERS", "room", roomID)
|
||
if err := b.mx.LeaveRoom(ctx, roomID); err != nil {
|
||
b.log.ErrorContext(ctx, "leave (foreign) failed", "room", roomID, "err", err)
|
||
}
|
||
}
|
||
|
||
func (b *Bot) handleMessage(ctx context.Context, ev *Event) {
|
||
roomID := ev.RoomID
|
||
|
||
// A9/F15: re-check encryption; if (or once) encrypted, react once and skip — the
|
||
// bot can't read it. The probe runs without the lock.
|
||
if b.ensureEncryption(ctx, roomID) {
|
||
b.log.DebugContext(ctx, "skip: encrypted room", "room", roomID)
|
||
// Log the skip only when we actually react (once per room), so an encrypted room
|
||
// the bot can't read doesn't flood request_log with one row per message.
|
||
if b.reactEncryptedOnce(ctx, roomID, ev.EventID) {
|
||
b.recordSkip(ctx, ev, degradeEncrypted)
|
||
}
|
||
return
|
||
}
|
||
|
||
mc, ok := ev.DecodeMessage()
|
||
if !ok {
|
||
// A message addressed to the bot that we can't decode shouldn't vanish without
|
||
// a trace (no silent drops): log at WARN so it's visible at the default level.
|
||
b.log.WarnContext(ctx, "skip: undecodable message content", "room", roomID, "sender", ev.Sender, "id", ev.EventID)
|
||
return
|
||
}
|
||
// Edits re-carry m.mentions; never re-trigger or replay them (F16).
|
||
if mc.IsReplace() {
|
||
return
|
||
}
|
||
if ev.Sender == b.cfg.BotMXID {
|
||
return // our own message (the reply is buffered when we send it, not on echo-back)
|
||
}
|
||
if mc.MsgType == "m.notice" {
|
||
return // anti-loop: ignore notices (ours and other bots')
|
||
}
|
||
|
||
// Media / non-text is handled only once we know the message is addressed (below),
|
||
// so a stray image in a group the bot isn't mentioned in stays silent (correct),
|
||
// while media in a 1:1 or an @-mention gets a clear "text only" reaction.
|
||
isMedia := mc.MsgType != "m.text" && mc.MsgType != "m.emote"
|
||
|
||
countsKnown, isDM, foreign := b.ensureCounts(ctx, roomID)
|
||
// Stay only in rooms hosted entirely on allowed servers — never operate in (or
|
||
// "leak" the bot into) a federated room with non-consenting third parties.
|
||
if foreign {
|
||
b.leaveForeign(ctx, roomID)
|
||
b.recordSkip(ctx, ev, degradeForeign)
|
||
return
|
||
}
|
||
|
||
replyParentIsBot := mc.RelatesTo != nil && mc.RelatesTo.InReplyTo != nil &&
|
||
b.botSent.Has(mc.RelatesTo.InReplyTo.EventID)
|
||
mentioned := mentionsBot(mc, b.cfg.BotMXID, replyParentIsBot)
|
||
|
||
if !(isDM || mentioned) {
|
||
if !countsKnown {
|
||
// We couldn't classify the room (member probe failed) and the message isn't
|
||
// an explicit mention, so we can't tell a 1:1 (answer everything) from a
|
||
// group (answer only on mention). Log at WARN — not a silent Debug drop —
|
||
// so it's visible; we don't react because reacting in a group the bot isn't
|
||
// addressed in would be wrong. Re-probed on the next message.
|
||
b.log.WarnContext(ctx, "skip: room unclassified (member probe failed), message not an explicit mention",
|
||
"room", roomID, "sender", ev.Sender)
|
||
} else {
|
||
b.log.DebugContext(ctx, "skip: not addressed (group without mention)", "room", roomID, "sender", ev.Sender,
|
||
"dm", isDM, "mentioned", mentioned)
|
||
}
|
||
return
|
||
}
|
||
|
||
// Addressed but not text: react "text only" (no silent drop).
|
||
if isMedia {
|
||
b.log.DebugContext(ctx, "skip: non-text msgtype (reacted)", "room", roomID, "sender", ev.Sender, "msgtype", mc.MsgType)
|
||
b.react(ctx, roomID, ev.EventID, reactMedia)
|
||
b.recordSkip(ctx, ev, degradeMedia)
|
||
return
|
||
}
|
||
|
||
// Per-room single-flight: while a generation is in flight for this room, drop
|
||
// further messages (like a chat — no queue, no new session, no token burn). No
|
||
// reaction here: the typing indicator is already showing for this room, which is
|
||
// the language-free "I'm busy" signal. The claim is taken here, synchronously and
|
||
// in transaction order, so the FIRST message for a room wins and later ones are
|
||
// dropped until release — never the reverse.
|
||
// Resolve which conversation this turn belongs to: an existing thread (continue it),
|
||
// a freshly rooted DM thread (auto-thread, DM-only), or the main timeline ("").
|
||
// Groups never auto-thread; see resolveThreadRoot.
|
||
threadRoot := b.resolveThreadRoot(isDM, ev, mc)
|
||
|
||
// Per-(room,thread) single-flight: a slow answer in one conversation no longer blocks
|
||
// another thread in the same room, and two messages in the SAME conversation still see
|
||
// exactly one winner. The claim is taken synchronously in transaction order.
|
||
if !b.tryClaim(roomID, threadRoot) {
|
||
b.log.DebugContext(ctx, "drop: conversation busy generating", "room", roomID, "thread", threadRoot, "sender", ev.Sender)
|
||
return
|
||
}
|
||
|
||
// Snapshot THIS conversation's history (excludes this trigger) under the claim, then
|
||
// run the slow generation in its own goroutine so this transaction's remaining events
|
||
// and other rooms/threads are not blocked by the xAI call. respond appends the
|
||
// trigger+answer to the same per-thread buffer on success (see sendReply) and releases
|
||
// the claim.
|
||
history := b.snapshotBuf(roomID, threadRoot)
|
||
b.safego(ctx, "respond", func() {
|
||
defer b.release(roomID, threadRoot)
|
||
b.respond(ctx, roomID, threadRoot, isDM, ev, mc, history)
|
||
})
|
||
}
|
||
|
||
// unlimitedCap is the per-user request cap applied to UNLIMITED_USERS. It is large
// enough that the per-user gate never trips for them, while the global
// DAILY_USD_CEILING still bounds their spend.
const (
	unlimitedCap = 1 << 30
)
|
||
|
||
func (b *Bot) respond(ctx context.Context, roomID, threadRoot string, isDM bool, ev *Event, mc *MessageContent, history []bufferedMsg) {
|
||
started := time.Now()
|
||
// One telemetry row per request, populated as the flow decides its outcome and
|
||
// emitted once via defer — so every exit (deny, error, empty, paid silence, success)
|
||
// is recorded without scattering writes (F-FUNC-5). It starts as route=none/ok=false;
|
||
// proceeding to the model sets the route, success sets ok=true.
|
||
rl := RequestLog{
|
||
ID: ev.EventID, RoomID: roomID, Sender: ev.Sender,
|
||
Route: routeNone, RouterSource: "default",
|
||
PromptVersion: b.promptVersion,
|
||
QueryText: mc.Body,
|
||
Models: map[string]string{"final": b.cfg.XAIModel},
|
||
}
|
||
defer func() {
|
||
rl.LatencyMS = int(time.Since(started).Milliseconds())
|
||
b.recordTelemetry(ctx, rl)
|
||
}()
|
||
|
||
perUserCap := b.cfg.PerUserDailyCap
|
||
perUserUSD := b.cfg.PerUserDailyUSD
|
||
if b.cfg.UnlimitedUsers[ev.Sender] {
|
||
perUserCap = unlimitedCap
|
||
perUserUSD = 0 // exempt from both per-user gates; the global ceiling still applies
|
||
}
|
||
// Reserve the route's estimated max-cost (not $0) so the global ceiling counts
|
||
// this in-flight call BEFORE it returns — the TOCTOU fix (§8.1). The envelope covers
|
||
// the most expensive ENABLED route, so whichever the router picks is admitted within
|
||
// the reservation; with the cascade off it is exactly grok_direct's estimate.
|
||
estimate := b.reserveEstimate()
|
||
switch res, err := b.st.Reserve(ev.Sender, perUserCap, perUserUSD, b.cfg.DailyUSDCeiling, estimate); {
|
||
case err != nil:
|
||
// A limiter failure is on our side — don't leave the user wondering.
|
||
b.log.ErrorContext(ctx, "limiter reserve failed", "sender", ev.Sender, "err", err)
|
||
rl.Degraded, rl.Err = degradeReserveErr, err.Error()
|
||
b.react(ctx, roomID, ev.EventID, reactError)
|
||
return
|
||
case res == reserveDeniedUser:
|
||
// Per-user cap (anti-abuse, F24): stop answering, but always signal the limit —
|
||
// no message addressed to the bot is left without feedback.
|
||
b.log.InfoContext(ctx, "per-user daily cap reached; reacting", "sender", ev.Sender)
|
||
rl.PerUserCapHit = true
|
||
b.react(ctx, roomID, ev.EventID, reactRateLimit)
|
||
return
|
||
case res == reserveDeniedGlobal:
|
||
// Global USD ceiling. A reaction is cheap and non-intrusive (unlike the old
|
||
// once-per-day text notice), so signal every affected message rather than
|
||
// going silent after the first.
|
||
b.log.WarnContext(ctx, "global daily USD ceiling reached", "room", roomID, "sender", ev.Sender)
|
||
rl.CeilingHit = true
|
||
b.react(ctx, roomID, ev.EventID, reactRateLimit)
|
||
return
|
||
}
|
||
|
||
// Past admission, a reservation + request slot are held. Guarantee they're freed on
|
||
// ANY exit that didn't settle — including a panic in generate() (recovered by safego)
|
||
// — so a leaked reservation can never permanently drift the global ceiling down
|
||
// (§8.1c). The normal paths set settled=true at Settle, so this defer then no-ops; it
|
||
// fires only on the panic/unexpected-return path, where it also reacts so the failure
|
||
// isn't silent.
|
||
settled := false
|
||
defer func() {
|
||
if settled {
|
||
return
|
||
}
|
||
rl.Degraded, rl.Err = "panic", "generation panicked or returned without settling"
|
||
if rerr := b.st.ReleaseReservation(ev.Sender, estimate); rerr != nil {
|
||
b.log.ErrorContext(ctx, "release reservation (unsettled) failed", "sender", ev.Sender, "err", rerr)
|
||
}
|
||
if rerr := b.st.RefundRequest(ev.Sender); rerr != nil {
|
||
b.log.ErrorContext(ctx, "refund (unsettled) failed", "sender", ev.Sender, "err", rerr)
|
||
}
|
||
b.react(ctx, roomID, ev.EventID, reactError)
|
||
}()
|
||
|
||
// Show "Vojo AI печатает…" for the whole generation. The keepalive refreshes the
|
||
// typing notification every 20s (the server expires it after 30s) so the indicator
|
||
// never lapses on a slow/retried answer, and the deferred stop clears it on exit.
|
||
stopTyping := b.startTypingKeepalive(ctx, roomID)
|
||
defer stopTyping()
|
||
|
||
// Overall per-request deadline (§8.2.2): every model call in the cascade shares this
|
||
// single budget (genCtx), so a multi-stage route can't accrete minutes the way
|
||
// per-stage 3×60s retries would. react/send/store ops use the live room ctx, NOT
|
||
// genCtx, so a budget timeout still surfaces as a ⚠️ react, never silence.
|
||
genCtx, cancel := context.WithTimeout(ctx, b.cfg.RequestBudget)
|
||
defer cancel()
|
||
|
||
msgs := buildContext(b.cfg.SystemPrompt, history, isDM, mc.Body, b.cfg.MaxCtxEvent, maxPromptTokens)
|
||
res, err := b.generate(genCtx, mc.Body, msgs, b.convID(roomID, threadRoot), isDM)
|
||
|
||
// Record what the routing + generation actually did, whatever the outcome.
|
||
rl.Route = res.route
|
||
rl.RouterSource = res.decision.Source
|
||
rl.RouterConfidence = res.decision.Confidence
|
||
rl.FallbackFired = res.fallback
|
||
rl.Escalated = res.route == routeReason
|
||
rl.Cost = res.cost
|
||
if res.stageMS != nil {
|
||
rl.StageMS = res.stageMS
|
||
}
|
||
if res.finalModel != "" {
|
||
rl.Models["final"] = res.finalModel
|
||
}
|
||
if res.decision.Source == "classifier" {
|
||
rl.Models["router"] = b.cfg.GeminiModel
|
||
}
|
||
if res.degraded != "" {
|
||
rl.Degraded = res.degraded
|
||
}
|
||
// Classifier signals + web outcome for the offline eval (§8). Booleans/counts are
|
||
// metadata (always recorded when telemetry is on); SearchQuery/AnswerText are content
|
||
// (stripped unless TELEMETRY_STORE_TEXT — see recordTelemetry).
|
||
rl.NeedsWeb = res.decision.NeedsWeb
|
||
rl.EntityObscure = res.decision.EntityObscure
|
||
rl.TimeSensitive = res.decision.TimeSensitive
|
||
rl.Verifiable = res.decision.Verifiable
|
||
rl.TrivialScore = res.decision.TrivialScore
|
||
rl.AboutProject = res.decision.AboutProject
|
||
rl.WebDecidedBy = res.decision.WebDecidedBy
|
||
rl.RewriteUsed = res.rewriteUsed
|
||
rl.WebGrounded = res.webGrounded
|
||
rl.CitationCount = res.citationCount
|
||
rl.SearchQuery = res.searchQuery
|
||
rl.AnswerText = res.text
|
||
|
||
// The full routing/generation picture for one request, in one line: which route ran,
|
||
// whether it was a fallback, the degrade reason (if any), the per-stage timings and
|
||
// the spend. At DEBUG so prod stays quiet; turn LOG_LEVEL=debug on to trace routing.
|
||
b.log.DebugContext(ctx, "generation outcome",
|
||
"route", res.route, "router_source", res.decision.Source,
|
||
"router_confidence", res.decision.Confidence, "fallback", res.fallback,
|
||
"degraded", res.degraded, "stage_ms", res.stageMS, "usd", res.cost.Total(),
|
||
"web_grounded", res.webGrounded, "citation_count", res.citationCount,
|
||
"grounding_fee_usd", res.cost.GroundingFee, "rewrite_used", res.rewriteUsed)
|
||
|
||
if err != nil {
|
||
// Terminal: even grok_direct failed. Settle whatever the cascade ACTUALLY spent
|
||
// (e.g. a paid web fetch before the failure) and release the rest of the
|
||
// reservation in one step, then refund the request slot so an outage doesn't burn
|
||
// the cap, and react (never silent). Settle with an all-zero cost is just a
|
||
// release, so a pure grok_direct failure books nothing — exactly as before.
|
||
b.log.ErrorContext(ctx, "generation failed", "sender", ev.Sender, "route", res.route, "err", err)
|
||
rl.Err = err.Error()
|
||
if serr := b.st.Settle(ev.Sender, estimate, res.cost); serr != nil {
|
||
b.log.ErrorContext(ctx, "settle (failed request) failed", "sender", ev.Sender, "err", serr)
|
||
}
|
||
settled = true
|
||
if rerr := b.st.RefundRequest(ev.Sender); rerr != nil {
|
||
b.log.ErrorContext(ctx, "refund failed", "sender", ev.Sender, "err", rerr)
|
||
}
|
||
b.react(ctx, roomID, ev.EventID, reactError)
|
||
return
|
||
}
|
||
|
||
// Success from some route. Settle: release the reservation and book the real
|
||
// per-component cost, so both caps see grounding/tool fees too — not just tokens.
|
||
if err := b.st.Settle(ev.Sender, estimate, res.cost); err != nil {
|
||
b.log.ErrorContext(ctx, "settle spend failed", "sender", ev.Sender, "err", err)
|
||
}
|
||
settled = true
|
||
rl.PromptTokens, rl.CachedTokens, rl.CompletionTokens =
|
||
res.usage.PromptTokens, res.usage.CachedTokens, res.usage.CompletionTokens
|
||
rl.CacheHit = res.usage.CachedTokens > 0
|
||
rl.ProviderRequestID = res.providerID
|
||
|
||
text := res.text
|
||
if text == "" {
|
||
// Billed but no usable text (content filter / length cap / empty choices). Never
|
||
// leave a billed request without feedback — react "couldn't answer". The slot
|
||
// stays consumed (the 2xx was real); no refund, or an empty reply could be forced
|
||
// to dodge the cap.
|
||
b.log.WarnContext(ctx, "empty completion (billed, reacting)", "sender", ev.Sender, "usd", res.cost.Total())
|
||
rl.Degraded = degradeEmpty
|
||
b.react(ctx, roomID, ev.EventID, reactError)
|
||
return
|
||
}
|
||
b.log.InfoContext(ctx, "answered", "room", roomID, "sender", ev.Sender, "dm", isDM, "route", res.route,
|
||
"usd", res.cost.Total(), "prompt_tokens", res.usage.PromptTokens, "completion_tokens", res.usage.CompletionTokens)
|
||
if err := b.sendReply(ctx, roomID, threadRoot, ev, mc, text); err != nil {
|
||
// Paid silence (§8.1): the spend is real (USD is kept — refunding it would
|
||
// under-count the ceiling), but the reply never landed. Refund the request SLOT
|
||
// so the user can retry, and react ⚠️ so the failure isn't silent.
|
||
b.log.ErrorContext(ctx, "send reply failed after billing; refunding slot + reacting", "sender", ev.Sender, "err", err)
|
||
rl.Degraded, rl.Err = degradeSendFailed, err.Error()
|
||
if rerr := b.st.RefundRequest(ev.Sender); rerr != nil {
|
||
b.log.ErrorContext(ctx, "refund failed", "sender", ev.Sender, "err", rerr)
|
||
}
|
||
b.react(ctx, roomID, ev.EventID, reactError)
|
||
return
|
||
}
|
||
rl.OK = true
|
||
}
|
||
|
||
// maxPromptTokens is the token budget for an assembled prompt (history is trimmed to
// fit). The reservation estimate uses the same value, so admission and context
// building always agree on how large a request can be.
const (
	maxPromptTokens = 8000
)
|
||
|
||
// maxProjectKBTokens is the startup cap on the curated project KB; main.go enforces
// it fail-fast. The KB is added through insertSystemNote AFTER buildContext has
// trimmed history, so an oversized KB could push the real prompt beyond
// maxPromptTokens, the size the reservation estimate assumes.
//
// This is a COARSE sanity backstop, not a precise budget. estimateTokens counts
// runes/4, which is ASCII-biased, and Cyrillic tokenizes denser. A KB near the cap
// may therefore be more real tokens than estimated. That is acceptable: a curated
// single-product FAQ runs roughly 250–800 tokens, far below the cap, and the prompt
// stays well within model limits.
const (
	maxProjectKBTokens = 2500
)
|
||
|
||
// estimateUSD is the conservative max-cost reserved for a route before the call, so
|
||
// the global ceiling can count an in-flight request (§8.1). It prices a full prompt
|
||
// (maxPromptTokens) plus the max output at the model's non-cached rates — an upper-ish
|
||
// bound, since real calls send fewer tokens and get the cheaper cached rate. Settle
|
||
// later books the authoritative actual cost regardless, so a slightly-off estimate
|
||
// only nudges admission, never the final accounting.
|
||
func (b *Bot) estimateUSD(model string) float64 {
|
||
p := b.cfg.priceFor(model)
|
||
return float64(maxPromptTokens)/1e6*p.InputPerM + float64(b.cfg.MaxOutTok)/1e6*p.OutputPerM
|
||
}
|
||
|
||
// convID returns the prompt-cache routing hint sent as x-grok-conv-id, or "" when
|
||
// GROK_PROMPT_CACHE is off. Grok caches prompt prefixes automatically; the header
|
||
// only pins a conversation to the same backend to raise the hit rate (docs.x.ai). The
|
||
// right unit is the (room,thread) pair, not the room: once conversations are threaded,
|
||
// each thread has its OWN system+history prefix, so pinning all of a room's threads to
|
||
// one id would thrash the cache between divergent prefixes. The main timeline keys on
|
||
// roomID alone (threadRoot ""), preserving the previous value for the flat case. Carries
|
||
// no PII (ids are opaque) and is hashed to stay compact and non-identifying.
|
||
func (b *Bot) convID(roomID, threadRoot string) string {
|
||
if !b.cfg.GrokPromptCache {
|
||
return ""
|
||
}
|
||
key := roomID
|
||
if threadRoot != "" {
|
||
key = roomID + "|" + threadRoot
|
||
}
|
||
return fmt.Sprintf("vojo-%08x", hashString(key))
|
||
}
|
||
|
||
// computeUSD prices a call from the API-returned token usage (authoritative
|
||
// counts) and the per-model price table — so the hard ceiling tracks real usage
|
||
// even if the model/price changes (only the price table needs updating), and a
|
||
// call books at the price of the model that actually served it.
|
||
func computeUSD(model string, u Usage, cfg *Config) float64 {
|
||
p := cfg.priceFor(model)
|
||
nonCached := u.PromptTokens - u.CachedTokens
|
||
if nonCached < 0 {
|
||
nonCached = 0
|
||
}
|
||
return float64(nonCached)/1e6*p.InputPerM +
|
||
float64(u.CachedTokens)/1e6*p.CachedPerM +
|
||
float64(u.CompletionTokens)/1e6*p.OutputPerM
|
||
}
|
||
|
||
// react adds an emoji m.reaction to the triggering event — the bot's language-free
|
||
// way to signal a system state (error / rate limit / encrypted / media) it can't
|
||
// express as a model-generated answer. Best-effort: a failed reaction is logged, not
|
||
// retried. Reactions are m.reaction (not m.room.message), so they never re-enter
|
||
// handleMessage and need no anti-loop tracking.
|
||
func (b *Bot) react(ctx context.Context, roomID, eventID, emoji string) {
|
||
content := map[string]any{
|
||
"m.relates_to": map[string]any{
|
||
"rel_type": "m.annotation",
|
||
"event_id": eventID,
|
||
"key": emoji,
|
||
},
|
||
}
|
||
if _, err := b.mx.SendEvent(ctx, roomID, "m.reaction", content); err != nil {
|
||
b.log.ErrorContext(ctx, "react failed", "room", roomID, "emoji", emoji, "err", err)
|
||
}
|
||
}
|
||
|
||
// reactEncryptedOnce reacts 🔒 to the first message seen in an encrypted room and
|
||
// records a durable flag so a restart doesn't re-react (F5). Vojo disables E2EE by
|
||
// default, so this is a near-dead safety path; the reaction is far less intrusive
|
||
// than the old text notice, but the once-gate keeps it from annotating every message
|
||
// in the rare encrypted room.
|
||
// reactEncryptedOnce returns whether it reacted this call (true only the first time
|
||
// for a room), so the caller can log the skip exactly once too.
|
||
func (b *Bot) reactEncryptedOnce(ctx context.Context, roomID, eventID string) bool {
|
||
warned, err := b.st.HasWarnedEncrypted(roomID)
|
||
if err != nil {
|
||
b.log.ErrorContext(ctx, "warned-flag read failed", "room", roomID, "err", err)
|
||
return false
|
||
}
|
||
if warned {
|
||
return false
|
||
}
|
||
b.react(ctx, roomID, eventID, reactEncrypted)
|
||
if err := b.st.SetWarnedEncrypted(roomID); err != nil {
|
||
b.log.ErrorContext(ctx, "persist warned-flag failed", "room", roomID, "err", err)
|
||
}
|
||
return true
|
||
}
|
||
|
||
// sendReply sends the model's actual answer and records the completed exchange in the
|
||
// conversation buffer so the next turn has context. It RETURNS the send error so the
|
||
// caller can handle paid silence (§8.1): a billed answer that failed to deliver must
|
||
// refund the slot and react, not vanish.
|
||
func (b *Bot) sendReply(ctx context.Context, roomID, threadRoot string, trigger *Event, triggerMC *MessageContent, body string) error {
|
||
if err := b.sendMessage(ctx, roomID, threadRoot, trigger, triggerMC, body); err != nil {
|
||
return err
|
||
}
|
||
// Record the user trigger AND the assistant answer together, only AFTER the answer
|
||
// was sent, so a failed or empty generation never leaves a dangling user turn (a
|
||
// question with no reply) in the buffer — which would skew later completions. Both go
|
||
// to THIS conversation's buffer (roomID,threadRoot). Per-(room,thread) single-flight
|
||
// guarantees no other turn for this conversation interleaves between the two.
|
||
b.appendBuf(roomID, threadRoot, bufferedMsg{sender: trigger.Sender, body: triggerMC.Body, isBot: false})
|
||
b.appendBuf(roomID, threadRoot, bufferedMsg{sender: b.cfg.BotMXID, body: body, isBot: true})
|
||
return nil
|
||
}
|
||
|
||
// sendMessage builds and sends an m.notice reply and tracks our own event id. Returns
|
||
// the send error (nil on success) so the caller can detect a failed delivery.
|
||
func (b *Bot) sendMessage(ctx context.Context, roomID, threadRoot string, trigger *Event, triggerMC *MessageContent, body string) error {
|
||
content := buildNoticeContent(trigger.EventID, trigger.Sender, threadRoot, body)
|
||
id, err := b.mx.SendEvent(ctx, roomID, "m.room.message", content)
|
||
if err != nil {
|
||
b.log.ErrorContext(ctx, "send failed", "room", roomID, "err", err)
|
||
return err
|
||
}
|
||
// Track our own reply so a future reply-to-it is recognised as addressing us.
|
||
b.botSent.Add(id)
|
||
return nil
|
||
}
|
||
|
||
// startTypingKeepalive shows the room-level "typing…" indicator for the whole generation
|
||
// and keeps it alive (the CS-API notification expires after the 30s we pass, so we refresh
|
||
// every 20s). The returned stop is safe to call once via defer. Typing is ROOM-scoped in
|
||
// Matrix — there is no per-thread typing — so with per-(room,thread) concurrency several
|
||
// generations can run for one room at once. A per-room refcount keeps the indicator on
|
||
// until the LAST of them finishes, rather than letting whichever finishes first clear it
|
||
// out from under the others. Best-effort UX — failures are non-fatal.
|
||
func (b *Bot) startTypingKeepalive(ctx context.Context, roomID string) func() {
|
||
b.typingAcquire(roomID)
|
||
|
||
b.setTyping(ctx, roomID, true)
|
||
done := make(chan struct{})
|
||
go func() {
|
||
t := time.NewTicker(20 * time.Second)
|
||
defer t.Stop()
|
||
for {
|
||
select {
|
||
case <-ctx.Done():
|
||
return
|
||
case <-done:
|
||
return
|
||
case <-t.C:
|
||
b.setTyping(ctx, roomID, true)
|
||
}
|
||
}
|
||
}()
|
||
var once sync.Once
|
||
return func() {
|
||
once.Do(func() {
|
||
close(done)
|
||
// Only the last in-flight generation for the room clears the indicator; the
|
||
// others merely stop their own keepalive loop (a forgetRoom mid-flight can drop
|
||
// the counter to <=0, which the guard treats as "last" — still correct).
|
||
if b.typingRelease(roomID) {
|
||
b.setTyping(ctx, roomID, false)
|
||
}
|
||
})
|
||
}
|
||
}
|
||
|
||
// typingAcquire registers one in-flight generation against the room-level typing indicator.
|
||
// The increment is the only state change; the caller (re)asserts the indicator on regardless,
|
||
// since re-sending typing=true is idempotent and refreshes the server-side 30s timeout.
|
||
func (b *Bot) typingAcquire(roomID string) {
|
||
b.mu.Lock()
|
||
b.typingRefs[roomID]++
|
||
b.mu.Unlock()
|
||
}
|
||
|
||
// typingRelease drops one in-flight generation and reports whether it was the LAST one for
|
||
// the room (refcount fell to <=0), so the caller clears the indicator only then. A forgetRoom
|
||
// that already deleted the key leaves a missing entry: the decrement reads 0 and lands at -1
|
||
// (<=0), so the guard still treats it as "last" and the negative entry is deleted — no leak.
|
||
func (b *Bot) typingRelease(roomID string) (last bool) {
|
||
b.mu.Lock()
|
||
defer b.mu.Unlock()
|
||
b.typingRefs[roomID]--
|
||
last = b.typingRefs[roomID] <= 0
|
||
if last {
|
||
delete(b.typingRefs, roomID)
|
||
}
|
||
return last
|
||
}
|
||
|
||
// setTyping sets/clears the bot's typing indicator (best-effort UX; failures are
|
||
// non-fatal). The 30s server-side timeout is refreshed by startTypingKeepalive.
|
||
func (b *Bot) setTyping(ctx context.Context, roomID string, typing bool) {
|
||
if err := b.mx.SendTyping(ctx, roomID, typing, 30000); err != nil {
|
||
b.log.DebugContext(ctx, "set typing failed", "room", roomID, "typing", typing, "err", err)
|
||
}
|
||
}
|
||
|
||
// buildNoticeContent builds the reply. m.notice (not m.text) so the anti-loop
|
||
// skip catches our own output. threadRoot decides where the answer lands: when non-empty
|
||
// the answer carries an m.thread relation rooted there (F27) — either replying inside an
|
||
// existing thread or auto-rooting a NEW DM conversation on the trigger. The caller's
|
||
// resolveThreadRoot makes that choice and is DM-gated, so a group answer never gets a
|
||
// thread relation it didn't already have. When empty the answer is a plain top-level
|
||
// reply (groups, and DMs with conversations off).
|
||
func buildNoticeContent(replyTo, sender, threadRoot, body string) map[string]any {
|
||
relates := map[string]any{}
|
||
if threadRoot != "" {
|
||
relates["rel_type"] = "m.thread"
|
||
relates["event_id"] = threadRoot
|
||
relates["is_falling_back"] = true
|
||
relates["m.in_reply_to"] = map[string]any{"event_id": replyTo}
|
||
} else {
|
||
relates["m.in_reply_to"] = map[string]any{"event_id": replyTo}
|
||
}
|
||
content := map[string]any{
|
||
"msgtype": "m.notice",
|
||
"body": body,
|
||
"m.mentions": map[string]any{"user_ids": []string{sender}},
|
||
"m.relates_to": relates,
|
||
}
|
||
// The model answers in markdown; render it to org.matrix.custom.html so clients
|
||
// show formatting instead of raw `**`, `#`, lists, code fences. Only attach
|
||
// formatted_body when there's actual formatting — a plain answer keeps rendering
|
||
// from `body` exactly as before.
|
||
if html, formatted := markdownToHTML(body); formatted {
|
||
content["format"] = matrixHTMLFormat
|
||
content["formatted_body"] = html
|
||
}
|
||
return content
|
||
}
|
||
|
||
// --- per-(room,thread) single-flight ---------------------------------------------
|
||
|
||
// resolveThreadRoot decides which conversation a trigger belongs to, returning the thread
|
||
// root event id, or "" for the main timeline. Order: (1) a trigger already inside a thread
|
||
// continues that thread; (2) in a 1:1 DM, a top-level message roots a NEW conversation on
|
||
// itself (ChatGPT-style "new chat"); (3) otherwise — EVERY group message — the main timeline
|
||
// (""). This is the single place auto-threading is decided and it is hard-gated on isDM, so
|
||
// a group is NEVER auto-threaded (the group gate is structural, not a flag). buildNoticeContent
|
||
// emits the matching m.thread relation for the same value, so the conversation we serialize on
|
||
// is always the one the answer lands in.
|
||
func (b *Bot) resolveThreadRoot(isDM bool, ev *Event, mc *MessageContent) string {
|
||
if mc.RelatesTo != nil && mc.RelatesTo.RelType == "m.thread" && mc.RelatesTo.EventID != "" {
|
||
return mc.RelatesTo.EventID
|
||
}
|
||
if isDM {
|
||
return ev.EventID
|
||
}
|
||
return ""
|
||
}
|
||
|
||
// tryClaim marks a (room,thread) conversation as generating and returns true if the caller
|
||
// won the claim (nothing was already in flight for that exact conversation). The loser
|
||
// drops its message. Different threads of one room claim independently — a slow answer in
|
||
// one conversation never blocks another. The check-and-set is atomic under b.mu (one map,
|
||
// no per-thread mutex), so there is no lazy-lock TOCTOU.
|
||
func (b *Bot) tryClaim(roomID, threadRoot string) bool {
|
||
b.mu.Lock()
|
||
defer b.mu.Unlock()
|
||
m := b.inflight[roomID]
|
||
if m == nil {
|
||
m = make(map[string]bool)
|
||
b.inflight[roomID] = m
|
||
}
|
||
if m[threadRoot] {
|
||
return false
|
||
}
|
||
m[threadRoot] = true
|
||
return true
|
||
}
|
||
|
||
func (b *Bot) release(roomID, threadRoot string) {
|
||
b.mu.Lock()
|
||
defer b.mu.Unlock()
|
||
if m := b.inflight[roomID]; m != nil {
|
||
delete(m, threadRoot)
|
||
if len(m) == 0 {
|
||
delete(b.inflight, roomID)
|
||
}
|
||
}
|
||
}
|
||
|
||
// --- per-room metadata helpers (all guarded by b.mu; probes run outside it) -----
|
||
|
||
// getMetaLocked returns (creating if needed) the room's meta. Caller MUST hold b.mu.
|
||
func (b *Bot) getMetaLocked(roomID string) *roomMeta {
|
||
m := b.meta[roomID]
|
||
if m == nil {
|
||
m = &roomMeta{}
|
||
b.meta[roomID] = m
|
||
}
|
||
return m
|
||
}
|
||
|
||
func (b *Bot) invalidateCounts(roomID string) {
|
||
b.mu.Lock()
|
||
defer b.mu.Unlock()
|
||
if m := b.meta[roomID]; m != nil {
|
||
m.countsKnown = false
|
||
}
|
||
}
|
||
|
||
func (b *Bot) setEncrypted(roomID string) {
|
||
b.mu.Lock()
|
||
defer b.mu.Unlock()
|
||
m := b.getMetaLocked(roomID)
|
||
m.encrypted, m.encKnown = true, true
|
||
}
|
||
|
||
func (b *Bot) forgetRoom(roomID string) {
|
||
b.mu.Lock()
|
||
defer b.mu.Unlock()
|
||
delete(b.meta, roomID)
|
||
delete(b.buf, roomID) // drops the room's whole per-thread subtree in one delete
|
||
delete(b.inflight, roomID) // (nested maps keyed by roomID keep forgetRoom O(1))
|
||
delete(b.typingRefs, roomID)
|
||
}
|
||
|
||
// ensureEncryption returns whether the room is encrypted, probing the CS-API once
|
||
// (without holding the lock) and caching the result. On probe error it returns false
|
||
// (treated as not-encrypted this round) and leaves the state unknown for a re-probe.
|
||
func (b *Bot) ensureEncryption(ctx context.Context, roomID string) bool {
|
||
b.mu.Lock()
|
||
if m := b.getMetaLocked(roomID); m.encKnown {
|
||
enc := m.encrypted
|
||
b.mu.Unlock()
|
||
return enc
|
||
}
|
||
b.mu.Unlock()
|
||
|
||
enc, err := b.mx.RoomEncrypted(ctx, roomID)
|
||
if err != nil {
|
||
b.log.WarnContext(ctx, "encryption probe failed", "room", roomID, "err", err)
|
||
return false // leave unknown; re-probed on the next message
|
||
}
|
||
// Re-fetch under the lock instead of writing to the pointer captured before the
|
||
// unlocked probe: if the room was forgotten (leave/ban) mid-probe its meta was
|
||
// deleted, and writing to the captured pointer would resurrect a dead room.
|
||
b.mu.Lock()
|
||
if m := b.meta[roomID]; m != nil {
|
||
m.encrypted, m.encKnown = enc, true
|
||
}
|
||
b.mu.Unlock()
|
||
return enc
|
||
}
|
||
|
||
// ensureCounts returns (countsKnown, isDM, foreign) for roomID. When the counts are
// not cached, it probes the room's membership once, without holding b.mu, and caches
// the joined/invited counts and the foreign flag in the room's meta.
//
// On probe error it logs a WARN and returns (false, false, false). The caller treats
// an unclassified room conservatively rather than silently dropping.
//
// NOTE(review): an invalidateCounts that runs while the membership probe is in flight
// is overwritten by the probe's possibly pre-change result. The write below only
// checks that the meta still exists, not that it was not invalidated meanwhile. A
// member who joins mid-probe could therefore leave the room cached as a 1:1.
// Closing this needs a per-meta generation/epoch that invalidateCounts bumps and
// that this function compares before caching. TODO confirm whether the window
// matters in practice.
func (b *Bot) ensureCounts(ctx context.Context, roomID string) (countsKnown, isDM, foreign bool) {
	b.mu.Lock()
	known := b.getMetaLocked(roomID).countsKnown
	b.mu.Unlock()

	if !known {
		// Network probe runs unlocked: b.mu is shared by every room, so holding it
		// across I/O would stall unrelated rooms.
		joined, invited, servers, err := b.mx.RoomMembership(ctx, roomID)
		if err != nil {
			b.log.WarnContext(ctx, "member probe failed", "room", roomID, "err", err)
			return false, false, false
		}
		// Any server in the membership's server set that is not in AllowedServers
		// marks the whole room foreign.
		isForeign := false
		for s := range servers {
			if !b.cfg.AllowedServers[s] {
				isForeign = true
				break
			}
		}
		// Re-fetch under the lock rather than writing a pointer captured before the
		// unlocked /members probe (see ensureEncryption): a leave/ban mid-probe must
		// not be undone by resurrecting the room's meta.
		b.mu.Lock()
		if m := b.meta[roomID]; m != nil {
			m.joined, m.invited, m.foreign, m.countsKnown = joined, invited, isForeign, true
		}
		b.mu.Unlock()
	}

	// Answer from the cache under the lock. If the room was forgotten (or its counts
	// invalidated) since the probe, this reports the counts as unknown.
	b.mu.Lock()
	defer b.mu.Unlock()
	if m := b.meta[roomID]; m != nil {
		return m.countsKnown, m.isDM(), m.foreign
	}
	return false, false, false
}
|
||
|
||
// convBuf is one conversation's (one thread's) rolling context window plus a
// last-touch stamp used for LRU eviction in appendBuf. Without eviction, a long-lived
// control DM that spawns many ChatGPT-style conversations would accumulate one buffer
// per conversation for the whole process lifetime. forgetRoom only frees them when
// the room is forgotten (leave/ban).
type convBuf struct {
	msgs    []bufferedMsg // oldest first; appendBuf appends at the end and trims the front
	touched uint64        // b.bufSeq at last append; higher = more recent
}
|
||
|
||
// maxConvBuffersPerRoom bounds how many conversation buffers appendBuf retains for a
// single room. Beyond it, the least-recently-touched conversation is evicted. A human
// DM rarely keeps this many live conversations at once. An evicted cold conversation
// just rebuilds its window from scratch on its next turn, the same as after a
// restart. The only cost of eviction is therefore a one-turn cold start, never a
// wrong answer.
const maxConvBuffersPerRoom = 64
|
||
|
||
func (b *Bot) snapshotBuf(roomID, threadRoot string) []bufferedMsg {
|
||
b.mu.Lock()
|
||
defer b.mu.Unlock()
|
||
// A never-seen conversation has no buffer (nil inner map / nil convBuf) → nil history,
|
||
// exactly what a fresh "new chat" wants (context = system + trigger).
|
||
cb := b.buf[roomID][threadRoot]
|
||
if cb == nil || len(cb.msgs) == 0 {
|
||
return nil
|
||
}
|
||
out := make([]bufferedMsg, len(cb.msgs))
|
||
copy(out, cb.msgs)
|
||
return out
|
||
}
|
||
|
||
func (b *Bot) appendBuf(roomID, threadRoot string, msg bufferedMsg) {
|
||
b.mu.Lock()
|
||
defer b.mu.Unlock()
|
||
limit := b.cfg.MaxCtxEvent * 2
|
||
if limit < 8 {
|
||
limit = 8
|
||
}
|
||
m := b.buf[roomID]
|
||
if m == nil {
|
||
m = make(map[string]*convBuf)
|
||
b.buf[roomID] = m
|
||
}
|
||
cb := m[threadRoot]
|
||
if cb == nil {
|
||
cb = &convBuf{}
|
||
m[threadRoot] = cb
|
||
}
|
||
cb.msgs = append(cb.msgs, msg)
|
||
if len(cb.msgs) > limit {
|
||
cb.msgs = cb.msgs[len(cb.msgs)-limit:]
|
||
}
|
||
b.bufSeq++
|
||
cb.touched = b.bufSeq
|
||
|
||
// LRU-evict the least-recently-touched conversation once the room exceeds the cap. The
|
||
// just-touched conversation has the highest `touched`, so it is never the victim.
|
||
if len(m) > maxConvBuffersPerRoom {
|
||
var victim string
|
||
var oldest uint64
|
||
for k, v := range m {
|
||
if victim == "" || v.touched < oldest {
|
||
victim, oldest = k, v.touched
|
||
}
|
||
}
|
||
delete(m, victim)
|
||
}
|
||
}
|