fix(ai-bot): ack transactions instantly with async per-room processing to stop slow-call freezes, and signal system states via emoji reactions
This commit is contained in:
parent
a5fcce4d77
commit
ebb2363d9d
8 changed files with 539 additions and 193 deletions
|
|
@ -103,15 +103,23 @@ func (a *AppService) handleTransaction(w http.ResponseWriter, r *http.Request) {
|
|||
writeError(w, http.StatusBadRequest, "M_NOT_JSON", "invalid transaction body")
|
||||
return
|
||||
}
|
||||
a.log.Debug("transaction received", "txn", txnID, "events", len(txn.Events))
|
||||
|
||||
// Process with the bot's long-lived context (not the request context) so a
|
||||
// homeserver-side timeout can't cancel an in-flight reply mid-send.
|
||||
a.handler(a.baseCtx, txn.Events)
|
||||
a.log.Debug("transaction accepted", "txn", txnID, "events", len(txn.Events))
|
||||
|
||||
// Mark the transaction done BEFORE processing and ack 200 immediately. The bot
|
||||
// must answer Synapse fast: Synapse delivers transactions serially and waits for
|
||||
// the 200, and if it's late (the handler used to block on the ~180s xAI call) it
|
||||
// marks the AS down and replays with growing backoff — the "bot silent for
|
||||
// minutes" symptom. Per-event durable dedup (Store.SeenEvent) is the real guard
|
||||
// against double-answers, so acking before the work finishes is safe (at-most-once:
|
||||
// a hard crash mid-processing drops the message rather than answering it twice).
|
||||
if err := a.store.MarkTxn(txnID); err != nil {
|
||||
a.log.Error("txn mark failed", "txn", txnID, "err", err)
|
||||
}
|
||||
// Process off the request path with the bot's long-lived context (not the request
|
||||
// context) so the work — and the eventual reply — survives the homeserver dropping
|
||||
// the connection.
|
||||
go a.handler(a.baseCtx, txn.Events)
|
||||
|
||||
writeJSON(w, http.StatusOK, struct{}{})
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -9,22 +9,39 @@ import (
|
|||
"path/filepath"
|
||||
"strings"
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
||||
func newTestAS(t *testing.T, dispatched *[][]Event) (*AppService, *Store) {
|
||||
// newTestAS wires an AppService whose handler pushes each dispatched batch onto a
|
||||
// channel. Transactions are now processed asynchronously (the 200 is returned before
|
||||
// the handler runs), so tests read from the channel with a timeout instead of
|
||||
// inspecting a slice immediately after the call.
|
||||
func newTestAS(t *testing.T) (*AppService, *Store, chan []Event) {
|
||||
t.Helper()
|
||||
st, err := OpenStore(filepath.Join(t.TempDir(), "t.db"))
|
||||
if err != nil {
|
||||
t.Fatalf("open store: %v", err)
|
||||
}
|
||||
dispatched := make(chan []Event, 8)
|
||||
as := NewAppService(
|
||||
&Config{HSToken: "secret", BotMXID: "@ai:vojo.chat"},
|
||||
slog.New(slog.NewTextHandler(io.Discard, nil)),
|
||||
st,
|
||||
func(_ context.Context, ev []Event) { *dispatched = append(*dispatched, ev) },
|
||||
func(_ context.Context, ev []Event) { dispatched <- ev },
|
||||
)
|
||||
as.baseCtx = context.Background()
|
||||
return as, st
|
||||
return as, st, dispatched
|
||||
}
|
||||
|
||||
// waitDispatch returns the next dispatched batch, or (nil,false) if none arrives
|
||||
// within the timeout.
|
||||
func waitDispatch(ch chan []Event, timeout time.Duration) ([]Event, bool) {
|
||||
select {
|
||||
case ev := <-ch:
|
||||
return ev, true
|
||||
case <-time.After(timeout):
|
||||
return nil, false
|
||||
}
|
||||
}
|
||||
|
||||
func txnReq(txnID, auth, body string) *http.Request {
|
||||
|
|
@ -37,8 +54,7 @@ func txnReq(txnID, auth, body string) *http.Request {
|
|||
}
|
||||
|
||||
func TestTransactionAuthAndIdempotency(t *testing.T) {
|
||||
var dispatched [][]Event
|
||||
as, st := newTestAS(t, &dispatched)
|
||||
as, st, dispatched := newTestAS(t)
|
||||
defer st.Close()
|
||||
body := `{"events":[{"type":"m.room.message","room_id":"!r:vojo.chat","event_id":"$1","sender":"@u:vojo.chat"}]}`
|
||||
|
||||
|
|
@ -48,18 +64,19 @@ func TestTransactionAuthAndIdempotency(t *testing.T) {
|
|||
if w.Code != http.StatusForbidden {
|
||||
t.Fatalf("bad token: got %d, want 403", w.Code)
|
||||
}
|
||||
if len(dispatched) != 0 {
|
||||
t.Fatalf("bad token must not dispatch, got %d", len(dispatched))
|
||||
if _, ok := waitDispatch(dispatched, 100*time.Millisecond); ok {
|
||||
t.Fatalf("bad token must not dispatch")
|
||||
}
|
||||
|
||||
// Good hs_token → 200, one batch dispatched.
|
||||
// Good hs_token → 200, one batch dispatched (asynchronously).
|
||||
w = httptest.NewRecorder()
|
||||
as.handleTransaction(w, txnReq("txn1", "secret", body))
|
||||
if w.Code != http.StatusOK {
|
||||
t.Fatalf("good token: got %d, want 200", w.Code)
|
||||
}
|
||||
if len(dispatched) != 1 || len(dispatched[0]) != 1 {
|
||||
t.Fatalf("expected one dispatched batch of one event, got %v", dispatched)
|
||||
batch, ok := waitDispatch(dispatched, time.Second)
|
||||
if !ok || len(batch) != 1 {
|
||||
t.Fatalf("expected one dispatched batch of one event, got %v ok=%v", batch, ok)
|
||||
}
|
||||
|
||||
// Same txnId again → idempotent no-op (still 200, no re-dispatch).
|
||||
|
|
@ -68,14 +85,13 @@ func TestTransactionAuthAndIdempotency(t *testing.T) {
|
|||
if w.Code != http.StatusOK {
|
||||
t.Fatalf("retry: got %d, want 200", w.Code)
|
||||
}
|
||||
if len(dispatched) != 1 {
|
||||
t.Fatalf("retried transaction must not re-dispatch, got %d batches", len(dispatched))
|
||||
if _, ok := waitDispatch(dispatched, 100*time.Millisecond); ok {
|
||||
t.Fatalf("retried transaction must not re-dispatch")
|
||||
}
|
||||
}
|
||||
|
||||
func TestTransactionLegacyQueryTokenAccepted(t *testing.T) {
|
||||
var dispatched [][]Event
|
||||
as, st := newTestAS(t, &dispatched)
|
||||
as, st, _ := newTestAS(t)
|
||||
defer st.Close()
|
||||
|
||||
r := httptest.NewRequest(http.MethodPut, "/transactions/txnX?access_token=secret", strings.NewReader(`{"events":[]}`))
|
||||
|
|
@ -88,8 +104,7 @@ func TestTransactionLegacyQueryTokenAccepted(t *testing.T) {
|
|||
}
|
||||
|
||||
func TestUserQuery(t *testing.T) {
|
||||
var dispatched [][]Event
|
||||
as, st := newTestAS(t, &dispatched)
|
||||
as, st, _ := newTestAS(t)
|
||||
defer st.Close()
|
||||
|
||||
mk := func(uid string) *http.Request {
|
||||
|
|
|
|||
|
|
@ -5,6 +5,7 @@ import (
|
|||
"fmt"
|
||||
"log/slog"
|
||||
"sync"
|
||||
"time"
|
||||
)
|
||||
|
||||
// roomMeta caches per-room classification we need to handle a message: member
|
||||
|
|
@ -13,7 +14,9 @@ import (
|
|||
// (appservice transactions carry no room summary) and INVALIDATED whenever a
|
||||
// third party's membership changes, so a 1:1 that gains a member is reclassified
|
||||
// out of DM mode (no DM-mode third-party leak) and a newly added foreign member
|
||||
// is caught.
|
||||
// is caught. All fields are guarded by Bot.mu — never read or written without it,
|
||||
// because the slow generation and the lazy CS-API probes run in concurrent per-room
|
||||
// goroutines.
|
||||
type roomMeta struct {
|
||||
joined, invited int
|
||||
countsKnown bool
|
||||
|
|
@ -30,14 +33,18 @@ type Bot struct {
|
|||
xai *XAIClient
|
||||
st *Store
|
||||
|
||||
// Transactions are delivered one at a time by Synapse, but guard the shared
|
||||
// maps/sets anyway so an unexpected concurrent call can't corrupt them.
|
||||
mu sync.Mutex
|
||||
seen *lruSet // event ids already handled (dedup within a session)
|
||||
botSent *lruSet // event ids the bot itself sent (reply-parent detection)
|
||||
meta map[string]*roomMeta
|
||||
buf map[string][]bufferedMsg
|
||||
globalNote map[string]string // roomID → UTC date we last sent the global daily-limit notice
|
||||
// mu guards the in-memory maps/sets below. Each transaction is acked to Synapse
|
||||
// immediately (appservice.go) and its events are processed in transaction order,
|
||||
// but the slow xAI generation runs in a per-room goroutine and the lazy probes run
|
||||
// off-lock, so several goroutines touch this shared state at once. mu is held only
|
||||
// for short map operations and is NEVER held across a network or xAI call — that
|
||||
// head-of-line hold was the root cause of the multi-minute silence.
|
||||
mu sync.Mutex
|
||||
seen *lruSet // event ids already handled (dedup within a session; self-locking)
|
||||
botSent *lruSet // event ids the bot itself sent (reply-parent detection; self-locking)
|
||||
meta map[string]*roomMeta
|
||||
buf map[string][]bufferedMsg
|
||||
inflight map[string]bool // roomID currently generating a reply (per-room single-flight)
|
||||
}
|
||||
|
||||
func NewBot(ctx context.Context, cfg *Config, logger *slog.Logger) (*Bot, error) {
|
||||
|
|
@ -50,16 +57,16 @@ func NewBot(ctx context.Context, cfg *Config, logger *slog.Logger) (*Bot, error)
|
|||
}
|
||||
|
||||
b := &Bot{
|
||||
cfg: cfg,
|
||||
log: logger,
|
||||
mx: mx,
|
||||
xai: xai,
|
||||
st: st,
|
||||
seen: newLRUSet(5000),
|
||||
botSent: newLRUSet(5000),
|
||||
meta: make(map[string]*roomMeta),
|
||||
buf: make(map[string][]bufferedMsg),
|
||||
globalNote: make(map[string]string),
|
||||
cfg: cfg,
|
||||
log: logger,
|
||||
mx: mx,
|
||||
xai: xai,
|
||||
st: st,
|
||||
seen: newLRUSet(5000),
|
||||
botSent: newLRUSet(5000),
|
||||
meta: make(map[string]*roomMeta),
|
||||
buf: make(map[string][]bufferedMsg),
|
||||
inflight: make(map[string]bool),
|
||||
}
|
||||
|
||||
// Confirm the as_token + user_id resolves to BOT_MXID before serving.
|
||||
|
|
@ -98,52 +105,83 @@ func (b *Bot) Run(ctx context.Context) error {
|
|||
return as.Serve(ctx)
|
||||
}
|
||||
|
||||
// handleTransaction processes one pushed transaction's events in order.
|
||||
// handleTransaction processes one already-acked transaction's events. It runs in a
|
||||
// background goroutine (the 200 has already been returned to Synapse). Events are
|
||||
// processed IN ORDER (dedup + classification are synchronous) so the per-room
|
||||
// single-flight claim is taken in arrival order; only the slow xAI generation is
|
||||
// spawned per room, so different rooms answer concurrently and a slow call never
|
||||
// blocks the next event or another room. Ordering within a room is therefore kept,
|
||||
// while the head-of-line freeze is gone.
|
||||
func (b *Bot) handleTransaction(ctx context.Context, events []Event) {
|
||||
b.mu.Lock()
|
||||
defer b.mu.Unlock()
|
||||
for i := range events {
|
||||
b.handleEvent(ctx, &events[i])
|
||||
}
|
||||
}
|
||||
|
||||
// safego runs fn in a goroutine with panic recovery. The slow per-room work is
|
||||
// detached from the HTTP handler, so an unrecovered panic there would crash the
|
||||
// whole process and silence the bot for EVERY room — recover + log instead, so one
|
||||
// malformed event can never take the bot down.
|
||||
func (b *Bot) safego(what string, fn func()) {
|
||||
go func() {
|
||||
defer func() {
|
||||
if r := recover(); r != nil {
|
||||
b.log.Error("recovered panic in handler goroutine", "what", what, "panic", r)
|
||||
}
|
||||
}()
|
||||
fn()
|
||||
}()
|
||||
}
|
||||
|
||||
func (b *Bot) handleEvent(ctx context.Context, ev *Event) {
|
||||
if ev.EventID == "" || ev.RoomID == "" {
|
||||
return
|
||||
}
|
||||
if !b.seen.Add(ev.EventID) {
|
||||
return // already handled this session (fast in-memory path)
|
||||
}
|
||||
// Durable dedup across restarts: if a previous run already handled this event
|
||||
// but crashed before its transaction was acked, Synapse re-pushes it — don't
|
||||
// reprocess (no dup answer / double-bill). On a DB error, fall through; the
|
||||
// in-memory set still guards this session.
|
||||
if isNew, err := b.st.SeenEvent(ev.EventID); err != nil {
|
||||
b.log.Error("durable dedup check failed", "id", ev.EventID, "err", err)
|
||||
} else if !isNew {
|
||||
return
|
||||
if !b.markSeen(ev.EventID) {
|
||||
return // already handled (in-memory or durable dedup)
|
||||
}
|
||||
b.log.Debug("event", "type", ev.Type, "room", ev.RoomID, "sender", ev.Sender, "id", ev.EventID)
|
||||
switch ev.Type {
|
||||
case "m.room.member":
|
||||
if ev.StateKey != nil && *ev.StateKey == b.cfg.BotMXID {
|
||||
b.handleSelfMembership(ctx, ev)
|
||||
} else if m := b.meta[ev.RoomID]; m != nil {
|
||||
// A third party's membership changed: counts + foreign flag are now
|
||||
// stale. Re-probe on the next message so a 1:1 that gains a member drops
|
||||
// out of DM mode (no third-party leak) and a new foreign member is caught.
|
||||
m.countsKnown = false
|
||||
b.safego("self-membership", func() { b.handleSelfMembership(ctx, ev) })
|
||||
} else {
|
||||
// A third party's membership changed: counts + foreign flag are now stale.
|
||||
// Re-probe on the next message so a 1:1 that gains a member drops out of DM
|
||||
// mode (no third-party leak) and a new foreign member is caught.
|
||||
b.invalidateCounts(ev.RoomID)
|
||||
}
|
||||
case "m.room.encryption":
|
||||
m := b.getMeta(ev.RoomID)
|
||||
m.encrypted, m.encKnown = true, true
|
||||
b.setEncrypted(ev.RoomID)
|
||||
case "m.room.message":
|
||||
// Synchronous (in transaction order) up to the single-flight claim; only the
|
||||
// slow generation inside handleMessage is spawned as a goroutine. This keeps
|
||||
// per-room event order — the earlier message wins the claim — while different
|
||||
// rooms still run concurrently.
|
||||
b.handleMessage(ctx, ev)
|
||||
}
|
||||
}
|
||||
|
||||
// markSeen records an event id in both the in-memory set and the durable store and
|
||||
// reports whether it is NEW (first time). The in-memory Add is atomic, and SeenEvent
|
||||
// is an atomic INSERT OR IGNORE, so two racing goroutines for the same event can never
|
||||
// both proceed. On a durable-store error we fall through (the in-memory set still
|
||||
// guards this session).
|
||||
func (b *Bot) markSeen(eventID string) bool {
|
||||
if !b.seen.Add(eventID) {
|
||||
return false
|
||||
}
|
||||
isNew, err := b.st.SeenEvent(eventID)
|
||||
if err != nil {
|
||||
b.log.Error("durable dedup check failed", "id", eventID, "err", err)
|
||||
return true
|
||||
}
|
||||
return isNew
|
||||
}
|
||||
|
||||
// handleSelfMembership reacts to membership changes for the bot user: auto-join
|
||||
// invites from allowed servers (F11), reject others, forget rooms we leave.
|
||||
// invites from allowed servers (F11), reject others, forget rooms we leave. Runs in
|
||||
// its own goroutine because JoinRoom/LeaveRoom are network calls.
|
||||
func (b *Bot) handleSelfMembership(ctx context.Context, ev *Event) {
|
||||
switch ev.membershipOf() {
|
||||
case "invite":
|
||||
|
|
@ -161,14 +199,11 @@ func (b *Bot) handleSelfMembership(ctx context.Context, ev *Event) {
|
|||
}
|
||||
// Fully-on-allowed-servers gate: a vojo.chat inviter can still pull the bot
|
||||
// into a room that already holds federated third parties — leave at once.
|
||||
m := b.getMeta(ev.RoomID)
|
||||
b.ensureCounts(ctx, ev.RoomID, m)
|
||||
if m.countsKnown && m.foreign {
|
||||
if _, _, foreign := b.ensureCounts(ctx, ev.RoomID); foreign {
|
||||
b.leaveForeign(ctx, ev.RoomID)
|
||||
}
|
||||
case "leave", "ban":
|
||||
delete(b.meta, ev.RoomID)
|
||||
delete(b.buf, ev.RoomID)
|
||||
b.forgetRoom(ev.RoomID)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -183,116 +218,148 @@ func (b *Bot) leaveForeign(ctx context.Context, roomID string) {
|
|||
|
||||
func (b *Bot) handleMessage(ctx context.Context, ev *Event) {
|
||||
roomID := ev.RoomID
|
||||
m := b.getMeta(roomID)
|
||||
|
||||
// A9/F15: re-check encryption; if (or once) encrypted, warn once and skip —
|
||||
// the bot can't read it.
|
||||
b.ensureEncryption(ctx, roomID, m)
|
||||
if m.encrypted {
|
||||
// A9/F15: re-check encryption; if (or once) encrypted, react once and skip — the
|
||||
// bot can't read it. The probe runs without the lock.
|
||||
if b.ensureEncryption(ctx, roomID) {
|
||||
b.log.Debug("skip: encrypted room", "room", roomID)
|
||||
b.warnEncryptedOnce(ctx, roomID)
|
||||
b.reactEncryptedOnce(ctx, roomID, ev.EventID)
|
||||
return
|
||||
}
|
||||
|
||||
mc, ok := ev.DecodeMessage()
|
||||
if !ok {
|
||||
// A message addressed to the bot that we can't decode shouldn't vanish without
|
||||
// a trace (no silent drops): log at WARN so it's visible at the default level.
|
||||
b.log.Warn("skip: undecodable message content", "room", roomID, "sender", ev.Sender, "id", ev.EventID)
|
||||
return
|
||||
}
|
||||
// Edits re-carry m.mentions; never re-trigger or replay them (F16).
|
||||
if mc.IsReplace() {
|
||||
return
|
||||
}
|
||||
// Only plain text ever reaches xAI: drop media (m.image/m.file/m.audio/…) and
|
||||
// other custom msgtypes outright — the bot doesn't fetch or forward media, and
|
||||
// a caption/filename is third-party content we keep out of xAI. m.notice falls
|
||||
// through to its existing anti-loop handling below.
|
||||
if mc.MsgType != "m.text" && mc.MsgType != "m.emote" && mc.MsgType != "m.notice" {
|
||||
b.log.Debug("skip: non-text msgtype", "room", roomID, "sender", ev.Sender, "msgtype", mc.MsgType)
|
||||
return
|
||||
}
|
||||
|
||||
// Buffer prior context BEFORE classifying so buildContext sees history only.
|
||||
history := b.buf[roomID]
|
||||
b.appendBuf(roomID, bufferedMsg{sender: ev.Sender, body: mc.Body, isBot: ev.Sender == b.cfg.BotMXID})
|
||||
|
||||
if ev.Sender == b.cfg.BotMXID {
|
||||
return // our own message (also tracked via botSent)
|
||||
return // our own message (the reply is buffered when we send it, not on echo-back)
|
||||
}
|
||||
if mc.MsgType == "m.notice" {
|
||||
return // anti-loop: ignore notices (ours and other bots')
|
||||
}
|
||||
|
||||
b.ensureCounts(ctx, roomID, m)
|
||||
// Media / non-text is handled only once we know the message is addressed (below),
|
||||
// so a stray image in a group the bot isn't mentioned in stays silent (correct),
|
||||
// while media in a 1:1 or an @-mention gets a clear "text only" reaction.
|
||||
isMedia := mc.MsgType != "m.text" && mc.MsgType != "m.emote"
|
||||
|
||||
countsKnown, isDM, foreign := b.ensureCounts(ctx, roomID)
|
||||
// Stay only in rooms hosted entirely on allowed servers — never operate in (or
|
||||
// "leak" the bot into) a federated room with non-consenting third parties.
|
||||
if m.countsKnown && m.foreign {
|
||||
if foreign {
|
||||
b.leaveForeign(ctx, roomID)
|
||||
return
|
||||
}
|
||||
|
||||
replyParentIsBot := mc.RelatesTo != nil && mc.RelatesTo.InReplyTo != nil &&
|
||||
b.botSent.Has(mc.RelatesTo.InReplyTo.EventID)
|
||||
|
||||
mentioned := mentionsBot(mc, b.cfg.BotMXID, replyParentIsBot)
|
||||
if !(m.isDM() || mentioned) {
|
||||
b.log.Debug("skip: not addressed (group without mention)", "room", roomID, "sender", ev.Sender,
|
||||
"dm", m.isDM(), "joined", m.joined, "invited", m.invited, "countsKnown", m.countsKnown, "mentioned", mentioned)
|
||||
|
||||
if !(isDM || mentioned) {
|
||||
if !countsKnown {
|
||||
// We couldn't classify the room (member probe failed) and the message isn't
|
||||
// an explicit mention, so we can't tell a 1:1 (answer everything) from a
|
||||
// group (answer only on mention). Log at WARN — not a silent Debug drop —
|
||||
// so it's visible; we don't react because reacting in a group the bot isn't
|
||||
// addressed in would be wrong. Re-probed on the next message.
|
||||
b.log.Warn("skip: room unclassified (member probe failed), message not an explicit mention",
|
||||
"room", roomID, "sender", ev.Sender)
|
||||
} else {
|
||||
b.log.Debug("skip: not addressed (group without mention)", "room", roomID, "sender", ev.Sender,
|
||||
"dm", isDM, "mentioned", mentioned)
|
||||
}
|
||||
return
|
||||
}
|
||||
b.respond(ctx, roomID, m, ev, mc, history)
|
||||
|
||||
// Addressed but not text: react "text only" (no silent drop).
|
||||
if isMedia {
|
||||
b.log.Debug("skip: non-text msgtype (reacted)", "room", roomID, "sender", ev.Sender, "msgtype", mc.MsgType)
|
||||
b.react(ctx, roomID, ev.EventID, reactMedia)
|
||||
return
|
||||
}
|
||||
|
||||
// Per-room single-flight: while a generation is in flight for this room, drop
|
||||
// further messages (like a chat — no queue, no new session, no token burn). No
|
||||
// reaction here: the typing indicator is already showing for this room, which is
|
||||
// the language-free "I'm busy" signal. The claim is taken here, synchronously and
|
||||
// in transaction order, so the FIRST message for a room wins and later ones are
|
||||
// dropped until release — never the reverse.
|
||||
if !b.tryClaim(roomID) {
|
||||
b.log.Debug("drop: room busy generating", "room", roomID, "sender", ev.Sender)
|
||||
return
|
||||
}
|
||||
|
||||
// Snapshot the room history (excludes this trigger) under the claim, then run the
|
||||
// slow generation in its own goroutine so this transaction's remaining events and
|
||||
// other rooms are not blocked by the xAI call. respond appends the trigger+answer
|
||||
// to the buffer itself, only on success (see sendReply), and releases the claim.
|
||||
history := b.snapshotBuf(roomID)
|
||||
b.safego("respond", func() {
|
||||
defer b.release(roomID)
|
||||
b.respond(ctx, roomID, isDM, ev, mc, history)
|
||||
})
|
||||
}
|
||||
|
||||
// unlimitedCap is the effective per-user cap for UNLIMITED_USERS — high enough to
|
||||
// never trip the per-user gate, while the global DAILY_USD_CEILING still applies.
|
||||
const unlimitedCap = 1 << 30
|
||||
|
||||
func (b *Bot) respond(ctx context.Context, roomID string, m *roomMeta, ev *Event, mc *MessageContent, history []bufferedMsg) {
|
||||
func (b *Bot) respond(ctx context.Context, roomID string, isDM bool, ev *Event, mc *MessageContent, history []bufferedMsg) {
|
||||
perUserCap := b.cfg.PerUserDailyCap
|
||||
if b.cfg.UnlimitedUsers[ev.Sender] {
|
||||
perUserCap = unlimitedCap
|
||||
}
|
||||
switch res, err := b.st.Reserve(ev.Sender, perUserCap, b.cfg.DailyUSDCeiling); {
|
||||
case err != nil:
|
||||
// A limiter failure is on our side — don't leave the user wondering.
|
||||
b.log.Error("limiter reserve failed", "sender", ev.Sender, "err", err)
|
||||
b.react(ctx, roomID, ev.EventID, reactError)
|
||||
return
|
||||
case res == reserveDeniedUser:
|
||||
// Per-user cap (anti-abuse, F24): stop answering, but always tell the user
|
||||
// their request hit the limit — no message addressed to the bot is left
|
||||
// silent. (m.notice → the anti-loop skip keeps this from re-triggering.)
|
||||
b.log.Info("per-user daily cap reached; notifying", "sender", ev.Sender)
|
||||
b.sendNotice(ctx, roomID, ev, mc, noticeUserLimit)
|
||||
// Per-user cap (anti-abuse, F24): stop answering, but always signal the limit —
|
||||
// no message addressed to the bot is left without feedback.
|
||||
b.log.Info("per-user daily cap reached; reacting", "sender", ev.Sender)
|
||||
b.react(ctx, roomID, ev.EventID, reactRateLimit)
|
||||
return
|
||||
case res == reserveDeniedGlobal:
|
||||
// Global USD ceiling — notice once per room per day, then stay quiet.
|
||||
// Global USD ceiling. A reaction is cheap and non-intrusive (unlike the old
|
||||
// once-per-day text notice), so signal every affected message rather than
|
||||
// going silent after the first.
|
||||
b.log.Warn("global daily USD ceiling reached", "room", roomID, "sender", ev.Sender)
|
||||
if b.globalNote[roomID] != todayUTC() {
|
||||
b.globalNote[roomID] = todayUTC()
|
||||
b.sendNotice(ctx, roomID, ev, mc, noticeDailyLimit)
|
||||
}
|
||||
b.react(ctx, roomID, ev.EventID, reactRateLimit)
|
||||
return
|
||||
}
|
||||
|
||||
// Show "Vojo AI печатает…" while we build the answer; the deferred clear fires
|
||||
// on every exit (success or failure). Pure UX — typing failures are best-effort.
|
||||
b.setTyping(ctx, roomID, true)
|
||||
defer b.setTyping(ctx, roomID, false)
|
||||
// Show "Vojo AI печатает…" for the whole generation. The keepalive refreshes the
|
||||
// typing notification every 20s (the server expires it after 30s) so the indicator
|
||||
// never lapses on a slow/retried answer, and the deferred stop clears it on exit.
|
||||
stopTyping := b.startTypingKeepalive(ctx, roomID)
|
||||
defer stopTyping()
|
||||
|
||||
msgs := buildContext(b.cfg.SystemPrompt, history, m.isDM(), mc.Body, b.cfg.MaxCtxEvent, 8000)
|
||||
msgs := buildContext(b.cfg.SystemPrompt, history, isDM, mc.Body, b.cfg.MaxCtxEvent, 8000)
|
||||
resp, err := b.xai.Complete(ctx, b.cfg.XAIModel, msgs, b.cfg.MaxOutTok, b.cfg.XAITemp)
|
||||
if err != nil {
|
||||
// at-most-once already retried transient failures inside Complete; refund
|
||||
// the reserved request so an xAI outage doesn't burn the user's daily cap,
|
||||
// and tell the user we couldn't answer (notice → no anti-loop re-trigger).
|
||||
// at-most-once already retried transient failures inside Complete; refund the
|
||||
// reserved request so an xAI outage doesn't burn the user's daily cap, and
|
||||
// signal the failure (react → no anti-loop, no language).
|
||||
b.log.Error("xai completion failed", "sender", ev.Sender, "err", err)
|
||||
if rerr := b.st.RefundRequest(ev.Sender); rerr != nil {
|
||||
b.log.Error("refund failed", "sender", ev.Sender, "err", rerr)
|
||||
}
|
||||
b.sendNotice(ctx, roomID, ev, mc, noticeError)
|
||||
b.react(ctx, roomID, ev.EventID, reactError)
|
||||
return
|
||||
}
|
||||
|
||||
// A 2xx from xAI is billed even if the text came back empty — always book the
|
||||
// real cost so both caps see it (the old refund-without-reconcile on an empty
|
||||
// 200 let such calls bypass the per-user cap and the global ceiling).
|
||||
// A 2xx from xAI is billed even if the text came back empty — always book the real
|
||||
// cost so both caps see it (an empty 200 must not bypass the per-user cap and the
|
||||
// global ceiling).
|
||||
usd := computeUSD(resp.Usage, b.cfg)
|
||||
if err := b.st.Reconcile(ev.Sender, usd); err != nil {
|
||||
b.log.Error("reconcile spend failed", "sender", ev.Sender, "err", err)
|
||||
|
|
@ -300,12 +367,15 @@ func (b *Bot) respond(ctx context.Context, roomID string, m *roomMeta, ev *Event
|
|||
|
||||
text := resp.Text()
|
||||
if text == "" {
|
||||
b.log.Warn("xai returned empty completion (billed, nothing to send)", "sender", ev.Sender, "usd", usd)
|
||||
// Billed but no usable text (content filter / length cap / empty choices). Never
|
||||
// leave a billed request without feedback — react "couldn't answer".
|
||||
b.log.Warn("xai returned empty completion (billed, reacting)", "sender", ev.Sender, "usd", usd)
|
||||
b.react(ctx, roomID, ev.EventID, reactError)
|
||||
return
|
||||
}
|
||||
b.log.Info("answered", "room", roomID, "sender", ev.Sender, "dm", m.isDM(),
|
||||
b.log.Info("answered", "room", roomID, "sender", ev.Sender, "dm", isDM,
|
||||
"usd", usd, "prompt_tokens", resp.Usage.PromptTokens, "completion_tokens", resp.Usage.CompletionTokens)
|
||||
b.sendNotice(ctx, roomID, ev, mc, text)
|
||||
b.sendReply(ctx, roomID, ev, mc, text)
|
||||
}
|
||||
|
||||
// computeUSD prices the call from the API-returned token usage (authoritative
|
||||
|
|
@ -322,30 +392,30 @@ func computeUSD(u xaiUsage, cfg *Config) float64 {
|
|||
float64(u.CompletionTokens)/1e6*cfg.PriceOutputPerM
|
||||
}
|
||||
|
||||
func (b *Bot) sendNotice(ctx context.Context, roomID string, trigger *Event, triggerMC *MessageContent, body string) {
|
||||
content := buildNoticeContent(trigger.EventID, trigger.Sender, triggerMC.RelatesTo, body)
|
||||
id, err := b.mx.SendEvent(ctx, roomID, "m.room.message", content)
|
||||
if err != nil {
|
||||
b.log.Error("send notice failed", "room", roomID, "err", err)
|
||||
return
|
||||
// react adds an emoji m.reaction to the triggering event — the bot's language-free
|
||||
// way to signal a system state (error / rate limit / encrypted / media) it can't
|
||||
// express as a model-generated answer. Best-effort: a failed reaction is logged, not
|
||||
// retried. Reactions are m.reaction (not m.room.message), so they never re-enter
|
||||
// handleMessage and need no anti-loop tracking.
|
||||
func (b *Bot) react(ctx context.Context, roomID, eventID, emoji string) {
|
||||
content := map[string]any{
|
||||
"m.relates_to": map[string]any{
|
||||
"rel_type": "m.annotation",
|
||||
"event_id": eventID,
|
||||
"key": emoji,
|
||||
},
|
||||
}
|
||||
// Track our own reply so a future reply-to-it is recognised as addressing us,
|
||||
// and add it to the room buffer as an assistant turn for context continuity.
|
||||
b.botSent.Add(id)
|
||||
b.appendBuf(roomID, bufferedMsg{sender: b.cfg.BotMXID, body: body, isBot: true})
|
||||
}
|
||||
|
||||
// setTyping sets/clears the bot's typing indicator (best-effort UX; failures are
|
||||
// non-fatal). The 30s timeout comfortably covers a normal completion, and
|
||||
// respond() defers a clear so the indicator ends the moment the answer is sent
|
||||
// or fails.
|
||||
func (b *Bot) setTyping(ctx context.Context, roomID string, typing bool) {
|
||||
if err := b.mx.SendTyping(ctx, roomID, typing, 30000); err != nil {
|
||||
b.log.Debug("set typing failed", "room", roomID, "typing", typing, "err", err)
|
||||
if _, err := b.mx.SendEvent(ctx, roomID, "m.reaction", content); err != nil {
|
||||
b.log.Error("react failed", "room", roomID, "emoji", emoji, "err", err)
|
||||
}
|
||||
}
|
||||
|
||||
func (b *Bot) warnEncryptedOnce(ctx context.Context, roomID string) {
|
||||
// reactEncryptedOnce reacts 🔒 to the first message seen in an encrypted room and
|
||||
// records a durable flag so a restart doesn't re-react (F5). Vojo disables E2EE by
|
||||
// default, so this is a near-dead safety path; the reaction is far less intrusive
|
||||
// than the old text notice, but the once-gate keeps it from annotating every message
|
||||
// in the rare encrypted room.
|
||||
func (b *Bot) reactEncryptedOnce(ctx context.Context, roomID, eventID string) {
|
||||
warned, err := b.st.HasWarnedEncrypted(roomID)
|
||||
if err != nil {
|
||||
b.log.Error("warned-flag read failed", "room", roomID, "err", err)
|
||||
|
|
@ -354,16 +424,79 @@ func (b *Bot) warnEncryptedOnce(ctx context.Context, roomID string) {
|
|||
if warned {
|
||||
return
|
||||
}
|
||||
content := map[string]any{"msgtype": "m.notice", "body": noticeEncryptedUnsupported}
|
||||
if _, err := b.mx.SendEvent(ctx, roomID, "m.room.message", content); err != nil {
|
||||
b.log.Error("encrypted-notice failed", "room", roomID, "err", err)
|
||||
return
|
||||
}
|
||||
b.react(ctx, roomID, eventID, reactEncrypted)
|
||||
if err := b.st.SetWarnedEncrypted(roomID); err != nil {
|
||||
b.log.Error("persist warned-flag failed", "room", roomID, "err", err)
|
||||
}
|
||||
}
|
||||
|
||||
// sendReply sends the model's actual answer and records the completed exchange in the
|
||||
// conversation buffer so the next turn has context.
|
||||
func (b *Bot) sendReply(ctx context.Context, roomID string, trigger *Event, triggerMC *MessageContent, body string) {
|
||||
id := b.sendMessage(ctx, roomID, trigger, triggerMC, body)
|
||||
if id == "" {
|
||||
return
|
||||
}
|
||||
// Record the user trigger AND the assistant answer together, only AFTER the answer
|
||||
// was sent, so a failed or empty generation never leaves a dangling user turn (a
|
||||
// question with no reply) in the buffer — which would skew later completions.
|
||||
// Single-flight guarantees no other turn for this room interleaves between the two.
|
||||
b.appendBuf(roomID, bufferedMsg{sender: trigger.Sender, body: triggerMC.Body, isBot: false})
|
||||
b.appendBuf(roomID, bufferedMsg{sender: b.cfg.BotMXID, body: body, isBot: true})
|
||||
}
|
||||
|
||||
// sendMessage builds and sends an m.notice reply, tracks our own event id, and returns
|
||||
// the new event id ("" on failure).
|
||||
func (b *Bot) sendMessage(ctx context.Context, roomID string, trigger *Event, triggerMC *MessageContent, body string) string {
|
||||
content := buildNoticeContent(trigger.EventID, trigger.Sender, triggerMC.RelatesTo, body)
|
||||
id, err := b.mx.SendEvent(ctx, roomID, "m.room.message", content)
|
||||
if err != nil {
|
||||
b.log.Error("send failed", "room", roomID, "err", err)
|
||||
return ""
|
||||
}
|
||||
// Track our own reply so a future reply-to-it is recognised as addressing us.
|
||||
b.botSent.Add(id)
|
||||
return id
|
||||
}
|
||||
|
||||
// startTypingKeepalive starts the typing indicator and keeps it alive for the whole
|
||||
// generation (the CS-API server-side typing notification expires after the 30s we
|
||||
// pass, so we refresh every 20s). The returned stop clears the indicator and is safe
|
||||
// to call once via defer. Typing is best-effort UX — failures are non-fatal.
|
||||
func (b *Bot) startTypingKeepalive(ctx context.Context, roomID string) func() {
|
||||
b.setTyping(ctx, roomID, true)
|
||||
done := make(chan struct{})
|
||||
go func() {
|
||||
t := time.NewTicker(20 * time.Second)
|
||||
defer t.Stop()
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case <-done:
|
||||
return
|
||||
case <-t.C:
|
||||
b.setTyping(ctx, roomID, true)
|
||||
}
|
||||
}
|
||||
}()
|
||||
var once sync.Once
|
||||
return func() {
|
||||
once.Do(func() {
|
||||
close(done)
|
||||
b.setTyping(ctx, roomID, false)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
// setTyping sets/clears the bot's typing indicator (best-effort UX; failures are
|
||||
// non-fatal). The 30s server-side timeout is refreshed by startTypingKeepalive.
|
||||
func (b *Bot) setTyping(ctx context.Context, roomID string, typing bool) {
|
||||
if err := b.mx.SendTyping(ctx, roomID, typing, 30000); err != nil {
|
||||
b.log.Debug("set typing failed", "room", roomID, "typing", typing, "err", err)
|
||||
}
|
||||
}
|
||||
|
||||
// buildNoticeContent builds the reply. m.notice (not m.text) so the anti-loop
|
||||
// skip catches our own output. Thread-aware (F27): a trigger from a thread gets a
|
||||
// thread relation so the answer lands in the thread, not the main timeline.
|
||||
|
|
@ -394,9 +527,30 @@ func buildNoticeContent(replyTo, sender string, triggerRelates *RelatesTo, body
|
|||
return content
|
||||
}
|
||||
|
||||
// --- per-room metadata helpers -------------------------------------------------
|
||||
// --- per-room single-flight ----------------------------------------------------
|
||||
|
||||
func (b *Bot) getMeta(roomID string) *roomMeta {
|
||||
// tryClaim marks a room as generating and returns true if the caller won the claim
|
||||
// (no generation was already in flight). The loser must drop its message.
|
||||
func (b *Bot) tryClaim(roomID string) bool {
|
||||
b.mu.Lock()
|
||||
defer b.mu.Unlock()
|
||||
if b.inflight[roomID] {
|
||||
return false
|
||||
}
|
||||
b.inflight[roomID] = true
|
||||
return true
|
||||
}
|
||||
|
||||
func (b *Bot) release(roomID string) {
|
||||
b.mu.Lock()
|
||||
defer b.mu.Unlock()
|
||||
delete(b.inflight, roomID)
|
||||
}
|
||||
|
||||
// --- per-room metadata helpers (all guarded by b.mu; probes run outside it) -----
|
||||
|
||||
// getMetaLocked returns (creating if needed) the room's meta. Caller MUST hold b.mu.
|
||||
func (b *Bot) getMetaLocked(roomID string) *roomMeta {
|
||||
m := b.meta[roomID]
|
||||
if m == nil {
|
||||
m = &roomMeta{}
|
||||
|
|
@ -405,38 +559,112 @@ func (b *Bot) getMeta(roomID string) *roomMeta {
|
|||
return m
|
||||
}
|
||||
|
||||
func (b *Bot) ensureEncryption(ctx context.Context, roomID string, m *roomMeta) {
|
||||
if m.encKnown {
|
||||
return
|
||||
func (b *Bot) invalidateCounts(roomID string) {
|
||||
b.mu.Lock()
|
||||
defer b.mu.Unlock()
|
||||
if m := b.meta[roomID]; m != nil {
|
||||
m.countsKnown = false
|
||||
}
|
||||
}
|
||||
|
||||
func (b *Bot) setEncrypted(roomID string) {
|
||||
b.mu.Lock()
|
||||
defer b.mu.Unlock()
|
||||
m := b.getMetaLocked(roomID)
|
||||
m.encrypted, m.encKnown = true, true
|
||||
}
|
||||
|
||||
func (b *Bot) forgetRoom(roomID string) {
|
||||
b.mu.Lock()
|
||||
defer b.mu.Unlock()
|
||||
delete(b.meta, roomID)
|
||||
delete(b.buf, roomID)
|
||||
delete(b.inflight, roomID)
|
||||
}
|
||||
|
||||
// ensureEncryption returns whether the room is encrypted, probing the CS-API once
|
||||
// (without holding the lock) and caching the result. On probe error it returns false
|
||||
// (treated as not-encrypted this round) and leaves the state unknown for a re-probe.
|
||||
func (b *Bot) ensureEncryption(ctx context.Context, roomID string) bool {
|
||||
b.mu.Lock()
|
||||
if m := b.getMetaLocked(roomID); m.encKnown {
|
||||
enc := m.encrypted
|
||||
b.mu.Unlock()
|
||||
return enc
|
||||
}
|
||||
b.mu.Unlock()
|
||||
|
||||
enc, err := b.mx.RoomEncrypted(ctx, roomID)
|
||||
if err != nil {
|
||||
b.log.Warn("encryption probe failed", "room", roomID, "err", err)
|
||||
return // leave unknown; re-probed on the next message
|
||||
return false // leave unknown; re-probed on the next message
|
||||
}
|
||||
m.encrypted, m.encKnown = enc, true
|
||||
// Re-fetch under the lock instead of writing to the pointer captured before the
|
||||
// unlocked probe: if the room was forgotten (leave/ban) mid-probe its meta was
|
||||
// deleted, and writing to the captured pointer would resurrect a dead room.
|
||||
b.mu.Lock()
|
||||
if m := b.meta[roomID]; m != nil {
|
||||
m.encrypted, m.encKnown = enc, true
|
||||
}
|
||||
b.mu.Unlock()
|
||||
return enc
|
||||
}
|
||||
|
||||
func (b *Bot) ensureCounts(ctx context.Context, roomID string, m *roomMeta) {
|
||||
if m.countsKnown {
|
||||
return
|
||||
}
|
||||
joined, invited, servers, err := b.mx.RoomMembership(ctx, roomID)
|
||||
if err != nil {
|
||||
b.log.Warn("member probe failed", "room", roomID, "err", err)
|
||||
return
|
||||
}
|
||||
foreign := false
|
||||
for s := range servers {
|
||||
if !b.cfg.AllowedServers[s] {
|
||||
foreign = true
|
||||
break
|
||||
// ensureCounts returns (countsKnown, isDM, foreign), probing /members once (without
|
||||
// holding the lock) and caching the result. On probe error it returns
|
||||
// (false, false, false): the caller treats an unclassified room conservatively and
|
||||
// logs a visible WARN rather than silently dropping.
|
||||
func (b *Bot) ensureCounts(ctx context.Context, roomID string) (countsKnown, isDM, foreign bool) {
|
||||
b.mu.Lock()
|
||||
known := b.getMetaLocked(roomID).countsKnown
|
||||
b.mu.Unlock()
|
||||
|
||||
if !known {
|
||||
joined, invited, servers, err := b.mx.RoomMembership(ctx, roomID)
|
||||
if err != nil {
|
||||
b.log.Warn("member probe failed", "room", roomID, "err", err)
|
||||
return false, false, false
|
||||
}
|
||||
isForeign := false
|
||||
for s := range servers {
|
||||
if !b.cfg.AllowedServers[s] {
|
||||
isForeign = true
|
||||
break
|
||||
}
|
||||
}
|
||||
// Re-fetch under the lock rather than writing a pointer captured before the
|
||||
// unlocked /members probe (see ensureEncryption): a leave/ban mid-probe must
|
||||
// not be undone by resurrecting the room's meta.
|
||||
b.mu.Lock()
|
||||
if m := b.meta[roomID]; m != nil {
|
||||
m.joined, m.invited, m.foreign, m.countsKnown = joined, invited, isForeign, true
|
||||
}
|
||||
b.mu.Unlock()
|
||||
}
|
||||
m.joined, m.invited, m.foreign, m.countsKnown = joined, invited, foreign, true
|
||||
|
||||
b.mu.Lock()
|
||||
defer b.mu.Unlock()
|
||||
if m := b.meta[roomID]; m != nil {
|
||||
return m.countsKnown, m.isDM(), m.foreign
|
||||
}
|
||||
return false, false, false
|
||||
}
|
||||
|
||||
func (b *Bot) snapshotBuf(roomID string) []bufferedMsg {
|
||||
b.mu.Lock()
|
||||
defer b.mu.Unlock()
|
||||
src := b.buf[roomID]
|
||||
if len(src) == 0 {
|
||||
return nil
|
||||
}
|
||||
out := make([]bufferedMsg, len(src))
|
||||
copy(out, src)
|
||||
return out
|
||||
}
|
||||
|
||||
func (b *Bot) appendBuf(roomID string, msg bufferedMsg) {
|
||||
b.mu.Lock()
|
||||
defer b.mu.Unlock()
|
||||
limit := b.cfg.MaxCtxEvent * 2
|
||||
if limit < 8 {
|
||||
limit = 8
|
||||
|
|
|
|||
82
apps/ai-bot/concurrency_test.go
Normal file
82
apps/ai-bot/concurrency_test.go
Normal file
|
|
@ -0,0 +1,82 @@
|
|||
package main
|
||||
|
||||
import (
|
||||
"sync"
|
||||
"testing"
|
||||
)
|
||||
|
||||
// TestSingleFlightClaim documents the per-room single-flight invariant the async
|
||||
// refactor relies on: at most one generation per room at a time, the claim is
|
||||
// independent per room, and a release re-arms the room. handleEvent takes this claim
|
||||
// synchronously in transaction order, so the FIRST message for a room wins and later
|
||||
// ones are dropped until release (never the reverse).
|
||||
func TestSingleFlightClaim(t *testing.T) {
|
||||
b := &Bot{inflight: make(map[string]bool)}
|
||||
|
||||
if !b.tryClaim("!a") {
|
||||
t.Fatal("first claim on !a should win")
|
||||
}
|
||||
if b.tryClaim("!a") {
|
||||
t.Fatal("second claim on !a must fail while in flight")
|
||||
}
|
||||
if !b.tryClaim("!b") {
|
||||
t.Fatal("a different room must claim independently")
|
||||
}
|
||||
b.release("!a")
|
||||
if !b.tryClaim("!a") {
|
||||
t.Fatal("after release !a must be claimable again")
|
||||
}
|
||||
}
|
||||
|
||||
// TestSingleFlightClaimExactlyOneWinner runs many goroutines racing for the same
|
||||
// room and asserts EXACTLY ONE wins the claim — the property that prevents two
|
||||
// concurrent generations (double xAI spend) for one room. Run under -race.
|
||||
func TestSingleFlightClaimExactlyOneWinner(t *testing.T) {
|
||||
b := &Bot{inflight: make(map[string]bool)}
|
||||
const n = 64
|
||||
var wins int64
|
||||
var mu sync.Mutex
|
||||
var wg sync.WaitGroup
|
||||
wg.Add(n)
|
||||
for i := 0; i < n; i++ {
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
if b.tryClaim("!room") {
|
||||
mu.Lock()
|
||||
wins++
|
||||
mu.Unlock()
|
||||
}
|
||||
}()
|
||||
}
|
||||
wg.Wait()
|
||||
if wins != 1 {
|
||||
t.Fatalf("exactly one goroutine must win the claim, got %d", wins)
|
||||
}
|
||||
}
|
||||
|
||||
// TestLRUSetConcurrentAddOnce asserts the dedup set's check-and-insert is atomic:
|
||||
// with many goroutines racing on the same id, Add returns true exactly once. This is
|
||||
// the in-memory half of markSeen, now called from concurrent per-room goroutines.
|
||||
// Run under -race.
|
||||
func TestLRUSetConcurrentAddOnce(t *testing.T) {
|
||||
s := newLRUSet(1000)
|
||||
const n = 64
|
||||
var trues int64
|
||||
var mu sync.Mutex
|
||||
var wg sync.WaitGroup
|
||||
wg.Add(n)
|
||||
for i := 0; i < n; i++ {
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
if s.Add("$evt") {
|
||||
mu.Lock()
|
||||
trues++
|
||||
mu.Unlock()
|
||||
}
|
||||
}()
|
||||
}
|
||||
wg.Wait()
|
||||
if trues != 1 {
|
||||
t.Fatalf("Add must return true exactly once for one id, got %d", trues)
|
||||
}
|
||||
}
|
||||
|
|
@ -1,15 +1,15 @@
|
|||
package main
|
||||
|
||||
// Bot-authored, user-facing notices. Vojo is a Russian-market product, so these
|
||||
// are RU. They are NOT the i18n bundle of the cinny client — this is a separate
|
||||
// service; keep the few strings here.
|
||||
// Language-free status signals. The bot answers questions in the user's own
|
||||
// language — the model handles that — but it cannot localize system states like
|
||||
// "rate limited" or "xAI is down": an appservice transaction carries no per-user
|
||||
// locale, so there is no reliable language to pick, and the bot serves a mixed
|
||||
// RU/EN audience. Rather than hardcode prose in one language (and rather than drop
|
||||
// silently), the bot REACTS to the triggering message with a self-evident emoji.
|
||||
// These are symbols, not translatable copy — edit the glyphs freely.
|
||||
const (
|
||||
noticeEncryptedUnsupported = "Я не читаю зашифрованные комнаты, поэтому не отвечаю здесь. " +
|
||||
"Напишите мне в обычном (незашифрованном) чате."
|
||||
|
||||
noticeDailyLimit = "Достигнут дневной лимит обращений к ИИ в этом сервисе. Попробуйте позже."
|
||||
|
||||
noticeUserLimit = "Вы исчерпали свой дневной лимит обращений к ИИ. Попробуйте позже."
|
||||
|
||||
noticeError = "⚠️ Не удалось получить ответ от ИИ. Попробуйте ещё раз чуть позже."
|
||||
reactError = "⚠️" // couldn't answer — xAI failed or returned nothing usable
|
||||
reactRateLimit = "⏳" // daily limit reached (per-user or global) — try later
|
||||
reactEncrypted = "🔒" // encrypted room — the bot can't read it
|
||||
reactMedia = "🚫" // non-text message — the bot only reads text
|
||||
)
|
||||
|
|
|
|||
|
|
@ -13,13 +13,13 @@ type reserveResult int
|
|||
|
||||
const (
|
||||
reserveOK reserveResult = iota
|
||||
reserveDeniedUser // per-user daily request cap hit (silent drop, F24)
|
||||
reserveDeniedGlobal // global daily USD ceiling hit (notice, F24)
|
||||
reserveDeniedUser // per-user daily request cap hit (⏳ rate-limit reaction, F24)
|
||||
reserveDeniedGlobal // global daily USD ceiling hit (⏳ rate-limit reaction, F24)
|
||||
)
|
||||
|
||||
// Store is the durable bot state: /sync cursor, daily spend ledger, and the
|
||||
// encrypted-room warned set. Pure-Go SQLite (no cgo) so the binary stays static
|
||||
// for a distroless/scratch image.
|
||||
// Store is the durable bot state: transaction + event dedup, the daily spend
|
||||
// ledger, and the encrypted-room warned set. Pure-Go SQLite (no cgo) so the binary
|
||||
// stays static for a distroless/scratch image.
|
||||
type Store struct {
|
||||
db *sql.DB
|
||||
}
|
||||
|
|
@ -29,7 +29,9 @@ func OpenStore(path string) (*Store, error) {
|
|||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
// Single writer — the loop is serial; one connection avoids lock churn.
|
||||
// One connection: database/sql serializes all callers onto it, which keeps the
|
||||
// now-concurrent handler goroutines from contending on SQLite write locks. All
|
||||
// statements here are short, so callers block only briefly and never deadlock.
|
||||
db.SetMaxOpenConns(1)
|
||||
|
||||
schema := `
|
||||
|
|
@ -163,10 +165,10 @@ func (s *Store) Reconcile(mxid string, usd float64) error {
|
|||
return err
|
||||
}
|
||||
|
||||
// HasWarnedEncrypted / SetWarnedEncrypted persist the one-shot "told this room I
|
||||
// can't read encryption" flag so a restart doesn't re-spam the notice (F5) — the
|
||||
// bot never sees its own notice (the sync filter drops nothing, but the loop skips
|
||||
// m.notice and self).
|
||||
// HasWarnedEncrypted / SetWarnedEncrypted persist the one-shot "reacted 🔒 to this
|
||||
// room because I can't read encryption" flag so a restart doesn't re-react on every
|
||||
// message (F5). The bot never reacts to its own events: m.reaction is not an
|
||||
// m.room.message, so it never re-enters handleMessage.
|
||||
func (s *Store) HasWarnedEncrypted(roomID string) (bool, error) {
|
||||
var one int
|
||||
err := s.db.QueryRow(`SELECT 1 FROM warned_encrypted WHERE room_id = ?`, roomID).Scan(&one)
|
||||
|
|
|
|||
|
|
@ -1,8 +1,13 @@
|
|||
package main
|
||||
|
||||
import "sync"
|
||||
|
||||
// lruSet is a bounded insertion-ordered string set used for event-id dedup and
|
||||
// tracking our own sent event ids. Oldest entries evict once cap is reached.
|
||||
// Self-locking: events are now processed in concurrent per-message goroutines, so
|
||||
// Add/Has must be safe to call from several goroutines at once.
|
||||
type lruSet struct {
|
||||
mu sync.Mutex
|
||||
cap int
|
||||
set map[string]struct{}
|
||||
order []string
|
||||
|
|
@ -13,12 +18,18 @@ func newLRUSet(cap int) *lruSet {
|
|||
}
|
||||
|
||||
func (l *lruSet) Has(k string) bool {
|
||||
l.mu.Lock()
|
||||
defer l.mu.Unlock()
|
||||
_, ok := l.set[k]
|
||||
return ok
|
||||
}
|
||||
|
||||
// Add inserts k and returns true if it was newly added (false if already present).
|
||||
// The check-and-insert is atomic, so two goroutines racing on the same id can
|
||||
// never both get true — the in-memory dedup stays correct under concurrency.
|
||||
func (l *lruSet) Add(k string) bool {
|
||||
l.mu.Lock()
|
||||
defer l.mu.Unlock()
|
||||
if _, ok := l.set[k]; ok {
|
||||
return false
|
||||
}
|
||||
|
|
|
|||
|
|
@ -70,10 +70,10 @@ func (r *xaiResponse) Text() string {
|
|||
return r.Choices[0].Message.Content
|
||||
}
|
||||
|
||||
// Complete calls Chat Completions with at-most-once retry on transient failures
|
||||
// (429 / 5xx / network timeout, exponential backoff + jitter). Non-retryable 4xx
|
||||
// fail immediately. The caller advances since/seen only AFTER this returns so a
|
||||
// transient failure isn't silently swallowed by a moved cursor (F6).
|
||||
// Complete calls Chat Completions with retry on transient failures (429 / 5xx /
|
||||
// network timeout, exponential backoff + jitter). Non-retryable 4xx fail
|
||||
// immediately. On exhaustion the caller refunds the reserved request and notifies
|
||||
// the user, so a transient failure is never silently swallowed (F6).
|
||||
func (x *XAIClient) Complete(ctx context.Context, model string, msgs []xaiMessage, maxTokens int, temp float64) (*xaiResponse, error) {
|
||||
reqBody := xaiRequest{
|
||||
Model: model,
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue