"""Hourly per-group indicator summaries — a short AI read at the top of each
Indicators tab.

Costs ~$0.0003 per call on DeepSeek V4 Flash, so 10+ groups hourly stays
comfortably under the monthly cap.
"""
from __future__ import annotations

import asyncio
import re
from collections import defaultdict

import httpx
from sqlalchemy import desc, func, select

from app.config import get_settings, load_groups
from app.db import utcnow
from app.jobs._helpers import job_lifecycle, log
from app.models import AICall, IndicatorSummary, JobRun, Quote
from app.services.cadence import DEFAULT_POLICY
from app.services.openrouter import (
    PROMPT_VERSION,
    build_aggregate_summary_system_prompt,
    build_aggregate_summary_user_prompt,
    build_summary_system_prompt,
    build_summary_user_prompt,
    call_openrouter,
    month_start,
)

AGGREGATE_GROUP_NAME = "__all__"

# Cleaned output shorter than this is treated as "nothing usable": it is not
# persisted, so the previous good summary stays visible instead.
_MIN_SUMMARY_CHARS = 40

# Strip known meta-commentary openers the model sometimes leaks despite the
# prompt's hard constraints. Each pattern matches one leading sentence.
_LEAK_PATTERNS = [
    re.compile(p, re.IGNORECASE | re.DOTALL)
    for p in (
        # First-person meta — "I need to / I'll / I have to / I'm going to ..."
        # Contractions ("I'll", "I'm", "I've") have no whitespace after the
        # "I", so they are matched as a separate alternative; putting them
        # after `\s+` (as before) made them unreachable.
        r"^i(?:\s+(?:need|have|must|should|am going|will|shall|can|am)|['’](?:ll|m|ve)\b)[^.]*\.\s*",
        # "We need / we're / we are asked / we will ..."
        r"^we(?:\s+(?:need|are|will|shall|can|should|must|have)|['’](?:re|ll|ve)\b)[^.]*\.\s*",
        # "Let me / let us / let's ..." (the old `let\s+'?s` missed "let's"
        # and wrongly matched openers such as "Let stocks ...").
        r"^let(?:\s+(?:me|us)|['’]s)[^.]*\.\s*",
        r"^here['’]s[^.]*\.\s*",
        r"^sure[,!]?\s[^.]*\.\s*",
        r"^looking at[^.]*\.\s*",
        r"^based on[^.]*\.\s*",
        r"^to (?:address|answer|write|summarise|summarize)[^.]*\.\s*",
        r"^first[,]?\s[^.]*\.\s*",
        r"^the (?:user|data shows|reader|task|request|reader sees|instructions?)[^.]*\.\s*",
        r"^summary[:.]\s*",
        r"^key\s*[:\-—]\s*",
        r"^must\s+(?:be|cite|explain|avoid|give|stay|provide)[^.]*\.\s*",
        r"^should\s+(?:be|give|cite|explain|avoid|provide)[^.]*\.\s*",
        r"^avoid[^.]*\.\s*",
        r"^cite\s+at\s+most[^.]*\.\s*",
        r"^be\s+(?:speculative|specific|concise|brief)[^.]*\.\s*",
        r"^stay\s+on[^.]*\.\s*",
        r"^okay[,]?\s+",
        r"^alright[,]?\s+",
        r"^thinking[^.]*\.\s*",
        # Prompt-leak prefixes — the model echoes example framing or rule
        # headers from the system prompt.
        r"^(?:good|bad|positive|negative)\s+example\s*[:\-—]\s*",
        r"^example\s+(?:good|bad)\s*[:\-—]\s*",
        r"^example\s*[:\-—]\s*",
        r"^reference\s+style\s*[:\-—]\s*",
        # Prompt label echoes (markdown-style or plain-text)
        r"^(?:hard\s+)?constraints?\s*[:\-—][^.\n]*[.\n]\s*",
        r"^key\s+observations?\s*[:\-—]\s*",
        r"^observations?\s*[:\-—]\s*",
        r"^focus\s+on[^.]*\.\s*",
        r"^output\s+the\s+read[^.]*\.\s*",
        r"^plain\s+prose[^.]*\.\s*",
        # "The indicators include..." / "The indicators are..."
        r"^the\s+indicators?[^.]*\.\s*",
        r"^indicators?\s*[:\-—]\s*",
        r"^data\s*[:\-—]\s*",
        r"^analysis\s*[:\-—]\s*",
        r"^interpretation\s*[:\-—]\s*",
        r"^read\s*[:\-—]\s*",
        r"^note\s*[:\-—]\s*",
        # Sometimes the response gets wrapped in literal quotes
        r"^[\"“'`]+",
    )
]

