read.markets/app/services/ticker_universe.py
Giorgio Gilestro 6e7f57c6b2 phase G: data minimisation + passwordless auth + DeepSeek-first LLM
Server no longer holds portfolios. Holdings live in the browser
(localStorage); the server publishes an anonymous ticker_universe and a
gzipped /api/universe payload identical for every authenticated user, so
access patterns can't betray which tickers a user holds. AI commentary
is generated ephemerally from the browser-supplied pie and the cost
ledger row records no positions. Migrations 0009-0011 added the
universe table and dropped positions / portfolio_snapshots /
portfolios.

Authentication is now e-mail OTP only. Migration 0010 dropped
password_hash and email_verified (every active session is by
construction proof of email control). The /signup endpoint is gone;
signup and login share a single email-entry page. Email rendering is
HTML+plain-text multipart with a shared brand palette (app/branding.py)
asserted in sync with the CSS by a drift-detection test.

LLM provider defaults to DeepSeek-direct (cheaper, api.deepseek.com)
with OpenRouter as automatic fallback if DeepSeek fails. ai_log_job and
indicator_summary_job now iterate the two tones (NOVICE, INTERMEDIATE)
per cycle so the dashboard's tone toggle is instant; PROMPT_VERSION
bumped to 6 with an educational anti-TA / anti-gambling stance baked
into _CORE. NOVICE mode renders a curated glossary inline (CBOE VIX,
yield curve, HY OAS, etc.) with JS-positioned tooltips that survive
viewport edges and sticky bars. Model name and tokens hidden from the
user UI; still recorded in StrategicLog.model and AICall for admin.

Layout adds a sticky top nav, a sticky bottom markets bar (one chip per
exchange with status LED + headline index + 1d change), and
Phase H feedback reporting is queued in tasks/todo.md.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-18 14:16:57 +01:00


"""Server-wide ticker universe — the set of Yahoo tickers Cassandra currently
tracks, without user attribution.

Population happens in two stages to mitigate the timing-correlation leak:

1. **Buffer.** When /api/portfolio/parse or /api/analyze sees a ticker, it
   pushes that ticker into a Redis set keyed by the 5-minute wall-clock
   bucket: ``ticker_universe:buffer:<bucket>``. The buffer expires
   automatically (TTL = 2 hours, plenty of slack to recover from a missed
   flush).
2. **Flush.** A scheduler job runs at fixed 5-minute boundaries (xx:00,
   xx:05, ...), reads the *previous* bucket (now closed, no more writes
   landing), and upserts its tickers into the `ticker_universe` table.

Multiple users' uploads in the same bucket batch together; intra-bucket
ordering is discarded because Redis sets are unordered. The longer a bucket
stays open, the more uploads it absorbs and the harder timing correlation
gets.

Refresh of `last_referenced_at` for already-known tickers happens
synchronously in the same request; it's just an UPDATE and doesn't leak
membership.

Eviction: passive aging via a daily cron that prunes rows whose
`last_referenced_at` is older than UNIVERSE_EVICTION_TTL.
"""
from __future__ import annotations

import time
from datetime import datetime, timedelta, timezone
from typing import Iterable

from sqlalchemy import delete, insert, select, update
from sqlalchemy.dialects.mysql import insert as mysql_insert
from sqlalchemy.ext.asyncio import AsyncSession

from app.db import utcnow
from app.logging import get_logger
from app.models import TickerUniverse
from app.redis_client import get_redis

log = get_logger("ticker_universe")
# Bucket width for the timing-mitigation flush. 5 minutes is a sane default:
# small enough that the price feed isn't *that* stale, large enough that
# multiple uploads in a busy hour batch together. At alpha scale (1-10
# users) bucketing has limited statistical effect; we keep it anyway so
# the property is in place when traffic grows.
BUCKET_SECONDS = 5 * 60
BUFFER_TTL_SECONDS = 2 * 60 * 60 # 2h slack for a missed flush
UNIVERSE_EVICTION_TTL = timedelta(days=60)


def _as_utc(d: datetime) -> datetime:
    # Treat naive datetimes as UTC; aware datetimes pass through unchanged.
    return d if d.tzinfo is not None else d.replace(tzinfo=timezone.utc)


def _bucket_key(now_ts: float | None = None) -> str:
    # Redis key of the bucket currently open for writes.
    ts = int(now_ts if now_ts is not None else time.time())
    bucket = (ts // BUCKET_SECONDS) * BUCKET_SECONDS
    return f"ticker_universe:buffer:{bucket}"


def _previous_bucket_key(now_ts: float | None = None) -> str:
    # Redis key of the bucket that closed most recently (the flush target).
    ts = int(now_ts if now_ts is not None else time.time())
    bucket = ((ts // BUCKET_SECONDS) - 1) * BUCKET_SECONDS
    return f"ticker_universe:buffer:{bucket}"


def _normalise(ticker: str) -> str:
    """Strip surrounding whitespace and uppercase the whole symbol.

    Yahoo tickers are case-sensitive (AAPL is not the same as aapl in
    their world), and suffixes like .L / .DE / .HK are conventionally
    uppercase already, so uppercasing gives one canonical form."""
    return ticker.strip().upper()


async def buffer_tickers(tickers: Iterable[str]) -> int:
    """Push tickers into the current 5-min flush bucket. Returns the number
    of tickers newly added to the bucket's Redis set (SADD's return value),
    so tickers already present in this bucket are not counted. Safe to call
    with an empty iterable.

    Already-known tickers are still buffered; the flush job collapses them
    with its ON DUPLICATE KEY UPDATE upsert. Cheap and avoids a synchronous
    DB read here."""
    items = [_normalise(t) for t in tickers if t and t.strip()]
    if not items:
        return 0
    r = get_redis()
    key = _bucket_key()
    added = await r.sadd(key, *items)
    await r.expire(key, BUFFER_TTL_SECONDS)
    return int(added)


async def refresh_references(
    session: AsyncSession,
    tickers: Iterable[str],
) -> int:
    """Bump last_referenced_at for tickers already in the universe.
    Returns rows updated. Tickers not yet in the universe are silently
    ignored; they'll land via the buffered flush path."""
    items = list({_normalise(t) for t in tickers if t and t.strip()})
    if not items:
        return 0
    res = await session.execute(
        update(TickerUniverse)
        .where(TickerUniverse.yahoo_ticker.in_(items))
        .values(last_referenced_at=utcnow())
    )
    await session.commit()
    return int(res.rowcount or 0)


async def flush_buffer(session: AsyncSession) -> dict[str, int]:
    """Read the previous 5-min bucket from Redis, upsert its tickers into
    ticker_universe (collision-safe), and delete the bucket. Returns
    counts for observability.

    Idempotent: re-running on the same bucket is a no-op because the bucket
    is deleted on success."""
    r = get_redis()
    key = _previous_bucket_key()
    tickers = await r.smembers(key)
    if not tickers:
        return {"buffered": 0, "inserted": 0}
    now = utcnow()
    payload = [
        {"yahoo_ticker": t, "currency": None,
         "first_seen_at": now, "last_referenced_at": now}
        for t in tickers
    ]
    # ON DUPLICATE KEY UPDATE: existing rows just get their last_referenced_at
    # bumped. INSERT IGNORE would also work but the timestamp refresh is
    # useful (a ticker that's been buffered means an active user has it).
    stmt = mysql_insert(TickerUniverse).values(payload)
    stmt = stmt.on_duplicate_key_update(last_referenced_at=stmt.inserted.last_referenced_at)
    res = await session.execute(stmt)
    await session.commit()
    # rowcount is MySQL's affected-rows figure for the upsert, which also
    # counts updated rows, so "inserted" is an upper bound on new tickers.
    inserted = int(res.rowcount or 0)
    await r.delete(key)
    log.info("universe.flush", buffered=len(tickers), affected=inserted)
    return {"buffered": len(tickers), "inserted": inserted}


async def evict_stale(session: AsyncSession, ttl: timedelta = UNIVERSE_EVICTION_TTL) -> int:
    """Passive aging: delete rows not referenced within the TTL window.
    Returns rows deleted."""
    cutoff = utcnow() - ttl
    res = await session.execute(
        delete(TickerUniverse)
        .where(TickerUniverse.last_referenced_at < cutoff)
    )
    await session.commit()
    deleted = int(res.rowcount or 0)
    if deleted:
        log.info("universe.evicted", count=deleted, ttl_days=ttl.days)
    return deleted


async def get_all_tickers(session: AsyncSession) -> list[str]:
    """Returns every ticker currently tracked. Order is unspecified."""
    rows = (await session.execute(select(TickerUniverse.yahoo_ticker))).scalars().all()
    return list(rows)


async def upsert_tickers(session: AsyncSession, tickers: Iterable[str]) -> int:
    """Synchronous upsert into ticker_universe, bypassing the Redis flush
    buffer. Used by the /api/portfolio/parse endpoint so the dashboard
    has live prices immediately after upload, rather than waiting up to
    5 minutes for the buffer to flush.

    Returns the count of distinct tickers in the input. The DB-level
    side-effect is "row created" for new tickers and "last_referenced_at
    bumped" for existing ones.

    At alpha scale (<10 concurrent users) the buffer's timing-correlation
    mitigation has no statistical effect anyway, so bypassing it is free.
    When we hit ≥10 users this path will be deprecated in favour of the
    buffered path, per the Phase G plan."""
    items = list({_normalise(t) for t in tickers if t and t.strip()})
    if not items:
        return 0
    now = utcnow()
    payload = [
        {"yahoo_ticker": t, "currency": None,
         "first_seen_at": now, "last_referenced_at": now}
        for t in items
    ]
    stmt = mysql_insert(TickerUniverse).values(payload)
    stmt = stmt.on_duplicate_key_update(
        last_referenced_at=stmt.inserted.last_referenced_at,
    )
    await session.execute(stmt)
    await session.commit()
    return len(items)
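
The two-stage buffer/flush scheme from the module docstring can be tried out without Redis or MySQL. The sketch below is an illustration only, not part of the module: `FakeBuffer`, `bucket_start` and `previous_bucket_start` are hypothetical names, and plain Python sets stand in for the Redis sets and the `ticker_universe` table. It reuses only the bucket arithmetic of `_bucket_key` and `_previous_bucket_key`, and shows that a flush only ever reads a bucket that has already closed.

```python
from __future__ import annotations

BUCKET_SECONDS = 5 * 60  # same width as the module constant


def bucket_start(ts: float) -> int:
    """Start of the 5-minute bucket containing ts (mirrors _bucket_key)."""
    return (int(ts) // BUCKET_SECONDS) * BUCKET_SECONDS


def previous_bucket_start(ts: float) -> int:
    """Start of the bucket just before the one containing ts."""
    return bucket_start(ts) - BUCKET_SECONDS


class FakeBuffer:
    """Hypothetical in-memory stand-in for the Redis sets and the DB table."""

    def __init__(self) -> None:
        self.buckets: dict[int, set[str]] = {}
        self.universe: set[str] = set()

    def buffer(self, tickers: list[str], now: float) -> int:
        """Add tickers to the bucket open at `now`; return the newly added count."""
        bucket = self.buckets.setdefault(bucket_start(now), set())
        before = len(bucket)
        bucket.update(t.strip().upper() for t in tickers if t and t.strip())
        return len(bucket) - before

    def flush(self, now: float) -> int:
        """Move the previous (closed) bucket into the universe; return new tickers."""
        closed = self.buckets.pop(previous_bucket_start(now), set())
        new = closed - self.universe
        self.universe |= closed
        return len(new)


if __name__ == "__main__":
    buf = FakeBuffer()
    buf.buffer(["aapl", "VOD.L"], now=1_000)  # lands in the bucket starting at 900
    print(buf.flush(now=1_100))               # bucket 900 is still open, nothing to flush
    buf.buffer(["AAPL", "msft"], now=1_150)   # same bucket; AAPL is deduplicated
    print(buf.flush(now=1_200))               # bucket 900 has closed, its tickers land together
```

Because both uploads fall in the same bucket, their tickers reach the universe in a single flush, which is the batching property the docstring relies on.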