The /api/analyze flow previously read principal.user.lang from the DB on every request and ignored anything the client might send. That races the language toggle's PATCH: a user can flip the toggle and click Generate/Regenerate before the PATCH /api/settings/language hits the DB, so the analysis is sent with the OLD persisted lang while the toggle visually reads as the new one. From the user's POV the analysis comes back in the wrong language. Frontend portfolio.js now reads the live #lang-toggle data-lang attribute (the same source the UI itself uses) and includes it in the /api/analyze body. The dataset attribute is updated optimistically by cassandraSetLang() before the PATCH fires, so it always reflects what the user is looking at. Backend universe.py prefers payload["lang"] when present and falls back to user.lang otherwise — older clients (scripts, direct curl) that don't send anything still get the DB-stored preference. The resolution path is logged so we can confirm in prod which lang actually drove a given request. Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
397 lines
15 KiB
Python
397 lines
15 KiB
Python
"""Phase G endpoints — the data-minimised path that replaces per-user
portfolio persistence.

Four routes:

  - GET  /api/universe                      Full ticker universe + prices.
                                            Identical payload for every
                                            authenticated user — request
                                            bodies don't leak which
                                            tickers belong to which user.
  - GET  /api/universe/sparkline/{ticker}   Lazy per-ticker sparkline,
                                            fetched on hover from the
                                            browser.
  - POST /api/portfolio/parse               CSV → parsed pie back to
                                            browser localStorage. Seeds
                                            ticker_universe (no user FK).
                                            No DB writes for positions.
  - POST /api/analyze                       Ephemeral AI commentary.
                                            Pie passed in via JSON body,
                                            held in memory for one LLM
                                            call, discarded on response.

All routes require authentication (session cookie OR bearer token).
"""
|
|
from __future__ import annotations
|
|
|
|
import asyncio
|
|
from datetime import datetime, timedelta, timezone
|
|
|
|
import httpx
|
|
from fastapi import APIRouter, Depends, File, HTTPException, Request, UploadFile
|
|
from fastapi.responses import JSONResponse
|
|
from sqlalchemy import and_, func, select
|
|
from sqlalchemy.ext.asyncio import AsyncSession
|
|
|
|
from app.auth import CurrentUser, require_auth
|
|
from app.config import get_settings
|
|
from app.db import get_session, utcnow
|
|
from app.logging import get_logger
|
|
from app.models import Quote, QuoteDaily
|
|
from app.services import fx, portfolio_analysis, ticker_universe
|
|
from app.services.access import require_paid
|
|
from app.services.csv_import import CSVImportError, parse_t212_csv
|
|
from app.services.instrument_map import resolve_slice
|
|
from app.services.market import fetch as market_fetch
|
|
|
|
|
|
# Module-level logger; the routes below emit their events through it using
# an event name plus keyword fields.
log = get_logger("universe_router")

