package main import ( "bytes" "context" "encoding/json" "fmt" "io" "log/slog" "math/rand" "net/http" "strings" "sync" "time" ) // httpllm.go is the shared OpenAI-compatible Chat Completions transport: one // HTTP+retry implementation reused by every provider adapter. Grok and Gemini both // expose this wire format, so the retry/backoff classification (429/5xx/network = // retryable, other 4xx = terminal) lives once here, parameterised by base/key/ // headers, instead of being copied per provider. // openAIClient performs OpenAI-compatible /chat/completions calls with retry. type openAIClient struct { name string // provider label for logs/errors ("xai", "gemini") base string key string http *http.Client maxTry int headers map[string]string // extra static headers (provider-specific), may be nil log *slog.Logger // noReasoningEffort remembers models that 400'd on the reasoning_effort param so we // drop it up front on every later call instead of paying the 400+heal each time. A // reasoning_effort/model mismatch is a config error (operator set both); we heal it // ONCE (and WARN once, in markNoReasoningEffort) rather than per-message. Guarded by mu. mu sync.Mutex noReasoningEffort map[string]bool } func newOpenAIClient(name, base, key string, headers map[string]string, logger *slog.Logger) *openAIClient { return &openAIClient{ name: name, base: base, key: key, http: &http.Client{}, maxTry: 3, headers: headers, log: logger, noReasoningEffort: map[string]bool{}, } } // rejectsReasoningEffort reports whether a prior call already learned this model 400s on // the reasoning_effort param (so we omit it up front). func (c *openAIClient) rejectsReasoningEffort(model string) bool { c.mu.Lock() defer c.mu.Unlock() return c.noReasoningEffort[model] } // markNoReasoningEffort records that a model rejects reasoning_effort and WARNs exactly // once (the first time), so the operator sees the config mismatch without a per-message log. func (c *openAIClient) markNoReasoningEffort(ctx context.Context, model string) { c.mu.Lock() first := !c.noReasoningEffort[model] c.noReasoningEffort[model] = true c.mu.Unlock() if first && c.log != nil { c.log.WarnContext(ctx, c.name+": model rejects reasoning_effort; dropping it for this model — unset GROK_REASONING_EFFORT or use a model that supports it", "model", model) } } // --- OpenAI-compatible wire types ------------------------------------------------- type openAIMessage struct { Role string `json:"role"` Content string `json:"content"` } // openAITool is the wire shape of a model tool (e.g. web search). Only serialized // when the request carries tools, so a plain completion's body is unchanged. type openAITool struct { Type string `json:"type"` } type openAIRequest struct { Model string `json:"model"` Messages []openAIMessage `json:"messages"` MaxTokens int `json:"max_tokens"` Temperature float64 `json:"temperature"` Stream bool `json:"stream"` // Optional; omitempty keeps the grok_direct body byte-identical to before. Tools []openAITool `json:"tools,omitempty"` ReasoningEffort string `json:"reasoning_effort,omitempty"` // SearchParameters drives xAI Live Search on chat/completions (the web route's // grok_web_search provider). nil for every non-web call, so it serializes away. SearchParameters any `json:"search_parameters,omitempty"` } type openAIUsage struct { PromptTokens int `json:"prompt_tokens"` CompletionTokens int `json:"completion_tokens"` PromptTokensDetails struct { CachedTokens int `json:"cached_tokens"` } `json:"prompt_tokens_details"` } type openAIResponse struct { ID string `json:"id"` Choices []struct { Message struct { Content string `json:"content"` } `json:"message"` FinishReason string `json:"finish_reason"` } `json:"choices"` Usage openAIUsage `json:"usage"` // Citations is the source list xAI Live Search returns by default (absent on a // non-web call → nil). Citations []string `json:"citations"` } func (r *openAIResponse) Text() string { if len(r.Choices) == 0 { return "" } return r.Choices[0].Message.Content } // complete calls Chat Completions with retry on transient failures (429 / 5xx / // network timeout, exponential backoff + jitter). Non-retryable 4xx fail // immediately. On exhaustion the caller refunds the reserved request and notifies // the user, so a transient failure is never silently swallowed (F6). reqHeaders are // per-request headers (e.g. x-grok-conv-id) merged on top of the static ones; nil is // fine. func (c *openAIClient) complete(ctx context.Context, reqBody openAIRequest, reqHeaders map[string]string) (*openAIResponse, error) { // If a prior call already learned this model rejects reasoning_effort, drop it up front // so we never pay the 400+heal again (healed once below; this is the steady state after). if reqBody.ReasoningEffort != "" && c.rejectsReasoningEffort(reqBody.Model) { reqBody.ReasoningEffort = "" } payload, err := json.Marshal(reqBody) if err != nil { return nil, err } var lastErr error for attempt := 0; attempt < c.maxTry; attempt++ { if attempt > 0 { // 0.5s, 1s, 2s … capped at 8s, plus up to 250ms jitter. backoff := time.Duration(500< 8*time.Second { backoff = 8 * time.Second } backoff += time.Duration(rand.Intn(250)) * time.Millisecond select { case <-ctx.Done(): return nil, ctx.Err() case <-time.After(backoff): } } resp, retryable, err := c.attempt(ctx, payload, reqHeaders) if err == nil { return resp, nil } lastErr = err if ctx.Err() != nil { return nil, ctx.Err() } if !retryable { // Self-heal (first time only): a model that doesn't support reasoning_effort rejects // it with a 400. Remember the model (so every later call drops the param up front — // see top of complete), then strip it and retry ONCE, immediately and inline — the // error is deterministic, so a backoff buys nothing, and the retry must NOT depend on // a remaining loop slot (else a 400 on the final attempt would never be re-tried). // markNoReasoningEffort WARNs once. This lets switching XAI_MODEL to such a model // degrade gracefully instead of hard-failing every request into a react. if reqBody.ReasoningEffort != "" && isReasoningEffortUnsupported(err) { reqBody.ReasoningEffort = "" c.markNoReasoningEffort(ctx, reqBody.Model) if p, mErr := json.Marshal(reqBody); mErr == nil { resp, _, rErr := c.attempt(ctx, p, reqHeaders) if rErr != nil { return nil, rErr } return resp, nil } } return nil, err } if c.log != nil { c.log.WarnContext(ctx, c.name+" attempt failed, will retry", "attempt", attempt+1, "max", c.maxTry, "err", err) } } return nil, fmt.Errorf("%s: exhausted %d attempts: %w", c.name, c.maxTry, lastErr) } // attempt performs one HTTP call. It returns retryable=true for 429/5xx and // network errors, false for other non-2xx (terminal 4xx). The per-attempt deadline // bounds a single hung connection; the overall per-request deadline (set by the // caller via ctx) bounds the whole retry loop so a cascade can't accrete minutes. func (c *openAIClient) attempt(ctx context.Context, payload []byte, reqHeaders map[string]string) (*openAIResponse, bool, error) { attemptCtx, cancel := context.WithTimeout(ctx, 60*time.Second) defer cancel() req, err := http.NewRequestWithContext(attemptCtx, http.MethodPost, c.base+"/chat/completions", bytes.NewReader(payload)) if err != nil { return nil, false, err } req.Header.Set("Content-Type", "application/json") req.Header.Set("Authorization", "Bearer "+c.key) for k, v := range c.headers { req.Header.Set(k, v) } for k, v := range reqHeaders { req.Header.Set(k, v) } resp, err := c.http.Do(req) if err != nil { // Network error / timeout — retryable (unless the parent ctx is done). return nil, ctx.Err() == nil, err } defer resp.Body.Close() data, _ := io.ReadAll(resp.Body) // Gated, per-user, DEBUG: the full request/response bodies for opted-in senders. // payload never contains the API key (that's the Authorization header, not logged). logLLMExchange(ctx, c.log, c.name, payload, resp.StatusCode, data) if resp.StatusCode == http.StatusTooManyRequests || resp.StatusCode >= 500 { return nil, true, fmt.Errorf("%s http %d: %s", c.name, resp.StatusCode, snippet(data)) } if resp.StatusCode < 200 || resp.StatusCode >= 300 { return nil, false, fmt.Errorf("%s http %d: %s", c.name, resp.StatusCode, snippet(data)) } var out openAIResponse if err := json.Unmarshal(data, &out); err != nil { return nil, false, fmt.Errorf("%s decode: %w", c.name, err) } // A 2xx is a billed call even when the model returns empty content (content // filter, finish_reason=length with no text, or no choices). Return it as a // success so the caller books the real cost via the ledger instead of refunding // the slot and losing the spend — which would let empty replies bypass BOTH the // per-user cap and the global ceiling. The caller just won't send an empty body. return &out, false, nil } // isReasoningEffortUnsupported reports whether an xAI error is the specific 400 a // non-reasoning model returns when sent reasoning_effort ("...does not support parameter // reasoningEffort"). Matched loosely so both reasoning_effort and reasoningEffort spellings // trip it, gating the one-shot strip-and-retry in complete(). func isReasoningEffortUnsupported(err error) bool { if err == nil { return false } s := strings.ToLower(err.Error()) return strings.Contains(s, "reasoning") && strings.Contains(s, "effort") && strings.Contains(s, "support") } func snippet(b []byte) string { const max = 300 if len(b) > max { return string(b[:max]) + "…" } return string(b) }