read.markets/app/jobs/rollup_job.py
Giorgio Gilestro a10409c02b initial commit — cassandra v0.1
Containerised macro-strategy dashboard: 4-panel web UI (indicators,
portfolio, flash news, AI strategic log), MariaDB store, hourly
ingestion jobs, OpenRouter-backed AI analysis.

Ports the four prototype scripts in the parent dir (market_pulse,
flash_news, trading212, strategic_log) into async services backed by a
persistent DB and served via FastAPI + Jinja2 + HTMX. APScheduler runs
as a separate compose service for crash-safety and easier restarts.

Portfolio composition + position names come live from Trading 212;
news per-ticker headlines reuse those names. Tone (NOVICE/INTERMEDIATE/
PRO) and analysis style (DRY/SPECULATIVE) are env-configurable and
stored on each log row so historical entries show what produced them.

Default model is deepseek/deepseek-v4-flash (overridable via env).
Light/dark theme toggle, sans-serif for prose surfaces, monospace for
data. Bearer-token auth, OpenRouter monthly cost cap, RSS feeds auto-
disabled on consecutive failures.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-15 21:56:10 +01:00

"""Daily rollup: collapse `quotes` into `quotes_daily` (one row per
(symbol, date) using the latest fetched_at of the day) and prune `quotes`
older than 30 days. Sparklines read from `quotes_daily`."""

from __future__ import annotations

import asyncio
from datetime import date, timedelta

from sqlalchemy import delete, func, select
from sqlalchemy.dialects.mysql import insert as mysql_insert

from app.db import utcnow
from app.jobs._helpers import job_lifecycle, log
from app.models import Quote, QuoteDaily

PRUNE_DAYS = 30


async def run() -> None:
    async with job_lifecycle("rollup_job") as (session, jr):
        if jr.status == "skipped":
            return

        # 1. Rollup: latest fetched_at of each (symbol, date) gets stored as close.
        # The subquery finds, per (symbol, date), the timestamp of the last priced quote.
        sub = (
            select(
                Quote.symbol.label("symbol"),
                func.date(Quote.fetched_at).label("date"),
                func.max(Quote.fetched_at).label("mx"),
            )
            .where(Quote.price.is_not(None))
            .group_by(Quote.symbol, func.date(Quote.fetched_at))
            .subquery()
        )
        # Join back to `quotes` to fetch the price and source of that last quote.
        rows = (await session.execute(
            select(
                Quote.symbol,
                func.date(Quote.fetched_at).label("d"),
                Quote.price,
                Quote.source,
            )
            .join(sub,
                  (Quote.symbol == sub.c.symbol)
                  & (Quote.fetched_at == sub.c.mx))
        )).all()
        if rows:
            # high and low are seeded with the closing price on first insert.
            stmt = mysql_insert(QuoteDaily).values([
                dict(symbol=r.symbol, date=r.d, close=r.price,
                     high=r.price, low=r.price, source=r.source)
                for r in rows
            ])
            # On a key conflict only `close` is refreshed; other columns keep their stored values.
            stmt = stmt.on_duplicate_key_update(close=stmt.inserted.close)
            await session.execute(stmt)

        # 2. Prune raw quotes older than PRUNE_DAYS.
        cutoff = utcnow() - timedelta(days=PRUNE_DAYS)
        result = await session.execute(
            delete(Quote).where(Quote.fetched_at < cutoff)
        )
        await session.commit()
        pruned = result.rowcount or 0
        jr.items_written = len(rows)
        log.info("rollup_job.done", rolled=len(rows), pruned=pruned)


if __name__ == "__main__":
    asyncio.run(run())
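
The selection rule used by the rollup (per symbol and calendar day, keep the priced quote with the latest `fetched_at`, then drop raw rows older than the retention window) can be sketched in plain Python without a database. This is only an illustration under stated assumptions: `RawQuote`, `rollup`, `prune`, and the `"yahoo"` source label are hypothetical names introduced here, not part of the job or its models.

```python
from __future__ import annotations

from dataclasses import dataclass
from datetime import date, datetime, timedelta


@dataclass(frozen=True)
class RawQuote:
    """Hypothetical in-memory stand-in for one row of `quotes`."""
    symbol: str
    fetched_at: datetime
    price: float | None
    source: str


def rollup(quotes: list[RawQuote]) -> dict[tuple[str, date], RawQuote]:
    """Per (symbol, date), keep the priced quote with the latest fetched_at."""
    latest: dict[tuple[str, date], RawQuote] = {}
    for q in quotes:
        if q.price is None:
            continue  # mirrors the `price IS NOT NULL` filter
        key = (q.symbol, q.fetched_at.date())
        best = latest.get(key)
        if best is None or q.fetched_at > best.fetched_at:
            latest[key] = q
    return latest


def prune(quotes: list[RawQuote], now: datetime, keep_days: int = 30) -> list[RawQuote]:
    """Drop quotes fetched strictly before `now - keep_days`."""
    cutoff = now - timedelta(days=keep_days)
    return [q for q in quotes if q.fetched_at >= cutoff]


if __name__ == "__main__":
    sample = [
        RawQuote("AAPL", datetime(2026, 5, 14, 9, 0), 100.0, "yahoo"),
        RawQuote("AAPL", datetime(2026, 5, 14, 17, 0), 102.5, "yahoo"),
        RawQuote("AAPL", datetime(2026, 5, 14, 18, 0), None, "yahoo"),
        RawQuote("AAPL", datetime(2026, 4, 1, 17, 0), 90.0, "yahoo"),
    ]
    for (symbol, day), q in sorted(rollup(sample).items()):
        print(symbol, day, q.price)
    print(len(prune(sample, now=datetime(2026, 5, 15))), "raw rows kept")
```

Because the rollup runs before the prune, a day's close is already recorded by the time its raw rows fall outside the retention window.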