# Closing quote/backtick left over after the leading-quote strip above.
_TRAILING_QUOTE = re.compile(r"[\"”'`]+\s*$")

# Tell-tale phrases that mean the model regurgitated the prompt as its
# "answer" — we'd rather show nothing than show this.
_LEAKAGE_FLAGS = (
    "≤60 words",
    "60 words",
    "must be under",
    "must cite",
    "must explain",
    "no meta-commentary",
    "no buy/sell",
    "horizon. ",
    "1-day moves",
    "the instructions are",
    "instructions:",
    "constraints:",
    "hard constraints",
    "good example",
    "bad example",
    "reference style",
)


def looks_like_leakage(text: str) -> bool:
    """Return True if *text* still contains a prompt-regurgitation phrase.

    Intended to run on already-cleaned output: if any ``_LEAKAGE_FLAGS``
    phrase survives cleaning (case-insensitive substring match), the output
    is treated as contaminated and should not be shown.
    """
    low = text.lower()
    return any(flag in low for flag in _LEAKAGE_FLAGS)


def _strip_leaders(text: str, max_passes: int) -> str:
    """Strip leading meta-commentary sentences from *text*.

    Each pass applies every ``_LEAK_PATTERNS`` entry once (left-stripping
    after each). Passes repeat until one changes nothing or *max_passes* is
    reached, so stacked openers are removed one after another.
    """
    out = text
    for _ in range(max_passes):
        before = out
        for pat in _LEAK_PATTERNS:
            out = pat.sub("", out, count=1).lstrip()
        if out == before:
            break
    return out


def clean_summary(text: str | None) -> str:
    """Strip leading meta-commentary and wrapping quotes from model output.

    If cleaning removes nearly everything (under 60 chars left from a raw
    output over 120 chars), fall back to the last non-empty paragraph of the
    raw output, stripped the same way. This suggests the model emitted
    reasoning then ran out of tokens; the last paragraph is usually where the
    actual answer ended up.

    ``None`` (e.g. a response with no content) is treated as an empty string
    rather than raising.
    """
    raw = (text or "").strip()
    # Up to 6 passes: handles compound leakage like
    # "Constraints: <...>. The indicators are: <...>. "
    out = _strip_leaders(raw, 6)
    if len(out) < 60 and len(raw) > 120:
        # Cleaning ate too much; take the last non-empty paragraph of raw.
        paragraphs = [p.strip() for p in re.split(r"\n\s*\n", raw) if p.strip()]
        if paragraphs:
            # Re-strip leaders from the recovered paragraph too.
            out = _strip_leaders(paragraphs[-1], 2)
    # Trim any orphan closing quote/backtick from the wrap-strip above.
    return _TRAILING_QUOTE.sub("", out).rstrip()


