vojo/apps/ai-bot/httpllm.go

269 lines
9.9 KiB
Go

package main
import (
"bytes"
"context"
"encoding/json"
"fmt"
"io"
"log/slog"
"math/rand"
"net/http"
"strings"
"sync"
"time"
)
// httpllm.go is the shared OpenAI-compatible Chat Completions transport: one
// HTTP+retry implementation reused by every provider adapter. Grok and Gemini both
// expose this wire format, so the retry/backoff classification (429/5xx/network =
// retryable, other 4xx = terminal) lives once here, parameterised by base/key/
// headers, instead of being copied per provider.
// openAIClient performs OpenAI-compatible /chat/completions calls with retry.
// It holds a sync.Mutex by value, so share it through a pointer (newOpenAIClient
// returns one) rather than copying the struct.
type openAIClient struct {
name string // provider label for logs/errors ("xai", "gemini")
base string // URL prefix; attempt appends "/chat/completions" to it
key string // API key, sent by attempt as "Authorization: Bearer <key>"
http *http.Client // client used for every attempt
maxTry int // maximum number of passes through complete's retry loop
headers map[string]string // extra static headers (provider-specific), may be nil
log *slog.Logger // may be nil; complete and markNoReasoningEffort then skip their warnings
// noReasoningEffort remembers models that 400'd on the reasoning_effort param so we
// drop it up front on every later call instead of paying the 400+heal each time. A
// reasoning_effort/model mismatch is a config error (operator set both); we heal it
// ONCE (and WARN once, in markNoReasoningEffort) rather than per-message. Guarded by mu.
// Keys are the model strings taken from openAIRequest.Model.
mu sync.Mutex
noReasoningEffort map[string]bool
}
// newOpenAIClient builds a client for one provider. name labels logs and errors,
// base is the URL prefix that "/chat/completions" is appended to, key is the
// bearer token, headers are optional static headers (nil is fine) and logger may
// be nil. The client starts with a fresh http.Client, three attempts per call and
// an empty set of models known to reject reasoning_effort.
func newOpenAIClient(name, base, key string, headers map[string]string, logger *slog.Logger) *openAIClient {
	const defaultMaxTry = 3

	client := new(openAIClient)
	client.name = name
	client.base = base
	client.key = key
	client.http = new(http.Client)
	client.maxTry = defaultMaxTry
	client.headers = headers
	client.log = logger
	client.noReasoningEffort = make(map[string]bool)
	return client
}
// rejectsReasoningEffort reports whether an earlier call recorded that this model
// answers 400 to the reasoning_effort param, in which case callers leave the
// param out from the start. The lookup is done under c.mu.
func (c *openAIClient) rejectsReasoningEffort(model string) bool {
	c.mu.Lock()
	known := c.noReasoningEffort[model]
	c.mu.Unlock()
	return known
}
// markNoReasoningEffort remembers that model rejects reasoning_effort. Only the
// first call for a given model emits a WARN (and only when a logger is set), so
// the operator sees the config mismatch once instead of once per message. The
// map update happens under c.mu; the log call happens after the lock is released.
func (c *openAIClient) markNoReasoningEffort(ctx context.Context, model string) {
	c.mu.Lock()
	alreadyKnown := c.noReasoningEffort[model]
	if !alreadyKnown {
		c.noReasoningEffort[model] = true
	}
	c.mu.Unlock()

	if alreadyKnown || c.log == nil {
		return
	}
	c.log.WarnContext(ctx, c.name+": model rejects reasoning_effort; dropping it for this model — unset GROK_REASONING_EFFORT or use a model that supports it", "model", model)
}
// --- OpenAI-compatible wire types -------------------------------------------------
// openAIMessage is one entry of the request's "messages" array: a role plus its
// plain-text content. Both fields are always serialized (no omitempty).
type openAIMessage struct {
Role string `json:"role"`
Content string `json:"content"`
}
// openAITool is the wire shape of a model tool (e.g. web search). Only serialized
// when the request carries tools, so a plain completion's body is unchanged.
type openAITool struct {
// Type is the only field sent for a tool; it is serialized as "type".
Type string `json:"type"`
}
// openAIRequest is the JSON body that complete marshals and attempt POSTs to
// /chat/completions.
type openAIRequest struct {
// The five fields below carry no omitempty, so they are always present in the
// serialized body, zero values included.
Model string `json:"model"`
Messages []openAIMessage `json:"messages"`
MaxTokens int `json:"max_tokens"`
Temperature float64 `json:"temperature"`
Stream bool `json:"stream"`
// Optional; omitempty keeps the grok_direct body byte-identical to before.
Tools []openAITool `json:"tools,omitempty"`
// ReasoningEffort is cleared by complete for models known to reject it; an
// empty string is omitted from the body.
ReasoningEffort string `json:"reasoning_effort,omitempty"`
// SearchParameters drives xAI Live Search on chat/completions (the web route's
// grok_web_search provider). nil for every non-web call, so it serializes away.
SearchParameters any `json:"search_parameters,omitempty"`
}
// openAIUsage is the token accounting decoded from the response's "usage" object.
// Fields the provider leaves out decode as zero.
type openAIUsage struct {
PromptTokens int `json:"prompt_tokens"`
CompletionTokens int `json:"completion_tokens"`
// PromptTokensDetails mirrors the nested "prompt_tokens_details" object; only
// its "cached_tokens" count is decoded.
// NOTE(review): presumably the part of PromptTokens served from the provider's
// prompt cache — confirm against the provider docs.
PromptTokensDetails struct {
CachedTokens int `json:"cached_tokens"`
} `json:"prompt_tokens_details"`
}
// openAIResponse is the subset of a /chat/completions response body that attempt
// decodes; any other fields in the response are ignored by json.Unmarshal.
type openAIResponse struct {
ID string `json:"id"`
// Choices holds the returned completions; Text reads only the first one.
Choices []struct {
Message struct {
Content string `json:"content"`
} `json:"message"`
FinishReason string `json:"finish_reason"`
} `json:"choices"`
Usage openAIUsage `json:"usage"`
// Citations is the source list xAI Live Search returns by default (absent on a
// non-web call → nil).
Citations []string `json:"citations"`
}
// Text returns the message content of the first choice, or the empty string
// when the response carries no choices at all.
func (r *openAIResponse) Text() string {
	var content string
	if choices := r.Choices; len(choices) > 0 {
		content = choices[0].Message.Content
	}
	return content
}
// complete calls Chat Completions with retry on transient failures (429 / 5xx /
// network timeout, exponential backoff + jitter). Non-retryable 4xx fail
// immediately. On exhaustion the caller refunds the reserved request and notifies
// the user, so a transient failure is never silently swallowed (F6).
//
// reqBody is taken by value, so clearing ReasoningEffort here never touches the
// caller's copy. reqHeaders are per-request headers (e.g. x-grok-conv-id) merged
// on top of the static ones; nil is fine.
//
// Returns the decoded response on the first successful attempt. Otherwise it
// returns ctx.Err() when ctx is done, the attempt's own error for a terminal
// failure, or an "exhausted N attempts" error wrapping the last transient failure.
func (c *openAIClient) complete(ctx context.Context, reqBody openAIRequest, reqHeaders map[string]string) (*openAIResponse, error) {
	// If a prior call already learned this model rejects reasoning_effort, drop it up front
	// so we never pay the 400+heal again (healed once below; this is the steady state after).
	if reqBody.ReasoningEffort != "" && c.rejectsReasoningEffort(reqBody.Model) {
		reqBody.ReasoningEffort = ""
	}
	payload, err := json.Marshal(reqBody)
	if err != nil {
		return nil, err
	}

	var lastErr error
	for attempt := 0; attempt < c.maxTry; attempt++ {
		if attempt > 0 {
			// 0.5s, 1s, 2s … capped at 8s, plus up to 250ms jitter.
			backoff := time.Duration(500<<uint(attempt-1)) * time.Millisecond
			if backoff > 8*time.Second {
				backoff = 8 * time.Second
			}
			backoff += time.Duration(rand.Intn(250)) * time.Millisecond
			select {
			case <-ctx.Done():
				return nil, ctx.Err()
			case <-time.After(backoff):
			}
		}

		resp, retryable, err := c.attempt(ctx, payload, reqHeaders)
		if err == nil {
			return resp, nil
		}
		if ctx.Err() != nil {
			return nil, ctx.Err()
		}

		// Self-heal (first time only): a model that doesn't support reasoning_effort rejects
		// it with a 400. Remember the model (so every later call drops the param up front —
		// see top of complete), then strip it and retry ONCE, immediately and inline — the
		// error is deterministic, so a backoff buys nothing, and the retry must NOT depend on
		// a remaining loop slot (else a 400 on the final attempt would never be re-tried).
		// markNoReasoningEffort WARNs once. This lets switching XAI_MODEL to such a model
		// degrade gracefully instead of hard-failing every request into a react.
		if !retryable && reqBody.ReasoningEffort != "" && isReasoningEffortUnsupported(err) {
			reqBody.ReasoningEffort = ""
			c.markNoReasoningEffort(ctx, reqBody.Model)
			healed, mErr := json.Marshal(reqBody)
			if mErr != nil {
				// Cannot build the stripped body; surface the original 400.
				return nil, err
			}
			// Keep the stripped payload for every later attempt: if the inline retry
			// hits a transient failure (429/5xx/network) it must go through the normal
			// backoff loop rather than be reported as terminal.
			payload = healed
			resp, retryable, err = c.attempt(ctx, payload, reqHeaders)
			if err == nil {
				return resp, nil
			}
			if ctx.Err() != nil {
				return nil, ctx.Err()
			}
		}

		lastErr = err
		if !retryable {
			return nil, err
		}
		// Only announce a retry when one will actually happen; the final failure is
		// reported through the exhaustion error below instead.
		if c.log != nil && attempt+1 < c.maxTry {
			c.log.WarnContext(ctx, c.name+" attempt failed, will retry", "attempt", attempt+1, "max", c.maxTry, "err", err)
		}
	}
	return nil, fmt.Errorf("%s: exhausted %d attempts: %w", c.name, c.maxTry, lastErr)
}
// attempt performs one HTTP call: a POST of payload to c.base+"/chat/completions"
// with the bearer key, the client's static headers and then reqHeaders (which
// win on conflict because they are set last).
//
// It returns retryable=true for 429/5xx and network errors (including a response
// body that fails mid-read on a 2xx), false for other non-2xx (terminal 4xx), for
// a request that cannot be built, and for a 2xx body that does not decode. Network
// errors are reported as non-retryable once the parent ctx is done. The per-attempt
// deadline bounds a single hung connection; the overall per-request deadline (set by
// the caller via ctx) bounds the whole retry loop so a cascade can't accrete minutes.
func (c *openAIClient) attempt(ctx context.Context, payload []byte, reqHeaders map[string]string) (*openAIResponse, bool, error) {
	attemptCtx, cancel := context.WithTimeout(ctx, 60*time.Second)
	defer cancel()

	req, err := http.NewRequestWithContext(attemptCtx, http.MethodPost, c.base+"/chat/completions", bytes.NewReader(payload))
	if err != nil {
		return nil, false, err
	}
	req.Header.Set("Content-Type", "application/json")
	req.Header.Set("Authorization", "Bearer "+c.key)
	for k, v := range c.headers {
		req.Header.Set(k, v)
	}
	for k, v := range reqHeaders {
		req.Header.Set(k, v)
	}

	resp, err := c.http.Do(req)
	if err != nil {
		// Network error / timeout — retryable (unless the parent ctx is done).
		return nil, ctx.Err() == nil, err
	}
	defer resp.Body.Close()

	// Keep whatever was read even on error: the status code is already known, and
	// a partial body is still useful in the log and in the error snippet.
	data, readErr := io.ReadAll(resp.Body)

	// Gated, per-user, DEBUG: the full request/response bodies for opted-in senders.
	// payload never contains the API key (that's the Authorization header, not logged).
	logLLMExchange(ctx, c.log, c.name, payload, resp.StatusCode, data)

	if resp.StatusCode == http.StatusTooManyRequests || resp.StatusCode >= 500 {
		return nil, true, fmt.Errorf("%s http %d: %s", c.name, resp.StatusCode, snippet(data))
	}
	if resp.StatusCode < 200 || resp.StatusCode >= 300 {
		return nil, false, fmt.Errorf("%s http %d: %s", c.name, resp.StatusCode, snippet(data))
	}
	if readErr != nil {
		// A 2xx whose body was cut off is a network failure, not a malformed reply:
		// classify it like a failed Do instead of letting it fall through to a
		// misleading terminal decode error.
		return nil, ctx.Err() == nil, fmt.Errorf("%s read body: %w", c.name, readErr)
	}

	var out openAIResponse
	if err := json.Unmarshal(data, &out); err != nil {
		return nil, false, fmt.Errorf("%s decode: %w", c.name, err)
	}
	// A 2xx is a billed call even when the model returns empty content (content
	// filter, finish_reason=length with no text, or no choices). Return it as a
	// success so the caller books the real cost via the ledger instead of refunding
	// the slot and losing the spend — which would let empty replies bypass BOTH the
	// per-user cap and the global ceiling. The caller just won't send an empty body.
	return &out, false, nil
}
// isReasoningEffortUnsupported reports whether err looks like the 400 a
// non-reasoning model returns when it is sent reasoning_effort ("...does not
// support parameter reasoningEffort"). The match is deliberately loose: the
// lower-cased message only has to contain "reasoning", "effort" and "support",
// so both the reasoning_effort and reasoningEffort spellings qualify. A nil
// error never matches. It gates the one-shot strip-and-retry in complete().
func isReasoningEffortUnsupported(err error) bool {
	if err == nil {
		return false
	}
	msg := strings.ToLower(err.Error())
	for _, needle := range []string{"reasoning", "effort", "support"} {
		if !strings.Contains(msg, needle) {
			return false
		}
	}
	return true
}
// snippet renders a response body for inclusion in an error message. Bodies of
// up to 300 bytes are returned whole; longer ones are cut to at most 300 bytes
// and suffixed with "…". The cut is moved back to the nearest rune boundary so a
// multi-byte UTF-8 character is never split in half, which would put invalid
// UTF-8 into the error text.
func snippet(b []byte) string {
	const limit = 300
	if len(b) <= limit {
		return string(b)
	}
	cut := limit
	// b[cut] is the first byte dropped. If it is a UTF-8 continuation byte
	// (10xxxxxx) the rune it belongs to started earlier, so step back to that
	// lead byte. A rune has at most three continuation bytes, which bounds the
	// walk even when the body is not valid UTF-8.
	for back := 0; back < 3 && cut > 0 && b[cut]&0xC0 == 0x80; back++ {
		cut--
	}
	return string(b[:cut]) + "…"
}