diff --git a/apps/ai-bot/README.md b/apps/ai-bot/README.md index 41fb0a35..70179d59 100644 --- a/apps/ai-bot/README.md +++ b/apps/ai-bot/README.md @@ -32,12 +32,20 @@ apps/ai-bot/ ├── registration.go # generate + read registration.yaml (tokens, mautrix idiom) ├── events.go # Matrix event types + decoders ├── mentions.go # m.mentions + pill/reply fallbacks (F29/F30) -├── context.go # xAI message-window assembly (trigger + bot replies) -├── xai.go # chat/completions client + retry (F6) -├── store.go # Postgres (vojo_ai): spend ledger, txn/event dedup, encrypted-warned set -├── messages.go # bot-authored RU notices +├── context.go # provider-neutral message-window assembly (trigger + bot replies) +├── llm.go # provider-neutral types + LLMClient interface (no vendor names) +├── httpllm.go # shared OpenAI-compatible chat/completions transport + retry (F6) +├── provider_xai.go # thin xAI/Grok adapter over the shared transport +├── provider_gemini.go # Gemini adapter: OpenAI-compat client + native v1beta grounding +├── pricing.go # per-model price table (priceFor) + CostBreakdown +├── router.go # cascade router: Layer-0 heuristic + optional Layer-1 Gemini classifier +├── cascade.go # generate(): route dispatch with degrade-to-grok_direct +├── web.go # WebProvider: grok_web_search (Live Search) | gemini_grounding + cap guard +├── telemetry.go # request_log analytics row + async emit + retention trim +├── store.go # Postgres (vojo_ai): spend ledger (+reservation/components), dedup, request_log, grounding cap +├── messages.go # language-free emoji status reactions ├── markdown.go # markdown → org.matrix.custom.html for the reply's formatted_body -├── util.go # bounded dedup set +├── util.go # bounded dedup set + small hash ├── prompts/system_ru.txt ├── Dockerfile # CGO-free static build → distroless, EXPOSE 8009 └── .env.example @@ -49,8 +57,15 @@ All via environment (see `.env.example`). Required: `HOMESERVER_URL`, `BOT_MXID` `AS_TOKEN`, `HS_TOKEN`, `XAI_API_KEY`, `ALLOWED_SERVERS`, `AI_BOT_DATABASE_URL`. `AS_ADDR` (default `:8009`) is the transaction-push listen address — it must match the `url` port in the registration. The model is env-configurable (`XAI_MODEL`, -default `grok-4.20-0309-non-reasoning`; `grok-4.3` is an alternative — **re-verify -the id + price on docs.x.ai before deploy**). +default `grok-4.20-0309-non-reasoning`). + +`grok-4.3` is the newer unified model (same price, 1M context): one model with a +`reasoning_effort` dial. If you switch `XAI_MODEL=grok-4.3`, set +`GROK_REASONING_EFFORT=none` to keep the default voice fast/cheap — otherwise the API +defaults to `low` and reasons on **every** reply. `GROK_REASONING_EFFORT` (accepted: +`none|low|medium|high`, default empty = not sent) is applied to the normal Grok voice +(grok_direct + web synthesis); leave it **empty** for `grok-4.20-non-reasoning`, which +rejects the param. The reason_then_grok route always uses `high` regardless. ### Database @@ -78,8 +93,49 @@ AI_BOT_DATABASE_URL=postgres://vojo_ai:@postgres:5432/vojo_ai?sslmode=di ``` The hard USD ceiling is priced from the **API-returned token usage** times the -configured `XAI_PRICE_*_PER_M` fallbacks, so a price change only needs those -constants updated — it can't silently blow the cap. +per-model price table (`XAI_PRICE_*_PER_M`, `GEMINI_PRICE_*_PER_M`), so a price +change only needs those constants updated — it can't silently blow the cap. The +ceiling is enforced with an optimistic **reservation** (`reserved_usd`): a request's +estimated max-cost is booked at admission and settled to the real cost afterward, so +a burst of concurrent requests can't slip past `DAILY_USD_CEILING` (it would +otherwise, since the USD only lands after each call). + +### Operator accounting (Phase 1, on by default) + +- `REQUEST_BUDGET_SECONDS` (default 180) — overall per-request deadline shared by all + model calls, so a slow/retried call (or a cascade) can't accrete minutes. +- `GROK_PROMPT_CACHE` (default false) — Grok caches prompt prefixes automatically; this + toggle only adds the `x-grok-conv-id` routing header (a per-room id) to raise the + cache hit rate. There is no `prompt_cache` body param (verified on docs.x.ai). +- `TELEMETRY_ENABLED` (default false) — write a `request_log` analytics row per engaged + request (route, per-component $, latency, degrade/ceiling reasons). The write is async + and isolated — its failure never drops a reply. `TELEMETRY_STORE_TEXT` (default false) + additionally keeps the query text (for offline eval); `TELEMETRY_RETENTION_DAYS` + (default 30) time-trims old rows. Turn telemetry on to MEASURE the base before enabling + any cascade layer. + +### Cascade (Phase 2-4) — behind flags, **default OFF** (every layer off == today's bot) + +All optional; an unset env is exactly today's single grok_direct call. Any layer off or +failing **degrades to grok_direct** (never silence). Do **not** enable in prod until the +offline-eval gate (misroute < 2-3% AND measured saving > the second provider's cost; see +`docs/plans/ai_backend_build_plan.md` §9). + +| Env | Default | Meaning | +|---|---|---| +| `ROUTER_ENABLED` | false | Layer-0 heuristic router (else everything → grok_direct) | +| `ROUTER_CLASSIFIER_ENABLED` | false | Layer-1 Gemini classifier on uncertain cases (requires `ROUTER_ENABLED` + Gemini key) | +| `TRIVIAL_OFFLOAD_ENABLED` | false | answer trivial messages with Gemini (requires Gemini key) | +| `WEB_ENABLED` | false | web_then_grok route (Gemini/Grok fetches fresh facts, **Grok stays the voice**) | +| `WEB_PROVIDER` | `grok_web_search` | `grok_web_search` (xAI Agent Tools `web_search` on the Responses API, $5/1k calls, no Gemini key) or `gemini_grounding` (**cheapest**: Gemini does the fetch via native v1beta `google_search`, Grok voices it — ~$0.0013/query, validated on `gemini-2.5-flash-lite`; the F-EXT-3 "Gemini-3 only" caveat is the OpenAI-compat endpoint, native v1beta works on 2.5). Requires `GEMINI_API_KEY`. | +| `WEB_GROUNDING_DAILY_CAP` | 450 | durable per-day cap for `gemini_grounding` before degrading (keep < the 500/day free grounding RPD; guards the per-1k overage) | +| `REASONING_ENABLED` | false | manual "think harder" route on `REASONING_TRIGGER` | +| `REASONING_TRIGGER` | `подумай глубже` | trigger phrase | +| `REASONING_MODEL` | `grok-4.3` | a **reasoning-capable** model (the default `grok-4.20-non-reasoning` rejects `reasoning_effort`) | +| `REASONING_EFFORT` | `high` | the reasoning_effort the "think harder" route sends (`none|low|medium|high`) | +| `GEMINI_API_KEY` / `_FILE` | — | required only when a Gemini-using layer is on (fail-fast at startup otherwise) | +| `GEMINI_MODEL` | `gemini-2.5-flash-lite` | cheap model for trivial/classifier | +| `GEMINI_BASE_URL` | `…/v1beta/openai` | OpenAI-compat endpoint (native grounding endpoint derived from it) | ## One-time setup (appservice registration) diff --git a/apps/ai-bot/appservice_test.go b/apps/ai-bot/appservice_test.go index 55ceee25..437b1859 100644 --- a/apps/ai-bot/appservice_test.go +++ b/apps/ai-bot/appservice_test.go @@ -32,7 +32,7 @@ func openTestStore(t *testing.T) *Store { } ctx, cancel := opContext() defer cancel() - if _, err := st.pool.Exec(ctx, `TRUNCATE processed_txn, processed_event, spend, warned_encrypted`); err != nil { + if _, err := st.pool.Exec(ctx, `TRUNCATE processed_txn, processed_event, spend, warned_encrypted, request_log, grounding_count`); err != nil { st.Close() t.Fatalf("truncate test tables: %v", err) } diff --git a/apps/ai-bot/bot.go b/apps/ai-bot/bot.go index 30cafa46..220e5879 100644 --- a/apps/ai-bot/bot.go +++ b/apps/ai-bot/bot.go @@ -5,6 +5,7 @@ import ( "fmt" "log/slog" "sync" + "sync/atomic" "time" ) @@ -30,9 +31,22 @@ type Bot struct { cfg *Config log *slog.Logger mx *MatrixClient - xai *XAIClient + llm LLMClient st *Store + // gemini is the cheap chat backend for the trivial route and the Layer-1 classifier + // (an LLMClient so tests can fake it); nil unless a layer that uses it is enabled. + // web is the web-freshness provider, built only when WEB_ENABLED. Both nil → the + // cascade can only ever produce grok_direct. + gemini LLMClient + web WebProvider + + // promptVersion is a short stable hash of the system prompt, logged with each + // request so prompt changes are visible in the analytics (A/B + regressions). + promptVersion string + // telemetryWrites paces the retention trim (every telemetryTrimEvery writes). + telemetryWrites atomic.Uint64 + // mu guards the in-memory maps/sets below. Each transaction is acked to Synapse // immediately (appservice.go) and its events are processed in transaction order, // but the slow xAI generation runs in a per-room goroutine and the lazy probes run @@ -49,7 +63,7 @@ type Bot struct { func NewBot(ctx context.Context, cfg *Config, logger *slog.Logger) (*Bot, error) { mx := NewMatrixClient(cfg.HomeserverURL, cfg.ASToken, cfg.BotMXID) - xai := NewXAIClient(cfg.XAIBaseURL, cfg.XAIAPIKey, logger) + llm := NewXAIClient(cfg.XAIBaseURL, cfg.XAIAPIKey, logger) st, err := OpenStore(cfg.DatabaseURL) if err != nil { @@ -57,16 +71,35 @@ func NewBot(ctx context.Context, cfg *Config, logger *slog.Logger) (*Bot, error) } b := &Bot{ - cfg: cfg, - log: logger, - mx: mx, - xai: xai, - st: st, - seen: newLRUSet(5000), - botSent: newLRUSet(5000), - meta: make(map[string]*roomMeta), - buf: make(map[string][]bufferedMsg), - inflight: make(map[string]bool), + cfg: cfg, + log: logger, + mx: mx, + llm: llm, + st: st, + promptVersion: fmt.Sprintf("%08x", hashString(cfg.SystemPrompt)), + seen: newLRUSet(5000), + botSent: newLRUSet(5000), + meta: make(map[string]*roomMeta), + buf: make(map[string][]bufferedMsg), + inflight: make(map[string]bool), + } + + // Build the cascade backends only for enabled layers (config already fail-fast + // validated that the keys exist). With every cascade flag off these stay nil and + // generate() can only produce grok_direct — today's bot. The grounding web provider + // needs the concrete client (for the native generateContent call), so keep a typed + // handle alongside the LLMClient face. + var gc *geminiClient + if cfg.needsGemini() { + gc = NewGeminiClient(cfg.GeminiBaseURL, cfg.GeminiAPIKey, cfg.GeminiModel, logger) + b.gemini = gc + } + if cfg.WebEnabled { + if cfg.WebProvider == webProviderGeminiGrounding { + b.web = &geminiGrounding{gem: gc, st: st, cfg: cfg} + } else { + b.web = newGrokWebSearch(cfg, logger) + } } // Confirm the as_token + user_id resolves to BOT_MXID before serving. @@ -223,7 +256,11 @@ func (b *Bot) handleMessage(ctx context.Context, ev *Event) { // bot can't read it. The probe runs without the lock. if b.ensureEncryption(ctx, roomID) { b.log.Debug("skip: encrypted room", "room", roomID) - b.reactEncryptedOnce(ctx, roomID, ev.EventID) + // Log the skip only when we actually react (once per room), so an encrypted room + // the bot can't read doesn't flood request_log with one row per message. + if b.reactEncryptedOnce(ctx, roomID, ev.EventID) { + b.recordSkip(ev, degradeEncrypted) + } return } @@ -255,6 +292,7 @@ func (b *Bot) handleMessage(ctx context.Context, ev *Event) { // "leak" the bot into) a federated room with non-consenting third parties. if foreign { b.leaveForeign(ctx, roomID) + b.recordSkip(ev, degradeForeign) return } @@ -282,6 +320,7 @@ func (b *Bot) handleMessage(ctx context.Context, ev *Event) { if isMedia { b.log.Debug("skip: non-text msgtype (reacted)", "room", roomID, "sender", ev.Sender, "msgtype", mc.MsgType) b.react(ctx, roomID, ev.EventID, reactMedia) + b.recordSkip(ev, degradeMedia) return } @@ -312,20 +351,46 @@ func (b *Bot) handleMessage(ctx context.Context, ev *Event) { const unlimitedCap = 1 << 30 func (b *Bot) respond(ctx context.Context, roomID string, isDM bool, ev *Event, mc *MessageContent, history []bufferedMsg) { + started := time.Now() + // One telemetry row per request, populated as the flow decides its outcome and + // emitted once via defer — so every exit (deny, error, empty, paid silence, success) + // is recorded without scattering writes (F-FUNC-5). It starts as route=none/ok=false; + // proceeding to the model sets the route, success sets ok=true. + rl := RequestLog{ + ID: ev.EventID, RoomID: roomID, Sender: ev.Sender, + Route: routeNone, RouterSource: "default", + PromptVersion: b.promptVersion, + QueryText: mc.Body, + Models: map[string]string{"final": b.cfg.XAIModel}, + } + defer func() { + rl.LatencyMS = int(time.Since(started).Milliseconds()) + b.recordTelemetry(rl) + }() + perUserCap := b.cfg.PerUserDailyCap + perUserUSD := b.cfg.PerUserDailyUSD if b.cfg.UnlimitedUsers[ev.Sender] { perUserCap = unlimitedCap + perUserUSD = 0 // exempt from both per-user gates; the global ceiling still applies } - switch res, err := b.st.Reserve(ev.Sender, perUserCap, b.cfg.DailyUSDCeiling); { + // Reserve the route's estimated max-cost (not $0) so the global ceiling counts + // this in-flight call BEFORE it returns — the TOCTOU fix (§8.1). The envelope covers + // the most expensive ENABLED route, so whichever the router picks is admitted within + // the reservation; with the cascade off it is exactly grok_direct's estimate. + estimate := b.reserveEstimate() + switch res, err := b.st.Reserve(ev.Sender, perUserCap, perUserUSD, b.cfg.DailyUSDCeiling, estimate); { case err != nil: // A limiter failure is on our side — don't leave the user wondering. b.log.Error("limiter reserve failed", "sender", ev.Sender, "err", err) + rl.Degraded, rl.Err = degradeReserveErr, err.Error() b.react(ctx, roomID, ev.EventID, reactError) return case res == reserveDeniedUser: // Per-user cap (anti-abuse, F24): stop answering, but always signal the limit — // no message addressed to the bot is left without feedback. b.log.Info("per-user daily cap reached; reacting", "sender", ev.Sender) + rl.PerUserCapHit = true b.react(ctx, roomID, ev.EventID, reactRateLimit) return case res == reserveDeniedGlobal: @@ -333,23 +398,80 @@ func (b *Bot) respond(ctx context.Context, roomID string, isDM bool, ev *Event, // once-per-day text notice), so signal every affected message rather than // going silent after the first. b.log.Warn("global daily USD ceiling reached", "room", roomID, "sender", ev.Sender) + rl.CeilingHit = true b.react(ctx, roomID, ev.EventID, reactRateLimit) return } + // Past admission, a reservation + request slot are held. Guarantee they're freed on + // ANY exit that didn't settle — including a panic in generate() (recovered by safego) + // — so a leaked reservation can never permanently drift the global ceiling down + // (§8.1c). The normal paths set settled=true at Settle, so this defer then no-ops; it + // fires only on the panic/unexpected-return path, where it also reacts so the failure + // isn't silent. + settled := false + defer func() { + if settled { + return + } + rl.Degraded, rl.Err = "panic", "generation panicked or returned without settling" + if rerr := b.st.ReleaseReservation(ev.Sender, estimate); rerr != nil { + b.log.Error("release reservation (unsettled) failed", "sender", ev.Sender, "err", rerr) + } + if rerr := b.st.RefundRequest(ev.Sender); rerr != nil { + b.log.Error("refund (unsettled) failed", "sender", ev.Sender, "err", rerr) + } + b.react(ctx, roomID, ev.EventID, reactError) + }() + // Show "Vojo AI печатает…" for the whole generation. The keepalive refreshes the // typing notification every 20s (the server expires it after 30s) so the indicator // never lapses on a slow/retried answer, and the deferred stop clears it on exit. stopTyping := b.startTypingKeepalive(ctx, roomID) defer stopTyping() - msgs := buildContext(b.cfg.SystemPrompt, history, isDM, mc.Body, b.cfg.MaxCtxEvent, 8000) - resp, err := b.xai.Complete(ctx, b.cfg.XAIModel, msgs, b.cfg.MaxOutTok, b.cfg.XAITemp) + // Overall per-request deadline (§8.2.2): every model call in the cascade shares this + // single budget (genCtx), so a multi-stage route can't accrete minutes the way + // per-stage 3×60s retries would. react/send/store ops use the live room ctx, NOT + // genCtx, so a budget timeout still surfaces as a ⚠️ react, never silence. + genCtx, cancel := context.WithTimeout(ctx, b.cfg.RequestBudget) + defer cancel() + + msgs := buildContext(b.cfg.SystemPrompt, history, isDM, mc.Body, b.cfg.MaxCtxEvent, maxPromptTokens) + res, err := b.generate(genCtx, mc.Body, msgs, b.convID(roomID)) + + // Record what the routing + generation actually did, whatever the outcome. + rl.Route = res.route + rl.RouterSource = res.decision.Source + rl.RouterConfidence = res.decision.Confidence + rl.FallbackFired = res.fallback + rl.Escalated = res.route == routeReason + rl.Cost = res.cost + if res.stageMS != nil { + rl.StageMS = res.stageMS + } + if res.finalModel != "" { + rl.Models["final"] = res.finalModel + } + if res.decision.Source == "classifier" { + rl.Models["router"] = b.cfg.GeminiModel + } + if res.degraded != "" { + rl.Degraded = res.degraded + } + if err != nil { - // at-most-once already retried transient failures inside Complete; refund the - // reserved request so an xAI outage doesn't burn the user's daily cap, and - // signal the failure (react → no anti-loop, no language). - b.log.Error("xai completion failed", "sender", ev.Sender, "err", err) + // Terminal: even grok_direct failed. Settle whatever the cascade ACTUALLY spent + // (e.g. a paid web fetch before the failure) and release the rest of the + // reservation in one step, then refund the request slot so an outage doesn't burn + // the cap, and react (never silent). Settle with an all-zero cost is just a + // release, so a pure grok_direct failure books nothing — exactly as before. + b.log.Error("generation failed", "sender", ev.Sender, "route", res.route, "err", err) + rl.Err = err.Error() + if serr := b.st.Settle(ev.Sender, estimate, res.cost); serr != nil { + b.log.Error("settle (failed request) failed", "sender", ev.Sender, "err", serr) + } + settled = true if rerr := b.st.RefundRequest(ev.Sender); rerr != nil { b.log.Error("refund failed", "sender", ev.Sender, "err", rerr) } @@ -357,39 +479,86 @@ func (b *Bot) respond(ctx context.Context, roomID string, isDM bool, ev *Event, return } - // A 2xx from xAI is billed even if the text came back empty — always book the real - // cost so both caps see it (an empty 200 must not bypass the per-user cap and the - // global ceiling). - usd := computeUSD(resp.Usage, b.cfg) - if err := b.st.Reconcile(ev.Sender, usd); err != nil { - b.log.Error("reconcile spend failed", "sender", ev.Sender, "err", err) + // Success from some route. Settle: release the reservation and book the real + // per-component cost, so both caps see grounding/tool fees too — not just tokens. + if err := b.st.Settle(ev.Sender, estimate, res.cost); err != nil { + b.log.Error("settle spend failed", "sender", ev.Sender, "err", err) } + settled = true + rl.PromptTokens, rl.CachedTokens, rl.CompletionTokens = + res.usage.PromptTokens, res.usage.CachedTokens, res.usage.CompletionTokens + rl.CacheHit = res.usage.CachedTokens > 0 + rl.ProviderRequestID = res.providerID - text := resp.Text() + text := res.text if text == "" { // Billed but no usable text (content filter / length cap / empty choices). Never - // leave a billed request without feedback — react "couldn't answer". - b.log.Warn("xai returned empty completion (billed, reacting)", "sender", ev.Sender, "usd", usd) + // leave a billed request without feedback — react "couldn't answer". The slot + // stays consumed (the 2xx was real); no refund, or an empty reply could be forced + // to dodge the cap. + b.log.Warn("empty completion (billed, reacting)", "sender", ev.Sender, "usd", res.cost.Total()) + rl.Degraded = degradeEmpty b.react(ctx, roomID, ev.EventID, reactError) return } - b.log.Info("answered", "room", roomID, "sender", ev.Sender, "dm", isDM, - "usd", usd, "prompt_tokens", resp.Usage.PromptTokens, "completion_tokens", resp.Usage.CompletionTokens) - b.sendReply(ctx, roomID, ev, mc, text) + b.log.Info("answered", "room", roomID, "sender", ev.Sender, "dm", isDM, "route", res.route, + "usd", res.cost.Total(), "prompt_tokens", res.usage.PromptTokens, "completion_tokens", res.usage.CompletionTokens) + if err := b.sendReply(ctx, roomID, ev, mc, text); err != nil { + // Paid silence (§8.1): the spend is real (USD is kept — refunding it would + // under-count the ceiling), but the reply never landed. Refund the request SLOT + // so the user can retry, and react ⚠️ so the failure isn't silent. + b.log.Error("send reply failed after billing; refunding slot + reacting", "sender", ev.Sender, "err", err) + rl.Degraded, rl.Err = degradeSendFailed, err.Error() + if rerr := b.st.RefundRequest(ev.Sender); rerr != nil { + b.log.Error("refund failed", "sender", ev.Sender, "err", rerr) + } + b.react(ctx, roomID, ev.EventID, reactError) + return + } + rl.OK = true } -// computeUSD prices the call from the API-returned token usage (authoritative -// counts) and the configured per-1M prices — so the hard ceiling tracks real -// usage even if the model/price changes (only the constants need updating). -func computeUSD(u xaiUsage, cfg *Config) float64 { - cached := u.PromptTokensDetails.CachedTokens - nonCached := u.PromptTokens - cached +// maxPromptTokens bounds the assembled prompt (history is trimmed to fit) and feeds +// the reservation estimate, so the two never disagree about a request's size. +const maxPromptTokens = 8000 + +// estimateUSD is the conservative max-cost reserved for a route before the call, so +// the global ceiling can count an in-flight request (§8.1). It prices a full prompt +// (maxPromptTokens) plus the max output at the model's non-cached rates — an upper-ish +// bound, since real calls send fewer tokens and get the cheaper cached rate. Settle +// later books the authoritative actual cost regardless, so a slightly-off estimate +// only nudges admission, never the final accounting. +func (b *Bot) estimateUSD(model string) float64 { + p := b.cfg.priceFor(model) + return float64(maxPromptTokens)/1e6*p.InputPerM + float64(b.cfg.MaxOutTok)/1e6*p.OutputPerM +} + +// convID returns the prompt-cache routing hint sent as x-grok-conv-id, or "" when +// GROK_PROMPT_CACHE is off. Grok caches prompt prefixes automatically; the header +// only pins a conversation to the same backend to raise the hit rate (docs.x.ai), so +// a stable per-room id is the right unit — every turn in a room shares the system +// prompt and history prefix. It carries no PII (the room id is opaque) and is hashed +// to keep it compact and non-identifying. +func (b *Bot) convID(roomID string) string { + if !b.cfg.GrokPromptCache { + return "" + } + return fmt.Sprintf("vojo-%08x", hashString(roomID)) +} + +// computeUSD prices a call from the API-returned token usage (authoritative +// counts) and the per-model price table — so the hard ceiling tracks real usage +// even if the model/price changes (only the price table needs updating), and a +// call books at the price of the model that actually served it. +func computeUSD(model string, u Usage, cfg *Config) float64 { + p := cfg.priceFor(model) + nonCached := u.PromptTokens - u.CachedTokens if nonCached < 0 { nonCached = 0 } - return float64(nonCached)/1e6*cfg.PriceInputPerM + - float64(cached)/1e6*cfg.PriceCachedPerM + - float64(u.CompletionTokens)/1e6*cfg.PriceOutputPerM + return float64(nonCached)/1e6*p.InputPerM + + float64(u.CachedTokens)/1e6*p.CachedPerM + + float64(u.CompletionTokens)/1e6*p.OutputPerM } // react adds an emoji m.reaction to the triggering event — the bot's language-free @@ -415,27 +584,31 @@ func (b *Bot) react(ctx context.Context, roomID, eventID, emoji string) { // default, so this is a near-dead safety path; the reaction is far less intrusive // than the old text notice, but the once-gate keeps it from annotating every message // in the rare encrypted room. -func (b *Bot) reactEncryptedOnce(ctx context.Context, roomID, eventID string) { +// reactEncryptedOnce returns whether it reacted this call (true only the first time +// for a room), so the caller can log the skip exactly once too. +func (b *Bot) reactEncryptedOnce(ctx context.Context, roomID, eventID string) bool { warned, err := b.st.HasWarnedEncrypted(roomID) if err != nil { b.log.Error("warned-flag read failed", "room", roomID, "err", err) - return + return false } if warned { - return + return false } b.react(ctx, roomID, eventID, reactEncrypted) if err := b.st.SetWarnedEncrypted(roomID); err != nil { b.log.Error("persist warned-flag failed", "room", roomID, "err", err) } + return true } // sendReply sends the model's actual answer and records the completed exchange in the -// conversation buffer so the next turn has context. -func (b *Bot) sendReply(ctx context.Context, roomID string, trigger *Event, triggerMC *MessageContent, body string) { - id := b.sendMessage(ctx, roomID, trigger, triggerMC, body) - if id == "" { - return +// conversation buffer so the next turn has context. It RETURNS the send error so the +// caller can handle paid silence (§8.1): a billed answer that failed to deliver must +// refund the slot and react, not vanish. +func (b *Bot) sendReply(ctx context.Context, roomID string, trigger *Event, triggerMC *MessageContent, body string) error { + if err := b.sendMessage(ctx, roomID, trigger, triggerMC, body); err != nil { + return err } // Record the user trigger AND the assistant answer together, only AFTER the answer // was sent, so a failed or empty generation never leaves a dangling user turn (a @@ -443,20 +616,21 @@ func (b *Bot) sendReply(ctx context.Context, roomID string, trigger *Event, trig // Single-flight guarantees no other turn for this room interleaves between the two. b.appendBuf(roomID, bufferedMsg{sender: trigger.Sender, body: triggerMC.Body, isBot: false}) b.appendBuf(roomID, bufferedMsg{sender: b.cfg.BotMXID, body: body, isBot: true}) + return nil } -// sendMessage builds and sends an m.notice reply, tracks our own event id, and returns -// the new event id ("" on failure). -func (b *Bot) sendMessage(ctx context.Context, roomID string, trigger *Event, triggerMC *MessageContent, body string) string { +// sendMessage builds and sends an m.notice reply and tracks our own event id. Returns +// the send error (nil on success) so the caller can detect a failed delivery. +func (b *Bot) sendMessage(ctx context.Context, roomID string, trigger *Event, triggerMC *MessageContent, body string) error { content := buildNoticeContent(trigger.EventID, trigger.Sender, triggerMC.RelatesTo, body) id, err := b.mx.SendEvent(ctx, roomID, "m.room.message", content) if err != nil { b.log.Error("send failed", "room", roomID, "err", err) - return "" + return err } // Track our own reply so a future reply-to-it is recognised as addressing us. b.botSent.Add(id) - return id + return nil } // startTypingKeepalive starts the typing indicator and keeps it alive for the whole diff --git a/apps/ai-bot/bot_test.go b/apps/ai-bot/bot_test.go index d0b4174c..5fa3bb73 100644 --- a/apps/ai-bot/bot_test.go +++ b/apps/ai-bot/bot_test.go @@ -71,17 +71,22 @@ func TestStripReplyFallback(t *testing.T) { } func TestComputeUSD(t *testing.T) { - cfg := &Config{PriceInputPerM: 1.25, PriceCachedPerM: 0.20, PriceOutputPerM: 2.50} - var u xaiUsage - u.PromptTokens = 1_000_000 - u.PromptTokensDetails.CachedTokens = 400_000 - u.CompletionTokens = 1_000_000 + const model = "grok-test" + cfg := &Config{XAIModel: model, Prices: map[string]ModelPrice{ + model: {InputPerM: 1.25, CachedPerM: 0.20, OutputPerM: 2.50}, + }} + u := Usage{PromptTokens: 1_000_000, CachedTokens: 400_000, CompletionTokens: 1_000_000} // nonCached 600k*1.25 + cached 400k*0.20 + out 1M*2.50 = 0.75 + 0.08 + 2.50 - got := computeUSD(u, cfg) + got := computeUSD(model, u, cfg) want := 0.75 + 0.08 + 2.50 if diff := got - want; diff > 1e-9 || diff < -1e-9 { t.Fatalf("computeUSD = %v, want %v", got, want) } + // An unknown model falls back to the default model's price (never $0, which would + // blind the ceiling). + if got := computeUSD("unknown-model", u, cfg); got != want { + t.Fatalf("unknown-model fallback = %v, want default %v", got, want) + } } func TestBuildContextGroupDropsThirdParties(t *testing.T) { diff --git a/apps/ai-bot/cascade.go b/apps/ai-bot/cascade.go new file mode 100644 index 00000000..6d0ae96c --- /dev/null +++ b/apps/ai-bot/cascade.go @@ -0,0 +1,279 @@ +package main + +import ( + "context" + "errors" + "fmt" + "strings" + "time" +) + +// cascade.go is the generation half of the bot: given an admitted request, it routes +// (router.go), runs the chosen route's provider(s), and ALWAYS degrades to grok_direct +// on any layer being off or failing (§8.2). It returns a genResult the business logic +// (respond) settles, sends, and logs — keeping ledger/never-silent/telemetry in one +// place and the routing here. With every cascade flag off, classify returns grok_direct +// and this collapses to exactly today's single Grok call. + +// genResult is everything respond needs from a generation: the answer, the model's +// usage (for token billing), the FULL cost breakdown (router + web + final), and the +// routing metadata for telemetry. cost accumulates across stages, so a partial cascade +// (a paid web fetch that then degraded) still books what it actually spent. +type genResult struct { + text string + usage Usage + cost CostBreakdown + finalModel string + providerID string + decision RouterDecision + route string // the route actually taken (may differ from decision on degrade) + fallback bool // true if we degraded off the decided route + degraded string // degrade reason for request_log + stageMS map[string]int +} + +func msSince(t time.Time) int { return int(time.Since(t).Milliseconds()) } + +// reserveEstimate is the admission envelope: the most expensive ENABLED route's cost, +// so whichever route the router picks is covered by the reservation (the ceiling can't +// be slipped by routing to a pricier path after admission). With every cascade flag +// off it equals grok_direct's estimate — byte-for-byte today's reservation. Slightly +// generous is fine: Settle books the authoritative actual afterward. +func (b *Bot) reserveEstimate() float64 { + est := b.estimateUSD(b.cfg.XAIModel) // grok_direct / trivial(cheaper)/synthesis base + if b.cfg.WebEnabled { + // web_then_grok = a web fetch fee + the Grok synthesis already counted above. + if b.cfg.WebProvider == webProviderGrokWebSearch { + // fetch can search several times and pull large context; reserve generously. + est += float64(maxWebSearchCalls)*grokWebSearchPerCall + b.estimateUSD(b.cfg.XAIModel) + } else { + est += b.estimateUSD(b.cfg.GeminiModel) + } + } + if b.cfg.ReasoningEnabled { + // Higher reasoning effort can burn more output tokens; reserve double. + est = max(est, 2*b.estimateUSD(b.cfg.ReasoningModel)) + } + return est +} + +// generate routes and produces an answer, degrading to grok_direct on any failure. +// It returns a terminal error ONLY if even grok_direct fails; every other route falls +// through to grok_direct rather than erroring. +func (b *Bot) generate(ctx context.Context, body string, msgs []Message, convID string) (genResult, error) { + res := genResult{stageMS: map[string]int{}, finalModel: b.cfg.XAIModel} + + t0 := time.Now() + res.decision = b.classify(ctx, body, &res.cost) // accumulates cost.Router if Layer-1 runs + res.stageMS["router"] = msSince(t0) + res.route = res.decision.Route + + finalMsgs := msgs + switch res.decision.Route { + case routeTrivial: + if b.cfg.TrivialOffloadEnabled && b.gemini != nil { + if err := b.genTrivial(ctx, msgs, &res); err == nil { + return res, nil + } else { + b.log.Warn("trivial offload failed; degrading to grok_direct", "err", err) + b.degradeTo(&res, degradeTrivial) + } + } + case routeWebThenGrok: + if b.cfg.WebEnabled && b.web != nil { + if err := b.genWebThenGrok(ctx, body, msgs, convID, &res); err == nil { + return res, nil + } else { + b.log.Warn("web route failed; degrading to grok_direct", "err", err, "reason", res.degraded) + b.degradeTo(&res, degradeWeb) + // The question wanted fresh facts but we have none — answer from training + // knowledge WITH an honest staleness caveat, not stale-as-current (§8.2.1). + finalMsgs = hedgeMessages(msgs) + } + } + case routeReason: + if b.cfg.ReasoningEnabled { + if err := b.genReason(ctx, msgs, convID, &res); err == nil { + return res, nil + } else { + b.log.Warn("reasoning route failed; degrading to grok_direct", "err", err) + b.degradeTo(&res, degradeReasoning) + } + } + } + + // grok_direct — the default route AND the universal fallback. The only path that + // can return a terminal error (even Grok failed). It preserves any cost already + // spent (router classifier, a partial web fetch) in res.cost. + if err := b.genGrokDirect(ctx, finalMsgs, convID, &res); err != nil { + return res, err + } + return res, nil +} + +// degradeTo marks res as a fallback to grok_direct, keeping the first/most-specific +// degrade reason (e.g. a web provider's grounding_cap set inside genWebThenGrok). +func (b *Bot) degradeTo(res *genResult, reason string) { + res.fallback = true + if res.degraded == "" { + res.degraded = reason + } +} + +// genGrokDirect is today's path: one Grok call. Also the fallback for every other +// route. On success it fills res (route, final model, text, usage, provider id) and +// adds the token cost. +func (b *Bot) genGrokDirect(ctx context.Context, msgs []Message, convID string, res *genResult) error { + t := time.Now() + resp, err := b.llm.Complete(ctx, LLMRequest{ + Model: b.cfg.XAIModel, + Messages: msgs, + MaxTokens: b.cfg.MaxOutTok, + Temperature: b.cfg.XAITemp, + ConvID: convID, + ReasoningEffort: b.cfg.GrokReasoningEffort, // "" → not sent; "none" keeps grok-4.3 fast + }) + res.stageMS["final"] = msSince(t) + if err != nil { + return err + } + res.route, res.finalModel = routeGrokDirect, b.cfg.XAIModel + res.text, res.usage, res.providerID = resp.Text, resp.Usage, resp.ProviderRequestID + res.cost.Token += computeUSD(b.cfg.XAIModel, resp.Usage, b.cfg) + return nil +} + +// genTrivial answers a trivial message with the cheap Gemini model. An empty reply is +// treated as a failure so the caller degrades to Grok rather than sending nothing. +func (b *Bot) genTrivial(ctx context.Context, msgs []Message, res *genResult) error { + t := time.Now() + resp, err := b.gemini.Complete(ctx, LLMRequest{ + Model: b.cfg.GeminiModel, + Messages: msgs, + MaxTokens: b.cfg.MaxOutTok, + Temperature: b.cfg.XAITemp, + }) + res.stageMS["final"] = msSince(t) + if err != nil { + return err + } + if strings.TrimSpace(resp.Text) == "" { + return fmt.Errorf("trivial: empty Gemini reply") + } + res.route, res.finalModel = routeTrivial, b.cfg.GeminiModel + res.text, res.usage, res.providerID = resp.Text, resp.Usage, resp.ProviderRequestID + res.cost.Token += computeUSD(b.cfg.GeminiModel, resp.Usage, b.cfg) + return nil +} + +// genReason answers with Grok at a higher reasoning effort. Uses the configured +// reasoning-capable model (the default grok-4.20-non-reasoning would reject the param). +func (b *Bot) genReason(ctx context.Context, msgs []Message, convID string, res *genResult) error { + t := time.Now() + resp, err := b.llm.Complete(ctx, LLMRequest{ + Model: b.cfg.ReasoningModel, + Messages: msgs, + MaxTokens: b.cfg.MaxOutTok, + Temperature: b.cfg.XAITemp, + ReasoningEffort: b.cfg.ReasoningEffort, // "think harder" level (default high) + ConvID: convID, + }) + res.stageMS["final"] = msSince(t) + if err != nil { + return err + } + if strings.TrimSpace(resp.Text) == "" { + return fmt.Errorf("reason: empty reply") + } + res.route, res.finalModel = routeReason, b.cfg.ReasoningModel + res.text, res.usage, res.providerID = resp.Text, resp.Usage, resp.ProviderRequestID + res.cost.Token += computeUSD(b.cfg.ReasoningModel, resp.Usage, b.cfg) + return nil +} + +// webStageTimeout bounds the web/grounding fetch independently of the overall budget +// (§8.2.2): a slow search must not eat the whole request before synthesis. +const webStageTimeout = 15 * time.Second + +// genWebThenGrok fetches fresh facts via the web provider, then has Grok synthesise the +// answer in voice from that digest. The web fetch's cost+tokens are booked into res +// EVEN ON FAILURE — the call was billed — so a synth failure or empty fetch still +// accounts for the spend before the caller degrades to grok_direct (the partial cascade +// case, §8.1). The daily cap and per-stage deadline are applied here, uniformly for both +// providers. +func (b *Bot) genWebThenGrok(ctx context.Context, body string, msgs []Message, convID string, res *genResult) error { + // Per-stage web/grounding deadline, independent of the overall budget. + wctx, cancelW := context.WithTimeout(ctx, webStageTimeout) + tw := time.Now() + wc, ferr := b.web.Fetch(wctx, body) + cancelW() + res.stageMS["web"] = msSince(tw) + // Book the fetch's fee + tokens whether or not it produced a usable digest — the call + // was billed (the daily cap, if any, is enforced inside the provider). + res.cost.Grounding += wc.Cost.Grounding + res.cost.WebTool += wc.Cost.WebTool + webUsage := wc.Usage + if ferr != nil { + if errors.Is(ferr, errGroundingCapped) { + res.degraded = degradeGroundCap + } + return ferr // web fee already booked; caller degrades to grok_direct (with hedge) + } + + tf := time.Now() + resp, err := b.llm.Complete(ctx, LLMRequest{ + Model: b.cfg.XAIModel, + Messages: webSynthMessages(msgs, wc), + MaxTokens: b.cfg.MaxOutTok, + Temperature: b.cfg.XAITemp, + ConvID: convID, + ReasoningEffort: b.cfg.GrokReasoningEffort, // same voice, same effort as grok_direct + }) + res.stageMS["final"] = msSince(tf) + if err != nil { + return err + } + if strings.TrimSpace(resp.Text) == "" { + return fmt.Errorf("web synth: empty reply") + } + res.route, res.finalModel = routeWebThenGrok, b.cfg.XAIModel + res.text, res.providerID = resp.Text, resp.ProviderRequestID + // Report BOTH calls' tokens so the analytics token totals match the two-call route. + res.usage = Usage{ + PromptTokens: resp.Usage.PromptTokens + webUsage.PromptTokens, + CachedTokens: resp.Usage.CachedTokens + webUsage.CachedTokens, + CompletionTokens: resp.Usage.CompletionTokens + webUsage.CompletionTokens, + } + res.cost.Token += computeUSD(b.cfg.XAIModel, resp.Usage, b.cfg) + return nil +} + +// webSynthMessages inserts the fresh web digest (and its sources) as a system note just +// after the system prompt, so Grok answers in voice using current facts. +func webSynthMessages(base []Message, wc WebContext) []Message { + facts := "Свежие данные из веба (используй их в ответе и сошлись на источники):\n" + wc.Digest + if len(wc.Citations) > 0 { + facts += "\nИсточники: " + strings.Join(wc.Citations, ", ") + } + return insertSystemNote(base, facts) +} + +// hedgeMessages adds an honest staleness caveat for a web→grok_direct degrade: the user +// wanted fresh facts but we couldn't fetch them, so the model must flag that its answer +// is from training knowledge and may be out of date. +func hedgeMessages(base []Message) []Message { + return insertSystemNote(base, "Нет доступа к свежим источникам прямо сейчас — отвечай по знаниям на момент обучения и честно предупреди, что данные могут быть устаревшими.") +} + +// insertSystemNote inserts an extra system message right after the system prompt +// (base[0] from buildContext), preserving the rest of the window. +func insertSystemNote(base []Message, content string) []Message { + note := Message{Role: "system", Content: content} + if len(base) == 0 { + return []Message{note} + } + out := make([]Message, 0, len(base)+1) + out = append(out, base[0], note) + out = append(out, base[1:]...) + return out +} diff --git a/apps/ai-bot/cascade_test.go b/apps/ai-bot/cascade_test.go new file mode 100644 index 00000000..a801c574 --- /dev/null +++ b/apps/ai-bot/cascade_test.go @@ -0,0 +1,275 @@ +package main + +import ( + "context" + "errors" + "io" + "log/slog" + "testing" +) + +func discardLog() *slog.Logger { return slog.New(slog.NewTextHandler(io.Discard, nil)) } + +// fakeLLM is a scriptable LLMClient for dispatch/degrade tests. +type fakeLLM struct { + text string + usage Usage + err error + calls int + lastReq LLMRequest +} + +func (f *fakeLLM) Complete(_ context.Context, req LLMRequest) (*LLMResponse, error) { + f.calls++ + f.lastReq = req + if f.err != nil { + return nil, f.err + } + return &LLMResponse{Text: f.text, Usage: f.usage, ProviderRequestID: "fake"}, nil +} + +type fakeWeb struct { + wc WebContext + err error + calls int +} + +func (f *fakeWeb) Fetch(_ context.Context, _ string) (WebContext, error) { + f.calls++ + if f.err != nil { + return WebContext{}, f.err + } + return f.wc, nil +} + +// cascadeCfg is a config with the model/price table set and EVERY cascade flag off. +// Tests flip individual flags on a copy. +func cascadeCfg() Config { + return Config{ + XAIModel: "grok-x", GeminiModel: "gemini-x", ReasoningModel: "grok-reason", + MaxOutTok: 100, XAITemp: 0.5, + ReasoningTrigger: "подумай глубже", + ReasoningEffort: "high", + WebProvider: webProviderGrokWebSearch, + Prices: map[string]ModelPrice{ + "grok-x": {InputPerM: 1, CachedPerM: 0.2, OutputPerM: 2}, + "gemini-x": {InputPerM: 0.1, CachedPerM: 0.1, OutputPerM: 0.4}, + }, + } +} + +func msgs(body string) []Message { + return []Message{{Role: "system", Content: "SYS"}, {Role: "user", Content: body}} +} + +// TestGenerateAllFlagsOffIsGrokDirect is the cascade-off parity invariant: even a +// "trivial"-looking message goes to Grok, and Gemini is never touched, when the router +// is off. +func TestGenerateAllFlagsOffIsGrokDirect(t *testing.T) { + grok := &fakeLLM{text: "grok answer"} + gem := &fakeLLM{text: "should not run"} + cfg := cascadeCfg() + b := &Bot{cfg: &cfg, llm: grok, gemini: gem, log: discardLog()} + + res, err := b.generate(context.Background(), "привет", msgs("привет"), "") + if err != nil { + t.Fatalf("generate: %v", err) + } + if res.route != routeGrokDirect || res.text != "grok answer" { + t.Fatalf("res = (%q,%q), want grok_direct/\"grok answer\"", res.route, res.text) + } + if res.decision.Source != "default" { + t.Fatalf("router source = %q, want default (router off)", res.decision.Source) + } + if grok.calls != 1 || gem.calls != 0 { + t.Fatalf("calls grok=%d gem=%d, want 1/0", grok.calls, gem.calls) + } +} + +func TestGenerateTrivialOffload(t *testing.T) { + grok := &fakeLLM{text: "grok"} + gem := &fakeLLM{text: "gemini trivial"} + cfg := cascadeCfg() + cfg.RouterEnabled, cfg.TrivialOffloadEnabled = true, true + b := &Bot{cfg: &cfg, llm: grok, gemini: gem, log: discardLog()} + + res, err := b.generate(context.Background(), "привет", msgs("привет"), "") + if err != nil { + t.Fatalf("generate: %v", err) + } + if res.route != routeTrivial || res.text != "gemini trivial" || res.finalModel != "gemini-x" { + t.Fatalf("res = (%q,%q,%q), want trivial/gemini", res.route, res.text, res.finalModel) + } + if gem.calls != 1 || grok.calls != 0 { + t.Fatalf("calls grok=%d gem=%d, want 0/1 (Gemini answered)", grok.calls, gem.calls) + } +} + +// TestGenerateTrivialDegradesToGrok: Gemini failing on the trivial route must fall back +// to Grok, never go silent. +func TestGenerateTrivialDegradesToGrok(t *testing.T) { + grok := &fakeLLM{text: "grok fallback"} + gem := &fakeLLM{err: errors.New("gemini down")} + cfg := cascadeCfg() + cfg.RouterEnabled, cfg.TrivialOffloadEnabled = true, true + b := &Bot{cfg: &cfg, llm: grok, gemini: gem, log: discardLog()} + + res, err := b.generate(context.Background(), "привет", msgs("привет"), "") + if err != nil { + t.Fatalf("generate: %v", err) + } + if res.route != routeGrokDirect || res.text != "grok fallback" { + t.Fatalf("res = (%q,%q), want grok_direct fallback", res.route, res.text) + } + if !res.fallback || res.degraded != degradeTrivial { + t.Fatalf("fallback=%v degraded=%q, want true/trivial_failed", res.fallback, res.degraded) + } + if gem.calls != 1 || grok.calls != 1 { + t.Fatalf("calls grok=%d gem=%d, want 1/1", grok.calls, gem.calls) + } +} + +func TestGenerateWebThenGrok(t *testing.T) { + grok := &fakeLLM{text: "synthesised", usage: Usage{PromptTokens: 100, CompletionTokens: 50}} + web := &fakeWeb{wc: WebContext{Digest: "fresh facts", Citations: []string{"http://src"}, Cost: CostBreakdown{WebTool: 0.1}}} + cfg := cascadeCfg() + cfg.RouterEnabled, cfg.WebEnabled = true, true + b := &Bot{cfg: &cfg, llm: grok, web: web, log: discardLog()} + + res, err := b.generate(context.Background(), "какие новости сегодня", msgs("какие новости сегодня"), "") + if err != nil { + t.Fatalf("generate: %v", err) + } + if res.route != routeWebThenGrok || res.text != "synthesised" { + t.Fatalf("res = (%q,%q), want web_then_grok/synthesised", res.route, res.text) + } + if res.cost.WebTool != 0.1 || res.cost.Token <= 0 { + t.Fatalf("cost = %+v, want WebTool 0.1 + Token>0", res.cost) + } + if web.calls != 1 || grok.calls != 1 { + t.Fatalf("calls web=%d grok=%d, want 1/1", web.calls, grok.calls) + } +} + +// TestGenerateWebDegradesToGrok: a web fetch failure (provider down or cap hit) degrades +// to grok_direct and books no web cost. +func TestGenerateWebDegradesToGrok(t *testing.T) { + grok := &fakeLLM{text: "grok fallback"} + web := &fakeWeb{err: errGroundingCapped} + cfg := cascadeCfg() + cfg.RouterEnabled, cfg.WebEnabled = true, true + b := &Bot{cfg: &cfg, llm: grok, web: web, log: discardLog()} + + res, err := b.generate(context.Background(), "новости сегодня", msgs("новости сегодня"), "") + if err != nil { + t.Fatalf("generate: %v", err) + } + if res.route != routeGrokDirect || res.text != "grok fallback" || !res.fallback { + t.Fatalf("res = (%q,%q,fallback=%v), want grok_direct fallback", res.route, res.text, res.fallback) + } + if res.degraded != degradeGroundCap { + t.Fatalf("degraded = %q, want grounding_cap (the specific reason)", res.degraded) + } + if res.cost.WebTool != 0 || res.cost.Grounding != 0 { + t.Fatalf("web cost = %+v, want 0 (fetch failed before billing)", res.cost) + } +} + +// TestGenerateReasoningForced: the manual trigger routes to the reasoning model with +// reasoning_effort, independent of ROUTER_ENABLED. +func TestGenerateReasoningForced(t *testing.T) { + grok := &fakeLLM{text: "deep answer"} + cfg := cascadeCfg() + cfg.ReasoningEnabled = true // ROUTER_ENABLED deliberately left off + b := &Bot{cfg: &cfg, llm: grok, log: discardLog()} + + res, err := b.generate(context.Background(), "подумай глубже про сознание", msgs("подумай глубже про сознание"), "") + if err != nil { + t.Fatalf("generate: %v", err) + } + if res.route != routeReason || res.decision.Source != "forced" { + t.Fatalf("res route=%q source=%q, want reason/forced", res.route, res.decision.Source) + } + if grok.lastReq.ReasoningEffort != "high" || grok.lastReq.Model != "grok-reason" { + t.Fatalf("reasoning req = (effort %q, model %q), want high/grok-reason", grok.lastReq.ReasoningEffort, grok.lastReq.Model) + } +} + +// TestClassifierConfidenceFloor: a Layer-1 classifier label that escalates off the safe +// floor (trivial/web) must clear the confidence floor, else the request stays on +// grok_direct — the false-trivial voice-leak guard (§8.6). +func TestClassifierConfidenceFloor(t *testing.T) { + cfg := cascadeCfg() + cfg.RouterEnabled, cfg.RouterClassifierEnabled = true, true + gem := &fakeLLM{} + b := &Bot{cfg: &cfg, gemini: gem, log: discardLog()} + var cost CostBreakdown + const substantive = "напиши подробное эссе про историю римской империи" // Layer-0 → grok_direct + + gem.text = `{"route":"trivial","confidence":0.2}` // low-confidence escalation + if d := b.classify(context.Background(), substantive, &cost); d.Route != routeGrokDirect { + t.Fatalf("low-confidence trivial must stay grok_direct (safe floor), got %q", d.Route) + } + gem.text = `{"route":"trivial","confidence":0.95}` // confident escalation is honoured + if d := b.classify(context.Background(), substantive, &cost); d.Route != routeTrivial { + t.Fatalf("high-confidence trivial should route trivial, got %q", d.Route) + } + // A classifier error degrades to the Layer-0 verdict (grok_direct), never silence. + gem.text, gem.err = "", errors.New("gemini down") + if d := b.classify(context.Background(), substantive, &cost); d.Route != routeGrokDirect { + t.Fatalf("classifier failure must fall back to heuristic grok_direct, got %q", d.Route) + } +} + +// TestGrokReasoningEffort: GROK_REASONING_EFFORT is sent on grok_direct (so grok-4.3 can +// be kept fast with "none"), empty means not sent (compat with grok-4.20-non-reasoning), +// and the reason route always overrides to "high" regardless. +func TestGrokReasoningEffort(t *testing.T) { + // Configured effort reaches grok_direct. + grok := &fakeLLM{text: "ok"} + cfg := cascadeCfg() + cfg.GrokReasoningEffort = "none" + b := &Bot{cfg: &cfg, llm: grok, log: discardLog()} + if _, err := b.generate(context.Background(), "hello", msgs("hello"), ""); err != nil { + t.Fatal(err) + } + if grok.lastReq.ReasoningEffort != "none" { + t.Fatalf("grok_direct effort = %q, want none", grok.lastReq.ReasoningEffort) + } + + // Empty default → not sent (so grok-4.20-non-reasoning keeps working). + grokDef := &fakeLLM{text: "ok"} + cfgDef := cascadeCfg() // GrokReasoningEffort == "" + bDef := &Bot{cfg: &cfgDef, llm: grokDef, log: discardLog()} + if _, err := bDef.generate(context.Background(), "hello", msgs("hello"), ""); err != nil { + t.Fatal(err) + } + if grokDef.lastReq.ReasoningEffort != "" { + t.Fatalf("default effort = %q, want empty (not sent)", grokDef.lastReq.ReasoningEffort) + } + + // The reason route ignores GROK_REASONING_EFFORT and always uses "high". + grokR := &fakeLLM{text: "deep"} + cfgR := cascadeCfg() + cfgR.GrokReasoningEffort = "none" + cfgR.ReasoningEnabled = true + bR := &Bot{cfg: &cfgR, llm: grokR, log: discardLog()} + if _, err := bR.generate(context.Background(), "подумай глубже про X", msgs("подумай глубже про X"), ""); err != nil { + t.Fatal(err) + } + if grokR.lastReq.ReasoningEffort != "high" { + t.Fatalf("reason route effort = %q, want high (overrides GROK_REASONING_EFFORT)", grokR.lastReq.ReasoningEffort) + } +} + +// TestGenerateTerminalErrorPropagates: if even grok_direct fails, generate returns the +// error (respond turns it into refund + react), not a silent empty success. +func TestGenerateTerminalErrorPropagates(t *testing.T) { + grok := &fakeLLM{err: errors.New("xai down")} + cfg := cascadeCfg() + b := &Bot{cfg: &cfg, llm: grok, log: discardLog()} + + if _, err := b.generate(context.Background(), "hello", msgs("hello"), ""); err == nil { + t.Fatal("want terminal error when grok_direct fails, got nil") + } +} diff --git a/apps/ai-bot/config.go b/apps/ai-bot/config.go index f40bd25a..63847b9e 100644 --- a/apps/ai-bot/config.go +++ b/apps/ai-bot/config.go @@ -5,6 +5,7 @@ import ( "os" "strconv" "strings" + "time" ) // Config is the fully-resolved runtime configuration, parsed once from the @@ -36,23 +37,106 @@ type Config struct { MaxOutTok int MaxCtxEvent int + // GrokReasoningEffort is the reasoning_effort sent on the normal Grok voice calls + // (grok_direct + web synthesis). Empty = don't send it (the default — required for + // grok-4.20-non-reasoning, which rejects the param). On a unified model like + // grok-4.3 the API otherwise defaults to "low" (it thinks on every reply); set this + // to "none" to keep the default voice fast/cheap. The reason_then_grok route ignores + // this and always uses "high". Accepted: "" | none | low | medium | high. + GrokReasoningEffort string + // Allowlist of homeservers whose users may pull the bot into a room. Gates // the *inviter* (F11). Comma-separated env, stored as a set. AllowedServers map[string]bool DailyUSDCeiling float64 PerUserDailyCap int + // PerUserDailyUSD is an optional per-user daily $ quota (0 = off) on top of the + // request count cap, so one user on expensive routes can't drain the shared global + // ceiling and deny everyone else. Checked against the user's own committed+reserved + // spend in Reserve. + PerUserDailyUSD float64 // mxids exempt from PER_USER_DAILY_CAP (e.g. the owner/admins testing). Still // subject to the global DAILY_USD_CEILING, so the wallet stays protected. UnlimitedUsers map[string]bool - // USD-per-1M-token prices applied to the API-returned token usage so the - // hard ceiling tracks real usage even if the model/price changes. + // USD-per-1M-token prices for the default (final-voice) model, applied to the + // API-returned token usage so the hard ceiling tracks real usage even if the + // model/price changes. Kept as the back-compat XAI_PRICE_* source; folded into + // Prices below. PriceInputPerM float64 PriceCachedPerM float64 PriceOutputPerM float64 + // Prices is the per-model price table (LiteLLM pattern) read by priceFor(model), + // so a call books at the price of the model that actually served it. Built in + // LoadConfig; the default model's entry comes from the XAI_PRICE_* envs, and a + // second model (Gemini) adds its own entry when that layer lands. + Prices map[string]ModelPrice + + // RequestBudget bounds one whole request (all model calls share it), so a slow or + // retried call — or a multi-stage cascade — can't accrete minutes. The default + // matches the previous effective ceiling for a single grok_direct call. + RequestBudget time.Duration + + // GrokPromptCache, when true, sends the x-grok-conv-id routing header to raise the + // prompt-cache hit rate (Grok caches automatically; the header only pins routing). + GrokPromptCache bool + + // TelemetryEnabled writes the request_log analytics row for every request. Default + // off so the cascade-off path adds no extra write; turned on to measure the base. + // Its write is isolated — a failure logs a WARN, never drops the answer. + TelemetryEnabled bool + // TelemetryStoreText additionally stores the query text in request_log (for offline + // eval). Default off — only metadata is kept. + TelemetryStoreText bool + // TelemetryRetention trims request_log rows older than this (time-based, since the + // analytics are a time series). 0 disables trimming. + TelemetryRetention time.Duration + + // --- Cascade (Phase 2-4). EVERY flag defaults OFF, so an unset environment is + // exactly today's bot: one grok_direct call. Any layer off or failing degrades to + // grok_direct (§8.2). None of these is enabled in prod until the offline-eval gate + // (§9) passes. --- + + // RouterEnabled turns on the Layer-0 heuristic router; off → everything is + // grok_direct. RouterClassifierEnabled additionally consults the Gemini Layer-1 + // classifier on cases the heuristic left as grok_direct. + RouterEnabled bool + RouterClassifierEnabled bool + // TrivialOffloadEnabled lets the trivial route answer with Gemini; off → trivial + // still goes to Grok. + TrivialOffloadEnabled bool + // WebEnabled turns on the web_then_grok route. WebProvider selects the source: + // grok_web_search (default, works on chat/completions via Live Search) or + // gemini_grounding (Gemini-3 native only — see F-EXT-3). + WebEnabled bool + WebProvider string + // WebGroundingDailyCap caps grounded prompts/day (durable counter) before falling + // back, guarding the $/1k grounding overage. WebGroundingTier records the Gemini + // plan the cap reflects. + WebGroundingDailyCap int + WebGroundingTier string + // Reasoning route: a manual "think harder" trigger. ReasoningModel must be a + // reasoning-capable model (the default grok-4.20-non-reasoning is NOT — see the + // docs.x.ai finding); set REASONING_MODEL to e.g. grok-4.3 to use it. + ReasoningEnabled bool + ReasoningTrigger string + ReasoningModel string + // ReasoningEffort is the reasoning_effort the reason_then_grok route sends on the + // manual "think harder" trigger. Default "high". Accepted: none|low|medium|high. + ReasoningEffort string + // CanaryPercent routes a fraction of traffic through the new path for A/B before a + // full enable. 0 = off (scaffold; not yet consulted by the dispatch). + CanaryPercent int + + // Gemini backend (the cheap/router/grounding model). Required only when a layer + // that uses it is enabled (validated below). + GeminiBaseURL string + GeminiAPIKey string + GeminiModel string + SystemPromptPath string SystemPrompt string StateDir string @@ -111,6 +195,23 @@ func getenvFloat(key string, def float64) (float64, error) { return f, nil } +// getenvBool parses a boolean flag. Accepts the usual 1/0/true/false/yes/no/on/off +// (case-insensitive); empty → default. Every cascade flag defaults false, so an unset +// or blank env keeps today's behaviour. +func getenvBool(key string, def bool) (bool, error) { + raw := strings.TrimSpace(getenv(key, "")) + if raw == "" { + return def, nil + } + switch strings.ToLower(raw) { + case "1", "true", "yes", "on": + return true, nil + case "0", "false", "no", "off": + return false, nil + } + return false, fmt.Errorf("%s must be a boolean (true/false), got %q", key, raw) +} + func parseServerSet(raw string) map[string]bool { set := make(map[string]bool) for _, s := range strings.Split(raw, ",") { @@ -139,6 +240,16 @@ func LoadConfig() (*Config, error) { DatabaseURL: getenv("AI_BOT_DATABASE_URL", ""), AllowedServers: parseServerSet(getenv("ALLOWED_SERVERS", "")), UnlimitedUsers: parseServerSet(getenv("UNLIMITED_USERS", "")), + + // Cascade string-valued config (flags/ints/secrets parsed below). + GrokReasoningEffort: strings.ToLower(strings.TrimSpace(getenv("GROK_REASONING_EFFORT", ""))), + WebProvider: getenv("WEB_PROVIDER", webProviderGrokWebSearch), + WebGroundingTier: getenv("WEB_GROUNDING_TIER", "free"), + ReasoningTrigger: getenv("REASONING_TRIGGER", "подумай глубже"), + ReasoningModel: getenv("REASONING_MODEL", "grok-4.3"), + ReasoningEffort: strings.ToLower(strings.TrimSpace(getenv("REASONING_EFFORT", "high"))), + GeminiBaseURL: strings.TrimRight(getenv("GEMINI_BASE_URL", "https://generativelanguage.googleapis.com/v1beta/openai"), "/"), + GeminiModel: getenv("GEMINI_MODEL", "gemini-2.5-flash-lite"), } var problems []string @@ -152,6 +263,7 @@ func LoadConfig() (*Config, error) { {"AS_TOKEN", &cfg.ASToken}, {"HS_TOKEN", &cfg.HSToken}, {"XAI_API_KEY", &cfg.XAIAPIKey}, + {"GEMINI_API_KEY", &cfg.GeminiAPIKey}, // optional; required only if a Gemini layer is on } { v, err := getSecret(s.key) if err != nil { @@ -207,6 +319,9 @@ func LoadConfig() (*Config, error) { if cfg.PerUserDailyCap, err = getenvInt("PER_USER_DAILY_CAP", 30); err != nil { problems = append(problems, err.Error()) } + if cfg.PerUserDailyUSD, err = getenvFloat("PER_USER_DAILY_USD", 0); err != nil { + problems = append(problems, err.Error()) + } if cfg.PriceInputPerM, err = getenvFloat("XAI_PRICE_INPUT_PER_M", 1.25); err != nil { problems = append(problems, err.Error()) } @@ -216,6 +331,110 @@ func LoadConfig() (*Config, error) { if cfg.PriceOutputPerM, err = getenvFloat("XAI_PRICE_OUTPUT_PER_M", 2.50); err != nil { problems = append(problems, err.Error()) } + // Per-model price table. The default (final-voice) model is priced from the + // XAI_PRICE_* envs; additional models register their own entry as their layer + // lands. priceFor falls back to this default model for an unknown model. + cfg.Prices = map[string]ModelPrice{ + cfg.XAIModel: { + InputPerM: cfg.PriceInputPerM, + CachedPerM: cfg.PriceCachedPerM, + OutputPerM: cfg.PriceOutputPerM, + }, + } + + var budgetSec, retentionDays int + if budgetSec, err = getenvInt("REQUEST_BUDGET_SECONDS", 180); err != nil { + problems = append(problems, err.Error()) + } + cfg.RequestBudget = time.Duration(budgetSec) * time.Second + if cfg.GrokPromptCache, err = getenvBool("GROK_PROMPT_CACHE", false); err != nil { + problems = append(problems, err.Error()) + } + if cfg.TelemetryEnabled, err = getenvBool("TELEMETRY_ENABLED", false); err != nil { + problems = append(problems, err.Error()) + } + if cfg.TelemetryStoreText, err = getenvBool("TELEMETRY_STORE_TEXT", false); err != nil { + problems = append(problems, err.Error()) + } + if retentionDays, err = getenvInt("TELEMETRY_RETENTION_DAYS", 30); err != nil { + problems = append(problems, err.Error()) + } + cfg.TelemetryRetention = time.Duration(retentionDays) * 24 * time.Hour + + // Cascade flags — every one defaults false, so an unset env is today's bot. + for _, f := range []struct { + key string + dest *bool + }{ + {"ROUTER_ENABLED", &cfg.RouterEnabled}, + {"ROUTER_CLASSIFIER_ENABLED", &cfg.RouterClassifierEnabled}, + {"TRIVIAL_OFFLOAD_ENABLED", &cfg.TrivialOffloadEnabled}, + {"WEB_ENABLED", &cfg.WebEnabled}, + {"REASONING_ENABLED", &cfg.ReasoningEnabled}, + } { + if *f.dest, err = getenvBool(f.key, false); err != nil { + problems = append(problems, err.Error()) + } + } + if cfg.WebGroundingDailyCap, err = getenvInt("WEB_GROUNDING_DAILY_CAP", 450); err != nil { + problems = append(problems, err.Error()) + } + if cfg.CanaryPercent, err = getenvInt("CANARY_PERCENT", 0); err != nil { + problems = append(problems, err.Error()) + } + + // Gemini pricing → the per-model table (defaults: gemini-2.5-flash-lite $0.10/$0.40 + // per 1M; cached priced as input, a conservative over-count). Lets the ceiling and + // request_log price Gemini calls at Gemini rates. + var gIn, gOut float64 + if gIn, err = getenvFloat("GEMINI_PRICE_INPUT_PER_M", 0.10); err != nil { + problems = append(problems, err.Error()) + } + if gOut, err = getenvFloat("GEMINI_PRICE_OUTPUT_PER_M", 0.40); err != nil { + problems = append(problems, err.Error()) + } + cfg.Prices[cfg.GeminiModel] = ModelPrice{InputPerM: gIn, CachedPerM: gIn, OutputPerM: gOut} + // Reasoning model price (defaults to the final-voice grok rates — grok-4.3 ≈ 4.20), + // so the reasoning route reserves/bills at its own price instead of falling back. + var rIn, rOut float64 + if rIn, err = getenvFloat("REASONING_PRICE_INPUT_PER_M", cfg.PriceInputPerM); err != nil { + problems = append(problems, err.Error()) + } + if rOut, err = getenvFloat("REASONING_PRICE_OUTPUT_PER_M", cfg.PriceOutputPerM); err != nil { + problems = append(problems, err.Error()) + } + cfg.Prices[cfg.ReasoningModel] = ModelPrice{InputPerM: rIn, CachedPerM: cfg.PriceCachedPerM, OutputPerM: rOut} + + // Fail-fast on broken cascade wiring (§5/F-FUNC-9), at EVERY start (not just + // check-config): a layer that needs Gemini but has no key would silently never + // fire. Better to refuse to start than to quietly run degraded. + needsGemini := cfg.TrivialOffloadEnabled || cfg.RouterClassifierEnabled || + (cfg.WebEnabled && cfg.WebProvider == webProviderGeminiGrounding) + if needsGemini && cfg.GeminiAPIKey == "" { + problems = append(problems, "GEMINI_API_KEY is required when TRIVIAL_OFFLOAD_ENABLED, ROUTER_CLASSIFIER_ENABLED, or WEB_ENABLED with gemini_grounding is set") + } + if cfg.RouterClassifierEnabled && !cfg.RouterEnabled { + problems = append(problems, "ROUTER_CLASSIFIER_ENABLED requires ROUTER_ENABLED") + } + if cfg.WebEnabled && cfg.WebProvider != webProviderGrokWebSearch && cfg.WebProvider != webProviderGeminiGrounding { + problems = append(problems, fmt.Sprintf("WEB_PROVIDER must be %q or %q, got %q", + webProviderGrokWebSearch, webProviderGeminiGrounding, cfg.WebProvider)) + } + if cfg.ReasoningEnabled && cfg.ReasoningModel == "" { + problems = append(problems, "REASONING_MODEL is required when REASONING_ENABLED is set") + } + switch cfg.GrokReasoningEffort { + case "", "none", "low", "medium", "high": + default: + problems = append(problems, fmt.Sprintf( + "GROK_REASONING_EFFORT must be one of none/low/medium/high (or empty), got %q", cfg.GrokReasoningEffort)) + } + switch cfg.ReasoningEffort { + case "none", "low", "medium", "high": + default: + problems = append(problems, fmt.Sprintf( + "REASONING_EFFORT must be one of none/low/medium/high, got %q", cfg.ReasoningEffort)) + } if len(problems) > 0 { return nil, fmt.Errorf("invalid configuration:\n - %s", strings.Join(problems, "\n - ")) @@ -223,6 +442,14 @@ func LoadConfig() (*Config, error) { return cfg, nil } +// needsGemini reports whether any enabled layer requires the Gemini backend — the +// cheap trivial route, the Layer-1 classifier, or Gemini-native web grounding. Drives +// both the fail-fast key check and whether the client is built at all. +func (c *Config) needsGemini() bool { + return c.TrivialOffloadEnabled || c.RouterClassifierEnabled || + (c.WebEnabled && c.WebProvider == webProviderGeminiGrounding) +} + // Summary returns a human-readable, SECRET-REDACTED dump for the startup log. func (c *Config) Summary() string { servers := make([]string, 0, len(c.AllowedServers)) @@ -255,6 +482,12 @@ func (c *Config) Summary() string { " HS_TOKEN = " + redact(c.HSToken), " XAI_BASE_URL = " + c.XAIBaseURL, " XAI_MODEL = " + c.XAIModel, + " GROK_REASONING_EFFORT = " + func() string { + if c.GrokReasoningEffort == "" { + return "(unset — not sent; provider default)" + } + return c.GrokReasoningEffort + }(), " XAI_API_KEY = " + redact(c.XAIAPIKey), fmt.Sprintf(" XAI_TEMPERATURE = %g", c.XAITemp), fmt.Sprintf(" MAX_OUTPUT_TOKENS = %d", c.MaxOutTok), @@ -268,5 +501,14 @@ func (c *Config) Summary() string { " SYSTEM_PROMPT_PATH = " + c.SystemPromptPath, " STATE_DIR = " + c.StateDir, " AI_BOT_DATABASE_URL= " + redact(c.DatabaseURL), + fmt.Sprintf(" REQUEST_BUDGET = %s", c.RequestBudget), + fmt.Sprintf(" GROK_PROMPT_CACHE = %t", c.GrokPromptCache), + fmt.Sprintf(" TELEMETRY_ENABLED = %t (store_text=%t, retention=%s)", + c.TelemetryEnabled, c.TelemetryStoreText, c.TelemetryRetention), + fmt.Sprintf(" CASCADE: router=%t classifier=%t trivial=%t web=%t(%s, cap=%d) reason=%t(%s)", + c.RouterEnabled, c.RouterClassifierEnabled, c.TrivialOffloadEnabled, + c.WebEnabled, c.WebProvider, c.WebGroundingDailyCap, c.ReasoningEnabled, c.ReasoningEffort), + " GEMINI_MODEL = " + c.GeminiModel, + " GEMINI_API_KEY = " + redact(c.GeminiAPIKey), }, "\n") } diff --git a/apps/ai-bot/config_test.go b/apps/ai-bot/config_test.go new file mode 100644 index 00000000..2c3c6221 --- /dev/null +++ b/apps/ai-bot/config_test.go @@ -0,0 +1,98 @@ +package main + +import ( + "strings" + "testing" +) + +// setBaseEnv sets the minimal valid environment (all cascade flags off) so each test +// can toggle one combination and assert the fail-fast validation (F-FUNC-9). +func setBaseEnv(t *testing.T) { + t.Helper() + t.Setenv("HOMESERVER_URL", "http://hs") + t.Setenv("BOT_MXID", "@ai:vojo.chat") + t.Setenv("AS_TOKEN", "as") + t.Setenv("HS_TOKEN", "hs") + t.Setenv("XAI_API_KEY", "xai") + t.Setenv("AI_BOT_DATABASE_URL", "postgres://x") + t.Setenv("ALLOWED_SERVERS", "vojo.chat") + // Force a clean baseline so the host environment can't leak in. + for _, k := range []string{ + "GEMINI_API_KEY", "GEMINI_API_KEY_FILE", "ROUTER_ENABLED", "ROUTER_CLASSIFIER_ENABLED", + "TRIVIAL_OFFLOAD_ENABLED", "WEB_ENABLED", "REASONING_ENABLED", "WEB_PROVIDER", "REASONING_MODEL", + } { + t.Setenv(k, "") + } +} + +func TestConfigBaseValid(t *testing.T) { + setBaseEnv(t) + if _, err := LoadConfig(); err != nil { + t.Fatalf("base config should be valid: %v", err) + } +} + +func TestConfigAllCascadeFlagsDefaultOff(t *testing.T) { + setBaseEnv(t) + cfg, err := LoadConfig() + if err != nil { + t.Fatalf("%v", err) + } + if cfg.RouterEnabled || cfg.RouterClassifierEnabled || cfg.TrivialOffloadEnabled || + cfg.WebEnabled || cfg.ReasoningEnabled || cfg.TelemetryEnabled || cfg.GrokPromptCache { + t.Fatal("every cascade/telemetry flag must default off (cascade-off == today)") + } + if cfg.WebProvider != webProviderGrokWebSearch { + t.Fatalf("default WEB_PROVIDER = %q, want grok_web_search", cfg.WebProvider) + } +} + +func TestConfigTrivialNeedsGeminiKey(t *testing.T) { + setBaseEnv(t) + t.Setenv("TRIVIAL_OFFLOAD_ENABLED", "true") + if _, err := LoadConfig(); err == nil || !strings.Contains(err.Error(), "GEMINI_API_KEY") { + t.Fatalf("want GEMINI_API_KEY error, got %v", err) + } + t.Setenv("GEMINI_API_KEY", "gk") + if _, err := LoadConfig(); err != nil { + t.Fatalf("with key it should be valid: %v", err) + } +} + +func TestConfigClassifierNeedsRouter(t *testing.T) { + setBaseEnv(t) + t.Setenv("GEMINI_API_KEY", "gk") + t.Setenv("ROUTER_CLASSIFIER_ENABLED", "true") // without ROUTER_ENABLED + if _, err := LoadConfig(); err == nil || !strings.Contains(err.Error(), "ROUTER_ENABLED") { + t.Fatalf("want ROUTER_ENABLED error, got %v", err) + } +} + +func TestConfigBadWebProvider(t *testing.T) { + setBaseEnv(t) + t.Setenv("WEB_ENABLED", "true") + t.Setenv("WEB_PROVIDER", "bing") + if _, err := LoadConfig(); err == nil || !strings.Contains(err.Error(), "WEB_PROVIDER") { + t.Fatalf("want WEB_PROVIDER error, got %v", err) + } +} + +// The default web provider (grok_web_search) uses the existing xAI key, so WEB_ENABLED +// alone must NOT demand a Gemini key. +func TestConfigWebGrokNeedsNoGeminiKey(t *testing.T) { + setBaseEnv(t) + t.Setenv("WEB_ENABLED", "true") + if _, err := LoadConfig(); err != nil { + t.Fatalf("web+grok_web_search should not need a Gemini key: %v", err) + } +} + +// gemini_grounding DOES need a Gemini key. +func TestConfigWebGeminiGroundingNeedsKey(t *testing.T) { + setBaseEnv(t) + t.Setenv("WEB_ENABLED", "true") + t.Setenv("WEB_PROVIDER", webProviderGeminiGrounding) + if _, err := LoadConfig(); err == nil || !strings.Contains(err.Error(), "GEMINI_API_KEY") { + t.Fatalf("want GEMINI_API_KEY error, got %v", err) + } +} diff --git a/apps/ai-bot/context.go b/apps/ai-bot/context.go index 6c21dade..7c7cd727 100644 --- a/apps/ai-bot/context.go +++ b/apps/ai-bot/context.go @@ -9,20 +9,20 @@ type bufferedMsg struct { isBot bool } -// buildContext assembles the xAI message list under the owner's minimisation -// rule ("trigger + bot replies only", §6/F8): +// buildContext assembles the provider-neutral message list under the owner's +// minimisation rule ("trigger + bot replies only", §6/F8): // // - GROUP rooms: send ONLY the bot's own prior replies (assistant turns) plus // the single triggering message (user turn). Other participants' messages and -// display names never reach xAI — the third-party-consent mitigation. +// display names never reach the model — the third-party-consent mitigation. // - 1:1 rooms: there are no third parties, so the peer's recent turns are // included too for coherence. Still no display names (pseudo "user"). // // `history` is the recent room window EXCLUDING the trigger; `triggerBody` is the // message that addressed the bot. Bodies are stripped of reply-fallback quotes so // quoted third-party text doesn't leak. -func buildContext(system string, history []bufferedMsg, isDM bool, triggerBody string, maxEvents, maxTokens int) []xaiMessage { - msgs := []xaiMessage{{Role: "system", Content: system}} +func buildContext(system string, history []bufferedMsg, isDM bool, triggerBody string, maxEvents, maxTokens int) []Message { + msgs := []Message{{Role: "system", Content: system}} // Keep at most the last maxEvents history items. if len(history) > maxEvents { @@ -34,16 +34,16 @@ func buildContext(system string, history []bufferedMsg, isDM bool, triggerBody s continue } if h.isBot { - msgs = append(msgs, xaiMessage{Role: "assistant", Content: body}) + msgs = append(msgs, Message{Role: "assistant", Content: body}) continue } if isDM { - msgs = append(msgs, xaiMessage{Role: "user", Content: body}) + msgs = append(msgs, Message{Role: "user", Content: body}) } // group + non-bot history → dropped (privacy minimisation) } - msgs = append(msgs, xaiMessage{Role: "user", Content: stripReplyFallback(triggerBody)}) + msgs = append(msgs, Message{Role: "user", Content: stripReplyFallback(triggerBody)}) return truncateToTokens(msgs, maxTokens) } @@ -57,7 +57,7 @@ func estimateTokens(s string) int { // truncateToTokens drops the oldest non-system, non-final messages until the // estimate fits maxTokens. The system prompt (index 0) and the final user // trigger are always preserved. -func truncateToTokens(msgs []xaiMessage, maxTokens int) []xaiMessage { +func truncateToTokens(msgs []Message, maxTokens int) []Message { total := 0 for _, m := range msgs { total += estimateTokens(m.Content) diff --git a/apps/ai-bot/httpllm.go b/apps/ai-bot/httpllm.go new file mode 100644 index 00000000..07760906 --- /dev/null +++ b/apps/ai-bot/httpllm.go @@ -0,0 +1,200 @@ +package main + +import ( + "bytes" + "context" + "encoding/json" + "fmt" + "io" + "log/slog" + "math/rand" + "net/http" + "time" +) + +// httpllm.go is the shared OpenAI-compatible Chat Completions transport: one +// HTTP+retry implementation reused by every provider adapter. Grok and Gemini both +// expose this wire format, so the retry/backoff classification (429/5xx/network = +// retryable, other 4xx = terminal) lives once here, parameterised by base/key/ +// headers, instead of being copied per provider. + +// openAIClient performs OpenAI-compatible /chat/completions calls with retry. +type openAIClient struct { + name string // provider label for logs/errors ("xai", "gemini") + base string + key string + http *http.Client + maxTry int + headers map[string]string // extra static headers (provider-specific), may be nil + log *slog.Logger +} + +func newOpenAIClient(name, base, key string, headers map[string]string, logger *slog.Logger) *openAIClient { + return &openAIClient{ + name: name, + base: base, + key: key, + http: &http.Client{}, + maxTry: 3, + headers: headers, + log: logger, + } +} + +// --- OpenAI-compatible wire types ------------------------------------------------- + +type openAIMessage struct { + Role string `json:"role"` + Content string `json:"content"` +} + +// openAITool is the wire shape of a model tool (e.g. web search). Only serialized +// when the request carries tools, so a plain completion's body is unchanged. +type openAITool struct { + Type string `json:"type"` +} + +type openAIRequest struct { + Model string `json:"model"` + Messages []openAIMessage `json:"messages"` + MaxTokens int `json:"max_tokens"` + Temperature float64 `json:"temperature"` + Stream bool `json:"stream"` + // Optional; omitempty keeps the grok_direct body byte-identical to before. + Tools []openAITool `json:"tools,omitempty"` + ReasoningEffort string `json:"reasoning_effort,omitempty"` + // SearchParameters drives xAI Live Search on chat/completions (the web route's + // grok_web_search provider). nil for every non-web call, so it serializes away. + SearchParameters any `json:"search_parameters,omitempty"` +} + +type openAIUsage struct { + PromptTokens int `json:"prompt_tokens"` + CompletionTokens int `json:"completion_tokens"` + PromptTokensDetails struct { + CachedTokens int `json:"cached_tokens"` + } `json:"prompt_tokens_details"` +} + +type openAIResponse struct { + ID string `json:"id"` + Choices []struct { + Message struct { + Content string `json:"content"` + } `json:"message"` + FinishReason string `json:"finish_reason"` + } `json:"choices"` + Usage openAIUsage `json:"usage"` + // Citations is the source list xAI Live Search returns by default (absent on a + // non-web call → nil). + Citations []string `json:"citations"` +} + +func (r *openAIResponse) Text() string { + if len(r.Choices) == 0 { + return "" + } + return r.Choices[0].Message.Content +} + +// complete calls Chat Completions with retry on transient failures (429 / 5xx / +// network timeout, exponential backoff + jitter). Non-retryable 4xx fail +// immediately. On exhaustion the caller refunds the reserved request and notifies +// the user, so a transient failure is never silently swallowed (F6). reqHeaders are +// per-request headers (e.g. x-grok-conv-id) merged on top of the static ones; nil is +// fine. +func (c *openAIClient) complete(ctx context.Context, reqBody openAIRequest, reqHeaders map[string]string) (*openAIResponse, error) { + payload, err := json.Marshal(reqBody) + if err != nil { + return nil, err + } + + var lastErr error + for attempt := 0; attempt < c.maxTry; attempt++ { + if attempt > 0 { + // 0.5s, 1s, 2s … capped at 8s, plus up to 250ms jitter. + backoff := time.Duration(500< 8*time.Second { + backoff = 8 * time.Second + } + backoff += time.Duration(rand.Intn(250)) * time.Millisecond + select { + case <-ctx.Done(): + return nil, ctx.Err() + case <-time.After(backoff): + } + } + + resp, retryable, err := c.attempt(ctx, payload, reqHeaders) + if err == nil { + return resp, nil + } + lastErr = err + if ctx.Err() != nil { + return nil, ctx.Err() + } + if !retryable { + return nil, err + } + if c.log != nil { + c.log.Warn(c.name+" attempt failed, will retry", "attempt", attempt+1, "max", c.maxTry, "err", err) + } + } + return nil, fmt.Errorf("%s: exhausted %d attempts: %w", c.name, c.maxTry, lastErr) +} + +// attempt performs one HTTP call. It returns retryable=true for 429/5xx and +// network errors, false for other non-2xx (terminal 4xx). The per-attempt deadline +// bounds a single hung connection; the overall per-request deadline (set by the +// caller via ctx) bounds the whole retry loop so a cascade can't accrete minutes. +func (c *openAIClient) attempt(ctx context.Context, payload []byte, reqHeaders map[string]string) (*openAIResponse, bool, error) { + attemptCtx, cancel := context.WithTimeout(ctx, 60*time.Second) + defer cancel() + + req, err := http.NewRequestWithContext(attemptCtx, http.MethodPost, c.base+"/chat/completions", bytes.NewReader(payload)) + if err != nil { + return nil, false, err + } + req.Header.Set("Content-Type", "application/json") + req.Header.Set("Authorization", "Bearer "+c.key) + for k, v := range c.headers { + req.Header.Set(k, v) + } + for k, v := range reqHeaders { + req.Header.Set(k, v) + } + + resp, err := c.http.Do(req) + if err != nil { + // Network error / timeout — retryable (unless the parent ctx is done). + return nil, ctx.Err() == nil, err + } + defer resp.Body.Close() + data, _ := io.ReadAll(resp.Body) + + if resp.StatusCode == http.StatusTooManyRequests || resp.StatusCode >= 500 { + return nil, true, fmt.Errorf("%s http %d: %s", c.name, resp.StatusCode, snippet(data)) + } + if resp.StatusCode < 200 || resp.StatusCode >= 300 { + return nil, false, fmt.Errorf("%s http %d: %s", c.name, resp.StatusCode, snippet(data)) + } + + var out openAIResponse + if err := json.Unmarshal(data, &out); err != nil { + return nil, false, fmt.Errorf("%s decode: %w", c.name, err) + } + // A 2xx is a billed call even when the model returns empty content (content + // filter, finish_reason=length with no text, or no choices). Return it as a + // success so the caller books the real cost via the ledger instead of refunding + // the slot and losing the spend — which would let empty replies bypass BOTH the + // per-user cap and the global ceiling. The caller just won't send an empty body. + return &out, false, nil +} + +func snippet(b []byte) string { + const max = 300 + if len(b) > max { + return string(b[:max]) + "…" + } + return string(b) +} diff --git a/apps/ai-bot/llm.go b/apps/ai-bot/llm.go new file mode 100644 index 00000000..c9093f70 --- /dev/null +++ b/apps/ai-bot/llm.go @@ -0,0 +1,65 @@ +package main + +import "context" + +// llm.go is the provider-neutral seam between the bot's business logic and the +// concrete model backends. Nothing here names a vendor: the bot composes its +// context, prices usage, and books spend against these types, and a thin adapter +// (provider_xai.go, provider_gemini.go) maps them to/from each backend's wire +// format. This is what lets a second model (Gemini) slot in behind a flag without +// the business logic learning a new shape. + +// Message is one provider-neutral chat turn. +type Message struct { + Role string // "system" | "user" | "assistant" + Content string +} + +// Usage is the provider-neutral token accounting returned with a completion. It +// drives billing (computeUSD) — the counts are the API's own, authoritative even +// if our price constants drift. +type Usage struct { + PromptTokens int + CachedTokens int // subset of PromptTokens served from the provider's prompt cache + CompletionTokens int +} + +// Tool is a provider-neutral tool the model may invoke (e.g. web search). Empty +// today; the web-freshness layer (Phase 3) populates it. Carried here so the +// request type is stable across phases. +type Tool struct { + // Type names the tool, e.g. "web_search". Adapters translate it to each + // backend's tool wire shape. + Type string +} + +// LLMRequest is a provider-neutral completion request. New optional fields (Tools, +// ReasoningEffort) serialize away when empty, so a plain grok_direct call produces +// exactly the same wire body it did before this seam existed. +type LLMRequest struct { + Model string + Messages []Message + MaxTokens int + Temperature float64 + Tools []Tool // optional; populated by the web layer + ReasoningEffort string // optional; "" = default, e.g. "low"|"high" for the reasoning route + // ConvID is an optional prompt-cache routing hint. Adapters that support it (xAI's + // x-grok-conv-id) pin a conversation to one backend to raise cache hit rate; "" = + // don't send it. It is a header, not part of the request body, so it never changes + // the wire body and an unset value is a no-op. + ConvID string +} + +// LLMResponse is a provider-neutral completion result. +type LLMResponse struct { + Text string + Usage Usage + ProviderRequestID string // the backend's response id, logged for support/debug +} + +// LLMClient is any chat-completion backend (Grok, Gemini, …). Implementations are +// thin adapters over a wire protocol; the bot depends only on this interface, so +// Bot.llm can be swapped or routed without touching business logic. +type LLMClient interface { + Complete(ctx context.Context, req LLMRequest) (*LLMResponse, error) +} diff --git a/apps/ai-bot/main.go b/apps/ai-bot/main.go index c0549373..eeeb5176 100644 --- a/apps/ai-bot/main.go +++ b/apps/ai-bot/main.go @@ -11,7 +11,6 @@ import ( "fmt" "os" "os/signal" - "path/filepath" "syscall" ) @@ -91,8 +90,3 @@ func main() { } logger.Info("shut down cleanly") } - -// statePath joins a filename under the configured state directory. -func (c *Config) statePath(name string) string { - return filepath.Join(c.StateDir, name) -} diff --git a/apps/ai-bot/pricing.go b/apps/ai-bot/pricing.go new file mode 100644 index 00000000..220b343a --- /dev/null +++ b/apps/ai-bot/pricing.go @@ -0,0 +1,42 @@ +package main + +// pricing.go centralises model pricing as a per-model table (the LiteLLM pattern) +// instead of three hardcoded Grok fields. The spend ledger prices each call by the +// model it actually used, so when a second model (Gemini) starts answering some +// routes, its cost books correctly against the same global ceiling. + +// ModelPrice is the per-1M-token USD price for one model, applied to the API's +// returned usage so the wallet ceiling tracks real cost even as prices change. +type ModelPrice struct { + InputPerM float64 // non-cached prompt tokens + CachedPerM float64 // prompt tokens served from the provider cache (cheaper) + OutputPerM float64 // completion tokens +} + +// CostBreakdown is the per-component USD cost of answering one request. A plain +// grok_direct call has only Token; a cascade adds Router (the cheap classifier), +// Grounding (Gemini Google-search) and/or WebTool (Grok web search) on top. Settle +// books each column separately so the ledger and request_log can attribute spend, +// and so a half-finished cascade can book only what it actually spent (§8.1). +type CostBreakdown struct { + Token float64 + Grounding float64 + WebTool float64 + Router float64 +} + +// Total is the grand total across all components (the number the wallet ceiling and +// request_log.total_usd care about). Computed, never stored, so it can't drift. +func (c CostBreakdown) Total() float64 { + return c.Token + c.Grounding + c.WebTool + c.Router +} + +// priceFor returns the configured price for a model. An unknown model falls back to +// the default (final-voice) model's price rather than $0 — a $0 price would silently +// blind the global ceiling to that call, the one failure mode we never want. +func (c *Config) priceFor(model string) ModelPrice { + if p, ok := c.Prices[model]; ok { + return p + } + return c.Prices[c.XAIModel] +} diff --git a/apps/ai-bot/provider_gemini.go b/apps/ai-bot/provider_gemini.go new file mode 100644 index 00000000..cea01a85 --- /dev/null +++ b/apps/ai-bot/provider_gemini.go @@ -0,0 +1,189 @@ +package main + +import ( + "bytes" + "context" + "encoding/json" + "fmt" + "io" + "log/slog" + "net/http" + "net/url" + "strings" + "time" +) + +// provider_gemini.go is the Gemini backend. Two faces: +// +// - geminiClient: a thin LLMClient over the OpenAI-compatible endpoint, used for the +// cheap trivial route and the Layer-1 router classifier. Same wire format as Grok, +// so it reuses the shared transport (httpllm.go). +// - groundedSearch: a SEPARATE call against the NATIVE v1beta generateContent endpoint +// with the google_search tool. Grounding does NOT work on the OpenAI-compat layer +// (it is silently ignored there, and only on Gemini 3+) — verified against Google's +// docs (F-EXT-3) — so the web layer that wants Gemini grounding must use this native +// path and VERIFY citations came back, else degrade. +type geminiClient struct { + http *openAIClient + nativeBase string // …/v1beta — derived from the OpenAI-compat base by dropping /openai + key string + model string + httpc *http.Client + log *slog.Logger +} + +// NewGeminiClient builds the Gemini backend. base is the OpenAI-compatible endpoint +// (…/v1beta/openai); the native grounding endpoint is derived from it. Returns the +// concrete type (not just LLMClient) because the web layer needs groundedSearch too. +func NewGeminiClient(base, key, model string, logger *slog.Logger) *geminiClient { + return &geminiClient{ + http: newOpenAIClient("gemini", base, key, nil, logger), + nativeBase: strings.TrimSuffix(base, "/openai"), + key: key, + model: model, + httpc: &http.Client{}, + log: logger, + } +} + +// Complete answers via the OpenAI-compatible endpoint (trivial route + classifier). +func (c *geminiClient) Complete(ctx context.Context, req LLMRequest) (*LLMResponse, error) { + msgs := make([]openAIMessage, len(req.Messages)) + for i, m := range req.Messages { + msgs[i] = openAIMessage{Role: m.Role, Content: m.Content} + } + resp, err := c.http.complete(ctx, openAIRequest{ + Model: req.Model, + Messages: msgs, + MaxTokens: req.MaxTokens, + Temperature: req.Temperature, + Stream: false, + }, nil) + if err != nil { + return nil, err + } + return &LLMResponse{ + Text: resp.Text(), + Usage: Usage{ + PromptTokens: resp.Usage.PromptTokens, + CachedTokens: resp.Usage.PromptTokensDetails.CachedTokens, + CompletionTokens: resp.Usage.CompletionTokens, + }, + ProviderRequestID: resp.ID, + }, nil +} + +// --- native v1beta grounded search (google_search tool) --------------------------- + +type geminiGroundResult struct { + Digest string + Citations []string + Usage Usage +} + +// native generateContent wire types (only the fields we read/write). +type geminiNativeRequest struct { + Contents []geminiContent `json:"contents"` + Tools []geminiTool `json:"tools"` +} +type geminiContent struct { + Role string `json:"role,omitempty"` + Parts []geminiPart `json:"parts"` +} +type geminiPart struct { + Text string `json:"text"` +} +type geminiTool struct { + // google_search is the current grounding tool (Gemini 3 / current models). The + // empty object enables it. + GoogleSearch struct{} `json:"google_search"` +} +type geminiNativeResponse struct { + Candidates []struct { + Content struct { + Parts []geminiPart `json:"parts"` + } `json:"content"` + GroundingMetadata struct { + GroundingChunks []struct { + Web struct { + URI string `json:"uri"` + Title string `json:"title"` + } `json:"web"` + } `json:"groundingChunks"` + } `json:"groundingMetadata"` + } `json:"candidates"` + UsageMetadata struct { + PromptTokenCount int `json:"promptTokenCount"` + CandidatesTokenCount int `json:"candidatesTokenCount"` + CachedContentTokenCount int `json:"cachedContentTokenCount"` + } `json:"usageMetadata"` +} + +// groundedSearch runs one grounded generateContent against the native endpoint and +// returns the model's grounded answer plus the source URLs. It REQUIRES citations: +// if groundingMetadata has no chunks the request was not actually grounded (the +// silent-ignore failure mode, F-EXT-3), so it errors and the caller degrades rather +// than passing off ungrounded — possibly stale — text as fresh. +func (c *geminiClient) groundedSearch(ctx context.Context, query string) (geminiGroundResult, error) { + body, err := json.Marshal(geminiNativeRequest{ + Contents: []geminiContent{{Role: "user", Parts: []geminiPart{{Text: query}}}}, + Tools: []geminiTool{{}}, + }) + if err != nil { + return geminiGroundResult{}, err + } + + // API key in the query string is the native v1beta convention. + endpoint := fmt.Sprintf("%s/models/%s:generateContent?key=%s", + c.nativeBase, url.PathEscape(c.model), url.QueryEscape(c.key)) + + reqCtx, cancel := context.WithTimeout(ctx, 15*time.Second) // web/grounding budget (§8.2.2) + defer cancel() + req, err := http.NewRequestWithContext(reqCtx, http.MethodPost, endpoint, bytes.NewReader(body)) + if err != nil { + return geminiGroundResult{}, err + } + req.Header.Set("Content-Type", "application/json") + + resp, err := c.httpc.Do(req) + if err != nil { + return geminiGroundResult{}, err + } + defer resp.Body.Close() + data, _ := io.ReadAll(resp.Body) + if resp.StatusCode < 200 || resp.StatusCode >= 300 { + return geminiGroundResult{}, fmt.Errorf("gemini grounding http %d: %s", resp.StatusCode, snippet(data)) + } + + var out geminiNativeResponse + if err := json.Unmarshal(data, &out); err != nil { + return geminiGroundResult{}, fmt.Errorf("gemini grounding decode: %w", err) + } + if len(out.Candidates) == 0 { + return geminiGroundResult{}, fmt.Errorf("gemini grounding: no candidates") + } + + var sb strings.Builder + for _, p := range out.Candidates[0].Content.Parts { + sb.WriteString(p.Text) + } + var citations []string + for _, ch := range out.Candidates[0].GroundingMetadata.GroundingChunks { + if ch.Web.URI != "" { + citations = append(citations, ch.Web.URI) + } + } + // The verify-gate: no citations ⇒ not actually grounded ⇒ degrade. + if len(citations) == 0 { + return geminiGroundResult{}, fmt.Errorf("gemini grounding: no citations (ungrounded — degrade)") + } + return geminiGroundResult{ + Digest: strings.TrimSpace(sb.String()), + Citations: citations, + Usage: Usage{ + PromptTokens: out.UsageMetadata.PromptTokenCount, + CachedTokens: out.UsageMetadata.CachedContentTokenCount, + CompletionTokens: out.UsageMetadata.CandidatesTokenCount, + }, + }, nil +} diff --git a/apps/ai-bot/provider_xai.go b/apps/ai-bot/provider_xai.go new file mode 100644 index 00000000..d618077d --- /dev/null +++ b/apps/ai-bot/provider_xai.go @@ -0,0 +1,62 @@ +package main + +import ( + "context" + "log/slog" +) + +// provider_xai.go is the thin adapter for xAI's Grok backend. xAI speaks the +// OpenAI Chat Completions wire format, so this is a shell over the shared +// openAIClient transport (httpllm.go): it only maps the neutral LLMRequest/ +// LLMResponse to/from the wire types. Any xAI-specific request shaping would live +// here, but Grok needs none today. +type xaiClient struct { + http *openAIClient +} + +// NewXAIClient builds the Grok backend. Returns the neutral LLMClient so the bot +// holds no vendor type. +func NewXAIClient(base, key string, logger *slog.Logger) LLMClient { + return &xaiClient{http: newOpenAIClient("xai", base, key, nil, logger)} +} + +func (c *xaiClient) Complete(ctx context.Context, req LLMRequest) (*LLMResponse, error) { + msgs := make([]openAIMessage, len(req.Messages)) + for i, m := range req.Messages { + msgs[i] = openAIMessage{Role: m.Role, Content: m.Content} + } + var tools []openAITool + for _, t := range req.Tools { + tools = append(tools, openAITool{Type: t.Type}) + } + + // x-grok-conv-id pins this conversation to one backend to raise the prompt-cache + // hit rate (caching itself is automatic on xAI). Only sent when set, so the + // default path adds no header. + var headers map[string]string + if req.ConvID != "" { + headers = map[string]string{"x-grok-conv-id": req.ConvID} + } + + resp, err := c.http.complete(ctx, openAIRequest{ + Model: req.Model, + Messages: msgs, + MaxTokens: req.MaxTokens, + Temperature: req.Temperature, + Stream: false, + Tools: tools, + ReasoningEffort: req.ReasoningEffort, + }, headers) + if err != nil { + return nil, err + } + return &LLMResponse{ + Text: resp.Text(), + Usage: Usage{ + PromptTokens: resp.Usage.PromptTokens, + CachedTokens: resp.Usage.PromptTokensDetails.CachedTokens, + CompletionTokens: resp.Usage.CompletionTokens, + }, + ProviderRequestID: resp.ID, + }, nil +} diff --git a/apps/ai-bot/router.go b/apps/ai-bot/router.go new file mode 100644 index 00000000..4de7fd6e --- /dev/null +++ b/apps/ai-bot/router.go @@ -0,0 +1,180 @@ +package main + +import ( + "context" + "encoding/json" + "regexp" + "strings" +) + +// router.go classifies a message into a route. It runs INSIDE respond() — after the +// mention/media/foreign/single-flight gates (F-FUNC-7) — so a paid Layer-1 classifier +// is never spent on a message today's bot drops for free. +// +// Two layers, both conservative (doubt → grok_direct, the safe floor that keeps +// substantive questions on Grok, §8.6): +// - Layer-0: free regex heuristics (RU+EN). Always runs when ROUTER_ENABLED. +// - Layer-1: a cheap Gemini JSON classifier, consulted ONLY on Layer-0 grok_direct +// when ROUTER_CLASSIFIER_ENABLED. Any failure falls back to the Layer-0 verdict. + +// RouterDecision is the route plus the signals behind it (logged for threshold +// calibration). Only Route/Source/Confidence/NeedsWeb drive behaviour today; the rest +// are recorded for the offline router-replay eval (§9). +type RouterDecision struct { + Route string + Source string // heuristic | classifier | default | forced | degraded + Confidence float64 + NeedsWeb bool + Freshness string + ReasoningLevel string + Domain string + Difficulty string +} + +// Heuristic patterns. Kept deliberately tight: a false "trivial" leaks a real question +// to the cheap model, so trivial fires only on short, unmistakable greetings/acks or +// bare arithmetic. Freshness words route to web (a false web-route only costs a fetch +// and degrades cleanly — never a wrong answer). +var ( + greetingRe = regexp.MustCompile(`^(привет(ик)?|здравствуй(те)?|хай|прив|ку|добрый\s+(день|вечер|утро)|спасибо|спс|благодарю|пока|ок(ей)?|угу|ага|hello|hi|hey|yo|thanks|thank\s+you|thx|ty|bye|goodbye|ok|okay|cool|nice)[\s!.,)]*$`) + arithmeticRe = regexp.MustCompile(`^[\s(]*\d+(\s*[-+*/×÷]\s*\d+)+[\s)=?]*$`) + freshnessRe = regexp.MustCompile(`(новост|сегодня|сейчас|последн|курс\s|погод|котировк|расписани|прогноз|breaking|today|right now|latest|current(ly)?|news|weather|stock price|exchange rate|score)`) +) + +// routeLayer0 is the free heuristic. Confidence is a rough self-estimate used only for +// logging/threshold tuning, not control flow. +func routeLayer0(body string) RouterDecision { + s := strings.ToLower(strings.TrimSpace(body)) + if s == "" { + return RouterDecision{Route: routeGrokDirect, Source: "heuristic", Confidence: 0.5} + } + if freshnessRe.MatchString(s) { + return RouterDecision{Route: routeWebThenGrok, Source: "heuristic", Confidence: 0.7, NeedsWeb: true, Freshness: "recent"} + } + if isTrivial(s) { + return RouterDecision{Route: routeTrivial, Source: "heuristic", Confidence: 0.85, Difficulty: "trivial"} + } + return RouterDecision{Route: routeGrokDirect, Source: "heuristic", Confidence: 0.6} +} + +// isTrivial: a short greeting/ack or a bare arithmetic expression, with no sign of a +// real question. Length-bounded so "thanks, now explain quantum tunnelling" is NOT +// trivial. +func isTrivial(s string) bool { + if arithmeticRe.MatchString(s) { + return true + } + if len(strings.Fields(s)) <= 4 && greetingRe.MatchString(s) { + return true + } + return false +} + +// classify produces the final RouterDecision for a request. The manual reasoning +// trigger is honoured independently of the heuristic router (it's a deliberate user +// signal). Layer-1's cost, when it runs, is accumulated into cost.Router. +func (b *Bot) classify(ctx context.Context, body string, cost *CostBreakdown) RouterDecision { + if b.cfg.ReasoningEnabled && containsTrigger(body, b.cfg.ReasoningTrigger) { + return RouterDecision{Route: routeReason, Source: "forced", Confidence: 1, ReasoningLevel: "high"} + } + if !b.cfg.RouterEnabled { + return RouterDecision{Route: routeGrokDirect, Source: "default"} + } + d := routeLayer0(body) + // Layer-1 only refines the uncertain grok_direct verdict, and only if enabled and + // the Gemini client exists. Anything else stands on the heuristic. + if d.Route != routeGrokDirect || !b.cfg.RouterClassifierEnabled || b.gemini == nil { + return d + } + refined, err := b.routeLayer1(ctx, body, cost) + if err != nil { + b.log.Warn("layer-1 classifier failed; using heuristic", "err", err) + return d // degrade to the heuristic verdict + } + return refined +} + +// classifierConfidenceFloor is the bar a Layer-1 escalation OFF the safe floor +// (trivial/web/reason) must clear. Below it, the verdict is treated as doubt and the +// request stays on grok_direct — the owner's "substantive stays on Grok" rule (§8.6). +// A low-confidence "trivial" is exactly the false-trivial voice leak we must not take. +const classifierConfidenceFloor = 0.8 + +// classifierPrompt asks Gemini for a strict JSON verdict. Kept terse to bound tokens. +const classifierPrompt = `You are a router. Classify the user message into exactly one route and reply with ONLY a JSON object, no prose. +Routes: "trivial" (greeting/ack/tiny arithmetic), "web" (needs fresh/current facts: news, prices, weather, "today"), "normal" (everything else). +Schema: {"route":"trivial|web|normal","confidence":0.0-1.0,"needs_web":true|false} +Message: ` + +// routeLayer1 runs the Gemini classifier and parses its JSON. A non-JSON or unknown +// answer is an error so classify() degrades to the heuristic — the cheap model never +// gets to silently mis-route by returning garbage. +func (b *Bot) routeLayer1(ctx context.Context, body string, cost *CostBreakdown) (RouterDecision, error) { + resp, err := b.gemini.Complete(ctx, LLMRequest{ + Model: b.cfg.GeminiModel, + Messages: []Message{{Role: "user", Content: classifierPrompt + body}}, + MaxTokens: 60, + Temperature: 0, + }) + if err != nil { + return RouterDecision{}, err + } + cost.Router += computeUSD(b.cfg.GeminiModel, resp.Usage, b.cfg) + + var parsed struct { + Route string `json:"route"` + Confidence float64 `json:"confidence"` + NeedsWeb bool `json:"needs_web"` + } + if err := json.Unmarshal([]byte(extractJSON(resp.Text)), &parsed); err != nil { + return RouterDecision{}, err + } + route := normalizeRoute(parsed.Route) + // Safe floor: a low-confidence escalation off grok_direct is doubt — keep it on + // Grok rather than leak a possibly-substantive question to the cheap model. + if route != routeGrokDirect && parsed.Confidence < classifierConfidenceFloor { + return RouterDecision{Route: routeGrokDirect, Source: "classifier", Confidence: parsed.Confidence}, nil + } + return RouterDecision{ + Route: route, + Source: "classifier", + Confidence: parsed.Confidence, + NeedsWeb: parsed.NeedsWeb || route == routeWebThenGrok, + }, nil +} + +// normalizeRoute maps a classifier label to a route constant, defaulting unknown +// labels to grok_direct — the safe floor, so a confused classifier never escalates. +func normalizeRoute(label string) string { + switch strings.ToLower(strings.TrimSpace(label)) { + case "trivial", "trivial_direct": + return routeTrivial + case "web", "web_then_grok": + return routeWebThenGrok + case "reason", "reason_then_grok": + return routeReason + default: + return routeGrokDirect + } +} + +// extractJSON pulls the first {...} object out of a model reply, tolerating prose or +// code fences around it. Returns "" if none (→ a parse error → degrade). +func extractJSON(s string) string { + i := strings.IndexByte(s, '{') + j := strings.LastIndexByte(s, '}') + if i < 0 || j < i { + return "" + } + return s[i : j+1] +} + +// containsTrigger reports whether body contains the manual trigger phrase +// (case-insensitive, whitespace-trimmed). Empty trigger never matches. +func containsTrigger(body, trigger string) bool { + trigger = strings.TrimSpace(strings.ToLower(trigger)) + if trigger == "" { + return false + } + return strings.Contains(strings.ToLower(body), trigger) +} diff --git a/apps/ai-bot/router_test.go b/apps/ai-bot/router_test.go new file mode 100644 index 00000000..f5502da6 --- /dev/null +++ b/apps/ai-bot/router_test.go @@ -0,0 +1,81 @@ +package main + +import "testing" + +// TestRouteLayer0 is the heuristic golden set (RU+EN). The critical property is the +// safe floor: anything substantive must land on grok_direct, and a long message that +// merely starts with a greeting must NOT be trivial (no leaking real questions to the +// cheap model, §8.6). +func TestRouteLayer0(t *testing.T) { + cases := []struct { + body string + want string + }{ + // trivial: short greetings/acks and bare arithmetic + {"привет", routeTrivial}, + {"Привет!", routeTrivial}, + {"спасибо", routeTrivial}, + {"спс", routeTrivial}, + {"ок", routeTrivial}, + {"hi", routeTrivial}, + {"hello", routeTrivial}, + {"thanks", routeTrivial}, + {"thank you", routeTrivial}, + {"ok", routeTrivial}, + {"2+2", routeTrivial}, + {"100 * 50", routeTrivial}, + {"12 / 4 - 1", routeTrivial}, + // web: freshness signals + {"какие новости сегодня?", routeWebThenGrok}, + {"что сейчас происходит в мире", routeWebThenGrok}, + {"курс доллара сегодня", routeWebThenGrok}, + {"what's the weather today", routeWebThenGrok}, + {"latest news on AI", routeWebThenGrok}, + {"current bitcoin price", routeWebThenGrok}, + // grok_direct: substantive (the safe floor) + {"посоветуй фильм на вечер", routeGrokDirect}, + {"explain how TCP works", routeGrokDirect}, + {"расскажи историю римской империи", routeGrokDirect}, + {"спасибо, а теперь подробно объясни квантовую запутанность", routeGrokDirect}, // starts w/ ack but long + {"hi, can you help me debug this Go program?", routeGrokDirect}, // starts w/ hi but a real ask + {"напиши функцию сортировки на python", routeGrokDirect}, + } + for _, c := range cases { + if got := routeLayer0(c.body).Route; got != c.want { + t.Errorf("routeLayer0(%q) = %q, want %q", c.body, got, c.want) + } + } +} + +func TestNormalizeRoute(t *testing.T) { + cases := map[string]string{ + "trivial": routeTrivial, "web": routeWebThenGrok, "reason": routeReason, + "normal": routeGrokDirect, "garbage": routeGrokDirect, "": routeGrokDirect, + } + for in, want := range cases { + if got := normalizeRoute(in); got != want { + t.Errorf("normalizeRoute(%q) = %q, want %q", in, got, want) + } + } +} + +func TestExtractJSON(t *testing.T) { + if got := extractJSON("prefix {\"route\":\"web\"} suffix"); got != `{"route":"web"}` { + t.Errorf("extractJSON = %q", got) + } + if got := extractJSON("no json here"); got != "" { + t.Errorf("extractJSON(no json) = %q, want empty", got) + } +} + +func TestContainsTrigger(t *testing.T) { + if !containsTrigger("ну подумай глубже про это", "подумай глубже") { + t.Error("should match trigger phrase mid-sentence") + } + if containsTrigger("just a normal question", "подумай глубже") { + t.Error("must not match when phrase absent") + } + if containsTrigger("anything", "") { + t.Error("empty trigger must never match") + } +} diff --git a/apps/ai-bot/store.go b/apps/ai-bot/store.go index af293084..8d3d4c08 100644 --- a/apps/ai-bot/store.go +++ b/apps/ai-bot/store.go @@ -2,6 +2,7 @@ package main import ( "context" + "encoding/json" "errors" "fmt" "time" @@ -107,6 +108,64 @@ var migrations = []string{ PRIMARY KEY (date, mxid) ); CREATE TABLE IF NOT EXISTS warned_encrypted (room_id TEXT PRIMARY KEY);`, + + // v2: component cost columns + the optimistic reservation column. `reserved_usd` + // holds the estimated max-cost of in-flight calls so the global ceiling counts + // committed + reserved spend at admission time (the TOCTOU fix, §8.1): without it + // a burst of concurrent calls all read the same low committed SUM and slip past + // the ceiling, because the USD only lands at settle, AFTER the call. The component + // columns let the ceiling see grounding/tool fees too (not just tokens), and feed + // the per-component analytics. ADD COLUMN IF NOT EXISTS is idempotent. + `ALTER TABLE spend ADD COLUMN IF NOT EXISTS reserved_usd DOUBLE PRECISION NOT NULL DEFAULT 0; + ALTER TABLE spend ADD COLUMN IF NOT EXISTS router_usd DOUBLE PRECISION NOT NULL DEFAULT 0; + ALTER TABLE spend ADD COLUMN IF NOT EXISTS grounding_usd DOUBLE PRECISION NOT NULL DEFAULT 0; + ALTER TABLE spend ADD COLUMN IF NOT EXISTS webtool_usd DOUBLE PRECISION NOT NULL DEFAULT 0;`, + + // v3: request_log — one row per engaged request, for offline analysis of the route + // mix, per-component $/day, latency, escalation/degrade rates (§6.2). Operational, + // not message content: query_text is written ONLY when TELEMETRY_STORE_TEXT is on. + // Indexed by ts for the time-based retention trim and time-series queries. + `CREATE TABLE IF NOT EXISTS request_log ( + id TEXT PRIMARY KEY, + ts TIMESTAMPTZ NOT NULL DEFAULT now(), + room_id TEXT, + sender TEXT, + route TEXT, + router_source TEXT, + router_confidence REAL, + models JSONB, + prompt_tokens INT, + cached_tokens INT, + completion_tokens INT, + token_usd DOUBLE PRECISION, + grounding_usd DOUBLE PRECISION, + router_usd DOUBLE PRECISION, + webtool_usd DOUBLE PRECISION, + total_usd DOUBLE PRECISION, + latency_ms INT, + stage_ms JSONB, + escalated BOOL DEFAULT false, + fallback_fired BOOL DEFAULT false, + cache_hit BOOL DEFAULT false, + ceiling_hit BOOL DEFAULT false, + per_user_cap_hit BOOL DEFAULT false, + prompt_version TEXT, + provider_request_id TEXT, + degraded TEXT DEFAULT '', + err TEXT DEFAULT '', + ok BOOL, + query_text TEXT + ); + CREATE INDEX IF NOT EXISTS request_log_ts_idx ON request_log (ts);`, + + // v4: per-day grounded-prompt counter for the web grounding cap guard (§8.2.3). One + // row per UTC day; the cap check + increment is one atomic statement (same TOCTOU + // discipline as the spend gate), so a burst can't blow past the $/1k grounding + // overage. Day-keyed, so it self-resets and needs no separate trim. + `CREATE TABLE IF NOT EXISTS grounding_count ( + date TEXT PRIMARY KEY, + n INTEGER NOT NULL DEFAULT 0 + );`, } // migrate runs all pending migrations on a single connection under a session @@ -207,13 +266,19 @@ func (s *Store) SeenEvent(eventID string) (bool, error) { return true, err } -// SpentTodayUSD sums all spend for the current UTC day. SUM over no rows is NULL, -// which scans into a nil *float64 → treated as 0. +// committedUSDExpr sums every COMMITTED cost component of a spend row — tokens plus +// the grounding/web/router fees a cascade can incur — so the wallet ceiling is never +// blind to non-token spend. It deliberately excludes reserved_usd (that is in-flight, +// not yet spent); the admission gate adds reserved separately. +const committedUSDExpr = `usd + router_usd + grounding_usd + webtool_usd` + +// SpentTodayUSD sums all COMMITTED spend for the current UTC day. SUM over no rows is +// NULL, which scans into a nil *float64 → treated as 0. func (s *Store) SpentTodayUSD() (float64, error) { ctx, cancel := opContext() defer cancel() var v *float64 - if err := s.pool.QueryRow(ctx, `SELECT SUM(usd) FROM spend WHERE date = $1`, todayUTC()).Scan(&v); err != nil { + if err := s.pool.QueryRow(ctx, `SELECT SUM(`+committedUSDExpr+`) FROM spend WHERE date = $1`, todayUTC()).Scan(&v); err != nil { return 0, err } if v == nil { @@ -222,19 +287,30 @@ func (s *Store) SpentTodayUSD() (float64, error) { return *v, nil } -// Reserve runs the two independent gates in one transaction, BEFORE the xAI call -// (F4): the global USD ceiling protects the wallet; the per-user request cap is -// anti-abuse. It increments the per-user request count on success; the USD is -// reconciled after the response. Order: global first (cheapest to deny), then -// per-user. +// reserveDayLockKey namespaces the per-day admission lock so it can't collide with +// the migration lock or any other advisory lock. +const reserveDayLockKey = "ai-bot:reserve:" + +// Reserve runs the two admission gates in one transaction, BEFORE the call (F4): the +// global USD ceiling protects the wallet; the per-user request cap is anti-abuse. On +// success it both increments the per-user request count AND books `estimate` (the +// route's max-cost) into reserved_usd, so the global gate counts committed + reserved +// spend. The actual USD is settled after the response (Settle), at which point the +// reservation is released and the real cost booked. Order: global first (cheapest to +// deny), then per-user. // -// A transaction-scoped advisory lock on (date, mxid) serializes concurrent -// reservations for the SAME user+day, so the per-user check-then-increment stays -// atomic. The former SQLite store got this for free (one connection serialized all -// callers); the pgx pool is concurrent, and the same user messaging from two rooms -// at once would otherwise be able to slip past the per-user cap. Different users -// never contend. -func (s *Store) Reserve(mxid string, perUserCap int, dailyUSDCeiling float64) (reserveResult, error) { +// The check-and-reserve is serialized GLOBALLY for the day by a transaction-scoped +// advisory lock keyed on the date (not on date|mxid as the bare port did). This is +// the TOCTOU fix (§8.1): the ceiling reads SUM(committed)+SUM(reserved) and then adds +// its own reservation atomically, so a burst of DIFFERENT users can overshoot the +// ceiling by at most ONE max-reservation rather than slipping through unbounded — the +// per-(date,mxid) lock only serialized one user with himself and left the cross-user +// ceiling unprotected. The former SQLite store serialized ALL callers on its single +// connection anyway, so this restores that exact admission semantics, durably; the +// bot is low-volume with per-room single-flight, so a per-day admission lock costs +// nothing observable. Settle/Release run lock-free (they only release/convert spend, +// never admit). +func (s *Store) Reserve(mxid string, perUserCap int, perUserUSD, dailyUSDCeiling, estimate float64) (reserveResult, error) { ctx, cancel := opContext() defer cancel() day := todayUTC() @@ -245,41 +321,50 @@ func (s *Store) Reserve(mxid string, perUserCap int, dailyUSDCeiling float64) (r } defer tx.Rollback(ctx) - // Key on date|mxid. The separator only needs to avoid cross-key ambiguity; a - // hash collision would merely over-serialize two unrelated users, never corrupt a - // count. (NUL is rejected by Postgres text, so use a printable separator.) - if _, err := tx.Exec(ctx, `SELECT pg_advisory_xact_lock(hashtextextended($1, 0))`, day+"|"+mxid); err != nil { + if _, err := tx.Exec(ctx, `SELECT pg_advisory_xact_lock(hashtextextended($1, 0))`, reserveDayLockKey+day); err != nil { return reserveOK, err } - // SUM over zero rows is NULL → nil pointer → treat as 0.0, exactly as the SQLite - // store's sql.NullFloat64 did (and as SpentTodayUSD does). This keeps the gate 1:1 - // even at the degenerate dailyUSDCeiling == 0 (deny everything), where 0 >= 0. - var global *float64 - if err := tx.QueryRow(ctx, `SELECT SUM(usd) FROM spend WHERE date = $1`, day).Scan(&global); err != nil { + // committed + reserved. SUM over zero rows is NULL → nil pointer → treat as 0.0, + // exactly as the SQLite store's sql.NullFloat64 did. This keeps the gate 1:1 even + // at the degenerate dailyUSDCeiling == 0 (deny everything), where 0 >= 0. + var inFlight *float64 + if err := tx.QueryRow(ctx, + `SELECT SUM(`+committedUSDExpr+` + reserved_usd) FROM spend WHERE date = $1`, day).Scan(&inFlight); err != nil { return reserveOK, err } spentToday := 0.0 - if global != nil { - spentToday = *global + if inFlight != nil { + spentToday = *inFlight } if spentToday >= dailyUSDCeiling { return reserveDeniedGlobal, nil } + // Per-user row: read requests AND the user's own committed+reserved $ in one go, so + // both per-user gates are checked under the same lock. ErrNoRows → first request of + // the day for this user → all zero. var requests int - err = tx.QueryRow(ctx, `SELECT requests FROM spend WHERE date = $1 AND mxid = $2`, day, mxid).Scan(&requests) + var userUSD float64 + err = tx.QueryRow(ctx, + `SELECT requests, `+committedUSDExpr+` + reserved_usd FROM spend WHERE date = $1 AND mxid = $2`, + day, mxid).Scan(&requests, &userUSD) if err != nil && !errors.Is(err, pgx.ErrNoRows) { return reserveOK, err } if requests >= perUserCap { return reserveDeniedUser, nil } + // Optional per-user $ quota (0 = off): keep one user from draining the shared ceiling. + if perUserUSD > 0 && userUSD >= perUserUSD { + return reserveDeniedUser, nil + } if _, err := tx.Exec(ctx, - `INSERT INTO spend (date, mxid, requests, usd) VALUES ($1, $2, 1, 0) - ON CONFLICT (date, mxid) DO UPDATE SET requests = spend.requests + 1`, - day, mxid); err != nil { + `INSERT INTO spend (date, mxid, requests, reserved_usd) VALUES ($1, $2, 1, $3) + ON CONFLICT (date, mxid) DO UPDATE SET requests = spend.requests + 1, + reserved_usd = spend.reserved_usd + excluded.reserved_usd`, + day, mxid, estimate); err != nil { return reserveOK, err } if err := tx.Commit(ctx); err != nil { @@ -288,10 +373,11 @@ func (s *Store) Reserve(mxid string, perUserCap int, dailyUSDCeiling float64) (r return reserveOK, nil } -// RefundRequest gives back a reserved request slot when the call ultimately -// failed (e.g. an xAI outage), so a transient failure doesn't burn the user's -// daily cap. Never drops below zero. A single UPDATE is atomic, so concurrent -// refunds settle correctly without extra locking. +// RefundRequest gives back a reserved request SLOT when the call ultimately failed +// (an outage) or the reply couldn't be delivered (paid silence, §8.1), so a transient +// failure doesn't burn the user's daily cap. It does NOT touch USD: a 2xx is really +// billed even if we then fail to deliver. Never drops below zero. A single UPDATE is +// atomic, so concurrent refunds settle correctly without extra locking. func (s *Store) RefundRequest(mxid string) error { ctx, cancel := opContext() defer cancel() @@ -301,19 +387,128 @@ func (s *Store) RefundRequest(mxid string) error { return err } -// Reconcile books the actual USD cost of a completed call against the user's -// daily row (and thus the global total). The accumulating upsert is atomic and -// commutative, so concurrent reconciles for the same user sum correctly. -func (s *Store) Reconcile(mxid string, usd float64) error { +// ReleaseReservation frees a reservation whose request produced no billable spend, +// restoring the global headroom without booking anything. The normal failure paths +// settle via Settle (which also releases), so this is the safety valve for an +// UNSETTLED exit — a panic in generation, recovered by safego — where it runs with +// RefundRequest in respond's deferred guard so a leaked reservation can't drift the +// ceiling. GREATEST(0, …) guards against a double-release driving reserved_usd +// negative. Lock-free: it only lowers the in-flight reserved total, never admits. +func (s *Store) ReleaseReservation(mxid string, estimate float64) error { ctx, cancel := opContext() defer cancel() _, err := s.pool.Exec(ctx, - `INSERT INTO spend (date, mxid, requests, usd) VALUES ($1, $2, 0, $3) - ON CONFLICT (date, mxid) DO UPDATE SET usd = spend.usd + excluded.usd`, - todayUTC(), mxid, usd) + `UPDATE spend SET reserved_usd = GREATEST(0, reserved_usd - $3) WHERE date = $1 AND mxid = $2`, + todayUTC(), mxid, estimate) return err } +// Settle releases a call's reservation and books its ACTUAL cost in one atomic step +// (replacing the old additive Reconcile): reserved_usd drops by the reservation while +// the real per-component cost is added to the committed columns. This is non-additive +// on the reservation (settle, not accumulate), the semantics the ceiling needs. It is +// also the partial-cascade-refund primitive (§8.1): a web_then_grok call that paid +// grounding but failed at the final model passes a CostBreakdown carrying only the +// grounding it actually spent, releases the rest of the reservation, and refunds the +// request slot separately. GREATEST(0, …) keeps reserved_usd from underflowing. +// Atomic and commutative per row, so concurrent settles for one user sum correctly. +func (s *Store) Settle(mxid string, estimate float64, cost CostBreakdown) error { + ctx, cancel := opContext() + defer cancel() + _, err := s.pool.Exec(ctx, + `INSERT INTO spend (date, mxid, requests, usd, router_usd, grounding_usd, webtool_usd, reserved_usd) + VALUES ($1, $2, 0, $3, $4, $5, $6, 0) + ON CONFLICT (date, mxid) DO UPDATE SET + usd = spend.usd + excluded.usd, + router_usd = spend.router_usd + excluded.router_usd, + grounding_usd = spend.grounding_usd + excluded.grounding_usd, + webtool_usd = spend.webtool_usd + excluded.webtool_usd, + reserved_usd = GREATEST(0, spend.reserved_usd - $7)`, + todayUTC(), mxid, cost.Token, cost.Router, cost.Grounding, cost.WebTool, estimate) + return err +} + +// InsertRequestLog writes one analytics row. id is the event id (PRIMARY KEY), so a +// re-logged event is a no-op (ON CONFLICT DO NOTHING) — each event takes exactly one +// terminal path, so this never overwrites a real outcome. The write is isolated: the +// caller runs it off the answer path and only logs a failure, never drops the reply. +func (s *Store) InsertRequestLog(rl RequestLog) error { + ctx, cancel := opContext() + defer cancel() + + models, err := json.Marshal(rl.Models) + if err != nil { + return err + } + stages, err := json.Marshal(rl.StageMS) + if err != nil { + return err + } + // query_text is NULL unless text capture is on (the struct carries "" otherwise), + // so the analytics table never holds message content by default. + var queryText any + if rl.QueryText != "" { + queryText = rl.QueryText + } + + _, err = s.pool.Exec(ctx, ` + INSERT INTO request_log ( + id, room_id, sender, route, router_source, router_confidence, models, + prompt_tokens, cached_tokens, completion_tokens, + token_usd, grounding_usd, router_usd, webtool_usd, total_usd, + latency_ms, stage_ms, escalated, fallback_fired, cache_hit, ceiling_hit, + per_user_cap_hit, prompt_version, provider_request_id, degraded, err, ok, query_text + ) VALUES ( + $1, $2, $3, $4, $5, $6, $7, + $8, $9, $10, + $11, $12, $13, $14, $15, + $16, $17, $18, $19, $20, $21, + $22, $23, $24, $25, $26, $27, $28 + ) ON CONFLICT (id) DO NOTHING`, + rl.ID, rl.RoomID, rl.Sender, rl.Route, rl.RouterSource, rl.RouterConfidence, models, + rl.PromptTokens, rl.CachedTokens, rl.CompletionTokens, + rl.Cost.Token, rl.Cost.Grounding, rl.Cost.Router, rl.Cost.WebTool, rl.Cost.Total(), + rl.LatencyMS, stages, rl.Escalated, rl.FallbackFired, rl.CacheHit, rl.CeilingHit, + rl.PerUserCapHit, rl.PromptVersion, rl.ProviderRequestID, rl.Degraded, rl.Err, rl.OK, queryText) + return err +} + +// TrimRequestLog deletes analytics rows older than the cutoff (time-based, since the +// data is a time series — unlike the count-bounded dedup tables). A no-op for a zero +// cutoff. Cheap given the ts index. +func (s *Store) TrimRequestLog(olderThan time.Time) error { + ctx, cancel := opContext() + defer cancel() + _, err := s.pool.Exec(ctx, `DELETE FROM request_log WHERE ts < $1`, olderThan) + return err +} + +// IncrGroundingIfUnder atomically admits one grounded prompt for today if the day's +// count is below cap, returning whether it was admitted. The check-and-increment is a +// single statement, so concurrent grounding calls can't race past the cap and into the +// per-1k overage (§8.2.3). A non-positive cap denies everything (grounding effectively +// off). The counter is day-keyed and self-resets at UTC midnight. +func (s *Store) IncrGroundingIfUnder(cap int) (bool, error) { + if cap <= 0 { + return false, nil + } + ctx, cancel := opContext() + defer cancel() + var n int + err := s.pool.QueryRow(ctx, ` + INSERT INTO grounding_count (date, n) VALUES ($1, 1) + ON CONFLICT (date) DO UPDATE SET n = grounding_count.n + 1 + WHERE grounding_count.n < $2 + RETURNING n`, todayUTC(), cap).Scan(&n) + if errors.Is(err, pgx.ErrNoRows) { + return false, nil // at/over cap — the conflict update was filtered out + } + if err != nil { + return false, err + } + return true, nil +} + // HasWarnedEncrypted / SetWarnedEncrypted persist the one-shot "reacted 🔒 to this // room because I can't read encryption" flag so a restart doesn't re-react on every // message (F5). The bot never reacts to its own events: m.reaction is not an diff --git a/apps/ai-bot/store_test.go b/apps/ai-bot/store_test.go index a6edc3e9..5253fd3b 100644 --- a/apps/ai-bot/store_test.go +++ b/apps/ai-bot/store_test.go @@ -1,9 +1,11 @@ package main import ( + "fmt" "sync" "sync/atomic" "testing" + "time" ) // These tests exercise the Postgres-backed store directly. They run only when @@ -84,23 +86,23 @@ func TestStoreLimiterPerUserCap(t *testing.T) { const cap, ceiling = 2, 100.0 for i := 0; i < cap; i++ { - if res, err := st.Reserve(user, cap, ceiling); err != nil || res != reserveOK { + if res, err := st.Reserve(user, cap, 0, ceiling, 0); err != nil || res != reserveOK { t.Fatalf("reserve %d: got (%v,%v), want reserveOK", i, res, err) } } // The (cap+1)th request is denied per-user. - if res, err := st.Reserve(user, cap, ceiling); err != nil || res != reserveDeniedUser { + if res, err := st.Reserve(user, cap, 0, ceiling, 0); err != nil || res != reserveDeniedUser { t.Fatalf("over-cap reserve: got (%v,%v), want reserveDeniedUser", res, err) } // A different user is unaffected. - if res, err := st.Reserve("@v:vojo.chat", cap, ceiling); err != nil || res != reserveOK { + if res, err := st.Reserve("@v:vojo.chat", cap, 0, ceiling, 0); err != nil || res != reserveOK { t.Fatalf("other user reserve: got (%v,%v), want reserveOK", res, err) } // Refund returns a slot, so the first user can reserve once more. if err := st.RefundRequest(user); err != nil { t.Fatalf("refund: %v", err) } - if res, err := st.Reserve(user, cap, ceiling); err != nil || res != reserveOK { + if res, err := st.Reserve(user, cap, 0, ceiling, 0); err != nil || res != reserveOK { t.Fatalf("post-refund reserve: got (%v,%v), want reserveOK", res, err) } } @@ -110,7 +112,7 @@ func TestStoreLimiterPerUserCap(t *testing.T) { func TestStoreLimiterZeroCap(t *testing.T) { st := openTestStore(t) defer st.Close() - if res, err := st.Reserve("@u:vojo.chat", 0, 100.0); err != nil || res != reserveDeniedUser { + if res, err := st.Reserve("@u:vojo.chat", 0, 0, 100.0, 0); err != nil || res != reserveDeniedUser { t.Fatalf("zero-cap reserve: got (%v,%v), want reserveDeniedUser", res, err) } } @@ -121,7 +123,7 @@ func TestStoreLimiterZeroCap(t *testing.T) { func TestStoreLimiterZeroCeiling(t *testing.T) { st := openTestStore(t) defer st.Close() - if res, err := st.Reserve("@u:vojo.chat", 1_000_000, 0); err != nil || res != reserveDeniedGlobal { + if res, err := st.Reserve("@u:vojo.chat", 1_000_000, 0, 0, 0); err != nil || res != reserveDeniedGlobal { t.Fatalf("zero-ceiling reserve on empty store: got (%v,%v), want reserveDeniedGlobal", res, err) } } @@ -131,18 +133,18 @@ func TestStoreLimiterGlobalCeiling(t *testing.T) { defer st.Close() const ceiling = 1.0 - // Book spend up to the ceiling (Reconcile is what feeds the global gate). - if err := st.Reconcile("@a:vojo.chat", 0.6); err != nil { - t.Fatalf("reconcile a: %v", err) + // Book spend up to the ceiling (Settle is what feeds the global gate). + if err := st.Settle("@a:vojo.chat", 0, CostBreakdown{Token: 0.6}); err != nil { + t.Fatalf("settle a: %v", err) } - if err := st.Reconcile("@b:vojo.chat", 0.5); err != nil { - t.Fatalf("reconcile b: %v", err) + if err := st.Settle("@b:vojo.chat", 0, CostBreakdown{Token: 0.5}); err != nil { + t.Fatalf("settle b: %v", err) } if spent, err := st.SpentTodayUSD(); err != nil || spent < 1.1 { t.Fatalf("spent today: got (%v,%v), want >= 1.1", spent, err) } // Now any reservation is denied globally, regardless of the per-user cap. - if res, err := st.Reserve("@c:vojo.chat", 1_000_000, ceiling); err != nil || res != reserveDeniedGlobal { + if res, err := st.Reserve("@c:vojo.chat", 1_000_000, 0, ceiling, 0); err != nil || res != reserveDeniedGlobal { t.Fatalf("over-ceiling reserve: got (%v,%v), want reserveDeniedGlobal", res, err) } } @@ -165,7 +167,7 @@ func TestStoreReserveConcurrentRespectsCap(t *testing.T) { wg.Add(1) go func() { defer wg.Done() - res, err := st.Reserve(user, cap, 1e9) + res, err := st.Reserve(user, cap, 0, 1e9, 0) if err != nil { t.Errorf("reserve: %v", err) return @@ -181,6 +183,266 @@ func TestStoreReserveConcurrentRespectsCap(t *testing.T) { } } +// TestStoreReserveConcurrentCeilingBounded is the §8.1 TOCTOU regression. Many +// DIFFERENT users reserving at once against a low ceiling must not overshoot it by +// more than ONE max-reservation. The bare pgx port's per-(date,mxid) lock left the +// cross-user ceiling unprotected: every user read the same committed SUM(usd)=0 (the +// USD only lands at settle, after the call) and slipped through, so all N were +// admitted. The per-day admission lock + reserved_usd here bound the overshoot. +// Run under -race. +func TestStoreReserveConcurrentCeilingBounded(t *testing.T) { + st := openTestStore(t) + defer st.Close() + + const estimate = 1.0 // each in-flight call reserves $1 + const ceiling = 10.0 // so the gate should admit ~10, not 100 + const perUserCap = 1_000_000 // keep the per-user cap out of the way + const goroutines = 100 + + var ok int64 + var wg sync.WaitGroup + for i := 0; i < goroutines; i++ { + wg.Add(1) + go func(n int) { + defer wg.Done() + user := fmt.Sprintf("@u%d:vojo.chat", n) // a DIFFERENT user each time + res, err := st.Reserve(user, perUserCap, 0, ceiling, estimate) + if err != nil { + t.Errorf("reserve: %v", err) + return + } + if res == reserveOK { + atomic.AddInt64(&ok, 1) + } + }(i) + } + wg.Wait() + + // committed+reserved < ceiling admits; the last admit can push reserved to just + // under ceiling+estimate, so admitted ≤ ceiling/estimate + 1. The pre-fix code + // admitted all 100. + maxAdmit := int64(ceiling/estimate) + 1 + if ok < 1 || ok > maxAdmit { + t.Fatalf("admitted %d different users, want in [1, %d] (ceiling + one max-reserve)", ok, maxAdmit) + } + // Nothing was settled, so committed spend is still 0 — the cap came purely from + // reservations, which is the whole point (the USD isn't known until after the call). + if spent, err := st.SpentTodayUSD(); err != nil || spent != 0 { + t.Fatalf("committed spend = (%v,%v), want 0 (only reservations held)", spent, err) + } +} + +// TestStoreSettleReleasesReservation verifies that Settle frees the reservation it +// books actual cost for, restoring global headroom — proven through the admission +// gate so it doesn't depend on reading the private column. +func TestStoreSettleReleasesReservation(t *testing.T) { + st := openTestStore(t) + defer st.Close() + const est = 5.0 + const ceiling = 10.0 + + // Two reservations fill the ceiling (reserved 5 + 5 = 10); the third is denied. + if res, _ := st.Reserve("@a:vojo.chat", 1_000_000, 0, ceiling, est); res != reserveOK { + t.Fatalf("reserve a: %v", res) + } + if res, _ := st.Reserve("@b:vojo.chat", 1_000_000, 0, ceiling, est); res != reserveOK { + t.Fatalf("reserve b: %v", res) + } + if res, _ := st.Reserve("@c:vojo.chat", 1_000_000, 0, ceiling, est); res != reserveDeniedGlobal { + t.Fatalf("reserve c over full ceiling: got %v, want denied", res) + } + // Settle a with a small actual cost: reserved 10→5, committed 0→0.01. Headroom + // returns, so a new reservation is admitted again. + if err := st.Settle("@a:vojo.chat", est, CostBreakdown{Token: 0.01}); err != nil { + t.Fatalf("settle a: %v", err) + } + if res, _ := st.Reserve("@d:vojo.chat", 1_000_000, 0, ceiling, est); res != reserveOK { + t.Fatalf("reserve d after settle freed headroom: got %v, want reserveOK", res) + } + if spent, _ := st.SpentTodayUSD(); spent < 0.009 || spent > 0.011 { + t.Fatalf("committed after one settle = %v, want ~0.01", spent) + } +} + +// TestStoreReleaseReservation verifies the call-failed path: a released reservation +// frees headroom and books no USD, and an over-release clamps reserved_usd to 0 +// rather than going negative (a negative reservation would manufacture phantom +// headroom past the ceiling). +func TestStoreReleaseReservation(t *testing.T) { + st := openTestStore(t) + defer st.Close() + const est = 5.0 + const ceiling = 10.0 + + // Reserve a, then over-release it by far more than it held. + if res, _ := st.Reserve("@a:vojo.chat", 1_000_000, 0, ceiling, est); res != reserveOK { + t.Fatalf("reserve a: %v", res) + } + if err := st.ReleaseReservation("@a:vojo.chat", 100); err != nil { + t.Fatalf("over-release: %v", err) + } + // a's reserved must now be 0 (not -95): exactly two more $5 reservations fit the + // $10 ceiling, and the third is denied. Were reserved negative, far more would slip + // through — so the deny at the third request proves both the headroom was freed and + // the clamp held. + if res, _ := st.Reserve("@b:vojo.chat", 1_000_000, 0, ceiling, est); res != reserveOK { + t.Fatalf("reserve b: %v", res) + } + if res, _ := st.Reserve("@c:vojo.chat", 1_000_000, 0, ceiling, est); res != reserveOK { + t.Fatalf("reserve c: %v", res) + } + if res, _ := st.Reserve("@d:vojo.chat", 1_000_000, 0, ceiling, est); res != reserveDeniedGlobal { + t.Fatalf("reserve d: got %v, want denied (reserved must have clamped to 0, not gone negative)", res) + } + // Nothing was ever settled, so committed spend stays 0 — release books no USD. + if spent, _ := st.SpentTodayUSD(); spent != 0 { + t.Fatalf("committed after release = %v, want 0 (a failed call bills nothing)", spent) + } +} + +// TestStoreRequestLog covers the analytics row: total_usd is the component sum, +// query_text is NULL unless captured, re-inserting one id is a no-op, and the +// time-based trim removes old rows. +func TestStoreRequestLog(t *testing.T) { + st := openTestStore(t) + defer st.Close() + + noText := RequestLog{ + ID: "$ev-rl-1", RoomID: "!r:vojo.chat", Sender: "@u:vojo.chat", + Route: routeGrokDirect, RouterSource: "default", + Models: map[string]string{"final": "grok-x"}, + Cost: CostBreakdown{Token: 0.01, Grounding: 0.02}, + LatencyMS: 1234, StageMS: map[string]int{"final": 1200}, + ProviderRequestID: "prov-1", OK: true, // QueryText empty → NULL + } + if err := st.InsertRequestLog(noText); err != nil { + t.Fatalf("insert: %v", err) + } + // Re-inserting the same id is a no-op (ON CONFLICT DO NOTHING), not an error. + if err := st.InsertRequestLog(noText); err != nil { + t.Fatalf("re-insert: %v", err) + } + withText := RequestLog{ID: "$ev-rl-2", Route: routeTrivial, OK: false, QueryText: "hello"} + if err := st.InsertRequestLog(withText); err != nil { + t.Fatalf("insert-with-text: %v", err) + } + + ctx, cancel := opContext() + defer cancel() + var route string + var total float64 + var ok bool + var qt *string + if err := st.pool.QueryRow(ctx, + `SELECT route, total_usd, ok, query_text FROM request_log WHERE id = $1`, noText.ID). + Scan(&route, &total, &ok, &qt); err != nil { + t.Fatalf("read row1: %v", err) + } + if route != routeGrokDirect || !ok { + t.Fatalf("row1 = (%q, ok=%v), want (grok_direct, true)", route, ok) + } + if d := total - 0.03; d > 1e-9 || d < -1e-9 { + t.Fatalf("row1 total_usd = %v, want 0.03 (token+grounding)", total) + } + if qt != nil { + t.Fatalf("row1 query_text = %q, want NULL when text capture off", *qt) + } + if err := st.pool.QueryRow(ctx, `SELECT query_text FROM request_log WHERE id = $1`, withText.ID).Scan(&qt); err != nil { + t.Fatalf("read row2: %v", err) + } + if qt == nil || *qt != "hello" { + t.Fatalf("row2 query_text = %v, want \"hello\"", qt) + } + + // Trim everything older than one hour from now → both rows (ts= 300 { + return WebContext{}, fmt.Errorf("grok web search http %d: %s", resp.StatusCode, snippet(data)) + } + var out grokResponsesResponse + if err := json.Unmarshal(data, &out); err != nil { + return WebContext{}, fmt.Errorf("grok web search decode: %w", err) + } + + var digest string + var citations []string + for _, item := range out.Output { + if item.Type != "message" { + continue + } + for _, c := range item.Content { + if c.Type == "output_text" { + digest += c.Text + } + for _, a := range c.Annotations { + if a.Type == "url_citation" && a.URL != "" { + citations = append(citations, a.URL) + } + } + } + } + usage := Usage{ + PromptTokens: out.Usage.InputTokens, + CachedTokens: out.Usage.InputTokensDetails.CachedTokens, + CompletionTokens: out.Usage.OutputTokens, + } + // Cost = the call's tokens + the $5/1k fee times the ACTUAL number of web_search + // calls the request made (one request can search several times). Booked even when the + // digest is empty (the 2xx was billed), so the caller accounts for it before degrading. + // Cross-checked live against the API's own cost_in_usd_ticks — matched to 4 dp. + wc := WebContext{ + Digest: digest, + Citations: citations, + Usage: usage, + Cost: CostBreakdown{ + WebTool: computeUSD(p.model, usage, p.cfg) + + float64(out.Usage.ServerSideToolUsageDetails.WebSearchCalls)*grokWebSearchPerCall, + }, + } + if digest == "" { + return wc, fmt.Errorf("grok web search: empty result") + } + return wc, nil +} + +// --- gemini_grounding (Gemini-3 native only) -------------------------------------- + +type geminiGrounding struct { + gem *geminiClient + st *Store + cfg *Config +} + +func (p *geminiGrounding) Fetch(ctx context.Context, query string) (WebContext, error) { + // Durable, atomic daily cap FIRST: a grounded prompt is billed whether or not it + // grounds, and the per-prompt overage ($35/1k on 2.5) is the cost this guard exists + // to bound. Admit against the cap before spending. (grok_web_search needs no such + // cap — its $5/1k per-call fee is fully reserved per request and bounded by the + // per-user request cap + global ceiling.) + if ok, err := p.st.IncrGroundingIfUnder(p.cfg.WebGroundingDailyCap); err != nil { + return WebContext{}, err + } else if !ok { + return WebContext{}, errGroundingCapped + } + res, err := p.gem.groundedSearch(ctx, query) // errors (incl. no-citations) → caller degrades + cost := CostBreakdown{Grounding: computeUSD(p.cfg.GeminiModel, res.Usage, p.cfg)} + if err != nil { + return WebContext{Cost: cost, Usage: res.Usage}, err + } + return WebContext{Digest: res.Digest, Citations: res.Citations, Usage: res.Usage, Cost: cost}, nil +} diff --git a/apps/ai-bot/xai.go b/apps/ai-bot/xai.go deleted file mode 100644 index c55f832b..00000000 --- a/apps/ai-bot/xai.go +++ /dev/null @@ -1,171 +0,0 @@ -package main - -import ( - "bytes" - "context" - "encoding/json" - "fmt" - "io" - "log/slog" - "math/rand" - "net/http" - "time" -) - -// XAIClient talks the OpenAI-compatible Chat Completions endpoint at -// {base}/chat/completions with a Bearer key. -type XAIClient struct { - base string - key string - http *http.Client - maxTry int - log *slog.Logger -} - -func NewXAIClient(base, key string, logger *slog.Logger) *XAIClient { - return &XAIClient{ - base: base, - key: key, - http: &http.Client{}, - maxTry: 3, - log: logger, - } -} - -type xaiMessage struct { - Role string `json:"role"` - Content string `json:"content"` -} - -type xaiRequest struct { - Model string `json:"model"` - Messages []xaiMessage `json:"messages"` - MaxTokens int `json:"max_tokens"` - Temperature float64 `json:"temperature"` - Stream bool `json:"stream"` -} - -type xaiUsage struct { - PromptTokens int `json:"prompt_tokens"` - CompletionTokens int `json:"completion_tokens"` - PromptTokensDetails struct { - CachedTokens int `json:"cached_tokens"` - } `json:"prompt_tokens_details"` -} - -type xaiResponse struct { - Choices []struct { - Message struct { - Content string `json:"content"` - } `json:"message"` - FinishReason string `json:"finish_reason"` - } `json:"choices"` - Usage xaiUsage `json:"usage"` -} - -func (r *xaiResponse) Text() string { - if len(r.Choices) == 0 { - return "" - } - return r.Choices[0].Message.Content -} - -// Complete calls Chat Completions with retry on transient failures (429 / 5xx / -// network timeout, exponential backoff + jitter). Non-retryable 4xx fail -// immediately. On exhaustion the caller refunds the reserved request and notifies -// the user, so a transient failure is never silently swallowed (F6). -func (x *XAIClient) Complete(ctx context.Context, model string, msgs []xaiMessage, maxTokens int, temp float64) (*xaiResponse, error) { - reqBody := xaiRequest{ - Model: model, - Messages: msgs, - MaxTokens: maxTokens, - Temperature: temp, - Stream: false, - } - payload, err := json.Marshal(reqBody) - if err != nil { - return nil, err - } - - var lastErr error - for attempt := 0; attempt < x.maxTry; attempt++ { - if attempt > 0 { - // 0.5s, 1s, 2s … capped at 8s, plus up to 250ms jitter. - backoff := time.Duration(500< 8*time.Second { - backoff = 8 * time.Second - } - backoff += time.Duration(rand.Intn(250)) * time.Millisecond - select { - case <-ctx.Done(): - return nil, ctx.Err() - case <-time.After(backoff): - } - } - - resp, retryable, err := x.attempt(ctx, payload) - if err == nil { - return resp, nil - } - lastErr = err - if ctx.Err() != nil { - return nil, ctx.Err() - } - if !retryable { - return nil, err - } - if x.log != nil { - x.log.Warn("xai attempt failed, will retry", "attempt", attempt+1, "max", x.maxTry, "err", err) - } - } - return nil, fmt.Errorf("xai: exhausted %d attempts: %w", x.maxTry, lastErr) -} - -// attempt performs one HTTP call. It returns retryable=true for 429/5xx and -// network errors, false for other non-2xx (terminal 4xx). -func (x *XAIClient) attempt(ctx context.Context, payload []byte) (*xaiResponse, bool, error) { - // Per-attempt deadline so a hung connection doesn't block the whole loop. - attemptCtx, cancel := context.WithTimeout(ctx, 60*time.Second) - defer cancel() - - req, err := http.NewRequestWithContext(attemptCtx, http.MethodPost, x.base+"/chat/completions", bytes.NewReader(payload)) - if err != nil { - return nil, false, err - } - req.Header.Set("Content-Type", "application/json") - req.Header.Set("Authorization", "Bearer "+x.key) - - resp, err := x.http.Do(req) - if err != nil { - // Network error / timeout — retryable (unless the parent ctx is done). - return nil, ctx.Err() == nil, err - } - defer resp.Body.Close() - data, _ := io.ReadAll(resp.Body) - - if resp.StatusCode == http.StatusTooManyRequests || resp.StatusCode >= 500 { - return nil, true, fmt.Errorf("xai http %d: %s", resp.StatusCode, snippet(data)) - } - if resp.StatusCode < 200 || resp.StatusCode >= 300 { - return nil, false, fmt.Errorf("xai http %d: %s", resp.StatusCode, snippet(data)) - } - - var out xaiResponse - if err := json.Unmarshal(data, &out); err != nil { - return nil, false, fmt.Errorf("xai decode: %w", err) - } - // A 2xx is a billed call even when the model returns empty content (content - // filter, finish_reason=length with no text, or no choices). Return it as a - // success so the caller books the real cost via Reconcile instead of refunding - // the slot and losing the spend — which would let empty replies bypass BOTH the - // per-user cap and the global ceiling. The caller just won't send an empty body. - return &out, false, nil -} - -func snippet(b []byte) string { - const max = 300 - if len(b) > max { - return string(b[:max]) + "…" - } - return string(b) -}