The route's resolve-slice loop is T212-specific — it looks tickers up against the InstrumentMap, which only has T212's universe. For the LLM path the ticker is already Yahoo-ready (e.g. VOD.L, ASML.AS), so sending it through resolve_slice produced spurious "could not be resolved" warnings and dropped the positions. Fix: ParsedPie gains a ``tickers_resolved`` flag (default False for T212 backward-compat); _apply_mapping in the LLM path sets it True and also extracts currency from the LLM-mapped currency_col into a new ``ParsedPosition.currency`` field. The route branches on the flag: LLM-path positions are kept verbatim with a best-effort InstrumentMap lookup for nicer name/currency overrides, never dropped. Integration test tightened to assert all 5 IBKR fixture positions round-trip with the right currencies (USD / GBP / EUR). Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
431 lines
16 KiB
Python
431 lines
16 KiB
Python
"""LLM-fallback CSV parser.

When the deterministic Trading 212 parser (``csv_import.parse_t212_csv``)
raises ``CSVImportError`` on an unrecognised format, this service takes
over:

1. Detect the CSV dialect (delimiter, preamble offset).
2. Compute a fingerprint of the normalised header row.
3. Look up ``CsvFormatTemplate`` by fingerprint. On a hit, replay the
   cached column mapping deterministically. On a miss, ask the LLM for a
   mapping, validate it, persist a new template, and apply it.

The LLM sees only the header row plus up to five sample data rows. It
returns a column-mapping JSON object, never transcribed numbers: every
quantity and price in the result is read from the CSV by this module.
The system never auto-promotes a learned format to a hand-written
parser — the operator does that by inspecting the collected
``sample_row`` values.

Tickers produced here are taken verbatim from the mapped column
(stripped and uppercased) rather than looked up in an instrument map,
so the returned ``ParsedPie`` is flagged ``tickers_resolved=True``.
Failures raise ``LLMParseError``, a ``CSVImportError`` subclass.
"""
|
|
from __future__ import annotations

import csv
import hashlib
import io
import json
import math

import httpx
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession

from app.db import utcnow
from app.logging import get_logger
from app.models import CsvFormatTemplate
from app.services.csv_import import CSVImportError, ParsedPie, ParsedPosition
from app.services.openrouter import LogResult, call_llm
|
|
|
|
# ---------------------------------------------------------------------------
# Module-level constants
# ---------------------------------------------------------------------------

# Cap on how many leading CSV rows _detect_dialect inspects when looking for
# the header row. Real broker preambles are typically 1-10 lines.
_MAX_PREAMBLE_SCAN = 30

# At most this many data rows are sent to the LLM as samples.
_LLM_SAMPLES = 5
# Completion budget for the LLM reply, which is a single small JSON object.
_LLM_MAX_TOKENS = 400

# Column keys of the LLM-returned mapping. Required keys must be present and
# non-null; optional keys may be null. ``broker_label`` is also requested by
# the prompt but is not a column reference, so it is in neither tuple (it is
# only stored on the template).
_REQUIRED_MAPPING_KEYS = ("ticker_col", "qty_col")
_OPTIONAL_MAPPING_KEYS = ("name_col", "cost_col", "currency_col")

# Maximum CSV payload size (1 MiB) accepted by parse_with_llm; larger uploads
# are rejected before any parsing or LLM call.
_MAX_CSV_BYTES = 1_048_576

log = get_logger("llm_csv_parser")


# System prompt for the column-mapping call. The model must echo exact header
# strings, which _validate_mapping then checks against the real header row.
# NOTE(review): the ``// average price per share`` annotation inside the
# schema is not valid JSON; if a model copies it into its reply, json.loads
# will reject the response — consider moving that hint into the Rules list.
_SYSTEM_PROMPT = """\
You are an expert at recognising broker portfolio CSV formats.

You will be given the header row and 3-5 sample data rows from a CSV.
Identify which column contains each field. Return ONLY a single JSON
object, no prose, no markdown fences.

Schema (use the EXACT header string from the input; use null if no
column is a good match):

{
"ticker_col": "<header name or null>",
"qty_col": "<header name or null>",
"name_col": "<header name or null>",
"cost_col": "<header name or null>", // average price per share
"currency_col": "<header name or null>",
"broker_label": "<short identifier like 'IBKR Activity Statement' or null>"
}

Rules:
- ticker_col and qty_col are required. If either is missing, return all nulls.
- Use the EXACT header string as it appears in the input — do not paraphrase.
- Output JSON ONLY. No prose, no code fences.
"""
|
|
|
|
|
|
class LLMParseError(CSVImportError):
    """Failure anywhere in the LLM-fallback parse path.

    Covers provider errors, unusable or invalid column mappings, and
    uploads that cannot be parsed at all (empty input, no header row).
    Because it subclasses ``CSVImportError``, a route can handle
    deterministic-parser and LLM-parser failures with a single
    ``except`` clause when that is what it wants."""
