"""Daily rollup of raw quotes.

Collapse `quotes` into `quotes_daily` (one row per (symbol, date) using the
latest fetched_at of the day) and prune `quotes` older than 30 days.
Sparklines read from `quotes_daily`.
"""
from __future__ import annotations

import asyncio
from datetime import date, timedelta

from sqlalchemy import delete, func, select
from sqlalchemy.dialects.mysql import insert as mysql_insert

from app.db import utcnow
from app.jobs._helpers import job_lifecycle, log
from app.models import Quote, QuoteDaily

# Raw `quotes` rows whose fetched_at is more than this many days before
# utcnow() are deleted on every run.
PRUNE_DAYS = 30


async def run() -> None:
    """Roll raw quotes up into daily rows, then prune old raw quotes.

    Steps, all executed on the session yielded by ``job_lifecycle``:

    1. For every (symbol, calendar date of fetched_at) find the latest
       fetched_at among quotes that have a price, and upsert that quote's
       price into ``QuoteDaily`` as close/high/low together with its source.
    2. Delete ``Quote`` rows whose fetched_at is older than ``PRUNE_DAYS``
       days relative to ``utcnow()``.
    3. Commit, record the number of rolled-up rows on the job record and log
       a summary.

    Returns immediately, without touching the database, when the job record
    yielded by ``job_lifecycle`` has status ``"skipped"``.
    """
    async with job_lifecycle("rollup_job") as (session, jr):
        if jr.status == "skipped":
            return

        # 1. Rollup: latest fetched_at of each (symbol, date) gets stored as
        #    close. The subquery ignores quotes without a price so that `mx`
        #    always points at a timestamp that has a usable price.
        sub = (
            select(
                Quote.symbol.label("symbol"),
                func.date(Quote.fetched_at).label("date"),
                func.max(Quote.fetched_at).label("mx"),
            )
            .where(Quote.price.is_not(None))
            .group_by(Quote.symbol, func.date(Quote.fetched_at))
            .subquery()
        )

        rows = (
            await session.execute(
                select(
                    Quote.symbol,
                    func.date(Quote.fetched_at).label("d"),
                    Quote.price,
                    Quote.source,
                )
                .join(
                    sub,
                    (Quote.symbol == sub.c.symbol)
                    & (Quote.fetched_at == sub.c.mx),
                )
                # The join matches on (symbol, fetched_at) only, so another
                # quote with the same symbol and timestamp but a NULL price
                # would otherwise slip through and be written as close=NULL.
                .where(Quote.price.is_not(None))
            )
        ).all()

        if rows:
            stmt = mysql_insert(QuoteDaily).values(
                [
                    dict(
                        symbol=r.symbol,
                        date=r.d,
                        close=r.price,
                        # Only the closing price is known here, so high and
                        # low are seeded with the same value.
                        high=r.price,
                        low=r.price,
                        source=r.source,
                    )
                    for r in rows
                ]
            )
            # On a duplicate-key conflict only `close` is overwritten; high,
            # low and source keep whatever is already stored.
            stmt = stmt.on_duplicate_key_update(close=stmt.inserted.close)
            await session.execute(stmt)

        # 2. Prune raw quotes older than PRUNE_DAYS.
        cutoff = utcnow() - timedelta(days=PRUNE_DAYS)
        result = await session.execute(
            delete(Quote).where(Quote.fetched_at < cutoff)
        )
        await session.commit()

        # rowcount may be None/0 depending on the driver; normalise to an int.
        pruned = result.rowcount or 0
        jr.items_written = len(rows)
        log.info("rollup_job.done", rolled=len(rows), pruned=pruned)


if __name__ == "__main__":
    asyncio.run(run())