- dashboard grid: explicit "header" area as the first row so the
aggregated read panel renders at the top instead of being
auto-placed after the named areas.
- indicators: hide rows flagged stale (older than the group's
freshness threshold). Server still computes stale_symbols;
rendering can be re-enabled by removing the
`{% if not is_stale %}` wrapper in indicators.html.
- /log: add tone-changed to #log-content's hx-trigger and include
it in cassandraSetTone's selector list — toggling Novice /
Intermediate on the Log page was previously a no-op.
- prompts: bump PROMPT_VERSION 7→8. Strengthen the rational-vs-
irrational framing in the strategic-log system prompt from
aspirational to mandatory ("a paragraph without both lenses must
be rewritten"). Require the same lens in the per-group summary,
cross-asset aggregate, and portfolio commentary overrides.
Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
369 lines
14 KiB
Python
369 lines
14 KiB
Python
"""Ephemeral portfolio analysis — generates AI commentary from a pie that
|
|
exists only in the request's memory.
|
|
|
|
Phase G data-minimisation guarantee: this module **never writes the pie
|
|
to the database, to logs, to Redis, or to disk**. The positions list
|
|
enters as a function argument, is used to construct a prompt, the LLM
|
|
returns text, and the positions are dropped on function return. The
|
|
`ai_calls` ledger row written for the call contains model + token counts
|
|
+ cost — no holdings.
|
|
|
|
Inputs come from the browser's localStorage. The server's role is to:
|
|
1. Validate shape + sanitise free-text fields (prompt-injection defence).
|
|
2. Compute summary stats (concentration, top-N, currency mix) — these
|
|
reduce the LLM payload and let us cap the prompt size.
|
|
3. Call OpenRouter via the existing `call_llm` helper.
|
|
4. Write the cost ledger row (no holdings).
|
|
5. Return the commentary text + token / cost metadata.
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
import json
|
|
import math
|
|
import re
|
|
from dataclasses import dataclass
|
|
from datetime import datetime, timezone
|
|
|
|
import httpx
|
|
from sqlalchemy.ext.asyncio import AsyncSession
|
|
|
|
from app.config import get_settings
|
|
from app.db import utcnow
|
|
from app.logging import get_logger
|
|
from app.models import AICall
|
|
from app.services.openrouter import (
|
|
LogResult,
|
|
active_model,
|
|
build_system_prompt,
|
|
call_llm,
|
|
)
|
|
|
|
|
|
log = get_logger("portfolio_analysis")
|
|
|
|
|
|
# Version tag for this module's prompt. Not referenced by any code visible
# in this file — presumably consumed elsewhere; confirm before removing.
PROMPT_VERSION = 1
# Hard caps on prompt construction to keep token spend bounded regardless
# of pie size. A pie with 200 positions is real — we summarise the tail.
# Number of per-position rows (largest by value) inlined into the prompt.
MAX_POSITIONS_INLINED = 25
# Truncation length applied to user-supplied position names.
MAX_NAME_LENGTH = 64
# Upper bound on the user message. NOTE: despite the name, build_prompt
# compares this against len(str), i.e. characters rather than encoded bytes.
MAX_PROMPT_BYTES = 40_000
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Input shape
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
@dataclass
class Position:
    """One holding as supplied by the browser. Field names match the
    /api/portfolio/parse response shape."""
    # Ticker symbol; parse_request upper-cases it, and _enrich uses it as
    # the lookup key into the request's `prices` mapping.
    yahoo_ticker: str
    # Display name; parse_request swaps it for the ticker when it contains
    # a prompt-injection marker.
    name: str
    # Units held; parse_request only keeps positions with qty > 0.
    qty: float
    # Average purchase price per unit (qty * avg_cost is the "invested" sum).
    avg_cost: float
    # Currency code of the holding; when None, _enrich falls back to the
    # currency reported alongside the price.
    currency: str | None = None
|
|
|
|
|
|
@dataclass
class AnalysisRequest:
    """Input for one portfolio-commentary call, as built by parse_request."""
    # Holdings that survived validation (never empty when built by
    # parse_request).
    positions: list[Position]
    # Browser-supplied quotes keyed by ticker.
    prices: dict[str, dict]  # {ticker: {p, c, d:{1d,1m,1y}, ...}}
    base_currency: str = "GBP"
    # Optional reference date; echoed into the prompt when set.
    anchor: str | None = None
    tone: str = "INTERMEDIATE"  # NOVICE | INTERMEDIATE | PRO
    analysis: str = "SPECULATIVE"  # DRY | SPECULATIVE
|
|
|
|
|
|
@dataclass
class AnalysisResult:
    """Outcome of a successful analyse() call: the commentary text plus the
    model / token / cost metadata reported for the LLM call."""
    # Commentary text returned by the LLM.
    content: str
    # Model identifier reported on the LLM result.
    model: str
    # Token counts and cost may be None when the LLM result omits them.
    prompt_tokens: int | None
    completion_tokens: int | None
    cost_usd: float | None
    # Timezone-aware UTC timestamp set by analyse() when the result is built.
    generated_at: datetime
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Input validation + sanitisation
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
# C0 control characters (plus DEL) that _sanitise_text replaces with a space.
# Tab (\x09), LF (\x0a) and CR (\x0d) are not in this class; the later
# whitespace-collapsing step in _sanitise_text turns them into spaces.
_CONTROL_CHARS = re.compile(r"[\x00-\x08\x0b\x0c\x0e-\x1f\x7f]")
# Prompt-injection markers commonly used to break out of context.
# _looks_injected matches them as case-insensitive substrings. They are only
# *detected*, not stripped — the caller decides what to do with a flagged
# value (parse_request replaces a flagged name with the ticker).
_INJECTION_TOKENS = (
    "ignore previous", "ignore above", "system:", "assistant:",
    "you are now", "</system>", "<|im_start|>", "<|im_end|>",
)
|
|
|
|
|
|
def _sanitise_text(value: str, max_len: int) -> str:
    """Normalise an untrusted free-text field before it reaches the LLM.

    Control characters become spaces, surrounding whitespace is trimmed,
    inner whitespace runs collapse to a single space, and the result is cut
    to at most ``max_len`` characters. Anything that is not a ``str``
    yields the empty string.
    """
    if isinstance(value, str):
        without_controls = _CONTROL_CHARS.sub(" ", value)
        collapsed = re.sub(r"\s+", " ", without_controls.strip())
        return collapsed[:max_len]
    return ""
|
|
|
|
|
|
def _looks_injected(value: str) -> bool:
    """Report whether ``value`` contains any known prompt-injection marker.

    The comparison is a case-insensitive substring match against every
    entry of ``_INJECTION_TOKENS``.
    """
    haystack = value.lower()
    for marker in _INJECTION_TOKENS:
        if marker in haystack:
            return True
    return False
|
|
|
|
|
|
def _safe_text(value: object, max_len: int, fallback: str | None) -> str | None:
    """Sanitise one free-text field, returning ``fallback`` when the cleaned
    value is empty or contains a prompt-injection marker."""
    cleaned = _sanitise_text(value, max_len)
    if not cleaned or _looks_injected(cleaned):
        return fallback
    return cleaned


def parse_request(payload: dict) -> AnalysisRequest:
    """Validate + sanitise the JSON the browser sent.

    The browser is trusted *minimally* — strings are sanitised, numbers
    coerced, oversized inputs truncated. A malformed individual position is
    skipped rather than failing the whole request.

    Args:
        payload: Decoded JSON body. Reads ``positions`` (list of dicts with
            ``yahoo_ticker``, ``name``, ``qty``, ``avg_cost``, ``currency``)
            and the optional ``prices``, ``base_currency``, ``anchor``,
            ``tone`` and ``analysis`` keys.

    Returns:
        An AnalysisRequest whose text fields have all been sanitised and
        checked for prompt-injection markers. ``prices`` is passed through
        as-is (only its top-level type is checked).

    Raises:
        ValueError: If ``payload`` is not a dict, ``positions`` is missing,
            empty or not a list, or no position survives validation.
    """
    # A JSON array / string / number body has no .get(); report it as the
    # documented ValueError instead of leaking an AttributeError.
    if not isinstance(payload, dict):
        raise ValueError("payload must be a JSON object")

    raw_positions = payload.get("positions") or []
    if not isinstance(raw_positions, list) or not raw_positions:
        raise ValueError("positions must be a non-empty list")

    positions: list[Position] = []
    for p in raw_positions[:200]:  # hard cap on input length
        if not isinstance(p, dict):
            continue
        # The ticker is free text too, so it gets the same injection check
        # as every other string; a flagged ticker drops the position.
        ticker = (_safe_text(p.get("yahoo_ticker"), 32, None) or "").upper()
        if not ticker:
            continue
        name = _sanitise_text(p.get("name", ""), MAX_NAME_LENGTH)
        if _looks_injected(name):
            # Drop the name rather than the whole position — the ticker has
            # already passed the injection check above.
            name = ticker
        try:
            qty = float(p.get("qty") or 0)
            avg_cost = float(p.get("avg_cost") or 0)
        except (TypeError, ValueError, OverflowError):
            # OverflowError: float() of an integer too large for a double.
            continue
        # Reject NaN / inf — float() accepts these and they'd poison the
        # prompt with garbage if they reached the LLM.
        if not (math.isfinite(qty) and math.isfinite(avg_cost)):
            continue
        if qty <= 0:
            continue
        positions.append(Position(
            yahoo_ticker=ticker,
            name=name,
            qty=qty,
            avg_cost=avg_cost,
            currency=_safe_text(p.get("currency"), 8, None),
        ))

    if not positions:
        raise ValueError("no valid positions after sanitisation")

    prices = payload.get("prices") or {}
    if not isinstance(prices, dict):
        prices = {}

    # Every remaining free-text field falls back to its default when it is
    # empty after sanitisation or carries an injection marker.
    return AnalysisRequest(
        positions=positions,
        prices=prices,
        base_currency=_safe_text(payload.get("base_currency"), 8, "GBP"),
        anchor=_safe_text(payload.get("anchor"), 32, None),
        tone=_safe_text(payload.get("tone"), 16, "INTERMEDIATE"),
        analysis=_safe_text(payload.get("analysis"), 16, "SPECULATIVE"),
    )
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Pre-LLM summarisation: keep prompt size bounded
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def _finite_number(raw: object) -> int | float | None:
    """Return ``raw`` when it is a real, finite int/float; otherwise None.

    Booleans are rejected even though ``bool`` subclasses ``int``.
    """
    if isinstance(raw, bool) or not isinstance(raw, (int, float)):
        return None
    try:
        return raw if math.isfinite(raw) else None
    except OverflowError:
        # Integer too large to be represented as a float.
        return None


def _enrich(req: AnalysisRequest) -> list[dict]:
    """Join positions with their current prices; compute per-position
    value and P/L.

    ``req.prices`` is browser-supplied and reaches this function without
    per-entry validation, so everything read from it is checked here before
    it can end up in the prompt: a non-dict entry is treated as "no quote",
    the price and 1-day change must be finite numbers, and the quoted
    currency must be a short alphanumeric string.

    Args:
        req: The request; only ``positions`` and ``prices`` are read.

    Returns:
        One dict per position (keys: ticker, name, qty, avg_cost,
        current_price, currency, value, invested, ppl, ppl_pct,
        change_1d_pct), sorted by value descending with unpriced
        positions last.
    """
    out = []
    for p in req.positions:
        quote = req.prices.get(p.yahoo_ticker)
        if not isinstance(quote, dict):
            quote = {}

        price = _finite_number(quote.get("p"))
        # The product of two finite floats can still overflow to inf.
        value = _finite_number(price * p.qty) if price is not None else None

        quoted_ccy = quote.get("c")
        if not (isinstance(quoted_ccy, str)
                and 0 < len(quoted_ccy) <= 8
                and quoted_ccy.isalnum()):
            quoted_ccy = None
        currency = p.currency or quoted_ccy

        deltas = quote.get("d")
        change_1d = _finite_number(deltas.get("1d")) if isinstance(deltas, dict) else None

        invested = p.avg_cost * p.qty
        ppl = (value - invested) if value is not None else None
        ppl_pct = ((value / invested - 1) * 100) if (value is not None and invested) else None

        out.append({
            "ticker": p.yahoo_ticker,
            "name": p.name,
            "qty": round(p.qty, 6),
            "avg_cost": round(p.avg_cost, 4),
            "current_price": price,
            "currency": currency,
            "value": round(value, 2) if value is not None else None,
            "invested": round(invested, 2),
            "ppl": round(ppl, 2) if ppl is not None else None,
            "ppl_pct": round(ppl_pct, 2) if ppl_pct is not None else None,
            "change_1d_pct": change_1d,
        })

    # Priced rows first (largest value first), unpriced rows last. The tuple
    # key avoids a numeric sentinel that a negative value could undercut.
    out.sort(
        key=lambda r: (r["value"] is not None, r["value"] or 0),
        reverse=True,
    )
    return out
|
|
|
|
|
|
def _summarise(enriched: list[dict]) -> dict:
    """Aggregate stats for the model — concentration, currency mix,
    P/L overall. Saves tokens by not making the LLM compute these.

    Args:
        enriched: Rows as produced by ``_enrich``; each needs the keys
            ``value`` (number or None), ``invested`` and ``currency``.
            The list is expected to be sorted by value descending, since
            the first five rows are taken as the "top 5".

    Returns:
        A dict with ``n_positions``, ``total_value``, ``total_invested``,
        ``total_ppl``, ``total_ppl_pct``, ``top5_share_pct`` and
        ``currency_split_pct``. The P/L and share figures are None (and the
        currency split empty) when the total value is zero.

    Note:
        Values are summed as plain numbers regardless of each row's
        currency — no FX conversion happens here.
    """
    priced = [r for r in enriched if r["value"] is not None]
    total_value = sum(r["value"] for r in priced)
    total_invested = sum(r["invested"] for r in enriched)
    # P/L must compare like with like: a position without a price adds
    # nothing to total_value, so its cost is left out of the P/L baseline
    # too. Otherwise every unpriced holding would show up as a total loss.
    priced_invested = sum(r["invested"] for r in priced)

    by_ccy: dict[str, float] = {}
    for r in priced:
        if r["currency"]:
            by_ccy[r["currency"]] = by_ccy.get(r["currency"], 0) + r["value"]

    if total_value:
        top_value = sum(r["value"] or 0 for r in enriched[:5])
        top_share = round(top_value / total_value * 100, 1)
        total_ppl = round(total_value - priced_invested, 2)
        currency_split = {
            ccy: round(amount / total_value * 100, 1)
            for ccy, amount in by_ccy.items()
        }
    else:
        top_share = None
        total_ppl = None
        currency_split = {}

    if total_value and priced_invested:
        total_ppl_pct = round((total_value / priced_invested - 1) * 100, 2)
    else:
        total_ppl_pct = None

    return {
        "n_positions": len(enriched),
        "total_value": round(total_value, 2),
        "total_invested": round(total_invested, 2),
        "total_ppl": total_ppl,
        "total_ppl_pct": total_ppl_pct,
        "top5_share_pct": top_share,
        "currency_split_pct": currency_split,
    }
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Prompt construction
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
_SYSTEM_OVERRIDES = """\
|
|
# Mode: portfolio commentary
|
|
You are writing a short read of ONE investor's portfolio. Be specific to
|
|
the holdings shown. Frame each observation as analysis ("this allocation
|
|
implies X under scenario Y"), not advice ("buy X" / "sell Y" are forbidden).
|
|
|
|
# Output
|
|
- Open with one TL;DR sentence on the portfolio's *posture* (defensive,
|
|
cyclical, concentrated, etc.).
|
|
- Then 3-5 short paragraphs covering, in order of relevance to this pie:
|
|
concentration / single-name risk; sector or geography tilt;
|
|
currency exposure if multi-currency; notable winners or laggards;
|
|
what would invalidate the current posture.
|
|
- ~350 words. No bullet lists. No buy/sell recommendations.
|
|
- Do not repeat the input data verbatim — interpret it.
|
|
|
|
# Rational vs irrational lens (mandatory)
|
|
Carry the base prompt's rational-vs-irrational framing through to every
|
|
paragraph of the portfolio read. For each section above, contrast:
|
|
- The RATIONAL read: what the underlying factors (fundamentals,
|
|
macro/policy regime, valuation, currency dynamics) justify for this
|
|
exposure;
|
|
- The IRRATIONAL read: what positioning, narrative momentum, sentiment
|
|
or flows are doing to that same exposure right now.
|
|
Then name the GAP — does the holder's posture line up with the rational
|
|
read, or is it riding the irrational one? A paragraph that names only
|
|
the pie's numbers or only the macro backdrop, without placing the
|
|
holding on this rational-vs-irrational axis, is incomplete.
|
|
"""
|
|
|
|
|
|
def build_prompt(req: AnalysisRequest) -> tuple[str, str]:
    """Assemble the two chat messages for a portfolio read.

    Pure with respect to storage: the pie goes in, two strings come out,
    and nothing is persisted along the way.

    Returns:
        ``(system_message, user_message)``. The user message carries the
        aggregate summary and the largest positions as JSON, and is cut off
        with a ``[truncated]`` marker if it exceeds ``MAX_PROMPT_BYTES``
        characters.
    """
    rows = _enrich(req)
    stats = _summarise(rows)

    # Only the largest positions are inlined; the rest are merely counted.
    shown = rows[:MAX_POSITIONS_INLINED]
    omitted = len(rows) - len(shown)

    system_message = "\n\n".join(
        (build_system_prompt(req.tone, req.analysis), _SYSTEM_OVERRIDES)
    )

    sections = [
        f"# Portfolio commentary request — {utcnow().strftime('%Y-%m-%d')}",
        f"Base currency: {req.base_currency}",
    ]
    if req.anchor:
        sections.append(f"Anchor reference date: {req.anchor}")

    positions_heading = f"\n## Top {len(shown)} positions by value"
    if omitted:
        positions_heading += f" ({omitted} smaller positions omitted)"

    sections += [
        "\n## Portfolio summary",
        f"```json\n{json.dumps(stats, indent=2)}\n```",
        positions_heading,
        f"```json\n{json.dumps(shown, indent=2, default=str)}\n```",
        "\n## Task\nWrite the portfolio read per the system prompt. ~350 words. "
        "No preamble, no headers other than the TL;DR opener.",
    ]
    user_message = "\n".join(sections)

    # Token-cost protection: hard stop on the size of the user message.
    if len(user_message) > MAX_PROMPT_BYTES:
        user_message = user_message[:MAX_PROMPT_BYTES] + "\n[truncated]"

    return system_message, user_message
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Orchestration
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
async def analyse(
    session: AsyncSession,
    req: AnalysisRequest,
) -> AnalysisResult:
    """The whole pipeline: prompt → LLM → ledger row → result.

    The `req` object is a function-local — when this function returns, the
    pie is garbage-collected. No DB writes mention positions.

    Args:
        session: Async DB session; used only to add and commit one AICall
            ledger row.
        req: The sanitised request to comment on.

    Returns:
        The commentary text with model / token / cost metadata.

    Raises:
        RuntimeError: If the LLM call failed. The ledger row with
            status "failed" has already been committed at that point. The
            original exception is intentionally *not* chained, so the
            propagated error carries no traceback frames that reference the
            prompt.
    """
    system, user = build_prompt(req)

    llm: LogResult | None = None
    error_msg: str | None = None
    async with httpx.AsyncClient() as client:
        try:
            llm = await call_llm(
                client,
                messages=[
                    {"role": "system", "content": system},
                    {"role": "user", "content": user},
                ],
                max_tokens=2000,
            )
        except Exception as e:
            # Boundary handler: the attempt must still be recorded in the
            # ledger, so the failure is captured here and raised below.
            # NOTE(review): this text is logged and persisted — confirm
            # call_llm errors never echo the request body.
            status = "failed"
            error_msg = str(e)[:500]
            llm = None
            log.error("portfolio_analysis.failed", error=error_msg)
        else:
            status = "ok"

    # Ledger row — NO portfolio data, just metadata. Same row whether the
    # call succeeded or failed, so cost-cap and rate-limit logic can
    # observe the attempt.
    session.add(AICall(
        called_at=utcnow(),
        model=llm.model if llm else active_model(),
        prompt_tokens=llm.prompt_tokens if llm else None,
        completion_tokens=llm.completion_tokens if llm else None,
        cost_usd=llm.cost_usd if llm else None,
        status=status,
        error=error_msg,
    ))
    await session.commit()

    if llm is None:
        raise RuntimeError(error_msg or "portfolio analysis failed")

    # Only the position count is logged, never the holdings themselves.
    log.info(
        "portfolio_analysis.ok",
        n_positions=len(req.positions),
        prompt_tokens=llm.prompt_tokens,
        completion_tokens=llm.completion_tokens,
        cost_usd=llm.cost_usd,
    )
    return AnalysisResult(
        content=llm.content,
        model=llm.model,
        prompt_tokens=llm.prompt_tokens,
        completion_tokens=llm.completion_tokens,
        cost_usd=llm.cost_usd,
        generated_at=datetime.now(timezone.utc),
    )
|