|
|
|
|
|
|
def _fingerprint(headers: list[str]) -> str:
|
|
"""Stable hash of the header row.
|
|
|
|
Lowercases each header, strips surrounding whitespace, joins with
|
|
``|`` (a character extremely unlikely to appear inside a real
|
|
header), and returns the sha256 hex digest. Whitespace/case drift
|
|
in the same broker's export does not change the fingerprint;
|
|
adding or removing a column does."""
|
|
normalised = "|".join(h.strip().lower() for h in headers)
|
|
return hashlib.sha256(normalised.encode("utf-8")).hexdigest()
|
|
|
|
|
|
def _decode_raw(raw: bytes) -> str:
|
|
"""Best-effort UTF-8 decode with BOM strip and lossy fallback."""
|
|
return raw.decode("utf-8-sig", errors="replace")
|
|
|
|
|
|
def _looks_numeric(value: str) -> bool:
|
|
"""True if ``value`` parses as a number after stripping common
|
|
decoration (thousands separators, currency symbols, percent signs)."""
|
|
s = value.strip().replace(",", "").replace("$", "").replace("€", "")
|
|
s = s.replace("£", "").replace("%", "").lstrip("-+")
|
|
if not s:
|
|
return False
|
|
try:
|
|
float(s)
|
|
return True
|
|
except ValueError:
|
|
return False
|
|
|
|
|
|
def _detect_dialect(raw: bytes) -> tuple[str, int]:
    """Detect ``(delimiter, preamble_rows)`` for a raw CSV upload.

    The delimiter is sniffed from the first 4096 characters of decoded
    text, restricted to ``,``, ``;``, tab and ``|``, and defaults to ``,``
    when the sniffer cannot decide.

    ``preamble_rows`` is the number of CSV rows BEFORE the row we identify
    as the actual table header. Within the first ``_MAX_PREAMBLE_SCAN``
    rows, the header is the first row that has at least two non-empty
    cells, none of them numeric, and whose next non-empty row contains at
    least one numeric cell. So "Symbol,Quantity" followed by "AAPL,100"
    qualifies, while all-text preamble metadata does not. Falls back to 0
    (first row is the header) if no such row is found within the scan
    window.

    Raises ``LLMParseError`` on empty input, or when the ``csv`` module
    rejects the text (``csv.Error``)."""
    if not raw or not raw.strip():
        raise LLMParseError("empty CSV")

    text = _decode_raw(raw)
    # csv.Sniffer is happy with ~4KB. Anything more and it gets slow.
    sample = text[:4096]
    try:
        delimiter = csv.Sniffer().sniff(sample, delimiters=",;\t|").delimiter
    except csv.Error:
        # Most broker exports are comma-delimited; default rather than
        # error out — the caller will still validate column shapes.
        delimiter = ","

    # Read lazily and stop at the scan window: there is no need to tokenise
    # the whole upload just to locate the header row.
    reader = csv.reader(io.StringIO(text), delimiter=delimiter)
    parsed: list[tuple[int, list[str]]] = []
    try:
        for i, row in enumerate(reader):
            if i >= _MAX_PREAMBLE_SCAN:
                break
            parsed.append((i, [c.strip() for c in row if c.strip()]))
    except csv.Error as e:
        # Surface as a domain error so the caller reports a bad upload
        # instead of crashing with a non-CSVImportError exception.
        raise LLMParseError(f"malformed CSV: {e}") from e

    # Find the first all-text candidate row that is followed by a data row
    # (one with at least one numeric token). This distinguishes real header
    # rows from preamble metadata rows that also happen to be all-text.
    for pos, (i, cells) in enumerate(parsed):
        if len(cells) < 2 or any(_looks_numeric(c) for c in cells):
            continue
        next_cells = next((c for _, c in parsed[pos + 1:] if c), None)
        if next_cells is not None and any(_looks_numeric(c) for c in next_cells):
            return delimiter, i
        # Next non-empty row is also all-text (or absent) → keep scanning.
    return delimiter, 0
