diff --git a/app/jobs/ai_log_job.py b/app/jobs/ai_log_job.py index 9b5683e..197faa5 100644 --- a/app/jobs/ai_log_job.py +++ b/app/jobs/ai_log_job.py @@ -25,6 +25,7 @@ from app.services.llm_prompts import ( build_system_prompt, build_user_prompt, ) +from app.services.output_review import review_read from app.services.openrouter import ( active_model, call_llm, @@ -200,6 +201,27 @@ async def run() -> None: tone=tone, analysis=analysis, error=str(e)[:200]) continue + # Reviewer gate: catches chain-of-thought, truncation, + # and (regulatory-critical) any financial-advice phrasing + # that drifted past the generator's system prompt. Drop + # rejected variants; the API falls back to the previous + # clean StrategicLog row. + verdict = await review_read(client, result.content) + full_cost = (result.cost_usd or 0.0) + (verdict.cost_usd or 0.0) + if not verdict.clean: + session.add(AICall( + model=result.model, + prompt_tokens=result.prompt_tokens, + completion_tokens=result.completion_tokens, + cost_usd=full_cost, status="leaked", + )) + await session.commit() + log.warning("ai_log.reviewer_rejected", + tone=tone, analysis=analysis, + reason=verdict.reason, + preview=result.content[:120]) + continue + slog = StrategicLog( generated_at=utcnow(), model=result.model, @@ -210,14 +232,14 @@ async def run() -> None: content=result.content, prompt_tokens=result.prompt_tokens, completion_tokens=result.completion_tokens, - cost_usd=result.cost_usd, + cost_usd=full_cost, ) session.add(slog) session.add(AICall( model=result.model, prompt_tokens=result.prompt_tokens, completion_tokens=result.completion_tokens, - cost_usd=result.cost_usd, + cost_usd=full_cost, status="ok", )) await session.commit() diff --git a/app/jobs/email_digest_job.py b/app/jobs/email_digest_job.py index dc89e5b..4cbd865 100644 --- a/app/jobs/email_digest_job.py +++ b/app/jobs/email_digest_job.py @@ -41,6 +41,7 @@ from app.services.openrouter import ( call_llm, llm_configured, ) +from app.services.output_review import review_read from app.services.translation import translate @@ -93,12 +94,31 @@ async def _generate_variants(session, client, kind: str, ctx: dict) -> dict[str, [{"role": "system", "content": sys_}, {"role": "user", "content": usr}], ) + # Reviewer gate. Digest emails land in inboxes — once + # delivered they're unrecallable, so a financial-advice slip + # has more reach than the dashboard. Drop rejected variants; + # users on that tone get no digest this cycle (better than + # delivering bad copy). + verdict = await review_read(client, result.content) + full_cost = (result.cost_usd or 0.0) + (verdict.cost_usd or 0.0) + if not verdict.clean: + session.add(AICall( + model=result.model, + prompt_tokens=result.prompt_tokens, + completion_tokens=result.completion_tokens, + cost_usd=full_cost, status="leaked", + error=f"reviewer: {verdict.reason}", + )) + await session.commit() + log.warning("digest.reviewer_rejected", kind=kind, tone=tone, + reason=verdict.reason, preview=result.content[:120]) + continue out[tone] = result.content session.add(AICall( model=result.model, prompt_tokens=result.prompt_tokens, completion_tokens=result.completion_tokens, - cost_usd=result.cost_usd, + cost_usd=full_cost, status="ok", )) await session.commit() diff --git a/app/routers/chat.py b/app/routers/chat.py index f213637..20f99e5 100644 --- a/app/routers/chat.py +++ b/app/routers/chat.py @@ -24,6 +24,10 @@ from app.routers.api import _md_to_html from app.services.i18n import respond_in_clause from app.services.llm_prompts import build_chat_system_prompt from app.services.openrouter import call_llm, month_start +from app.services.output_review import review_read + +from app.logging import get_logger +log = get_logger("chat") router = APIRouter(dependencies=[Depends(require_token)]) @@ -176,6 +180,11 @@ async def chat( try: async with httpx.AsyncClient(follow_redirects=True) as client: result = await call_llm(client, msgs) + # Reviewer gate. The chat turn could solicit advice with a + # leading question; the generator's system prompt forbids it, + # but the reviewer is the enforcement layer. ~1-2 s extra + # latency per turn on top of the generation call. + verdict = await review_read(client, result.content) except Exception as e: session.add(AICall( model=s.OPENROUTER_MODEL, status="error", error=str(e)[:500], @@ -183,11 +192,40 @@ async def chat( await session.commit() raise HTTPException(status_code=502, detail=f"OpenRouter error: {e}") + full_cost = (result.cost_usd or 0.0) + (verdict.cost_usd or 0.0) + if not verdict.clean: + # Rejected reply. Record the cost and surface a generic refusal + # the user can retry, rather than letting potentially non-compliant + # text reach them. + session.add(AICall( + model=result.model, + prompt_tokens=result.prompt_tokens, + completion_tokens=result.completion_tokens, + cost_usd=full_cost, status="leaked", + error=f"reviewer: {verdict.reason}", + )) + await session.commit() + log.warning("chat.reviewer_rejected", reason=verdict.reason, + preview=result.content[:120]) + refusal = ( + "I can't generate that reply — it would have crossed into " + "investment advice or specific recommendations, which I'm " + "not licensed to give. Try rephrasing as a question about " + "what the data means rather than what to do." + ) + return { + "role": "assistant", + "content": refusal, + "content_html": _md_to_html(refusal), + "prompt_tokens": result.prompt_tokens, + "completion_tokens": result.completion_tokens, + } + session.add(AICall( model=result.model, prompt_tokens=result.prompt_tokens, completion_tokens=result.completion_tokens, - cost_usd=result.cost_usd, + cost_usd=full_cost, status="ok", )) await session.commit() diff --git a/app/services/output_review.py b/app/services/output_review.py index 833b927..4fbb2fb 100644 --- a/app/services/output_review.py +++ b/app/services/output_review.py @@ -39,25 +39,29 @@ DEFAULT_REVIEWER_MODEL = "anthropic/claude-haiku-4.5" _SYSTEM_PROMPT = """\ You are a strict editor for a financial-markets dashboard. The author -was asked to produce a short interpretive read for human readers. -You receive their proposed read and decide if it is publishable as-is. +was asked to produce editorial commentary on public market data for +human readers. You receive the proposed text — it may be a one-line +read, a multi-paragraph daily log, a portfolio analysis, a chat +reply, or an email digest — and decide if it is publishable as-is. -Mark CLEAN only if the text reads like a finished interpretation a -reader could see on a public dashboard without confusion. +Mark CLEAN only if the text reads like finished editorial commentary +a reader could see on a public dashboard without confusion. Mark UNCLEAN if the text contains ANY of: -- Chain-of-thought / scratchpad markers used as thinking — phrases like +- Chain-of-thought / scratchpad markers — the author thinking on the + page rather than presenting finished commentary. Phrases like "Let me", "Let's see", "we need to", "actually" (correcting itself), - "wait", "hmm", "or rather", "I should". + "wait", "hmm", "or rather", "I should". Rhetorical questions used + as structure are fine; questions that the author then answers in + front of the reader (self-questioning) are not. - Self-questioning parentheticals: "Q1 2026? Actually Q4 2025?", "is it X or Y?", any place where the author appears to be working out the answer in front of the reader. -- Multiple rhetorical questions or any question that interrupts the - declarative voice. A clean interpretive read is assertive. - Meta-commentary about the task, output format, word limits, or instructions — e.g. "as required by the constraints", "the prompt asks", "let me address each". -- Partial / truncated content. Starts mid-word, mid-number, mid-clause. +- Partial / truncated content. Starts mid-word, mid-number, mid-clause, + ends mid-thought. - Visible internal numbers without clear meaning ("change 1y +5.9%?"), raw column names ("as_of 2026-01-01"), or any debug-like fragments. - FINANCIAL ADVICE or any phrasing that recommends an action the @@ -75,7 +79,7 @@ Mark UNCLEAN if the text contains ANY of: "valuations are stretched", "real yields are restrictive", "rates and credit disagree". The test: does the text describe a STATE, or does it suggest an ACTION? States are fine; actions are not. -- Anything else other than the finished, publishable interpretation. +- Anything else other than the finished, publishable commentary. Return ONLY a JSON object with this exact shape: {"clean": true | false, "reason": "<≤20 words, plain text>"} diff --git a/app/services/portfolio_analysis.py b/app/services/portfolio_analysis.py index 450f948..1f6bea7 100644 --- a/app/services/portfolio_analysis.py +++ b/app/services/portfolio_analysis.py @@ -33,6 +33,7 @@ from app.logging import get_logger from app.models import AICall from app.services.i18n import LANGUAGES, respond_in_clause from app.services.llm_prompts import build_system_prompt +from app.services.output_review import review_read from app.services.openrouter import ( LogResult, active_model, @@ -322,6 +323,8 @@ async def analyse( s = get_settings() system, user = build_prompt(req) + review_cost = 0.0 + review_reason: str | None = None async with httpx.AsyncClient() as client: try: llm: LogResult = await call_llm( @@ -340,15 +343,31 @@ async def analyse( llm = None log.error("portfolio_analysis.failed", error=error_msg) + # Reviewer gate. This is the highest-risk surface — the model is + # commenting on a real user's holdings, so any drift into + # buy/sell or allocation language is a regulatory hazard. Drop + # the response on a reject and surface a retry-able error to the + # caller; no analysis is ever persisted server-side anyway. + if llm is not None: + verdict = await review_read(client, llm.content) + review_cost = verdict.cost_usd or 0.0 + if not verdict.clean: + status = "leaked" + error_msg = f"reviewer rejected: {verdict.reason}" + review_reason = verdict.reason + log.warning("portfolio_analysis.reviewer_rejected", + reason=verdict.reason, preview=llm.content[:120]) + + full_cost = ((llm.cost_usd or 0.0) + review_cost) if llm else None # Ledger row — NO portfolio data, just metadata. Same row whether the - # call succeeded or failed, so cost-cap and rate-limit logic can - # observe the attempt. + # call succeeded, failed, or was rejected by the reviewer, so + # cost-cap and rate-limit logic can observe the attempt. session.add(AICall( called_at=utcnow(), model=llm.model if llm else active_model(), prompt_tokens=llm.prompt_tokens if llm else None, completion_tokens=llm.completion_tokens if llm else None, - cost_usd=llm.cost_usd if llm else None, + cost_usd=full_cost, status=status, error=error_msg, )) @@ -356,19 +375,26 @@ async def analyse( if llm is None: raise RuntimeError(error_msg or "portfolio analysis failed") + if review_reason is not None: + # Reviewer rejected the candidate. Treat as a generation failure + # at the API layer so the user sees a retry-able error rather + # than potentially non-compliant advice. + raise RuntimeError( + "AI analysis couldn't be generated cleanly — please try again." + ) log.info( "portfolio_analysis.ok", n_positions=len(req.positions), prompt_tokens=llm.prompt_tokens, completion_tokens=llm.completion_tokens, - cost_usd=llm.cost_usd, + cost_usd=full_cost, ) return AnalysisResult( content=llm.content, model=llm.model, prompt_tokens=llm.prompt_tokens, completion_tokens=llm.completion_tokens, - cost_usd=llm.cost_usd, + cost_usd=full_cost, generated_at=datetime.now(timezone.utc), ) diff --git a/tests/conftest.py b/tests/conftest.py index b032028..e49e229 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -22,6 +22,38 @@ os.environ.setdefault("CASSANDRA_MOCK", "1") import pytest +@pytest.fixture(autouse=True) +def stub_reviewer(monkeypatch): + """Replace review_read with a clean-passing stub in every consumer + module. Tests that mock the generator's call_llm shouldn't also + have to mock the reviewer that runs after it — the reviewer is a + safety gate, not behaviour under test. + + Tests in test_output_review.py exercise review_read through its + own module and are unaffected. Tests that want to assert the + reviewer-rejected branch can override with their own + monkeypatch.setattr — later wins. + """ + from app.services.output_review import Verdict + + async def _clean(_client, _candidate): + return Verdict(clean=True, reason="stubbed-by-conftest", cost_usd=0.0) + + for mod_path in ( + "app.services.portfolio_analysis", + "app.routers.chat", + "app.jobs.ai_log_job", + "app.jobs.email_digest_job", + "app.jobs.indicator_summary_job", + ): + try: + mod = __import__(mod_path, fromlist=["review_read"]) + except ImportError: + continue + if hasattr(mod, "review_read"): + monkeypatch.setattr(mod, "review_read", _clean) + + @pytest.fixture async def db_factory(tmp_path): """Per-test sqlite engine + async session factory.