read.markets/app/services/portfolio_analysis.py
Giorgio Gilestro 8e7ea673ce analyze: bump max_tokens 2000 → 4000 for portfolio analysis
Logs (analyze.lang_resolved → portfolio_analysis.reviewer_rejected
chain on 2026-05-29) showed the lang directive was working — the
model was producing Italian — but the reviewer was rejecting every
response as truncated mid-word ("supera i mass", "INRG +8"). The
analyze endpoint then returns 502 and the frontend keeps showing
whatever stale English row was last cached in localStorage, so from
the user's POV the analysis "is still in English".

Same shape as the strategic-log translation cap we fixed earlier:
the prompt targets ~350 English words, IT runs ~25-35% longer in
tokens, and DeepSeek-V4-flash bills internal reasoning against the
same budget. At 2000 we ran out of room mid-sentence. 4000 is well
above the longest realistic Italian output; cost is bounded by
tokens actually emitted, not the cap.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-05-29 16:04:40 +02:00

421 lines
16 KiB
Python

"""Ephemeral portfolio analysis — generates AI commentary from a pie that
exists only in the request's memory.
Phase G data-minimisation guarantee: this module **never writes the pie
to the database, to logs, to Redis, or to disk**. The positions list
enters as a function argument, is used to construct a prompt, the LLM
returns text, and the positions are dropped on function return. The
`ai_calls` ledger row written for the call contains model + token counts
+ cost — no holdings.
Inputs come from the browser's localStorage. The server's role is to:
1. Validate shape + sanitise free-text fields (prompt-injection defence).
2. Compute summary stats (concentration, top-N, currency mix) — these
reduce the LLM payload and let us cap the prompt size.
3. Call OpenRouter via the existing `call_openrouter` helper.
4. Write the cost ledger row (no holdings).
5. Return the commentary text + token / cost metadata.
"""
from __future__ import annotations
import json
import math
import re
from dataclasses import dataclass
from datetime import datetime, timezone
import httpx
from sqlalchemy.ext.asyncio import AsyncSession
from app.config import get_settings
from app.db import utcnow
from app.logging import get_logger
from app.models import AICall
from app.services.i18n import LANGUAGES, language_directive_lead, respond_in_clause
from app.services.llm_prompts import build_system_prompt
from app.services.output_review import review_read
from app.services.openrouter import (
LogResult,
active_model,
call_llm,
)
log = get_logger("portfolio_analysis")
PROMPT_VERSION = 1
# Hard caps on prompt construction to keep token spend bounded regardless
# of pie size. A pie with 200 positions is real — we summarise the tail.
MAX_POSITIONS_INLINED = 25
MAX_NAME_LENGTH = 64
MAX_PROMPT_BYTES = 40_000
# ---------------------------------------------------------------------------
# Input shape
# ---------------------------------------------------------------------------
@dataclass
class Position:
"""One holding as supplied by the browser. Field names match the
/api/portfolio/parse response shape."""
yahoo_ticker: str
name: str
qty: float
avg_cost: float
currency: str | None = None
@dataclass
class AnalysisRequest:
positions: list[Position]
prices: dict[str, dict] # {ticker: {p, c, d:{1d,1m,1y}, ...}}
base_currency: str = "GBP"
anchor: str | None = None
tone: str = "INTERMEDIATE" # NOVICE | INTERMEDIATE | PRO
analysis: str = "SPECULATIVE" # DRY | SPECULATIVE
lang: str = "en"
@dataclass
class AnalysisResult:
content: str
model: str
prompt_tokens: int | None
completion_tokens: int | None
cost_usd: float | None
generated_at: datetime
# ---------------------------------------------------------------------------
# Input validation + sanitisation
# ---------------------------------------------------------------------------
_CONTROL_CHARS = re.compile(r"[\x00-\x08\x0b\x0c\x0e-\x1f\x7f]")
# Prompt-injection markers commonly used to break out of context. Stripped
# *and* their presence flagged — caller can choose to reject.
_INJECTION_TOKENS = (
"ignore previous", "ignore above", "system:", "assistant:",
"you are now", "</system>", "<|im_start|>", "<|im_end|>",
)
def _sanitise_text(value: str, max_len: int) -> str:
"""Strip control chars, collapse whitespace, truncate. Used on
user-supplied name fields before they reach the LLM."""
if not isinstance(value, str):
return ""
cleaned = _CONTROL_CHARS.sub(" ", value).strip()
cleaned = re.sub(r"\s+", " ", cleaned)
return cleaned[:max_len]
def _looks_injected(value: str) -> bool:
lower = value.lower()
return any(token in lower for token in _INJECTION_TOKENS)
def parse_request(payload: dict) -> AnalysisRequest:
"""Validate + sanitise the JSON the browser sent. Raises ValueError on
malformed input. The browser is trusted *minimally* — strings are
sanitised, numbers coerced, oversized inputs truncated."""
raw_positions = payload.get("positions") or []
if not isinstance(raw_positions, list) or not raw_positions:
raise ValueError("positions must be a non-empty list")
positions: list[Position] = []
for p in raw_positions[:200]: # hard cap on input length
if not isinstance(p, dict):
continue
ticker = _sanitise_text(p.get("yahoo_ticker", ""), 32).upper()
if not ticker:
continue
name = _sanitise_text(p.get("name", ""), MAX_NAME_LENGTH)
if _looks_injected(name):
# Drop the name rather than the whole position — preserves
# the ticker (which has structure that constrains injection).
name = ticker
try:
qty = float(p.get("qty") or 0)
avg_cost = float(p.get("avg_cost") or 0)
except (TypeError, ValueError):
continue
# Reject NaN / inf — float() accepts these and they'd poison the
# prompt with garbage if they reached the LLM.
if not (math.isfinite(qty) and math.isfinite(avg_cost)):
continue
if qty <= 0:
continue
currency = _sanitise_text(p.get("currency", "") or "", 8) or None
positions.append(Position(
yahoo_ticker=ticker, name=name, qty=qty,
avg_cost=avg_cost, currency=currency,
))
if not positions:
raise ValueError("no valid positions after sanitisation")
prices = payload.get("prices") or {}
if not isinstance(prices, dict):
prices = {}
base_currency = _sanitise_text(payload.get("base_currency", "GBP"), 8) or "GBP"
anchor = _sanitise_text(payload.get("anchor") or "", 32) or None
tone = _sanitise_text(payload.get("tone", "INTERMEDIATE"), 16) or "INTERMEDIATE"
analysis = _sanitise_text(payload.get("analysis", "SPECULATIVE"), 16) or "SPECULATIVE"
lang = (payload.get("lang") or "en").strip().lower()
if lang not in LANGUAGES:
lang = "en"
return AnalysisRequest(
positions=positions, prices=prices, base_currency=base_currency,
anchor=anchor, tone=tone, analysis=analysis, lang=lang,
)
# ---------------------------------------------------------------------------
# Pre-LLM summarisation: keep prompt size bounded
# ---------------------------------------------------------------------------
def _enrich(req: AnalysisRequest) -> list[dict]:
"""Join positions with their current prices; compute per-position
value, P/L. Returns a list sorted by current value descending."""
out = []
for p in req.positions:
pq = req.prices.get(p.yahoo_ticker) or {}
price = pq.get("p")
currency = p.currency or pq.get("c")
value = (price * p.qty) if isinstance(price, (int, float)) else None
invested = p.avg_cost * p.qty
ppl = (value - invested) if value is not None else None
ppl_pct = ((value / invested - 1) * 100) if (value is not None and invested) else None
out.append({
"ticker": p.yahoo_ticker,
"name": p.name,
"qty": round(p.qty, 6),
"avg_cost": round(p.avg_cost, 4),
"current_price": price,
"currency": currency,
"value": round(value, 2) if value is not None else None,
"invested": round(invested, 2),
"ppl": round(ppl, 2) if ppl is not None else None,
"ppl_pct": round(ppl_pct, 2) if ppl_pct is not None else None,
"change_1d_pct": pq.get("d", {}).get("1d") if isinstance(pq.get("d"), dict) else None,
})
out.sort(key=lambda r: r["value"] if r["value"] is not None else -1, reverse=True)
return out
def _summarise(enriched: list[dict]) -> dict:
"""Aggregate stats for the model — concentration, currency mix,
P/L overall. Saves tokens by not making the LLM compute these."""
total_value = sum((r["value"] or 0) for r in enriched)
total_invested = sum(r["invested"] for r in enriched)
by_ccy: dict[str, float] = {}
for r in enriched:
if r["currency"] and r["value"] is not None:
by_ccy[r["currency"]] = by_ccy.get(r["currency"], 0) + r["value"]
top_n = enriched[:5]
top_share = (sum(r["value"] or 0 for r in top_n) / total_value * 100) if total_value else None
return {
"n_positions": len(enriched),
"total_value": round(total_value, 2),
"total_invested": round(total_invested, 2),
"total_ppl": round(total_value - total_invested, 2) if total_value else None,
"total_ppl_pct": round((total_value / total_invested - 1) * 100, 2)
if (total_value and total_invested) else None,
"top5_share_pct": round(top_share, 1) if top_share is not None else None,
"currency_split_pct": {
k: round(v / total_value * 100, 1)
for k, v in by_ccy.items()
} if total_value else {},
}
# ---------------------------------------------------------------------------
# Prompt construction
# ---------------------------------------------------------------------------
_SYSTEM_OVERRIDES = """\
# Mode: portfolio commentary
You are writing a short read of ONE investor's portfolio. Be specific to
the holdings shown. Frame each observation as analysis ("this allocation
implies X under scenario Y"), not advice ("buy X" / "sell Y" are forbidden).
# Output
- Open with one TL;DR sentence on the portfolio's *posture* (defensive,
cyclical, concentrated, etc.).
- Then 3-5 short paragraphs covering, in order of relevance to this pie:
concentration / single-name risk; sector or geography tilt;
currency exposure if multi-currency; notable winners or laggards;
what would invalidate the current posture.
- ~350 words. No bullet lists. No buy/sell recommendations.
- Do not repeat the input data verbatim — interpret it.
# Rational vs irrational lens (mandatory)
Carry the base prompt's rational-vs-irrational framing through to every
paragraph of the portfolio read. For each section above, contrast:
- The RATIONAL read: what the underlying factors (fundamentals,
macro/policy regime, valuation, currency dynamics) justify for this
exposure;
- The IRRATIONAL read: what positioning, narrative momentum, sentiment
or flows are doing to that same exposure right now.
Then name the GAP — does the holder's posture line up with the rational
read, or is it riding the irrational one? A paragraph that names only
the pie's numbers or only the macro backdrop, without placing the
holding on this rational-vs-irrational axis, is incomplete.
"""
def build_prompt(req: AnalysisRequest) -> tuple[str, str]:
"""Returns (system_message, user_message). Pure function — pie data
flows in, prompt strings flow out, nothing is stored."""
enriched = _enrich(req)
summary = _summarise(enriched)
# Truncate the per-position table to keep the prompt bounded.
head = enriched[:MAX_POSITIONS_INLINED]
tail_count = max(0, len(enriched) - MAX_POSITIONS_INLINED)
# Language directive both prepended (so the model anchors on the
# target language before reading the long English instruction
# block) and appended (defence in depth — a tail nudge alone
# was being ignored by deepseek-v4-flash when most of the
# context is English).
system = (
language_directive_lead(req.lang)
+ build_system_prompt(req.tone, req.analysis)
+ "\n\n"
+ _SYSTEM_OVERRIDES
+ respond_in_clause(req.lang)
)
user_parts = [
f"# Portfolio commentary request — {utcnow().strftime('%Y-%m-%d')}",
f"Base currency: {req.base_currency}",
]
if req.anchor:
user_parts.append(f"Anchor reference date: {req.anchor}")
user_parts.append("\n## Portfolio summary")
user_parts.append("```json\n" + json.dumps(summary, indent=2) + "\n```")
user_parts.append(f"\n## Top {len(head)} positions by value"
+ (f" ({tail_count} smaller positions omitted)" if tail_count else ""))
user_parts.append("```json\n" + json.dumps(head, indent=2, default=str) + "\n```")
user_parts.append(
"\n## Task\nWrite the portfolio read per the system prompt. ~350 words. "
"No preamble, no headers other than the TL;DR opener."
)
user = "\n".join(user_parts)
# Cap on prompt size (token-cost protection).
if len(user) > MAX_PROMPT_BYTES:
user = user[:MAX_PROMPT_BYTES] + "\n[truncated]"
return system, user
# ---------------------------------------------------------------------------
# Orchestration
# ---------------------------------------------------------------------------
async def analyse(
session: AsyncSession,
req: AnalysisRequest,
) -> AnalysisResult:
"""The whole pipeline: prompt → LLM → ledger row → result. The `req`
object is a function-local — when this function returns, the pie is
garbage-collected. No DB writes mention positions."""
s = get_settings()
system, user = build_prompt(req)
review_cost = 0.0
review_reason: str | None = None
async with httpx.AsyncClient() as client:
try:
llm: LogResult = await call_llm(
client,
messages=[
{"role": "system", "content": system},
{"role": "user", "content": user},
],
# 4000 not 2000. Italian / Spanish / French / German
# output runs ~25-35% longer in tokens than English; on
# top of that DeepSeek-V4-flash bills its internal
# reasoning against the same budget. At 2000 we
# repeatedly hit finish_reason=length mid-sentence,
# which the reviewer agent then correctly flags as
# truncated and rejects — the user ends up looking at
# whatever stale row was last cached. 4000 leaves
# ample headroom; we only pay for tokens actually
# emitted, not the cap itself.
max_tokens=4000,
)
status = "ok"
error_msg = None
except Exception as e:
status = "failed"
error_msg = str(e)[:500]
llm = None
log.error("portfolio_analysis.failed", error=error_msg)
# Reviewer gate. This is the highest-risk surface — the model is
# commenting on a real user's holdings, so any drift into
# buy/sell or allocation language is a regulatory hazard. Drop
# the response on a reject and surface a retry-able error to the
# caller; no analysis is ever persisted server-side anyway.
if llm is not None:
verdict = await review_read(client, llm.content)
review_cost = verdict.cost_usd or 0.0
if not verdict.clean:
status = "leaked"
error_msg = f"reviewer rejected: {verdict.reason}"
review_reason = verdict.reason
log.warning("portfolio_analysis.reviewer_rejected",
reason=verdict.reason, preview=llm.content[:120])
full_cost = ((llm.cost_usd or 0.0) + review_cost) if llm else None
# Ledger row — NO portfolio data, just metadata. Same row whether the
# call succeeded, failed, or was rejected by the reviewer, so
# cost-cap and rate-limit logic can observe the attempt.
session.add(AICall(
called_at=utcnow(),
model=llm.model if llm else active_model(),
prompt_tokens=llm.prompt_tokens if llm else None,
completion_tokens=llm.completion_tokens if llm else None,
cost_usd=full_cost,
status=status,
error=error_msg,
))
await session.commit()
if llm is None:
raise RuntimeError(error_msg or "portfolio analysis failed")
if review_reason is not None:
# Reviewer rejected the candidate. Treat as a generation failure
# at the API layer so the user sees a retry-able error rather
# than potentially non-compliant advice.
raise RuntimeError(
"AI analysis couldn't be generated cleanly — please try again."
)
log.info(
"portfolio_analysis.ok",
n_positions=len(req.positions),
prompt_tokens=llm.prompt_tokens,
completion_tokens=llm.completion_tokens,
cost_usd=full_cost,
)
return AnalysisResult(
content=llm.content,
model=llm.model,
prompt_tokens=llm.prompt_tokens,
completion_tokens=llm.completion_tokens,
cost_usd=full_cost,
generated_at=datetime.now(timezone.utc),
)