vojo/apps/ai-bot/telemetry.go

149 lines
5.5 KiB
Go

package main
import (
"context"
"time"
rd "vojo.chat/ai-bot/internal/routedecide"
)
// telemetry.go is the request_log analytics path: it captures route, cost, latency
// and outcome for each engaged request so the real $/day and route mix can be
// MEASURED (the build plan's whole "is the cascade worth it" question) instead of
// modelled. It is strictly off the answer path — gated by TELEMETRY_ENABLED, written
// in a recovered goroutine, and a write failure only logs a WARN. A request never
// fails to be answered because telemetry couldn't be recorded.
// Route identifiers. These are the exact strings written to request_log.route.

// routeNone marks a row for which no model ran: the request was skipped or a
// limiter denied it.
const routeNone = "none"

// Model-backed routes, aliased from the routedecide package's constants.
// grok_direct is today's path; the others land behind flags in later phases.
const (
	routeGrokDirect  = rd.RouteGrokDirect
	routeTrivial     = rd.RouteTrivial
	routeWebThenGrok = rd.RouteWeb
	routeReason      = rd.RouteReason
)
// Reason tokens stored in request_log.degraded. The spellings are part of the
// analytics contract — queries GROUP BY them — so treat them as stable.
const (
	// Skip reasons: the request could not be served before any model ran
	// (the encrypted/media/foreign cases recordSkip is documented for).
	degradeEncrypted = "encrypted_room"
	degradeMedia     = "media"
	degradeForeign   = "foreign_room"

	// Delivery / accounting reasons. Set by code outside this file's visible
	// part; see the call sites for the exact conditions.
	degradeEmpty      = "empty_completion"
	degradeSendFailed = "send_failed"
	degradeReserveErr = "reserve_error"

	// Cascade-stage reasons. Likewise set outside this file's visible part.
	degradeRouter    = "router_failed"
	degradeWeb       = "web_failed"
	degradeTrivial   = "trivial_failed"
	degradeGroundCap = "grounding_cap"
	degradeReasoning = "reasoning_failed"
)
// telemetryTrimEvery is the retention-trim cadence: maybeTrimTelemetry runs the trim
// on every telemetryTrimEvery-th tick of its write counter, off the hot path, so the
// analytics table stays time-bounded without a separate lifecycle or a DELETE on
// every insert. The counter ticks once per insert attempt — recordTelemetry calls
// maybeTrimTelemetry whether or not the insert succeeded — and only while a positive
// retention is configured. The constant is deliberately untyped so it adopts the
// type of the counter value it is taken modulo against.
const telemetryTrimEvery = 200
// RequestLog carries the values for a single request_log row. A field left at its
// zero value means "did not apply" — for instance, a grok_direct request leaves
// every cascade field zero.
type RequestLog struct {
	// Identity of the event this row describes.
	ID, RoomID, Sender string

	// Routing decision.
	Route            string
	RouterSource     string            // heuristic|classifier|default|forced|degraded
	RouterConfidence float64           // also read as the classifier confidence (router_source='classifier')
	Models           map[string]string // {"router":"…","final":"…"}

	// Token usage, cost and timing.
	PromptTokens, CachedTokens, CompletionTokens int

	Cost      CostBreakdown
	LatencyMS int
	StageMS   map[string]int // {"router":12,"web":1400,"final":2100}

	// Per-request flags.
	Escalated, FallbackFired, CacheHit, CeilingHit, PerUserCapHit bool

	// Provenance and outcome.
	PromptVersion     string
	ProviderRequestID string
	Degraded          string // degrade/skip reason token (request_log.degraded)
	Err               string
	OK                bool

	// QueryText is content: it is stored only when TELEMETRY_STORE_TEXT is set and
	// is stripped otherwise.
	QueryText string

	// Router/classifier signals + web outcome (§8): the inputs the offline eval needs
	// to measure misroute / false-web / lie-rate / true-cost / rewrite-quality. The
	// boolean signals and WebDecidedBy are metadata and are always stored when
	// telemetry is on. SearchQuery and AnswerText are model-/user-derived content and
	// follow the same TELEMETRY_STORE_TEXT gate as QueryText.
	NeedsWeb, EntityObscure, TimeSensitive, Verifiable bool

	// NOTE(review): named like a score but typed bool — confirm this is intended.
	TrivialScore bool

	WebDecidedBy string

	RewriteUsed, WebGrounded bool

	CitationCount int

	SearchQuery string // resolved query sent to Fetch; stored only when TELEMETRY_STORE_TEXT
	AnswerText  string // the final answer; stored only when TELEMETRY_STORE_TEXT (lie-label input)
}
// recordTelemetry queues one request_log row for persistence, away from the answer
// path. It does nothing unless TELEMETRY_ENABLED is set.
//
// rl is received by value, so the text stripping below never touches the caller's
// copy. The insert is handed to b.safego (defined elsewhere; expected to run it in a
// panic-recovering goroutine). A failed insert is logged at WARN and otherwise
// ignored, so telemetry can never drop or delay the reply.
func (b *Bot) recordTelemetry(ctx context.Context, rl RequestLog) {
	if !b.cfg.TelemetryEnabled {
		return
	}

	// A single gate (TELEMETRY_STORE_TEXT) covers every content-bearing field: the
	// user's query, the model-authored search query, and the answer. Metadata
	// signals such as NeedsWeb and WebDecidedBy are kept regardless.
	if !b.cfg.TelemetryStoreText {
		rl.QueryText = ""
		rl.SearchQuery = ""
		rl.AnswerText = ""
	}

	persist := func() {
		err := b.st.InsertRequestLog(rl)
		if err != nil {
			b.log.WarnContext(ctx, "request_log insert failed (non-fatal)", "id", rl.ID, "err", err)
		}
		// The trim bookkeeping runs after every attempt, including failed ones.
		b.maybeTrimTelemetry(ctx)
	}
	b.safego(ctx, "telemetry", persist)
}
// recordSkip writes a telemetry row for a request that addressed the bot but could
// not be fully served before any model ran (encrypted/media/foreign). Such skips are
// rare, so giving each its own row (route=none plus the reason) keeps "why was there
// no answer" visible. The far more common not-addressed drops are deliberately not
// logged (pre-claim best-effort), which keeps them from flooding the table.
//
// reason is stored in the row's Degraded field. ev must be non-nil.
func (b *Bot) recordSkip(ctx context.Context, ev *Event, reason string) {
	// OK is left at its zero value (false): a skipped request was not answered.
	row := RequestLog{
		ID:            ev.EventID,
		RoomID:        ev.RoomID,
		Sender:        ev.Sender,
		Route:         routeNone,
		RouterSource:  "default",
		PromptVersion: b.promptVersion,
		Degraded:      reason,
	}
	b.recordTelemetry(ctx, row)
}
// maybeTrimTelemetry applies the time-based retention trim on every
// telemetryTrimEvery-th counted write. It is best-effort: a failed trim is logged at
// WARN and nothing is returned. It is called from the telemetry goroutine, so it
// stays off the hot path.
func (b *Bot) maybeTrimTelemetry(ctx context.Context) {
	retention := b.cfg.TelemetryRetention

	// The || short-circuits: with retention disabled (<= 0) the write counter is
	// not advanced at all; otherwise every call advances it and only each
	// telemetryTrimEvery-th one falls through to the trim.
	if retention <= 0 || b.telemetryWrites.Add(1)%telemetryTrimEvery != 0 {
		return
	}

	// Rows older than now-retention are handed to the store for trimming.
	cutoff := time.Now().Add(-retention)
	err := b.st.TrimRequestLog(cutoff)
	if err != nil {
		b.log.WarnContext(ctx, "request_log trim failed (non-fatal)", "err", err)
	}
}