# Router-wide dependency: require_auth is attached to every route that is
# registered on this router, so no endpoint here is reachable anonymously.
router = APIRouter(dependencies=[Depends(require_auth)])
|
|
|
|
|
|
# Upper bounds on inbound payload sizes. Oversized requests are answered
# with a 4xx instead of being handed to the CSV parser or to an LLM call.
MAX_CSV_BYTES = 1024 * 1024  # 1 MB
MAX_ANALYZE_JSON_BYTES = 262_144  # 256 KB
|
|
|
|
|
|
def _utcnow() -> datetime:
    """Return the current moment as a timezone-aware datetime in UTC."""
    return datetime.now(tz=timezone.utc)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# GET /api/universe — full ticker universe with prices
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
@router.get("/universe")
async def get_universe(session: AsyncSession = Depends(get_session)) -> JSONResponse:
    """Serve the complete tracked-ticker universe with each symbol's latest quote.

    The payload is deliberately never narrowed per user, so neither the
    request nor the response says anything about which tickers a given user
    holds; the browser filters the universe down to its own holdings.

    Returns:
        A JSON response with three keys: ``as_of`` (UTC ISO timestamp),
        ``tickers`` (symbol -> ``{"p": price, "c": currency, "d": changes}``)
        and ``fx`` (whatever ``fx.get_rates`` returned, or a USD-only
        fallback when that call failed). Sent with ``Cache-Control:
        max-age=60`` so the browser re-fetches about once a minute.
    """
    symbols = await ticker_universe.get_all_tickers(session)
    payload_tickers: dict[str, dict] = {}

    if symbols:
        # Quotes older than 24h are treated as a broken feed and dropped
        # instead of being served stale.
        oldest_allowed = _utcnow() - timedelta(hours=24)
        latest_per_symbol = (
            select(Quote.symbol, func.max(Quote.fetched_at).label("max_fetched"))
            .where(Quote.symbol.in_(symbols))
            .where(Quote.fetched_at >= oldest_allowed)
            .group_by(Quote.symbol)
            .subquery()
        )
        # Keep only the row whose fetched_at equals that symbol's maximum.
        is_newest_row = and_(
            Quote.symbol == latest_per_symbol.c.symbol,
            Quote.fetched_at == latest_per_symbol.c.max_fetched,
        )
        result = await session.execute(
            select(Quote).join(latest_per_symbol, is_newest_row)
        )
        for quote in result.scalars().all():
            if quote.price is None:
                continue
            # LSE tickers arrive from Yahoo in pence (GBp / GBX) while the
            # T212 CSV reports invested value in pounds. Convert to pounds
            # here so the browser never deals with pence; the daily change
            # percentages do not depend on the unit.
            if quote.currency in ("GBp", "GBX"):
                price, currency = quote.price / 100.0, "GBP"
            else:
                price, currency = quote.price, quote.currency
            payload_tickers[quote.symbol] = {
                "p": price,
                "c": currency,
                "d": quote.changes or {},
            }

    # FX rates (USD pivot) for every currency that appears in the payload;
    # the browser converts positions into the pie's base currency with them
    # before computing P/L. The common bases are always requested, even
    # when no current quote is denominated in them (e.g. avg cost in GBP
    # but no LSE holding right now). Same payload for every user.
    needed_ccy = {entry["c"] for entry in payload_tickers.values() if entry["c"]}
    needed_ccy |= {"USD", "EUR", "GBP"}
    try:
        fx_rates = await fx.get_rates(needed_ccy)
    except Exception as e:
        # Best-effort: an FX outage must not take the whole endpoint down.
        log.warning("universe.fx_failed", error=str(e)[:200])
        fx_rates = {"USD": 1.0}

    return JSONResponse(
        content={
            "as_of": _utcnow().isoformat(),
            "tickers": payload_tickers,
            "fx": fx_rates,
        },
        headers={
            "Cache-Control": "max-age=60",
            "Vary": "Accept-Encoding",
        },
    )
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# GET /api/universe/sparkline/{ticker} — lazy per-ticker history
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
@router.get("/universe/sparkline/{ticker}")
async def get_sparkline(
    ticker: str,
    session: AsyncSession = Depends(get_session),
) -> JSONResponse:
    """Return up to the 60 most recent daily closes for one ticker, oldest first.

    The browser requests this lazily on hover, so the response carries a
    five-minute ``Cache-Control``.

    Raises:
        HTTPException: 400 when the ticker is blank after normalisation,
            404 when no non-null daily close exists for the symbol.
    """
    # Normalise: trim, upper-case, and clamp to 32 characters.
    ticker = ticker.strip().upper()[:32]
    if ticker == "":
        raise HTTPException(status_code=400, detail="ticker required")

    newest_first = (
        select(QuoteDaily.date, QuoteDaily.close)
        .where(QuoteDaily.symbol == ticker)
        .where(QuoteDaily.close.is_not(None))
        .order_by(QuoteDaily.date.desc())
        .limit(60)
    )
    result = await session.execute(newest_first)
    rows = result.all()
    if not rows:
        raise HTTPException(status_code=404, detail=f"no sparkline data for {ticker}")

    # The query is newest-first so LIMIT keeps the latest rows; flip it so
    # the series reads chronologically.
    series = []
    for row in reversed(rows):
        series.append({"d": row.date.isoformat(), "c": row.close})

    return JSONResponse(
        content={"ticker": ticker, "series": series},
        headers={"Cache-Control": "max-age=300"},
    )
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# POST /api/portfolio/parse — CSV → parsed pie for browser localStorage
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
@router.post("/portfolio/parse", dependencies=[Depends(require_paid)])
async def parse_portfolio(
    file: UploadFile = File(...),
    session: AsyncSession = Depends(get_session),
) -> dict:
    """Parse an uploaded portfolio CSV and hand the structured pie back.

    The result is meant to be stashed in browser localStorage.
    **Holdings are NOT persisted.**

    Side effects on the server:
      - Resolved Yahoo tickers are upserted into ticker_universe (no user
        FK) and also dropped into the ticker buffer.
      - One fresh Quote row per resolved ticker is fetched and stored inline
        so /api/universe has prices on the first dashboard load.

    Returns:
        dict with ``pie_name``, ``base_currency``, ``positions``, ``totals``
        (``invested`` / ``value`` / ``result``) and ``warnings``.

    Raises:
        HTTPException: 413 when the upload exceeds ``MAX_CSV_BYTES``, 400
            when it is empty or when neither parser recognises the format.
    """
    # Read at most one byte past the cap: enough to detect an oversized
    # upload without pulling an arbitrarily large file into memory first.
    raw = await file.read(MAX_CSV_BYTES + 1)
    if len(raw) > MAX_CSV_BYTES:
        raise HTTPException(status_code=413, detail="CSV too large (1 MB max)")
    if not raw:
        raise HTTPException(status_code=400, detail="empty CSV")

    try:
        pie = parse_t212_csv(raw)
    except CSVImportError:
        # Unrecognised format — try the LLM-fallback parser. It hits a
        # global format-fingerprint cache first; only the very first
        # upload of each broker format pays an LLM call.
        from app.services.llm_csv_parser import LLMParseError, parse_with_llm

        try:
            pie = await parse_with_llm(raw, session)
        except LLMParseError as e:
            raise HTTPException(status_code=400, detail=str(e)) from e

    positions_out, yahoo_tickers, unmapped = await _resolve_positions(session, pie)

    # Synchronous upsert: bypass the Redis buffer so the dashboard has
    # live prices immediately. The buffer + flush machinery remains for
    # multi-user timing-mitigation when we hit >=10 concurrent users.
    upserted = await ticker_universe.upsert_tickers(session, yahoo_tickers)
    # Also drop into the Redis buffer so flush_buffer's existing tests +
    # ledger remain coherent if/when we re-enable buffered-only mode.
    buffered = await ticker_universe.buffer_tickers(yahoo_tickers)

    # Inline price fetch for the resolved tickers, so /api/universe has
    # something to return on the very first dashboard load after upload.
    fetched_ok = await _prefetch_quotes(session, yahoo_tickers)

    log.info(
        "portfolio.parse",
        positions=len(positions_out),
        unmapped=len(unmapped),
        upserted=upserted,
        buffered=buffered,
        priced=fetched_ok,
    )

    warnings = []
    if unmapped:
        warnings.append(
            f"{len(unmapped)} position(s) could not be resolved to Yahoo tickers: "
            + ", ".join(unmapped[:10])
            + (" ..." if len(unmapped) > 10 else "")
        )

    return {
        "pie_name": pie.name,
        "base_currency": "GBP",
        "positions": positions_out,
        "totals": {
            "invested": pie.invested,
            "value": pie.value,
            "result": pie.result,
        },
        "warnings": warnings,
    }