|
|
|
|
|
|
def _validate_mapping(
    mapping: dict, headers: list[str], first_row: list[str],
) -> None:
    """Verify the LLM-returned mapping is sane.

    - ``ticker_col`` and ``qty_col`` are required (present and non-empty).
    - Every non-null column value must be a string exactly matching an
      entry in ``headers``.
    - The value at ``qty_col`` on ``first_row`` must parse as a number.
    - The value at ``cost_col`` on ``first_row`` (if the column is mapped
      and the cell is non-empty) must parse as a number.

    ``mapping`` is untrusted model output, so wrong-typed values (lists,
    objects, numbers) are rejected as ``LLMParseError`` rather than being
    allowed to raise ``TypeError`` further down.

    Raises ``LLMParseError`` on any failure, with a message that names
    the specific problem (helpful for log forensics and for the
    user-facing 400)."""
    for key in _REQUIRED_MAPPING_KEYS:
        if not mapping.get(key):
            raise LLMParseError(
                f"LLM mapping missing required column: {key.replace('_col', '')}"
            )

    headers_set = set(headers)
    for key in _REQUIRED_MAPPING_KEYS + _OPTIONAL_MAPPING_KEYS:
        col = mapping.get(key)
        if col is None:
            continue
        if not isinstance(col, str):
            # e.g. ["Symbol"]: hashing a list in the membership test below
            # would raise TypeError instead of a user-facing error.
            raise LLMParseError(
                f"LLM mapping {key} is not a header name: {col!r}"
            )
        if col not in headers_set:
            raise LLMParseError(
                f"LLM mapping references unknown column: {col!r}"
            )

    # Numeric sanity check: qty and (if present) cost must parse on row 1.
    header_index = {h: i for i, h in enumerate(headers)}

    def first_row_value(col: str) -> str:
        # A short row (fewer cells than headers) yields "" for missing cells.
        pos = header_index[col]
        return first_row[pos] if pos < len(first_row) else ""

    qty_col = mapping["qty_col"]
    qty_value = first_row_value(qty_col)
    if not _looks_numeric(qty_value):
        raise LLMParseError(
            f"LLM mapping qty_col={qty_col!r} maps to non-numeric value {qty_value!r}"
        )

    cost_col = mapping.get("cost_col")
    if cost_col is not None:
        cost_value = first_row_value(cost_col)
        if cost_value and not _looks_numeric(cost_value):
            raise LLMParseError(
                f"LLM mapping cost_col={cost_col!r} maps to non-numeric value {cost_value!r}"
            )
|
|
|
|
|
|
def _parse_number(value: str) -> float | None:
|
|
"""Permissive float parse: strips thousands separators, currency
|
|
symbols, percent signs. Returns None on failure (so callers can
|
|
decide whether to skip or raise)."""
|
|
s = value.strip().replace(",", "").replace("$", "")
|
|
s = s.replace("€", "").replace("£", "").replace("%", "")
|
|
if not s:
|
|
return None
|
|
try:
|
|
return float(s)
|
|
except ValueError:
|
|
return None
|
|
|
|
|
|
def _apply_mapping(
    headers: list[str],
    data_rows: list[list[str]],
    mapping: dict,
) -> ParsedPie:
    """Iterate ``data_rows`` and produce a ``ParsedPie``.

    Column names in ``mapping`` are matched against ``headers`` exactly
    first, then case- and whitespace-insensitively. The fallback matters
    on the cache-hit path: ``_fingerprint`` treats header rows that differ
    only in case or surrounding whitespace as the same format, so a cached
    mapping saved as ``"Symbol"`` must still resolve against a later
    export whose header reads ``"SYMBOL"``. An optional column that cannot
    be resolved is treated as absent.

    Rows are skipped when they are blank, have an empty ticker, or lack a
    positive parseable quantity (blank, non-numeric, zero). Broker exports
    often include summary or placeholder rows after the position list.
    ``name_col`` falls back to the ticker symbol when null or blank.
    Tickers are stripped and uppercased but otherwise passed through
    verbatim, and the pie is flagged ``tickers_resolved=True``.

    Raises ``LLMParseError`` if ``ticker_col`` or ``qty_col`` cannot be
    resolved to a header."""
    exact = {h: i for i, h in enumerate(headers)}
    folded = {h.strip().lower(): i for i, h in enumerate(headers)}

    def column_index(col: object) -> int | None:
        # Exact match keeps the original behaviour; the folded lookup mirrors
        # the normalisation in _fingerprint.
        if not isinstance(col, str):
            return None
        if col in exact:
            return exact[col]
        return folded.get(col.strip().lower())

    def cell(row: list[str], pos: int | None) -> str:
        # Unmapped columns and short rows both read as an empty cell.
        return row[pos] if pos is not None and pos < len(row) else ""

    ticker_i = column_index(mapping.get("ticker_col"))
    qty_i = column_index(mapping.get("qty_col"))
    if ticker_i is None or qty_i is None:
        raise LLMParseError(
            "column mapping does not match the CSV headers "
            "(ticker or quantity column not found)"
        )
    name_i = column_index(mapping.get("name_col"))
    cost_i = column_index(mapping.get("cost_col"))
    currency_i = column_index(mapping.get("currency_col"))

    positions: list[ParsedPosition] = []
    invested_total = 0.0
    invested_seen = False

    for row in data_rows:
        if not any(c.strip() for c in row):
            continue
        ticker = cell(row, ticker_i).strip().upper()
        if not ticker:
            continue
        qty = _parse_number(cell(row, qty_i))
        if qty is None or qty <= 0:
            continue

        # An empty/unmapped cost cell parses to None.
        avg_cost = _parse_number(cell(row, cost_i))
        invested_value: float | None = None
        if avg_cost is not None:
            invested_value = qty * avg_cost
            invested_total += invested_value
            invested_seen = True

        name = cell(row, name_i).strip() or ticker
        # NOTE(review): currency is deliberately not case-folded; some feeds
        # may distinguish e.g. "GBp" (pence) from "GBP" — confirm before
        # normalising.
        currency = cell(row, currency_i).strip() or None

        positions.append(ParsedPosition(
            slice=ticker,
            name=name,
            invested_value=invested_value,
            current_value=None,
            result=None,
            quantity=qty,
            currency=currency,
        ))

    return ParsedPie(
        name=None,
        positions=tuple(positions),
        invested=(invested_total if invested_seen else None),
        value=None,
        result=None,
        tickers_resolved=True,
    )
