read.markets/app/services/portfolio_analysis.py
Giorgio Gilestro 4adc8dfe82 openrouter: split into llm_prompts (prompt engineering) + transport
openrouter.py was 790 lines mixing two orthogonal concerns:
- Prompt engineering (build_system_prompt, build_summary_*,
  build_chat_*, build_daily_digest_*, etc.) — ~400 lines, changes
  weekly as PROMPT_VERSION bumps
- LLM transport (call_llm, _provider_chain, _call_provider, retry
  + fallback machinery) — ~250 lines, rarely changes

Extracted the prompt-engineering surface to app/services/llm_prompts.py.
Transport stays in openrouter.py (consistent with the filename — the
OpenRouter URL is the transport's anchor).

All import sites (jobs, routers, services, tests) split their
multi-import lines into two: prompt-things from llm_prompts, transport
from openrouter. PROMPT_VERSION constant, _TONE_ALIASES, _resolve_tone,
and SYSTEM_PROMPT moved with the prompt functions.

No behaviour change — pure relocation. Function signatures, body, and
naming all preserved.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-05-27 21:27:23 +02:00

374 lines
14 KiB
Python

"""Ephemeral portfolio analysis — generates AI commentary from a pie that
exists only in the request's memory.
Phase G data-minimisation guarantee: this module **never writes the pie
to the database, to logs, to Redis, or to disk**. The positions list
enters as a function argument, is used to construct a prompt, the LLM
returns text, and the positions are dropped on function return. The
`ai_calls` ledger row written for the call contains model + token counts
+ cost — no holdings.
Inputs come from the browser's localStorage. The server's role is to:
1. Validate shape + sanitise free-text fields (prompt-injection defence).
2. Compute summary stats (concentration, top-N, currency mix) — these
reduce the LLM payload and let us cap the prompt size.
3. Call the LLM via the existing `call_llm` transport helper (OpenRouter).
4. Write the cost ledger row (no holdings).
5. Return the commentary text + token / cost metadata.
"""
from __future__ import annotations
import json
import math
import re
from dataclasses import dataclass
from datetime import datetime, timezone
import httpx
from sqlalchemy.ext.asyncio import AsyncSession
from app.config import get_settings
from app.db import utcnow
from app.logging import get_logger
from app.models import AICall
from app.services.i18n import LANGUAGES, respond_in_clause
from app.services.llm_prompts import build_system_prompt
from app.services.openrouter import (
LogResult,
active_model,
call_llm,
)
# Module-level structured logger; events in this file are emitted under
# the "portfolio_analysis" name.
log = get_logger("portfolio_analysis")
# Version tag for this module's prompt.
# NOTE(review): not referenced anywhere in the visible part of this file —
# confirm which consumer reads it.
PROMPT_VERSION = 1
# Hard caps on prompt construction to keep token spend bounded regardless
# of pie size. A pie with 200 positions is real — we summarise the tail.
# Number of largest positions listed individually in the prompt.
MAX_POSITIONS_INLINED = 25
# Truncation length applied to user-supplied position names.
MAX_NAME_LENGTH = 64
# Upper bound on the size of the user message enforced by build_prompt.
MAX_PROMPT_BYTES = 40_000
# ---------------------------------------------------------------------------
# Input shape
# ---------------------------------------------------------------------------
@dataclass
class Position:
    """One holding as supplied by the browser. Field names match the
    /api/portfolio/parse response shape."""

    # Ticker symbol; also the key used to look the position up in
    # AnalysisRequest.prices.
    yahoo_ticker: str
    # Display name for the holding (free text from the client).
    name: str
    # Number of units held.
    qty: float
    # Average purchase price per unit; qty * avg_cost is the invested amount.
    avg_cost: float
    # Currency of the holding; when None the price quote's "c" field is
    # used as a fallback during enrichment.
    currency: str | None = None
@dataclass
class AnalysisRequest:
    """Everything one portfolio-analysis call needs, as produced by
    ``parse_request`` from the browser's JSON payload."""

    # Holdings to analyse.
    positions: list[Position]
    # Price quotes keyed by ticker, passed through from the client.
    prices: dict[str, dict]  # {ticker: {p, c, d:{1d,1m,1y}, ...}}
    # Shown in the prompt as "Base currency".
    base_currency: str = "GBP"
    # Optional; shown in the prompt as "Anchor reference date" when set.
    anchor: str | None = None
    # Forwarded to build_system_prompt.
    tone: str = "INTERMEDIATE"  # NOVICE | INTERMEDIATE | PRO
    # Forwarded to build_system_prompt.
    analysis: str = "SPECULATIVE"  # DRY | SPECULATIVE
    # Response language code; forwarded to respond_in_clause.
    lang: str = "en"
@dataclass
class AnalysisResult:
    """Outcome of a successful ``analyse`` call: the commentary text plus
    the model / token / cost metadata reported for the LLM call."""

    # Commentary text returned by the LLM.
    content: str
    # Model identifier reported for the call.
    model: str
    # Token counts and cost as reported for the call; may be None.
    prompt_tokens: int | None
    completion_tokens: int | None
    cost_usd: float | None
    # Timezone-aware UTC timestamp taken when the result is built.
    generated_at: datetime
# ---------------------------------------------------------------------------
# Input validation + sanitisation
# ---------------------------------------------------------------------------
# ASCII control characters and DEL, excluding tab, line feed and carriage
# return (those three are handled by whitespace collapsing in
# _sanitise_text).
_CONTROL_CHARS = re.compile(r"[\x00-\x08\x0b\x0c\x0e-\x1f\x7f]")
# Prompt-injection markers commonly used to break out of context. They are
# *detected* (case-insensitive substring match in _looks_injected), not
# stripped — the caller decides what to do with a flagged value; in
# parse_request a flagged position name is replaced by its ticker.
_INJECTION_TOKENS = (
    "ignore previous", "ignore above", "system:", "assistant:",
    "you are now", "</system>", "<|im_start|>", "<|im_end|>",
)
def _sanitise_text(value: str, max_len: int) -> str:
    """Clean a user-supplied text field before it is shown to the LLM.

    Control characters become spaces, surrounding whitespace is trimmed,
    internal whitespace runs collapse to a single space, and the result
    is cut to at most ``max_len`` characters. Anything that is not a
    ``str`` yields the empty string.
    """
    if isinstance(value, str):
        without_controls = _CONTROL_CHARS.sub(" ", value)
        single_spaced = re.sub(r"\s+", " ", without_controls.strip())
        return single_spaced[:max_len]
    return ""
def _looks_injected(value: str) -> bool:
    """Return True when ``value`` contains any known prompt-injection
    marker, compared case-insensitively as a substring."""
    haystack = value.lower()
    for marker in _INJECTION_TOKENS:
        if marker in haystack:
            return True
    return False
def parse_request(payload: dict) -> AnalysisRequest:
    """Validate + sanitise the JSON the browser sent.

    The browser is trusted *minimally* — strings are sanitised, numbers
    coerced, oversized inputs truncated. Individual malformed positions
    are skipped rather than failing the whole request.

    Args:
        payload: Decoded JSON body. Recognised keys: ``positions``,
            ``prices``, ``base_currency``, ``anchor``, ``tone``,
            ``analysis``, ``lang``.

    Returns:
        An ``AnalysisRequest`` containing only the positions that survived
        validation.

    Raises:
        ValueError: if the payload is not a JSON object, ``positions`` is
            missing / empty / not a list, or no position survives
            sanitisation.
    """
    if not isinstance(payload, dict):
        # A JSON array / scalar body would otherwise crash on `.get`.
        raise ValueError("payload must be a JSON object")

    raw_positions = payload.get("positions") or []
    if not isinstance(raw_positions, list) or not raw_positions:
        raise ValueError("positions must be a non-empty list")

    positions: list[Position] = []
    for raw in raw_positions[:200]:  # hard cap on input length
        if not isinstance(raw, dict):
            continue
        ticker = _sanitise_text(raw.get("yahoo_ticker", ""), 32).upper()
        if not ticker:
            continue
        name = _sanitise_text(raw.get("name", ""), MAX_NAME_LENGTH)
        if _looks_injected(name):
            # Drop the name rather than the whole position — preserves
            # the ticker (which has structure that constrains injection).
            name = ticker
        try:
            qty = float(raw.get("qty") or 0)
            avg_cost = float(raw.get("avg_cost") or 0)
        except (TypeError, ValueError, OverflowError):
            # OverflowError: JSON integers are unbounded in Python, and
            # float() raises on one that is too large to represent.
            continue
        # Reject NaN / inf — float() accepts these and they'd poison the
        # prompt with garbage if they reached the LLM.
        if not (math.isfinite(qty) and math.isfinite(avg_cost)):
            continue
        if qty <= 0:
            continue
        currency = _sanitise_text(raw.get("currency", "") or "", 8) or None
        positions.append(Position(
            yahoo_ticker=ticker, name=name, qty=qty,
            avg_cost=avg_cost, currency=currency,
        ))
    if not positions:
        raise ValueError("no valid positions after sanitisation")

    prices = payload.get("prices") or {}
    if not isinstance(prices, dict):
        prices = {}

    base_currency = _sanitise_text(payload.get("base_currency", "GBP"), 8) or "GBP"
    anchor = _sanitise_text(payload.get("anchor") or "", 32) or None
    if anchor and _looks_injected(anchor):
        # The anchor is interpolated verbatim into the user message, so it
        # gets the same injection screening as position names.
        anchor = None
    tone = _sanitise_text(payload.get("tone", "INTERMEDIATE"), 16) or "INTERMEDIATE"
    analysis = _sanitise_text(payload.get("analysis", "SPECULATIVE"), 16) or "SPECULATIVE"

    # A non-string `lang` (number, list, ...) falls back to English instead
    # of raising AttributeError on `.strip()`.
    raw_lang = payload.get("lang")
    lang = raw_lang.strip().lower() if isinstance(raw_lang, str) else "en"
    if lang not in LANGUAGES:
        lang = "en"

    return AnalysisRequest(
        positions=positions, prices=prices, base_currency=base_currency,
        anchor=anchor, tone=tone, analysis=analysis, lang=lang,
    )
# ---------------------------------------------------------------------------
# Pre-LLM summarisation: keep prompt size bounded
# ---------------------------------------------------------------------------
def _finite_number(raw: object) -> float | None:
    """Return ``raw`` as a float when it is a real, finite number; else None.

    Booleans, non-numeric types, NaN / inf and integers too large for a
    float are all rejected.
    """
    if isinstance(raw, bool) or not isinstance(raw, (int, float)):
        return None
    try:
        number = float(raw)
    except OverflowError:
        return None
    return number if math.isfinite(number) else None


def _quote_currency(raw: object) -> str | None:
    """Return a currency code taken from a price quote, or None when the
    value is not a short ASCII alphanumeric string."""
    if not isinstance(raw, str):
        return None
    code = raw.strip()
    if code and len(code) <= 8 and code.isascii() and code.isalnum():
        return code
    return None


def _enrich(req: AnalysisRequest) -> list[dict]:
    """Join positions with their current prices; compute per-position
    value and P/L.

    The price quotes come straight from the client and are not validated
    upstream, so every field read from them is checked here: a quote that
    is not a dict is ignored, non-finite / non-numeric prices and daily
    changes become None, and the fallback currency must look like a
    currency code. This keeps arbitrary client strings out of the prompt
    and unhashable values out of the currency aggregation.

    Returns:
        One dict per position, sorted by current value descending
        (positions without a usable price sort last).
    """
    out = []
    for pos in req.positions:
        quote = req.prices.get(pos.yahoo_ticker)
        if not isinstance(quote, dict):
            quote = {}

        raw_price = quote.get("p")
        price = _finite_number(raw_price)
        value = price * pos.qty if price is not None else None
        if value is not None and not math.isfinite(value):
            # price * qty overflowed to inf — treat as unpriced.
            value = None

        invested = pos.avg_cost * pos.qty
        ppl = (value - invested) if value is not None else None
        ppl_pct = (
            (value / invested - 1) * 100
            if (value is not None and invested)
            else None
        )

        deltas = quote.get("d")
        raw_change = deltas.get("1d") if isinstance(deltas, dict) else None
        change_1d = raw_change if _finite_number(raw_change) is not None else None

        out.append({
            "ticker": pos.yahoo_ticker,
            "name": pos.name,
            "qty": round(pos.qty, 6),
            "avg_cost": round(pos.avg_cost, 4),
            # Keep the client's numeric representation when it is valid.
            "current_price": raw_price if price is not None else None,
            "currency": pos.currency or _quote_currency(quote.get("c")),
            "value": round(value, 2) if value is not None else None,
            "invested": round(invested, 2),
            "ppl": round(ppl, 2) if ppl is not None else None,
            "ppl_pct": round(ppl_pct, 2) if ppl_pct is not None else None,
            "change_1d_pct": change_1d,
        })
    out.sort(key=lambda r: r["value"] if r["value"] is not None else -1, reverse=True)
    return out
def _summarise(enriched: list[dict]) -> dict:
    """Aggregate stats for the model — concentration, currency mix,
    P/L overall. Saves tokens by not making the LLM compute these.

    Overall P/L is computed over *priced* positions only: a holding with
    no usable price contributes nothing to ``total_value``, so counting
    its cost basis would report it as a total loss. ``total_invested``
    still covers every position, and ``n_unpriced`` says how many were
    left out of the P/L figures.

    Args:
        enriched: Rows as produced by ``_enrich`` (each with ``value``,
            ``invested`` and ``currency``), sorted by value descending —
            the first five rows are taken as the top holdings.

    Returns:
        A JSON-serialisable dict of portfolio-level statistics.
    """
    priced = [r for r in enriched if r["value"] is not None]
    total_value = sum(r["value"] for r in priced)
    total_invested = sum(r["invested"] for r in enriched)
    # Cost basis of the positions that actually have a current value.
    priced_invested = sum(r["invested"] for r in priced)

    by_ccy: dict[str, float] = {}
    for r in priced:
        if r["currency"]:
            by_ccy[r["currency"]] = by_ccy.get(r["currency"], 0) + r["value"]

    top_n = enriched[:5]
    top_share = (
        sum(r["value"] or 0 for r in top_n) / total_value * 100
        if total_value
        else None
    )

    return {
        "n_positions": len(enriched),
        "n_unpriced": len(enriched) - len(priced),
        "total_value": round(total_value, 2),
        "total_invested": round(total_invested, 2),
        "total_ppl": round(total_value - priced_invested, 2) if total_value else None,
        "total_ppl_pct": round((total_value / priced_invested - 1) * 100, 2)
        if (total_value and priced_invested) else None,
        "top5_share_pct": round(top_share, 1) if top_share is not None else None,
        "currency_split_pct": {
            k: round(v / total_value * 100, 1)
            for k, v in by_ccy.items()
        } if total_value else {},
    }
# ---------------------------------------------------------------------------
# Prompt construction
# ---------------------------------------------------------------------------
# Portfolio-specific instructions appended to the base system prompt in
# build_prompt (the response-language clause is appended after this text).
# This is runtime prompt text: any edit changes what the model is told.
_SYSTEM_OVERRIDES = """\
# Mode: portfolio commentary
You are writing a short read of ONE investor's portfolio. Be specific to
the holdings shown. Frame each observation as analysis ("this allocation
implies X under scenario Y"), not advice ("buy X" / "sell Y" are forbidden).
# Output
- Open with one TL;DR sentence on the portfolio's *posture* (defensive,
cyclical, concentrated, etc.).
- Then 3-5 short paragraphs covering, in order of relevance to this pie:
concentration / single-name risk; sector or geography tilt;
currency exposure if multi-currency; notable winners or laggards;
what would invalidate the current posture.
- ~350 words. No bullet lists. No buy/sell recommendations.
- Do not repeat the input data verbatim — interpret it.
# Rational vs irrational lens (mandatory)
Carry the base prompt's rational-vs-irrational framing through to every
paragraph of the portfolio read. For each section above, contrast:
- The RATIONAL read: what the underlying factors (fundamentals,
macro/policy regime, valuation, currency dynamics) justify for this
exposure;
- The IRRATIONAL read: what positioning, narrative momentum, sentiment
or flows are doing to that same exposure right now.
Then name the GAP — does the holder's posture line up with the rational
read, or is it riding the irrational one? A paragraph that names only
the pie's numbers or only the macro backdrop, without placing the
holding on this rational-vs-irrational axis, is incomplete.
"""
def _render_user_message(
    req: AnalysisRequest,
    summary: dict,
    rows: list[dict],
    omitted: int,
) -> str:
    """Render the user message for ``rows`` inlined positions, noting that
    ``omitted`` smaller positions were left out of the table."""
    parts = [
        f"# Portfolio commentary request — {utcnow().strftime('%Y-%m-%d')}",
        f"Base currency: {req.base_currency}",
    ]
    if req.anchor:
        parts.append(f"Anchor reference date: {req.anchor}")
    parts.append("\n## Portfolio summary")
    parts.append("```json\n" + json.dumps(summary, indent=2) + "\n```")
    heading = f"\n## Top {len(rows)} positions by value"
    if omitted:
        heading += f" ({omitted} smaller positions omitted)"
    parts.append(heading)
    parts.append("```json\n" + json.dumps(rows, indent=2, default=str) + "\n```")
    parts.append(
        "\n## Task\nWrite the portfolio read per the system prompt. ~350 words. "
        "No preamble, no headers other than the TL;DR opener."
    )
    return "\n".join(parts)


def build_prompt(req: AnalysisRequest) -> tuple[str, str]:
    """Returns (system_message, user_message). Pure function — pie data
    flows in, prompt strings flow out, nothing is stored.

    The user message lists at most ``MAX_POSITIONS_INLINED`` positions
    (largest first). If the rendered message still exceeds
    ``MAX_PROMPT_BYTES`` (measured in UTF-8 bytes), the smallest inlined
    positions are dropped one at a time until it fits, so the JSON stays
    well-formed and the closing task instruction is never cut off. Only
    if the message is too large with no positions at all is it truncated
    outright and marked ``[truncated]``.
    """
    enriched = _enrich(req)
    summary = _summarise(enriched)
    system = (
        build_system_prompt(req.tone, req.analysis)
        + "\n\n"
        + _SYSTEM_OVERRIDES
        + respond_in_clause(req.lang)
    )

    # Truncate the per-position table to keep the prompt bounded.
    shown = min(len(enriched), MAX_POSITIONS_INLINED)
    user = _render_user_message(req, summary, enriched[:shown], len(enriched) - shown)

    # Cap on prompt size (token-cost protection): shed the smallest rows
    # rather than slicing through the middle of the JSON.
    while shown > 0 and len(user.encode("utf-8")) > MAX_PROMPT_BYTES:
        shown -= 1
        user = _render_user_message(
            req, summary, enriched[:shown], len(enriched) - shown,
        )

    encoded = user.encode("utf-8")
    if len(encoded) > MAX_PROMPT_BYTES:
        # Last resort: even the table-less message is too large.
        user = encoded[:MAX_PROMPT_BYTES].decode("utf-8", "ignore") + "\n[truncated]"
    return system, user
# ---------------------------------------------------------------------------
# Orchestration
# ---------------------------------------------------------------------------
async def analyse(
    session: AsyncSession,
    req: AnalysisRequest,
) -> AnalysisResult:
    """The whole pipeline: prompt → LLM → ledger row → result.

    The `req` object is a function-local — when this function returns,
    the pie is garbage-collected. No DB writes mention positions.

    Args:
        session: Database session used to record the ``AICall`` ledger
            row; it is committed here.
        req: The validated analysis request.

    Returns:
        The commentary text with model / token / cost metadata.

    Raises:
        RuntimeError: if the LLM call failed. The ledger row is written
            first (status ``failed``), and the original exception is
            attached as ``__cause__``.
    """
    system, user = build_prompt(req)

    llm: LogResult | None = None
    failure: Exception | None = None
    error_msg: str | None = None
    async with httpx.AsyncClient() as client:
        try:
            llm = await call_llm(
                client,
                messages=[
                    {"role": "system", "content": system},
                    {"role": "user", "content": user},
                ],
                max_tokens=2000,
            )
        except Exception as exc:
            # Boundary catch: every failure is recorded in the ledger and
            # re-raised below. Keep the exception so it can be chained
            # (`exc` itself is unbound once this block ends).
            failure = exc
            error_msg = str(exc)[:500]
            log.error("portfolio_analysis.failed", error=error_msg)
    status = "failed" if failure is not None else "ok"

    # Ledger row — NO portfolio data, just metadata. Same row whether the
    # call succeeded or failed, so cost-cap and rate-limit logic can
    # observe the attempt.
    session.add(AICall(
        called_at=utcnow(),
        model=llm.model if llm else active_model(),
        prompt_tokens=llm.prompt_tokens if llm else None,
        completion_tokens=llm.completion_tokens if llm else None,
        cost_usd=llm.cost_usd if llm else None,
        status=status,
        error=error_msg,
    ))
    await session.commit()

    if llm is None:
        raise RuntimeError(error_msg or "portfolio analysis failed") from failure

    log.info(
        "portfolio_analysis.ok",
        n_positions=len(req.positions),
        prompt_tokens=llm.prompt_tokens,
        completion_tokens=llm.completion_tokens,
        cost_usd=llm.cost_usd,
    )
    return AnalysisResult(
        content=llm.content,
        model=llm.model,
        prompt_tokens=llm.prompt_tokens,
        completion_tokens=llm.completion_tokens,
        cost_usd=llm.cost_usd,
        generated_at=datetime.now(timezone.utc),
    )