Stale comments referencing completed migrations: - universe.py "remain live until step 10 of Phase G" — endpoints gone - api.py "Portfolio endpoints moved to universe.py" — empty block - csv_import.py "persist_pie removed in Phase G" — historical context Dead Settings fields (all confirmed unreferenced by app code): - CASSANDRA_PORT — port is hardcoded in docker-compose / uvicorn cmd - POLAR_API_KEY — Polar was replaced by Stripe - CASSANDRA_MOCK — env var still set by tests as a sentinel; the Settings field itself was never read - CASSANDRA_BASE_CURRENCY — "GBP" hardcoded inline elsewhere Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
388 lines
15 KiB
Python
388 lines
15 KiB
Python
"""Phase G endpoints — the data-minimised path that replaces per-user
|
|
portfolio persistence.
|
|
|
|
Four routes:
|
|
|
|
- GET /api/universe Full ticker universe + prices.
|
|
Identical payload for every
|
|
authenticated user — request
|
|
bodies don't leak which
|
|
tickers belong to which user.
|
|
- GET /api/universe/sparkline/{ticker} Lazy per-ticker sparkline,
|
|
fetched on hover from the
|
|
browser.
|
|
- POST /api/portfolio/parse CSV → parsed pie back to
|
|
browser localStorage. Seeds
|
|
ticker_universe (no user FK).
|
|
No DB writes for positions.
|
|
- POST /api/analyze Ephemeral AI commentary.
|
|
Pie passed in via JSON body,
|
|
held in memory for one LLM
|
|
call, discarded on response.
|
|
|
|
All routes require authentication (session cookie OR bearer token); /api/portfolio/parse and /api/analyze additionally require a paid plan (``require_paid``).
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
import asyncio
|
|
from datetime import datetime, timedelta, timezone
|
|
|
|
import httpx
|
|
from fastapi import APIRouter, Depends, File, HTTPException, Request, UploadFile
|
|
from fastapi.responses import JSONResponse
|
|
from sqlalchemy import and_, func, select
|
|
from sqlalchemy.ext.asyncio import AsyncSession
|
|
|
|
from app.auth import CurrentUser, require_auth
|
|
from app.config import get_settings
|
|
from app.db import get_session, utcnow
|
|
from app.logging import get_logger
|
|
from app.models import Quote, QuoteDaily
|
|
from app.services import fx, portfolio_analysis, ticker_universe
|
|
from app.services.access import require_paid
|
|
from app.services.csv_import import CSVImportError, parse_t212_csv
|
|
from app.services.instrument_map import resolve_slice
|
|
from app.services.market import fetch as market_fetch
|
|
|
|
|
|
log = get_logger("universe_router")

# Authentication is enforced once, router-wide, so every route below
# inherits it; individual routes may layer extra dependencies on top.
router = APIRouter(dependencies=[Depends(require_auth)])

# Upper bounds on request payloads. Oversized input is refused with a 4xx
# instead of being handed to the CSV parser or an LLM call.
MAX_CSV_BYTES = 1024 * 1024  # 1 MB
MAX_ANALYZE_JSON_BYTES = 262_144  # 256 KB
|
|
|
|
|
|
def _utcnow() -> datetime:
    """Return the current instant as a timezone-aware UTC ``datetime``."""
    utc = timezone.utc
    return datetime.now(tz=utc)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# GET /api/universe — full ticker universe with prices
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def _normalise_minor_units(price: float, currency: str | None) -> tuple[float, str | None]:
    """Convert a pence-denominated quote into pounds; pass others through.

    LSE tickers come back from Yahoo in pence (``GBp`` / ``GBX``), but T212
    CSV invested-value is reported in pounds. Normalising here means the
    browser never has to know about the pence quirk. Daily change
    percentages are unit-independent, so only price and currency change.
    """
    if currency in ("GBp", "GBX"):
        return price / 100.0, "GBP"
    return price, currency


@router.get("/universe")
async def get_universe(session: AsyncSession = Depends(get_session)) -> JSONResponse:
    """Return every ticker tracked by Cassandra with its latest priced quote.

    The response is intentionally the *whole* universe — never filtered
    per user — so the access pattern (request body, response body) carries
    no information about which tickers belong to which user. The browser
    filters down to its own holdings client-side.

    Response body::

        {
          "as_of":   ISO-8601 UTC timestamp,
          "tickers": {symbol: {"p": price, "c": currency, "d": changes}},
          "fx":      {currency: rate},   # USD pivot
        }

    Only quotes fetched within the last 24h that carry a price are used;
    a symbol with no such quote is omitted.

    Cache-Control: max-age=60. Quotes are written far less often than that
    (market_job writes roughly hourly), so a minute of client caching costs
    little freshness.
    """
    tickers = await ticker_universe.get_all_tickers(session)
    out: dict[str, dict] = {}

    if tickers:
        # Newest *priced* quote per ticker within the last 24h. Rows with a
        # NULL price are excluded: /portfolio/parse stores failed fetches as
        # Quote rows too, and without this filter such an error row would
        # become the "latest" one and hide a good quote from minutes
        # earlier. Anything older than 24h is treated as a broken feed and
        # dropped rather than served stale.
        cutoff = _utcnow() - timedelta(hours=24)
        latest = (
            select(Quote.symbol, func.max(Quote.fetched_at).label("max_fetched"))
            .where(Quote.symbol.in_(tickers))
            .where(Quote.fetched_at >= cutoff)
            .where(Quote.price.is_not(None))
            .group_by(Quote.symbol)
            .subquery()
        )
        stmt = select(Quote).join(
            latest,
            and_(
                Quote.symbol == latest.c.symbol,
                Quote.fetched_at == latest.c.max_fetched,
            ),
        )
        rows = (await session.execute(stmt)).scalars().all()
        for q in rows:
            # The join can still match a priceless row that happens to share
            # the exact fetched_at of a priced one; skip it so it never
            # overwrites the good entry.
            if q.price is None:
                continue
            price, currency = _normalise_minor_units(q.price, q.currency)
            out[q.symbol] = {
                "p": price,
                "c": currency,
                "d": q.changes or {},
            }

    # FX rates for every currency present, against a USD pivot. The browser
    # uses these to convert each position into the pie's base currency
    # before computing P/L. Same payload for every user.
    needed_ccy = {q.get("c") for q in out.values() if q.get("c")}
    # Always include the common bases so the browser has them even if no
    # current position is denominated in them (e.g. avg cost in GBP but no
    # LSE holding right now).
    needed_ccy.update({"USD", "EUR", "GBP"})
    try:
        fx_rates = await fx.get_rates(needed_ccy)
    except Exception as e:
        # Best-effort: serve prices without FX rather than failing the call.
        log.warning("universe.fx_failed", error=str(e)[:200])
        fx_rates = {"USD": 1.0}

    body = {
        "as_of": _utcnow().isoformat(),
        "tickers": out,
        "fx": fx_rates,
    }
    return JSONResponse(
        body,
        headers={
            "Cache-Control": "max-age=60",
            "Vary": "Accept-Encoding",
        },
    )
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# GET /api/universe/sparkline/{ticker} — lazy per-ticker history
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
@router.get("/universe/sparkline/{ticker}")
async def get_sparkline(
    ticker: str,
    session: AsyncSession = Depends(get_session),
) -> JSONResponse:
    """Return up to the 60 most recent daily closes for *ticker*, oldest first.

    The browser requests this lazily on hover, so the response is cached
    for five minutes. The ticker is trimmed, upper-cased and capped at 32
    characters before lookup.

    Raises:
        HTTPException 400: the ticker is blank after trimming.
        HTTPException 404: no daily rollup with a close exists for it yet.
    """
    ticker = ticker.strip().upper()[:32]
    if not ticker:
        raise HTTPException(status_code=400, detail="ticker required")

    # Newest first so LIMIT keeps the most recent rows; flipped below.
    query = (
        select(QuoteDaily.date, QuoteDaily.close)
        .where(QuoteDaily.symbol == ticker, QuoteDaily.close.is_not(None))
        .order_by(QuoteDaily.date.desc())
        .limit(60)
    )
    result = await session.execute(query)
    newest_first = result.all()
    if not newest_first:
        raise HTTPException(status_code=404, detail=f"no sparkline data for {ticker}")

    series = [
        {"d": row.date.isoformat(), "c": row.close}
        for row in newest_first[::-1]
    ]
    payload = {"ticker": ticker, "series": series}
    return JSONResponse(payload, headers={"Cache-Control": "max-age=300"})
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# POST /api/portfolio/parse — CSV → parsed pie for browser localStorage
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
async def _resolve_positions(
    session: AsyncSession, pie
) -> tuple[list[dict], list[str], list[str]]:
    """Map each parsed pie position to a Yahoo ticker.

    *pie* is the object returned by ``parse_t212_csv`` or ``parse_with_llm``.

    Returns ``(positions_out, yahoo_tickers, unmapped)``:
      - ``positions_out``: browser-facing position dicts;
      - ``yahoo_tickers``: the Yahoo ticker of each kept position, in the
        same order (may contain repeats);
      - ``unmapped``: labels of positions that could not be resolved.
    """
    positions_out: list[dict] = []
    yahoo_tickers: list[str] = []
    unmapped: list[str] = []

    for p in pie.positions:
        if pie.tickers_resolved:
            # LLM path: ``p.slice`` is already a Yahoo-ready ticker. We
            # still do a best-effort InstrumentMap lookup so we can use the
            # canonical name + currency when we happen to have one; but
            # unlike the T212 path we never *drop* a position just because
            # resolve_slice missed. ``or ""`` guards a missing slice.
            yahoo_ticker = (p.slice or "").strip().upper()
            if not yahoo_ticker:
                unmapped.append(p.name or "?")
                continue
            resolved = await resolve_slice(session, yahoo_ticker)
            name = (resolved.name if resolved else None) or p.name
            currency = (
                (resolved.currency if resolved else None)
                or p.currency or "USD"
            )
        else:
            # T212 path: ``p.slice`` is a shortcode that MUST round-trip
            # through the InstrumentMap. Drop unmapped positions — the
            # warnings block surfaces them to the user.
            resolved = await resolve_slice(session, p.slice)
            if resolved is None or not resolved.yahoo_ticker:
                unmapped.append(p.slice or p.name or "?")
                continue
            yahoo_ticker = resolved.yahoo_ticker
            name = resolved.name or p.name
            currency = resolved.currency

        positions_out.append({
            "yahoo_ticker": yahoo_ticker,
            "t212_slice": p.slice,
            "name": name,
            "qty": p.quantity,
            "avg_cost": p.average_price,  # @property — no call parens
            "currency": currency,
        })
        yahoo_tickers.append(yahoo_ticker)

    return positions_out, yahoo_tickers, unmapped


async def _prefetch_quotes(session: AsyncSession, tickers: list[str]) -> int:
    """Fetch live quotes for *tickers* and store each result as a ``Quote`` row.

    This gives /api/universe something to return on the very first
    dashboard load after an upload. It is best-effort:
      - per-symbol fetch failures are logged and skipped;
      - a failure of the whole block (HTTP client, DB commit) is logged and
        swallowed so the upload itself still succeeds.

    Returns the number of fetched quotes that carried a price.
    """
    if not tickers:
        return 0

    anchor = get_settings().CASSANDRA_ANCHOR_DATE or None
    now = utcnow()
    sem = asyncio.Semaphore(16)  # bounded concurrency to keep Yahoo happy

    async def _fetch_one(client, sym):
        async with sem:
            return await market_fetch(client, sym, sym, "", anchor)

    fetched_ok = 0
    try:
        async with httpx.AsyncClient(follow_redirects=True, timeout=20) as client:
            quotes = await asyncio.gather(
                *(_fetch_one(client, t) for t in tickers),
                return_exceptions=True,
            )
        for sym, q in zip(tickers, quotes):
            # gather() returns exceptions in place of results; BaseException
            # also covers a cancelled child task.
            if isinstance(q, BaseException):
                log.warning("portfolio.parse.fetch_failed", symbol=sym, error=str(q)[:120])
                continue
            # Error results are stored too (price may be None, error set).
            session.add(Quote(
                symbol=q.symbol, source=q.source, label=q.label,
                group_name="universe", price=q.price, currency=q.currency,
                as_of=q.as_of, changes=q.changes or None,
                error=(q.error[:250] if q.error else None),
                fetched_at=now,
            ))
            if q.price is not None:
                fetched_ok += 1
        await session.commit()
    except Exception as e:
        log.error("portfolio.parse.fetch_block_failed", error=str(e)[:200])

    return fetched_ok


@router.post("/portfolio/parse", dependencies=[Depends(require_paid)])
async def parse_portfolio(
    file: UploadFile = File(...),
    session: AsyncSession = Depends(get_session),
) -> dict:
    """Parse a portfolio CSV and return the structured pie to the browser.

    The browser stashes the result in localStorage. **Holdings are NOT
    persisted.** T212 pie exports are parsed directly; any other format
    falls back to the LLM parser.

    Server-side side effects (none carry a user FK):
      - resolved Yahoo tickers are upserted into ticker_universe right away
        and also pushed into the ticker buffer;
      - live quotes for those tickers are fetched and stored as ``Quote``
        rows (best-effort).

    Raises:
        HTTPException 413: the upload exceeds ``MAX_CSV_BYTES``.
        HTTPException 400: the upload is empty or no parser understands it.
    """
    # Read at most one byte past the cap: enough to detect an oversized
    # upload without pulling all of it into memory.
    raw = await file.read(MAX_CSV_BYTES + 1)
    if len(raw) > MAX_CSV_BYTES:
        raise HTTPException(status_code=413, detail="CSV too large (1 MB max)")
    if not raw:
        raise HTTPException(status_code=400, detail="empty CSV")

    try:
        pie = parse_t212_csv(raw)
    except CSVImportError:
        # Unrecognised format — try the LLM-fallback parser. It hits a
        # global format-fingerprint cache first; only the very first upload
        # of each broker format pays an LLM call.
        from app.services.llm_csv_parser import LLMParseError, parse_with_llm
        try:
            pie = await parse_with_llm(raw, session)
        except LLMParseError as e:
            raise HTTPException(status_code=400, detail=str(e)) from e

    positions_out, yahoo_tickers, unmapped = await _resolve_positions(session, pie)

    # Several positions can resolve to the same Yahoo ticker. Collapse them
    # (order-preserving) so each symbol is fetched and stored once, and so
    # the bulk upsert never receives the same key twice.
    unique_tickers = list(dict.fromkeys(yahoo_tickers))

    # Synchronous upsert: bypass the Redis buffer so the dashboard has live
    # prices immediately. The buffer + flush machinery remains for
    # multi-user timing-mitigation when we hit >=10 concurrent users.
    upserted = await ticker_universe.upsert_tickers(session, unique_tickers)
    # Also drop into the Redis buffer so flush_buffer's existing tests +
    # ledger remain coherent if/when we re-enable buffered-only mode.
    buffered = await ticker_universe.buffer_tickers(unique_tickers)

    fetched_ok = await _prefetch_quotes(session, unique_tickers)

    log.info(
        "portfolio.parse",
        positions=len(positions_out),
        unmapped=len(unmapped),
        upserted=upserted,
        buffered=buffered,
        priced=fetched_ok,
    )

    warnings: list[str] = []
    if unmapped:
        warnings.append(
            f"{len(unmapped)} position(s) could not be resolved to Yahoo tickers: "
            + ", ".join(unmapped[:10])
            + (" ..." if len(unmapped) > 10 else "")
        )

    return {
        "pie_name": pie.name,
        "base_currency": "GBP",
        "positions": positions_out,
        "totals": {
            "invested": pie.invested,
            "value": pie.value,
            "result": pie.result,
        },
        "warnings": warnings,
    }
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# POST /api/analyze — ephemeral AI commentary
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
@router.post("/analyze")
async def analyze_portfolio(
    request: Request,
    session: AsyncSession = Depends(get_session),
    principal: CurrentUser = Depends(require_paid),
) -> dict:
    """Generate AI commentary for the pie supplied in the JSON request body.

    The pie is held in memory only for the duration of the LLM call;
    nothing about holdings is persisted. The ai_calls ledger row records
    tokens + cost, never holdings.

    The response language is taken from the authenticated user's ``lang``
    setting (default ``"en"``) and overrides any ``lang`` key in the body.

    Gated behind ``require_paid``: free-tier users get 402. Admin
    bearer-token bypasses the gate for testing.

    Raises:
        HTTPException 413: the body exceeds ``MAX_ANALYZE_JSON_BYTES``.
        HTTPException 400: the body is not a JSON object, or
            ``portfolio_analysis.parse_request`` rejects it.
        HTTPException 502: the analysis call raised ``RuntimeError``.
    """
    # Read the JSON body manually so we can enforce a hard size cap; the
    # framework default body limit is generous. When the client declares
    # its size, reject before buffering anything. A malformed header is
    # ignored here; the post-read check below still applies.
    declared = request.headers.get("content-length")
    if declared is not None:
        try:
            too_big = int(declared) > MAX_ANALYZE_JSON_BYTES
        except ValueError:
            too_big = False
        if too_big:
            raise HTTPException(status_code=413, detail="payload too large")

    # Covers chunked bodies and clients that understate Content-Length.
    body = await request.body()
    if len(body) > MAX_ANALYZE_JSON_BYTES:
        raise HTTPException(status_code=413, detail="payload too large")

    try:
        payload = await request.json()
    except Exception:
        raise HTTPException(status_code=400, detail="malformed JSON body") from None

    # Valid JSON that is not an object (array, string, number, null) would
    # otherwise blow up with TypeError on the key assignment below and
    # surface as a 500.
    if not isinstance(payload, dict):
        raise HTTPException(status_code=400, detail="JSON body must be an object")

    user_lang = (
        principal.user.lang if (principal.user and principal.user.lang) else "en"
    )
    payload["lang"] = user_lang

    try:
        req = portfolio_analysis.parse_request(payload)
    except ValueError as e:
        raise HTTPException(status_code=400, detail=str(e)) from e

    try:
        result = await portfolio_analysis.analyse(session, req)
    except RuntimeError as e:
        log.error("analyze.llm_failed", error=str(e)[:200])
        raise HTTPException(status_code=502, detail="analysis failed — try again") from e

    return {
        "content": result.content,
        "model": result.model,
        "generated_at": result.generated_at.isoformat(),
        "prompt_tokens": result.prompt_tokens,
        "completion_tokens": result.completion_tokens,
        "cost_usd": result.cost_usd,
    }
|