async def _latest_quotes_by_group(session) -> dict[str, list[dict]]:
    """Return the most recent quote per (group, symbol), grouped by group name.

    Picks the row with the max ``fetched_at`` per (group, symbol). Rows with
    a NULL price (error rows) are then dropped. A symbol whose *latest* fetch
    is an error is therefore omitted entirely; it does not fall back to an
    older good quote.
    """
    # NOTE(review): if two rows share the same max fetched_at for one
    # (group, symbol), both are returned — confirm that cannot happen.
    sub = (
        select(Quote.group_name, Quote.symbol, func.max(Quote.fetched_at).label("mx"))
        .group_by(Quote.group_name, Quote.symbol)
        .subquery()
    )
    rows = (await session.execute(
        select(Quote)
        .join(
            sub,
            (Quote.group_name == sub.c.group_name)
            & (Quote.symbol == sub.c.symbol)
            & (Quote.fetched_at == sub.c.mx),
        )
        .where(Quote.price.is_not(None))
        .order_by(Quote.group_name, Quote.symbol)
    )).scalars().all()

    by_group: dict[str, list[dict]] = defaultdict(list)
    for q in rows:
        by_group[q.group_name].append({
            "symbol": q.symbol,
            "label": q.label,
            "price": q.price,
            "currency": q.currency,
            "as_of": q.as_of,
            "changes": q.changes,
        })
    return by_group


async def _month_spend(session) -> float:
    """Return the summed ``AICall.cost_usd`` since ``month_start()`` (0.0 if none)."""
    total = (await session.execute(
        select(func.coalesce(func.sum(AICall.cost_usd), 0.0))
        .where(AICall.called_at >= month_start())
    )).scalar()
    return float(total or 0.0)


async def _summarise_and_persist(
    session,
    client: httpx.AsyncClient,
    *,
    group: str,
    system_prompt: str,
    user_prompt: str,
    model: str,
    tone: str,
    analysis: str,
    max_tokens: int,
    failure_event: str,
) -> bool:
    """Make one model call and record the outcome on *session*.

    Outcomes (nothing is committed here; the caller commits):

    - The call raises: add an ``AICall(status="error")``, log *failure_event*
      and return False.
    - The cleaned output looks like leakage or is shorter than
      ``_MIN_SUMMARY_CHARS``: add an ``AICall(status="leaked")`` and return
      False. No ``IndicatorSummary`` is written, so the last good one stays
      visible.
    - Otherwise: add an ``IndicatorSummary`` for *group* plus an
      ``AICall(status="ok")``, and return True.
    """
    try:
        result = await call_openrouter(
            client,
            [{"role": "system", "content": system_prompt},
             {"role": "user", "content": user_prompt}],
            model=model,
            max_tokens=max_tokens,
        )
    except Exception as e:
        session.add(AICall(model=model, status="error", error=str(e)[:500]))
        log.warning(failure_event, group=group, error=str(e)[:120])
        return False

    cleaned = clean_summary(result.content)
    if looks_like_leakage(cleaned) or len(cleaned) < _MIN_SUMMARY_CHARS:
        # Model regurgitated the prompt or produced nothing usable.
        # Don't persist — keep the last good summary visible. Log it so
        # we can see the rate of failures over time.
        log.warning("ind_summary.leakage_detected", group=group, preview=cleaned[:120])
        session.add(AICall(
            model=result.model,
            prompt_tokens=result.prompt_tokens,
            completion_tokens=result.completion_tokens,
            cost_usd=result.cost_usd,
            status="leaked",
        ))
        return False

    session.add(IndicatorSummary(
        group_name=group,
        generated_at=utcnow(),
        model=result.model,
        tone=tone,
        analysis=analysis,
        prompt_version=PROMPT_VERSION,
        content=cleaned,
        prompt_tokens=result.prompt_tokens,
        completion_tokens=result.completion_tokens,
        cost_usd=result.cost_usd,
    ))
    session.add(AICall(
        model=result.model,
        prompt_tokens=result.prompt_tokens,
        completion_tokens=result.completion_tokens,
        cost_usd=result.cost_usd,
        status="ok",
    ))
    return True


