"""Hourly news ingestion.

Reads enabled feeds from the DB (not TOML — the DB holds the authoritative
enabled/failure state). Per-ticker Yahoo news is pulled for each position in
the most recent portfolio snapshot.
"""

from __future__ import annotations

import asyncio

import httpx
from sqlalchemy import desc, select
from sqlalchemy.dialects.mysql import insert as mysql_insert

from app.db import utcnow
from app.jobs._helpers import job_lifecycle, log
from app.models import Feed, Headline, Portfolio, PortfolioSnapshot, Position
from app.services.news import dedupe, fetch_feed, fetch_yahoo_news

# A feed is switched off (enabled=False) once it reaches this many
# consecutive failed fetches.
AUTO_DISABLE_AT = 5


async def _process_feed(client: httpx.AsyncClient, feed: Feed) -> tuple[Feed, list]:
    """Fetch one feed and update its health fields on the ORM object.

    On success, resets ``consecutive_failures`` and stamps ``last_success_at``.
    On failure, increments ``consecutive_failures``, disables the feed once it
    reaches ``AUTO_DISABLE_AT``, and logs a warning. The changes are in-memory
    only until the caller commits the session.

    Returns:
        ``(feed, items)`` where ``items`` is whatever ``fetch_feed`` returned,
        or ``[]`` if the fetch raised.
    """
    try:
        items = await fetch_feed(client, feed.name, feed.category, feed.url)
    except Exception as e:  # per-feed boundary: one bad feed must not sink the run
        feed.consecutive_failures += 1
        if feed.consecutive_failures >= AUTO_DISABLE_AT:
            feed.enabled = False
        log.warning(
            "feed.fetch_failed",
            name=feed.name,
            fails=feed.consecutive_failures,
            error=str(e),
        )
        return feed, []
    feed.consecutive_failures = 0
    feed.last_success_at = utcnow()
    return feed, items


async def _process_ticker(client: httpx.AsyncClient, ticker: str, name: str) -> list:
    """Fetch Yahoo news for one ticker; log and return ``[]`` on failure.

    Mirrors ``_process_feed``: a single failing symbol should not abort the
    whole run and throw away every other headline already fetched.
    """
    try:
        return await fetch_yahoo_news(client, ticker, query_override=name)
    except Exception as e:  # per-ticker boundary, same policy as feeds
        log.warning("ticker.fetch_failed", ticker=ticker, error=str(e))
        return []


async def run() -> None:
    """Fetch enabled feeds plus per-ticker Yahoo news and store new headlines.

    Headlines are de-duplicated in memory with ``dedupe`` and then
    bulk-inserted with ``INSERT IGNORE``, so rows whose ``fingerprint``
    already exists are skipped across runs.
    """
    async with job_lifecycle("news_job") as (session, job_run):
        if job_run.status == "skipped":
            return

        feeds = (
            await session.execute(
                # SQL expression, not a Python truthiness test.
                select(Feed).where(Feed.enabled == True)  # noqa: E712
            )
        ).scalars().all()

        # Portfolio tickers + names come from the latest T212 snapshot, not
        # TOML. The (ticker, name) pair lets fetch_yahoo_news skip the
        # chart-meta round-trip and use the proper company name directly.
        latest_snap_id = (
            await session.execute(
                select(PortfolioSnapshot.id)
                .order_by(desc(PortfolioSnapshot.snapshot_at))
                .limit(1)
            )
        ).scalar_one_or_none()

        ticker_pairs: list[tuple[str, str]] = []
        if latest_snap_id is not None:
            positions = (
                await session.execute(
                    select(Position).where(Position.snapshot_id == latest_snap_id)
                )
            ).scalars().all()
            # Fall back to the ticker itself when a position has no name.
            ticker_pairs = [(p.ticker, p.name or p.ticker) for p in positions]

        async with httpx.AsyncClient(follow_redirects=True) as client:
            feed_results = await asyncio.gather(
                *(_process_feed(client, f) for f in feeds)
            )
            ticker_results = await asyncio.gather(
                *(_process_ticker(client, t, n) for t, n in ticker_pairs)
            )

        all_headlines: list = []
        for _feed, items in feed_results:
            all_headlines.extend(items)
        for items in ticker_results:
            all_headlines.extend(items)
        headlines = dedupe(all_headlines)

        # Bulk INSERT IGNORE (fingerprint UNIQUE de-dupes across runs).
        if headlines:
            # One timestamp for the whole batch rather than one per row.
            fetched_at = utcnow()
            stmt = mysql_insert(Headline).values([
                dict(
                    source=h.source,
                    category=h.category,
                    # Truncate to what the columns are assumed to hold.
                    title=h.title[:512],
                    url=h.url[:1024],
                    published_at=h.when,
                    fetched_at=fetched_at,
                    fingerprint=h.fingerprint,
                )
                for h in headlines
            ]).prefix_with("IGNORE")
            await session.execute(stmt)

        # Commit even when nothing was inserted: _process_feed mutated the
        # Feed rows (failure counters, enabled flag, last_success_at) and
        # those changes must persist even if every feed failed.
        await session.commit()

        # NOTE(review): this counts de-duplicated headlines offered to
        # INSERT IGNORE, not rows actually inserted (existing fingerprints
        # are silently skipped).
        job_run.items_written = len(headlines)
        log.info("news_job.done", fetched=len(all_headlines), kept=len(headlines))


if __name__ == "__main__":
    asyncio.run(run())