"""Hourly market ingestion: fetch every (symbol, group) defined in TOML *plus* every ticker in the Phase G shared ticker_universe, inserting one Quote row per fetch.""" from __future__ import annotations import asyncio import httpx from app.config import get_settings, load_groups from app.db import utcnow from app.jobs._helpers import job_lifecycle, log from app.models import Quote from app.services.market import fetch from app.services.ticker_universe import get_all_tickers async def run() -> None: async with job_lifecycle("market_job") as (session, run): if run.status == "skipped": return s = get_settings() groups = load_groups(s.BASELINE_TOML, s.PORTFOLIO_TOML) anchor = s.CASSANDRA_ANCHOR_DATE or None # Build the (group, symbol, label, note) work list from config TOML. items_flat: list[tuple[str, str, str, str]] = [ (group, sym, lab, note) for group, items in groups.items() for sym, lab, note in items ] configured_syms = {sym for _, sym, _, _ in items_flat} # Phase G: extend with anything in ticker_universe that isn't # already covered by config. These land under group_name="universe" # — the /api/universe endpoint reads the latest quote per symbol # regardless of group. universe_tickers = await get_all_tickers(session) for t in universe_tickers: if t not in configured_syms: items_flat.append(("universe", t, t, "")) async with httpx.AsyncClient(follow_redirects=True) as client: tasks = [ fetch(client, sym, lab, note, anchor) for _, sym, lab, note in items_flat ] # Run in parallel but bounded — Yahoo can throttle if we hammer. sem = asyncio.Semaphore(16) async def bounded(t): async with sem: return await t quotes = await asyncio.gather(*(bounded(t) for t in tasks)) now = utcnow() for (group, _sym, _lab, _note), q in zip(items_flat, quotes): session.add(Quote( symbol=q.symbol, source=q.source, label=q.label, group_name=group, price=q.price, currency=q.currency, as_of=q.as_of, changes=q.changes or None, # Truncate to the column's 255-char limit. Some providers # return verbose redirect chains that blow the limit. error=(q.error[:250] if q.error else None), fetched_at=now, )) await session.commit() run.items_written = len(quotes) log.info( "market_job.done", count=len(quotes), configured=len(configured_syms), universe=len(universe_tickers), ) if __name__ == "__main__": asyncio.run(run())