openrouter.py was 790 lines mixing two orthogonal concerns:

- Prompt engineering (build_system_prompt, build_summary_*, build_chat_*, build_daily_digest_*, etc.) — ~400 lines, changes weekly as PROMPT_VERSION bumps
- LLM transport (call_llm, _provider_chain, _call_provider, retry + fallback machinery) — ~250 lines, rarely changes

Extracted the prompt-engineering surface to app/services/llm_prompts.py. Transport stays in openrouter.py (consistent with the filename — the OpenRouter URL is the transport's anchor).

All import sites (jobs, routers, services, tests) split their multi-import lines into two: prompt-things from llm_prompts, transport from openrouter. PROMPT_VERSION constant, _TONE_ALIASES, _resolve_tone, and SYSTEM_PROMPT moved with the prompt functions.

No behaviour change — pure relocation. Function signatures, bodies, and naming are all preserved.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
371 lines
15 KiB
Python
371 lines
15 KiB
Python
"""Hourly per-group indicator summaries — a short AI read at the top of each
|
||
Indicators tab. Costs ~$0.0003 per call on DeepSeek V4 Flash, so 10+ groups
|
||
hourly stays comfortably under the monthly cap."""
|
||
from __future__ import annotations
|
||
|
||
import asyncio
|
||
import re
|
||
|
||
import httpx
|
||
from sqlalchemy import desc, func, select
|
||
|
||
from app.config import get_settings, load_groups
|
||
from app.db import utcnow
|
||
from app.jobs._helpers import job_lifecycle, log
|
||
from app.jobs._market_context import latest_quotes_by_group, month_spend
|
||
from app.models import (
|
||
AICall,
|
||
IndicatorSummary,
|
||
IndicatorSummaryTranslation,
|
||
JobRun,
|
||
User,
|
||
)
|
||
from app.services.cadence import DEFAULT_POLICY
|
||
from app.services.i18n import ACTIVE_LANGUAGES
|
||
from app.services.llm_prompts import (
|
||
PROMPT_VERSION,
|
||
build_aggregate_summary_system_prompt,
|
||
build_aggregate_summary_user_prompt,
|
||
build_summary_system_prompt,
|
||
build_summary_user_prompt,
|
||
)
|
||
from app.services.openrouter import (
|
||
active_model,
|
||
call_llm,
|
||
llm_configured,
|
||
month_start,
|
||
)
|
||
from app.services.translation import translate
|
||
|
||
|
||
# Sentinel ``group_name`` under which the cross-group aggregate summary is
# stored, alongside the per-group rows.
AGGREGATE_GROUP_NAME = "__all__"
|
||
|
||
|
||
async def translate_summary_for_active_languages(session, summary_id: int) -> None:
    """Fan out per-language translations for one IndicatorSummary row.

    Mirrors ``ai_log_job.translate_log_for_active_languages``: reads the
    distinct non-en ``users.lang`` set, translates the English content
    once per active language in parallel via ``asyncio.gather``, and
    persists each result as an ``IndicatorSummaryTranslation`` row.
    Per-language failures are logged but never raise.

    Args:
        session: Async DB session used for the lookups and the final commit.
        summary_id: Primary key of the ``IndicatorSummary`` row to translate.
    """
    target_langs = sorted({code for code in ACTIVE_LANGUAGES if code != "en"})
    if not target_langs:
        return
    # Only translate into languages at least one user has actually selected.
    active_langs = (await session.execute(
        select(User.lang).distinct().where(User.lang.in_(target_langs))
    )).scalars().all()
    if not active_langs:
        return

    summary_row = await session.get(IndicatorSummary, summary_id)
    if summary_row is None:
        log.warning("ind_summary.translate.missing_summary", summary_id=summary_id)
        return

    async with httpx.AsyncClient(follow_redirects=True, timeout=60) as client:
        results = await asyncio.gather(*[
            translate(client, summary_row.content, lang)
            for lang in active_langs
        ], return_exceptions=True)

    for lang, result in zip(active_langs, results):
        # BaseException, not Exception: with return_exceptions=True gather can
        # hand back asyncio.CancelledError (a BaseException) as a result, and
        # letting that reach the tuple-unpack below would raise TypeError and
        # break the "never raise" contract.
        if isinstance(result, BaseException):
            log.warning("ind_summary.translate.failed",
                        lang=lang, summary_id=summary_id,
                        error=str(result)[:200])
            continue
        translated_md, llm_result = result
        session.add(IndicatorSummaryTranslation(
            summary_id=summary_id, lang=lang,
            content=translated_md,
            generated_at=utcnow(),
            model=llm_result.model,
            prompt_tokens=llm_result.prompt_tokens,
            completion_tokens=llm_result.completion_tokens,
            cost_usd=llm_result.cost_usd,
        ))
    await session.commit()
|
||
|
||
|
||
# Strip known meta-commentary openers the model sometimes leaks despite the
|
||
# prompt's hard constraints. Each pattern matches one leading sentence.
|
||
_LEAK_PATTERNS = [
|
||
re.compile(p, re.IGNORECASE | re.DOTALL)
|
||
for p in (
|
||
# First-person meta — "I need to / I'll / I have to / I'm going to ..."
|
||
r"^i\s+(?:need|have|must|should|am going|'ll|will|shall|can|am)[^.]*\.\s*",
|
||
# "We need / we're / we are asked / we will ..."
|
||
r"^we\s+(?:need|are|'re|will|shall|can|should|must|have)[^.]*\.\s*",
|
||
r"^let\s+(?:me|us|'?s)[^.]*\.\s*",
|
||
r"^here['’]s[^.]*\.\s*",
|
||
r"^sure[,!]?\s[^.]*\.\s*",
|
||
r"^looking at[^.]*\.\s*",
|
||
r"^based on[^.]*\.\s*",
|
||
r"^to (?:address|answer|write|summarise|summarize)[^.]*\.\s*",
|
||
r"^first[,]?\s[^.]*\.\s*",
|
||
r"^the (?:user|data shows|reader|task|request|reader sees|instructions?)[^.]*\.\s*",
|
||
r"^summary[:.]\s*",
|
||
r"^key\s*[:\-—]\s*",
|
||
r"^must\s+(?:be|cite|explain|avoid|give|stay|provide)[^.]*\.\s*",
|
||
r"^should\s+(?:be|give|cite|explain|avoid|provide)[^.]*\.\s*",
|
||
r"^avoid[^.]*\.\s*",
|
||
r"^cite\s+at\s+most[^.]*\.\s*",
|
||
r"^be\s+(?:speculative|specific|concise|brief)[^.]*\.\s*",
|
||
r"^stay\s+on[^.]*\.\s*",
|
||
r"^okay[,]?\s+",
|
||
r"^alright[,]?\s+",
|
||
r"^thinking[^.]*\.\s*",
|
||
# Prompt-leak prefixes — the model echoes example framing or rule
|
||
# headers from the system prompt.
|
||
r"^(?:good|bad|positive|negative)\s+example\s*[:\-—]\s*",
|
||
r"^example\s+(?:good|bad)\s*[:\-—]\s*",
|
||
r"^example\s*[:\-—]\s*",
|
||
r"^reference\s+style\s*[:\-—]\s*",
|
||
# Prompt label echoes (markdown-style or plain-text)
|
||
r"^(?:hard\s+)?constraints?\s*[:\-—][^.\n]*[.\n]\s*",
|
||
r"^key\s+observations?\s*[:\-—]\s*",
|
||
r"^observations?\s*[:\-—]\s*",
|
||
r"^focus\s+on[^.]*\.\s*",
|
||
r"^output\s+the\s+read[^.]*\.\s*",
|
||
r"^plain\s+prose[^.]*\.\s*",
|
||
r"^the\s+indicators?[^.]*\.\s*", # "The indicators include..." / "The indicators are..."
|
||
r"^indicators?\s*[:\-—]\s*",
|
||
r"^data\s*[:\-—]\s*",
|
||
r"^analysis\s*[:\-—]\s*",
|
||
r"^interpretation\s*[:\-—]\s*",
|
||
r"^read\s*[:\-—]\s*",
|
||
r"^note\s*[:\-—]\s*",
|
||
# Sometimes the response gets wrapped in literal quotes
|
||
r"^[\"“'`]+",
|
||
)
|
||
]
|
||
|
||
|
||
_TRAILING_QUOTE = re.compile(r"[\"”'`]+\s*$")
|
||
|
||
# Tell-tale phrases that mean the model regurgitated the prompt as its
|
||
# "answer" — we'd rather show nothing than show this.
|
||
_LEAKAGE_FLAGS = (
|
||
"≤60 words", "60 words", "must be under", "must cite", "must explain",
|
||
"no meta-commentary", "no buy/sell", "horizon. ", "1-day moves",
|
||
"the instructions are", "instructions:", "constraints:", "hard constraints",
|
||
"good example", "bad example", "reference style",
|
||
)
|
||
|
||
|
||
def looks_like_leakage(text: str) -> bool:
|
||
"""Heuristic: after cleaning, if these phrases still appear, the output
|
||
is contaminated prompt-regurgitation and shouldn't be shown."""
|
||
low = text.lower()
|
||
return any(flag in low for flag in _LEAKAGE_FLAGS)
|
||
|
||
|
||
def _strip_leading_meta(text: str, max_passes: int) -> str:
    """Apply every ``_LEAK_PATTERNS`` entry to the start of *text*, repeating
    for up to *max_passes* passes or until a full pass changes nothing."""
    for _ in range(max_passes):
        before = text
        for pat in _LEAK_PATTERNS:
            text = pat.sub("", text, count=1).lstrip()
        if text == before:
            break
    return text


def clean_summary(text: str) -> str:
    """Strip leading meta-commentary. If cleaning removes nearly everything
    (suggesting the model emitted reasoning then ran out of tokens), fall
    back to the last non-empty paragraph of the raw output — that's usually
    where the actual answer ended up.

    Returns the cleaned text with any trailing quote/backtick run removed;
    the result may be empty.
    """
    raw = text.strip()
    # Up to 6 passes: handles compound leakage like
    # "Constraints: <...>. The indicators are: <...>. <actual answer>"
    out = _strip_leading_meta(raw, 6)
    if len(out) < 60 and len(raw) > 120:
        # Cleaning ate too much; take the last non-empty paragraph of raw.
        paragraphs = [p.strip() for p in re.split(r"\n\s*\n", raw) if p.strip()]
        if paragraphs:
            # Re-strip leaders from the recovered paragraph too.
            out = _strip_leading_meta(paragraphs[-1], 2)
    # Trim any orphan closing quote/backtick from the wrap-strip above.
    return _TRAILING_QUOTE.sub("", out).rstrip()
|
||
|
||
|
||
|
||
async def _generate_one(
    session, client: httpx.AsyncClient, group: str, quotes: list[dict],
    system_prompt: str, model: str, tone: str, analysis: str,
) -> IndicatorSummary | None:
    """Generate and stage one group's summary for a single tone.

    Every outcome records an ``AICall`` ledger row on *session* (status
    ``error``, ``leaked`` or ``ok``); nothing is committed here. On success
    the new ``IndicatorSummary`` is returned so the caller can fan out
    localized translations once its commit has assigned the id. Returns None
    when the LLM call raises or the cleaned output is unusable.

    `model` is retained for ledger labelling but call_llm now picks the
    active-provider model itself.
    """
    messages = [
        {"role": "system", "content": system_prompt},
        {"role": "user", "content": build_summary_user_prompt(group, quotes)},
    ]
    try:
        # DeepSeek sometimes spends 300+ tokens on internal reasoning, hence
        # the generous budget for a short read.
        result = await call_llm(client, messages, max_tokens=800)
    except Exception as exc:
        session.add(AICall(model=active_model(), status="error", error=str(exc)[:500]))
        log.warning("ind_summary.failed", group=group, error=str(exc)[:120])
        return None

    cleaned = clean_summary(result.content)
    # Usage/cost fields shared by the ledger row and the summary row.
    usage = {
        "model": result.model,
        "prompt_tokens": result.prompt_tokens,
        "completion_tokens": result.completion_tokens,
        "cost_usd": result.cost_usd,
    }

    unusable = len(cleaned) < 40 or looks_like_leakage(cleaned)
    if unusable:
        # Model regurgitated the prompt or produced nothing usable.
        # Don't persist — keep the last good summary visible. Log it so
        # we can see the rate of failures over time.
        log.warning("ind_summary.leakage_detected",
                    group=group, preview=cleaned[:120])
        session.add(AICall(status="leaked", **usage))
        return None

    summary = IndicatorSummary(
        group_name=group,
        generated_at=utcnow(),
        tone=tone,
        analysis=analysis,
        prompt_version=PROMPT_VERSION,
        content=cleaned,
        **usage,
    )
    session.add(summary)
    session.add(AICall(status="ok", **usage))
    return summary
|
||
|
||
|
||
async def _generate_aggregate(
    session, client: httpx.AsyncClient, groups: dict, tone: str, analysis: str,
) -> IndicatorSummary | None:
    """Generate and stage the cross-group aggregate summary for one tone.

    Applies the same usability guard as ``_generate_one``: output that is
    too short or still looks like prompt regurgitation is recorded in the
    ``AICall`` ledger as ``leaked`` and not persisted, so the previous good
    aggregate stays visible. Nothing is committed here. Returns the staged
    row, or None when the call failed or the output was rejected.
    """
    agg_system = build_aggregate_summary_system_prompt(tone, analysis)
    agg_user = build_aggregate_summary_user_prompt(groups)
    try:
        result = await call_llm(
            client,
            [{"role": "system", "content": agg_system},
             {"role": "user", "content": agg_user}],
            max_tokens=1500,  # room for reasoning + 80-word output
        )
        cleaned = clean_summary(result.content)
    except Exception as e:
        session.add(AICall(
            model=active_model(), status="error",
            error=f"{tone}/agg: {str(e)[:480]}",
        ))
        log.warning("ind_summary.agg_failed",
                    tone=tone, error=str(e)[:120])
        return None

    if looks_like_leakage(cleaned) or len(cleaned) < 40:
        log.warning("ind_summary.agg_leakage_detected",
                    tone=tone, preview=cleaned[:120])
        session.add(AICall(
            model=result.model,
            prompt_tokens=result.prompt_tokens,
            completion_tokens=result.completion_tokens,
            cost_usd=result.cost_usd,
            status="leaked",
        ))
        return None

    agg_summary = IndicatorSummary(
        group_name=AGGREGATE_GROUP_NAME,
        generated_at=utcnow(),
        model=result.model,
        tone=tone,
        analysis=analysis,
        prompt_version=PROMPT_VERSION,
        content=cleaned,
        prompt_tokens=result.prompt_tokens,
        completion_tokens=result.completion_tokens,
        cost_usd=result.cost_usd,
    )
    session.add(agg_summary)
    session.add(AICall(
        model=result.model,
        prompt_tokens=result.prompt_tokens,
        completion_tokens=result.completion_tokens,
        cost_usd=result.cost_usd, status="ok",
    ))
    return agg_summary


async def run() -> None:
    """Job entry point: write per-group and aggregate summaries for each tone.

    The run is marked ``skipped`` (and returns early) when the lifecycle
    already skipped it, no LLM is configured, the cadence policy says not
    yet, the monthly spend cap is reached, or no configured group has quotes.
    Otherwise each tone gets one summary per configured group plus one
    aggregate stored under ``AGGREGATE_GROUP_NAME``; every persisted summary
    is then translated for the active languages. ``jr.items_written`` ends up
    as the number of summaries persisted.
    """
    async with job_lifecycle("indicator_summary_job") as (session, jr):
        if jr.status == "skipped":
            return
        s = get_settings()
        if not llm_configured():
            log.warning("ind_summary.skipped_no_key", provider=s.LLM_PROVIDER)
            jr.status = "skipped"
            return

        # Cadence — same policy as ai_log_job: hourly during EU/US active,
        # throttled off-hours and weekends.
        last_success = (await session.execute(
            select(func.max(JobRun.finished_at)).where(
                JobRun.name == "indicator_summary_job",
                JobRun.status == "success",
            )
        )).scalar()
        should_run, reason = DEFAULT_POLICY.should_run(last_success)
        if not should_run:
            log.info("ind_summary.cadence_skip", reason=reason)
            jr.status = "skipped"
            jr.error = reason
            return

        spent = await month_spend(session)
        if spent >= s.OPENROUTER_MONTHLY_CAP_USD:
            jr.status = "skipped"
            jr.error = f"monthly cap reached (${spent:.2f})"
            return

        groups = await latest_quotes_by_group(session)
        # Only summarise groups currently configured in TOML — drops stale
        # group names (e.g. an old "pie" before T212 sourcing) that still have
        # quotes in the table but no UI presence.
        configured = set(load_groups(s.BASELINE_TOML, s.PORTFOLIO_TOML).keys())
        groups = {g: q for g, q in groups.items() if g in configured}
        if not groups:
            jr.status = "skipped"
            return

        # Phase 2 voice pivot (PROMPT_VERSION 6): generate both tones each
        # run so the dashboard toggle is instant. ANALYSIS stays on the
        # operator-configured default.
        analysis = (s.CASSANDRA_ANALYSIS or "SPECULATIVE").upper()
        tones = ("NOVICE", "INTERMEDIATE")

        written = 0
        # NOTE(review): no explicit timeout here, whereas the translation
        # client uses timeout=60; httpx's client default applies unless
        # call_llm sets a per-request timeout — confirm.
        async with httpx.AsyncClient(follow_redirects=True) as client:
            # Sequential rather than parallel — OpenRouter free tiers can
            # throttle bursts; total work is small (~14-16 calls × ~5s each).
            for tone in tones:
                system_prompt = build_summary_system_prompt(tone, analysis)
                for group, quotes in groups.items():
                    summary = await _generate_one(
                        session, client, group, quotes,
                        system_prompt, active_model(), tone, analysis,
                    )
                    await session.commit()  # partial progress survives mid-job error
                    if summary is not None:
                        written += 1
                        await translate_summary_for_active_languages(session, summary.id)

                # One aggregate read across all groups, stored under __all__.
                agg_summary = await _generate_aggregate(
                    session, client, groups, tone, analysis,
                )
                await session.commit()
                if agg_summary is not None:
                    written += 1
                    await translate_summary_for_active_languages(session, agg_summary.id)

        jr.items_written = written
        log.info("ind_summary.done",
                 groups=len(groups), tones=len(tones), written=written)
|
||
|
||
|
||
# Direct execution: run the job once to completion via asyncio.run.
if __name__ == "__main__":
    asyncio.run(run())
|