read.markets/app/scheduler_main.py

86 lines
3.3 KiB
Python

"""Scheduler container entrypoint. Runs APScheduler with the cron jobs
registered in main() (8 at present), each guarded by a MariaDB advisory lock
(in job_lifecycle). Waits for the DB to be reachable, then schedules the jobs
and blocks until SIGTERM or SIGINT."""
from __future__ import annotations
import asyncio
import signal
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from apscheduler.triggers.cron import CronTrigger
from app.db import get_engine
from app.logging import configure_logging, get_logger
from app.jobs import (
market_job, news_job, ai_log_job, rollup_job,
indicator_summary_job, universe_flush_job, email_digest_job,
)
# Module-level logger named "scheduler"; the calls in this file pass an event
# name plus keyword fields (e.g. attempt=..., error=...).
log = get_logger("scheduler")
async def _wait_for_db(retries: int = 60, delay: float = 1.0) -> None:
    """Block until the database answers a trivial query.

    Opens a connection from the engine returned by ``get_engine()`` and runs
    ``SELECT 1``. Any exception counts as "not ready yet": it is logged
    (message truncated to 120 characters) and the probe is retried.

    Args:
        retries: Maximum number of connection attempts.
        delay: Seconds to sleep between two failed attempts.

    Raises:
        RuntimeError: If no attempt succeeds. Chained to the last connection
            error (when there was one) so the root cause stays visible.
    """
    # Function-scope import instead of the former ``__import__`` call: same
    # dependency, but greppable and understood by linters/type checkers.
    from sqlalchemy import text

    engine = get_engine()
    probe = text("SELECT 1")
    last_error: Exception | None = None

    for attempt in range(1, retries + 1):
        try:
            async with engine.connect() as conn:
                await conn.execute(probe)
            return
        except Exception as exc:
            # Deliberately broad: driver, DNS and socket errors all simply
            # mean the DB is not up yet at container start.
            last_error = exc
            log.warning("scheduler.db_wait", attempt=attempt, error=str(exc)[:120])
        # No point sleeping after the final failure; it only delays the raise.
        if attempt < retries:
            await asyncio.sleep(delay)

    raise RuntimeError("DB never became reachable") from last_error
async def main() -> None:
    """Run the scheduler process: wait for the DB, register jobs, block.

    Configures logging, waits for the database via ``_wait_for_db()``,
    registers every cron job on an ``AsyncIOScheduler`` and then sleeps until
    SIGTERM or SIGINT arrives, at which point the scheduler is shut down
    without waiting for running jobs (``wait=False``).

    Raises:
        RuntimeError: Propagated from ``_wait_for_db()`` when the database
            never becomes reachable.
    """
    configure_logging()
    log.info("scheduler.starting")
    await _wait_for_db()

    tz = "UTC"
    sched = AsyncIOScheduler(timezone=tz)

    def cron(**fields) -> CronTrigger:
        # A CronTrigger *instance* handed to add_job() does not inherit the
        # scheduler's timezone; without an explicit one it falls back to the
        # host's local zone. Pin it so hour-based schedules really are UTC.
        return CronTrigger(timezone=tz, **fields)

    # Hourly at xx:05.
    sched.add_job(market_job.run, cron(minute=5),
                  name="market_job", id="market_job")
    # 3x/hour: cron fires at xx:10, xx:30, xx:50. NEWS_POLICY inside the
    # job throttles off-hours / weekends so most fires no-op when the
    # markets are closed.
    sched.add_job(news_job.run, cron(minute="10,30,50"),
                  name="news_job", id="news_job")
    # portfolio_job removed in Phase G — server no longer holds holdings.
    # Hourly at xx:07.
    sched.add_job(indicator_summary_job.run, cron(minute=7),
                  name="indicator_summary_job", id="indicator_summary_job")
    # Hourly at xx:20.
    sched.add_job(ai_log_job.run, cron(minute=20),
                  name="ai_log_job", id="ai_log_job")
    # Daily at 00:05 UTC.
    sched.add_job(rollup_job.run, cron(hour=0, minute=5),
                  name="rollup_job", id="rollup_job")
    # Phase G: flush the Redis ticker-add buffer every 5 minutes (xx:01,
    # xx:06, ...). The 1-min offset gives the bucket boundary time to
    # close before we read the previous one.
    sched.add_job(universe_flush_job.run, cron(minute="1-59/5"),
                  name="universe_flush_job", id="universe_flush_job")
    # Daily at 00:15 UTC.
    sched.add_job(universe_flush_job.evict_run, cron(hour=0, minute=15),
                  name="universe_evict_job", id="universe_evict_job")
    # Editorial email digests: daily Mon-Sat for paid opt-in, weekly Sunday
    # recap for everyone opt-in. Job decides which kind based on weekday.
    # Fires daily at 06:30 UTC.
    sched.add_job(email_digest_job.run, cron(hour=6, minute=30),
                  name="email_digest_job", id="email_digest_job")

    sched.start()
    log.info("scheduler.started", jobs=[j.id for j in sched.get_jobs()])

    # Stay alive until SIGTERM (or SIGINT).
    stop_event = asyncio.Event()

    def _stop(*_):
        log.info("scheduler.stopping")
        stop_event.set()

    loop = asyncio.get_running_loop()
    for sig in (signal.SIGTERM, signal.SIGINT):
        loop.add_signal_handler(sig, _stop)

    await stop_event.wait()
    sched.shutdown(wait=False)
    log.info("scheduler.stopped")
if __name__ == "__main__":
    # Script entrypoint: drive the scheduler coroutine on a fresh event loop.
    scheduler_coro = main()
    asyncio.run(scheduler_coro)