read.markets/app/services/portfolio_analysis.py
Giorgio Gilestro 0060166d32 review: per-surface rider, loosen for portfolio commentary
Reviewer was rejecting legitimate IT portfolio analyses, citing
descriptive risk language as actionable advice:

  reason: "Allocation guidance throughout: 'concentrazione gestibile',
  'non eliminabile', 'bassa esposizione', 'va monitorato'. Treats
  portfolio construction as actionable."

These phrases describe portfolio state (manageable concentration,
non-eliminable risk, low exposure, warrants monitoring) without
directing the user to take action. They are exactly the kind of
prose a portfolio commentary surface is supposed to produce. The
reviewer's generic "no financial advice" rule is too broad here.

Add a `surface` parameter to review_read() with a per-surface rider
mechanism (_SURFACE_RIDERS). The "portfolio" rider:

- Lists DESCRIPTIVE phrasings that are EXPLICITLY permitted:
  attribute naming ("high concentration", "currency exposure"),
  thesis invalidation conditions, impersonal observations about a
  position's sensitivity.
- Tightens the reject list to EXPLICIT calls to action: imperative
  verbs aimed at the reader, "you should", "consider X-ing",
  specific allocation prescriptions, price-target predictions.

portfolio_analysis.analyse() now passes surface="portfolio". All
other reviewer call sites (indicator summary, log, chat, digest)
default to surface=None and keep the generic rules.

tests/conftest.py's autouse review_read stub picks up **_kw so
adding new keyword arguments to review_read doesn't keep breaking
the locale-integration tests.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-05-29 16:44:27 +02:00

423 lines
17 KiB
Python

