"""Hourly per-group indicator summaries — a short AI read at the top of each Indicators tab. Costs ~$0.0003 per call on DeepSeek V4 Flash, so 10+ groups hourly stays comfortably under the monthly cap.""" from __future__ import annotations import asyncio import json import httpx from sqlalchemy import desc, func, select from app.config import get_settings, load_groups from app.db import utcnow from app.jobs._helpers import job_lifecycle, log from app.jobs._market_context import latest_quotes_by_group, month_spend from app.models import ( AICall, IndicatorSummary, IndicatorSummaryTranslation, JobRun, User, ) from app.services.cadence import DEFAULT_POLICY from app.services.i18n import ACTIVE_LANGUAGES from app.services.llm_prompts import ( PROMPT_VERSION, build_aggregate_summary_system_prompt, build_aggregate_summary_user_prompt, build_summary_system_prompt, build_summary_user_prompt, ) from app.services.openrouter import ( active_model, call_llm, llm_configured, month_start, ) from app.services.output_review import review_read from app.services.translation import translate AGGREGATE_GROUP_NAME = "__all__" async def translate_summary_for_active_languages(session, summary_id: int) -> None: """Fan out per-language translations for one IndicatorSummary row. Mirrors ``ai_log_job.translate_log_for_active_languages``: reads the distinct non-en ``users.lang`` set, translates the English content once per active language in parallel via ``asyncio.gather``, and persists each result as an ``IndicatorSummaryTranslation`` row in its own savepoint so one bad row doesn't lose the rest. """ target_langs = sorted({l for l in ACTIVE_LANGUAGES if l != "en"}) if not target_langs: return active_langs = (await session.execute( select(User.lang).distinct().where(User.lang.in_(target_langs)) )).scalars().all() if not active_langs: return summary_row = await session.get(IndicatorSummary, summary_id) if summary_row is None: log.warning("ind_summary.translate.missing_summary", summary_id=summary_id) return async with httpx.AsyncClient(follow_redirects=True, timeout=60) as client: results = await asyncio.gather(*[ translate(client, summary_row.content, lang) for lang in active_langs ], return_exceptions=True) succeeded = 0 failed = 0 for lang, result in zip(active_langs, results): if isinstance(result, Exception): log.warning("ind_summary.translate.failed", lang=lang, summary_id=summary_id, error=str(result)[:200]) failed += 1 continue translated_md, llm_result = result try: async with session.begin_nested(): session.add(IndicatorSummaryTranslation( summary_id=summary_id, lang=lang, content=translated_md, generated_at=utcnow(), model=llm_result.model, prompt_tokens=llm_result.prompt_tokens, completion_tokens=llm_result.completion_tokens, cost_usd=llm_result.cost_usd, )) await session.commit() succeeded += 1 except Exception as exc: log.warning("ind_summary.translate.persist_failed", lang=lang, summary_id=summary_id, error=str(exc)[:200]) failed += 1 if failed and succeeded == 0: log.error("ind_summary.translate.all_failed", summary_id=summary_id, attempted=len(active_langs)) else: log.info("ind_summary.translate.done", summary_id=summary_id, succeeded=succeeded, failed=failed) # Defence-in-depth: read generation goes through JSON mode + a reviewer. # # 1. The system prompt instructs the model to emit {"read": "..."} only; # response_format={"type":"json_object"} forces well-formed JSON at # the API layer, so prose outside the field is impossible. # 2. We extract `read`, then ask a second LLM call (services/output_review) # whether the candidate text is publishable. Scratchpad INSIDE the # field — "Let's see…", "X? Actually Y?" — is caught here. # 3. Any failure at either stage (parse, missing field, reviewer veto, # reviewer error) drops the candidate. The previous good # IndicatorSummary stays visible. # # The old _LEAK_PATTERNS / clean_summary / looks_like_leakage regex # scaffolding lived here previously. It produced false positives (e.g. # chopping off a legitimate leading sentence like "The indicators are # pricing…") and false negatives (it never caught the chain-of-thought # patterns the model actually emits). The reviewer agent replaces it. def _extract_read(raw: str) -> str | None: """Parse the model's JSON envelope and return the "read" field, or None if the body isn't valid JSON / the field is missing / the field isn't a string. Conservative: on any deviation from the schema we drop the candidate rather than try to salvage it.""" try: parsed = json.loads(raw) except json.JSONDecodeError: return None if not isinstance(parsed, dict): return None read = parsed.get("read") if not isinstance(read, str): return None read = read.strip() return read or None async def _generate_one( session, client: httpx.AsyncClient, group: str, quotes: list[dict], system_prompt: str, model: str, tone: str, analysis: str, ) -> IndicatorSummary | None: """Generate + persist one group's summary. Returns the new row on success (so the caller can fan out localized translations after the commit picks up its id) or None on failure. `model` is retained for ledger labelling but call_llm now picks the active-provider model itself.""" user_prompt = build_summary_user_prompt(group, quotes) try: result = await call_llm( client, [{"role": "system", "content": system_prompt}, {"role": "user", "content": user_prompt}], max_tokens=800, # DeepSeek sometimes spends 300+ on internal reasoning response_format={"type": "json_object"}, ) except Exception as e: session.add(AICall(model=active_model(), status="error", error=str(e)[:500])) log.warning("ind_summary.failed", group=group, error=str(e)[:120]) return None candidate = _extract_read(result.content) if candidate is None or len(candidate) < 40: # JSON envelope malformed, "read" field missing/wrong type, or # the candidate is too short to be a real read. Don't persist; # the last good summary stays visible. log.warning("ind_summary.json_invalid", group=group, preview=result.content[:160]) session.add(AICall( model=result.model, prompt_tokens=result.prompt_tokens, completion_tokens=result.completion_tokens, cost_usd=result.cost_usd, status="leaked", )) return None verdict = await review_read(client, candidate) if not verdict.clean: # Reviewer caught scratchpad / meta-commentary / partial text # INSIDE the read field. Drop the candidate; the previous good # summary continues to serve. log.warning("ind_summary.reviewer_rejected", group=group, reason=verdict.reason, preview=candidate[:120]) session.add(AICall( model=result.model, prompt_tokens=result.prompt_tokens, completion_tokens=result.completion_tokens, cost_usd=(result.cost_usd or 0.0) + (verdict.cost_usd or 0.0), status="leaked", )) return None summary = IndicatorSummary( group_name=group, generated_at=utcnow(), model=result.model, tone=tone, analysis=analysis, prompt_version=PROMPT_VERSION, content=candidate, prompt_tokens=result.prompt_tokens, completion_tokens=result.completion_tokens, # Include the reviewer's cost in the row's recorded spend so the # monthly budget tracking covers the full pipeline cost. cost_usd=(result.cost_usd or 0.0) + (verdict.cost_usd or 0.0), ) session.add(summary) session.add(AICall( model=result.model, prompt_tokens=result.prompt_tokens, completion_tokens=result.completion_tokens, cost_usd=(result.cost_usd or 0.0) + (verdict.cost_usd or 0.0), status="ok", )) return summary async def run() -> None: async with job_lifecycle("indicator_summary_job") as (session, jr): if jr.status == "skipped": return s = get_settings() if not llm_configured(): log.warning("ind_summary.skipped_no_key", provider=s.LLM_PROVIDER) jr.status = "skipped" return # Cadence — same policy as ai_log_job: hourly during EU/US active, # throttled off-hours and weekends. last_success = (await session.execute( select(func.max(JobRun.finished_at)).where( JobRun.name == "indicator_summary_job", JobRun.status == "success", ) )).scalar() should_run, reason = DEFAULT_POLICY.should_run(last_success) if not should_run: log.info("ind_summary.cadence_skip", reason=reason) jr.status = "skipped" jr.error = reason return spent = await month_spend(session) if spent >= s.OPENROUTER_MONTHLY_CAP_USD: jr.status = "skipped" jr.error = f"monthly cap reached (${spent:.2f})" return groups = await latest_quotes_by_group(session) # Only summarise groups currently configured in TOML — drops stale # group names (e.g. an old "pie" before T212 sourcing) that still have # quotes in the table but no UI presence. configured = set(load_groups(s.BASELINE_TOML, s.PORTFOLIO_TOML).keys()) groups = {g: q for g, q in groups.items() if g in configured} if not groups: jr.status = "skipped" return # Phase 2 voice pivot (PROMPT_VERSION 6): generate both tones each # run so the dashboard toggle is instant. ANALYSIS stays on the # operator-configured default. analysis = (s.CASSANDRA_ANALYSIS or "SPECULATIVE").upper() tones = ("NOVICE", "INTERMEDIATE") written = 0 async with httpx.AsyncClient(follow_redirects=True) as client: # Sequential rather than parallel — OpenRouter free tiers can # throttle bursts; total work is small (~14-16 calls × ~5s each). for tone in tones: system_prompt = build_summary_system_prompt(tone, analysis) for group, quotes in groups.items(): summary = await _generate_one( session, client, group, quotes, system_prompt, active_model(), tone, analysis, ) if summary is not None: written += 1 await session.commit() # partial progress survives mid-job error if summary is not None: await translate_summary_for_active_languages(session, summary.id) # One aggregate read across all groups, stored under __all__. # Same JSON-mode + reviewer-agent path as per-group reads. agg_system = build_aggregate_summary_system_prompt(tone, analysis) agg_user = build_aggregate_summary_user_prompt(groups) agg_summary: IndicatorSummary | None = None try: result = await call_llm( client, [{"role": "system", "content": agg_system}, {"role": "user", "content": agg_user}], max_tokens=1500, response_format={"type": "json_object"}, ) candidate = _extract_read(result.content) if candidate is None or len(candidate) < 40: log.warning("ind_summary.agg_json_invalid", tone=tone, preview=result.content[:160]) session.add(AICall( model=result.model, prompt_tokens=result.prompt_tokens, completion_tokens=result.completion_tokens, cost_usd=result.cost_usd, status="leaked", )) else: verdict = await review_read(client, candidate) full_cost = (result.cost_usd or 0.0) + (verdict.cost_usd or 0.0) if not verdict.clean: log.warning("ind_summary.agg_reviewer_rejected", tone=tone, reason=verdict.reason, preview=candidate[:120]) session.add(AICall( model=result.model, prompt_tokens=result.prompt_tokens, completion_tokens=result.completion_tokens, cost_usd=full_cost, status="leaked", )) else: agg_summary = IndicatorSummary( group_name=AGGREGATE_GROUP_NAME, generated_at=utcnow(), model=result.model, tone=tone, analysis=analysis, prompt_version=PROMPT_VERSION, content=candidate, prompt_tokens=result.prompt_tokens, completion_tokens=result.completion_tokens, cost_usd=full_cost, ) session.add(agg_summary) session.add(AICall( model=result.model, prompt_tokens=result.prompt_tokens, completion_tokens=result.completion_tokens, cost_usd=full_cost, status="ok", )) written += 1 except Exception as e: session.add(AICall( model=active_model(), status="error", error=f"{tone}/agg: {str(e)[:480]}", )) log.warning("ind_summary.agg_failed", tone=tone, error=str(e)[:120]) await session.commit() if agg_summary is not None: await translate_summary_for_active_languages(session, agg_summary.id) jr.items_written = written log.info("ind_summary.done", groups=len(groups), tones=len(tones), written=written) if __name__ == "__main__": asyncio.run(run())