"""Ephemeral portfolio analysis — generates AI commentary from a pie that exists only in the request's memory. Phase G data-minimisation guarantee: this module **never writes the pie to the database, to logs, to Redis, or to disk**. The positions list enters as a function argument, is used to construct a prompt, the LLM returns text, and the positions are dropped on function return. The `ai_calls` ledger row written for the call contains model + token counts + cost — no holdings. Inputs come from the browser's localStorage. The server's role is to: 1. Validate shape + sanitise free-text fields (prompt-injection defence). 2. Compute summary stats (concentration, top-N, currency mix) — these reduce the LLM payload and let us cap the prompt size. 3. Call OpenRouter via the existing `call_openrouter` helper. 4. Write the cost ledger row (no holdings). 5. Return the commentary text + token / cost metadata. """ from __future__ import annotations import json import math import re from dataclasses import dataclass from datetime import datetime, timezone import httpx from sqlalchemy.ext.asyncio import AsyncSession from app.config import get_settings from app.db import utcnow from app.logging import get_logger from app.models import AICall from app.services.i18n import LANGUAGES, respond_in_clause from app.services.openrouter import ( LogResult, active_model, build_system_prompt, call_llm, ) log = get_logger("portfolio_analysis") PROMPT_VERSION = 1 # Hard caps on prompt construction to keep token spend bounded regardless # of pie size. A pie with 200 positions is real — we summarise the tail. MAX_POSITIONS_INLINED = 25 MAX_NAME_LENGTH = 64 MAX_PROMPT_BYTES = 40_000 # --------------------------------------------------------------------------- # Input shape # --------------------------------------------------------------------------- @dataclass class Position: """One holding as supplied by the browser. Field names match the /api/portfolio/parse response shape.""" yahoo_ticker: str name: str qty: float avg_cost: float currency: str | None = None @dataclass class AnalysisRequest: positions: list[Position] prices: dict[str, dict] # {ticker: {p, c, d:{1d,1m,1y}, ...}} base_currency: str = "GBP" anchor: str | None = None tone: str = "INTERMEDIATE" # NOVICE | INTERMEDIATE | PRO analysis: str = "SPECULATIVE" # DRY | SPECULATIVE lang: str = "en" @dataclass class AnalysisResult: content: str model: str prompt_tokens: int | None completion_tokens: int | None cost_usd: float | None generated_at: datetime # --------------------------------------------------------------------------- # Input validation + sanitisation # --------------------------------------------------------------------------- _CONTROL_CHARS = re.compile(r"[\x00-\x08\x0b\x0c\x0e-\x1f\x7f]") # Prompt-injection markers commonly used to break out of context. Stripped # *and* their presence flagged — caller can choose to reject. _INJECTION_TOKENS = ( "ignore previous", "ignore above", "system:", "assistant:", "you are now", "", "<|im_start|>", "<|im_end|>", ) def _sanitise_text(value: str, max_len: int) -> str: """Strip control chars, collapse whitespace, truncate. Used on user-supplied name fields before they reach the LLM.""" if not isinstance(value, str): return "" cleaned = _CONTROL_CHARS.sub(" ", value).strip() cleaned = re.sub(r"\s+", " ", cleaned) return cleaned[:max_len] def _looks_injected(value: str) -> bool: lower = value.lower() return any(token in lower for token in _INJECTION_TOKENS) def parse_request(payload: dict) -> AnalysisRequest: """Validate + sanitise the JSON the browser sent. Raises ValueError on malformed input. The browser is trusted *minimally* — strings are sanitised, numbers coerced, oversized inputs truncated.""" raw_positions = payload.get("positions") or [] if not isinstance(raw_positions, list) or not raw_positions: raise ValueError("positions must be a non-empty list") positions: list[Position] = [] for p in raw_positions[:200]: # hard cap on input length if not isinstance(p, dict): continue ticker = _sanitise_text(p.get("yahoo_ticker", ""), 32).upper() if not ticker: continue name = _sanitise_text(p.get("name", ""), MAX_NAME_LENGTH) if _looks_injected(name): # Drop the name rather than the whole position — preserves # the ticker (which has structure that constrains injection). name = ticker try: qty = float(p.get("qty") or 0) avg_cost = float(p.get("avg_cost") or 0) except (TypeError, ValueError): continue # Reject NaN / inf — float() accepts these and they'd poison the # prompt with garbage if they reached the LLM. if not (math.isfinite(qty) and math.isfinite(avg_cost)): continue if qty <= 0: continue currency = _sanitise_text(p.get("currency", "") or "", 8) or None positions.append(Position( yahoo_ticker=ticker, name=name, qty=qty, avg_cost=avg_cost, currency=currency, )) if not positions: raise ValueError("no valid positions after sanitisation") prices = payload.get("prices") or {} if not isinstance(prices, dict): prices = {} base_currency = _sanitise_text(payload.get("base_currency", "GBP"), 8) or "GBP" anchor = _sanitise_text(payload.get("anchor") or "", 32) or None tone = _sanitise_text(payload.get("tone", "INTERMEDIATE"), 16) or "INTERMEDIATE" analysis = _sanitise_text(payload.get("analysis", "SPECULATIVE"), 16) or "SPECULATIVE" lang = (payload.get("lang") or "en").strip().lower() if lang not in LANGUAGES: lang = "en" return AnalysisRequest( positions=positions, prices=prices, base_currency=base_currency, anchor=anchor, tone=tone, analysis=analysis, lang=lang, ) # --------------------------------------------------------------------------- # Pre-LLM summarisation: keep prompt size bounded # --------------------------------------------------------------------------- def _enrich(req: AnalysisRequest) -> list[dict]: """Join positions with their current prices; compute per-position value, P/L. Returns a list sorted by current value descending.""" out = [] for p in req.positions: pq = req.prices.get(p.yahoo_ticker) or {} price = pq.get("p") currency = p.currency or pq.get("c") value = (price * p.qty) if isinstance(price, (int, float)) else None invested = p.avg_cost * p.qty ppl = (value - invested) if value is not None else None ppl_pct = ((value / invested - 1) * 100) if (value is not None and invested) else None out.append({ "ticker": p.yahoo_ticker, "name": p.name, "qty": round(p.qty, 6), "avg_cost": round(p.avg_cost, 4), "current_price": price, "currency": currency, "value": round(value, 2) if value is not None else None, "invested": round(invested, 2), "ppl": round(ppl, 2) if ppl is not None else None, "ppl_pct": round(ppl_pct, 2) if ppl_pct is not None else None, "change_1d_pct": pq.get("d", {}).get("1d") if isinstance(pq.get("d"), dict) else None, }) out.sort(key=lambda r: r["value"] if r["value"] is not None else -1, reverse=True) return out def _summarise(enriched: list[dict]) -> dict: """Aggregate stats for the model — concentration, currency mix, P/L overall. Saves tokens by not making the LLM compute these.""" total_value = sum((r["value"] or 0) for r in enriched) total_invested = sum(r["invested"] for r in enriched) by_ccy: dict[str, float] = {} for r in enriched: if r["currency"] and r["value"] is not None: by_ccy[r["currency"]] = by_ccy.get(r["currency"], 0) + r["value"] top_n = enriched[:5] top_share = (sum(r["value"] or 0 for r in top_n) / total_value * 100) if total_value else None return { "n_positions": len(enriched), "total_value": round(total_value, 2), "total_invested": round(total_invested, 2), "total_ppl": round(total_value - total_invested, 2) if total_value else None, "total_ppl_pct": round((total_value / total_invested - 1) * 100, 2) if (total_value and total_invested) else None, "top5_share_pct": round(top_share, 1) if top_share is not None else None, "currency_split_pct": { k: round(v / total_value * 100, 1) for k, v in by_ccy.items() } if total_value else {}, } # --------------------------------------------------------------------------- # Prompt construction # --------------------------------------------------------------------------- _SYSTEM_OVERRIDES = """\ # Mode: portfolio commentary You are writing a short read of ONE investor's portfolio. Be specific to the holdings shown. Frame each observation as analysis ("this allocation implies X under scenario Y"), not advice ("buy X" / "sell Y" are forbidden). # Output - Open with one TL;DR sentence on the portfolio's *posture* (defensive, cyclical, concentrated, etc.). - Then 3-5 short paragraphs covering, in order of relevance to this pie: concentration / single-name risk; sector or geography tilt; currency exposure if multi-currency; notable winners or laggards; what would invalidate the current posture. - ~350 words. No bullet lists. No buy/sell recommendations. - Do not repeat the input data verbatim — interpret it. # Rational vs irrational lens (mandatory) Carry the base prompt's rational-vs-irrational framing through to every paragraph of the portfolio read. For each section above, contrast: - The RATIONAL read: what the underlying factors (fundamentals, macro/policy regime, valuation, currency dynamics) justify for this exposure; - The IRRATIONAL read: what positioning, narrative momentum, sentiment or flows are doing to that same exposure right now. Then name the GAP — does the holder's posture line up with the rational read, or is it riding the irrational one? A paragraph that names only the pie's numbers or only the macro backdrop, without placing the holding on this rational-vs-irrational axis, is incomplete. """ def build_prompt(req: AnalysisRequest) -> tuple[str, str]: """Returns (system_message, user_message). Pure function — pie data flows in, prompt strings flow out, nothing is stored.""" enriched = _enrich(req) summary = _summarise(enriched) # Truncate the per-position table to keep the prompt bounded. head = enriched[:MAX_POSITIONS_INLINED] tail_count = max(0, len(enriched) - MAX_POSITIONS_INLINED) system = build_system_prompt(req.tone, req.analysis) + "\n\n" + _SYSTEM_OVERRIDES + respond_in_clause(req.lang) user_parts = [ f"# Portfolio commentary request — {utcnow().strftime('%Y-%m-%d')}", f"Base currency: {req.base_currency}", ] if req.anchor: user_parts.append(f"Anchor reference date: {req.anchor}") user_parts.append("\n## Portfolio summary") user_parts.append("```json\n" + json.dumps(summary, indent=2) + "\n```") user_parts.append(f"\n## Top {len(head)} positions by value" + (f" ({tail_count} smaller positions omitted)" if tail_count else "")) user_parts.append("```json\n" + json.dumps(head, indent=2, default=str) + "\n```") user_parts.append( "\n## Task\nWrite the portfolio read per the system prompt. ~350 words. " "No preamble, no headers other than the TL;DR opener." ) user = "\n".join(user_parts) # Cap on prompt size (token-cost protection). if len(user) > MAX_PROMPT_BYTES: user = user[:MAX_PROMPT_BYTES] + "\n[truncated]" return system, user # --------------------------------------------------------------------------- # Orchestration # --------------------------------------------------------------------------- async def analyse( session: AsyncSession, req: AnalysisRequest, ) -> AnalysisResult: """The whole pipeline: prompt → LLM → ledger row → result. The `req` object is a function-local — when this function returns, the pie is garbage-collected. No DB writes mention positions.""" s = get_settings() system, user = build_prompt(req) async with httpx.AsyncClient() as client: try: llm: LogResult = await call_llm( client, messages=[ {"role": "system", "content": system}, {"role": "user", "content": user}, ], max_tokens=2000, ) status = "ok" error_msg = None except Exception as e: status = "failed" error_msg = str(e)[:500] llm = None log.error("portfolio_analysis.failed", error=error_msg) # Ledger row — NO portfolio data, just metadata. Same row whether the # call succeeded or failed, so cost-cap and rate-limit logic can # observe the attempt. session.add(AICall( called_at=utcnow(), model=llm.model if llm else active_model(), prompt_tokens=llm.prompt_tokens if llm else None, completion_tokens=llm.completion_tokens if llm else None, cost_usd=llm.cost_usd if llm else None, status=status, error=error_msg, )) await session.commit() if llm is None: raise RuntimeError(error_msg or "portfolio analysis failed") log.info( "portfolio_analysis.ok", n_positions=len(req.positions), prompt_tokens=llm.prompt_tokens, completion_tokens=llm.completion_tokens, cost_usd=llm.cost_usd, ) return AnalysisResult( content=llm.content, model=llm.model, prompt_tokens=llm.prompt_tokens, completion_tokens=llm.completion_tokens, cost_usd=llm.cost_usd, generated_at=datetime.now(timezone.utc), )