"""Ephemeral portfolio analysis — generates AI commentary from a pie that
exists only in the request's memory.
Phase G data-minimisation guarantee: this module **never writes the pie
to the database, to logs, to Redis, or to disk**. The positions list
enters as a function argument, is used to construct a prompt, the LLM
returns text, and the positions are dropped on function return. The
`ai_calls` ledger row written for the call contains model + token counts
+ cost — no holdings.
Inputs come from the browser's localStorage. The server's role is to:
1. Validate shape + sanitise free-text fields (prompt-injection defence).
2. Compute summary stats (concentration, top-N, currency mix) — these
reduce the LLM payload and let us cap the prompt size.
3. Call OpenRouter via the existing `call_openrouter` helper.
4. Write the cost ledger row (no holdings).
5. Return the commentary text + token / cost metadata.
"""
from __future__ import annotations
import json
import math
import re
from dataclasses import dataclass
from datetime import datetime, timezone
import httpx
from sqlalchemy.ext.asyncio import AsyncSession
from app.config import get_settings
from app.db import utcnow
from app.logging import get_logger
from app.models import AICall
from app.services.i18n import LANGUAGES, language_directive_lead, respond_in_clause
from app.services.llm_prompts import build_system_prompt
from app.services.output_review import review_read
from app.services.openrouter import (
LogResult,
active_model,
call_llm,
)
log = get_logger("portfolio_analysis")
PROMPT_VERSION = 1
# Hard caps on prompt construction to keep token spend bounded regardless
# of pie size. A pie with 200 positions is real — we summarise the tail.
MAX_POSITIONS_INLINED = 25
MAX_NAME_LENGTH = 64
MAX_PROMPT_BYTES = 40_000
# ---------------------------------------------------------------------------
# Input shape
# ---------------------------------------------------------------------------
@dataclass
class Position:
"""One holding as supplied by the browser. Field names match the
/api/portfolio/parse response shape."""
yahoo_ticker: str
name: str
qty: float
avg_cost: float
currency: str | None = None
@dataclass
class AnalysisRequest:
positions: list[Position]
prices: dict[str, dict] # {ticker: {p, c, d:{1d,1m,1y}, ...}}
base_currency: str = "GBP"
anchor: str | None = None
tone: str = "INTERMEDIATE" # NOVICE | INTERMEDIATE | PRO
analysis: str = "SPECULATIVE" # DRY | SPECULATIVE
lang: str = "en"
@dataclass
class AnalysisResult:
content: str
model: str
prompt_tokens: int | None
completion_tokens: int | None
cost_usd: float | None
generated_at: datetime
# ---------------------------------------------------------------------------
# Input validation + sanitisation
# ---------------------------------------------------------------------------
_CONTROL_CHARS = re.compile(r"[\x00-\x08\x0b\x0c\x0e-\x1f\x7f]")
# Prompt-injection markers commonly used to break out of context. Stripped
# *and* their presence flagged — caller can choose to reject.
_INJECTION_TOKENS = (
"ignore previous", "ignore above", "system:", "assistant:",
"you are now", "</system>", "<|im_start|>", "<|im_end|>",
)
def _sanitise_text(value: str, max_len: int) -> str:
"""Strip control chars, collapse whitespace, truncate. Used on
user-supplied name fields before they reach the LLM."""
if not isinstance(value, str):
return ""
cleaned = _CONTROL_CHARS.sub(" ", value).strip()
cleaned = re.sub(r"\s+", " ", cleaned)
return cleaned[:max_len]
def _looks_injected(value: str) -> bool:
lower = value.lower()
return any(token in lower for token in _INJECTION_TOKENS)
def parse_request(payload: dict) -> AnalysisRequest:
"""Validate + sanitise the JSON the browser sent. Raises ValueError on
malformed input. The browser is trusted *minimally* — strings are
sanitised, numbers coerced, oversized inputs truncated."""
raw_positions = payload.get("positions") or []
if not isinstance(raw_positions, list) or not raw_positions:
raise ValueError("positions must be a non-empty list")
positions: list[Position] = []
for p in raw_positions[:200]: # hard cap on input length
if not isinstance(p, dict):
continue
ticker = _sanitise_text(p.get("yahoo_ticker", ""), 32).upper()
if not ticker:
continue
name = _sanitise_text(p.get("name", ""), MAX_NAME_LENGTH)
if _looks_injected(name):
# Drop the name rather than the whole position — preserves
# the ticker (which has structure that constrains injection).
name = ticker
try:
qty = float(p.get("qty") or 0)
avg_cost = float(p.get("avg_cost") or 0)
except (TypeError, ValueError):
continue
# Reject NaN / inf — float() accepts these and they'd poison the
# prompt with garbage if they reached the LLM.
if not (math.isfinite(qty) and math.isfinite(avg_cost)):
continue
if qty <= 0:
continue
currency = _sanitise_text(p.get("currency", "") or "", 8) or None
positions.append(Position(
yahoo_ticker=ticker, name=name, qty=qty,
avg_cost=avg_cost, currency=currency,
))
if not positions:
raise ValueError("no valid positions after sanitisation")
prices = payload.get("prices") or {}
if not isinstance(prices, dict):
prices = {}
base_currency = _sanitise_text(payload.get("base_currency", "GBP"), 8) or "GBP"
anchor = _sanitise_text(payload.get("anchor") or "", 32) or None
tone = _sanitise_text(payload.get("tone", "INTERMEDIATE"), 16) or "INTERMEDIATE"
analysis = _sanitise_text(payload.get("analysis", "SPECULATIVE"), 16) or "SPECULATIVE"
lang = (payload.get("lang") or "en").strip().lower()
if lang not in LANGUAGES:
lang = "en"
return AnalysisRequest(
positions=positions, prices=prices, base_currency=base_currency,
anchor=anchor, tone=tone, analysis=analysis, lang=lang,
)
# ---------------------------------------------------------------------------
# Pre-LLM summarisation: keep prompt size bounded
# ---------------------------------------------------------------------------
def _enrich(req: AnalysisRequest) -> list[dict]:
"""Join positions with their current prices; compute per-position
value, P/L. Returns a list sorted by current value descending."""
out = []
for p in req.positions:
pq = req.prices.get(p.yahoo_ticker) or {}
price = pq.get("p")
currency = p.currency or pq.get("c")
value = (price * p.qty) if isinstance(price, (int, float)) else None
invested = p.avg_cost * p.qty
ppl = (value - invested) if value is not None else None
ppl_pct = ((value / invested - 1) * 100) if (value is not None and invested) else None
out.append({
"ticker": p.yahoo_ticker,
"name": p.name,
"qty": round(p.qty, 6),
"avg_cost": round(p.avg_cost, 4),
"current_price": price,
"currency": currency,
"value": round(value, 2) if value is not None else None,
"invested": round(invested, 2),
"ppl": round(ppl, 2) if ppl is not None else None,
"ppl_pct": round(ppl_pct, 2) if ppl_pct is not None else None,
"change_1d_pct": pq.get("d", {}).get("1d") if isinstance(pq.get("d"), dict) else None,
})
out.sort(key=lambda r: r["value"] if r["value"] is not None else -1, reverse=True)
return out
def _summarise(enriched: list[dict]) -> dict:
"""Aggregate stats for the model — concentration, currency mix,
P/L overall. Saves tokens by not making the LLM compute these."""
total_value = sum((r["value"] or 0) for r in enriched)
total_invested = sum(r["invested"] for r in enriched)
by_ccy: dict[str, float] = {}
for r in enriched:
if r["currency"] and r["value"] is not None:
by_ccy[r["currency"]] = by_ccy.get(r["currency"], 0) + r["value"]
top_n = enriched[:5]
top_share = (sum(r["value"] or 0 for r in top_n) / total_value * 100) if total_value else None
return {
"n_positions": len(enriched),
"total_value": round(total_value, 2),
"total_invested": round(total_invested, 2),
"total_ppl": round(total_value - total_invested, 2) if total_value else None,
"total_ppl_pct": round((total_value / total_invested - 1) * 100, 2)
if (total_value and total_invested) else None,
"top5_share_pct": round(top_share, 1) if top_share is not None else None,
"currency_split_pct": {
k: round(v / total_value * 100, 1)
for k, v in by_ccy.items()
} if total_value else {},
}
# ---------------------------------------------------------------------------
# Prompt construction
# ---------------------------------------------------------------------------
_SYSTEM_OVERRIDES = """\
# Mode: portfolio commentary
You are writing a short read of ONE investor's portfolio. Be specific to
the holdings shown. Frame each observation as analysis ("this allocation
implies X under scenario Y"), not advice ("buy X" / "sell Y" are forbidden).
# Output
- Open with one TL;DR sentence on the portfolio's *posture* (defensive,
cyclical, concentrated, etc.).
- Then 3-5 short paragraphs covering, in order of relevance to this pie:
concentration / single-name risk; sector or geography tilt;
currency exposure if multi-currency; notable winners or laggards;
what would invalidate the current posture.
- ~350 words. No bullet lists. No buy/sell recommendations.
- Do not repeat the input data verbatim — interpret it.
# DO NOT include in this surface (overrides the base prompt)
- No "Rational vs irrational" framing, no "Rational:" / "Irrational:"
section labels, no parallel contrast lists. The base prompt asks
for this framework elsewhere; this surface is plain declarative
commentary on the holdings, not a comparative essay.
- No "System temperature:" closing line. That artefact belongs to the
daily strategic log; here the analysis ends with the last paragraph.
- No "Update mode" headers, no anchor-date callouts, no watch list.
"""
def build_prompt(req: AnalysisRequest) -> tuple[str, str]:
"""Returns (system_message, user_message). Pure function — pie data
flows in, prompt strings flow out, nothing is stored."""
enriched = _enrich(req)
summary = _summarise(enriched)
# Truncate the per-position table to keep the prompt bounded.
head = enriched[:MAX_POSITIONS_INLINED]
tail_count = max(0, len(enriched) - MAX_POSITIONS_INLINED)
# Language directive both prepended (so the model anchors on the
# target language before reading the long English instruction
# block) and appended (defence in depth — a tail nudge alone
# was being ignored by deepseek-v4-flash when most of the
# context is English).
system = (
language_directive_lead(req.lang)
+ build_system_prompt(req.tone, req.analysis)
+ "\n\n"
+ _SYSTEM_OVERRIDES
+ respond_in_clause(req.lang)
)
user_parts = [
f"# Portfolio commentary request — {utcnow().strftime('%Y-%m-%d')}",
f"Base currency: {req.base_currency}",
]
if req.anchor:
user_parts.append(f"Anchor reference date: {req.anchor}")
user_parts.append("\n## Portfolio summary")
user_parts.append("```json\n" + json.dumps(summary, indent=2) + "\n```")
user_parts.append(f"\n## Top {len(head)} positions by value"
+ (f" ({tail_count} smaller positions omitted)" if tail_count else ""))
user_parts.append("```json\n" + json.dumps(head, indent=2, default=str) + "\n```")
user_parts.append(
"\n## Task\nWrite the portfolio read per the system prompt. ~350 words. "
"No preamble, no headers other than the TL;DR opener."
)
user = "\n".join(user_parts)
# Cap on prompt size (token-cost protection).
if len(user) > MAX_PROMPT_BYTES:
user = user[:MAX_PROMPT_BYTES] + "\n[truncated]"
return system, user
# ---------------------------------------------------------------------------
# Orchestration
# ---------------------------------------------------------------------------
async def analyse(
session: AsyncSession,
req: AnalysisRequest,
) -> AnalysisResult:
"""The whole pipeline: prompt → LLM → ledger row → result. The `req`
object is a function-local — when this function returns, the pie is
garbage-collected. No DB writes mention positions."""
s = get_settings()
system, user = build_prompt(req)
review_cost = 0.0
review_reason: str | None = None
async with httpx.AsyncClient() as client:
try:
llm: LogResult = await call_llm(
client,
messages=[
{"role": "system", "content": system},
{"role": "user", "content": user},
],
# 4000 not 2000. Italian / Spanish / French / German
# output runs ~25-35% longer in tokens than English; on
# top of that DeepSeek-V4-flash bills its internal
# reasoning against the same budget. At 2000 we
# repeatedly hit finish_reason=length mid-sentence,
# which the reviewer agent then correctly flags as
# truncated and rejects — the user ends up looking at
# whatever stale row was last cached. 4000 leaves
# ample headroom; we only pay for tokens actually
# emitted, not the cap itself.
max_tokens=4000,
)
status = "ok"
error_msg = None
except Exception as e:
status = "failed"
error_msg = str(e)[:500]
llm = None
log.error("portfolio_analysis.failed", error=error_msg)
# Reviewer gate. This is the highest-risk surface — the model is
# commenting on a real user's holdings, so any drift into
# buy/sell or allocation language is a regulatory hazard. Drop
# the response on a reject and surface a retry-able error to the
# caller; no analysis is ever persisted server-side anyway.
# surface="portfolio" applies a rider that loosens the generic
# "no advice" rule to permit descriptive risk language
# ("concentration is high", "currency exposure is unhedged",
# "the position warrants monitoring") which is the actual
# purpose of this surface, while keeping explicit
# buy/sell/allocation directives forbidden.
if llm is not None:
verdict = await review_read(client, llm.content, surface="portfolio")
review_cost = verdict.cost_usd or 0.0
if not verdict.clean:
status = "leaked"
error_msg = f"reviewer rejected: {verdict.reason}"
review_reason = verdict.reason
log.warning("portfolio_analysis.reviewer_rejected",
reason=verdict.reason, preview=llm.content[:120])
full_cost = ((llm.cost_usd or 0.0) + review_cost) if llm else None
# Ledger row — NO portfolio data, just metadata. Same row whether the
# call succeeded, failed, or was rejected by the reviewer, so
# cost-cap and rate-limit logic can observe the attempt.
session.add(AICall(
called_at=utcnow(),
model=llm.model if llm else active_model(),
prompt_tokens=llm.prompt_tokens if llm else None,
completion_tokens=llm.completion_tokens if llm else None,
cost_usd=full_cost,
status=status,
error=error_msg,
))
await session.commit()
if llm is None:
raise RuntimeError(error_msg or "portfolio analysis failed")
if review_reason is not None:
# Reviewer rejected the candidate. Treat as a generation failure
# at the API layer so the user sees a retry-able error rather
# than potentially non-compliant advice.
raise RuntimeError(
"AI analysis couldn't be generated cleanly — please try again."
)
log.info(
"portfolio_analysis.ok",
n_positions=len(req.positions),
prompt_tokens=llm.prompt_tokens,
completion_tokens=llm.completion_tokens,
cost_usd=full_cost,
)
return AnalysisResult(
content=llm.content,
model=llm.model,
prompt_tokens=llm.prompt_tokens,
completion_tokens=llm.completion_tokens,
cost_usd=full_cost,
generated_at=datetime.now(timezone.utc),
)