"""Hourly AI strategic-log generator. Pulls already-persisted market data and headlines from the DB (no live fetches), calls OpenRouter, persists the log and a row in the cost ledger.""" from __future__ import annotations import asyncio from collections import defaultdict from datetime import timedelta import httpx from sqlalchemy import desc, func, select from app.config import get_settings from app.db import utcnow from app.jobs._helpers import job_lifecycle, log from app.models import AICall, Headline, JobRun, Quote, StrategicLog from app.services.cadence import DEFAULT_POLICY from app.services.openrouter import ( PROMPT_VERSION, active_model, build_system_prompt, build_user_prompt, call_llm, llm_configured, month_start, ) REFERENCE_LINE = ( "S&P 7,501 (ATH) · VIX 18.0 · US 10y 4.45% · HY OAS 279bps · " "Brent $109/bbl · Gold $4,651/oz · CPI 3.8% YoY" ) async def _latest_quotes_by_group(session) -> dict[str, list[dict]]: """Latest quote per (group, symbol). Skips error rows where price is null.""" sub = ( select( Quote.group_name, Quote.symbol, func.max(Quote.fetched_at).label("mx"), ) .group_by(Quote.group_name, Quote.symbol) .subquery() ) stmt = ( select(Quote) .join( sub, (Quote.group_name == sub.c.group_name) & (Quote.symbol == sub.c.symbol) & (Quote.fetched_at == sub.c.mx), ) .order_by(Quote.group_name, Quote.symbol) ) rows = (await session.execute(stmt)).scalars().all() by_group: dict[str, list[dict]] = defaultdict(list) for q in rows: by_group[q.group_name].append(dict( symbol=q.symbol, source=q.source, label=q.label, note="", price=q.price, currency=q.currency, as_of=q.as_of, changes=q.changes, )) return by_group async def _recent_headlines_by_bucket(session, hours: float = 24) -> dict[str, list[dict]]: """Last N hours of headlines, bucketed by category. Hard cap per bucket to keep the prompt under ~40KB.""" cutoff = utcnow() - timedelta(hours=hours) stmt = ( select(Headline) .where(Headline.published_at >= cutoff) .order_by(desc(Headline.published_at)) .limit(400) ) rows = (await session.execute(stmt)).scalars().all() by_bucket: dict[str, list[dict]] = defaultdict(list) for h in rows: if len(by_bucket[h.category]) >= 40: continue by_bucket[h.category].append(dict( when=h.published_at.isoformat(), source=h.source, title=h.title, )) return by_bucket async def _month_spend(session) -> float: start = month_start() total = (await session.execute( select(func.coalesce(func.sum(AICall.cost_usd), 0.0)) .where(AICall.called_at >= start) )).scalar() return float(total or 0.0) async def run() -> None: async with job_lifecycle("ai_log_job") as (session, jr): if jr.status == "skipped": return s = get_settings() if not llm_configured(): log.warning("ai_log.skipped_no_key", provider=s.LLM_PROVIDER) jr.status = "skipped" return # Cadence: hourly during EU/US active hours; throttled off-hours. last_success = (await session.execute( select(func.max(JobRun.finished_at)).where( JobRun.name == "ai_log_job", JobRun.status == "success", ) )).scalar() should_run, reason = DEFAULT_POLICY.should_run(last_success) if not should_run: log.info("ai_log.cadence_skip", reason=reason) jr.status = "skipped" jr.error = reason return spent = await _month_spend(session) if spent >= s.OPENROUTER_MONTHLY_CAP_USD: log.warning("ai_log.cap_reached", spent=spent, cap=s.OPENROUTER_MONTHLY_CAP_USD) jr.status = "skipped" jr.error = f"monthly cost cap reached (${spent:.2f})" return quotes = await _latest_quotes_by_group(session) news = await _recent_headlines_by_bucket(session) if not quotes and not news: log.warning("ai_log.no_data_yet") jr.status = "skipped" return # Look up the most recent log generated today (UTC) so the model can # update it rather than start from scratch. This gives the model # temporal awareness — "since this morning's read, X has changed". today_start = utcnow().replace(hour=0, minute=0, second=0, microsecond=0) previous_log = (await session.execute( select(StrategicLog) .where(StrategicLog.generated_at >= today_start) .order_by(desc(StrategicLog.generated_at)) .limit(1) )).scalar_one_or_none() anchor = s.CASSANDRA_ANCHOR_DATE or None user_prompt = build_user_prompt( today=utcnow(), anchor=anchor, quotes_by_group=quotes, headlines_by_bucket=news, reference_line=REFERENCE_LINE, previous_log=previous_log, ) # Phase 2 voice pivot (PROMPT_VERSION 6): generate both tones per # run so the dashboard toggle is instant. Analysis stays on the # operator-configured default (DRY|SPECULATIVE is a system-wide # preference, not a per-user toggle). PRO was dropped. analysis = (s.CASSANDRA_ANALYSIS or "SPECULATIVE").upper() variants = [ ("NOVICE", analysis), ("INTERMEDIATE", analysis), ] written = 0 async with httpx.AsyncClient(follow_redirects=True) as client: for tone, analysis in variants: # Re-check cost cap between variants so a runaway run is # bounded. spent = await _month_spend(session) if spent >= s.OPENROUTER_MONTHLY_CAP_USD: log.warning("ai_log.cap_reached_midrun", spent=spent, completed=written) break system_prompt = build_system_prompt(tone, analysis) try: result = await call_llm( client, [{"role": "system", "content": system_prompt}, {"role": "user", "content": user_prompt}], ) except Exception as e: session.add(AICall( model=active_model(), status="error", error=f"{tone}/{analysis}: {str(e)[:480]}", )) await session.commit() log.error("ai_log.variant_failed", tone=tone, analysis=analysis, error=str(e)[:200]) continue session.add(StrategicLog( generated_at=utcnow(), model=result.model, anchor_date=anchor, prompt_version=PROMPT_VERSION, tone=tone, analysis=analysis, content=result.content, prompt_tokens=result.prompt_tokens, completion_tokens=result.completion_tokens, cost_usd=result.cost_usd, )) session.add(AICall( model=result.model, prompt_tokens=result.prompt_tokens, completion_tokens=result.completion_tokens, cost_usd=result.cost_usd, status="ok", )) await session.commit() written += 1 log.info("ai_log.variant_done", tone=tone, analysis=analysis, prompt_tokens=result.prompt_tokens, completion_tokens=result.completion_tokens) jr.items_written = written log.info("ai_log.done", variants=written, total=len(variants)) if __name__ == "__main__": asyncio.run(run())