vojo/apps/ai-bot/cascade.go

287 lines
12 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package main
import (
"context"
"errors"
"fmt"
"strings"
"time"
)
// cascade.go is the generation half of the bot: given an admitted request, it routes
// (router.go), runs the chosen route's provider(s), and ALWAYS degrades to grok_direct
// on any layer being off or failing (§8.2). It returns a genResult the business logic
// (respond) settles, sends, and logs — keeping ledger/never-silent/telemetry in one
// place and the routing here. With every cascade flag off, classify returns grok_direct
// and this collapses to exactly today's single Grok call.
// genResult is everything respond needs from a generation: the answer, the model's
// usage (for token billing), the FULL cost breakdown (router + web + final), and the
// routing metadata for telemetry. cost accumulates across stages, so a partial cascade
// (a paid web fetch that then degraded) still books what it actually spent.
type genResult struct {
	text       string         // the answer text produced by the route that finally succeeded
	usage      Usage          // token usage of the answering call; the web route sums fetch + synthesis tokens
	cost       CostBreakdown  // spend accumulated across stages (router, web fees, final-call tokens)
	finalModel string         // model that produced text
	providerID string         // provider request id of the answering call
	decision   RouterDecision // the router's verdict: chosen route, source, confidence, flags
	route      string         // the route actually taken (may differ from decision on degrade)
	fallback   bool           // true if we degraded off the decided route
	degraded   string         // degrade reason for request_log
	stageMS    map[string]int // per-stage latency in ms; keys used in this file: "router", "web", "final"
}
// msSince reports the whole milliseconds elapsed since t, truncated toward zero
// (negative when t lies in the future). Used for the per-stage latency map.
func msSince(t time.Time) int {
	elapsed := time.Since(t)
	return int(elapsed / time.Millisecond)
}
// reserveEstimate returns the USD amount to reserve at admission. It is sized for the
// most expensive route that is currently ENABLED, so the router cannot slip a request
// past its reservation by picking a pricier path after admission:
//
//   - base: one XAIModel call (grok_direct; it also covers the cheaper trivial route
//     and the web route's synthesis call);
//   - web enabled: plus the fetch. For the Grok web-search provider that is
//     maxWebSearchCalls tool calls plus one more XAIModel call, because it may search
//     several times and pull in large context. Otherwise it is one GeminiModel call;
//   - reasoning enabled: at least twice a ReasoningModel call, because a higher
//     reasoning effort can burn more output tokens.
//
// With every cascade flag off this is exactly the grok_direct estimate (today's
// reservation). Reserving a little generously is fine: Settle books the
// authoritative actual afterwards.
func (b *Bot) reserveEstimate() float64 {
	envelope := b.estimateUSD(b.cfg.XAIModel)

	if b.cfg.WebEnabled {
		var fetch float64
		switch b.cfg.WebProvider {
		case webProviderGrokWebSearch:
			fetch = float64(maxWebSearchCalls)*grokWebSearchPerCall + b.estimateUSD(b.cfg.XAIModel)
		default:
			fetch = b.estimateUSD(b.cfg.GeminiModel)
		}
		envelope += fetch
	}

	if !b.cfg.ReasoningEnabled {
		return envelope
	}
	return max(envelope, 2*b.estimateUSD(b.cfg.ReasoningModel))
}
// generate classifies the request, runs the chosen route, and falls back to
// grok_direct whenever that route is disabled or fails. Only a grok_direct failure is
// returned as an error. A failure on any other route is logged, recorded on the result
// via degradeTo, and absorbed. Router spend and any cost a failed route already booked
// stay in the returned result's cost breakdown.
func (b *Bot) generate(ctx context.Context, body string, msgs []Message, convID string) (genResult, error) {
	res := genResult{
		finalModel: b.cfg.XAIModel,
		stageMS:    make(map[string]int),
	}

	// Routing; classify adds any Layer-1 router spend to res.cost.
	routerStart := time.Now()
	res.decision = b.classify(ctx, body, &res.cost)
	res.stageMS["router"] = msSince(routerStart)
	res.route = res.decision.Route

	// Pre-dispatch verdict (what the router chose, why, how sure). On a degrade the route
	// that actually runs differs; respond logs that outcome, so the two lines together
	// read "router wanted X, we ran Y". DEBUG: routing diagnostics.
	b.log.DebugContext(ctx, "route decided",
		"route", res.decision.Route,
		"source", res.decision.Source,
		"confidence", res.decision.Confidence,
		"needs_web", res.decision.NeedsWeb,
		"reasoning_level", res.decision.ReasoningLevel,
	)

	// Messages for the grok_direct call; a failed web route swaps in a hedged copy.
	directMsgs := msgs

	switch res.decision.Route {
	case routeTrivial:
		if !b.cfg.TrivialOffloadEnabled || b.gemini == nil {
			break // layer off: plain grok_direct, no degrade recorded
		}
		err := b.genTrivial(ctx, msgs, &res)
		if err == nil {
			return res, nil
		}
		b.log.WarnContext(ctx, "trivial offload failed; degrading to grok_direct", "err", err)
		b.degradeTo(&res, degradeTrivial)

	case routeWebThenGrok:
		if !b.cfg.WebEnabled || b.web == nil {
			break
		}
		err := b.genWebThenGrok(ctx, body, msgs, convID, &res)
		if err == nil {
			return res, nil
		}
		b.log.WarnContext(ctx, "web route failed; degrading to grok_direct", "err", err, "reason", res.degraded)
		b.degradeTo(&res, degradeWeb)
		// Fresh facts were wanted but none were fetched: answer from training knowledge
		// with an explicit staleness caveat rather than presenting it as current (§8.2.1).
		directMsgs = hedgeMessages(msgs)

	case routeReason:
		if !b.cfg.ReasoningEnabled {
			break
		}
		err := b.genReason(ctx, msgs, convID, &res)
		if err == nil {
			return res, nil
		}
		b.log.WarnContext(ctx, "reasoning route failed; degrading to grok_direct", "err", err)
		b.degradeTo(&res, degradeReasoning)
	}

	// grok_direct is both the default route and the universal fallback, and the only
	// call whose failure is terminal. Cost already booked above is kept in res.cost.
	err := b.genGrokDirect(ctx, directMsgs, convID, &res)
	return res, err
}
// degradeTo flags res as having fallen back to grok_direct. The first recorded reason
// wins: a more specific reason set earlier (for example degradeGroundCap, set inside
// genWebThenGrok) is not overwritten by the caller's generic one.
func (b *Bot) degradeTo(res *genResult, reason string) {
	if res.degraded == "" {
		res.degraded = reason
	}
	res.fallback = true
}
// genGrokDirect is the baseline path, a single Grok call with cfg.XAIModel. It is also
// the fallback every other route degrades to. Its latency is recorded as the "final"
// stage. On success it marks the route as grok_direct, copies the reply's text, usage
// and provider request id into res, and adds the call's token cost to res.cost. On
// error it returns the error without touching those fields.
func (b *Bot) genGrokDirect(ctx context.Context, msgs []Message, convID string, res *genResult) error {
	started := time.Now()
	req := LLMRequest{
		Model:       b.cfg.XAIModel,
		Messages:    msgs,
		MaxTokens:   b.cfg.MaxOutTok,
		Temperature: b.cfg.XAITemp,
		ConvID:      convID,
		// "" → not sent; "none" keeps grok-4.3 fast.
		ReasoningEffort: b.cfg.GrokReasoningEffort,
	}
	resp, err := b.llm.Complete(ctx, req)
	res.stageMS["final"] = msSince(started)
	if err != nil {
		return err
	}

	res.route = routeGrokDirect
	res.finalModel = b.cfg.XAIModel
	res.text = resp.Text
	res.usage = resp.Usage
	res.providerID = resp.ProviderRequestID
	res.cost.Token += computeUSD(b.cfg.XAIModel, resp.Usage, b.cfg)
	return nil
}
// genTrivial answers a trivial message with the cheap Gemini model (cfg.GeminiModel).
// An empty or whitespace-only reply is treated as a failure, so the caller degrades to
// Grok rather than sending nothing.
//
// The call's token cost is booked into res.cost as soon as the call returns, before
// the emptiness check. A degrade therefore still accounts for what this attempt spent,
// matching genResult's "cost accumulates across stages" contract. route, finalModel,
// text, usage and providerID are filled only on success.
func (b *Bot) genTrivial(ctx context.Context, msgs []Message, res *genResult) error {
	t := time.Now()
	resp, err := b.gemini.Complete(ctx, LLMRequest{
		Model:       b.cfg.GeminiModel,
		Messages:    msgs,
		MaxTokens:   b.cfg.MaxOutTok,
		Temperature: b.cfg.XAITemp,
	})
	res.stageMS["final"] = msSince(t)
	if err != nil {
		return err
	}
	// Book before judging the reply: the provider reported this usage even if the text
	// is empty, and the grok_direct fallback adds its own cost on top of this one.
	res.cost.Token += computeUSD(b.cfg.GeminiModel, resp.Usage, b.cfg)
	if strings.TrimSpace(resp.Text) == "" {
		return fmt.Errorf("trivial: empty Gemini reply")
	}
	res.route, res.finalModel = routeTrivial, b.cfg.GeminiModel
	res.text, res.usage, res.providerID = resp.Text, resp.Usage, resp.ProviderRequestID
	return nil
}
// genReason answers with Grok at a higher reasoning effort (cfg.ReasoningEffort). It
// uses the configured reasoning-capable model (cfg.ReasoningModel), because the default
// grok-4.20-non-reasoning would reject the reasoning-effort parameter. An empty or
// whitespace-only reply is treated as a failure, so the caller degrades to grok_direct.
//
// The call's token cost is booked into res.cost before the emptiness check, so a
// degrade still accounts for this attempt's spend. The remaining res fields are filled
// only on success.
func (b *Bot) genReason(ctx context.Context, msgs []Message, convID string, res *genResult) error {
	t := time.Now()
	resp, err := b.llm.Complete(ctx, LLMRequest{
		Model:           b.cfg.ReasoningModel,
		Messages:        msgs,
		MaxTokens:       b.cfg.MaxOutTok,
		Temperature:     b.cfg.XAITemp,
		ReasoningEffort: b.cfg.ReasoningEffort, // "think harder" level (default high)
		ConvID:          convID,
	})
	res.stageMS["final"] = msSince(t)
	if err != nil {
		return err
	}
	// Book before judging the reply: the provider reported this usage even if the text
	// is empty, and the grok_direct fallback adds its own cost on top of this one.
	res.cost.Token += computeUSD(b.cfg.ReasoningModel, resp.Usage, b.cfg)
	if strings.TrimSpace(resp.Text) == "" {
		return fmt.Errorf("reason: empty reply")
	}
	res.route, res.finalModel = routeReason, b.cfg.ReasoningModel
	res.text, res.usage, res.providerID = resp.Text, resp.Usage, resp.ProviderRequestID
	return nil
}
// webStageTimeout is the deadline given to the web/grounding fetch in genWebThenGrok.
// It is independent of the request's overall budget (§8.2.2), so a slow search cannot
// consume the whole request before synthesis starts.
const webStageTimeout time.Duration = 15 * time.Second
// genWebThenGrok answers with fresh facts in two steps:
//  1. The web provider fetches a digest under its own deadline (webStageTimeout).
//  2. Grok synthesises the reply in voice from that digest, using the same model and
//     reasoning effort as grok_direct.
//
// Every billed call is booked into res.cost before any failure return, so a partial
// cascade that degrades to grok_direct still accounts for what it spent (§8.1):
//   - the fetch's Grounding/WebTool fees, even when Fetch errors;
//   - the synthesis call's tokens, even when its reply is empty.
//
// The fetch's token usage is only reported, summed into res.usage, on success.
//
// The daily grounding cap, if any, is enforced inside the provider. This function only
// maps errGroundingCapped to the degradeGroundCap reason, so the caller keeps that more
// specific reason.
//
// A fetch that succeeds but returns an empty digest is treated as a failure. The caller
// then degrades with its staleness hedge instead of asking Grok to cite "fresh data"
// that is not there.
func (b *Bot) genWebThenGrok(ctx context.Context, body string, msgs []Message, convID string, res *genResult) error {
	// Per-stage web/grounding deadline, independent of the overall budget.
	wctx, cancelW := context.WithTimeout(ctx, webStageTimeout)
	tw := time.Now()
	wc, ferr := b.web.Fetch(wctx, body)
	cancelW() // release the stage timer now; synthesis below runs on the parent ctx
	res.stageMS["web"] = msSince(tw)

	// Book the fetch's fees whether or not it produced a usable digest.
	res.cost.Grounding += wc.Cost.Grounding
	res.cost.WebTool += wc.Cost.WebTool
	webUsage := wc.Usage
	if ferr != nil {
		if errors.Is(ferr, errGroundingCapped) {
			res.degraded = degradeGroundCap
		}
		return ferr // web fee already booked; caller degrades to grok_direct (with hedge)
	}
	if strings.TrimSpace(wc.Digest) == "" {
		return fmt.Errorf("web: empty digest") // fee booked; degrade with hedge
	}

	tf := time.Now()
	resp, err := b.llm.Complete(ctx, LLMRequest{
		Model:           b.cfg.XAIModel,
		Messages:        webSynthMessages(msgs, wc),
		MaxTokens:       b.cfg.MaxOutTok,
		Temperature:     b.cfg.XAITemp,
		ConvID:          convID,
		ReasoningEffort: b.cfg.GrokReasoningEffort, // same voice, same effort as grok_direct
	})
	res.stageMS["final"] = msSince(tf)
	if err != nil {
		return err
	}
	// Book the synthesis tokens before judging the reply: the provider reported this
	// usage even if the text is empty, and the fallback adds its own cost on top.
	res.cost.Token += computeUSD(b.cfg.XAIModel, resp.Usage, b.cfg)
	if strings.TrimSpace(resp.Text) == "" {
		return fmt.Errorf("web synth: empty reply")
	}

	res.route, res.finalModel = routeWebThenGrok, b.cfg.XAIModel
	res.text, res.providerID = resp.Text, resp.ProviderRequestID
	// Report BOTH calls' tokens so the analytics token totals match the two-call route.
	res.usage = Usage{
		PromptTokens:     resp.Usage.PromptTokens + webUsage.PromptTokens,
		CachedTokens:     resp.Usage.CachedTokens + webUsage.CachedTokens,
		CompletionTokens: resp.Usage.CompletionTokens + webUsage.CompletionTokens,
	}
	return nil
}
// webSynthMessages returns msgs with one extra system note placed right after the
// system prompt. The note carries the fetched web digest and, when present, its
// comma-separated source citations, so Grok answers in voice using current facts.
func webSynthMessages(base []Message, wc WebContext) []Message {
	var note strings.Builder
	note.WriteString("Свежие данные из веба (используй их в ответе и сошлись на источники):\n")
	note.WriteString(wc.Digest)
	if len(wc.Citations) != 0 {
		note.WriteString("\nИсточники: ")
		note.WriteString(strings.Join(wc.Citations, ", "))
	}
	return insertSystemNote(base, note.String())
}
// hedgeMessages is used when a web route degrades to grok_direct: the user wanted
// fresh facts but none could be fetched. It inserts a system note telling the model
// to answer from training knowledge and to warn plainly that the data may be out of
// date.
func hedgeMessages(base []Message) []Message {
	const staleCaveat = "Нет доступа к свежим источникам прямо сейчас — отвечай по знаниям на момент обучения и честно предупреди, что данные могут быть устаревшими."
	return insertSystemNote(base, staleCaveat)
}
// insertSystemNote returns a new slice with a system message carrying content placed
// directly after base[0] (the system prompt from buildContext), followed by the rest
// of base in order. base itself is not modified. An empty base yields just the note.
func insertSystemNote(base []Message, content string) []Message {
	note := Message{Role: "system", Content: content}
	if len(base) == 0 {
		return []Message{note}
	}
	merged := make([]Message, len(base)+1)
	merged[0] = base[0]
	merged[1] = note
	copy(merged[2:], base[1:])
	return merged
}