- Move news_job from hourly to 3x/hour (cron 10,30,50), gated by a CadencePolicy that allows a run every 20 min during active hours (07-21 UTC, weekdays), every 3 h off-hours, and every 6 h on weekends. This keeps the daytime feed fresh without spamming RSS sources overnight. - Tag each headline on ingestion via DeepSeek (BATCH_SIZE=25, max_tokens=4000, json.JSONDecoder().raw_decode + per-row regex recovery for resilient parsing). Vocabulary: 16 tags, including the new EU / USA / AI / Conflict tags. Rows with NULL tags are picked up automatically on the next news_job run, so back-tagging is implicit rather than a separate migration step. - Tag UI: a pill bar above the feed that cycles off → include → exclude on click; shift-click jumps straight to exclude. State persists in localStorage and is injected into /api/news requests via htmx:configRequest. Per-row chips sit to the right of the headline (new 5-column grid: age | source | title | tags | UTC), so vertical density stays high. - Strategic log header bug: the model was hallucinating "(Updated 21:30 UTC)" stamps in the future tense. Bumped PROMPT_VERSION 6→7, added an explicit ban on time-of-day clauses, and now supply the actual current UTC time in the user prompt so the model has no need to invent one. Migration 0012 adds headlines.tags (JSON, nullable). Tests cover vocabulary integrity, validation/normalisation, and the JSON-recovery parser (17 tests).
81 lines
3 KiB
Python
81 lines
3 KiB
Python
"""Scheduler container entrypoint.

Waits for the database to answer a trivial query, then runs an
APScheduler AsyncIOScheduler (timezone UTC) with seven cron jobs and
blocks until SIGTERM or SIGINT. Per the original design notes, each job
is guarded by a MariaDB advisory lock in job_lifecycle (not visible in
this file)."""
|
|
from __future__ import annotations
|
|
|
|
import asyncio
|
|
import signal
|
|
|
|
from apscheduler.schedulers.asyncio import AsyncIOScheduler
|
|
from apscheduler.triggers.cron import CronTrigger
|
|
|
|
from app.db import get_engine
|
|
from app.logging import configure_logging, get_logger
|
|
from app.jobs import (
|
|
market_job, news_job, ai_log_job, rollup_job,
|
|
indicator_summary_job, universe_flush_job,
|
|
)
|
|
|
|
|
|
# Logger shared by the functions in this module; calls pass an event name
# plus keyword fields (e.g. attempt=..., error=...).
log = get_logger("scheduler")
|
|
|
|
|
|
async def _wait_for_db(retries: int = 60, delay: float = 1.0) -> None:
    """Block until the database answers ``SELECT 1``.

    Opens a connection on the engine returned by ``get_engine()`` and runs
    a trivial query, up to *retries* times. Each failure is logged as
    ``scheduler.db_wait`` with the 1-based attempt number and the error
    text truncated to 120 characters.

    Args:
        retries: Maximum number of connection attempts.
        delay: Seconds to sleep between consecutive failed attempts. No
            sleep happens after the final attempt.

    Raises:
        RuntimeError: If no attempt succeeds. The last underlying error
            (if any attempt was made) is chained as ``__cause__``.
    """
    # Imported at call time, like the original __import__ call, so the
    # module itself does not need sqlalchemy until the check runs.
    from sqlalchemy import text

    engine = get_engine()
    probe = text("SELECT 1")
    last_error: Exception | None = None

    for attempt in range(1, retries + 1):
        try:
            async with engine.connect() as conn:
                await conn.execute(probe)
            return
        except Exception as e:  # any connect/driver error means "not ready yet"
            last_error = e
            log.warning("scheduler.db_wait", attempt=attempt, error=str(e)[:120])
        # Sleeping after the last failure only delays the inevitable raise.
        if attempt < retries:
            await asyncio.sleep(delay)

    raise RuntimeError("DB never became reachable") from last_error
|
|
|
|
|
|
def _register_jobs(sched: AsyncIOScheduler) -> None:
    """Add every cron job to *sched*.

    Cron fields are interpreted in the scheduler's own timezone (``main``
    constructs it with ``timezone="UTC"``).
    """
    sched.add_job(market_job.run, CronTrigger(minute=5), name="market_job", id="market_job")
    # 3x/hour: cron fires at xx:10, xx:30, xx:50. NEWS_POLICY inside the
    # job throttles off-hours / weekends so most fires no-op when the
    # markets are closed.
    sched.add_job(news_job.run, CronTrigger(minute="10,30,50"),
                  name="news_job", id="news_job")
    # portfolio_job removed in Phase G — server no longer holds holdings.
    sched.add_job(indicator_summary_job.run, CronTrigger(minute=7),
                  name="indicator_summary_job", id="indicator_summary_job")
    sched.add_job(ai_log_job.run, CronTrigger(minute=20), name="ai_log_job", id="ai_log_job")
    sched.add_job(rollup_job.run, CronTrigger(hour=0, minute=5), name="rollup_job", id="rollup_job")
    # Phase G: flush the Redis ticker-add buffer every 5 minutes (xx:01,
    # xx:06, ...). The 1-min offset gives the bucket boundary time to
    # close before we read the previous one.
    sched.add_job(universe_flush_job.run,
                  CronTrigger(minute="1-59/5"),
                  name="universe_flush_job", id="universe_flush_job")
    sched.add_job(universe_flush_job.evict_run,
                  CronTrigger(hour=0, minute=15),
                  name="universe_evict_job", id="universe_evict_job")


async def main() -> None:
    """Run the job scheduler until SIGTERM or SIGINT.

    Steps: configure logging, wait for the database (``_wait_for_db``
    raises RuntimeError if it never becomes reachable), register and start
    the cron jobs, then park on an ``asyncio.Event`` that the signal
    handlers set. The scheduler is shut down without waiting for running
    jobs (``wait=False``). The shutdown happens in a ``finally`` so it also
    runs if the wait is cancelled or raises.
    """
    configure_logging()
    log.info("scheduler.starting")
    await _wait_for_db()

    sched = AsyncIOScheduler(timezone="UTC")
    _register_jobs(sched)
    sched.start()
    log.info("scheduler.started", jobs=[j.id for j in sched.get_jobs()])

    # Stay alive until SIGTERM.
    stop_event = asyncio.Event()

    def _stop(*_):
        log.info("scheduler.stopping")
        stop_event.set()

    try:
        loop = asyncio.get_running_loop()
        for sig in (signal.SIGTERM, signal.SIGINT):
            loop.add_signal_handler(sig, _stop)
        await stop_event.wait()
    finally:
        # Always stop the started scheduler, even on cancellation/errors.
        sched.shutdown(wait=False)
        log.info("scheduler.stopped")
|
|
|
|
|
|
# Script entrypoint: run main() on a new asyncio event loop. main() returns
# after SIGTERM/SIGINT triggers shutdown, or raises RuntimeError if the
# database never becomes reachable.
if __name__ == "__main__":
    asyncio.run(main())
|