"""Flush the ticker_universe Redis buffer into the DB at 5-min boundaries. The buffer is keyed by 5-minute wall-clock buckets: `ticker_universe:buffer:`. This job runs slightly after each boundary and reads the *previous* bucket, ensuring it's closed (no new writes can land in it). New tickers are inserted into `ticker_universe`; already-known ones have their `last_referenced_at` bumped. The lag between bucket-close and flush is intentional: it batches multiple users' uploads into one INSERT, making timing-correlation between "user uploaded at T" and "ticker XYZ appeared at T+δ" weaker. """ from __future__ import annotations import asyncio from app.jobs._helpers import job_lifecycle, log from app.services.ticker_universe import evict_stale, flush_buffer async def run() -> None: async with job_lifecycle("universe_flush_job") as (session, run): if run.status == "skipped": return out = await flush_buffer(session) run.items_written = out.get("inserted", 0) log.info("universe_flush.done", **out) async def evict_run() -> None: """Separate daily run: prune entries that haven't been referenced within the eviction TTL (60 days). Kept in this module so all universe-maintenance lives in one place.""" async with job_lifecycle("universe_evict_job") as (session, run): if run.status == "skipped": return deleted = await evict_stale(session) run.items_written = deleted log.info("universe_evict.done", deleted=deleted) if __name__ == "__main__": asyncio.run(run())