|
|
|
|
|
|
def _build_user_prompt(headers: list[str], samples: list[list[str]]) -> str:
    """Render the user message: the header row, then up to ``_LLM_SAMPLES``
    sample rows, one per line.

    Every row is JSON-encoded rather than re-joined with commas. A cell
    that itself contains a comma ("1,234.50", "Vodafone Group, PLC", or a
    decimal comma in a ``;``-delimited export) would otherwise shift the
    apparent columns and mislead the model. ``ensure_ascii=False`` keeps
    non-ASCII headers readable, so the model can echo them back exactly as
    ``_validate_mapping`` requires."""
    lines = [
        "headers: " + json.dumps(headers, ensure_ascii=False),
        "samples:",
    ]
    lines.extend(
        " " + json.dumps(row, ensure_ascii=False)
        for row in samples[:_LLM_SAMPLES]
    )
    return "\n".join(lines)
|
|
|
|
|
|
async def _extract_mapping_via_llm(
    client: httpx.AsyncClient,
    headers: list[str],
    samples: list[list[str]],
) -> tuple[dict, LogResult]:
    """Single LLM call returning ``(mapping_dict, LogResult)``.

    The LLM is asked for a strict JSON object (no markdown). Markdown code
    fences (with an optional ``json`` tag) are stripped if the model adds
    them anyway. If the remaining text still is not valid JSON, parsing is
    retried on the outermost ``{...}`` span, which handles replies like
    "Here is the mapping: {...}". The ``LogResult`` is returned so the
    caller can record which model produced the mapping and what it cost.

    Raises ``LLMParseError`` if the provider call fails, or if the reply
    is not a JSON object."""
    messages = [
        {"role": "system", "content": _SYSTEM_PROMPT},
        {"role": "user", "content": _build_user_prompt(headers, samples)},
    ]
    try:
        result = await call_llm(client, messages, max_tokens=_LLM_MAX_TOKENS)
    except Exception as e:
        # Boundary wrap: any provider/transport failure becomes a domain error.
        raise LLMParseError(f"LLM provider failed: {e}") from e

    content = (result.content or "").strip()
    # Strip code fences if the model added them despite instructions.
    if content.startswith("```"):
        content = content.strip("`")
        # Drop optional 'json' language tag.
        if content.lstrip().lower().startswith("json"):
            content = content.lstrip()[4:]
        content = content.strip()

    try:
        mapping = json.loads(content)
    except json.JSONDecodeError as e:
        start, end = content.find("{"), content.rfind("}")
        if start == -1 or end <= start:
            raise LLMParseError(f"LLM did not return valid JSON: {e}") from e
        try:
            mapping = json.loads(content[start:end + 1])
        except json.JSONDecodeError:
            raise LLMParseError(f"LLM did not return valid JSON: {e}") from e
    if not isinstance(mapping, dict):
        raise LLMParseError("LLM JSON was not an object")
    return mapping, result
