"""Hourly per-group indicator summaries — a short AI read at the top of each Indicators tab. Costs ~$0.0003 per call on DeepSeek V4 Flash, so 10+ groups hourly stays comfortably under the monthly cap.""" from __future__ import annotations import asyncio import re import httpx from sqlalchemy import desc, func, select from app.config import get_settings, load_groups from app.db import utcnow from app.jobs._helpers import job_lifecycle, log from app.jobs._market_context import latest_quotes_by_group, month_spend from app.models import ( AICall, IndicatorSummary, IndicatorSummaryTranslation, JobRun, User, ) from app.services.cadence import DEFAULT_POLICY from app.services.i18n import ACTIVE_LANGUAGES from app.services.openrouter import ( PROMPT_VERSION, active_model, build_aggregate_summary_system_prompt, build_aggregate_summary_user_prompt, build_summary_system_prompt, build_summary_user_prompt, call_llm, llm_configured, month_start, ) from app.services.translation import translate AGGREGATE_GROUP_NAME = "__all__" async def translate_summary_for_active_languages(session, summary_id: int) -> None: """Fan out per-language translations for one IndicatorSummary row. Mirrors ``ai_log_job.translate_log_for_active_languages``: reads the distinct non-en ``users.lang`` set, translates the English content once per active language in parallel via ``asyncio.gather``, and persists each result as an ``IndicatorSummaryTranslation`` row. Per-language failures are logged but never raise. """ target_langs = sorted({l for l in ACTIVE_LANGUAGES if l != "en"}) if not target_langs: return active_langs = (await session.execute( select(User.lang).distinct().where(User.lang.in_(target_langs)) )).scalars().all() if not active_langs: return summary_row = await session.get(IndicatorSummary, summary_id) if summary_row is None: log.warning("ind_summary.translate.missing_summary", summary_id=summary_id) return async with httpx.AsyncClient(follow_redirects=True, timeout=60) as client: results = await asyncio.gather(*[ translate(client, summary_row.content, lang) for lang in active_langs ], return_exceptions=True) for lang, result in zip(active_langs, results): if isinstance(result, Exception): log.warning("ind_summary.translate.failed", lang=lang, summary_id=summary_id, error=str(result)[:200]) continue translated_md, llm_result = result session.add(IndicatorSummaryTranslation( summary_id=summary_id, lang=lang, content=translated_md, generated_at=utcnow(), model=llm_result.model, prompt_tokens=llm_result.prompt_tokens, completion_tokens=llm_result.completion_tokens, cost_usd=llm_result.cost_usd, )) await session.commit() # Strip known meta-commentary openers the model sometimes leaks despite the # prompt's hard constraints. Each pattern matches one leading sentence. _LEAK_PATTERNS = [ re.compile(p, re.IGNORECASE | re.DOTALL) for p in ( # First-person meta — "I need to / I'll / I have to / I'm going to ..." r"^i\s+(?:need|have|must|should|am going|'ll|will|shall|can|am)[^.]*\.\s*", # "We need / we're / we are asked / we will ..." r"^we\s+(?:need|are|'re|will|shall|can|should|must|have)[^.]*\.\s*", r"^let\s+(?:me|us|'?s)[^.]*\.\s*", r"^here['’]s[^.]*\.\s*", r"^sure[,!]?\s[^.]*\.\s*", r"^looking at[^.]*\.\s*", r"^based on[^.]*\.\s*", r"^to (?:address|answer|write|summarise|summarize)[^.]*\.\s*", r"^first[,]?\s[^.]*\.\s*", r"^the (?:user|data shows|reader|task|request|reader sees|instructions?)[^.]*\.\s*", r"^summary[:.]\s*", r"^key\s*[:\-—]\s*", r"^must\s+(?:be|cite|explain|avoid|give|stay|provide)[^.]*\.\s*", r"^should\s+(?:be|give|cite|explain|avoid|provide)[^.]*\.\s*", r"^avoid[^.]*\.\s*", r"^cite\s+at\s+most[^.]*\.\s*", r"^be\s+(?:speculative|specific|concise|brief)[^.]*\.\s*", r"^stay\s+on[^.]*\.\s*", r"^okay[,]?\s+", r"^alright[,]?\s+", r"^thinking[^.]*\.\s*", # Prompt-leak prefixes — the model echoes example framing or rule # headers from the system prompt. r"^(?:good|bad|positive|negative)\s+example\s*[:\-—]\s*", r"^example\s+(?:good|bad)\s*[:\-—]\s*", r"^example\s*[:\-—]\s*", r"^reference\s+style\s*[:\-—]\s*", # Prompt label echoes (markdown-style or plain-text) r"^(?:hard\s+)?constraints?\s*[:\-—][^.\n]*[.\n]\s*", r"^key\s+observations?\s*[:\-—]\s*", r"^observations?\s*[:\-—]\s*", r"^focus\s+on[^.]*\.\s*", r"^output\s+the\s+read[^.]*\.\s*", r"^plain\s+prose[^.]*\.\s*", r"^the\s+indicators?[^.]*\.\s*", # "The indicators include..." / "The indicators are..." r"^indicators?\s*[:\-—]\s*", r"^data\s*[:\-—]\s*", r"^analysis\s*[:\-—]\s*", r"^interpretation\s*[:\-—]\s*", r"^read\s*[:\-—]\s*", r"^note\s*[:\-—]\s*", # Sometimes the response gets wrapped in literal quotes r"^[\"“'`]+", ) ] _TRAILING_QUOTE = re.compile(r"[\"”'`]+\s*$") # Tell-tale phrases that mean the model regurgitated the prompt as its # "answer" — we'd rather show nothing than show this. _LEAKAGE_FLAGS = ( "≤60 words", "60 words", "must be under", "must cite", "must explain", "no meta-commentary", "no buy/sell", "horizon. ", "1-day moves", "the instructions are", "instructions:", "constraints:", "hard constraints", "good example", "bad example", "reference style", ) def looks_like_leakage(text: str) -> bool: """Heuristic: after cleaning, if these phrases still appear, the output is contaminated prompt-regurgitation and shouldn't be shown.""" low = text.lower() return any(flag in low for flag in _LEAKAGE_FLAGS) def clean_summary(text: str) -> str: """Strip leading meta-commentary. If cleaning removes nearly everything (suggesting the model emitted reasoning then ran out of tokens), fall back to the last non-empty paragraph of the raw output — that's usually where the actual answer ended up.""" raw = text.strip() out = raw # Up to 6 passes: handles compound leakage like # "Constraints: <...>. The indicators are: <...>. " for _ in range(6): before = out for pat in _LEAK_PATTERNS: out = pat.sub("", out, count=1).lstrip() if out == before: break if len(out) < 60 and len(raw) > 120: # Cleaning ate too much; take the last non-empty paragraph of raw. paragraphs = [p.strip() for p in re.split(r"\n\s*\n", raw) if p.strip()] if paragraphs: out = paragraphs[-1] # Re-strip leaders from the recovered paragraph too. for _ in range(2): before = out for pat in _LEAK_PATTERNS: out = pat.sub("", out, count=1).lstrip() if out == before: break # Trim any orphan closing quote/backtick from the wrap-strip above. out = _TRAILING_QUOTE.sub("", out).rstrip() return out async def _generate_one( session, client: httpx.AsyncClient, group: str, quotes: list[dict], system_prompt: str, model: str, tone: str, analysis: str, ) -> IndicatorSummary | None: """Generate + persist one group's summary. Returns the new row on success (so the caller can fan out localized translations after the commit picks up its id) or None on failure. `model` is retained for ledger labelling but call_llm now picks the active-provider model itself.""" user_prompt = build_summary_user_prompt(group, quotes) try: result = await call_llm( client, [{"role": "system", "content": system_prompt}, {"role": "user", "content": user_prompt}], max_tokens=800, # DeepSeek sometimes spends 300+ on internal reasoning ) except Exception as e: session.add(AICall(model=active_model(), status="error", error=str(e)[:500])) log.warning("ind_summary.failed", group=group, error=str(e)[:120]) return None cleaned = clean_summary(result.content) if looks_like_leakage(cleaned) or len(cleaned) < 40: # Model regurgitated the prompt or produced nothing usable. # Don't persist — keep the last good summary visible. Log it so # we can see the rate of failures over time. log.warning("ind_summary.leakage_detected", group=group, preview=cleaned[:120]) session.add(AICall( model=result.model, prompt_tokens=result.prompt_tokens, completion_tokens=result.completion_tokens, cost_usd=result.cost_usd, status="leaked", )) return None summary = IndicatorSummary( group_name=group, generated_at=utcnow(), model=result.model, tone=tone, analysis=analysis, prompt_version=PROMPT_VERSION, content=cleaned, prompt_tokens=result.prompt_tokens, completion_tokens=result.completion_tokens, cost_usd=result.cost_usd, ) session.add(summary) session.add(AICall( model=result.model, prompt_tokens=result.prompt_tokens, completion_tokens=result.completion_tokens, cost_usd=result.cost_usd, status="ok", )) return summary async def run() -> None: async with job_lifecycle("indicator_summary_job") as (session, jr): if jr.status == "skipped": return s = get_settings() if not llm_configured(): log.warning("ind_summary.skipped_no_key", provider=s.LLM_PROVIDER) jr.status = "skipped" return # Cadence — same policy as ai_log_job: hourly during EU/US active, # throttled off-hours and weekends. last_success = (await session.execute( select(func.max(JobRun.finished_at)).where( JobRun.name == "indicator_summary_job", JobRun.status == "success", ) )).scalar() should_run, reason = DEFAULT_POLICY.should_run(last_success) if not should_run: log.info("ind_summary.cadence_skip", reason=reason) jr.status = "skipped" jr.error = reason return spent = await month_spend(session) if spent >= s.OPENROUTER_MONTHLY_CAP_USD: jr.status = "skipped" jr.error = f"monthly cap reached (${spent:.2f})" return groups = await latest_quotes_by_group(session) # Only summarise groups currently configured in TOML — drops stale # group names (e.g. an old "pie" before T212 sourcing) that still have # quotes in the table but no UI presence. configured = set(load_groups(s.BASELINE_TOML, s.PORTFOLIO_TOML).keys()) groups = {g: q for g, q in groups.items() if g in configured} if not groups: jr.status = "skipped" return # Phase 2 voice pivot (PROMPT_VERSION 6): generate both tones each # run so the dashboard toggle is instant. ANALYSIS stays on the # operator-configured default. analysis = (s.CASSANDRA_ANALYSIS or "SPECULATIVE").upper() tones = ("NOVICE", "INTERMEDIATE") written = 0 async with httpx.AsyncClient(follow_redirects=True) as client: # Sequential rather than parallel — OpenRouter free tiers can # throttle bursts; total work is small (~14-16 calls × ~5s each). for tone in tones: system_prompt = build_summary_system_prompt(tone, analysis) for group, quotes in groups.items(): summary = await _generate_one( session, client, group, quotes, system_prompt, active_model(), tone, analysis, ) if summary is not None: written += 1 await session.commit() # partial progress survives mid-job error if summary is not None: await translate_summary_for_active_languages(session, summary.id) # One aggregate read across all groups, stored under __all__. agg_system = build_aggregate_summary_system_prompt(tone, analysis) agg_user = build_aggregate_summary_user_prompt(groups) agg_summary: IndicatorSummary | None = None try: result = await call_llm( client, [{"role": "system", "content": agg_system}, {"role": "user", "content": agg_user}], max_tokens=1500, # room for reasoning + 80-word output ) agg_summary = IndicatorSummary( group_name=AGGREGATE_GROUP_NAME, generated_at=utcnow(), model=result.model, tone=tone, analysis=analysis, prompt_version=PROMPT_VERSION, content=clean_summary(result.content), prompt_tokens=result.prompt_tokens, completion_tokens=result.completion_tokens, cost_usd=result.cost_usd, ) session.add(agg_summary) session.add(AICall( model=result.model, prompt_tokens=result.prompt_tokens, completion_tokens=result.completion_tokens, cost_usd=result.cost_usd, status="ok", )) written += 1 except Exception as e: session.add(AICall( model=active_model(), status="error", error=f"{tone}/agg: {str(e)[:480]}", )) log.warning("ind_summary.agg_failed", tone=tone, error=str(e)[:120]) await session.commit() if agg_summary is not None: await translate_summary_for_active_languages(session, agg_summary.id) jr.items_written = written log.info("ind_summary.done", groups=len(groups), tones=len(tones), written=written) if __name__ == "__main__": asyncio.run(run())