async def _resolve_positions(
    session: AsyncSession, pie
) -> tuple[list[dict], list[str], list[str]]:
    """Map every parsed position to a Yahoo ticker.

    Returns ``(positions, tickers, unmapped)``: the browser-facing position
    dicts (one per resolvable position, duplicates kept), the distinct Yahoo
    tickers in first-seen order, and labels of positions that could not be
    resolved.
    """
    positions_out: list[dict] = []
    # dict used as an insertion-ordered set: a ticker held in several
    # positions must be upserted / fetched / stored only once.
    distinct_tickers: dict[str, None] = {}
    unmapped: list[str] = []

    for p in pie.positions:
        if pie.tickers_resolved:
            # LLM path: ``p.slice`` is already a Yahoo-ready ticker. We
            # still do a best-effort InstrumentMap lookup so we can use
            # the canonical name + currency when we happen to have one;
            # but unlike the T212 path we never *drop* a position just
            # because resolve_slice missed.
            yahoo_ticker = (p.slice or "").strip().upper()
            if not yahoo_ticker:
                unmapped.append(p.name or "?")
                continue
            resolved = await resolve_slice(session, yahoo_ticker)
            name = (resolved.name if resolved else None) or p.name
            currency = (
                (resolved.currency if resolved else None)
                or p.currency
                or "USD"
            )
        else:
            # T212 path: ``p.slice`` is a shortcode that MUST round-trip
            # through the InstrumentMap. Drop unmapped positions — the
            # warnings block surfaces them to the user.
            resolved = await resolve_slice(session, p.slice)
            if resolved is None or not resolved.yahoo_ticker:
                unmapped.append(p.slice or p.name or "?")
                continue
            yahoo_ticker = resolved.yahoo_ticker
            name = resolved.name or p.name
            currency = resolved.currency

        positions_out.append({
            "yahoo_ticker": yahoo_ticker,
            "t212_slice": p.slice,
            "name": name,
            "qty": p.quantity,
            "avg_cost": p.average_price,  # @property — no call parens
            "currency": currency,
        })
        distinct_tickers.setdefault(yahoo_ticker, None)

    return positions_out, list(distinct_tickers), unmapped