|
|
|
|
|
|
async def parse_with_llm(raw: bytes, session: AsyncSession) -> ParsedPie:
    """Cache-first LLM-fallback CSV parse.

    Rejects payloads over ``_MAX_CSV_BYTES`` and empty payloads, locates
    the header row, and fingerprints it.

    On a cache hit, applies the stored mapping deterministically and bumps
    ``use_count`` / ``last_used_at``. On a cache miss, sends the headers
    and up to ``_LLM_SAMPLES`` non-blank data rows to the LLM, validates
    the returned mapping against the first non-blank data row, applies it,
    and persists a new ``CsvFormatTemplate``.

    Raises ``LLMParseError`` on any failure, including malformed CSV text;
    the caller (route layer) maps that to a 400."""
    if len(raw) > _MAX_CSV_BYTES:
        raise LLMParseError("CSV too large (1 MB max)")
    if not raw or not raw.strip():
        raise LLMParseError("empty CSV")

    delimiter, preamble_rows = _detect_dialect(raw)
    text = _decode_raw(raw)

    try:
        rows = list(csv.reader(io.StringIO(text), delimiter=delimiter))
    except csv.Error as e:
        # e.g. a field over the csv module's size limit; report as a bad upload.
        raise LLMParseError(f"malformed CSV: {e}") from e
    if preamble_rows >= len(rows):
        raise LLMParseError("no header row found in CSV")
    headers = [c.strip() for c in rows[preamble_rows]]
    data_rows = rows[preamble_rows + 1:]
    if not headers:
        raise LLMParseError("empty header row")

    non_blank_rows = [r for r in data_rows if any(c.strip() for c in r)]
    if not non_blank_rows:
        raise LLMParseError("CSV contains a header but no data rows")
    first_data_row = non_blank_rows[0]

    fp = _fingerprint(headers)
    existing = (await session.execute(
        select(CsvFormatTemplate).where(CsvFormatTemplate.fingerprint == fp)
    )).scalar_one_or_none()

    if existing is not None:
        log.info("csv.format.cache_hit", fingerprint=fp,
                 broker_label=existing.broker_label, use_count=existing.use_count)
        pie = _apply_mapping(headers, data_rows, existing.mapping)
        if not pie.positions:
            raise LLMParseError(
                "cached mapping produced no positions — the broker may have "
                "changed their CSV shape; ask the operator to evict the "
                "stale template"
            )
        existing.use_count += 1
        existing.last_used_at = utcnow()
        await session.commit()
        return pie

    log.info("csv.format.cache_miss", fingerprint=fp,
             header_count=len(headers))
    # Take the first N non-blank rows (not the non-blank rows among the first
    # N), so exports with blank spacer lines still give the model samples.
    samples = non_blank_rows[:_LLM_SAMPLES]
    async with httpx.AsyncClient(follow_redirects=True, timeout=30) as client:
        mapping, llm_log = await _extract_mapping_via_llm(client, headers, samples)
    _validate_mapping(mapping, headers, first_data_row)

    pie = _apply_mapping(headers, data_rows, mapping)
    if not pie.positions:
        raise LLMParseError(
            "LLM mapping validated but produced no positions — the file "
            "may not contain portfolio data"
        )

    # broker_label is free-form model output; only persist a non-blank string.
    broker_label = mapping.get("broker_label")
    if isinstance(broker_label, str) and broker_label.strip():
        broker_label = broker_label.strip()
    else:
        broker_label = None

    # NOTE(review): two concurrent first uploads of the same format can both
    # miss the cache; if ``fingerprint`` is unique in the schema, the second
    # commit will fail — confirm whether that needs handling.
    now = utcnow()
    session.add(CsvFormatTemplate(
        fingerprint=fp,
        headers=headers,
        sample_row=first_data_row,
        mapping=mapping,
        preamble_rows=preamble_rows,
        delimiter=delimiter,
        broker_label=broker_label,
        first_seen_at=now,
        last_used_at=now,
        use_count=1,
        llm_model=llm_log.model,
        llm_cost_usd=llm_log.cost_usd,
    ))
    await session.commit()
    return pie
|