392 lines
18 KiB
Go
392 lines
18 KiB
Go
package main
|
||
|
||
import (
|
||
"context"
|
||
"errors"
|
||
"fmt"
|
||
"strings"
|
||
"time"
|
||
)
|
||
|
||
// cascade.go is the generation half of the bot: given an admitted request, it routes
// (router.go), runs the chosen route's provider(s), and ALWAYS degrades to grok_direct
// when any layer is off or fails (§8.2). It returns a genResult that the business logic
// (respond) settles, sends, and logs, so the ledger, never-silent, and telemetry concerns
// stay in one place and the routing stays here. With every cascade flag off, classify
// returns grok_direct and this collapses to exactly today's single Grok call.
|
||
|
||
// genResult is everything respond needs from a generation: the answer, the model's
|
||
// usage (for token billing), the FULL cost breakdown (router + web + final), and the
|
||
// routing metadata for telemetry. cost accumulates across stages, so a partial cascade
|
||
// (a paid web fetch that then degraded) still books what it actually spent.
|
||
type genResult struct {
|
||
text string
|
||
usage Usage
|
||
cost CostBreakdown
|
||
finalModel string
|
||
providerID string
|
||
decision RouterDecision
|
||
route string // the route actually taken (may differ from decision on degrade)
|
||
fallback bool // true if we degraded off the decided route
|
||
degraded string // degrade reason for request_log
|
||
stageMS map[string]int
|
||
|
||
// Web-route outcome (for request_log §8): the resolved query actually sent to Fetch,
|
||
// whether the context-resolved rewrite was used (vs the bare body), and whether the
|
||
// fetch came back grounded with citations (a zero-citation synth is a silent false-web).
|
||
searchQuery string
|
||
rewriteUsed bool
|
||
webGrounded bool
|
||
citationCount int
|
||
}
|
||
|
||
// msSince returns the time elapsed since t in whole milliseconds, truncated toward
// zero. A t in the future yields a negative value.
func msSince(t time.Time) int {
	elapsed := time.Since(t)
	return int(elapsed / time.Millisecond)
}
|
||
|
||
// reserveEstimate is the admission envelope: the most expensive ENABLED route's cost,
|
||
// so whichever route the router picks is covered by the reservation (the ceiling can't
|
||
// be slipped by routing to a pricier path after admission). With every cascade flag
|
||
// off it equals grok_direct's estimate — byte-for-byte today's reservation. Slightly
|
||
// generous is fine: Settle books the authoritative actual afterward.
|
||
func (b *Bot) reserveEstimate() float64 {
|
||
est := b.estimateUSD(b.cfg.XAIModel) // grok_direct / trivial(cheaper)/synthesis base
|
||
if b.cfg.WebEnabled {
|
||
// web_then_grok = a web fetch fee + the Grok synthesis already counted above.
|
||
if b.cfg.WebProvider == webProviderGrokWebSearch {
|
||
// fetch can search several times and pull large context; reserve generously.
|
||
est += float64(maxWebSearchCalls)*grokWebSearchPerCall + b.estimateUSD(b.cfg.XAIModel)
|
||
} else {
|
||
// gemini grounding: the fetch's tokens PLUS the per-grounded-prompt fee (§7
|
||
// SG2), so the admission envelope is a true upper bound once the fee is booked.
|
||
est += b.estimateUSD(b.cfg.GeminiModel) + b.cfg.GeminiGroundingPerPrompt
|
||
}
|
||
}
|
||
if b.cfg.ReasoningEnabled {
|
||
// Higher reasoning effort can burn more output tokens; reserve double.
|
||
est = max(est, 2*b.estimateUSD(b.cfg.ReasoningModel))
|
||
}
|
||
// The always-on Layer-1 classifier leg (§7 Finding 4): a cheap Gemini call on every
|
||
// message when the classifier is enabled, so reserved ≥ actual stays true. Added after
|
||
// the max() so it is never swallowed by the reasoning branch.
|
||
if b.cfg.RouterClassifierEnabled {
|
||
est += b.estimateUSD(b.cfg.GeminiModel)
|
||
}
|
||
return est
|
||
}
|
||
|
||
// generate routes and produces an answer, degrading to grok_direct on any failure.
|
||
// It returns a terminal error ONLY if even grok_direct fails; every other route falls
|
||
// through to grok_direct rather than erroring.
|
||
func (b *Bot) generate(ctx context.Context, body string, msgs []Message, convID string, isDM bool) (genResult, error) {
|
||
res := genResult{stageMS: map[string]int{}, finalModel: b.cfg.XAIModel}
|
||
|
||
// The privacy-minimised conversation window for the classifier + follow-up rewrite.
|
||
// DM-resolved (last ≤2 turns); bare trigger in groups (no cross-member subject bleed).
|
||
rcx := routerContext(msgs, isDM)
|
||
|
||
t0 := time.Now()
|
||
res.decision = b.classify(ctx, body, rcx, &res.cost) // accumulates cost.Router if Layer-1 runs
|
||
res.stageMS["router"] = msSince(t0)
|
||
res.route = res.decision.Route
|
||
|
||
// The router's pre-dispatch verdict (what it chose, why, how sure). On a degrade the
|
||
// route that actually runs differs from this — respond logs that final outcome — so
|
||
// the two lines together show "router wanted X, we ran Y". DEBUG: routing diagnostics,
|
||
// content-free (the resolved search_query is NOT logged here — it's a gated path, §8).
|
||
b.log.DebugContext(ctx, "route decided",
|
||
"route", res.decision.Route, "source", res.decision.Source,
|
||
"confidence", res.decision.Confidence, "needs_web", res.decision.NeedsWeb,
|
||
"web_decided_by", res.decision.WebDecidedBy, "verifiable", res.decision.Verifiable,
|
||
"entity_obscure", res.decision.EntityObscure, "time_sensitive", res.decision.TimeSensitive,
|
||
"trivial", res.decision.TrivialScore, "lookup_hint", res.decision.LookupHint,
|
||
"reasoning_level", res.decision.ReasoningLevel)
|
||
|
||
finalMsgs := msgs
|
||
switch res.decision.Route {
|
||
case routeTrivial:
|
||
if b.cfg.TrivialOffloadEnabled && b.gemini != nil {
|
||
if err := b.genTrivial(ctx, msgs, &res); err == nil {
|
||
return res, nil
|
||
} else {
|
||
b.log.WarnContext(ctx, "trivial offload failed; degrading to grok_direct", "err", err)
|
||
b.degradeTo(&res, degradeTrivial)
|
||
}
|
||
}
|
||
case routeWebThenGrok:
|
||
if b.cfg.WebEnabled && b.web != nil {
|
||
if err := b.genWebThenGrok(ctx, body, isDM, msgs, convID, &res); err == nil {
|
||
return res, nil
|
||
} else {
|
||
b.log.WarnContext(ctx, "web route failed; degrading to grok_direct", "err", err, "reason", res.degraded)
|
||
b.degradeTo(&res, degradeWeb)
|
||
// We have no fresh facts. For a RECENCY miss, hedge with an honest staleness
|
||
// caveat (§8.2.1). For a STATIC verifiable-fact miss (a film cast, a date),
|
||
// the staleness caveat is wrong — a stale caveat on a wrong cast still ships
|
||
// the wrong cast — so instruct Grok to ABSTAIN on specific names/dates/numbers
|
||
// instead of emitting a confident guess (§4.4).
|
||
if res.decision.factualMiss() {
|
||
finalMsgs = factualAbstainMessages(msgs)
|
||
} else {
|
||
finalMsgs = hedgeMessages(msgs)
|
||
}
|
||
}
|
||
}
|
||
case routeReason:
|
||
if b.cfg.ReasoningEnabled {
|
||
if err := b.genReason(ctx, msgs, convID, &res); err == nil {
|
||
return res, nil
|
||
} else {
|
||
b.log.WarnContext(ctx, "reasoning route failed; degrading to grok_direct", "err", err)
|
||
b.degradeTo(&res, degradeReasoning)
|
||
}
|
||
}
|
||
}
|
||
|
||
// grok_direct — the default route AND the universal fallback. The only path that
|
||
// can return a terminal error (even Grok failed). It preserves any cost already
|
||
// spent (router classifier, a partial web fetch) in res.cost.
|
||
if err := b.genGrokDirect(ctx, finalMsgs, convID, &res); err != nil {
|
||
return res, err
|
||
}
|
||
return res, nil
|
||
}
|
||
|
||
// degradeTo marks res as a fallback to grok_direct, keeping the first/most-specific
|
||
// degrade reason (e.g. a web provider's grounding_cap set inside genWebThenGrok).
|
||
func (b *Bot) degradeTo(res *genResult, reason string) {
|
||
res.fallback = true
|
||
if res.degraded == "" {
|
||
res.degraded = reason
|
||
}
|
||
}
|
||
|
||
// genGrokDirect is today's path: one Grok call. Also the fallback for every other
|
||
// route. On success it fills res (route, final model, text, usage, provider id) and
|
||
// adds the token cost.
|
||
func (b *Bot) genGrokDirect(ctx context.Context, msgs []Message, convID string, res *genResult) error {
|
||
t := time.Now()
|
||
resp, err := b.llm.Complete(ctx, LLMRequest{
|
||
Model: b.cfg.XAIModel,
|
||
Messages: msgs,
|
||
MaxTokens: b.cfg.MaxOutTok,
|
||
Temperature: b.cfg.XAITemp,
|
||
ConvID: convID,
|
||
ReasoningEffort: b.cfg.GrokReasoningEffort, // "" → not sent; "none" keeps grok-4.3 fast
|
||
})
|
||
res.stageMS["final"] = msSince(t)
|
||
if err != nil {
|
||
return err
|
||
}
|
||
res.route, res.finalModel = routeGrokDirect, b.cfg.XAIModel
|
||
res.text, res.usage, res.providerID = resp.Text, resp.Usage, resp.ProviderRequestID
|
||
res.cost.Token += computeUSD(b.cfg.XAIModel, resp.Usage, b.cfg)
|
||
return nil
|
||
}
|
||
|
||
// genTrivial answers a trivial message with the cheap Gemini model. An empty reply is
|
||
// treated as a failure so the caller degrades to Grok rather than sending nothing.
|
||
func (b *Bot) genTrivial(ctx context.Context, msgs []Message, res *genResult) error {
|
||
t := time.Now()
|
||
resp, err := b.gemini.Complete(ctx, LLMRequest{
|
||
Model: b.cfg.GeminiModel,
|
||
Messages: msgs,
|
||
MaxTokens: b.cfg.MaxOutTok,
|
||
Temperature: b.cfg.XAITemp,
|
||
})
|
||
res.stageMS["final"] = msSince(t)
|
||
if err != nil {
|
||
return err
|
||
}
|
||
if strings.TrimSpace(resp.Text) == "" {
|
||
return fmt.Errorf("trivial: empty Gemini reply")
|
||
}
|
||
res.route, res.finalModel = routeTrivial, b.cfg.GeminiModel
|
||
res.text, res.usage, res.providerID = resp.Text, resp.Usage, resp.ProviderRequestID
|
||
res.cost.Token += computeUSD(b.cfg.GeminiModel, resp.Usage, b.cfg)
|
||
return nil
|
||
}
|
||
|
||
// genReason answers with Grok at a higher reasoning effort. Uses the configured
|
||
// reasoning-capable model (the default grok-4.20-non-reasoning would reject the param).
|
||
func (b *Bot) genReason(ctx context.Context, msgs []Message, convID string, res *genResult) error {
|
||
t := time.Now()
|
||
resp, err := b.llm.Complete(ctx, LLMRequest{
|
||
Model: b.cfg.ReasoningModel,
|
||
Messages: msgs,
|
||
MaxTokens: b.cfg.MaxOutTok,
|
||
Temperature: b.cfg.XAITemp,
|
||
ReasoningEffort: b.cfg.ReasoningEffort, // "think harder" level (default high)
|
||
ConvID: convID,
|
||
})
|
||
res.stageMS["final"] = msSince(t)
|
||
if err != nil {
|
||
return err
|
||
}
|
||
if strings.TrimSpace(resp.Text) == "" {
|
||
return fmt.Errorf("reason: empty reply")
|
||
}
|
||
res.route, res.finalModel = routeReason, b.cfg.ReasoningModel
|
||
res.text, res.usage, res.providerID = resp.Text, resp.Usage, resp.ProviderRequestID
|
||
res.cost.Token += computeUSD(b.cfg.ReasoningModel, resp.Usage, b.cfg)
|
||
return nil
|
||
}
|
||
|
||
// webStageTimeout is the deadline applied to the web/grounding fetch on its own,
// separate from the overall request budget (§8.2.2), so that a slow search cannot
// consume the whole request before synthesis starts.
const webStageTimeout time.Duration = 15 * time.Second
|
||
|
||
// genWebThenGrok fetches fresh facts via the web provider, then has Grok synthesise the
|
||
// answer in voice from that digest. The web fetch's cost+tokens are booked into res
|
||
// EVEN ON FAILURE — the call was billed — so a synth failure or empty fetch still
|
||
// accounts for the spend before the caller degrades to grok_direct (the partial cascade
|
||
// case, §8.1). The daily cap and per-stage deadline are applied here, uniformly for both
|
||
// providers.
|
||
func (b *Bot) genWebThenGrok(ctx context.Context, body string, isDM bool, msgs []Message, convID string, res *genResult) error {
|
||
// DM-gated rewrite-with-fallback (§6): use the classifier's self-contained,
|
||
// follow-up-resolved query, but ONLY in a DM (a group buffer interleaves members'
|
||
// topics) and only when it's present and not over-long; otherwise the bare body — so
|
||
// the fetch is never worse than today. Sanitise before egress (it is model-authored
|
||
// text going to an external search API): collapse control chars/whitespace, cap length.
|
||
q := body
|
||
if isDM {
|
||
if sq := strings.TrimSpace(res.decision.SearchQuery); sq != "" && len([]rune(sq)) <= 200 {
|
||
q, res.rewriteUsed = sq, true
|
||
}
|
||
}
|
||
q = sanitizeSearchQuery(q)
|
||
if q == "" {
|
||
q, res.rewriteUsed = sanitizeSearchQuery(body), false // never send an empty query
|
||
}
|
||
res.searchQuery = q
|
||
|
||
// Per-stage web/grounding deadline, independent of the overall budget.
|
||
wctx, cancelW := context.WithTimeout(ctx, webStageTimeout)
|
||
tw := time.Now()
|
||
wc, ferr := b.web.Fetch(wctx, q)
|
||
cancelW()
|
||
res.stageMS["web"] = msSince(tw)
|
||
// Book the fetch's fee + tokens whether or not it produced a usable digest — the call
|
||
// was billed (the daily cap, if any, is enforced inside the provider). GroundingFee is
|
||
// the per-grounded-prompt overage (§7 SG1), booked even on the error return.
|
||
res.cost.Grounding += wc.Cost.Grounding
|
||
res.cost.GroundingFee += wc.Cost.GroundingFee
|
||
res.cost.WebTool += wc.Cost.WebTool
|
||
res.citationCount = len(wc.Citations)
|
||
res.webGrounded = len(wc.Citations) > 0
|
||
webUsage := wc.Usage
|
||
if ferr != nil {
|
||
if errors.Is(ferr, errGroundingCapped) {
|
||
res.degraded = degradeGroundCap
|
||
}
|
||
return ferr // web fee already booked; caller degrades to grok_direct (with hedge)
|
||
}
|
||
// A non-empty digest with NO citations is a silent false-web (the answer is synthesised
|
||
// from an ungrounded fetch). gemini_grounding errors out before here; grok_web_search
|
||
// can reach this — surface it at WARN so it's visible at the default level (§8).
|
||
if len(wc.Citations) == 0 {
|
||
b.log.WarnContext(ctx, "web no-citation synth (ungrounded digest)", "provider", b.cfg.WebProvider)
|
||
}
|
||
|
||
tf := time.Now()
|
||
resp, err := b.llm.Complete(ctx, LLMRequest{
|
||
Model: b.cfg.XAIModel,
|
||
Messages: webSynthMessages(msgs, wc),
|
||
MaxTokens: b.cfg.MaxOutTok,
|
||
Temperature: b.cfg.XAITemp,
|
||
ConvID: convID,
|
||
ReasoningEffort: b.cfg.GrokReasoningEffort, // same voice, same effort as grok_direct
|
||
})
|
||
res.stageMS["final"] = msSince(tf)
|
||
if err != nil {
|
||
return err
|
||
}
|
||
if strings.TrimSpace(resp.Text) == "" {
|
||
return fmt.Errorf("web synth: empty reply")
|
||
}
|
||
res.route, res.finalModel = routeWebThenGrok, b.cfg.XAIModel
|
||
res.text, res.providerID = resp.Text, resp.ProviderRequestID
|
||
// Report BOTH calls' tokens so the analytics token totals match the two-call route.
|
||
res.usage = Usage{
|
||
PromptTokens: resp.Usage.PromptTokens + webUsage.PromptTokens,
|
||
CachedTokens: resp.Usage.CachedTokens + webUsage.CachedTokens,
|
||
CompletionTokens: resp.Usage.CompletionTokens + webUsage.CompletionTokens,
|
||
}
|
||
res.cost.Token += computeUSD(b.cfg.XAIModel, resp.Usage, b.cfg)
|
||
return nil
|
||
}
|
||
|
||
// webSynthMessages inserts the fresh web digest as a system note just after the system
|
||
// prompt, so Grok answers in voice using current facts. It deliberately does NOT pass the
|
||
// raw citation URLs into the prompt, nor ask Grok to "cite sources": gemini grounding
|
||
// returns opaque vertexaisearch.../grounding-api-redirect/... redirect links (not publisher
|
||
// URLs), and instructing Grok to cite made it paste those ugly redirects verbatim into the
|
||
// reply and mis-attribute them ("ссылок из твоего сообщения"). The grounding already
|
||
// happened (citation_count is recorded for telemetry); the user wants the answer, not
|
||
// Google's internal redirect links. Real source attribution (resolving redirects to
|
||
// domains) is a separate, deferred feature.
|
||
func webSynthMessages(base []Message, wc WebContext) []Message {
|
||
facts := "Свежие данные из веба — ответь на их основе, кратко и по делу, без URL и ссылок:\n" + wc.Digest
|
||
return insertSystemNote(base, facts)
|
||
}
|
||
|
||
// hedgeMessages adds an honest staleness caveat for a web→grok_direct degrade on a
|
||
// RECENCY query: the user wanted fresh facts but we couldn't fetch them, so the model
|
||
// must flag that its answer is from training knowledge and may be out of date.
|
||
func hedgeMessages(base []Message) []Message {
|
||
return insertSystemNote(base, "Нет доступа к свежим источникам прямо сейчас — отвечай по знаниям на момент обучения и честно предупреди, что данные могут быть устаревшими.")
|
||
}
|
||
|
||
// factualAbstainMessages is the degrade hedge for a STATIC verifiable-fact miss (§4.4):
|
||
// a staleness caveat is wrong here (the fact isn't stale, it's checkable and the model
|
||
// may simply not know it), so instruct Grok to ABSTAIN on specific names/dates/numbers
|
||
// rather than ship a confident guess — the exact failure (the hallucinated film cast)
|
||
// this redesign exists to stop.
|
||
func factualAbstainMessages(base []Message) []Message {
|
||
return insertSystemNote(base, "Не удалось проверить факты через веб. Если ответ зависит от конкретных имён, дат, годов, чисел или состава — честно скажи, что не уверен в точной фактуре и можешь ошибаться; НЕ выдавай догадку за факт.")
|
||
}
|
||
|
||
// factualMiss reports whether a web degrade should use the abstain hedge (a static
|
||
// checkable-fact question) rather than the staleness hedge (a recency question). A
|
||
// recency signal (freshnessRe or the classifier's time_sensitive) always means
|
||
// staleness; otherwise a verifiable / obscure-entity question — OR any non-recency
|
||
// needs_web verdict (so an off-spec needs_web-only verdict still abstains rather than
|
||
// emit a confident guess) — means abstain.
|
||
func (d RouterDecision) factualMiss() bool {
|
||
if d.Freshness != "" || d.TimeSensitive {
|
||
return false
|
||
}
|
||
return d.Verifiable || d.EntityObscure || d.NeedsWeb
|
||
}
|
||
|
||
// sanitizeSearchQuery prepares a (possibly model-authored) query for egress to an
// external search API. It never trusts the input to be clean or bounded:
//
//   - whitespace-like control characters (\n, \r, \t, \v, \f and U+0085 NEL) become
//     a space, so the words on either side stay separate;
//   - every other C0 control character, DEL (U+007F) and the remaining C1 control
//     characters (U+0080–U+009F) are dropped;
//   - runs of whitespace are collapsed to a single space and the ends are trimmed;
//   - the result is capped at 200 runes, then trimmed again in case the cut landed
//     just after a space.
//
// The result may be empty, for example when q is all whitespace or control
// characters.
func sanitizeSearchQuery(q string) string {
	const maxRunes = 200

	q = strings.Map(func(r rune) rune {
		switch {
		case r == '\n', r == '\r', r == '\t', r == '\v', r == '\f', r == 0x85:
			// Whitespace controls separate words; replacing (not dropping) them
			// keeps "foo\vbar" from collapsing into "foobar".
			return ' '
		case r < 0x20, r == 0x7f, r >= 0x80 && r <= 0x9f:
			return -1 // drop the remaining C0/C1 control characters and DEL
		}
		return r
	}, q)

	// strings.Fields splits on any Unicode whitespace, so this both collapses runs
	// and trims the ends.
	q = strings.Join(strings.Fields(q), " ")

	// Cap by runes, not bytes, so a multi-byte character is never cut in half.
	if runes := []rune(q); len(runes) > maxRunes {
		q = strings.TrimSpace(string(runes[:maxRunes]))
	}
	return q
}
|
||
|
||
// insertSystemNote inserts an extra system message right after the system prompt
|
||
// (base[0] from buildContext), preserving the rest of the window.
|
||
func insertSystemNote(base []Message, content string) []Message {
|
||
note := Message{Role: "system", Content: content}
|
||
if len(base) == 0 {
|
||
return []Message{note}
|
||
}
|
||
out := make([]Message, 0, len(base)+1)
|
||
out = append(out, base[0], note)
|
||
out = append(out, base[1:]...)
|
||
return out
|
||
}
|