async def _prefetch_quotes(session: AsyncSession, symbols: list[str]) -> int:
    """Fetch and store one Quote row per symbol; return how many were priced.

    Best-effort: any failure of the whole block is logged and reported as
    zero priced quotes instead of failing the upload.
    """
    if not symbols:
        return 0

    anchor = get_settings().CASSANDRA_ANCHOR_DATE or None
    now = utcnow()
    # Bounded concurrency to keep Yahoo happy.
    sem = asyncio.Semaphore(16)

    async def _fetch_one(client, sym):
        async with sem:
            return await market_fetch(client, sym, sym, "", anchor)

    fetched_ok = 0
    try:
        async with httpx.AsyncClient(follow_redirects=True, timeout=20) as client:
            quotes = await asyncio.gather(
                *(_fetch_one(client, t) for t in symbols),
                return_exceptions=True,
            )
        for sym, q in zip(symbols, quotes):
            # gather(return_exceptions=True) can also hand back
            # BaseException instances (e.g. CancelledError); treating those
            # as quotes would abort the whole batch on attribute access.
            if isinstance(q, BaseException):
                log.warning("portfolio.parse.fetch_failed", symbol=sym, error=str(q)[:120])
                continue
            session.add(Quote(
                symbol=q.symbol, source=q.source, label=q.label,
                group_name="universe", price=q.price, currency=q.currency,
                as_of=q.as_of, changes=q.changes or None,
                error=(q.error[:250] if q.error else None),
                fetched_at=now,
            ))
            if q.price is not None:
                fetched_ok += 1
        await session.commit()
    except Exception as e:
        log.error("portfolio.parse.fetch_block_failed", error=str(e)[:200])
        # Nothing was committed, so do not report any quote as priced.
        return 0
    return fetched_ok
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# POST /api/analyze — ephemeral AI commentary
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
@router.post("/analyze")
async def analyze_portfolio(
    request: Request,
    session: AsyncSession = Depends(get_session),
    principal: CurrentUser = Depends(require_paid),
) -> dict:
    """Generate AI commentary for the supplied pie.

    The pie is held in memory only for the duration of the LLM call;
    nothing about holdings is persisted. The ai_calls ledger row records
    tokens + cost, never holdings.

    Gated behind ``require_paid``: free-tier users get 402.
    Admin bearer-token bypasses the gate for testing.

    Language resolution: a non-empty string in ``payload["lang"]`` (the live
    toggle state sent by the frontend) wins; otherwise the user's persisted
    ``lang`` is used, and ``"en"`` when neither is available.

    Returns:
        dict with ``content``, ``model``, ``generated_at`` (ISO string),
        ``prompt_tokens``, ``completion_tokens`` and ``cost_usd``.

    Raises:
        HTTPException: 413 for an oversized body, 400 for malformed JSON, a
            non-object JSON body or a request ``parse_request`` rejects,
            502 when the analysis call fails with ``RuntimeError``.
    """
    # Cheap early exit: when the client *declares* an oversized body, refuse
    # before buffering any of it. The post-read check below still covers
    # requests without a usable Content-Length header.
    declared = request.headers.get("content-length", "")
    if (
        declared.isascii()
        and declared.isdigit()
        and int(declared) > MAX_ANALYZE_JSON_BYTES
    ):
        raise HTTPException(status_code=413, detail="payload too large")

    # Read JSON body manually so we can enforce a hard size cap. FastAPI's
    # default body limit is generous; we want tighter control here.
    body = await request.body()
    if len(body) > MAX_ANALYZE_JSON_BYTES:
        raise HTTPException(status_code=413, detail="payload too large")

    try:
        payload = await request.json()
    except Exception:
        # Deliberately broad: besides decode errors, pathological input
        # (e.g. extreme nesting) can fail in other ways; all of it is a
        # client error, not a server fault.
        raise HTTPException(status_code=400, detail="malformed JSON body") from None

    # Valid JSON is not necessarily an object; a list/string/number here
    # would otherwise crash on ``.get`` below and surface as a 500.
    if not isinstance(payload, dict):
        raise HTTPException(status_code=400, detail="JSON body must be an object")

    # Resolve lang. The frontend sends the live toggle state in
    # payload["lang"]; that's what the user is *looking at* right now
    # and is the most up-to-date value. user.lang from the DB is the
    # persisted preference and is used as a fallback when the frontend
    # didn't send anything (older clients, scripts, direct curl).
    db_lang = (
        principal.user.lang if (principal.user and principal.user.lang) else "en"
    )
    raw_lang = payload.get("lang")
    # A non-string "lang" (number, list, null, ...) is treated as absent so
    # it falls back to the persisted preference instead of raising.
    # NOTE(review): whether the value is a *supported* language is presumably
    # validated by parse_request — confirm.
    incoming = raw_lang.strip().lower() if isinstance(raw_lang, str) else ""
    payload["lang"] = incoming or db_lang
    log.info(
        "analyze.lang_resolved",
        payload_lang=incoming or None,
        db_lang=db_lang,
        final=payload["lang"],
    )

    try:
        req = portfolio_analysis.parse_request(payload)
    except ValueError as e:
        raise HTTPException(status_code=400, detail=str(e)) from e

    try:
        result = await portfolio_analysis.analyse(session, req)
    except RuntimeError as e:
        log.error("analyze.llm_failed", error=str(e)[:200])
        raise HTTPException(status_code=502, detail="analysis failed — try again") from e

    return {
        "content": result.content,
        "model": result.model,
        "generated_at": result.generated_at.isoformat(),
        "prompt_tokens": result.prompt_tokens,
        "completion_tokens": result.completion_tokens,
        "cost_usd": result.cost_usd,
    }
|