read.markets/app/jobs/indicator_summary_job.py
Giorgio Gilestro 45fa31bb2b ai: structured-output + reviewer agent for indicator summaries
Replaces the regex-based clean_summary / looks_like_leakage pipeline
that produced the 2026-05-29 valuation-read leak. Two layers of defence
in depth:

1. JSON-mode generation. The per-group and aggregate summary system
   prompts now require the model to emit a single object
   {"read": "..."}; response_format={"type":"json_object"} is passed
   through to the provider so the API enforces well-formed JSON. Prose
   outside the field is physically impossible. The "read" field is the
   only schema slot, so the model has nowhere to spill scratchpad
   into the envelope.

2. Reviewer agent. services/output_review.review_read() makes a second
   small LLM call that judges whether the candidate "read" string is
   publishable. It catches the residual failure mode — scratchpad
   INSIDE the field ("Let's see…", multi-question parentheticals,
   meta-commentary) — and returns a JSON verdict {"clean": bool,
   "reason": str}. Any failure (provider error, parse error, missing
   field) returns clean=false (fail-safe). Cost ~$0.0001/check; latency
   ~1-2 s in the hourly job, no user-facing latency.

The old regex scaffolding (_LEAK_PATTERNS, clean_summary,
looks_like_leakage, _TRAILING_QUOTE) is deleted entirely. It produced
false positives (chopped legitimate "The indicators are…" leaders) and
false negatives (never matched the chain-of-thought patterns the model
actually emits). The reviewer agent is strictly better on both.

On reviewer/parse rejection: don't persist a new IndicatorSummary; the
API's existing fallback to the previous good row continues to serve
the panel. Failures are logged as ind_summary.json_invalid /
ind_summary.reviewer_rejected so we can measure the rejection rate.

Reviewer cost is added to the row's recorded cost_usd so the monthly
budget cap covers the full pipeline.

Adds tests/test_output_review.py: 11 cases covering _extract_read
(JSON envelope handling — invalid JSON, missing field, wrong types,
empty values) and review_read (clean / unclean verdicts plus three
fail-safe paths for malformed reviewer responses).

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-05-29 13:10:52 +02:00

