read.markets/app/jobs/ai_log_job.py
Giorgio Gilestro f9534f7ad6 review: gate strategic-log, portfolio, chat, and digest on reviewer
Extends the reviewer agent — previously only protecting indicator
summaries — to every AI-generated surface that reaches a user. The
reviewer's prompt already rejects scratchpad, truncation,
meta-commentary, and (since a6e476b) financial advice; wiring it in
turns those rules from prompt-level "asks" into structural gates.

Four call sites updated:

- ai_log_job.run() : after each tone/analysis variant is generated,
  pass through review_read. On reject, log the reason and skip the
  StrategicLog insert; the API's existing "latest StrategicLog" lookup
  falls back to the previous clean log.

- services/portfolio_analysis.analyse() : on reject, raise a clean
  RuntimeError that the /api/analyze router already maps to HTTP 502
  with a retry-able message. Portfolio analysis isn't cached server-
  side, so the user retries; the reviewer's verdict reason goes into
  the AICall ledger as the leaked-status row's error column.

- routers/chat.chat() : on reject, instead of returning the raw
  assistant content we return a short refusal explaining the limit
  and inviting a rephrase. Adds ~1-2 s of latency per turn (one extra
  LLM call to Haiku) — the only user-facing latency tax.

- jobs/email_digest_job._generate_variants() : on reject, the variant
  is dropped for the cycle. Recipients on the rejected tone get no
  digest email this run, which is better than delivering inbox copy
  that drifts into advice (emails are unrecallable once sent).

In every case the AICall ledger row records the reviewer cost so
month_spend stays accurate across all paths.

The reviewer system prompt is slightly generalised to cover both the
indicator-summary case and the longer-form log/digest/chat case:
- removes "short interpretive read" framing
- softens the "any question" rule so genuine rhetorical structure in
  a long-form log doesn't trigger a reject

tests/conftest.py grows an autouse fixture that stubs review_read to
clean=True in every consumer module. Tests that mock the generator
shouldn't have to also mock the safety gate behind it; tests that
specifically want the reject branch can override with their own
monkeypatch. test_output_review.py is unaffected — it imports
review_read directly.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-05-29 14:40:04 +02:00

258 lines
10 KiB
Python

