package main import ( "context" "encoding/json" "errors" "fmt" "time" "github.com/jackc/pgx/v5" "github.com/jackc/pgx/v5/pgxpool" ) // reserveResult is the outcome of a pre-call limiter reservation. type reserveResult int const ( reserveOK reserveResult = iota reserveDeniedUser // per-user daily request cap hit (⏳ rate-limit reaction, F24) reserveDeniedGlobal // global daily USD ceiling hit (⏳ rate-limit reaction, F24) ) // LRU bounds for the dedup tables (unchanged from the former SQLite store): keep // only the most recent ids so the tables don't grow without limit. const ( maxProcessedTxn = 5000 maxProcessedEvent = 20000 ) // opTimeout bounds every store operation. SQLite (a local file) effectively never // blocked; Postgres is over the docker network, so a cap keeps a stalled DB from // hanging a per-room handler goroutine forever. const opTimeout = 10 * time.Second // Store is the durable bot state: transaction + event dedup, the daily spend // ledger, and the encrypted-room warned set. It holds ONLY operational data — no // message content (the room timeline lives in Synapse). Backed by a dedicated // Postgres database (`vojo_ai`), in line with the per-service bridge databases, so // the spend ledger, dedup state and warned set share the server's backup/restore. type Store struct { pool *pgxpool.Pool } // OpenStore connects to the `vojo_ai` Postgres database via the AI_BOT_DATABASE_URL // DSN, applies pending migrations, and returns a ready Store. A small pool suffices: // the bot processes transactions serially and every statement here is short. func OpenStore(dsn string) (*Store, error) { cfg, err := pgxpool.ParseConfig(dsn) if err != nil { return nil, fmt.Errorf("parse AI_BOT_DATABASE_URL: %w", err) } // The former SQLite store pinned a single connection to serialize all callers; // pgx gives us a real pool. Keep it small — the per-room handler goroutines only // ever issue brief statements, and the shared server runs many other databases. cfg.MaxConns = 4 cfg.MinConns = 1 ctx, cancel := context.WithTimeout(context.Background(), opTimeout) defer cancel() pool, err := pgxpool.NewWithConfig(ctx, cfg) if err != nil { return nil, fmt.Errorf("connect vojo_ai: %w", err) } if err := pool.Ping(ctx); err != nil { pool.Close() return nil, fmt.Errorf("ping vojo_ai: %w", err) } s := &Store{pool: pool} if err := s.migrate(ctx); err != nil { pool.Close() return nil, err } return s, nil } func (s *Store) Close() error { s.pool.Close() return nil } // migrationLockKey namespaces the advisory lock that guards the migration runner so // two starting instances (the bot is single-instance, but be robust) can't race the // version check. Arbitrary fixed constant. const migrationLockKey = 0x76_6f_6a_6f // "vojo" // migrations are applied in order; schema_version records the highest applied // version so re-runs are no-ops. Every step is also idempotent (CREATE TABLE IF NOT // EXISTS) so a half-applied database still converges. var migrations = []string{ // v1: the operational schema — a 1:1 port of the former SQLite tables. // processed_* carry a surrogate identity column because Postgres has no rowid: // the LRU trim orders by it, and txn_id/event_id stay UNIQUE for the upsert. `CREATE TABLE IF NOT EXISTS processed_txn ( id BIGINT GENERATED ALWAYS AS IDENTITY PRIMARY KEY, txn_id TEXT UNIQUE NOT NULL ); CREATE TABLE IF NOT EXISTS processed_event ( id BIGINT GENERATED ALWAYS AS IDENTITY PRIMARY KEY, event_id TEXT UNIQUE NOT NULL ); CREATE TABLE IF NOT EXISTS spend ( date TEXT NOT NULL, mxid TEXT NOT NULL, requests INTEGER NOT NULL DEFAULT 0, usd DOUBLE PRECISION NOT NULL DEFAULT 0, PRIMARY KEY (date, mxid) ); CREATE TABLE IF NOT EXISTS warned_encrypted (room_id TEXT PRIMARY KEY);`, // v2: component cost columns + the optimistic reservation column. `reserved_usd` // holds the estimated max-cost of in-flight calls so the global ceiling counts // committed + reserved spend at admission time (the TOCTOU fix, §8.1): without it // a burst of concurrent calls all read the same low committed SUM and slip past // the ceiling, because the USD only lands at settle, AFTER the call. The component // columns let the ceiling see grounding/tool fees too (not just tokens), and feed // the per-component analytics. ADD COLUMN IF NOT EXISTS is idempotent. `ALTER TABLE spend ADD COLUMN IF NOT EXISTS reserved_usd DOUBLE PRECISION NOT NULL DEFAULT 0; ALTER TABLE spend ADD COLUMN IF NOT EXISTS router_usd DOUBLE PRECISION NOT NULL DEFAULT 0; ALTER TABLE spend ADD COLUMN IF NOT EXISTS grounding_usd DOUBLE PRECISION NOT NULL DEFAULT 0; ALTER TABLE spend ADD COLUMN IF NOT EXISTS webtool_usd DOUBLE PRECISION NOT NULL DEFAULT 0;`, // v3: request_log — one row per engaged request, for offline analysis of the route // mix, per-component $/day, latency, escalation/degrade rates (§6.2). Operational, // not message content: query_text is written ONLY when TELEMETRY_STORE_TEXT is on. // Indexed by ts for the time-based retention trim and time-series queries. `CREATE TABLE IF NOT EXISTS request_log ( id TEXT PRIMARY KEY, ts TIMESTAMPTZ NOT NULL DEFAULT now(), room_id TEXT, sender TEXT, route TEXT, router_source TEXT, router_confidence REAL, models JSONB, prompt_tokens INT, cached_tokens INT, completion_tokens INT, token_usd DOUBLE PRECISION, grounding_usd DOUBLE PRECISION, router_usd DOUBLE PRECISION, webtool_usd DOUBLE PRECISION, total_usd DOUBLE PRECISION, latency_ms INT, stage_ms JSONB, escalated BOOL DEFAULT false, fallback_fired BOOL DEFAULT false, cache_hit BOOL DEFAULT false, ceiling_hit BOOL DEFAULT false, per_user_cap_hit BOOL DEFAULT false, prompt_version TEXT, provider_request_id TEXT, degraded TEXT DEFAULT '', err TEXT DEFAULT '', ok BOOL, query_text TEXT ); CREATE INDEX IF NOT EXISTS request_log_ts_idx ON request_log (ts);`, // v4: per-day grounded-prompt counter for the web grounding cap guard (§8.2.3). One // row per UTC day; the cap check + increment is one atomic statement (same TOCTOU // discipline as the spend gate), so a burst can't blow past the $/1k grounding // overage. Day-keyed, so it self-resets and needs no separate trim. `CREATE TABLE IF NOT EXISTS grounding_count ( date TEXT PRIMARY KEY, n INTEGER NOT NULL DEFAULT 0 );`, } // migrate runs all pending migrations on a single connection under a session // advisory lock, recording each in schema_version. func (s *Store) migrate(ctx context.Context) error { conn, err := s.pool.Acquire(ctx) if err != nil { return fmt.Errorf("migrate: acquire: %w", err) } defer conn.Release() if _, err := conn.Exec(ctx, `SELECT pg_advisory_lock($1)`, int64(migrationLockKey)); err != nil { return fmt.Errorf("migrate: lock: %w", err) } defer func() { _, _ = conn.Exec(ctx, `SELECT pg_advisory_unlock($1)`, int64(migrationLockKey)) }() if _, err := conn.Exec(ctx, `CREATE TABLE IF NOT EXISTS schema_version (version INTEGER PRIMARY KEY)`); err != nil { return fmt.Errorf("migrate: schema_version: %w", err) } var current int if err := conn.QueryRow(ctx, `SELECT COALESCE(MAX(version), 0) FROM schema_version`).Scan(¤t); err != nil { return fmt.Errorf("migrate: read version: %w", err) } for v := current; v < len(migrations); v++ { tx, err := conn.Begin(ctx) if err != nil { return fmt.Errorf("migrate: begin %d: %w", v+1, err) } if _, err := tx.Exec(ctx, migrations[v]); err != nil { _ = tx.Rollback(ctx) return fmt.Errorf("migrate: apply %d: %w", v+1, err) } if _, err := tx.Exec(ctx, `INSERT INTO schema_version (version) VALUES ($1)`, v+1); err != nil { _ = tx.Rollback(ctx) return fmt.Errorf("migrate: record %d: %w", v+1, err) } if err := tx.Commit(ctx); err != nil { return fmt.Errorf("migrate: commit %d: %w", v+1, err) } } return nil } func todayUTC() string { return time.Now().UTC().Format("2006-01-02") } // opContext derives a bounded context for a single store operation. func opContext() (context.Context, context.CancelFunc) { return context.WithTimeout(context.Background(), opTimeout) } // HasTxn / MarkTxn give appservice transactions idempotency across restarts: a // transaction Synapse retries (because our 200 was lost) is processed at most // once. The table is bounded to the most recent ids. func (s *Store) HasTxn(txnID string) (bool, error) { ctx, cancel := opContext() defer cancel() var one int err := s.pool.QueryRow(ctx, `SELECT 1 FROM processed_txn WHERE txn_id = $1`, txnID).Scan(&one) if errors.Is(err, pgx.ErrNoRows) { return false, nil } return err == nil, err } func (s *Store) MarkTxn(txnID string) error { ctx, cancel := opContext() defer cancel() if _, err := s.pool.Exec(ctx, `INSERT INTO processed_txn (txn_id) VALUES ($1) ON CONFLICT DO NOTHING`, txnID); err != nil { return err } _, err := s.pool.Exec(ctx, `DELETE FROM processed_txn WHERE id NOT IN (SELECT id FROM processed_txn ORDER BY id DESC LIMIT $1)`, maxProcessedTxn) return err } // SeenEvent records an event id as handled and reports whether it was NEW (true) // or already seen (false) — the DURABLE equivalent of the in-memory dedup set, so // a crash/restart between handling an event and acking its transaction can't make // the bot reprocess it (dup answer + double-bill + cap inflation). Bounded to the // most recent ids. INSERT … ON CONFLICT DO NOTHING affects 1 row on insert and 0 on // conflict, so RowsAffected distinguishes new from already-seen. func (s *Store) SeenEvent(eventID string) (bool, error) { ctx, cancel := opContext() defer cancel() tag, err := s.pool.Exec(ctx, `INSERT INTO processed_event (event_id) VALUES ($1) ON CONFLICT DO NOTHING`, eventID) if err != nil { return false, err } if tag.RowsAffected() == 0 { return false, nil // already recorded → not new } _, err = s.pool.Exec(ctx, `DELETE FROM processed_event WHERE id NOT IN (SELECT id FROM processed_event ORDER BY id DESC LIMIT $1)`, maxProcessedEvent) return true, err } // committedUSDExpr sums every COMMITTED cost component of a spend row — tokens plus // the grounding/web/router fees a cascade can incur — so the wallet ceiling is never // blind to non-token spend. It deliberately excludes reserved_usd (that is in-flight, // not yet spent); the admission gate adds reserved separately. const committedUSDExpr = `usd + router_usd + grounding_usd + webtool_usd` // SpentTodayUSD sums all COMMITTED spend for the current UTC day. SUM over no rows is // NULL, which scans into a nil *float64 → treated as 0. func (s *Store) SpentTodayUSD() (float64, error) { ctx, cancel := opContext() defer cancel() var v *float64 if err := s.pool.QueryRow(ctx, `SELECT SUM(`+committedUSDExpr+`) FROM spend WHERE date = $1`, todayUTC()).Scan(&v); err != nil { return 0, err } if v == nil { return 0, nil } return *v, nil } // reserveDayLockKey namespaces the per-day admission lock so it can't collide with // the migration lock or any other advisory lock. const reserveDayLockKey = "ai-bot:reserve:" // Reserve runs the two admission gates in one transaction, BEFORE the call (F4): the // global USD ceiling protects the wallet; the per-user request cap is anti-abuse. On // success it both increments the per-user request count AND books `estimate` (the // route's max-cost) into reserved_usd, so the global gate counts committed + reserved // spend. The actual USD is settled after the response (Settle), at which point the // reservation is released and the real cost booked. Order: global first (cheapest to // deny), then per-user. // // The check-and-reserve is serialized GLOBALLY for the day by a transaction-scoped // advisory lock keyed on the date (not on date|mxid as the bare port did). This is // the TOCTOU fix (§8.1): the ceiling reads SUM(committed)+SUM(reserved) and then adds // its own reservation atomically, so a burst of DIFFERENT users can overshoot the // ceiling by at most ONE max-reservation rather than slipping through unbounded — the // per-(date,mxid) lock only serialized one user with himself and left the cross-user // ceiling unprotected. The former SQLite store serialized ALL callers on its single // connection anyway, so this restores that exact admission semantics, durably; the // bot is low-volume with per-room single-flight, so a per-day admission lock costs // nothing observable. Settle/Release run lock-free (they only release/convert spend, // never admit). func (s *Store) Reserve(mxid string, perUserCap int, perUserUSD, dailyUSDCeiling, estimate float64) (reserveResult, error) { ctx, cancel := opContext() defer cancel() day := todayUTC() tx, err := s.pool.Begin(ctx) if err != nil { return reserveOK, err } defer tx.Rollback(ctx) if _, err := tx.Exec(ctx, `SELECT pg_advisory_xact_lock(hashtextextended($1, 0))`, reserveDayLockKey+day); err != nil { return reserveOK, err } // committed + reserved. SUM over zero rows is NULL → nil pointer → treat as 0.0, // exactly as the SQLite store's sql.NullFloat64 did. This keeps the gate 1:1 even // at the degenerate dailyUSDCeiling == 0 (deny everything), where 0 >= 0. var inFlight *float64 if err := tx.QueryRow(ctx, `SELECT SUM(`+committedUSDExpr+` + reserved_usd) FROM spend WHERE date = $1`, day).Scan(&inFlight); err != nil { return reserveOK, err } spentToday := 0.0 if inFlight != nil { spentToday = *inFlight } if spentToday >= dailyUSDCeiling { return reserveDeniedGlobal, nil } // Per-user row: read requests AND the user's own committed+reserved $ in one go, so // both per-user gates are checked under the same lock. ErrNoRows → first request of // the day for this user → all zero. var requests int var userUSD float64 err = tx.QueryRow(ctx, `SELECT requests, `+committedUSDExpr+` + reserved_usd FROM spend WHERE date = $1 AND mxid = $2`, day, mxid).Scan(&requests, &userUSD) if err != nil && !errors.Is(err, pgx.ErrNoRows) { return reserveOK, err } if requests >= perUserCap { return reserveDeniedUser, nil } // Optional per-user $ quota (0 = off): keep one user from draining the shared ceiling. if perUserUSD > 0 && userUSD >= perUserUSD { return reserveDeniedUser, nil } if _, err := tx.Exec(ctx, `INSERT INTO spend (date, mxid, requests, reserved_usd) VALUES ($1, $2, 1, $3) ON CONFLICT (date, mxid) DO UPDATE SET requests = spend.requests + 1, reserved_usd = spend.reserved_usd + excluded.reserved_usd`, day, mxid, estimate); err != nil { return reserveOK, err } if err := tx.Commit(ctx); err != nil { return reserveOK, err } return reserveOK, nil } // RefundRequest gives back a reserved request SLOT when the call ultimately failed // (an outage) or the reply couldn't be delivered (paid silence, §8.1), so a transient // failure doesn't burn the user's daily cap. It does NOT touch USD: a 2xx is really // billed even if we then fail to deliver. Never drops below zero. A single UPDATE is // atomic, so concurrent refunds settle correctly without extra locking. func (s *Store) RefundRequest(mxid string) error { ctx, cancel := opContext() defer cancel() _, err := s.pool.Exec(ctx, `UPDATE spend SET requests = GREATEST(0, requests - 1) WHERE date = $1 AND mxid = $2`, todayUTC(), mxid) return err } // ReleaseReservation frees a reservation whose request produced no billable spend, // restoring the global headroom without booking anything. The normal failure paths // settle via Settle (which also releases), so this is the safety valve for an // UNSETTLED exit — a panic in generation, recovered by safego — where it runs with // RefundRequest in respond's deferred guard so a leaked reservation can't drift the // ceiling. GREATEST(0, …) guards against a double-release driving reserved_usd // negative. Lock-free: it only lowers the in-flight reserved total, never admits. func (s *Store) ReleaseReservation(mxid string, estimate float64) error { ctx, cancel := opContext() defer cancel() _, err := s.pool.Exec(ctx, `UPDATE spend SET reserved_usd = GREATEST(0, reserved_usd - $3) WHERE date = $1 AND mxid = $2`, todayUTC(), mxid, estimate) return err } // Settle releases a call's reservation and books its ACTUAL cost in one atomic step // (replacing the old additive Reconcile): reserved_usd drops by the reservation while // the real per-component cost is added to the committed columns. This is non-additive // on the reservation (settle, not accumulate), the semantics the ceiling needs. It is // also the partial-cascade-refund primitive (§8.1): a web_then_grok call that paid // grounding but failed at the final model passes a CostBreakdown carrying only the // grounding it actually spent, releases the rest of the reservation, and refunds the // request slot separately. GREATEST(0, …) keeps reserved_usd from underflowing. // Atomic and commutative per row, so concurrent settles for one user sum correctly. func (s *Store) Settle(mxid string, estimate float64, cost CostBreakdown) error { ctx, cancel := opContext() defer cancel() _, err := s.pool.Exec(ctx, `INSERT INTO spend (date, mxid, requests, usd, router_usd, grounding_usd, webtool_usd, reserved_usd) VALUES ($1, $2, 0, $3, $4, $5, $6, 0) ON CONFLICT (date, mxid) DO UPDATE SET usd = spend.usd + excluded.usd, router_usd = spend.router_usd + excluded.router_usd, grounding_usd = spend.grounding_usd + excluded.grounding_usd, webtool_usd = spend.webtool_usd + excluded.webtool_usd, reserved_usd = GREATEST(0, spend.reserved_usd - $7)`, todayUTC(), mxid, cost.Token, cost.Router, cost.Grounding, cost.WebTool, estimate) return err } // InsertRequestLog writes one analytics row. id is the event id (PRIMARY KEY), so a // re-logged event is a no-op (ON CONFLICT DO NOTHING) — each event takes exactly one // terminal path, so this never overwrites a real outcome. The write is isolated: the // caller runs it off the answer path and only logs a failure, never drops the reply. func (s *Store) InsertRequestLog(rl RequestLog) error { ctx, cancel := opContext() defer cancel() models, err := json.Marshal(rl.Models) if err != nil { return err } stages, err := json.Marshal(rl.StageMS) if err != nil { return err } // query_text is NULL unless text capture is on (the struct carries "" otherwise), // so the analytics table never holds message content by default. var queryText any if rl.QueryText != "" { queryText = rl.QueryText } _, err = s.pool.Exec(ctx, ` INSERT INTO request_log ( id, room_id, sender, route, router_source, router_confidence, models, prompt_tokens, cached_tokens, completion_tokens, token_usd, grounding_usd, router_usd, webtool_usd, total_usd, latency_ms, stage_ms, escalated, fallback_fired, cache_hit, ceiling_hit, per_user_cap_hit, prompt_version, provider_request_id, degraded, err, ok, query_text ) VALUES ( $1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17, $18, $19, $20, $21, $22, $23, $24, $25, $26, $27, $28 ) ON CONFLICT (id) DO NOTHING`, rl.ID, rl.RoomID, rl.Sender, rl.Route, rl.RouterSource, rl.RouterConfidence, models, rl.PromptTokens, rl.CachedTokens, rl.CompletionTokens, rl.Cost.Token, rl.Cost.Grounding, rl.Cost.Router, rl.Cost.WebTool, rl.Cost.Total(), rl.LatencyMS, stages, rl.Escalated, rl.FallbackFired, rl.CacheHit, rl.CeilingHit, rl.PerUserCapHit, rl.PromptVersion, rl.ProviderRequestID, rl.Degraded, rl.Err, rl.OK, queryText) return err } // TrimRequestLog deletes analytics rows older than the cutoff (time-based, since the // data is a time series — unlike the count-bounded dedup tables). A no-op for a zero // cutoff. Cheap given the ts index. func (s *Store) TrimRequestLog(olderThan time.Time) error { ctx, cancel := opContext() defer cancel() _, err := s.pool.Exec(ctx, `DELETE FROM request_log WHERE ts < $1`, olderThan) return err } // IncrGroundingIfUnder atomically admits one grounded prompt for today if the day's // count is below cap, returning whether it was admitted. The check-and-increment is a // single statement, so concurrent grounding calls can't race past the cap and into the // per-1k overage (§8.2.3). A non-positive cap denies everything (grounding effectively // off). The counter is day-keyed and self-resets at UTC midnight. func (s *Store) IncrGroundingIfUnder(cap int) (bool, error) { if cap <= 0 { return false, nil } ctx, cancel := opContext() defer cancel() var n int err := s.pool.QueryRow(ctx, ` INSERT INTO grounding_count (date, n) VALUES ($1, 1) ON CONFLICT (date) DO UPDATE SET n = grounding_count.n + 1 WHERE grounding_count.n < $2 RETURNING n`, todayUTC(), cap).Scan(&n) if errors.Is(err, pgx.ErrNoRows) { return false, nil // at/over cap — the conflict update was filtered out } if err != nil { return false, err } return true, nil } // HasWarnedEncrypted / SetWarnedEncrypted persist the one-shot "reacted 🔒 to this // room because I can't read encryption" flag so a restart doesn't re-react on every // message (F5). The bot never reacts to its own events: m.reaction is not an // m.room.message, so it never re-enters handleMessage. func (s *Store) HasWarnedEncrypted(roomID string) (bool, error) { ctx, cancel := opContext() defer cancel() var one int err := s.pool.QueryRow(ctx, `SELECT 1 FROM warned_encrypted WHERE room_id = $1`, roomID).Scan(&one) if errors.Is(err, pgx.ErrNoRows) { return false, nil } return err == nil, err } func (s *Store) SetWarnedEncrypted(roomID string) error { ctx, cancel := opContext() defer cancel() _, err := s.pool.Exec(ctx, `INSERT INTO warned_encrypted (room_id) VALUES ($1) ON CONFLICT DO NOTHING`, roomID) return err }