"""News ingestion + AI tagging. Cron fires every 20 minutes. NEWS_POLICY gates the actual work: - Active window (07-21 UTC weekdays): always run (20-min gap) - Off-hours weekday: skip until 3h since last success - Weekend: skip until 6h since last success Each run does (a) fresh fetch of all enabled feeds + per-ticker Yahoo news, (b) bulk INSERT IGNORE into headlines, (c) batch-tags any rows still NULL via news_tagging. Untagged rows survive run failures and are retried automatically next cycle. """ from __future__ import annotations import asyncio import httpx from sqlalchemy import desc, func, select, update from sqlalchemy.dialects.mysql import insert as mysql_insert from app.db import utcnow from app.jobs._helpers import job_lifecycle, log from app.models import Feed, Headline, InstrumentMap, JobRun, TickerUniverse from app.services.cadence import NEWS_POLICY from app.services.news import dedupe, fetch_feed, fetch_yahoo_news from app.services.news_tagging import ToTag, tag_titles AUTO_DISABLE_AT = 5 # Cap on how many untagged headlines a single run will tag. Stops a # backlog from blowing the cost ledger if the tagger has been failing # for a while. TAG_PER_RUN_LIMIT = 200 async def _process_feed(client: httpx.AsyncClient, feed: Feed) -> tuple[Feed, list]: try: items = await fetch_feed(client, feed.name, feed.category, feed.url) feed.consecutive_failures = 0 feed.last_success_at = utcnow() return feed, items except Exception as e: feed.consecutive_failures += 1 if feed.consecutive_failures >= AUTO_DISABLE_AT: feed.enabled = False log.warning("feed.fetch_failed", name=feed.name, fails=feed.consecutive_failures, error=str(e)) return feed, [] async def run() -> None: async with job_lifecycle("news_job") as (session, run): if run.status == "skipped": return # Market-aware cadence: skip this fire if too soon (off-hours / # weekend). Active window still runs every 20 min. last_success = (await session.execute( select(func.max(JobRun.finished_at)).where( JobRun.name == "news_job", JobRun.status == "success", ) )).scalar() should_run, reason = NEWS_POLICY.should_run(last_success) if not should_run: log.info("news_job.cadence_skip", reason=reason) run.status = "skipped" run.error = reason return feeds = ( await session.execute(select(Feed).where(Feed.enabled == True)) ).scalars().all() # Per-ticker news: pull every Yahoo ticker in the anonymous # universe (Phase G), pair each with its display name from # instrument_map when available. No per-user attribution. uni_tickers = (await session.execute( select(TickerUniverse.yahoo_ticker) )).scalars().all() ticker_pairs: list[tuple[str, str]] = [] if uni_tickers: name_rows = (await session.execute( select(InstrumentMap.yahoo_ticker, InstrumentMap.name) .where(InstrumentMap.yahoo_ticker.in_(uni_tickers)) )).all() names = {y: n for y, n in name_rows if y} ticker_pairs = [(t, names.get(t) or t) for t in uni_tickers] async with httpx.AsyncClient(follow_redirects=True) as client: feed_results = await asyncio.gather( *(_process_feed(client, f) for f in feeds) ) ticker_results = await asyncio.gather( *(fetch_yahoo_news(client, t, query_override=n) for t, n in ticker_pairs) ) all_headlines = [] for _feed, items in feed_results: all_headlines.extend(items) for items in ticker_results: all_headlines.extend(items) headlines = dedupe(all_headlines) # Bulk INSERT IGNORE (fingerprint UNIQUE de-dupes across runs). if headlines: stmt = mysql_insert(Headline).values([ dict( source=h.source, category=h.category, title=h.title[:512], url=h.url[:1024], published_at=h.when, fetched_at=utcnow(), fingerprint=h.fingerprint, ) for h in headlines ]).prefix_with("IGNORE") await session.execute(stmt) await session.commit() # Tag any headlines still NULL — fresh inserts from this run plus # any that failed to tag on previous runs. Bounded by # TAG_PER_RUN_LIMIT so a long outage doesn't blow the cost ledger. untagged_rows = (await session.execute( select(Headline.id, Headline.title) .where(Headline.tags.is_(None)) .order_by(desc(Headline.published_at)) .limit(TAG_PER_RUN_LIMIT) )).all() tagged_count = 0 if untagged_rows: items = [ToTag(id=int(r.id), title=r.title) for r in untagged_rows] tags_by_id = await tag_titles(items) for hid, tags in tags_by_id.items(): await session.execute( update(Headline) .where(Headline.id == hid) .values(tags=tags) ) tagged_count = len(tags_by_id) await session.commit() run.items_written = len(headlines) log.info( "news_job.done", fetched=len(all_headlines), kept=len(headlines), untagged_seen=len(untagged_rows), tagged=tagged_count, ) if __name__ == "__main__": asyncio.run(run())