367 lines
15 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""Hourly per-group indicator summaries — a short AI read at the top of each
Indicators tab. Costs ~$0.0003 per call on DeepSeek V4 Flash, so 10+ groups
hourly stays comfortably under the monthly cap."""
from __future__ import annotations
import asyncio
import json
import httpx
from sqlalchemy import desc, func, select
from app.config import get_settings, load_groups
from app.db import utcnow
from app.jobs._helpers import job_lifecycle, log
from app.jobs._market_context import latest_quotes_by_group, month_spend
from app.models import (
AICall,
IndicatorSummary,
IndicatorSummaryTranslation,
JobRun,
User,
)
from app.services.cadence import DEFAULT_POLICY
from app.services.i18n import ACTIVE_LANGUAGES
from app.services.llm_prompts import (
PROMPT_VERSION,
build_aggregate_summary_system_prompt,
build_aggregate_summary_user_prompt,
build_summary_system_prompt,
build_summary_user_prompt,
)
from app.services.openrouter import (
active_model,
call_llm,
llm_configured,
month_start,
)
from app.services.output_review import review_read
from app.services.translation import translate
AGGREGATE_GROUP_NAME = "__all__"
async def translate_summary_for_active_languages(session, summary_id: int) -> None:
"""Fan out per-language translations for one IndicatorSummary row.
Mirrors ``ai_log_job.translate_log_for_active_languages``: reads the
distinct non-en ``users.lang`` set, translates the English content
once per active language in parallel via ``asyncio.gather``, and
persists each result as an ``IndicatorSummaryTranslation`` row in
its own savepoint so one bad row doesn't lose the rest.
"""
target_langs = sorted({l for l in ACTIVE_LANGUAGES if l != "en"})
if not target_langs:
return
active_langs = (await session.execute(
select(User.lang).distinct().where(User.lang.in_(target_langs))
)).scalars().all()
if not active_langs:
return
summary_row = await session.get(IndicatorSummary, summary_id)
if summary_row is None:
log.warning("ind_summary.translate.missing_summary", summary_id=summary_id)
return
async with httpx.AsyncClient(follow_redirects=True, timeout=60) as client:
results = await asyncio.gather(*[
translate(client, summary_row.content, lang)
for lang in active_langs
], return_exceptions=True)
succeeded = 0
failed = 0
for lang, result in zip(active_langs, results):
if isinstance(result, Exception):
log.warning("ind_summary.translate.failed",
lang=lang, summary_id=summary_id,
error=str(result)[:200])
failed += 1
continue
translated_md, llm_result = result
try:
async with session.begin_nested():
session.add(IndicatorSummaryTranslation(
summary_id=summary_id, lang=lang,
content=translated_md,
generated_at=utcnow(),
model=llm_result.model,
prompt_tokens=llm_result.prompt_tokens,
completion_tokens=llm_result.completion_tokens,
cost_usd=llm_result.cost_usd,
))
await session.commit()
succeeded += 1
except Exception as exc:
log.warning("ind_summary.translate.persist_failed",
lang=lang, summary_id=summary_id, error=str(exc)[:200])
failed += 1
if failed and succeeded == 0:
log.error("ind_summary.translate.all_failed",
summary_id=summary_id, attempted=len(active_langs))
else:
log.info("ind_summary.translate.done",
summary_id=summary_id, succeeded=succeeded, failed=failed)
# Defence-in-depth: read generation goes through JSON mode + a reviewer.
#
# 1. The system prompt instructs the model to emit {"read": "..."} only;
# response_format={"type":"json_object"} forces well-formed JSON at
# the API layer, so prose outside the field is impossible.
# 2. We extract `read`, then ask a second LLM call (services/output_review)
# whether the candidate text is publishable. Scratchpad INSIDE the
# field — "Let's see…", "X? Actually Y?" — is caught here.
# 3. Any failure at either stage (parse, missing field, reviewer veto,
# reviewer error) drops the candidate. The previous good
# IndicatorSummary stays visible.
#
# The old _LEAK_PATTERNS / clean_summary / looks_like_leakage regex
# scaffolding lived here previously. It produced false positives (e.g.
# chopping off a legitimate leading sentence like "The indicators are
# pricing…") and false negatives (it never caught the chain-of-thought
# patterns the model actually emits). The reviewer agent replaces it.
def _extract_read(raw: str) -> str | None:
"""Parse the model's JSON envelope and return the "read" field, or
None if the body isn't valid JSON / the field is missing / the field
isn't a string. Conservative: on any deviation from the schema we
drop the candidate rather than try to salvage it."""
try:
parsed = json.loads(raw)
except json.JSONDecodeError:
return None
if not isinstance(parsed, dict):
return None
read = parsed.get("read")
if not isinstance(read, str):
return None
read = read.strip()
return read or None
async def _generate_one(
session, client: httpx.AsyncClient, group: str, quotes: list[dict],
system_prompt: str, model: str, tone: str, analysis: str,
) -> IndicatorSummary | None:
"""Generate + persist one group's summary. Returns the new row on
success (so the caller can fan out localized translations after
the commit picks up its id) or None on failure.
`model` is retained for ledger labelling but call_llm now picks the
active-provider model itself."""
user_prompt = build_summary_user_prompt(group, quotes)
try:
result = await call_llm(
client,
[{"role": "system", "content": system_prompt},
{"role": "user", "content": user_prompt}],
max_tokens=800, # DeepSeek sometimes spends 300+ on internal reasoning
response_format={"type": "json_object"},
)
except Exception as e:
session.add(AICall(model=active_model(), status="error", error=str(e)[:500]))
log.warning("ind_summary.failed", group=group, error=str(e)[:120])
return None
candidate = _extract_read(result.content)
if candidate is None or len(candidate) < 40:
# JSON envelope malformed, "read" field missing/wrong type, or
# the candidate is too short to be a real read. Don't persist;
# the last good summary stays visible.
log.warning("ind_summary.json_invalid",
group=group, preview=result.content[:160])
session.add(AICall(
model=result.model,
prompt_tokens=result.prompt_tokens,
completion_tokens=result.completion_tokens,
cost_usd=result.cost_usd,
status="leaked",
))
return None
verdict = await review_read(client, candidate)
if not verdict.clean:
# Reviewer caught scratchpad / meta-commentary / partial text
# INSIDE the read field. Drop the candidate; the previous good
# summary continues to serve.
log.warning("ind_summary.reviewer_rejected",
group=group, reason=verdict.reason,
preview=candidate[:120])
session.add(AICall(
model=result.model,
prompt_tokens=result.prompt_tokens,
completion_tokens=result.completion_tokens,
cost_usd=(result.cost_usd or 0.0) + (verdict.cost_usd or 0.0),
status="leaked",
))
return None
summary = IndicatorSummary(
group_name=group,
generated_at=utcnow(),
model=result.model,
tone=tone,
analysis=analysis,
prompt_version=PROMPT_VERSION,
content=candidate,
prompt_tokens=result.prompt_tokens,
completion_tokens=result.completion_tokens,
# Include the reviewer's cost in the row's recorded spend so the
# monthly budget tracking covers the full pipeline cost.
cost_usd=(result.cost_usd or 0.0) + (verdict.cost_usd or 0.0),
)
session.add(summary)
session.add(AICall(
model=result.model,
prompt_tokens=result.prompt_tokens,
completion_tokens=result.completion_tokens,
cost_usd=(result.cost_usd or 0.0) + (verdict.cost_usd or 0.0),
status="ok",
))
return summary
async def run() -> None:
async with job_lifecycle("indicator_summary_job") as (session, jr):
if jr.status == "skipped":
return
s = get_settings()
if not llm_configured():
log.warning("ind_summary.skipped_no_key", provider=s.LLM_PROVIDER)
jr.status = "skipped"
return
# Cadence — same policy as ai_log_job: hourly during EU/US active,
# throttled off-hours and weekends.
last_success = (await session.execute(
select(func.max(JobRun.finished_at)).where(
JobRun.name == "indicator_summary_job",
JobRun.status == "success",
)
)).scalar()
should_run, reason = DEFAULT_POLICY.should_run(last_success)
if not should_run:
log.info("ind_summary.cadence_skip", reason=reason)
jr.status = "skipped"
jr.error = reason
return
spent = await month_spend(session)
if spent >= s.OPENROUTER_MONTHLY_CAP_USD:
jr.status = "skipped"
jr.error = f"monthly cap reached (${spent:.2f})"
return
groups = await latest_quotes_by_group(session)
# Only summarise groups currently configured in TOML — drops stale
# group names (e.g. an old "pie" before T212 sourcing) that still have
# quotes in the table but no UI presence.
configured = set(load_groups(s.BASELINE_TOML, s.PORTFOLIO_TOML).keys())
groups = {g: q for g, q in groups.items() if g in configured}
if not groups:
jr.status = "skipped"
return
# Phase 2 voice pivot (PROMPT_VERSION 6): generate both tones each
# run so the dashboard toggle is instant. ANALYSIS stays on the
# operator-configured default.
analysis = (s.CASSANDRA_ANALYSIS or "SPECULATIVE").upper()
tones = ("NOVICE", "INTERMEDIATE")
written = 0
async with httpx.AsyncClient(follow_redirects=True) as client:
# Sequential rather than parallel — OpenRouter free tiers can
# throttle bursts; total work is small (~14-16 calls × ~5s each).
for tone in tones:
system_prompt = build_summary_system_prompt(tone, analysis)
for group, quotes in groups.items():
summary = await _generate_one(
session, client, group, quotes,
system_prompt, active_model(), tone, analysis,
)
if summary is not None:
written += 1
await session.commit() # partial progress survives mid-job error
if summary is not None:
await translate_summary_for_active_languages(session, summary.id)
# One aggregate read across all groups, stored under __all__.
# Same JSON-mode + reviewer-agent path as per-group reads.
agg_system = build_aggregate_summary_system_prompt(tone, analysis)
agg_user = build_aggregate_summary_user_prompt(groups)
agg_summary: IndicatorSummary | None = None
try:
result = await call_llm(
client,
[{"role": "system", "content": agg_system},
{"role": "user", "content": agg_user}],
max_tokens=1500,
response_format={"type": "json_object"},
)
candidate = _extract_read(result.content)
if candidate is None or len(candidate) < 40:
log.warning("ind_summary.agg_json_invalid",
tone=tone, preview=result.content[:160])
session.add(AICall(
model=result.model,
prompt_tokens=result.prompt_tokens,
completion_tokens=result.completion_tokens,
cost_usd=result.cost_usd, status="leaked",
))
else:
verdict = await review_read(client, candidate)
full_cost = (result.cost_usd or 0.0) + (verdict.cost_usd or 0.0)
if not verdict.clean:
log.warning("ind_summary.agg_reviewer_rejected",
tone=tone, reason=verdict.reason,
preview=candidate[:120])
session.add(AICall(
model=result.model,
prompt_tokens=result.prompt_tokens,
completion_tokens=result.completion_tokens,
cost_usd=full_cost, status="leaked",
))
else:
agg_summary = IndicatorSummary(
group_name=AGGREGATE_GROUP_NAME,
generated_at=utcnow(),
model=result.model,
tone=tone,
analysis=analysis,
prompt_version=PROMPT_VERSION,
content=candidate,
prompt_tokens=result.prompt_tokens,
completion_tokens=result.completion_tokens,
cost_usd=full_cost,
)
session.add(agg_summary)
session.add(AICall(
model=result.model,
prompt_tokens=result.prompt_tokens,
completion_tokens=result.completion_tokens,
cost_usd=full_cost, status="ok",
))
written += 1
except Exception as e:
session.add(AICall(
model=active_model(), status="error",
error=f"{tone}/agg: {str(e)[:480]}",
))
log.warning("ind_summary.agg_failed",
tone=tone, error=str(e)[:120])
await session.commit()
if agg_summary is not None:
await translate_summary_for_active_languages(session, agg_summary.id)
jr.items_written = written
log.info("ind_summary.done",
groups=len(groups), tones=len(tones), written=written)
if __name__ == "__main__":
asyncio.run(run())