// vojo/apps/ai-bot/store.go (584 lines, 25 KiB, Go)

package main
import (
"context"
"encoding/json"
"errors"
"fmt"
"time"
"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgxpool"
)
// reserveResult reports how a pre-call admission (Reserve) was decided: admitted,
// or refused by one of the two daily gates. Both refusals are surfaced to the user
// as the ⏳ rate-limit reaction (F24). The zero value is reserveOK.
type reserveResult int

const (
	reserveOK           reserveResult = 0 // admitted; request counted and reservation booked
	reserveDeniedUser   reserveResult = 1 // per-user daily cap (request count or USD quota) hit
	reserveDeniedGlobal reserveResult = 2 // global daily USD ceiling hit
)
// Row caps for the two dedup tables, carried over unchanged from the former SQLite
// store. After recording a new id, the writer trims its table back to the newest N
// ids, so neither table grows without bound.
const (
	maxProcessedTxn   = 5_000
	maxProcessedEvent = 20_000
)
// opTimeout caps the wall-clock time of each store operation. The former SQLite
// store was a local file that effectively never blocked; Postgres sits across the
// docker network, so without a bound a stalled database could hang a per-room
// handler goroutine indefinitely.
const opTimeout time.Duration = 10 * time.Second
// Store holds the bot's durable operational state:
//   - appservice transaction and event dedup
//   - the daily spend ledger and the grounding counter
//   - per-request analytics
//   - the set of encrypted rooms already warned
//
// The room timeline itself lives in Synapse, not here. Message or answer text
// reaches request_log only when TELEMETRY_STORE_TEXT is enabled.
//
// The backing database is a dedicated Postgres database, `vojo_ai`, following the
// per-service bridge databases. This state is therefore covered by the server's
// normal backup/restore.
type Store struct {
	pool *pgxpool.Pool // shared pgx connection pool; every method borrows from it
}
// OpenStore opens the Store and returns it ready for use. In order, it:
//   - connects to the `vojo_ai` Postgres database described by dsn (the
//     AI_BOT_DATABASE_URL setting);
//   - checks the connection with a ping;
//   - applies any pending schema migrations.
//
// Connecting, pinging and migrating share a single opTimeout budget. If the ping or
// the migrations fail, the freshly created pool is closed before the error is
// returned.
func OpenStore(dsn string) (*Store, error) {
	poolCfg, err := pgxpool.ParseConfig(dsn)
	if err != nil {
		return nil, fmt.Errorf("parse AI_BOT_DATABASE_URL: %w", err)
	}
	// A small pool is plenty. The per-room handler goroutines only issue brief
	// statements, and the shared server hosts many other databases. (The former
	// SQLite store pinned one connection to serialize every caller.)
	poolCfg.MinConns, poolCfg.MaxConns = 1, 4

	ctx, cancel := context.WithTimeout(context.Background(), opTimeout)
	defer cancel()

	pool, err := pgxpool.NewWithConfig(ctx, poolCfg)
	if err != nil {
		return nil, fmt.Errorf("connect vojo_ai: %w", err)
	}
	store := &Store{pool: pool}

	// Ping first; migrate only a reachable database. Either failure shares the
	// same cleanup path below.
	setupErr := pool.Ping(ctx)
	if setupErr != nil {
		setupErr = fmt.Errorf("ping vojo_ai: %w", setupErr)
	} else {
		setupErr = store.migrate(ctx)
	}
	if setupErr != nil {
		pool.Close()
		return nil, setupErr
	}
	return store, nil
}
// Close shuts down the Store's connection pool. The returned error is always nil.
// NOTE(review): presumably kept so Store can be closed like an io.Closer; confirm
// with callers.
func (s *Store) Close() error {
	pool := s.pool
	pool.Close()
	return nil
}
// migrationLockKey is the advisory-lock id the migration runner holds while it reads
// schema_version and applies pending steps. It stops two instances that start
// together from racing the version check. The bot runs single-instance, so this is
// belt and braces. The value is the ASCII bytes of "vojo" packed big-endian; any
// fixed constant would do.
const migrationLockKey = 0x766F6A6F
// migrations is the ordered, append-only schema history. Entry i (0-based) is
// schema version i+1.
//
// migrate records the highest applied version in schema_version and applies each
// later entry in its own transaction, together with the row that records it.
// Re-running is therefore a no-op. Never edit, reorder or remove an entry: a
// database already past that version would never see the change. Add a new entry
// instead.
//
// Every step is also written to be idempotent (CREATE TABLE / ADD COLUMN / CREATE
// INDEX ... IF NOT EXISTS). A database whose schema got ahead of its recorded
// version still converges.
//
// Each entry may hold several statements separated by semicolons. migrate passes
// each entry to Exec with no arguments.
var migrations = []string{
	// v1: the operational schema — a 1:1 port of the former SQLite tables.
	// processed_* carry a surrogate identity column because Postgres has no rowid.
	// The LRU trim orders by that column, and txn_id/event_id stay UNIQUE so the
	// ON CONFLICT insert can dedupe.
	// spend.date holds the "YYYY-MM-DD" UTC day string produced by todayUTC.
	`CREATE TABLE IF NOT EXISTS processed_txn (
id BIGINT GENERATED ALWAYS AS IDENTITY PRIMARY KEY,
txn_id TEXT UNIQUE NOT NULL
);
CREATE TABLE IF NOT EXISTS processed_event (
id BIGINT GENERATED ALWAYS AS IDENTITY PRIMARY KEY,
event_id TEXT UNIQUE NOT NULL
);
CREATE TABLE IF NOT EXISTS spend (
date TEXT NOT NULL,
mxid TEXT NOT NULL,
requests INTEGER NOT NULL DEFAULT 0,
usd DOUBLE PRECISION NOT NULL DEFAULT 0,
PRIMARY KEY (date, mxid)
);
CREATE TABLE IF NOT EXISTS warned_encrypted (room_id TEXT PRIMARY KEY);`,
	// v2: component cost columns plus the optimistic reservation column.
	//
	// reserved_usd holds the estimated max cost of in-flight calls. The global
	// ceiling can then count committed + reserved spend at admission time (the
	// TOCTOU fix, §8.1). Without it, a burst of concurrent calls all read the same
	// low committed SUM and slip past the ceiling, because the USD only lands at
	// settle, AFTER the call.
	//
	// The component columns let the ceiling see grounding/tool fees too, not just
	// tokens, and feed the per-component analytics.
	//
	// ADD COLUMN IF NOT EXISTS is idempotent.
	`ALTER TABLE spend ADD COLUMN IF NOT EXISTS reserved_usd DOUBLE PRECISION NOT NULL DEFAULT 0;
ALTER TABLE spend ADD COLUMN IF NOT EXISTS router_usd DOUBLE PRECISION NOT NULL DEFAULT 0;
ALTER TABLE spend ADD COLUMN IF NOT EXISTS grounding_usd DOUBLE PRECISION NOT NULL DEFAULT 0;
ALTER TABLE spend ADD COLUMN IF NOT EXISTS webtool_usd DOUBLE PRECISION NOT NULL DEFAULT 0;`,
	// v3: request_log — one row per engaged request, for offline analysis (§6.2) of:
	//   - the route mix
	//   - per-component $/day
	//   - latency
	//   - escalation/degrade rates
	//
	// It holds operational data, not message content: query_text is written ONLY
	// when TELEMETRY_STORE_TEXT is on. The table is indexed by ts for the
	// time-based retention trim and for time-series queries.
	`CREATE TABLE IF NOT EXISTS request_log (
id TEXT PRIMARY KEY,
ts TIMESTAMPTZ NOT NULL DEFAULT now(),
room_id TEXT,
sender TEXT,
route TEXT,
router_source TEXT,
router_confidence REAL,
models JSONB,
prompt_tokens INT,
cached_tokens INT,
completion_tokens INT,
token_usd DOUBLE PRECISION,
grounding_usd DOUBLE PRECISION,
router_usd DOUBLE PRECISION,
webtool_usd DOUBLE PRECISION,
total_usd DOUBLE PRECISION,
latency_ms INT,
stage_ms JSONB,
escalated BOOL DEFAULT false,
fallback_fired BOOL DEFAULT false,
cache_hit BOOL DEFAULT false,
ceiling_hit BOOL DEFAULT false,
per_user_cap_hit BOOL DEFAULT false,
prompt_version TEXT,
provider_request_id TEXT,
degraded TEXT DEFAULT '',
err TEXT DEFAULT '',
ok BOOL,
query_text TEXT
);
CREATE INDEX IF NOT EXISTS request_log_ts_idx ON request_log (ts);`,
	// v4: a per-day grounded-prompt counter for the web grounding cap guard
	// (§8.2.3), with one row per UTC day. The cap check and the increment are one
	// atomic statement — the same TOCTOU discipline as the spend gate — so a burst
	// can't blow past the $/1k grounding overage. The table is day-keyed, so it
	// self-resets and needs no separate trim.
	`CREATE TABLE IF NOT EXISTS grounding_count (
date TEXT PRIMARY KEY,
n INTEGER NOT NULL DEFAULT 0
);`,
	// v5 (router redesign §8): the classifier signals and web outcome the offline
	// eval needs to MEASURE misroute, false-web, lie-rate, true cost and rewrite
	// quality. None of these can be derived from the v3 columns. Append-only: never
	// edit an earlier migration.
	//
	// Booleans and counts are metadata, always recorded when telemetry is on.
	// search_query and answer_text are content, written ONLY when
	// TELEMETRY_STORE_TEXT is on (NULL otherwise).
	//
	// classifier_confidence is NOT a new column: filter router_confidence on
	// router_source='classifier' instead.
	//
	// grounding_fee_usd is the §7 booked per-prompt fee. The same fee is ALSO folded
	// into the spend ledger's grounding_usd for the ceiling; this column is the
	// analytics split.
	//
	// NOTE(review): trivial_score is declared BOOL despite its _score name —
	// confirm this is intended before a later migration relies on it.
	`ALTER TABLE request_log ADD COLUMN IF NOT EXISTS needs_web BOOL DEFAULT false;
ALTER TABLE request_log ADD COLUMN IF NOT EXISTS entity_obscure BOOL DEFAULT false;
ALTER TABLE request_log ADD COLUMN IF NOT EXISTS time_sensitive BOOL DEFAULT false;
ALTER TABLE request_log ADD COLUMN IF NOT EXISTS verifiable BOOL DEFAULT false;
ALTER TABLE request_log ADD COLUMN IF NOT EXISTS trivial_score BOOL DEFAULT false;
ALTER TABLE request_log ADD COLUMN IF NOT EXISTS web_decided_by TEXT DEFAULT '';
ALTER TABLE request_log ADD COLUMN IF NOT EXISTS grounding_fee_usd DOUBLE PRECISION DEFAULT 0;
ALTER TABLE request_log ADD COLUMN IF NOT EXISTS rewrite_used BOOL DEFAULT false;
ALTER TABLE request_log ADD COLUMN IF NOT EXISTS web_grounded BOOL DEFAULT false;
ALTER TABLE request_log ADD COLUMN IF NOT EXISTS citation_count INT DEFAULT 0;
ALTER TABLE request_log ADD COLUMN IF NOT EXISTS search_query TEXT;
ALTER TABLE request_log ADD COLUMN IF NOT EXISTS answer_text TEXT;`,
}
// migrate brings the schema up to date:
//   - pins one pooled connection;
//   - takes the session-level advisory lock migrationLockKey on it;
//   - reads the highest applied version from schema_version;
//   - applies every later entry of migrations, each in its own transaction together
//     with the schema_version row that records it.
//
// A failing step leaves all earlier steps committed and returns an error. The next
// call resumes from the last recorded version.
//
// The unlock runs on a fresh context, not on ctx. ctx is the caller's whole startup
// deadline; if it had already expired, an unlock issued on it would fail. The
// session lock would then go back into the pool still attached to this connection
// and block any later migrate (e.g. another instance) until that connection died.
// If the unlock fails anyway, the connection is closed so the server drops the lock
// with the session. The deferred Release then discards the closed connection
// instead of pooling it.
func (s *Store) migrate(ctx context.Context) error {
	conn, err := s.pool.Acquire(ctx)
	if err != nil {
		return fmt.Errorf("migrate: acquire: %w", err)
	}
	defer conn.Release()
	if _, err := conn.Exec(ctx, `SELECT pg_advisory_lock($1)`, int64(migrationLockKey)); err != nil {
		return fmt.Errorf("migrate: lock: %w", err)
	}
	// Deferred after Release, so it runs first: unlock, then return the connection.
	defer func() {
		unlockCtx, cancel := context.WithTimeout(context.Background(), opTimeout)
		defer cancel()
		if _, err := conn.Exec(unlockCtx, `SELECT pg_advisory_unlock($1)`, int64(migrationLockKey)); err != nil {
			// Ending the session is what releases the lock now. A close error has
			// nowhere useful to go, so it is deliberately dropped.
			_ = conn.Conn().Close(unlockCtx)
		}
	}()
	if _, err := conn.Exec(ctx, `CREATE TABLE IF NOT EXISTS schema_version (version INTEGER PRIMARY KEY)`); err != nil {
		return fmt.Errorf("migrate: schema_version: %w", err)
	}
	var current int
	if err := conn.QueryRow(ctx, `SELECT COALESCE(MAX(version), 0) FROM schema_version`).Scan(&current); err != nil {
		return fmt.Errorf("migrate: read version: %w", err)
	}
	// Entry v of migrations is schema version v+1.
	for v := current; v < len(migrations); v++ {
		tx, err := conn.Begin(ctx)
		if err != nil {
			return fmt.Errorf("migrate: begin %d: %w", v+1, err)
		}
		if _, err := tx.Exec(ctx, migrations[v]); err != nil {
			_ = tx.Rollback(ctx)
			return fmt.Errorf("migrate: apply %d: %w", v+1, err)
		}
		if _, err := tx.Exec(ctx, `INSERT INTO schema_version (version) VALUES ($1)`, v+1); err != nil {
			_ = tx.Rollback(ctx)
			return fmt.Errorf("migrate: record %d: %w", v+1, err)
		}
		if err := tx.Commit(ctx); err != nil {
			return fmt.Errorf("migrate: commit %d: %w", v+1, err)
		}
	}
	return nil
}
// todayUTC returns the current UTC calendar date as "YYYY-MM-DD". That string keys
// every day-scoped row (spend, grounding_count) and the per-day admission lock.
func todayUTC() string {
	const dayLayout = "2006-01-02"
	now := time.Now().UTC()
	return now.Format(dayLayout)
}
// opContext returns a context for a single store operation, plus its cancel func.
// The context expires opTimeout from now and is rooted at context.Background, not
// at any caller context. Callers defer the cancel.
func opContext() (ctx context.Context, cancel context.CancelFunc) {
	ctx, cancel = context.WithTimeout(context.Background(), opTimeout)
	return ctx, cancel
}
// HasTxn reports whether appservice transaction txnID was already recorded by
// MarkTxn. Together the two make transaction handling idempotent across restarts:
// when Synapse re-sends a transaction (because our 200 was lost), it is processed at
// most once. The table keeps only the most recent ids. An unknown id yields
// (false, nil); any other query failure yields (false, err).
func (s *Store) HasTxn(txnID string) (bool, error) {
	ctx, cancel := opContext()
	defer cancel()
	var found int
	switch err := s.pool.QueryRow(ctx, `SELECT 1 FROM processed_txn WHERE txn_id = $1`, txnID).Scan(&found); {
	case err == nil:
		return true, nil
	case errors.Is(err, pgx.ErrNoRows):
		return false, nil
	default:
		return false, err
	}
}
// MarkTxn records txnID as processed (see HasTxn). It then trims processed_txn back
// to the newest maxProcessedTxn ids.
//
// The trim runs only when the insert actually added a row: ON CONFLICT DO NOTHING
// reports one affected row for a new id and none for a known one. This matches
// SeenEvent. A duplicate mark does not grow the table, so re-scanning it would only
// cost a full DELETE pass.
func (s *Store) MarkTxn(txnID string) error {
	ctx, cancel := opContext()
	defer cancel()
	tag, err := s.pool.Exec(ctx,
		`INSERT INTO processed_txn (txn_id) VALUES ($1) ON CONFLICT DO NOTHING`, txnID)
	if err != nil {
		return err
	}
	if tag.RowsAffected() == 0 {
		return nil // already recorded: the table did not grow, nothing to trim
	}
	_, err = s.pool.Exec(ctx, `DELETE FROM processed_txn WHERE id NOT IN
(SELECT id FROM processed_txn ORDER BY id DESC LIMIT $1)`, maxProcessedTxn)
	return err
}
// SeenEvent records eventID and reports whether this call was the first to do so
// (true) or the id was already present (false). It is the durable counterpart to
// the in-memory dedup set. A crash or restart between handling an event and acking
// its transaction must not make the bot handle it again: that would mean a
// duplicate answer, a double charge and an inflated cap.
//
// ON CONFLICT DO NOTHING affects one row for a new id and none for a known one, so
// RowsAffected carries the answer. After a new insert the table is trimmed to the
// newest maxProcessedEvent ids. A trim failure is returned alongside true, because
// the id itself is already recorded.
func (s *Store) SeenEvent(eventID string) (bool, error) {
	ctx, cancel := opContext()
	defer cancel()
	tag, err := s.pool.Exec(ctx,
		`INSERT INTO processed_event (event_id) VALUES ($1) ON CONFLICT DO NOTHING`, eventID)
	switch {
	case err != nil:
		return false, err
	case tag.RowsAffected() == 0:
		return false, nil // known id → not new
	}
	_, trimErr := s.pool.Exec(ctx, `DELETE FROM processed_event WHERE id NOT IN
(SELECT id FROM processed_event ORDER BY id DESC LIMIT $1)`, maxProcessedEvent)
	return true, trimErr
}
// committedUSDExpr is the SQL expression for a spend row's COMMITTED cost. It is
// token spend plus the router, grounding and web-tool fees a cascade can incur, so
// the wallet ceiling never ignores non-token spend. reserved_usd is deliberately
// left out because it is in-flight, not yet spent. The admission gate adds it
// separately.
const committedUSDExpr = "usd + router_usd + grounding_usd + webtool_usd"
// SpentTodayUSD returns the total COMMITTED spend (committedUSDExpr, reservations
// excluded) across all users for the current UTC day. SUM over zero rows is NULL;
// it scans into a nil pointer and is reported as 0.
func (s *Store) SpentTodayUSD() (float64, error) {
	ctx, cancel := opContext()
	defer cancel()
	query := `SELECT SUM(` + committedUSDExpr + `) FROM spend WHERE date = $1`
	var sum *float64
	if err := s.pool.QueryRow(ctx, query, todayUTC()).Scan(&sum); err != nil {
		return 0, err
	}
	total := 0.0
	if sum != nil {
		total = *sum
	}
	return total, nil
}
// reserveDayLockKey is prefixed to the UTC date to form the per-day admission lock
// key used by Reserve. Reserve hashes that key to a bigint (hashtextextended) before
// locking. The prefix therefore makes a clash with other advisory-lock ids, such as
// the migration lock, unlikely rather than impossible.
const reserveDayLockKey = `ai-bot:reserve:`
// Reserve is the pre-call admission check (F4). In one transaction it applies two
// gates, cheapest denial first:
//   - the global daily USD ceiling, which protects the wallet;
//   - the per-user daily limits, which are anti-abuse.
//
// If both pass, it counts one request for mxid and books estimate (the route's
// maximum cost) into reserved_usd. The real cost is booked later by Settle, which
// also releases the reservation. ReleaseReservation and RefundRequest undo an
// admission that never settles.
//
// Locking (the TOCTOU fix, §8.1): every admission for a UTC day is serialized by a
// transaction-scoped advisory lock keyed on the date alone. Under that lock the
// ceiling reads committed + reserved spend and adds its own reservation before
// anyone else can read. Without it, every concurrent call would read the same stale
// total; with it, a burst from many different users can overshoot the ceiling by
// at most one reservation.
//
// An earlier port keyed the lock on date|mxid. That only serialized a user against
// himself and left the cross-user ceiling open. The day-wide lock matches the former
// SQLite store, whose single connection serialized all callers. At this bot's volume
// (per-room single-flight) it is not observable. Settle, ReleaseReservation and
// RefundRequest take no lock: they only lower or convert spend, never admit.
//
// Limits:
//   - perUserCap: the per-user daily request count. A non-positive value denies
//     everyone.
//   - perUserUSD: an optional per-user committed+reserved USD quota; 0 disables it.
//   - dailyUSDCeiling: the global committed+reserved USD ceiling. 0 denies
//     everything, since 0 >= 0.
//
// On error the returned reserveResult is reserveOK and must be ignored.
func (s *Store) Reserve(mxid string, perUserCap int, perUserUSD, dailyUSDCeiling, estimate float64) (reserveResult, error) {
	ctx, cancel := opContext()
	defer cancel()
	date := todayUTC()

	tx, err := s.pool.Begin(ctx)
	if err != nil {
		return reserveOK, err
	}
	// Harmless after a successful Commit. On any early return it aborts the
	// transaction, which also drops the advisory lock.
	defer func() { _ = tx.Rollback(ctx) }()

	if _, err = tx.Exec(ctx, `SELECT pg_advisory_xact_lock(hashtextextended($1, 0))`, reserveDayLockKey+date); err != nil {
		return reserveOK, err
	}

	// Gate 1: the global ceiling over committed + reserved spend for the day. SUM
	// over no rows is NULL, which scans as nil and counts as 0 (as the SQLite
	// store's sql.NullFloat64 did).
	var dayTotal *float64
	dayQuery := `SELECT SUM(` + committedUSDExpr + ` + reserved_usd) FROM spend WHERE date = $1`
	if err = tx.QueryRow(ctx, dayQuery, date).Scan(&dayTotal); err != nil {
		return reserveOK, err
	}
	var spent float64
	if dayTotal != nil {
		spent = *dayTotal
	}
	if spent >= dailyUSDCeiling {
		return reserveDeniedGlobal, nil
	}

	// Gate 2: this user's request count and committed+reserved USD, read together
	// under the same lock. With no row yet today (ErrNoRows), both stay zero.
	var (
		usedRequests int
		usedUSD      float64
	)
	userQuery := `SELECT requests, ` + committedUSDExpr + ` + reserved_usd FROM spend WHERE date = $1 AND mxid = $2`
	if err = tx.QueryRow(ctx, userQuery, date, mxid).Scan(&usedRequests, &usedUSD); err != nil && !errors.Is(err, pgx.ErrNoRows) {
		return reserveOK, err
	}
	if usedRequests >= perUserCap || (perUserUSD > 0 && usedUSD >= perUserUSD) {
		return reserveDeniedUser, nil
	}

	// Admitted: count the request and book the reservation.
	if _, err = tx.Exec(ctx,
		`INSERT INTO spend (date, mxid, requests, reserved_usd) VALUES ($1, $2, 1, $3)
ON CONFLICT (date, mxid) DO UPDATE SET requests = spend.requests + 1,
reserved_usd = spend.reserved_usd + excluded.reserved_usd`,
		date, mxid, estimate); err != nil {
		return reserveOK, err
	}
	if err = tx.Commit(ctx); err != nil {
		return reserveOK, err
	}
	return reserveOK, nil
}
// RefundRequest gives mxid back one of today's request slots. It is used when the
// call ultimately failed (an outage) or the reply could not be delivered (paid
// silence, §8.1), so a transient failure does not eat into the user's daily cap.
//
// USD is left untouched on purpose: a 2xx is billed even if delivery then fails.
// The count is floored at zero. Because it is a single UPDATE, concurrent refunds
// need no extra locking. A user with no row for today is a no-op.
func (s *Store) RefundRequest(mxid string) error {
	ctx, cancel := opContext()
	defer cancel()
	const refund = `UPDATE spend SET requests = GREATEST(0, requests - 1) WHERE date = $1 AND mxid = $2`
	day := todayUTC()
	_, err := s.pool.Exec(ctx, refund, day, mxid)
	return err
}
// ReleaseReservation subtracts estimate from mxid's reserved_usd for today without
// booking any cost. It is for a request that produced no billable spend, and
// restores global headroom.
//
// Normal failure paths go through Settle, which also releases. This is the safety
// valve for an exit that never settled: a panic during generation, recovered by
// safego. There, respond's deferred guard runs it together with RefundRequest so a
// leaked reservation cannot drift the ceiling.
//
// reserved_usd is floored at zero, so a double release cannot drive it negative.
// No lock is taken: this only lowers the in-flight total and never admits.
func (s *Store) ReleaseReservation(mxid string, estimate float64) error {
	ctx, cancel := opContext()
	defer cancel()
	const release = `UPDATE spend SET reserved_usd = GREATEST(0, reserved_usd - $3) WHERE date = $1 AND mxid = $2`
	day := todayUTC()
	_, err := s.pool.Exec(ctx, release, day, mxid, estimate)
	return err
}
// Settle converts one admitted call's reservation into its actual cost in a single
// statement. It replaced the former additive Reconcile:
//   - reserved_usd drops by estimate, floored at zero;
//   - each component of cost is added to its committed column.
//
// The reservation is released, not accumulated, which is what the ceiling needs.
// Because each update is relative to the row's current value, concurrent settles
// for one user add up correctly.
//
// Settle doubles as the partial-refund primitive for cascades (§8.1). A
// web_then_grok call that paid for grounding but failed at the final model passes a
// CostBreakdown holding only that grounding spend. That releases the rest of the
// reservation; the request slot is refunded separately.
//
// If mxid has no row for today, one is inserted carrying the cost, with zero
// requests and zero reservation.
//
// The per-grounded-prompt fee (cost.GroundingFee, §7 SG1) is added into
// grounding_usd. It therefore flows through committedUSDExpr, and the daily
// ceiling counts it without a spend-table migration. request_log keeps the fee in
// its own grounding_fee_usd column for the analytics split.
func (s *Store) Settle(mxid string, estimate float64, cost CostBreakdown) error {
	ctx, cancel := opContext()
	defer cancel()
	groundingUSD := cost.Grounding + cost.GroundingFee // fee folded into the committed column
	const settleSQL = `INSERT INTO spend (date, mxid, requests, usd, router_usd, grounding_usd, webtool_usd, reserved_usd)
VALUES ($1, $2, 0, $3, $4, $5, $6, 0)
ON CONFLICT (date, mxid) DO UPDATE SET
usd = spend.usd + excluded.usd,
router_usd = spend.router_usd + excluded.router_usd,
grounding_usd = spend.grounding_usd + excluded.grounding_usd,
webtool_usd = spend.webtool_usd + excluded.webtool_usd,
reserved_usd = GREATEST(0, spend.reserved_usd - $7)`
	_, err := s.pool.Exec(ctx, settleSQL,
		todayUTC(), mxid, cost.Token, cost.Router, groundingUSD, cost.WebTool, estimate)
	return err
}
// InsertRequestLog stores one analytics row for an engaged request. The row id is
// the event id (the table's primary key), and the insert is ON CONFLICT DO NOTHING,
// so logging the same event twice keeps the first row. Each event follows exactly
// one terminal path, so no real outcome is ever overwritten. Callers run this off
// the answer path and only log a failure; it never drops a reply.
//
// The text columns (query_text, search_query, answer_text) are written as SQL NULL
// when the struct carries "", which it does unless text capture is on. The table
// therefore holds no message or model content by default.
//
// Here grounding_usd is the grounding TOKEN cost only; the per-prompt fee goes to
// grounding_fee_usd. total_usd is rl.Cost.Total() including that fee, so the two
// grounding columns and the total stay coherent. The spend ledger, by contrast,
// folds the fee into its own grounding column.
func (s *Store) InsertRequestLog(rl RequestLog) error {
	ctx, cancel := opContext()
	defer cancel()

	modelsJSON, err := json.Marshal(rl.Models)
	if err != nil {
		return err
	}
	stagesJSON, err := json.Marshal(rl.StageMS)
	if err != nil {
		return err
	}
	// textOrNull maps an empty string to SQL NULL and passes anything else through.
	textOrNull := func(text string) any {
		if text != "" {
			return text
		}
		return nil
	}

	const insertSQL = `
INSERT INTO request_log (
id, room_id, sender, route, router_source, router_confidence, models,
prompt_tokens, cached_tokens, completion_tokens,
token_usd, grounding_usd, router_usd, webtool_usd, total_usd,
latency_ms, stage_ms, escalated, fallback_fired, cache_hit, ceiling_hit,
per_user_cap_hit, prompt_version, provider_request_id, degraded, err, ok, query_text,
needs_web, entity_obscure, time_sensitive, verifiable, trivial_score, web_decided_by,
grounding_fee_usd, rewrite_used, web_grounded, citation_count, search_query, answer_text
) VALUES (
$1, $2, $3, $4, $5, $6, $7,
$8, $9, $10,
$11, $12, $13, $14, $15,
$16, $17, $18, $19, $20, $21,
$22, $23, $24, $25, $26, $27, $28,
$29, $30, $31, $32, $33, $34,
$35, $36, $37, $38, $39, $40
) ON CONFLICT (id) DO NOTHING`

	_, err = s.pool.Exec(ctx, insertSQL,
		rl.ID, rl.RoomID, rl.Sender, rl.Route, rl.RouterSource, rl.RouterConfidence, modelsJSON,
		rl.PromptTokens, rl.CachedTokens, rl.CompletionTokens,
		rl.Cost.Token, rl.Cost.Grounding, rl.Cost.Router, rl.Cost.WebTool, rl.Cost.Total(),
		rl.LatencyMS, stagesJSON, rl.Escalated, rl.FallbackFired, rl.CacheHit, rl.CeilingHit,
		rl.PerUserCapHit, rl.PromptVersion, rl.ProviderRequestID, rl.Degraded, rl.Err, rl.OK, textOrNull(rl.QueryText),
		rl.NeedsWeb, rl.EntityObscure, rl.TimeSensitive, rl.Verifiable, rl.TrivialScore, rl.WebDecidedBy,
		rl.Cost.GroundingFee, rl.RewriteUsed, rl.WebGrounded, rl.CitationCount, textOrNull(rl.SearchQuery), textOrNull(rl.AnswerText))
	return err
}
// TrimRequestLog deletes request_log rows whose ts is strictly before olderThan.
// Retention here is by age because the table is a time series; the dedup tables are
// bounded by row count instead. The ts index keeps the range delete cheap.
//
// A zero olderThan means "no cutoff". The call then returns nil without touching
// the database, rather than relying on how the driver encodes time.Time{}.
func (s *Store) TrimRequestLog(olderThan time.Time) error {
	if olderThan.IsZero() {
		return nil
	}
	ctx, cancel := opContext()
	defer cancel()
	_, err := s.pool.Exec(ctx, `DELETE FROM request_log WHERE ts < $1`, olderThan)
	return err
}
// IncrGroundingIfUnder tries to admit one more grounded prompt for the current UTC
// day and reports whether it was admitted.
//
// The check and the increment are a single INSERT … ON CONFLICT DO UPDATE … WHERE
// n < limit statement. Concurrent grounding calls therefore cannot race the count
// past limit and into the per-1k overage (§8.2.3). When the day's count already
// equals or exceeds limit, the WHERE filters out the update and RETURNING yields no
// row, so the call reports (false, nil).
//
// A non-positive limit denies everything without querying (grounding effectively
// off). Rows are keyed by UTC day, so the counter starts fresh at each midnight.
//
// The parameter is named limit, not cap, so it does not shadow the builtin.
func (s *Store) IncrGroundingIfUnder(limit int) (bool, error) {
	if limit <= 0 {
		return false, nil
	}
	ctx, cancel := opContext()
	defer cancel()
	var n int
	err := s.pool.QueryRow(ctx, `
INSERT INTO grounding_count (date, n) VALUES ($1, 1)
ON CONFLICT (date) DO UPDATE SET n = grounding_count.n + 1
WHERE grounding_count.n < $2
RETURNING n`, todayUTC(), limit).Scan(&n)
	switch {
	case errors.Is(err, pgx.ErrNoRows):
		return false, nil // at/over limit — the conflict update was filtered out
	case err != nil:
		return false, err
	default:
		return true, nil
	}
}
// DecrGrounding hands back one of today's admitted grounding slots. It is used when
// the admitted prompt yielded no usable grounded digest (no citations, or the fetch
// failed), so over-routing and failed fetches do not use up the day's
// grounded-answer budget (§7 SG4).
//
// Like RefundRequest, it is a single UPDATE keyed on todayUTC(), floored at zero so
// a double refund cannot make the counter negative. Only the quota counter changes;
// the per-prompt fee already booked in the spend ledger stays.
func (s *Store) DecrGrounding() error {
	ctx, cancel := opContext()
	defer cancel()
	const decrement = `UPDATE grounding_count SET n = GREATEST(0, n - 1) WHERE date = $1`
	day := todayUTC()
	_, err := s.pool.Exec(ctx, decrement, day)
	return err
}
// HasWarnedEncrypted reports whether roomID is in the persisted one-shot set of
// rooms already given the "🔒 I can't read encryption" reaction. Persisting it means
// a restart does not re-react on every message (F5). An unknown room is
// (false, nil).
//
// The bot's own reactions never loop back: m.reaction is not an m.room.message, so
// it never re-enters handleMessage.
func (s *Store) HasWarnedEncrypted(roomID string) (bool, error) {
	ctx, cancel := opContext()
	defer cancel()
	const lookup = `SELECT 1 FROM warned_encrypted WHERE room_id = $1`
	var marker int
	err := s.pool.QueryRow(ctx, lookup, roomID).Scan(&marker)
	if err == nil {
		return true, nil
	}
	if errors.Is(err, pgx.ErrNoRows) {
		return false, nil
	}
	return false, err
}
// SetWarnedEncrypted adds roomID to the warned-encrypted set. Marking a room that is
// already present is a no-op (ON CONFLICT DO NOTHING).
func (s *Store) SetWarnedEncrypted(roomID string) error {
	ctx, cancel := opContext()
	defer cancel()
	const insert = `INSERT INTO warned_encrypted (room_id) VALUES ($1) ON CONFLICT DO NOTHING`
	_, err := s.pool.Exec(ctx, insert, roomID)
	return err
}