read.markets/app/jobs/_market_context.py
Giorgio Gilestro 82e529b6b2 jobs: extract shared market-context helpers from ai_log_job
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-25 23:18:27 +02:00

86 lines
2.7 KiB
Python

"""Shared market-context helpers consumed by LLM-driven jobs.
Both ai_log_job and email_digest_job pull "the latest tape" the same
way — most-recent quote per (group, symbol), last N hours of headlines
bucketed by category, and the running month's LLM spend. Moved here so
neither job depends on the other's internals.
"""
from __future__ import annotations
from collections import defaultdict
from datetime import timedelta
from sqlalchemy import desc, func, select
from app.db import utcnow
from app.models import AICall, Headline, Quote
from app.services.openrouter import month_start
# Fixed one-line string of market reference levels, built from individual
# readings joined by " · ".
# NOTE(review): the values are hard-coded here, not fetched — presumably a
# baseline for the LLM-driven jobs; confirm against callers.
REFERENCE_LINE = " · ".join((
    "S&P 7,501 (ATH)",
    "VIX 18.0",
    "US 10y 4.45%",
    "HY OAS 279bps",
    "Brent $109/bbl",
    "Gold $4,651/oz",
    "CPI 3.8% YoY",
))
async def latest_quotes_by_group(session) -> dict[str, list[dict]]:
    """Return the latest priced quote per (group, symbol), keyed by group.

    Error rows (``price`` is null) are skipped: a symbol whose most recent
    row has no price falls back to its newest priced row, and a symbol with
    no priced row at all is omitted from the result.

    Args:
        session: Session object whose ``execute`` is awaited with a
            SQLAlchemy ``select`` statement.

    Returns:
        A ``defaultdict(list)`` mapping group name to a list of quote dicts,
        ordered by symbol within each group. Each dict carries the keys
        ``symbol``, ``source``, ``label``, ``note`` (always ``""``),
        ``price``, ``currency``, ``as_of`` and ``changes``.
    """
    has_price = Quote.price.is_not(None)

    # Newest fetch timestamp per (group, symbol), considering priced rows
    # only so that a failed fetch never shadows the last good quote.
    newest = (
        select(
            Quote.group_name,
            Quote.symbol,
            func.max(Quote.fetched_at).label("mx"),
        )
        .where(has_price)
        .group_by(Quote.group_name, Quote.symbol)
        .subquery()
    )
    stmt = (
        select(Quote)
        .join(
            newest,
            (Quote.group_name == newest.c.group_name)
            & (Quote.symbol == newest.c.symbol)
            & (Quote.fetched_at == newest.c.mx),
        )
        # Repeat the filter on the outer query: an error row sharing the
        # exact same fetched_at as the priced row would otherwise slip in.
        .where(has_price)
        .order_by(Quote.group_name, Quote.symbol)
    )
    rows = (await session.execute(stmt)).scalars().all()

    by_group: dict[str, list[dict]] = defaultdict(list)
    for q in rows:
        by_group[q.group_name].append(dict(
            symbol=q.symbol, source=q.source, label=q.label,
            note="", price=q.price, currency=q.currency,
            as_of=q.as_of, changes=q.changes,
        ))
    return by_group
async def recent_headlines_by_bucket(
    session,
    hours: float = 24,
    *,
    max_rows: int = 400,
    per_bucket: int = 40,
) -> dict[str, list[dict]]:
    """Return the last ``hours`` of headlines, bucketed by category.

    Headlines are fetched newest-first, so each bucket keeps the most
    recent entries. The hard cap per bucket keeps the prompt under ~40KB.

    Args:
        session: Session object whose ``execute`` is awaited with a
            SQLAlchemy ``select`` statement.
        hours: Size of the look-back window, measured back from ``utcnow()``.
        max_rows: Upper bound on rows fetched across all categories.
        per_bucket: Upper bound on headlines kept per category.

    Returns:
        A ``defaultdict(list)`` mapping category to a list of dicts with the
        keys ``when`` (``published_at`` in ISO format), ``source`` and
        ``title``, newest first.
    """
    cutoff = utcnow() - timedelta(hours=hours)
    # The row limit is applied before bucketing, so a very busy category
    # can use up the fetch budget and leave quieter categories short.
    stmt = (
        select(Headline)
        .where(Headline.published_at >= cutoff)
        .order_by(desc(Headline.published_at))
        .limit(max_rows)
    )
    rows = (await session.execute(stmt)).scalars().all()

    by_bucket: dict[str, list[dict]] = defaultdict(list)
    for h in rows:
        bucket = by_bucket[h.category]
        if len(bucket) >= per_bucket:
            continue
        bucket.append(dict(
            when=h.published_at.isoformat(),
            source=h.source, title=h.title,
        ))
    return by_bucket
async def month_spend(session) -> float:
    """Return the summed ``AICall.cost_usd`` for calls since ``month_start()``.

    Only rows with ``called_at`` at or after the value returned by
    ``month_start()`` are counted; the result is ``0.0`` when nothing
    matches.
    NOTE(review): ``month_start`` is imported from the openrouter service —
    presumably the start of the current calendar month; confirm there.

    Args:
        session: Session object whose ``execute`` is awaited with a
            SQLAlchemy ``select`` statement.
    """
    since = month_start()
    # COALESCE turns the NULL that SUM yields over zero rows into 0.0.
    summed_cost = func.coalesce(func.sum(AICall.cost_usd), 0.0)
    query = select(summed_cost).where(AICall.called_at >= since)
    result = await session.execute(query)
    # `or 0.0` additionally covers a falsy/None scalar before the cast.
    return float(result.scalar() or 0.0)