"""Hourly AI strategic-log generator. Pulls already-persisted market data and
headlines from the DB (no live fetches), calls OpenRouter, persists the log
and a row in the cost ledger."""
from __future__ import annotations
import asyncio
import httpx
from sqlalchemy import desc, func, select
from app.config import get_settings
from app.db import utcnow
from app.jobs._helpers import job_lifecycle, log
from app.jobs._market_context import (
REFERENCE_LINE,
latest_quotes_by_group,
month_spend,
recent_headlines_by_bucket,
)
from app.models import AICall, JobRun, StrategicLog, StrategicLogTranslation, User
from app.services.cadence import DEFAULT_POLICY
from app.services.i18n import ACTIVE_LANGUAGES
from app.services.llm_prompts import (
PROMPT_VERSION,
build_system_prompt,
build_user_prompt,
)
from app.services.output_review import review_read
from app.services.openrouter import (
active_model,
call_llm,
llm_configured,
)
from app.services.translation import translate
async def translate_log_for_active_languages(session, log_id: int) -> None:
"""Fan out per-language translations for the strategic log identified
by ``log_id``.
Reads ``users.lang`` (deduplicated, restricted to ACTIVE_LANGUAGES
minus English), one translation call per language in parallel via
``asyncio.gather``, persists each successful result as a
``StrategicLogTranslation`` row. Each row is committed in its own
savepoint so a per-language LLM error or DB error doesn't roll back
the languages that already succeeded.
The job orchestrator calls this AFTER the English ``StrategicLog``
row is committed; pass the row's ``id`` in.
"""
target_langs = sorted({l for l in ACTIVE_LANGUAGES if l != "en"})
if not target_langs:
return
active_langs = (await session.execute(
select(User.lang).distinct().where(User.lang.in_(target_langs))
)).scalars().all()
if not active_langs:
return
log_row = await session.get(StrategicLog, log_id)
if log_row is None:
log.warning("log.translate.missing_log", log_id=log_id)
return
async with httpx.AsyncClient(follow_redirects=True, timeout=60) as client:
results = await asyncio.gather(*[
translate(client, log_row.content, lang)
for lang in active_langs
], return_exceptions=True)
succeeded = 0
failed = 0
for lang, result in zip(active_langs, results):
if isinstance(result, Exception):
log.warning("log.translate.failed", lang=lang, log_id=log_id,
error=str(result)[:200])
failed += 1
continue
translated_md, llm_result = result
try:
async with session.begin_nested():
session.add(StrategicLogTranslation(
log_id=log_id, lang=lang,
content=translated_md,
generated_at=utcnow(),
model=llm_result.model,
prompt_tokens=llm_result.prompt_tokens,
completion_tokens=llm_result.completion_tokens,
cost_usd=llm_result.cost_usd,
))
await session.commit()
succeeded += 1
except Exception as exc:
log.warning("log.translate.persist_failed",
lang=lang, log_id=log_id, error=str(exc)[:200])
failed += 1
if failed and succeeded == 0:
log.error("log.translate.all_failed",
log_id=log_id, attempted=len(active_langs))
else:
log.info("log.translate.done",
log_id=log_id, succeeded=succeeded, failed=failed)
async def run() -> None:
async with job_lifecycle("ai_log_job") as (session, jr):
if jr.status == "skipped":
return
s = get_settings()
if not llm_configured():
log.warning("ai_log.skipped_no_key", provider=s.LLM_PROVIDER)
jr.status = "skipped"
return
# Cadence: hourly during EU/US active hours; throttled off-hours.
last_success = (await session.execute(
select(func.max(JobRun.finished_at)).where(
JobRun.name == "ai_log_job",
JobRun.status == "success",
)
)).scalar()
should_run, reason = DEFAULT_POLICY.should_run(last_success)
if not should_run:
log.info("ai_log.cadence_skip", reason=reason)
jr.status = "skipped"
jr.error = reason
return
spent = await month_spend(session)
if spent >= s.OPENROUTER_MONTHLY_CAP_USD:
log.warning("ai_log.cap_reached", spent=spent,
cap=s.OPENROUTER_MONTHLY_CAP_USD)
jr.status = "skipped"
jr.error = f"monthly cost cap reached (${spent:.2f})"
return
quotes = await latest_quotes_by_group(session)
news = await recent_headlines_by_bucket(session)
if not quotes and not news:
log.warning("ai_log.no_data_yet")
jr.status = "skipped"
return
# Look up the most recent log generated today (UTC) so the model can
# update it rather than start from scratch. This gives the model
# temporal awareness — "since this morning's read, X has changed".
today_start = utcnow().replace(hour=0, minute=0, second=0, microsecond=0)
previous_log = (await session.execute(
select(StrategicLog)
.where(StrategicLog.generated_at >= today_start)
.order_by(desc(StrategicLog.generated_at))
.limit(1)
)).scalar_one_or_none()
anchor = s.CASSANDRA_ANCHOR_DATE or None
user_prompt = build_user_prompt(
today=utcnow(),
anchor=anchor,
quotes_by_group=quotes,
headlines_by_bucket=news,
reference_line=REFERENCE_LINE,
previous_log=previous_log,
)
# Phase 2 voice pivot (PROMPT_VERSION 6): generate both tones per
# run so the dashboard toggle is instant. Analysis stays on the
# operator-configured default (DRY|SPECULATIVE is a system-wide
# preference, not a per-user toggle). PRO was dropped.
analysis = (s.CASSANDRA_ANALYSIS or "SPECULATIVE").upper()
variants = [
("NOVICE", analysis),
("INTERMEDIATE", analysis),
]
written = 0
async with httpx.AsyncClient(follow_redirects=True) as client:
for tone, analysis in variants:
# Re-check cost cap between variants so a runaway run is
# bounded.
spent = await month_spend(session)
if spent >= s.OPENROUTER_MONTHLY_CAP_USD:
log.warning("ai_log.cap_reached_midrun",
spent=spent, completed=written)
break
system_prompt = build_system_prompt(tone, analysis)
try:
result = await call_llm(
client,
[{"role": "system", "content": system_prompt},
{"role": "user", "content": user_prompt}],
)
except Exception as e:
session.add(AICall(
model=active_model(), status="error",
error=f"{tone}/{analysis}: {str(e)[:480]}",
))
await session.commit()
log.error("ai_log.variant_failed",
tone=tone, analysis=analysis, error=str(e)[:200])
continue
# Reviewer gate: catches chain-of-thought, truncation,
# and (regulatory-critical) any financial-advice phrasing
# that drifted past the generator's system prompt. Drop
# rejected variants; the API falls back to the previous
# clean StrategicLog row.
verdict = await review_read(client, result.content)
full_cost = (result.cost_usd or 0.0) + (verdict.cost_usd or 0.0)
if not verdict.clean:
session.add(AICall(
model=result.model,
prompt_tokens=result.prompt_tokens,
completion_tokens=result.completion_tokens,
cost_usd=full_cost, status="leaked",
))
await session.commit()
log.warning("ai_log.reviewer_rejected",
tone=tone, analysis=analysis,
reason=verdict.reason,
preview=result.content[:120])
continue
slog = StrategicLog(
generated_at=utcnow(),
model=result.model,
anchor_date=anchor,
prompt_version=PROMPT_VERSION,
tone=tone,
analysis=analysis,
content=result.content,
prompt_tokens=result.prompt_tokens,
completion_tokens=result.completion_tokens,
cost_usd=full_cost,
)
session.add(slog)
session.add(AICall(
model=result.model,
prompt_tokens=result.prompt_tokens,
completion_tokens=result.completion_tokens,
cost_usd=full_cost,
status="ok",
))
await session.commit()
await translate_log_for_active_languages(session, slog.id)
written += 1
log.info("ai_log.variant_done",
tone=tone, analysis=analysis,
prompt_tokens=result.prompt_tokens,
completion_tokens=result.completion_tokens)
jr.items_written = written
log.info("ai_log.done", variants=written, total=len(variants))
if __name__ == "__main__":
asyncio.run(run())