async def _generate_one(
    session,
    client: httpx.AsyncClient,
    group: str,
    quotes: list[dict],
    system_prompt: str,
    model: str,
    tone: str,
    analysis: str,
) -> bool:
    """Generate and persist one group's summary. Returns True on success."""
    return await _summarise_and_persist(
        session,
        client,
        group=group,
        system_prompt=system_prompt,
        user_prompt=build_summary_user_prompt(group, quotes),
        model=model,
        tone=tone,
        analysis=analysis,
        max_tokens=800,  # DeepSeek sometimes spends 300+ on internal reasoning
        failure_event="ind_summary.failed",
    )


async def run() -> None:
    """Run the indicator-summary job once.

    The job marks itself skipped when the lifecycle reports "skipped", when
    no OpenRouter key is set, when the cadence policy says not yet, when the
    monthly spend cap is reached, or when no configured group has quotes.
    Otherwise it writes one summary per configured group plus one aggregate
    under ``AGGREGATE_GROUP_NAME``, committing after each.
    """
    async with job_lifecycle("indicator_summary_job") as (session, jr):
        if jr.status == "skipped":
            return
        s = get_settings()
        if not s.OPENROUTER_API_KEY:
            jr.status = "skipped"
            return

        # Cadence — same policy as ai_log_job: hourly during EU/US active,
        # throttled off-hours and weekends.
        last_success = (await session.execute(
            select(func.max(JobRun.finished_at)).where(
                JobRun.name == "indicator_summary_job",
                JobRun.status == "success",
            )
        )).scalar()
        should_run, reason = DEFAULT_POLICY.should_run(last_success)
        if not should_run:
            log.info("ind_summary.cadence_skip", reason=reason)
            jr.status = "skipped"
            jr.error = reason
            return

        spent = await _month_spend(session)
        if spent >= s.OPENROUTER_MONTHLY_CAP_USD:
            jr.status = "skipped"
            jr.error = f"monthly cap reached (${spent:.2f})"
            return

        groups = await _latest_quotes_by_group(session)
        # Only summarise groups currently configured in TOML — drops stale
        # group names (e.g. an old "pie" before T212 sourcing) that still have
        # quotes in the table but no UI presence.
        configured = set(load_groups(s.BASELINE_TOML, s.PORTFOLIO_TOML).keys())
        groups = {g: q for g, q in groups.items() if g in configured}
        if not groups:
            jr.status = "skipped"
            return

        tone = s.CASSANDRA_TONE.upper()
        analysis = s.CASSANDRA_ANALYSIS.upper()
        system_prompt = build_summary_system_prompt(tone, analysis)

        written = 0
        async with httpx.AsyncClient(follow_redirects=True) as client:
            # Sequential rather than parallel — OpenRouter free tiers can
            # throttle bursts; total work is small (~12 calls × ~5s each).
            for group, quotes in groups.items():
                ok = await _generate_one(
                    session, client, group, quotes,
                    system_prompt, s.OPENROUTER_MODEL, tone, analysis,
                )
                if ok:
                    written += 1
                await session.commit()  # partial progress survives mid-job error

            # One aggregate read across all groups, stored under __all__.
            # It goes through the same leakage/length guard as per-group reads,
            # so a bad response never replaces the last good aggregate.
            agg_system = build_aggregate_summary_system_prompt(tone, analysis)
            agg_user = build_aggregate_summary_user_prompt(groups)
            if await _summarise_and_persist(
                session,
                client,
                group=AGGREGATE_GROUP_NAME,
                system_prompt=agg_system,
                user_prompt=agg_user,
                model=s.OPENROUTER_MODEL,
                tone=tone,
                analysis=analysis,
                max_tokens=1500,  # room for reasoning + 80-word output
                failure_event="ind_summary.agg_failed",
            ):
                written += 1
            await session.commit()

        jr.items_written = written
        log.info("ind_summary.done", groups=len(groups), written=written)


if __name__ == "__main__":
    asyncio.run(run())