read.markets/app/scheduler_main.py
Giorgio Gilestro 2013bfa8cc news: auto-tag headlines + market-aware cadence + filter UI
- Move news_job from hourly to 3x/hour (cron 10,30,50), with a CadencePolicy
  gate that throttles to active hours (07-21 UTC weekdays at 20 min), off-hours
  (3 h), weekends (6 h). Keeps the daytime feed fresh without spamming RSS
  sources overnight.
- Tag each headline on ingestion via DeepSeek (BATCH_SIZE=25, max_tokens=4000,
  json.JSONDecoder().raw_decode + per-row regex recovery for resilient parsing).
  Vocabulary: 16 tags including new EU / USA / AI / Conflict. NULL tags are
  picked up automatically on the next news_job run, so back-tagging is implicit
  rather than a separate migration step.
- Tag UI: pill bar above the feed with off → include → exclude cycle on click;
  shift-click jumps straight to exclude. State persists in localStorage and is
  injected into /api/news requests via htmx:configRequest. Per-row chips sit to
  the right of the headline (new 5-column grid: age | source | title | tags |
  UTC) so vertical density stays high.
- Strategic log header bug: model was hallucinating "(Updated 21:30 UTC)" in
  future tense. Bumped PROMPT_VERSION 6→7, added explicit ban on time-of-day
  clauses, and supply the actual current UTC time in the user prompt so the
  model has no need to invent one.

Migration 0012 adds headlines.tags (JSON, nullable). Tests cover vocabulary
integrity, validation/normalisation, and the JSON-recovery parser (17 tests).
2026-05-21 23:25:03 +01:00

81 lines
3 KiB
Python

"""Scheduler container entrypoint. Runs APScheduler with 5 cron jobs, each
guarded by a MariaDB advisory lock (in job_lifecycle). Waits for the DB to be
reachable, then schedules and blocks forever."""
from __future__ import annotations
import asyncio
import signal
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from apscheduler.triggers.cron import CronTrigger
from app.db import get_engine
from app.logging import configure_logging, get_logger
from app.jobs import (
market_job, news_job, ai_log_job, rollup_job,
indicator_summary_job, universe_flush_job,
)
log = get_logger("scheduler")
async def _wait_for_db(retries: int = 60, delay: float = 1.0) -> None:
    """Wait until the database answers a trivial query.

    Opens a connection on the engine returned by ``get_engine()`` and runs
    ``SELECT 1``. Any exception counts as "not ready yet": it is logged
    (message truncated to 120 characters) and the probe is retried.

    Args:
        retries: Maximum number of connection attempts.
        delay: Seconds to sleep between two consecutive attempts.

    Raises:
        RuntimeError: If no attempt succeeded. The last underlying error is
            attached as ``__cause__`` so the root cause is not lost.
    """
    # Function-scope import replaces the inline ``__import__("sqlalchemy")``
    # lookup that used to be re-evaluated on every retry.
    from sqlalchemy import text

    engine = get_engine()
    probe = text("SELECT 1")
    last_error = None
    for attempt in range(1, retries + 1):
        try:
            async with engine.connect() as conn:
                await conn.execute(probe)
            return
        except Exception as e:  # startup boundary: any failure means "retry"
            last_error = e
            log.warning("scheduler.db_wait", attempt=attempt, error=str(e)[:120])
        # No point sleeping after the final attempt; fail immediately instead.
        if attempt < retries:
            await asyncio.sleep(delay)
    raise RuntimeError("DB never became reachable") from last_error
async def main() -> None:
    """Start the cron scheduler and keep it running until signalled.

    Sequence: configure logging, wait for the database, register every cron
    job on an ``AsyncIOScheduler``, start it, then block until SIGTERM or
    SIGINT arrives and shut the scheduler down without waiting for running
    jobs.

    Raises:
        RuntimeError: Propagated from ``_wait_for_db`` when the database
            never becomes reachable.
    """
    configure_logging()
    log.info("scheduler.starting")
    await _wait_for_db()

    tz = "UTC"

    def cron(**fields) -> CronTrigger:
        # In APScheduler 3.x a CronTrigger constructed directly does NOT
        # inherit the scheduler's timezone; it falls back to the host's local
        # zone. Pin it explicitly so the schedule (notably the daily hour=0
        # jobs) stays on UTC regardless of the container's TZ setting.
        return CronTrigger(timezone=tz, **fields)

    sched = AsyncIOScheduler(timezone=tz)
    sched.add_job(market_job.run, cron(minute=5),
                  name="market_job", id="market_job")
    # 3x/hour: cron fires at xx:10, xx:30, xx:50. NEWS_POLICY inside the
    # job throttles off-hours / weekends so most fires no-op when the
    # markets are closed.
    sched.add_job(news_job.run, cron(minute="10,30,50"),
                  name="news_job", id="news_job")
    # portfolio_job removed in Phase G — server no longer holds holdings.
    sched.add_job(indicator_summary_job.run, cron(minute=7),
                  name="indicator_summary_job", id="indicator_summary_job")
    sched.add_job(ai_log_job.run, cron(minute=20),
                  name="ai_log_job", id="ai_log_job")
    sched.add_job(rollup_job.run, cron(hour=0, minute=5),
                  name="rollup_job", id="rollup_job")
    # Phase G: flush the Redis ticker-add buffer every 5 minutes (xx:01,
    # xx:06, ...). The 1-min offset gives the bucket boundary time to
    # close before we read the previous one.
    sched.add_job(universe_flush_job.run, cron(minute="1-59/5"),
                  name="universe_flush_job", id="universe_flush_job")
    sched.add_job(universe_flush_job.evict_run, cron(hour=0, minute=15),
                  name="universe_evict_job", id="universe_evict_job")
    sched.start()
    log.info("scheduler.started", jobs=[j.id for j in sched.get_jobs()])

    # Stay alive until SIGTERM (or SIGINT, e.g. Ctrl-C).
    stop_event = asyncio.Event()

    def _stop(*_):
        log.info("scheduler.stopping")
        stop_event.set()

    loop = asyncio.get_running_loop()
    for sig in (signal.SIGTERM, signal.SIGINT):
        loop.add_signal_handler(sig, _stop)
    await stop_event.wait()

    # wait=False: do not block shutdown on jobs that are still executing.
    sched.shutdown(wait=False)
    log.info("scheduler.stopped")
# Script entry point: only when executed directly (not on import) run main()
# on a fresh event loop; asyncio.run returns once main() has finished.
if __name__ == "__main__":
    asyncio.run(main())