vojo/apps/ai-bot/cascade.go

392 lines
18 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package main
import (
"context"
"errors"
"fmt"
"strings"
"time"
)
// cascade.go is the generation half of the bot: given an admitted request, it routes
// (router.go), runs the chosen route's provider(s), and ALWAYS degrades to grok_direct
// on any layer being off or failing (§8.2). It returns a genResult the business logic
// (respond) settles, sends, and logs — keeping ledger/never-silent/telemetry in one
// place and the routing here. With every cascade flag off, classify returns grok_direct
// and this collapses to exactly today's single Grok call.
// genResult is everything respond needs from one generation: the answer, the reported
// token usage, the FULL cost breakdown (router + web + final), and the routing
// metadata for telemetry. cost accumulates across stages, so a partial cascade (e.g.
// a paid web fetch that then degraded) still books what it actually spent.
type genResult struct {
	// The answer and its accounting.
	text  string
	usage Usage
	cost  CostBreakdown

	// Model and provider request id of the call that produced text.
	finalModel, providerID string

	// Routing outcome.
	decision RouterDecision
	route    string         // the route actually taken (may differ from decision on degrade)
	fallback bool           // true if we degraded off the decided route
	degraded string         // degrade reason for request_log
	stageMS  map[string]int // per-stage wall time in ms ("router", "web", "final")

	// Web-route outcome for request_log (§8): the sanitised query actually sent to
	// Fetch, whether the context-resolved rewrite was used instead of the bare body,
	// whether any citations came back (a zero-citation synth is a silent false-web),
	// and how many.
	searchQuery              string
	rewriteUsed, webGrounded bool
	citationCount            int
}
// msSince returns the whole milliseconds elapsed since t, truncated toward zero
// (negative when t lies in the future).
func msSince(t time.Time) int {
	elapsed := time.Since(t)
	return int(elapsed / time.Millisecond)
}
// reserveEstimate returns the admission envelope in USD. It is the cost of the most
// expensive ENABLED route, so the reservation covers whichever route the router later
// picks and routing to a pricier path after admission cannot slip past the ceiling.
// With every cascade flag off it reduces to grok_direct's estimate, the same as the
// pre-cascade reservation. Erring generous is fine: Settle books the authoritative
// actual afterwards.
func (b *Bot) reserveEstimate() float64 {
	// Base: one Grok call. Covers grok_direct, the (cheaper) trivial route, and the
	// web route's synthesis step.
	envelope := b.estimateUSD(b.cfg.XAIModel)

	if b.cfg.WebEnabled {
		// web_then_grok = the fetch on top of the synthesis already in the base.
		switch b.cfg.WebProvider {
		case webProviderGrokWebSearch:
			// The fetch may search several times and pull a large context, so budget
			// every allowed search call plus another Grok-sized call.
			envelope += float64(maxWebSearchCalls)*grokWebSearchPerCall + b.estimateUSD(b.cfg.XAIModel)
		default:
			// Gemini grounding: the fetch's tokens PLUS the per-grounded-prompt fee
			// (§7 SG2), keeping the envelope a true upper bound once the fee is booked.
			envelope += b.estimateUSD(b.cfg.GeminiModel) + b.cfg.GeminiGroundingPerPrompt
		}
	}

	if b.cfg.ReasoningEnabled {
		// A higher reasoning effort can burn more output tokens: budget double.
		envelope = max(envelope, 2*b.estimateUSD(b.cfg.ReasoningModel))
	}

	// The always-on Layer-1 classifier leg (§7 Finding 4) is a cheap Gemini call on
	// every message. It is added after the max() above so the reasoning branch can
	// never absorb it, which keeps reserved >= actual.
	if b.cfg.RouterClassifierEnabled {
		envelope += b.estimateUSD(b.cfg.GeminiModel)
	}
	return envelope
}
// generate routes the request and produces an answer, degrading to grok_direct on any
// failure. It returns a terminal error ONLY if even grok_direct fails; every other
// route falls through to grok_direct rather than erroring.
//
// A decided route whose feature flag is off (or whose provider client is nil) also
// runs grok_direct, but is not marked as a fallback, because nothing was attempted.
// On a failed web route the grok_direct call gets a hedge note: an abstain
// instruction for a static-fact miss, or a staleness caveat otherwise.
func (b *Bot) generate(ctx context.Context, body string, msgs []Message, convID string, isDM bool) (genResult, error) {
	res := genResult{stageMS: map[string]int{}, finalModel: b.cfg.XAIModel}

	// The privacy-minimised conversation window for the classifier + follow-up rewrite.
	// DM-resolved (last ≤2 turns); bare trigger in groups (no cross-member subject bleed).
	rcx := routerContext(msgs, isDM)
	t0 := time.Now()
	res.decision = b.classify(ctx, body, rcx, &res.cost) // accumulates cost.Router if Layer-1 runs
	res.stageMS["router"] = msSince(t0)
	res.route = res.decision.Route

	// The router's pre-dispatch verdict (what it chose, why, how sure). On a degrade the
	// route that actually runs differs from this; respond logs that final outcome, so
	// the two lines together show "router wanted X, we ran Y". DEBUG-level and
	// content-free: the resolved search_query is NOT logged here (gated path, §8).
	b.log.DebugContext(ctx, "route decided",
		"route", res.decision.Route, "source", res.decision.Source,
		"confidence", res.decision.Confidence, "needs_web", res.decision.NeedsWeb,
		"web_decided_by", res.decision.WebDecidedBy, "verifiable", res.decision.Verifiable,
		"entity_obscure", res.decision.EntityObscure, "time_sensitive", res.decision.TimeSensitive,
		"trivial", res.decision.TrivialScore, "lookup_hint", res.decision.LookupHint,
		"reasoning_level", res.decision.ReasoningLevel)

	finalMsgs := msgs
	switch res.decision.Route {
	case routeTrivial:
		if b.cfg.TrivialOffloadEnabled && b.gemini != nil {
			err := b.genTrivial(ctx, msgs, &res)
			if err == nil {
				return res, nil
			}
			b.log.WarnContext(ctx, "trivial offload failed; degrading to grok_direct", "err", err)
			b.degradeTo(&res, degradeTrivial)
		}
	case routeWebThenGrok:
		if b.cfg.WebEnabled && b.web != nil {
			err := b.genWebThenGrok(ctx, body, isDM, msgs, convID, &res)
			if err == nil {
				return res, nil
			}
			// res.degraded may already hold a provider-specific reason (e.g. the
			// grounding cap) set inside genWebThenGrok; degradeTo keeps it.
			b.log.WarnContext(ctx, "web route failed; degrading to grok_direct", "err", err, "reason", res.degraded)
			b.degradeTo(&res, degradeWeb)
			// We have no fresh facts. For a RECENCY miss, hedge with an honest staleness
			// caveat (§8.2.1). For a STATIC verifiable-fact miss (a film cast, a date),
			// a staleness caveat is wrong (it still ships the wrong cast), so instruct
			// Grok to ABSTAIN on specific names/dates/numbers instead of emitting a
			// confident guess (§4.4).
			if res.decision.factualMiss() {
				finalMsgs = factualAbstainMessages(msgs)
			} else {
				finalMsgs = hedgeMessages(msgs)
			}
		}
	case routeReason:
		if b.cfg.ReasoningEnabled {
			err := b.genReason(ctx, msgs, convID, &res)
			if err == nil {
				return res, nil
			}
			b.log.WarnContext(ctx, "reasoning route failed; degrading to grok_direct", "err", err)
			b.degradeTo(&res, degradeReasoning)
		}
	}

	// grok_direct is both the default route AND the universal fallback, and the only
	// path that can return a terminal error (even Grok failed). Any cost already spent
	// (router classifier, a partial web fetch) stays in res.cost either way.
	err := b.genGrokDirect(ctx, finalMsgs, convID, &res)
	return res, err
}
// degradeTo flags res as having fallen back to grok_direct. The degrade reason is
// write-once: an earlier, more specific reason (e.g. degradeGroundCap, set inside
// genWebThenGrok) is kept, and reason is recorded only when none exists yet.
func (b *Bot) degradeTo(res *genResult, reason string) {
	if res.degraded == "" {
		res.degraded = reason
	}
	res.fallback = true
}
// genGrokDirect is the baseline path (a single Grok completion) and the fallback for
// every other route. The "final" stage time is recorded even when the call fails and
// the error is returned unchanged. On success it overwrites res's route, final model,
// text, usage and provider id, and adds the call's token cost to res.cost.Token.
func (b *Bot) genGrokDirect(ctx context.Context, msgs []Message, convID string, res *genResult) error {
	start := time.Now()
	req := LLMRequest{
		Model:           b.cfg.XAIModel,
		Messages:        msgs,
		MaxTokens:       b.cfg.MaxOutTok,
		Temperature:     b.cfg.XAITemp,
		ConvID:          convID,
		ReasoningEffort: b.cfg.GrokReasoningEffort, // "" → not sent; "none" keeps grok-4.3 fast
	}
	resp, err := b.llm.Complete(ctx, req)
	res.stageMS["final"] = msSince(start)
	if err != nil {
		return err
	}

	res.route = routeGrokDirect
	res.finalModel = b.cfg.XAIModel
	res.text = resp.Text
	res.usage = resp.Usage
	res.providerID = resp.ProviderRequestID
	res.cost.Token += computeUSD(b.cfg.XAIModel, resp.Usage, b.cfg)
	return nil
}
// genTrivial answers a trivial message with the cheap Gemini model. An empty reply is
// treated as a failure, so the caller degrades to Grok rather than sending nothing.
//
// Once Gemini returns a response, its token cost is booked into res.cost.Token,
// including for an empty reply. That reply was presumably still billed, and booking
// it keeps a degrade from under-reporting spend (as genWebThenGrok does for its
// fetch). Route, model, text, usage and provider id are set only on a non-empty reply.
func (b *Bot) genTrivial(ctx context.Context, msgs []Message, res *genResult) error {
	t := time.Now()
	resp, err := b.gemini.Complete(ctx, LLMRequest{
		Model:       b.cfg.GeminiModel,
		Messages:    msgs,
		MaxTokens:   b.cfg.MaxOutTok,
		Temperature: b.cfg.XAITemp,
	})
	res.stageMS["final"] = msSince(t)
	if err != nil {
		return err
	}
	// Book before the empty check so a degrade still accounts for this call.
	res.cost.Token += computeUSD(b.cfg.GeminiModel, resp.Usage, b.cfg)
	if strings.TrimSpace(resp.Text) == "" {
		return fmt.Errorf("trivial: empty Gemini reply")
	}
	res.route, res.finalModel = routeTrivial, b.cfg.GeminiModel
	res.text, res.usage, res.providerID = resp.Text, resp.Usage, resp.ProviderRequestID
	return nil
}
// genReason answers with Grok at a higher reasoning effort. It uses the configured
// reasoning-capable model (the default grok-4.20-non-reasoning would reject the param).
//
// Once Grok returns a response, its token cost is booked into res.cost.Token,
// including for an empty reply. That reply is treated as a failure but was presumably
// still billed, so a degrade to grok_direct keeps the spend. Route, model, text, usage
// and provider id are set only on a non-empty reply.
func (b *Bot) genReason(ctx context.Context, msgs []Message, convID string, res *genResult) error {
	t := time.Now()
	resp, err := b.llm.Complete(ctx, LLMRequest{
		Model:           b.cfg.ReasoningModel,
		Messages:        msgs,
		MaxTokens:       b.cfg.MaxOutTok,
		Temperature:     b.cfg.XAITemp,
		ReasoningEffort: b.cfg.ReasoningEffort, // "think harder" level
		ConvID:          convID,
	})
	res.stageMS["final"] = msSince(t)
	if err != nil {
		return err
	}
	// Book before the empty check so a degrade still accounts for this call.
	res.cost.Token += computeUSD(b.cfg.ReasoningModel, resp.Usage, b.cfg)
	if strings.TrimSpace(resp.Text) == "" {
		return fmt.Errorf("reason: empty reply")
	}
	res.route, res.finalModel = routeReason, b.cfg.ReasoningModel
	res.text, res.usage, res.providerID = resp.Text, resp.Usage, resp.ProviderRequestID
	return nil
}
// webStageTimeout caps the web/grounding fetch on its own, separately from the overall
// request budget (§8.2.2), so a slow search cannot consume the whole request before
// synthesis starts.
const webStageTimeout time.Duration = 15 * time.Second
// genWebThenGrok fetches fresh facts via the web provider, then has Grok synthesise
// the answer in voice from that digest.
//
// Spend is booked into res.cost even on failure:
//   - The fetch's grounding, grounding-fee and web-tool costs are booked whether or
//     not Fetch errors.
//   - The synthesis call's token cost is booked even when Grok returns an empty reply.
//
// This way a failed stage still accounts for what was billed before the caller
// degrades to grok_direct (the partial cascade case, §8.1). The per-stage deadline
// (webStageTimeout) is applied here for both providers. The daily cap, if any, is
// enforced inside the provider and surfaces as errGroundingCapped, which is recorded
// as degradeGroundCap.
//
// If neither the rewritten query nor the bare body survives sanitisation, it returns
// an error without calling the provider, so an empty query is never sent.
func (b *Bot) genWebThenGrok(ctx context.Context, body string, isDM bool, msgs []Message, convID string, res *genResult) error {
	// DM-gated rewrite-with-fallback (§6): use the classifier's self-contained,
	// follow-up-resolved query, but ONLY in a DM (a group buffer interleaves members'
	// topics) and only when it is present and not over-long; otherwise the bare body,
	// so the fetch is never worse than the bare-body query. The query is sanitised
	// before egress because it may be model-authored text going to an external API.
	q := body
	if isDM {
		if sq := strings.TrimSpace(res.decision.SearchQuery); sq != "" && len([]rune(sq)) <= 200 {
			q, res.rewriteUsed = sq, true
		}
	}
	q = sanitizeSearchQuery(q)
	if q == "" {
		q, res.rewriteUsed = sanitizeSearchQuery(body), false // fall back to the bare body
	}
	if q == "" {
		// Nothing searchable at all (e.g. a whitespace/control-only body). Fail before
		// the fetch so no empty query is sent or billed; the caller degrades with a hedge.
		return errors.New("web: empty search query")
	}
	res.searchQuery = q

	// Per-stage web/grounding deadline, independent of the overall budget. It is
	// cancelled right after Fetch so the synthesis below is not bound by it.
	wctx, cancelW := context.WithTimeout(ctx, webStageTimeout)
	tw := time.Now()
	wc, ferr := b.web.Fetch(wctx, q)
	cancelW()
	res.stageMS["web"] = msSince(tw)

	// Book the fetch's costs whether or not it produced a usable digest: the call was
	// billed. GroundingFee is the per-grounded-prompt overage (§7 SG1), booked even on
	// the error return.
	res.cost.Grounding += wc.Cost.Grounding
	res.cost.GroundingFee += wc.Cost.GroundingFee
	res.cost.WebTool += wc.Cost.WebTool
	res.citationCount = len(wc.Citations)
	res.webGrounded = len(wc.Citations) > 0
	webUsage := wc.Usage
	if ferr != nil {
		if errors.Is(ferr, errGroundingCapped) {
			res.degraded = degradeGroundCap
		}
		return ferr // web costs already booked; caller degrades to grok_direct (with hedge)
	}

	// A digest with NO citations is a silent false-web (the answer is synthesised from
	// an ungrounded fetch). Surface it at WARN so it is visible at the default level (§8).
	if len(wc.Citations) == 0 {
		b.log.WarnContext(ctx, "web no-citation synth (ungrounded digest)", "provider", b.cfg.WebProvider)
	}

	tf := time.Now()
	resp, err := b.llm.Complete(ctx, LLMRequest{
		Model:           b.cfg.XAIModel,
		Messages:        webSynthMessages(msgs, wc),
		MaxTokens:       b.cfg.MaxOutTok,
		Temperature:     b.cfg.XAITemp,
		ConvID:          convID,
		ReasoningEffort: b.cfg.GrokReasoningEffort, // same voice, same effort as grok_direct
	})
	res.stageMS["final"] = msSince(tf)
	if err != nil {
		return err
	}
	// Book the synthesis before the empty check: an empty reply was still a billed call.
	res.cost.Token += computeUSD(b.cfg.XAIModel, resp.Usage, b.cfg)
	if strings.TrimSpace(resp.Text) == "" {
		return fmt.Errorf("web synth: empty reply")
	}
	res.route, res.finalModel = routeWebThenGrok, b.cfg.XAIModel
	res.text, res.providerID = resp.Text, resp.ProviderRequestID
	// Report BOTH calls' tokens so the analytics token totals match the two-call route.
	res.usage = Usage{
		PromptTokens:     resp.Usage.PromptTokens + webUsage.PromptTokens,
		CachedTokens:     resp.Usage.CachedTokens + webUsage.CachedTokens,
		CompletionTokens: resp.Usage.CompletionTokens + webUsage.CompletionTokens,
	}
	return nil
}
// webSynthMessages places the fresh web digest in a system note directly after the
// system prompt, so Grok answers in voice from current facts.
//
// The raw citation URLs are deliberately kept out of the prompt, and Grok is not asked
// to "cite sources". Gemini grounding returns opaque
// vertexaisearch.../grounding-api-redirect/... links rather than publisher URLs. When
// Grok was asked to cite, it pasted those redirects verbatim and mis-attributed them
// to the user ("links from your message"). Grounding has already happened
// (citation_count is recorded for telemetry). Real source attribution, resolving the
// redirects to domains, is a separate deferred feature.
func webSynthMessages(base []Message, wc WebContext) []Message {
	const header = "Свежие данные из веба — ответь на их основе, кратко и по делу, без URL и ссылок:\n"
	note := header + wc.Digest
	return insertSystemNote(base, note)
}
// hedgeMessages is the degrade hedge for a web→grok_direct fallback on a RECENCY
// query. The user wanted fresh facts we could not fetch, so the note tells the model
// to answer from training knowledge and openly warn that it may be out of date.
func hedgeMessages(base []Message) []Message {
	const staleCaveat = "Нет доступа к свежим источникам прямо сейчас — отвечай по знаниям на момент обучения и честно предупреди, что данные могут быть устаревшими."
	return insertSystemNote(base, staleCaveat)
}
// factualAbstainMessages is the degrade hedge for a STATIC verifiable-fact miss (§4.4).
// A staleness caveat would be wrong here: the fact is not stale, just checkable, and
// the model may simply not know it. The note therefore tells Grok to ABSTAIN on
// specific names/dates/numbers instead of shipping a confident guess, which is the
// hallucinated-film-cast failure this redesign exists to stop.
func factualAbstainMessages(base []Message) []Message {
	const abstain = "Не удалось проверить факты через веб. Если ответ зависит от конкретных имён, дат, годов, чисел или состава — честно скажи, что не уверен в точной фактуре и можешь ошибаться; НЕ выдавай догадку за факт."
	return insertSystemNote(base, abstain)
}
// factualMiss reports whether a failed web fetch should degrade with the abstain hedge
// (a static, checkable-fact question) rather than the staleness hedge (a recency
// question).
//
// Any recency signal means staleness: a non-empty Freshness (the freshnessRe signal)
// or the classifier's time_sensitive. Without one, the answer is abstain for a
// verifiable question, an obscure-entity question, or any needs_web verdict. The last
// case means an off-spec needs_web-only verdict still abstains instead of guessing.
func (d RouterDecision) factualMiss() bool {
	recency := d.Freshness != "" || d.TimeSensitive
	checkable := d.Verifiable || d.EntityObscure || d.NeedsWeb
	return !recency && checkable
}
// sanitizeSearchQuery prepares a (possibly model-authored) query for egress to an
// external search API. It never trusts the model to have produced clean, bounded text:
//   - whitespace control characters (\t \n \v \f \r and U+0085 NEL) become spaces, so
//     adjacent words stay separated;
//   - every other C0 control, DEL and C1 control (U+0080–U+009F) is dropped;
//   - runs of Unicode whitespace collapse to a single space and the ends are trimmed;
//   - the result is capped at 200 runes and re-trimmed.
//
// It returns "" when nothing printable remains.
func sanitizeSearchQuery(q string) string {
	const maxRunes = 200
	q = strings.Map(func(r rune) rune {
		switch {
		case r >= '\t' && r <= '\r', r == 0x85:
			// Whitespace that is also a control char: map rather than drop, so
			// "a\vb" becomes "a b" and not "ab".
			return ' '
		case r < 0x20, r >= 0x7f && r <= 0x9f:
			// Remaining C0 controls, DEL, and the C1 block (e.g. U+009B CSI).
			return -1
		}
		return r
	}, q)
	q = strings.Join(strings.Fields(q), " ") // collapse whitespace runs, trim ends
	// Cap the rune length without allocating a []rune. strings.Map leaves q as valid
	// UTF-8, so i is the byte offset of rune number maxRunes+1 when n reaches the cap.
	n := 0
	for i := range q {
		if n == maxRunes {
			return strings.TrimSpace(q[:i])
		}
		n++
	}
	return q
}
// insertSystemNote returns a new window with an extra system message placed right
// after the system prompt (base[0]). The remaining messages keep their order, and base
// itself is not modified. With an empty base the result is just the note.
func insertSystemNote(base []Message, content string) []Message {
	note := Message{Role: "system", Content: content}
	if len(base) == 0 {
		return []Message{note}
	}
	out := make([]Message, len(base)+1)
	out[0] = base[0]
	out[1] = note
	copy(out[2:], base[1:])
	return out
}