read.markets/app/services/openrouter.py
Giorgio Gilestro 788563a81f ai: route reviewer through OpenRouter + Claude Haiku 4.5
The DeepSeek-V4-flash reviewer was unreliable in production: it pads
its JSON verdicts with internal chain-of-thought even when the prompt
forbids it, so the verdict gets truncated at any reasonable max_tokens
cap and the parser drops it as malformed (a false-negative verdict
that would purge clean rows). A live run on 50 rows reproduced the
failure on 8 of 12 rejections, even at 800 tokens.

Fix: pin the reviewer call to OpenRouter with anthropic/claude-haiku-4.5.
Haiku answers structured-output classification tersely (no scratchpad
preamble), which means a 300-token cap is comfortably above the
~30-token JSON verdict. Cost is roughly the same (~$0.0001-$0.0003 per
review) and the latency tax is smaller.

To enable the pinned-provider call without disrupting other callers,
call_llm grows an optional `provider` parameter: when set, only that
provider is used (no fallback chain). All existing call sites
default to provider=None and keep the chain behaviour.

REVIEWER_MODEL is read from settings via getattr-with-fallback so an
env override can swap models without code changes — useful if we want
to A/B test against e.g. gemini-2.5-flash later.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-05-29 13:21:26 +02:00

263 lines
10 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""LLM transport layer — OpenRouter / DeepSeek API calls.
Handles provider selection, retry + fallback machinery, and the monthly
budget-cap helpers. Prompt engineering lives in ``app.services.llm_prompts``;
this module only cares about *how* to reach the model, not *what to ask*.
"""
from __future__ import annotations
import json
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
import httpx
from tenacity import retry, retry_if_exception_type, stop_after_attempt, wait_exponential
from app import branding
from app.config import get_settings
OPENROUTER_URL = "https://openrouter.ai/api/v1/chat/completions"

# Fallback price list, in USD per one million tokens, keyed by model name.
# Each value is an (input_rate, output_rate) pair. OpenRouter reports
# `usage.cost` itself whereas DeepSeek's native API does not, so these
# rates are only consulted when the upstream response carries no cost.
_MODEL_PRICING_USD_PER_MILLION: dict[str, tuple[float, float]] = {
    "deepseek-v4-flash": (0.07, 0.28),
    "deepseek/deepseek-v4-flash": (0.07, 0.28),
    "deepseek-chat": (0.27, 1.10),
    "deepseek-reasoner": (0.55, 2.19),
}
def _estimate_cost_usd(model: str, prompt_tokens, completion_tokens) -> float | None:
    """Derive a USD cost from token counts using the local pricing table.

    Intended for responses where the upstream supplied no cost figure.

    Args:
        model: Model name, looked up in ``_MODEL_PRICING_USD_PER_MILLION``.
        prompt_tokens: Input token count, or ``None`` if unreported.
        completion_tokens: Output token count, or ``None`` if unreported.

    Returns:
        The estimated cost in USD, or ``None`` when the model has no entry
        in the pricing table or either token count is ``None``.
    """
    rates = _MODEL_PRICING_USD_PER_MILLION.get(model)
    token_counts = (prompt_tokens, completion_tokens)
    if rates is None or any(count is None for count in token_counts):
        return None
    # Rates are quoted per million tokens, hence the final division.
    input_cost = prompt_tokens * rates[0]
    output_cost = completion_tokens * rates[1]
    return (input_cost + output_cost) / 1_000_000.0
@dataclass
class LogResult:
    """Outcome of one successful chat-completion call.

    Carries the answer text together with the usage metadata that
    ``_call_provider`` extracts from the upstream response.
    """

    # The model's answer text (the response's `content` field).
    content: str
    # "<provider>/<model>" label identifying which path produced the answer.
    model: str
    # Token counts as reported by the upstream; None when it omits them.
    prompt_tokens: int | None
    completion_tokens: int | None
    # Cost in USD: reported by the upstream or estimated locally; None
    # when neither is available.
    cost_usd: float | None
def _provider_chain() -> list[str]:
    """Return the providers to try, in order.

    The primary provider (``LLM_PROVIDER``, defaulting to ``"deepseek"``)
    comes first, followed by ``LLM_FALLBACK`` when that is set and differs
    from the primary. Names are lower-cased, and any provider without a
    configured API key is removed from the result.
    """
    settings = get_settings()
    primary = (settings.LLM_PROVIDER or "deepseek").lower()
    fallback = (settings.LLM_FALLBACK or "").lower()
    if fallback and fallback != primary:
        candidates = [primary, fallback]
    else:
        candidates = [primary]
    # Only keep providers we can actually authenticate against.
    return [name for name in candidates if _provider_has_key(name)]
def _provider_has_key(provider: str) -> bool:
    """Tell whether an API key is configured for ``provider``.

    Only ``"deepseek"`` and ``"openrouter"`` are recognised; any other
    name yields ``False``.
    """
    settings = get_settings()
    if provider == "deepseek":
        api_key = settings.DEEPSEEK_API_KEY
    elif provider == "openrouter":
        api_key = settings.OPENROUTER_API_KEY
    else:
        return False
    return bool(api_key)
def _endpoint_for(provider: str) -> tuple[str, str, str, dict[str, str]]:
    """Look up the connection details for one provider.

    Returns:
        A ``(url, api_key, default_model, extra_headers)`` tuple.

    Raises:
        RuntimeError: if the provider's API key is not set, or the
            provider name is not recognised.
    """
    settings = get_settings()

    if provider == "deepseek":
        if not settings.DEEPSEEK_API_KEY:
            raise RuntimeError("DEEPSEEK_API_KEY not set")
        # DeepSeek's native endpoint needs no extra headers.
        return (
            settings.DEEPSEEK_URL,
            settings.DEEPSEEK_API_KEY,
            settings.DEEPSEEK_MODEL,
            {},
        )

    if provider == "openrouter":
        if not settings.OPENROUTER_API_KEY:
            raise RuntimeError("OPENROUTER_API_KEY not set")
        openrouter_headers = {
            # Attribution headers specific to OpenRouter. They show up on
            # the OpenRouter dashboard, so keep them in step with the
            # live brand.
            "HTTP-Referer": branding.SITE_URL,
            "X-Title": branding.BRAND_NAME,
            # No-train opt-out: signals to OpenRouter (and any compatible
            # upstream) that this request must not be used to train or
            # improve models. The Privacy notice promises this and the
            # header is what backs that promise. Should a future upstream
            # ignore it, fix the provider rather than the header so the
            # contract stays auditable.
            # NOTE(review): confirm OpenRouter actually honours this
            # header; its documented control appears to be the
            # `provider.data_collection` request field — verify.
            "X-OR-Allow-Training": "false",
        }
        return (
            OPENROUTER_URL,
            settings.OPENROUTER_API_KEY,
            settings.OPENROUTER_MODEL,
            openrouter_headers,
        )

    raise RuntimeError(f"Unknown LLM provider: {provider!r}")
def llm_configured() -> bool:
    """Report whether the configured chain has any usable provider,
    i.e. at least one provider with an API key set."""
    usable_providers = _provider_chain()
    return len(usable_providers) > 0
def active_model() -> str:
    """Name the model of the provider that would be tried first.

    Used to label AICall ledger rows before any real call result exists.
    Returns ``"unknown"`` when no provider in the chain is usable.
    """
    providers = _provider_chain()
    if providers:
        settings = get_settings()
        if providers[0] == "deepseek":
            return settings.DEEPSEEK_MODEL
        return settings.OPENROUTER_MODEL
    return "unknown"
# NOTE(review): httpx.HTTPStatusError is raised for 4xx as well as 5xx
# responses, so non-transient failures such as 401/400 are retried too —
# confirm that is intended.
@retry(
    reraise=True,
    stop=stop_after_attempt(3),
    wait=wait_exponential(multiplier=2, min=2, max=30),
    retry=retry_if_exception_type((httpx.HTTPStatusError, httpx.TransportError)),
)
async def _call_provider(
    client: httpx.AsyncClient,
    provider: str,
    messages: list[dict],
    model: str | None,
    max_tokens: int,
    response_format: dict | None = None,
) -> LogResult:
    """Make one chat-completion call against a single provider.

    The tenacity decorator wraps this function, so retries on transport
    and HTTP-status errors happen within a provider, not across the
    fallback chain.

    Args:
        client: Shared async HTTP client used for the POST.
        provider: Provider name understood by ``_endpoint_for``.
        messages: OpenAI-shaped chat messages.
        model: Model override; the provider's default model is used when
            this is falsy.
        max_tokens: Completion token cap sent to the provider.
        response_format: Forwarded to the provider verbatim — DeepSeek and
            OpenRouter both accept the OpenAI-shaped
            ``{"type": "json_object"}`` for JSON-mode generation. ``None``
            means free-form text.

    Returns:
        A ``LogResult`` whose ``model`` is ``"<provider>/<model>"``.

    Raises:
        httpx.HTTPStatusError / httpx.TransportError: after retries are
            exhausted.
        RuntimeError: if the provider is unknown or has no API key, if the
            response body has no ``choices``, or if the answer content is
            empty. None of these are retried by the decorator.
    """
    url, api_key, default_model, extra_headers = _endpoint_for(provider)
    used_model = model or default_model
    headers = {
        "Authorization": f"Bearer {api_key}",
        "Content-Type": "application/json",
        **extra_headers,
    }
    body: dict = {"model": used_model, "messages": messages, "max_tokens": max_tokens}
    if response_format is not None:
        body["response_format"] = response_format
    r = await client.post(url, headers=headers, json=body, timeout=180)
    r.raise_for_status()
    data = r.json()

    # A 2xx response can still carry an error payload instead of a
    # completion. Fail with a message that names the provider and the
    # upstream error rather than an opaque KeyError/IndexError.
    choices = data.get("choices") if isinstance(data, dict) else None
    if not choices:
        upstream_error = data.get("error") if isinstance(data, dict) else None
        raise RuntimeError(
            f"LLM response has no choices (provider={provider}, "
            f"model={used_model}, error={str(upstream_error)[:200]})"
        )
    choice = choices[0]
    msg = choice.get("message") or {}

    # The `content` field is the model's user-facing answer. The optional
    # `reasoning` field is the model's internal chain-of-thought — never
    # safe to publish; it contains raw scratchpad ("Let's see…",
    # mid-sentence question marks, planning notes). If `content` is empty
    # (provider issue, finish_reason=length cutoff, or the model spent
    # its budget on thinking), treat that as a generation failure and
    # raise so the caller can retry or skip the row. Do NOT fall back to
    # reasoning — see the 2026-05-29 valuation-read leak.
    content = msg.get("content")
    if not content:
        finish = choice.get("finish_reason")
        raise RuntimeError(
            f"LLM returned empty content (finish_reason={finish}, "
            f"provider={provider}, model={used_model}, max_tokens={max_tokens})"
        )

    usage = data.get("usage") or {}
    prompt_tokens = usage.get("prompt_tokens")
    completion_tokens = usage.get("completion_tokens")

    # OpenRouter populates `usage.cost`; DeepSeek's native API doesn't —
    # estimate from tokens × per-model rates so the cost ledger stays
    # populated regardless of which provider answered. Compare against
    # None explicitly: a reported cost of 0 is a real value and must not
    # be replaced by an estimate.
    cost_usd = usage.get("cost")
    if cost_usd is None:
        cost_usd = usage.get("total_cost")
    if cost_usd is None:
        cost_usd = _estimate_cost_usd(used_model, prompt_tokens, completion_tokens)

    return LogResult(
        content=content,
        # Record provider+model so admin can see which path produced this row.
        model=f"{provider}/{used_model}",
        prompt_tokens=prompt_tokens,
        completion_tokens=completion_tokens,
        cost_usd=cost_usd,
    )
async def call_llm(
    client: httpx.AsyncClient,
    messages: list[dict],
    model: str | None = None,
    max_tokens: int = 4000,
    response_format: dict | None = None,
    provider: str | None = None,
) -> LogResult:
    """Provider-aware chat completion with fallback.

    Tries the primary provider (LLM_PROVIDER) first; if it raises after
    its own retries, falls through to LLM_FALLBACK. Raises only if every
    provider in the chain fails.

    Args:
        client: Shared async HTTP client.
        messages: OpenAI-shaped chat messages.
        model: Optional model override passed to each provider tried.
        max_tokens: Completion token cap.
        response_format: Pass ``{"type": "json_object"}`` to force
            JSON-mode output (the model still needs to be instructed in
            the system prompt to emit valid JSON — this flag enforces,
            not asks).
        provider: Pin the call to one provider (e.g. ``"openrouter"``)
            and skip the configured chain, so there is no fallback. The
            name is matched case-insensitively, like the configured
            provider names. Used by the reviewer agent to force routing
            through OpenRouter so it can address a non-DeepSeek model
            that doesn't pre-think before emitting JSON.

    Returns:
        The ``LogResult`` of the first provider that answers. Its
        ``model`` is prefixed with that provider (e.g.
        ``deepseek/deepseek-v4-flash`` or
        ``openrouter/deepseek/deepseek-v4-flash``) — useful admin metadata
        even though we hide it from the user-facing UI.

    Raises:
        RuntimeError: if no provider is configured.
        Exception: the last provider's exception when every provider in
            the chain fails.
    """
    if provider is not None:
        # Normalise like _provider_chain() does for configured names, so
        # a pinned "OpenRouter" resolves instead of failing as unknown.
        chain = [provider.strip().lower()]
    else:
        chain = _provider_chain()
    if not chain:
        raise RuntimeError("No LLM provider configured (no API key set)")

    last_exc: Exception | None = None
    # `candidate` deliberately does not reuse the `provider` parameter name.
    for attempt, candidate in enumerate(chain, start=1):
        try:
            result = await _call_provider(
                client, candidate, messages, model, max_tokens,
                response_format=response_format,
            )
        except Exception as e:
            # Broad on purpose: any failure of one provider should give
            # the next provider in the chain a chance.
            last_exc = e
            if attempt < len(chain):
                # Imported lazily, as in the rest of this function.
                from app.logging import get_logger
                get_logger("llm").warning(
                    "llm.primary_failed_trying_fallback",
                    provider=candidate, error=str(e)[:200],
                )
            continue
        # Logged outside the try so a logging problem can never be
        # mistaken for a provider failure and discard a good result.
        if attempt > 1:
            from app.logging import get_logger
            get_logger("llm").info(
                "llm.fallback_succeeded", provider=candidate, attempt=attempt,
            )
        return result

    # The chain is non-empty, so at least one failure was recorded.
    # Re-raise it so callers see the real failure mode.
    if last_exc is None:
        raise RuntimeError("LLM call failed without recording an exception")
    raise last_exc
def month_window() -> tuple[datetime, datetime]:
    """Return ``(start, now)`` for the current calendar month, in UTC.

    ``start`` is midnight on the first day of the month that ``now``
    falls in; both values are timezone-aware.
    """
    current = datetime.now(tz=timezone.utc)
    first_of_month = datetime(current.year, current.month, 1, tzinfo=timezone.utc)
    return first_of_month, current
def month_start() -> datetime:
    """Return the first element of ``month_window()``: the UTC start of
    the current calendar month."""
    start, _now = month_window()
    return start