"""Server-wide ticker universe — the set of Yahoo tickers Cassandra currently tracks, without user attribution. Population happens in two stages to mitigate the timing-correlation leak: 1. **Buffer.** When /api/portfolio/parse or /api/analyze sees a ticker, it pushes that ticker into a Redis set keyed by the 5-minute wall-clock bucket: ``ticker_universe:buffer:``. The buffer expires automatically (TTL = 2 hours, plenty of slack to recover from a missed flush). 2. **Flush.** A scheduler job runs at fixed 5-minute boundaries (xx:00, xx:05, ...), reads the *previous* bucket (now closed, no more writes landing), and INSERTs new tickers into the `ticker_universe` table. Multiple users' uploads in the same bucket batch together; intra-bucket ordering is randomised by SET-set semantics. The longer a bucket stays open, the more uploads it absorbs, the harder timing-correlation gets. Refresh of `last_referenced_at` for already-known tickers happens synchronously in the same request — it's just an UPDATE and doesn't leak membership. Eviction: passive aging via a daily cron that prunes rows older than UNIVERSE_EVICTION_TTL. """ from __future__ import annotations import time from datetime import datetime, timedelta, timezone from typing import Iterable from sqlalchemy import delete, insert, select, update from sqlalchemy.dialects.mysql import insert as mysql_insert from sqlalchemy.ext.asyncio import AsyncSession from app.db import utcnow from app.logging import get_logger from app.models import TickerUniverse from app.redis_client import get_redis log = get_logger("ticker_universe") # Bucket width for the timing-mitigation flush. 5 minutes is a sane default: # small enough that the price feed isn't *that* stale, large enough that # multiple uploads in a busy hour batch together. At alpha scale (1-10 # users) bucketing has limited statistical effect; we keep it anyway so # the property is in place when traffic grows. 
BUCKET_SECONDS = 5 * 60
BUFFER_TTL_SECONDS = 2 * 60 * 60  # 2h slack for a missed flush
UNIVERSE_EVICTION_TTL = timedelta(days=60)

# Shared prefix of every Redis buffer key; the bucket's start timestamp
# (epoch seconds, floored to BUCKET_SECONDS) is appended to it.
_BUFFER_KEY_PREFIX = "ticker_universe:buffer:"


def _as_utc(d: datetime) -> datetime:
    """Return *d* unchanged if it is timezone-aware, else tag it as UTC.

    A naive datetime only gets ``tzinfo`` attached; its clock fields are not
    shifted, i.e. naive values are treated as already being in UTC.
    """
    return d if d.tzinfo is not None else d.replace(tzinfo=timezone.utc)


