"""Shared market-context helpers consumed by LLM-driven jobs. Both ai_log_job and email_digest_job pull "the latest tape" the same way — most-recent quote per (group, symbol), last N hours of headlines bucketed by category, and the running month's LLM spend. Moved here so neither job depends on the other's internals. """ from __future__ import annotations from collections import defaultdict from datetime import timedelta from sqlalchemy import desc, func, select from app.db import utcnow from app.models import AICall, Headline, Quote from app.services.openrouter import month_start REFERENCE_LINE = ( "S&P 7,501 (ATH) · VIX 18.0 · US 10y 4.45% · HY OAS 279bps · " "Brent $109/bbl · Gold $4,651/oz · CPI 3.8% YoY" ) async def latest_quotes_by_group(session) -> dict[str, list[dict]]: """Latest quote per (group, symbol). Skips error rows where price is null.""" sub = ( select( Quote.group_name, Quote.symbol, func.max(Quote.fetched_at).label("mx"), ) .group_by(Quote.group_name, Quote.symbol) .subquery() ) stmt = ( select(Quote) .join( sub, (Quote.group_name == sub.c.group_name) & (Quote.symbol == sub.c.symbol) & (Quote.fetched_at == sub.c.mx), ) .where(Quote.price.is_not(None)) .order_by(Quote.group_name, Quote.symbol) ) rows = (await session.execute(stmt)).scalars().all() by_group: dict[str, list[dict]] = defaultdict(list) for q in rows: by_group[q.group_name].append(dict( symbol=q.symbol, source=q.source, label=q.label, note="", price=q.price, currency=q.currency, as_of=q.as_of, changes=q.changes, )) return by_group async def recent_headlines_by_bucket(session, hours: float = 24) -> dict[str, list[dict]]: """Last N hours of headlines, bucketed by category. Hard cap per bucket to keep the prompt under ~40KB.""" cutoff = utcnow() - timedelta(hours=hours) stmt = ( select(Headline) .where(Headline.published_at >= cutoff) .order_by(desc(Headline.published_at)) .limit(400) ) rows = (await session.execute(stmt)).scalars().all() by_bucket: dict[str, list[dict]] = defaultdict(list) for h in rows: if len(by_bucket[h.category]) >= 40: continue by_bucket[h.category].append(dict( when=h.published_at.isoformat(), source=h.source, title=h.title, )) return by_bucket async def month_spend(session) -> float: start = month_start() total = (await session.execute( select(func.coalesce(func.sum(AICall.cost_usd), 0.0)) .where(AICall.called_at >= start) )).scalar() return float(total or 0.0)