package main import ( "fmt" "sync" "sync/atomic" "testing" "time" ) // These tests exercise the Postgres-backed store directly. They run only when // AI_BOT_TEST_DATABASE_URL points at a throwaway database (openTestStore skips // otherwise) and start from a clean slate (openTestStore truncates). func TestStoreTxnDedup(t *testing.T) { st := openTestStore(t) defer st.Close() if got, err := st.HasTxn("txn-1"); err != nil || got { t.Fatalf("fresh txn: got (%v,%v), want (false,nil)", got, err) } if err := st.MarkTxn("txn-1"); err != nil { t.Fatalf("mark: %v", err) } if got, err := st.HasTxn("txn-1"); err != nil || !got { t.Fatalf("marked txn: got (%v,%v), want (true,nil)", got, err) } // Re-marking is idempotent (a retried transaction). if err := st.MarkTxn("txn-1"); err != nil { t.Fatalf("re-mark: %v", err) } if got, _ := st.HasTxn("txn-2"); got { t.Fatalf("unrelated txn must be unseen") } } func TestStoreSeenEvent(t *testing.T) { st := openTestStore(t) defer st.Close() first, err := st.SeenEvent("$ev1") if err != nil || !first { t.Fatalf("first SeenEvent: got (%v,%v), want (true,nil)", first, err) } again, err := st.SeenEvent("$ev1") if err != nil || again { t.Fatalf("repeat SeenEvent: got (%v,%v), want (false,nil)", again, err) } other, err := st.SeenEvent("$ev2") if err != nil || !other { t.Fatalf("new SeenEvent: got (%v,%v), want (true,nil)", other, err) } } // Dedup state must survive a process restart — the whole point of the durable store. func TestStoreDedupSurvivesRestart(t *testing.T) { st := openTestStore(t) if _, err := st.SeenEvent("$ev-restart"); err != nil { t.Fatalf("seen: %v", err) } if err := st.MarkTxn("txn-restart"); err != nil { t.Fatalf("mark: %v", err) } st.Close() // Reopen the same database WITHOUT truncating: simulates a container restart. st2, err := OpenStore(testDSN()) if err != nil { t.Fatalf("reopen: %v", err) } defer st2.Close() if isNew, err := st2.SeenEvent("$ev-restart"); err != nil || isNew { t.Fatalf("event after restart must be already-seen: got (%v,%v)", isNew, err) } if seen, err := st2.HasTxn("txn-restart"); err != nil || !seen { t.Fatalf("txn after restart must be seen: got (%v,%v)", seen, err) } } func TestStoreLimiterPerUserCap(t *testing.T) { st := openTestStore(t) defer st.Close() const user = "@u:vojo.chat" const cap, ceiling = 2, 100.0 for i := 0; i < cap; i++ { if res, err := st.Reserve(user, cap, 0, ceiling, 0); err != nil || res != reserveOK { t.Fatalf("reserve %d: got (%v,%v), want reserveOK", i, res, err) } } // The (cap+1)th request is denied per-user. if res, err := st.Reserve(user, cap, 0, ceiling, 0); err != nil || res != reserveDeniedUser { t.Fatalf("over-cap reserve: got (%v,%v), want reserveDeniedUser", res, err) } // A different user is unaffected. if res, err := st.Reserve("@v:vojo.chat", cap, 0, ceiling, 0); err != nil || res != reserveOK { t.Fatalf("other user reserve: got (%v,%v), want reserveOK", res, err) } // Refund returns a slot, so the first user can reserve once more. if err := st.RefundRequest(user); err != nil { t.Fatalf("refund: %v", err) } if res, err := st.Reserve(user, cap, 0, ceiling, 0); err != nil || res != reserveOK { t.Fatalf("post-refund reserve: got (%v,%v), want reserveOK", res, err) } } // A zero per-user cap denies even the first request — the SQLite store's // requests(0) >= cap(0) behaviour, preserved. func TestStoreLimiterZeroCap(t *testing.T) { st := openTestStore(t) defer st.Close() if res, err := st.Reserve("@u:vojo.chat", 0, 0, 100.0, 0); err != nil || res != reserveDeniedUser { t.Fatalf("zero-cap reserve: got (%v,%v), want reserveDeniedUser", res, err) } } // A zero ceiling denies the very first request of the day even before any spend row // exists — the SQLite store treated SUM(NULL) as 0.0 (0 >= 0), and the PG store must // match (SUM over zero rows is NULL). func TestStoreLimiterZeroCeiling(t *testing.T) { st := openTestStore(t) defer st.Close() if res, err := st.Reserve("@u:vojo.chat", 1_000_000, 0, 0, 0); err != nil || res != reserveDeniedGlobal { t.Fatalf("zero-ceiling reserve on empty store: got (%v,%v), want reserveDeniedGlobal", res, err) } } func TestStoreLimiterGlobalCeiling(t *testing.T) { st := openTestStore(t) defer st.Close() const ceiling = 1.0 // Book spend up to the ceiling (Settle is what feeds the global gate). if err := st.Settle("@a:vojo.chat", 0, CostBreakdown{Token: 0.6}); err != nil { t.Fatalf("settle a: %v", err) } if err := st.Settle("@b:vojo.chat", 0, CostBreakdown{Token: 0.5}); err != nil { t.Fatalf("settle b: %v", err) } if spent, err := st.SpentTodayUSD(); err != nil || spent < 1.1 { t.Fatalf("spent today: got (%v,%v), want >= 1.1", spent, err) } // Now any reservation is denied globally, regardless of the per-user cap. if res, err := st.Reserve("@c:vojo.chat", 1_000_000, 0, ceiling, 0); err != nil || res != reserveDeniedGlobal { t.Fatalf("over-ceiling reserve: got (%v,%v), want reserveDeniedGlobal", res, err) } } // The pgx pool is concurrent (the SQLite store serialized on one connection). The // advisory lock in Reserve must still admit EXACTLY perUserCap requests when many // arrive at once for the same user — the same user messaging from several rooms // simultaneously must not slip past the cap. func TestStoreReserveConcurrentRespectsCap(t *testing.T) { st := openTestStore(t) defer st.Close() const user = "@race:vojo.chat" const cap = 10 const goroutines = 50 var ok int64 var wg sync.WaitGroup for i := 0; i < goroutines; i++ { wg.Add(1) go func() { defer wg.Done() res, err := st.Reserve(user, cap, 0, 1e9, 0) if err != nil { t.Errorf("reserve: %v", err) return } if res == reserveOK { atomic.AddInt64(&ok, 1) } }() } wg.Wait() if ok != cap { t.Fatalf("concurrent reserves admitted %d, want exactly %d (the per-user cap)", ok, cap) } } // TestStoreReserveConcurrentCeilingBounded is the §8.1 TOCTOU regression. Many // DIFFERENT users reserving at once against a low ceiling must not overshoot it by // more than ONE max-reservation. The bare pgx port's per-(date,mxid) lock left the // cross-user ceiling unprotected: every user read the same committed SUM(usd)=0 (the // USD only lands at settle, after the call) and slipped through, so all N were // admitted. The per-day admission lock + reserved_usd here bound the overshoot. // Run under -race. func TestStoreReserveConcurrentCeilingBounded(t *testing.T) { st := openTestStore(t) defer st.Close() const estimate = 1.0 // each in-flight call reserves $1 const ceiling = 10.0 // so the gate should admit ~10, not 100 const perUserCap = 1_000_000 // keep the per-user cap out of the way const goroutines = 100 var ok int64 var wg sync.WaitGroup for i := 0; i < goroutines; i++ { wg.Add(1) go func(n int) { defer wg.Done() user := fmt.Sprintf("@u%d:vojo.chat", n) // a DIFFERENT user each time res, err := st.Reserve(user, perUserCap, 0, ceiling, estimate) if err != nil { t.Errorf("reserve: %v", err) return } if res == reserveOK { atomic.AddInt64(&ok, 1) } }(i) } wg.Wait() // committed+reserved < ceiling admits; the last admit can push reserved to just // under ceiling+estimate, so admitted ≤ ceiling/estimate + 1. The pre-fix code // admitted all 100. maxAdmit := int64(ceiling/estimate) + 1 if ok < 1 || ok > maxAdmit { t.Fatalf("admitted %d different users, want in [1, %d] (ceiling + one max-reserve)", ok, maxAdmit) } // Nothing was settled, so committed spend is still 0 — the cap came purely from // reservations, which is the whole point (the USD isn't known until after the call). if spent, err := st.SpentTodayUSD(); err != nil || spent != 0 { t.Fatalf("committed spend = (%v,%v), want 0 (only reservations held)", spent, err) } } // TestStoreSettleReleasesReservation verifies that Settle frees the reservation it // books actual cost for, restoring global headroom — proven through the admission // gate so it doesn't depend on reading the private column. func TestStoreSettleReleasesReservation(t *testing.T) { st := openTestStore(t) defer st.Close() const est = 5.0 const ceiling = 10.0 // Two reservations fill the ceiling (reserved 5 + 5 = 10); the third is denied. if res, _ := st.Reserve("@a:vojo.chat", 1_000_000, 0, ceiling, est); res != reserveOK { t.Fatalf("reserve a: %v", res) } if res, _ := st.Reserve("@b:vojo.chat", 1_000_000, 0, ceiling, est); res != reserveOK { t.Fatalf("reserve b: %v", res) } if res, _ := st.Reserve("@c:vojo.chat", 1_000_000, 0, ceiling, est); res != reserveDeniedGlobal { t.Fatalf("reserve c over full ceiling: got %v, want denied", res) } // Settle a with a small actual cost: reserved 10→5, committed 0→0.01. Headroom // returns, so a new reservation is admitted again. if err := st.Settle("@a:vojo.chat", est, CostBreakdown{Token: 0.01}); err != nil { t.Fatalf("settle a: %v", err) } if res, _ := st.Reserve("@d:vojo.chat", 1_000_000, 0, ceiling, est); res != reserveOK { t.Fatalf("reserve d after settle freed headroom: got %v, want reserveOK", res) } if spent, _ := st.SpentTodayUSD(); spent < 0.009 || spent > 0.011 { t.Fatalf("committed after one settle = %v, want ~0.01", spent) } } // TestStoreReleaseReservation verifies the call-failed path: a released reservation // frees headroom and books no USD, and an over-release clamps reserved_usd to 0 // rather than going negative (a negative reservation would manufacture phantom // headroom past the ceiling). func TestStoreReleaseReservation(t *testing.T) { st := openTestStore(t) defer st.Close() const est = 5.0 const ceiling = 10.0 // Reserve a, then over-release it by far more than it held. if res, _ := st.Reserve("@a:vojo.chat", 1_000_000, 0, ceiling, est); res != reserveOK { t.Fatalf("reserve a: %v", res) } if err := st.ReleaseReservation("@a:vojo.chat", 100); err != nil { t.Fatalf("over-release: %v", err) } // a's reserved must now be 0 (not -95): exactly two more $5 reservations fit the // $10 ceiling, and the third is denied. Were reserved negative, far more would slip // through — so the deny at the third request proves both the headroom was freed and // the clamp held. if res, _ := st.Reserve("@b:vojo.chat", 1_000_000, 0, ceiling, est); res != reserveOK { t.Fatalf("reserve b: %v", res) } if res, _ := st.Reserve("@c:vojo.chat", 1_000_000, 0, ceiling, est); res != reserveOK { t.Fatalf("reserve c: %v", res) } if res, _ := st.Reserve("@d:vojo.chat", 1_000_000, 0, ceiling, est); res != reserveDeniedGlobal { t.Fatalf("reserve d: got %v, want denied (reserved must have clamped to 0, not gone negative)", res) } // Nothing was ever settled, so committed spend stays 0 — release books no USD. if spent, _ := st.SpentTodayUSD(); spent != 0 { t.Fatalf("committed after release = %v, want 0 (a failed call bills nothing)", spent) } } // TestStoreRequestLog covers the analytics row: total_usd is the component sum, // query_text is NULL unless captured, re-inserting one id is a no-op, and the // time-based trim removes old rows. func TestStoreRequestLog(t *testing.T) { st := openTestStore(t) defer st.Close() noText := RequestLog{ ID: "$ev-rl-1", RoomID: "!r:vojo.chat", Sender: "@u:vojo.chat", Route: routeGrokDirect, RouterSource: "default", Models: map[string]string{"final": "grok-x"}, Cost: CostBreakdown{Token: 0.01, Grounding: 0.02}, LatencyMS: 1234, StageMS: map[string]int{"final": 1200}, ProviderRequestID: "prov-1", OK: true, // QueryText empty → NULL } if err := st.InsertRequestLog(noText); err != nil { t.Fatalf("insert: %v", err) } // Re-inserting the same id is a no-op (ON CONFLICT DO NOTHING), not an error. if err := st.InsertRequestLog(noText); err != nil { t.Fatalf("re-insert: %v", err) } withText := RequestLog{ID: "$ev-rl-2", Route: routeTrivial, OK: false, QueryText: "hello"} if err := st.InsertRequestLog(withText); err != nil { t.Fatalf("insert-with-text: %v", err) } ctx, cancel := opContext() defer cancel() var route string var total float64 var ok bool var qt *string if err := st.pool.QueryRow(ctx, `SELECT route, total_usd, ok, query_text FROM request_log WHERE id = $1`, noText.ID). Scan(&route, &total, &ok, &qt); err != nil { t.Fatalf("read row1: %v", err) } if route != routeGrokDirect || !ok { t.Fatalf("row1 = (%q, ok=%v), want (grok_direct, true)", route, ok) } if d := total - 0.03; d > 1e-9 || d < -1e-9 { t.Fatalf("row1 total_usd = %v, want 0.03 (token+grounding)", total) } if qt != nil { t.Fatalf("row1 query_text = %q, want NULL when text capture off", *qt) } if err := st.pool.QueryRow(ctx, `SELECT query_text FROM request_log WHERE id = $1`, withText.ID).Scan(&qt); err != nil { t.Fatalf("read row2: %v", err) } if qt == nil || *qt != "hello" { t.Fatalf("row2 query_text = %v, want \"hello\"", qt) } // Trim everything older than one hour from now → both rows (ts