def _bucket_start(now_ts: float | None = None) -> int:
    """Return the start (epoch seconds) of the bucket containing *now_ts*.

    ``None`` means "now" (``time.time()``). The timestamp is truncated to an
    int and floored to a multiple of BUCKET_SECONDS.
    """
    ts = int(now_ts if now_ts is not None else time.time())
    return (ts // BUCKET_SECONDS) * BUCKET_SECONDS


def _bucket_key(now_ts: float | None = None) -> str:
    """Return the Redis key of the bucket that is currently open."""
    return f"{_BUFFER_KEY_PREFIX}{_bucket_start(now_ts)}"


def _previous_bucket_key(now_ts: float | None = None) -> str:
    """Return the Redis key of the bucket immediately before the open one."""
    return f"{_BUFFER_KEY_PREFIX}{_bucket_start(now_ts) - BUCKET_SECONDS}"


def _closed_bucket_keys(count: int, now_ts: float | None = None) -> list[str]:
    """Return keys of the *count* most recent closed buckets, newest first.

    ``_closed_bucket_keys(1)`` is ``[_previous_bucket_key()]``.
    """
    current = _bucket_start(now_ts)
    return [
        f"{_BUFFER_KEY_PREFIX}{current - back * BUCKET_SECONDS}"
        for back in range(1, count + 1)
    ]


def _normalise(ticker: str) -> str:
    """Return *ticker* stripped of surrounding whitespace and uppercased.

    The whole string is uppercased, including any exchange suffix
    (``.L`` / ``.DE`` / ``.HK``).
    """
    return ticker.strip().upper()


def _clean_tickers(tickers: Iterable[str]) -> list[str]:
    """Normalise *tickers*, dropping blanks/None and duplicates.

    The result is sorted so that the Redis and SQL statements built from it
    are deterministic regardless of the caller's input order.
    """
    return sorted({_normalise(t) for t in tickers if t and t.strip()})


def _decode_member(member: str | bytes) -> str:
    """Return a Redis set member as ``str``.

    A client that is not configured to decode responses hands back ``bytes``;
    accepting both keeps the flush independent of that setting.
    """
    return member.decode("utf-8") if isinstance(member, bytes) else member


def _upsert_statement(tickers: list[str], now: datetime):
    """Build the MySQL upsert for *tickers*.

    New tickers are inserted with ``first_seen_at`` and ``last_referenced_at``
    set to *now* and ``currency`` NULL; on a duplicate key only
    ``last_referenced_at`` is overwritten with the new value.
    """
    rows = [
        {
            "yahoo_ticker": t,
            "currency": None,
            "first_seen_at": now,
            "last_referenced_at": now,
        }
        for t in tickers
    ]
    stmt = mysql_insert(TickerUniverse).values(rows)
    return stmt.on_duplicate_key_update(
        last_referenced_at=stmt.inserted.last_referenced_at,
    )


async def buffer_tickers(tickers: Iterable[str]) -> int:
    """Push tickers into the currently open flush bucket in Redis.

    Blank/None entries are skipped and the rest are normalised. Safe to call
    with an empty iterable (returns 0 without touching Redis). No DB read is
    done here, so tickers already in the universe are buffered too; the flush
    upsert collapses them.

    Returns the value reported by SADD, i.e. the number of tickers that were
    not yet present in the bucket (assuming a redis-py style client).
    """
    items = _clean_tickers(tickers)
    if not items:
        return 0
    r = get_redis()
    key = _bucket_key()
    added = await r.sadd(key, *items)
    # Every write (re)sets the TTL, so the key expires BUFFER_TTL_SECONDS
    # after the last write unless a flush deletes it first.
    # NOTE(review): SADD and EXPIRE are two separate round trips; a failure
    # between them can leave a key without a TTL.
    await r.expire(key, BUFFER_TTL_SECONDS)
    return int(added)


async def refresh_references(
    session: AsyncSession,
    tickers: Iterable[str],
) -> int:
    """Bump ``last_referenced_at`` for tickers already in the universe.

    Tickers that have no row are simply not matched by the UPDATE. Commits
    the session. Returns the row count reported for the UPDATE (0 when the
    input contains no usable ticker, in which case nothing is executed).
    """
    items = _clean_tickers(tickers)
    if not items:
        return 0
    res = await session.execute(
        update(TickerUniverse)
        .where(TickerUniverse.yahoo_ticker.in_(items))
        .values(last_referenced_at=utcnow())
    )
    await session.commit()
    return int(res.rowcount or 0)


async def flush_buffer(
    session: AsyncSession,
    *,
    lookback_buckets: int = 1,
) -> dict[str, int]:
    """Drain closed Redis buckets into ``ticker_universe``.

    Reads the ``lookback_buckets`` most recent closed buckets (default 1: only
    the bucket immediately before the open one), upserts the union of their
    tickers, commits, then deletes the buckets that held data.

    With the default, a bucket whose flush run was skipped is never read
    again and just expires; pass a larger ``lookback_buckets`` (for example
    ``BUFFER_TTL_SECONDS // BUCKET_SECONDS``) to pick such buckets up while
    they still exist in Redis.

    Re-running after a successful flush is a no-op because the drained
    buckets have been deleted. If the upsert or commit raises, the buckets
    are left in place.

    Returns ``{"buffered": <distinct tickers read>, "inserted": <rowcount>}``.
    Despite its name, ``"inserted"`` is the affected-row count reported by the
    driver for the upsert, so rows that were only updated are included (MySQL
    documents this count as 1 per new row and 2 per updated row).

    Raises:
        ValueError: if ``lookback_buckets`` is less than 1.
    """
    if lookback_buckets < 1:
        raise ValueError("lookback_buckets must be >= 1")

    r = get_redis()
    tickers: set[str] = set()
    drained: list[str] = []
    for key in _closed_bucket_keys(lookback_buckets):
        members = await r.smembers(key)
        if members:
            drained.append(key)
            tickers.update(_decode_member(m) for m in members)
    if not tickers:
        return {"buffered": 0, "inserted": 0}

    # ON DUPLICATE KEY UPDATE rather than INSERT IGNORE: a buffered ticker
    # means it was just referenced, so refreshing the timestamp is wanted.
    res = await session.execute(_upsert_statement(sorted(tickers), utcnow()))
    await session.commit()
    affected = int(res.rowcount or 0)

    # Delete only after the commit so a failed write does not lose tickers.
    # One key per call keeps this valid on clustered Redis as well.
    for key in drained:
        await r.delete(key)

    log.info("universe.flush", buffered=len(tickers), affected=affected)
    return {"buffered": len(tickers), "inserted": affected}


async def evict_stale(
    session: AsyncSession,
    ttl: timedelta = UNIVERSE_EVICTION_TTL,
) -> int:
    """Delete rows whose ``last_referenced_at`` is older than ``now - ttl``.

    Commits the session and logs only when something was removed. Returns the
    row count reported for the DELETE.
    """
    cutoff = utcnow() - ttl
    res = await session.execute(
        delete(TickerUniverse).where(TickerUniverse.last_referenced_at < cutoff)
    )
    await session.commit()
    deleted = int(res.rowcount or 0)
    if deleted:
        log.info("universe.evicted", count=deleted, ttl_days=ttl.days)
    return deleted


async def get_all_tickers(session: AsyncSession) -> list[str]:
    """Return every ``yahoo_ticker`` in the table. Order is unspecified."""
    result = await session.execute(select(TickerUniverse.yahoo_ticker))
    return list(result.scalars().all())


async def upsert_tickers(session: AsyncSession, tickers: Iterable[str]) -> int:
    """Upsert tickers into ``ticker_universe`` directly, bypassing Redis.

    Intended for callers that need the rows to exist immediately (per the
    original author: the /api/portfolio/parse endpoint, so the dashboard has
    prices right after an upload instead of waiting for the next flush; the
    path is planned to be retired in favour of the buffered one as traffic
    grows).

    New tickers get a row; existing ones get ``last_referenced_at`` bumped.
    Commits the session. Returns the number of distinct, non-blank tickers in
    the input (0 means nothing was executed), not the DB row count.
    """
    items = _clean_tickers(tickers)
    if not items:
        return 0
    await session.execute(_upsert_statement(items, utcnow()))
    await session.commit()
    return len(items)