diff --git a/apps/ai-bot/.env.example b/apps/ai-bot/.env.example index 0469b780..7d98d999 100644 --- a/apps/ai-bot/.env.example +++ b/apps/ai-bot/.env.example @@ -33,6 +33,14 @@ XAI_PRICE_INPUT_PER_M=1.25 # fallback per-1M prices that bound t XAI_PRICE_CACHED_PER_M=0.20 XAI_PRICE_OUTPUT_PER_M=2.50 +# --- Database (vojo_ai Postgres) --- +# Operational store (txn/event dedup, the daily spend ledger, the encrypted-warned +# set) — NOT message content (that lives in Synapse). A dedicated database+role on +# the shared Postgres, like each mautrix bridge. Inside the docker network the host +# is the `postgres` service. The DSN embeds the role password, so treat ai-bot.env +# as sensitive (chmod 600). Required. +AI_BOT_DATABASE_URL=postgres://vojo_ai:CHANGE_ME@postgres:5432/vojo_ai?sslmode=disable + # --- Paths (non-secret) --- SYSTEM_PROMPT_PATH=prompts/system_ru.txt STATE_DIR=/state diff --git a/apps/ai-bot/Dockerfile b/apps/ai-bot/Dockerfile index ce46c4ef..4eccf746 100644 --- a/apps/ai-bot/Dockerfile +++ b/apps/ai-bot/Dockerfile @@ -1,4 +1,4 @@ -# Multi-stage: static CGO-free build (pure-Go SQLite) → distroless runtime. +# Multi-stage: static CGO-free build (pure-Go pgx driver) → distroless runtime. FROM golang:1.25 AS build WORKDIR /src @@ -15,7 +15,8 @@ WORKDIR /app COPY --from=build /out/ai-bot /app/ai-bot # System prompt(s) ship with the image; override via SYSTEM_PROMPT_PATH + a mount. COPY --from=build /src/prompts /app/prompts -# STATE_DIR (SQLite: spend ledger + txn dedup) is a mounted volume in compose. +# The operational store now lives in Postgres (AI_BOT_DATABASE_URL → the vojo_ai +# database). STATE_DIR remains the runtime dir (registration.yaml etc.); no DB here. ENV STATE_DIR=/state # Appservice transaction-push port (Synapse → bot). Match AS_ADDR / the # registration `url`. 
diff --git a/apps/ai-bot/README.md b/apps/ai-bot/README.md index f262eb71..41fb0a35 100644 --- a/apps/ai-bot/README.md +++ b/apps/ai-bot/README.md @@ -34,7 +34,7 @@ apps/ai-bot/ ├── mentions.go # m.mentions + pill/reply fallbacks (F29/F30) ├── context.go # xAI message-window assembly (trigger + bot replies) ├── xai.go # chat/completions client + retry (F6) -├── store.go # SQLite: spend ledger, txn dedup, encrypted-warned set +├── store.go # Postgres (vojo_ai): spend ledger, txn/event dedup, encrypted-warned set ├── messages.go # bot-authored RU notices ├── markdown.go # markdown → org.matrix.custom.html for the reply's formatted_body ├── util.go # bounded dedup set @@ -46,11 +46,36 @@ apps/ai-bot/ ## Configuration All via environment (see `.env.example`). Required: `HOMESERVER_URL`, `BOT_MXID`, -`AS_TOKEN`, `HS_TOKEN`, `XAI_API_KEY`, `ALLOWED_SERVERS`. `AS_ADDR` (default -`:8009`) is the transaction-push listen address — it must match the `url` port in -the registration. The model is env-configurable (`XAI_MODEL`, default -`grok-4.20-0309-non-reasoning`; `grok-4.3` is an alternative — **re-verify the id -+ price on docs.x.ai before deploy**). +`AS_TOKEN`, `HS_TOKEN`, `XAI_API_KEY`, `ALLOWED_SERVERS`, `AI_BOT_DATABASE_URL`. +`AS_ADDR` (default `:8009`) is the transaction-push listen address — it must match +the `url` port in the registration. The model is env-configurable (`XAI_MODEL`, +default `grok-4.20-0309-non-reasoning`; `grok-4.3` is an alternative — **re-verify +the id + price on docs.x.ai before deploy**). + +### Database + +The bot keeps its **operational state** — appservice transaction + event dedup, the +daily spend ledger, and the encrypted-room warned set — in a dedicated Postgres +database `vojo_ai` on the shared server, mirroring the per-service bridge databases +(each bridge owns its own role + DB). It stores **no message content**: the room +timeline is canonical in Synapse, and the bot's xAI context window is the in-memory +buffer in `bot.go`. 
The schema is created/migrated on startup (a `schema_version` +table + idempotent `CREATE TABLE IF NOT EXISTS`), so a fresh `vojo_ai` needs no +manual DDL — just the role + database: + +```sql +-- once, as the Postgres superuser (e.g. `docker exec vojo-postgres-1 psql -U synapse -d postgres`): +CREATE ROLE vojo_ai LOGIN PASSWORD '<32-char secret>'; -- least privilege; NOT a superuser +CREATE DATABASE vojo_ai OWNER vojo_ai; +``` + +Point the bot at it with `AI_BOT_DATABASE_URL` (libpq/pgx DSN). Inside the docker +network the host is the `postgres` service; `sslmode=disable` matches Synapse and +the bridges on the internal network: + +``` +AI_BOT_DATABASE_URL=postgres://vojo_ai:@postgres:5432/vojo_ai?sslmode=disable +``` The hard USD ceiling is priced from the **API-returned token usage** times the configured `XAI_PRICE_*_PER_M` fallbacks, so a price change only needs those @@ -108,11 +133,11 @@ ai-bot: image: ai-bot:custom container_name: vojo-ai-bot restart: unless-stopped - depends_on: [synapse] - env_file: ./ai-bot/ai-bot.env # NON-SECRET config (chmod 600) + depends_on: [synapse, postgres] # needs both up before it starts + env_file: ./ai-bot/ai-bot.env # config incl. AI_BOT_DATABASE_URL (chmod 600 — embeds the DB password) environment: REGISTRATION_PATH: /data/registration.yaml # tokens (generated; shared with Synapse) - STATE_DIR: /data/state # SQLite: spend ledger + txn dedup + STATE_DIR: /data/state # runtime dir (the operational store is now in Postgres) XAI_API_KEY_FILE: /data/secrets/xai_api_key # the one standalone secret volumes: - ./ai-bot:/data # owned by uid 65532 (see setup) @@ -144,6 +169,17 @@ Compile-level + unit-tested locally: safe-URL allowlist, false-positive guards, oversize/adversarial fallbacks). - ✅ `check-config` reads env + loads the system prompt. 
+The store-backed tests (appservice transaction handling + the dedup/limiter/warned +store in `store_test.go`, including the concurrent per-user-cap guarantee and +restart-durability) need a throwaway Postgres via `AI_BOT_TEST_DATABASE_URL`; they +**skip** when it is unset, so `go test ./...` stays green without one. To run them: + +```bash +docker run -d --name pg -e POSTGRES_PASSWORD=p -p 5432:5432 postgres:16 +# … create role+db vojo_ai, then: +AI_BOT_TEST_DATABASE_URL=postgres://vojo_ai:…@localhost:5432/vojo_ai?sslmode=disable go test ./... +``` + Deferred to a live homeserver + xAI key + a loaded registration (runtime ✔): - Synapse pushes transactions → bot replies (`authenticated as @ai:vojo.chat` in logs); diff --git a/apps/ai-bot/appservice_test.go b/apps/ai-bot/appservice_test.go index d915221b..55ceee25 100644 --- a/apps/ai-bot/appservice_test.go +++ b/apps/ai-bot/appservice_test.go @@ -6,22 +6,46 @@ import ( "log/slog" "net/http" "net/http/httptest" - "path/filepath" + "os" "strings" "testing" "time" ) +// testDSN is the throwaway Postgres the store-backed tests run against. When unset, +// those tests skip rather than fail, so `go test ./...` stays green on a machine +// without a Postgres (the build/vet gates still cover the package). +func testDSN() string { return os.Getenv("AI_BOT_TEST_DATABASE_URL") } + +// openTestStore opens the store against the test database with a clean slate, so a +// shared/persistent test database doesn't leak rows between tests or runs. Skips the +// test when AI_BOT_TEST_DATABASE_URL is unset. 
+func openTestStore(t *testing.T) *Store { + t.Helper() + dsn := testDSN() + if dsn == "" { + t.Skip("set AI_BOT_TEST_DATABASE_URL (a throwaway Postgres) to run store-backed tests") + } + st, err := OpenStore(dsn) + if err != nil { + t.Fatalf("open store: %v", err) + } + ctx, cancel := opContext() + defer cancel() + if _, err := st.pool.Exec(ctx, `TRUNCATE processed_txn, processed_event, spend, warned_encrypted`); err != nil { + st.Close() + t.Fatalf("truncate test tables: %v", err) + } + return st +} + // newTestAS wires an AppService whose handler pushes each dispatched batch onto a // channel. Transactions are now processed asynchronously (the 200 is returned before // the handler runs), so tests read from the channel with a timeout instead of // inspecting a slice immediately after the call. func newTestAS(t *testing.T) (*AppService, *Store, chan []Event) { t.Helper() - st, err := OpenStore(filepath.Join(t.TempDir(), "t.db")) - if err != nil { - t.Fatalf("open store: %v", err) - } + st := openTestStore(t) dispatched := make(chan []Event, 8) as := NewAppService( &Config{HSToken: "secret", BotMXID: "@ai:vojo.chat"}, diff --git a/apps/ai-bot/bot.go b/apps/ai-bot/bot.go index 2e424951..30cafa46 100644 --- a/apps/ai-bot/bot.go +++ b/apps/ai-bot/bot.go @@ -51,7 +51,7 @@ func NewBot(ctx context.Context, cfg *Config, logger *slog.Logger) (*Bot, error) mx := NewMatrixClient(cfg.HomeserverURL, cfg.ASToken, cfg.BotMXID) xai := NewXAIClient(cfg.XAIBaseURL, cfg.XAIAPIKey, logger) - st, err := OpenStore(cfg.statePath("ai-bot.db")) + st, err := OpenStore(cfg.DatabaseURL) if err != nil { return nil, err } @@ -164,8 +164,8 @@ func (b *Bot) handleEvent(ctx context.Context, ev *Event) { // markSeen records an event id in both the in-memory set and the durable store and // reports whether it is NEW (first time). The in-memory Add is atomic, and SeenEvent -// is an atomic INSERT OR IGNORE, so two racing goroutines for the same event can never -// both proceed. 
On a durable-store error we fall through (the in-memory set still +// is an atomic INSERT … ON CONFLICT DO NOTHING, so two racing goroutines for the same +// event can never both proceed. On a durable-store error we fall through (the in-memory set still // guards this session). func (b *Bot) markSeen(eventID string) bool { if !b.seen.Add(eventID) { diff --git a/apps/ai-bot/config.go b/apps/ai-bot/config.go index 816d7c38..f40bd25a 100644 --- a/apps/ai-bot/config.go +++ b/apps/ai-bot/config.go @@ -56,6 +56,12 @@ type Config struct { SystemPromptPath string SystemPrompt string StateDir string + + // DatabaseURL is the libpq/pgx DSN of the bot's dedicated Postgres database + // (`vojo_ai`), e.g. postgres://vojo_ai:***@postgres:5432/vojo_ai?sslmode=disable. + // It holds only operational state (txn/event dedup, the daily spend ledger, the + // encrypted-warned set) — never message content. Required. + DatabaseURL string } func getenv(key, def string) string { @@ -130,6 +136,7 @@ func LoadConfig() (*Config, error) { XAIModel: getenv("XAI_MODEL", "grok-4.20-0309-non-reasoning"), SystemPromptPath: getenv("SYSTEM_PROMPT_PATH", "prompts/system_ru.txt"), StateDir: strings.TrimRight(getenv("STATE_DIR", "/state"), "/"), + DatabaseURL: getenv("AI_BOT_DATABASE_URL", ""), AllowedServers: parseServerSet(getenv("ALLOWED_SERVERS", "")), UnlimitedUsers: parseServerSet(getenv("UNLIMITED_USERS", "")), } @@ -179,6 +186,7 @@ func LoadConfig() (*Config, error) { req("AS_TOKEN", cfg.ASToken) req("HS_TOKEN", cfg.HSToken) req("XAI_API_KEY", cfg.XAIAPIKey) + req("AI_BOT_DATABASE_URL", cfg.DatabaseURL) if len(cfg.AllowedServers) == 0 { problems = append(problems, "ALLOWED_SERVERS is required (comma-separated homeserver allowlist)") } @@ -259,5 +267,6 @@ func (c *Config) Summary() string { c.PriceInputPerM, c.PriceCachedPerM, c.PriceOutputPerM), " SYSTEM_PROMPT_PATH = " + c.SystemPromptPath, " STATE_DIR = " + c.StateDir, + " AI_BOT_DATABASE_URL= " + redact(c.DatabaseURL), }, "\n") } diff --git 
a/apps/ai-bot/go.mod b/apps/ai-bot/go.mod index f962f787..57b90b5c 100644 --- a/apps/ai-bot/go.mod +++ b/apps/ai-bot/go.mod @@ -3,23 +3,21 @@ module vojo.chat/ai-bot go 1.25.0 require ( + github.com/jackc/pgx/v5 v5.9.2 github.com/microcosm-cc/bluemonday v1.0.27 github.com/yuin/goldmark v1.8.2 gopkg.in/yaml.v3 v3.0.1 - modernc.org/sqlite v1.51.0 ) require ( github.com/aymerick/douceur v0.2.0 // indirect - github.com/dustin/go-humanize v1.0.1 // indirect - github.com/google/uuid v1.6.0 // indirect github.com/gorilla/css v1.0.1 // indirect - github.com/mattn/go-isatty v0.0.20 // indirect - github.com/ncruces/go-strftime v1.0.0 // indirect - github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec // indirect + github.com/jackc/pgpassfile v1.0.0 // indirect + github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 // indirect + github.com/jackc/puddle/v2 v2.2.2 // indirect + github.com/kr/text v0.2.0 // indirect + github.com/rogpeppe/go-internal v1.15.0 // indirect golang.org/x/net v0.26.0 // indirect - golang.org/x/sys v0.42.0 // indirect - modernc.org/libc v1.72.3 // indirect - modernc.org/mathutil v1.7.1 // indirect - modernc.org/memory v1.11.0 // indirect + golang.org/x/sync v0.20.0 // indirect + golang.org/x/text v0.29.0 // indirect ) diff --git a/apps/ai-bot/go.sum b/apps/ai-bot/go.sum index 53d2947f..39b645f5 100644 --- a/apps/ai-bot/go.sum +++ b/apps/ai-bot/go.sum @@ -1,65 +1,45 @@ github.com/aymerick/douceur v0.2.0 h1:Mv+mAeH1Q+n9Fr+oyamOlAkUNPWPlA8PPGR0QAaYuPk= github.com/aymerick/douceur v0.2.0/go.mod h1:wlT5vV2O3h55X9m7iVYN0TBM0NH/MmbLnd30/FjWUq4= -github.com/dustin/go-humanize v1.0.1 h1:GzkhY7T5VNhEkwH0PVJgjz+fX1rhBrR7pRT3mDkpeCY= -github.com/dustin/go-humanize v1.0.1/go.mod h1:Mu1zIs6XwVuF/gI1OepvI0qD18qycQx+mFykh5fBlto= -github.com/google/pprof v0.0.0-20250317173921-a4b03ec1a45e h1:ijClszYn+mADRFY17kjQEVQ1XRhq2/JR1M3sGqeJoxs= -github.com/google/pprof v0.0.0-20250317173921-a4b03ec1a45e/go.mod 
h1:boTsfXsheKC2y+lKOCMpSfarhxDeIzfZG1jqGcPl3cA= -github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= -github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/gorilla/css v1.0.1 h1:ntNaBIghp6JmvWnxbZKANoLyuXTPZ4cAMlo6RyhlbO8= github.com/gorilla/css v1.0.1/go.mod h1:BvnYkspnSzMmwRK+b8/xgNPLiIuNZr6vbZBTPQ2A3b0= -github.com/hashicorp/golang-lru/v2 v2.0.7 h1:a+bsQ5rvGLjzHuww6tVxozPZFVghXaHOwFs4luLUK2k= -github.com/hashicorp/golang-lru/v2 v2.0.7/go.mod h1:QeFd9opnmA6QUJc5vARoKUSoFhyfM2/ZepoAG6RGpeM= -github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY= -github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y= +github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsIM= +github.com/jackc/pgpassfile v1.0.0/go.mod h1:CEx0iS5ambNFdcRtxPj5JhEz+xB6uRky5eyVu/W2HEg= +github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 h1:iCEnooe7UlwOQYpKFhBabPMi4aNAfoODPEFNiAnClxo= +github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761/go.mod h1:5TJZWKEWniPve33vlWYSoGYefn3gLQRzjfDlhSJ9ZKM= +github.com/jackc/pgx/v5 v5.9.2 h1:3ZhOzMWnR4yJ+RW1XImIPsD1aNSz4T4fyP7zlQb56hw= +github.com/jackc/pgx/v5 v5.9.2/go.mod h1:mal1tBGAFfLHvZzaYh77YS/eC6IX9OWbRV1QIIM0Jn4= +github.com/jackc/puddle/v2 v2.2.2 h1:PR8nw+E/1w0GLuRFSmiioY6UooMp6KJv0/61nB7icHo= +github.com/jackc/puddle/v2 v2.2.2/go.mod h1:vriiEXHvEE654aYKXXjOvZM39qJ0q+azkZFrfEOc3H4= +github.com/kr/pretty v0.3.0 h1:WgNl7dwNpEZ6jJ9k1snq4pZsg7DOEN8hP9Xw0Tsjwk0= +github.com/kr/pretty v0.3.0/go.mod 
h1:640gp4NfQd8pI5XOwp5fnNeVWj67G7CFk/SaSQn7NBk= +github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= +github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= github.com/microcosm-cc/bluemonday v1.0.27 h1:MpEUotklkwCSLeH+Qdx1VJgNqLlpY2KXwXFM08ygZfk= github.com/microcosm-cc/bluemonday v1.0.27/go.mod h1:jFi9vgW+H7c3V0lb6nR74Ib/DIB5OBs92Dimizgw2cA= -github.com/ncruces/go-strftime v1.0.0 h1:HMFp8mLCTPp341M/ZnA4qaf7ZlsbTc+miZjCLOFAw7w= -github.com/ncruces/go-strftime v1.0.0/go.mod h1:Fwc5htZGVVkseilnfgOVb9mKy6w1naJmn9CehxcKcls= -github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec h1:W09IVJc94icq4NjY3clb7Lk8O1qJ8BdBEF8z0ibU0rE= -github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec/go.mod h1:qqbHyh8v60DhA7CoWK5oRCqLrMHRGoxYCSS9EjAz6Eo= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/rogpeppe/go-internal v1.15.0 h1:D0RCU5rMAp+SpgkiNdrjfJ+LX4J1M32V2NeCY7EJ6hc= +github.com/rogpeppe/go-internal v1.15.0/go.mod h1:DrUVZyrJU+txYW5/1kwtXQSMFio52ZOxX7yM1VHvnxs= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= +github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U= +github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U= github.com/yuin/goldmark v1.8.2 h1:kEGpgqJXdgbkhcOgBxkC0X0PmoPG1ZyoZ117rDVp4zE= github.com/yuin/goldmark v1.8.2/go.mod h1:ip/1k0VRfGynBgxOz0yCqHrbZXhcjxyuS66Brc7iBKg= -golang.org/x/mod v0.33.0 h1:tHFzIWbBifEmbwtGz65eaWyGiGZatSrT9prnU8DbVL8= -golang.org/x/mod v0.33.0/go.mod h1:swjeQEj+6r7fODbD2cqrnje9PnziFuw4bmLbBZFrQ5w= golang.org/x/net v0.26.0 
h1:soB7SVo0PWrY4vPW/+ay0jKDNScG2X9wFeYlXIvJsOQ= golang.org/x/net v0.26.0/go.mod h1:5YKkiSynbBIh3p6iOc/vibscux0x38BZDkn8sCUPxHE= golang.org/x/sync v0.20.0 h1:e0PTpb7pjO8GAtTs2dQ6jYa5BWYlMuX047Dco/pItO4= golang.org/x/sync v0.20.0/go.mod h1:9xrNwdLfx4jkKbNva9FpL6vEN7evnE43NNNJQ2LF3+0= -golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.42.0 h1:omrd2nAlyT5ESRdCLYdm3+fMfNFE/+Rf4bDIQImRJeo= -golang.org/x/sys v0.42.0/go.mod h1:4GL1E5IUh+htKOUEOaiffhrAeqysfVGipDYzABqnCmw= -golang.org/x/tools v0.42.0 h1:uNgphsn75Tdz5Ji2q36v/nsFSfR/9BRFvqhGBaJGd5k= -golang.org/x/tools v0.42.0/go.mod h1:Ma6lCIwGZvHK6XtgbswSoWroEkhugApmsXyrUmBhfr0= -gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= +golang.org/x/text v0.29.0 h1:1neNs90w9YzJ9BocxfsQNHKuAT4pkghyXc4nhZ6sJvk= +golang.org/x/text v0.29.0/go.mod h1:7MhJOA9CD2qZyOKYazxdYMF85OwPdEr9jTtBpO7ydH4= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= +gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= -modernc.org/cc/v4 v4.28.2 h1:3tQ0lf2ADtoby2EtSP+J7IE2SHwEJdP8ioR59wx7XpY= -modernc.org/cc/v4 v4.28.2/go.mod h1:OnovgIhbbMXMu1aISnJ0wvVD1KnW+cAUJkIrAWh+kVI= -modernc.org/ccgo/v4 v4.34.0 h1:yRLPFZieg532OT4rp4JFNIVcquwalMX26G95WQDqwCQ= -modernc.org/ccgo/v4 v4.34.0/go.mod h1:AS5WYMyBakQ+fhsHhtP8mWB82KTGPkNNJDGfGQCe0/A= -modernc.org/fileutil v1.4.0 h1:j6ZzNTftVS054gi281TyLjHPp6CPHr2KCxEXjEbD6SM= -modernc.org/fileutil v1.4.0/go.mod h1:EqdKFDxiByqxLk8ozOxObDSfcVOv/54xDs/DUHdvCUU= 
-modernc.org/gc/v2 v2.6.5 h1:nyqdV8q46KvTpZlsw66kWqwXRHdjIlJOhG6kxiV/9xI= -modernc.org/gc/v2 v2.6.5/go.mod h1:YgIahr1ypgfe7chRuJi2gD7DBQiKSLMPgBQe9oIiito= -modernc.org/gc/v3 v3.1.2 h1:ZtDCnhonXSZexk/AYsegNRV1lJGgaNZJuKjJSWKyEqo= -modernc.org/gc/v3 v3.1.2/go.mod h1:HFK/6AGESC7Ex+EZJhJ2Gni6cTaYpSMmU/cT9RmlfYY= -modernc.org/goabi0 v0.2.0 h1:HvEowk7LxcPd0eq6mVOAEMai46V+i7Jrj13t4AzuNks= -modernc.org/goabi0 v0.2.0/go.mod h1:CEFRnnJhKvWT1c1JTI3Avm+tgOWbkOu5oPA8eH8LnMI= -modernc.org/libc v1.72.3 h1:ZnDF4tXn4NBXFutMMQC4vtbTFSXhhKzR73fv0beZEAU= -modernc.org/libc v1.72.3/go.mod h1:dn0dZNnnn1clLyvRxLxYExxiKRZIRENOfqQ8XEeg4Qs= -modernc.org/mathutil v1.7.1 h1:GCZVGXdaN8gTqB1Mf/usp1Y/hSqgI2vAGGP4jZMCxOU= -modernc.org/mathutil v1.7.1/go.mod h1:4p5IwJITfppl0G4sUEDtCr4DthTaT47/N3aT6MhfgJg= -modernc.org/memory v1.11.0 h1:o4QC8aMQzmcwCK3t3Ux/ZHmwFPzE6hf2Y5LbkRs+hbI= -modernc.org/memory v1.11.0/go.mod h1:/JP4VbVC+K5sU2wZi9bHoq2MAkCnrt2r98UGeSK7Mjw= -modernc.org/opt v0.2.0 h1:tGyef5ApycA7FSEOMraay9SaTk5zmbx7Tu+cJs4QKZg= -modernc.org/opt v0.2.0/go.mod h1:03fq9lsNfvkYSfxrfUhZCWPk1lm4cq4N+Bh//bEtgns= -modernc.org/sortutil v1.2.1 h1:+xyoGf15mM3NMlPDnFqrteY07klSFxLElE2PVuWIJ7w= -modernc.org/sortutil v1.2.1/go.mod h1:7ZI3a3REbai7gzCLcotuw9AC4VZVpYMjDzETGsSMqJE= -modernc.org/sqlite v1.51.0 h1:aH/MMSoayAIhozZ7uJbVTT9QO/VhzBf0J9tymmmuC/U= -modernc.org/sqlite v1.51.0/go.mod h1:tcNzv5p84E0skkmJn038y+hWJbLQXQqEnQfeh5r2JLM= -modernc.org/strutil v1.2.1 h1:UneZBkQA+DX2Rp35KcM69cSsNES9ly8mQWD71HKlOA0= -modernc.org/strutil v1.2.1/go.mod h1:EHkiggD70koQxjVdSBM3JKM7k6L0FbGE5eymy9i3B9A= -modernc.org/token v1.1.0 h1:Xl7Ap9dKaEs5kLoOQeQmPWevfnk/DM5qcLcYlA8ys6Y= -modernc.org/token v1.1.0/go.mod h1:UGzOrNV1mAFSEB63lOFHIpNRUVMvYTc6yu1SMY/XTDM= diff --git a/apps/ai-bot/store.go b/apps/ai-bot/store.go index a797fabd..af293084 100644 --- a/apps/ai-bot/store.go +++ b/apps/ai-bot/store.go @@ -1,11 +1,13 @@ package main import ( - "database/sql" + "context" + "errors" "fmt" "time" - _ "modernc.org/sqlite" + 
"github.com/jackc/pgx/v5" + "github.com/jackc/pgx/v5/pgxpool" ) // reserveResult is the outcome of a pre-call limiter reservation. @@ -17,61 +19,169 @@ const ( reserveDeniedGlobal // global daily USD ceiling hit (⏳ rate-limit reaction, F24) ) +// LRU bounds for the dedup tables (unchanged from the former SQLite store): keep +// only the most recent ids so the tables don't grow without limit. +const ( + maxProcessedTxn = 5000 + maxProcessedEvent = 20000 +) + +// opTimeout bounds every store operation. SQLite (a local file) effectively never +// blocked; Postgres is over the docker network, so a cap keeps a stalled DB from +// hanging a per-room handler goroutine forever. +const opTimeout = 10 * time.Second + // Store is the durable bot state: transaction + event dedup, the daily spend -// ledger, and the encrypted-room warned set. Pure-Go SQLite (no cgo) so the binary -// stays static for a distroless/scratch image. +// ledger, and the encrypted-room warned set. It holds ONLY operational data — no +// message content (the room timeline lives in Synapse). Backed by a dedicated +// Postgres database (`vojo_ai`), in line with the per-service bridge databases, so +// the spend ledger, dedup state and warned set share the server's backup/restore. type Store struct { - db *sql.DB + pool *pgxpool.Pool } -func OpenStore(path string) (*Store, error) { - db, err := sql.Open("sqlite", path+"?_pragma=busy_timeout(5000)&_pragma=journal_mode(WAL)") +// OpenStore connects to the `vojo_ai` Postgres database via the AI_BOT_DATABASE_URL +// DSN, applies pending migrations, and returns a ready Store. A small pool suffices: +// the bot processes transactions serially and every statement here is short. +func OpenStore(dsn string) (*Store, error) { + cfg, err := pgxpool.ParseConfig(dsn) if err != nil { + return nil, fmt.Errorf("parse AI_BOT_DATABASE_URL: %w", err) + } + // The former SQLite store pinned a single connection to serialize all callers; + // pgx gives us a real pool. 
Keep it small — the per-room handler goroutines only + // ever issue brief statements, and the shared server runs many other databases. + cfg.MaxConns = 4 + cfg.MinConns = 1 + + ctx, cancel := context.WithTimeout(context.Background(), opTimeout) + defer cancel() + + pool, err := pgxpool.NewWithConfig(ctx, cfg) + if err != nil { + return nil, fmt.Errorf("connect vojo_ai: %w", err) + } + if err := pool.Ping(ctx); err != nil { + pool.Close() + return nil, fmt.Errorf("ping vojo_ai: %w", err) + } + + s := &Store{pool: pool} + if err := s.migrate(ctx); err != nil { + pool.Close() return nil, err } - // One connection: database/sql serializes all callers onto it, which keeps the - // now-concurrent handler goroutines from contending on SQLite write locks. All - // statements here are short, so callers block only briefly and never deadlock. - db.SetMaxOpenConns(1) - - schema := ` - CREATE TABLE IF NOT EXISTS processed_txn (txn_id TEXT PRIMARY KEY); - CREATE TABLE IF NOT EXISTS spend ( - date TEXT NOT NULL, mxid TEXT NOT NULL, - requests INTEGER NOT NULL DEFAULT 0, usd REAL NOT NULL DEFAULT 0, - PRIMARY KEY (date, mxid) - ); - CREATE TABLE IF NOT EXISTS warned_encrypted (room_id TEXT PRIMARY KEY); - CREATE TABLE IF NOT EXISTS processed_event (event_id TEXT PRIMARY KEY);` - if _, err := db.Exec(schema); err != nil { - db.Close() - return nil, fmt.Errorf("init schema: %w", err) - } - return &Store{db: db}, nil + return s, nil } -func (s *Store) Close() error { return s.db.Close() } +func (s *Store) Close() error { + s.pool.Close() + return nil +} + +// migrationLockKey namespaces the advisory lock that guards the migration runner so +// two starting instances (the bot is single-instance, but be robust) can't race the +// version check. Arbitrary fixed constant. +const migrationLockKey = 0x76_6f_6a_6f // "vojo" + +// migrations are applied in order; schema_version records the highest applied +// version so re-runs are no-ops. 
Every step is also idempotent (CREATE TABLE IF NOT +// EXISTS) so a half-applied database still converges. +var migrations = []string{ + // v1: the operational schema — a 1:1 port of the former SQLite tables. + // processed_* carry a surrogate identity column because Postgres has no rowid: + // the LRU trim orders by it, and txn_id/event_id stay UNIQUE for the upsert. + `CREATE TABLE IF NOT EXISTS processed_txn ( + id BIGINT GENERATED ALWAYS AS IDENTITY PRIMARY KEY, + txn_id TEXT UNIQUE NOT NULL + ); + CREATE TABLE IF NOT EXISTS processed_event ( + id BIGINT GENERATED ALWAYS AS IDENTITY PRIMARY KEY, + event_id TEXT UNIQUE NOT NULL + ); + CREATE TABLE IF NOT EXISTS spend ( + date TEXT NOT NULL, + mxid TEXT NOT NULL, + requests INTEGER NOT NULL DEFAULT 0, + usd DOUBLE PRECISION NOT NULL DEFAULT 0, + PRIMARY KEY (date, mxid) + ); + CREATE TABLE IF NOT EXISTS warned_encrypted (room_id TEXT PRIMARY KEY);`, +} + +// migrate runs all pending migrations on a single connection under a session +// advisory lock, recording each in schema_version. 
+func (s *Store) migrate(ctx context.Context) error { + conn, err := s.pool.Acquire(ctx) + if err != nil { + return fmt.Errorf("migrate: acquire: %w", err) + } + defer conn.Release() + + if _, err := conn.Exec(ctx, `SELECT pg_advisory_lock($1)`, int64(migrationLockKey)); err != nil { + return fmt.Errorf("migrate: lock: %w", err) + } + defer func() { + _, _ = conn.Exec(ctx, `SELECT pg_advisory_unlock($1)`, int64(migrationLockKey)) + }() + + if _, err := conn.Exec(ctx, `CREATE TABLE IF NOT EXISTS schema_version (version INTEGER PRIMARY KEY)`); err != nil { + return fmt.Errorf("migrate: schema_version: %w", err) + } + var current int + if err := conn.QueryRow(ctx, `SELECT COALESCE(MAX(version), 0) FROM schema_version`).Scan(&current); err != nil { + return fmt.Errorf("migrate: read version: %w", err) + } + for v := current; v < len(migrations); v++ { + tx, err := conn.Begin(ctx) + if err != nil { + return fmt.Errorf("migrate: begin %d: %w", v+1, err) + } + if _, err := tx.Exec(ctx, migrations[v]); err != nil { + _ = tx.Rollback(ctx) + return fmt.Errorf("migrate: apply %d: %w", v+1, err) + } + if _, err := tx.Exec(ctx, `INSERT INTO schema_version (version) VALUES ($1)`, v+1); err != nil { + _ = tx.Rollback(ctx) + return fmt.Errorf("migrate: record %d: %w", v+1, err) + } + if err := tx.Commit(ctx); err != nil { + return fmt.Errorf("migrate: commit %d: %w", v+1, err) + } + } + return nil +} func todayUTC() string { return time.Now().UTC().Format("2006-01-02") } +// opContext derives a bounded context for a single store operation. +func opContext() (context.Context, context.CancelFunc) { + return context.WithTimeout(context.Background(), opTimeout) +} + // HasTxn / MarkTxn give appservice transactions idempotency across restarts: a // transaction Synapse retries (because our 200 was lost) is processed at most // once. The table is bounded to the most recent ids. 
func (s *Store) HasTxn(txnID string) (bool, error) { + ctx, cancel := opContext() + defer cancel() var one int - err := s.db.QueryRow(`SELECT 1 FROM processed_txn WHERE txn_id = ?`, txnID).Scan(&one) - if err == sql.ErrNoRows { + err := s.pool.QueryRow(ctx, `SELECT 1 FROM processed_txn WHERE txn_id = $1`, txnID).Scan(&one) + if errors.Is(err, pgx.ErrNoRows) { return false, nil } return err == nil, err } func (s *Store) MarkTxn(txnID string) error { - if _, err := s.db.Exec(`INSERT OR IGNORE INTO processed_txn (txn_id) VALUES (?)`, txnID); err != nil { + ctx, cancel := opContext() + defer cancel() + if _, err := s.pool.Exec(ctx, + `INSERT INTO processed_txn (txn_id) VALUES ($1) ON CONFLICT DO NOTHING`, txnID); err != nil { return err } - _, err := s.db.Exec(`DELETE FROM processed_txn WHERE rowid NOT IN - (SELECT rowid FROM processed_txn ORDER BY rowid DESC LIMIT 5000)`) + _, err := s.pool.Exec(ctx, `DELETE FROM processed_txn WHERE id NOT IN + (SELECT id FROM processed_txn ORDER BY id DESC LIMIT $1)`, maxProcessedTxn) return err } @@ -79,28 +189,37 @@ func (s *Store) MarkTxn(txnID string) error { // or already seen (false) — the DURABLE equivalent of the in-memory dedup set, so // a crash/restart between handling an event and acking its transaction can't make // the bot reprocess it (dup answer + double-bill + cap inflation). Bounded to the -// most recent ids. +// most recent ids. INSERT … ON CONFLICT DO NOTHING affects 1 row on insert and 0 on +// conflict, so RowsAffected distinguishes new from already-seen. 
 func (s *Store) SeenEvent(eventID string) (bool, error) {
-	res, err := s.db.Exec(`INSERT OR IGNORE INTO processed_event (event_id) VALUES (?)`, eventID)
+	ctx, cancel := opContext()
+	defer cancel()
+	tag, err := s.pool.Exec(ctx,
+		`INSERT INTO processed_event (event_id) VALUES ($1) ON CONFLICT DO NOTHING`, eventID)
 	if err != nil {
 		return false, err
 	}
-	if n, _ := res.RowsAffected(); n == 0 {
+	if tag.RowsAffected() == 0 {
 		return false, nil // already recorded → not new
 	}
-	_, err = s.db.Exec(`DELETE FROM processed_event WHERE rowid NOT IN
-		(SELECT rowid FROM processed_event ORDER BY rowid DESC LIMIT 20000)`)
+	_, err = s.pool.Exec(ctx, `DELETE FROM processed_event WHERE id NOT IN
+		(SELECT id FROM processed_event ORDER BY id DESC LIMIT $1)`, maxProcessedEvent)
 	return true, err
 }
 
-// SpentTodayUSD sums all spend for the current UTC day.
+// SpentTodayUSD sums all spend for the current UTC day. SUM over no rows is NULL,
+// which scans into a nil *float64 → treated as 0.
 func (s *Store) SpentTodayUSD() (float64, error) {
-	var v sql.NullFloat64
-	err := s.db.QueryRow(`SELECT SUM(usd) FROM spend WHERE date = ?`, todayUTC()).Scan(&v)
-	if err != nil {
+	ctx, cancel := opContext()
+	defer cancel()
+	var v *float64
+	if err := s.pool.QueryRow(ctx, `SELECT SUM(usd) FROM spend WHERE date = $1`, todayUTC()).Scan(&v); err != nil {
 		return 0, err
 	}
-	return v.Float64, nil
+	if v == nil {
+		return 0, nil
+	}
+	return *v, nil
 }
 
 // Reserve runs the two independent gates in one transaction, BEFORE the xAI call
@@ -108,38 +227,62 @@ func (s *Store) SpentTodayUSD() (float64, error) {
 // anti-abuse. It increments the per-user request count on success; the USD is
 // reconciled after the response. Order: global first (cheapest to deny), then
 // per-user.
+//
+// A transaction-scoped advisory lock on (date, mxid) serializes concurrent
+// reservations for the SAME user+day, so the per-user check-then-increment stays
+// atomic. The former SQLite store got this for free (one connection serialized all
+// callers); the pgx pool is concurrent, and the same user messaging from two rooms
+// at once would otherwise be able to slip past the per-user cap. Different users
+// never contend.
 func (s *Store) Reserve(mxid string, perUserCap int, dailyUSDCeiling float64) (reserveResult, error) {
+	ctx, cancel := opContext()
+	defer cancel()
 	day := todayUTC()
-	tx, err := s.db.Begin()
+
+	tx, err := s.pool.Begin(ctx)
 	if err != nil {
 		return reserveOK, err
 	}
-	defer tx.Rollback()
+	defer tx.Rollback(ctx)
 
-	var global sql.NullFloat64
-	if err := tx.QueryRow(`SELECT SUM(usd) FROM spend WHERE date = ?`, day).Scan(&global); err != nil {
+	// Key on date|mxid. The separator only needs to avoid cross-key ambiguity; a
+	// hash collision would merely over-serialize two unrelated users, never corrupt a
+	// count. (NUL is rejected by Postgres text, so use a printable separator.)
+	if _, err := tx.Exec(ctx, `SELECT pg_advisory_xact_lock(hashtextextended($1, 0))`, day+"|"+mxid); err != nil {
 		return reserveOK, err
 	}
-	if global.Float64 >= dailyUSDCeiling {
+
+	// SUM over zero rows is NULL → nil pointer → treat as 0.0, exactly as the SQLite
+	// store's sql.NullFloat64 did (and as SpentTodayUSD does). This keeps the gate 1:1
+	// even at the degenerate dailyUSDCeiling == 0 (deny everything), where 0 >= 0.
+	var global *float64
+	if err := tx.QueryRow(ctx, `SELECT SUM(usd) FROM spend WHERE date = $1`, day).Scan(&global); err != nil {
+		return reserveOK, err
+	}
+	spentToday := 0.0
+	if global != nil {
+		spentToday = *global
+	}
+	if spentToday >= dailyUSDCeiling {
 		return reserveDeniedGlobal, nil
 	}
 
 	var requests int
-	err = tx.QueryRow(`SELECT requests FROM spend WHERE date = ? AND mxid = ?`, day, mxid).Scan(&requests)
-	if err != nil && err != sql.ErrNoRows {
+	err = tx.QueryRow(ctx, `SELECT requests FROM spend WHERE date = $1 AND mxid = $2`, day, mxid).Scan(&requests)
+	if err != nil && !errors.Is(err, pgx.ErrNoRows) {
 		return reserveOK, err
 	}
 	if requests >= perUserCap {
 		return reserveDeniedUser, nil
 	}
 
-	if _, err := tx.Exec(
-		`INSERT INTO spend (date, mxid, requests, usd) VALUES (?, ?, 1, 0)
-		ON CONFLICT(date, mxid) DO UPDATE SET requests = requests + 1`,
+	if _, err := tx.Exec(ctx,
+		`INSERT INTO spend (date, mxid, requests, usd) VALUES ($1, $2, 1, 0)
+		ON CONFLICT (date, mxid) DO UPDATE SET requests = spend.requests + 1`,
 		day, mxid); err != nil {
 		return reserveOK, err
 	}
-	if err := tx.Commit(); err != nil {
+	if err := tx.Commit(ctx); err != nil {
 		return reserveOK, err
 	}
 	return reserveOK, nil
@@ -147,20 +290,26 @@ func (s *Store) Reserve(mxid string, perUserCap int, dailyUSDCeiling float64) (r
 
 // RefundRequest gives back a reserved request slot when the call ultimately
 // failed (e.g. an xAI outage), so a transient failure doesn't burn the user's
-// daily cap. Never drops below zero.
+// daily cap. Never drops below zero. A single UPDATE is atomic, so concurrent
+// refunds settle correctly without extra locking.
 func (s *Store) RefundRequest(mxid string) error {
-	_, err := s.db.Exec(
-		`UPDATE spend SET requests = MAX(0, requests - 1) WHERE date = ? AND mxid = ?`,
+	ctx, cancel := opContext()
+	defer cancel()
+	_, err := s.pool.Exec(ctx,
+		`UPDATE spend SET requests = GREATEST(0, requests - 1) WHERE date = $1 AND mxid = $2`,
 		todayUTC(), mxid)
 	return err
 }
 
 // Reconcile books the actual USD cost of a completed call against the user's
-// daily row (and thus the global total).
+// daily row (and thus the global total). The accumulating upsert is atomic and
+// commutative, so concurrent reconciles for the same user sum correctly.
 func (s *Store) Reconcile(mxid string, usd float64) error {
-	_, err := s.db.Exec(
-		`INSERT INTO spend (date, mxid, requests, usd) VALUES (?, ?, 0, ?)
-		ON CONFLICT(date, mxid) DO UPDATE SET usd = usd + excluded.usd`,
+	ctx, cancel := opContext()
+	defer cancel()
+	_, err := s.pool.Exec(ctx,
+		`INSERT INTO spend (date, mxid, requests, usd) VALUES ($1, $2, 0, $3)
+		ON CONFLICT (date, mxid) DO UPDATE SET usd = spend.usd + excluded.usd`,
 		todayUTC(), mxid, usd)
 	return err
 }
@@ -170,15 +319,20 @@ func (s *Store) Reconcile(mxid string, usd float64) error {
 // message (F5). The bot never reacts to its own events: m.reaction is not an
 // m.room.message, so it never re-enters handleMessage.
 func (s *Store) HasWarnedEncrypted(roomID string) (bool, error) {
+	ctx, cancel := opContext()
+	defer cancel()
 	var one int
-	err := s.db.QueryRow(`SELECT 1 FROM warned_encrypted WHERE room_id = ?`, roomID).Scan(&one)
-	if err == sql.ErrNoRows {
+	err := s.pool.QueryRow(ctx, `SELECT 1 FROM warned_encrypted WHERE room_id = $1`, roomID).Scan(&one)
+	if errors.Is(err, pgx.ErrNoRows) {
 		return false, nil
 	}
 	return err == nil, err
 }
 
 func (s *Store) SetWarnedEncrypted(roomID string) error {
-	_, err := s.db.Exec(`INSERT OR IGNORE INTO warned_encrypted (room_id) VALUES (?)`, roomID)
+	ctx, cancel := opContext()
+	defer cancel()
+	_, err := s.pool.Exec(ctx,
+		`INSERT INTO warned_encrypted (room_id) VALUES ($1) ON CONFLICT DO NOTHING`, roomID)
 	return err
 }
diff --git a/apps/ai-bot/store_test.go b/apps/ai-bot/store_test.go
new file mode 100644
index 00000000..a6edc3e9
--- /dev/null
+++ b/apps/ai-bot/store_test.go
@@ -0,0 +1,211 @@
+package main
+
+import (
+	"sync"
+	"sync/atomic"
+	"testing"
+)
+
+// These tests exercise the Postgres-backed store directly. They run only when
+// AI_BOT_TEST_DATABASE_URL points at a throwaway database (openTestStore skips
+// otherwise) and start from a clean slate (openTestStore truncates).
+
+func TestStoreTxnDedup(t *testing.T) {
+	st := openTestStore(t)
+	defer st.Close()
+
+	if got, err := st.HasTxn("txn-1"); err != nil || got {
+		t.Fatalf("fresh txn: got (%v,%v), want (false,nil)", got, err)
+	}
+	if err := st.MarkTxn("txn-1"); err != nil {
+		t.Fatalf("mark: %v", err)
+	}
+	if got, err := st.HasTxn("txn-1"); err != nil || !got {
+		t.Fatalf("marked txn: got (%v,%v), want (true,nil)", got, err)
+	}
+	// Re-marking is idempotent (a retried transaction).
+	if err := st.MarkTxn("txn-1"); err != nil {
+		t.Fatalf("re-mark: %v", err)
+	}
+	if got, _ := st.HasTxn("txn-2"); got {
+		t.Fatalf("unrelated txn must be unseen")
+	}
+}
+
+func TestStoreSeenEvent(t *testing.T) {
+	st := openTestStore(t)
+	defer st.Close()
+
+	first, err := st.SeenEvent("$ev1")
+	if err != nil || !first {
+		t.Fatalf("first SeenEvent: got (%v,%v), want (true,nil)", first, err)
+	}
+	again, err := st.SeenEvent("$ev1")
+	if err != nil || again {
+		t.Fatalf("repeat SeenEvent: got (%v,%v), want (false,nil)", again, err)
+	}
+	other, err := st.SeenEvent("$ev2")
+	if err != nil || !other {
+		t.Fatalf("new SeenEvent: got (%v,%v), want (true,nil)", other, err)
+	}
+}
+
+// Dedup state must survive a process restart — the whole point of the durable store.
+func TestStoreDedupSurvivesRestart(t *testing.T) {
+	st := openTestStore(t)
+	if _, err := st.SeenEvent("$ev-restart"); err != nil {
+		t.Fatalf("seen: %v", err)
+	}
+	if err := st.MarkTxn("txn-restart"); err != nil {
+		t.Fatalf("mark: %v", err)
+	}
+	st.Close()
+
+	// Reopen the same database WITHOUT truncating: simulates a container restart.
+	st2, err := OpenStore(testDSN())
+	if err != nil {
+		t.Fatalf("reopen: %v", err)
+	}
+	defer st2.Close()
+
+	if isNew, err := st2.SeenEvent("$ev-restart"); err != nil || isNew {
+		t.Fatalf("event after restart must be already-seen: got (%v,%v)", isNew, err)
+	}
+	if seen, err := st2.HasTxn("txn-restart"); err != nil || !seen {
+		t.Fatalf("txn after restart must be seen: got (%v,%v)", seen, err)
+	}
+}
+
+func TestStoreLimiterPerUserCap(t *testing.T) {
+	st := openTestStore(t)
+	defer st.Close()
+
+	const user = "@u:vojo.chat"
+	const cap, ceiling = 2, 100.0
+
+	for i := 0; i < cap; i++ {
+		if res, err := st.Reserve(user, cap, ceiling); err != nil || res != reserveOK {
+			t.Fatalf("reserve %d: got (%v,%v), want reserveOK", i, res, err)
+		}
+	}
+	// The (cap+1)th request is denied per-user.
+	if res, err := st.Reserve(user, cap, ceiling); err != nil || res != reserveDeniedUser {
+		t.Fatalf("over-cap reserve: got (%v,%v), want reserveDeniedUser", res, err)
+	}
+	// A different user is unaffected.
+	if res, err := st.Reserve("@v:vojo.chat", cap, ceiling); err != nil || res != reserveOK {
+		t.Fatalf("other user reserve: got (%v,%v), want reserveOK", res, err)
+	}
+	// Refund returns a slot, so the first user can reserve once more.
+	if err := st.RefundRequest(user); err != nil {
+		t.Fatalf("refund: %v", err)
+	}
+	if res, err := st.Reserve(user, cap, ceiling); err != nil || res != reserveOK {
+		t.Fatalf("post-refund reserve: got (%v,%v), want reserveOK", res, err)
+	}
+}
+
+// A zero per-user cap denies even the first request — the SQLite store's
+// requests(0) >= cap(0) behaviour, preserved.
+func TestStoreLimiterZeroCap(t *testing.T) {
+	st := openTestStore(t)
+	defer st.Close()
+	if res, err := st.Reserve("@u:vojo.chat", 0, 100.0); err != nil || res != reserveDeniedUser {
+		t.Fatalf("zero-cap reserve: got (%v,%v), want reserveDeniedUser", res, err)
+	}
+}
+
+// A zero ceiling denies the very first request of the day even before any spend row
+// exists — the SQLite store treated SUM(NULL) as 0.0 (0 >= 0), and the PG store must
+// match (SUM over zero rows is NULL).
+func TestStoreLimiterZeroCeiling(t *testing.T) {
+	st := openTestStore(t)
+	defer st.Close()
+	if res, err := st.Reserve("@u:vojo.chat", 1_000_000, 0); err != nil || res != reserveDeniedGlobal {
+		t.Fatalf("zero-ceiling reserve on empty store: got (%v,%v), want reserveDeniedGlobal", res, err)
+	}
+}
+
+func TestStoreLimiterGlobalCeiling(t *testing.T) {
+	st := openTestStore(t)
+	defer st.Close()
+
+	const ceiling = 1.0
+	// Book spend up to the ceiling (Reconcile is what feeds the global gate).
+	if err := st.Reconcile("@a:vojo.chat", 0.6); err != nil {
+		t.Fatalf("reconcile a: %v", err)
+	}
+	if err := st.Reconcile("@b:vojo.chat", 0.5); err != nil {
+		t.Fatalf("reconcile b: %v", err)
+	}
+	if spent, err := st.SpentTodayUSD(); err != nil || spent < 1.1 {
+		t.Fatalf("spent today: got (%v,%v), want >= 1.1", spent, err)
+	}
+	// Now any reservation is denied globally, regardless of the per-user cap.
+	if res, err := st.Reserve("@c:vojo.chat", 1_000_000, ceiling); err != nil || res != reserveDeniedGlobal {
+		t.Fatalf("over-ceiling reserve: got (%v,%v), want reserveDeniedGlobal", res, err)
+	}
+}
+
+// The pgx pool is concurrent (the SQLite store serialized on one connection). The
+// advisory lock in Reserve must still admit EXACTLY perUserCap requests when many
+// arrive at once for the same user — the same user messaging from several rooms
+// simultaneously must not slip past the cap.
+func TestStoreReserveConcurrentRespectsCap(t *testing.T) {
+	st := openTestStore(t)
+	defer st.Close()
+
+	const user = "@race:vojo.chat"
+	const cap = 10
+	const goroutines = 50
+
+	var ok int64
+	var wg sync.WaitGroup
+	for i := 0; i < goroutines; i++ {
+		wg.Add(1)
+		go func() {
+			defer wg.Done()
+			res, err := st.Reserve(user, cap, 1e9)
+			if err != nil {
+				t.Errorf("reserve: %v", err)
+				return
+			}
+			if res == reserveOK {
+				atomic.AddInt64(&ok, 1)
+			}
+		}()
+	}
+	wg.Wait()
+	if ok != cap {
+		t.Fatalf("concurrent reserves admitted %d, want exactly %d (the per-user cap)", ok, cap)
+	}
+}
+
+func TestStoreWarnedEncrypted(t *testing.T) {
+	st := openTestStore(t)
+	const room = "!enc:vojo.chat"
+	if warned, err := st.HasWarnedEncrypted(room); err != nil || warned {
+		t.Fatalf("fresh room: got (%v,%v), want (false,nil)", warned, err)
+	}
+	if err := st.SetWarnedEncrypted(room); err != nil {
+		t.Fatalf("set: %v", err)
+	}
+	// Setting twice is idempotent.
+	if err := st.SetWarnedEncrypted(room); err != nil {
+		t.Fatalf("re-set: %v", err)
+	}
+	if warned, err := st.HasWarnedEncrypted(room); err != nil || !warned {
+		t.Fatalf("warned room: got (%v,%v), want (true,nil)", warned, err)
+	}
+	st.Close()
+
+	// The one-shot flag must outlive a restart (F5: no re-react after restart).
+	st2, err := OpenStore(testDSN())
+	if err != nil {
+		t.Fatalf("reopen: %v", err)
+	}
+	defer st2.Close()
+	if warned, err := st2.HasWarnedEncrypted(room); err != nil || !warned {
+		t.Fatalf("warned after restart: got (%v,%v), want (true,nil)", warned, err)
+	}
+}
diff --git a/docs/ai/server-side.md b/docs/ai/server-side.md
index a7a86aef..758f05b3 100644
--- a/docs/ai/server-side.md
+++ b/docs/ai/server-side.md
@@ -40,7 +40,8 @@ you're touching server config, SSH to the box and read the live file. Last refre
 
 ## Postgres roles & databases
 
-`synapse` is the historical superuser. Each bridge has its own role + DB with a 32-char
+`synapse` is the historical superuser. Each bridge — and the Vojo AI bot — has its own
+non-superuser role + DB with a 32-char
 `openssl rand -base64 32 | tr -d '/+=' | head -c 32` password.
 
 | Role | Database | Used by |
@@ -49,6 +50,7 @@ you're touching server config, SSH to the box and read the live file. Last refre
 | `mautrix_telegram` | `mautrix_telegram` | Telegram bridge |
 | `mautrix_discord` | `mautrix_discord` | Discord bridge |
 | `mautrix_whatsapp` | `mautrix_whatsapp` | WhatsApp bridge |
+| `vojo_ai` | `vojo_ai` | Vojo AI bot — operational store (txn/event dedup, daily spend ledger, encrypted-warned set; **no** message content). DSN via `AI_BOT_DATABASE_URL`. |
 
 ## `~/vojo/synapse/homeserver.yaml` — relevant slices
 
@@ -130,7 +132,7 @@ in doubt.
 | `telegram-bridge` | `dock.mau.dev/mautrix/telegram:` | `./bridges/telegram:/data` |
 | `discord-bridge` | `dock.mau.dev/mautrix/discord:v0.7.5` | `./bridges/discord:/data` (legacy bridge — runtime reports `0.7.6+dev`) |
 | `whatsapp-bridge` | `dock.mau.dev/mautrix/whatsapp:v0.12.4` | `./bridges/whatsapp:/data` |
-| `ai-bot` | `ai-bot:custom` (built locally from [`apps/ai-bot/`](../../apps/ai-bot/), shipped via `docker save \| ssh docker load` — VS Code task **Deploy AI bot**) | **Vojo AI** = `@ai:vojo.chat`, an xAI-Grok-backed **application service** (NOT a normal bot user). Answers `@`-mentions in groups + everything in 1:1s; the Grok reply (markdown) is rendered to `org.matrix.custom.html` and sent as `formatted_body` (in-bot `markdown.go`, zero deps; emits only tags Cinny's sanitizer keeps, escapes all model text), falling back to the plain `body` when there's no formatting. Mounts `./ai-bot:/data` (owned **uid 65532**, distroless nonroot) holding `registration.yaml` (self-generated, `generate-registration`), `state/` (SQLite spend ledger + txn dedup) and `secrets/xai_api_key`. Push port `:8009` (registration `url: http://ai-bot:8009`). Secrets via env/`*_FILE`; `as_token`/`hs_token` read from `registration.yaml` (no rotation). See [`apps/ai-bot/README.md`](../../apps/ai-bot/README.md). |
+| `ai-bot` | `ai-bot:custom` (built locally from [`apps/ai-bot/`](../../apps/ai-bot/), shipped via `docker save \| ssh docker load` — VS Code task **Deploy AI bot**) | **Vojo AI** = `@ai:vojo.chat`, an xAI-Grok-backed **application service** (NOT a normal bot user). Answers `@`-mentions in groups + everything in 1:1s; the Grok reply (markdown) is rendered to `org.matrix.custom.html` and sent as `formatted_body` (in-bot `markdown.go`, zero deps; emits only tags Cinny's sanitizer keeps, escapes all model text), falling back to the plain `body` when there's no formatting. Mounts `./ai-bot:/data` (owned **uid 65532**, distroless nonroot) holding `registration.yaml` (self-generated, `generate-registration`), `state/` (runtime dir) and `secrets/xai_api_key`. Its **operational store** (txn/event dedup, daily spend ledger, encrypted-warned set) lives in the dedicated `vojo_ai` Postgres DB via `AI_BOT_DATABASE_URL` — `depends_on: [synapse, postgres]`. Push port `:8009` (registration `url: http://ai-bot:8009`). Secrets via env/`*_FILE`; `as_token`/`hs_token` read from `registration.yaml` (no rotation). See [`apps/ai-bot/README.md`](../../apps/ai-bot/README.md). |
 
 ### Bridge service stanza (template)