read.markets/app/services/llm_csv_parser.py
Giorgio Gilestro b99f46d2fc csv-parser: add _apply_mapping helper
Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-05-27 12:18:31 +02:00

253 lines
9.2 KiB
Python

"""LLM-fallback CSV parser.
When the deterministic Trading 212 parser (``csv_import.parse_t212_csv``)
raises ``CSVImportError`` on an unrecognised format, this service kicks
in:
1. Detect the CSV dialect (delimiter, preamble offset).
2. Compute a fingerprint of the normalised header row.
3. Look up ``CsvFormatTemplate`` by fingerprint. On hit, replay the
cached column-mapping deterministically. On miss, ask the LLM for a
mapping, validate it, persist a new template, and apply it.
The LLM sees only headers + the first 3-5 sample rows. It returns a
column-mapping JSON, never transcribed numbers. The system never
auto-promotes a learned format to a hand-written parser — the operator
does that by inspecting collected ``sample_row`` values.
"""
from __future__ import annotations

import csv
import hashlib
import io
import math

from app.services.csv_import import CSVImportError, ParsedPie, ParsedPosition
# ---------------------------------------------------------------------------
# Module-level constants
# ---------------------------------------------------------------------------
# Maximum number of leading CSV rows ``_detect_dialect`` examines while
# looking for the header row. Real broker preambles are typically 1-10
# lines, so 30 leaves generous headroom.
_MAX_PREAMBLE_SCAN = 30
# Keys of the LLM-returned column mapping; each value names a column in
# the CSV header row. ``_validate_mapping`` rejects a mapping whose
# required keys are missing or empty; optional keys may be absent or null.
# NOTE(review): ``currency_col`` is validated but the visible
# ``_apply_mapping`` never reads it — confirm whether that is intended.
_REQUIRED_MAPPING_KEYS = ("ticker_col", "qty_col")
_OPTIONAL_MAPPING_KEYS = ("name_col", "cost_col", "currency_col")
class LLMParseError(CSVImportError):
    """Signal that the LLM fallback path could not produce a usable mapping.

    Raised when the LLM call itself fails or when the mapping it returns
    is rejected. Subclassing ``CSVImportError`` lets route-level handlers
    catch deterministic-parser and LLM-path failures with a single
    ``except`` clause when they do not need to tell the two apart.
    """
def _fingerprint(headers: list[str]) -> str:
    """Return a stable sha256 hex digest identifying a header row.

    Every header is trimmed of surrounding whitespace and lowercased,
    then the results are joined with ``|`` (a character extremely
    unlikely to occur inside a real header) and hashed as UTF-8. Case or
    whitespace drift between exports of the same format therefore maps
    to the same fingerprint, while an added or removed column yields a
    different one.
    """
    canonical = [header.strip().lower() for header in headers]
    digest = hashlib.sha256("|".join(canonical).encode("utf-8"))
    return digest.hexdigest()
def _decode_raw(raw: bytes) -> str:
    """Decode uploaded bytes as UTF-8 without ever raising.

    The ``utf-8-sig`` codec drops a leading byte-order mark when one is
    present, and ``errors="replace"`` substitutes U+FFFD for any byte
    sequence that is not valid UTF-8 instead of failing.
    """
    return str(raw, encoding="utf-8-sig", errors="replace")
def _looks_numeric(value: str) -> bool:
    """Return True if ``value`` parses as a number once decoration is removed.

    Surrounding whitespace, thousands separators (``,``), currency
    symbols (dollar, euro, pound), percent signs and any leading
    ``+``/``-`` signs are stripped before the remainder is handed to
    ``float``. Anything ``float`` accepts therefore counts as numeric,
    including exponent notation and the ``nan``/``inf`` spellings. A
    value that is empty after stripping is not numeric.
    """
    # The euro and pound signs are written as escapes so they cannot be
    # silently dropped by a lossy re-encoding of this source file; a lost
    # character turns the replacement into a no-op on the empty string and
    # makes every euro-prefixed amount look like text.
    stripped = value.strip()
    for decoration in (",", "$", "\u20ac", "\u00a3", "%"):
        stripped = stripped.replace(decoration, "")
    stripped = stripped.lstrip("-+")
    if not stripped:
        return False
    try:
        float(stripped)
    except ValueError:
        return False
    return True
def _detect_dialect(raw: bytes) -> tuple[str, int]:
    """Detect ``(delimiter, preamble_rows)`` for a raw CSV upload.

    The delimiter is sniffed from the first 4096 characters among
    ``,``, ``;``, tab and ``|``; when sniffing fails a comma is assumed.

    ``preamble_rows`` is the zero-based index of the row identified as
    the table header, i.e. the number of parsed CSV rows before it. A
    row is taken as the header when it has at least two non-empty cells,
    none of which looks numeric (so "Symbol,Quantity" is a candidate but
    "AAPL,100" is not), and the next non-blank row contains at least one
    numeric cell. The second condition separates the real header from
    all-text preamble rows. Only the first ``_MAX_PREAMBLE_SCAN`` rows
    are read and examined; if no row qualifies the first row is assumed
    to be the header and 0 is returned.

    Raises:
        LLMParseError: if the input is empty or whitespace-only, or if
            the csv module cannot tokenise the scanned rows.
    """
    if not raw or not raw.strip():
        raise LLMParseError("empty CSV")
    text = _decode_raw(raw)
    try:
        # csv.Sniffer is happy with ~4KB. Anything more and it gets slow.
        delimiter = csv.Sniffer().sniff(text[:4096], delimiters=",;\t|").delimiter
    except csv.Error:
        # Most broker exports are comma-delimited; default rather than
        # error out — the caller will still validate column shapes.
        delimiter = ","

    # Stream the reader instead of materialising the whole file: only the
    # first _MAX_PREAMBLE_SCAN rows can ever be inspected. ``range`` comes
    # first in ``zip`` so no row beyond the limit is pulled from the reader.
    reader = csv.reader(io.StringIO(text), delimiter=delimiter)
    scanned: list[list[str]] = []
    try:
        for _, row in zip(range(_MAX_PREAMBLE_SCAN), reader):
            scanned.append([cell.strip() for cell in row if cell.strip()])
    except csv.Error as exc:
        # Keep tokenising failures inside the CSVImportError hierarchy so
        # callers handling import errors see them too.
        raise LLMParseError(f"could not tokenise CSV: {exc}") from exc

    for position, cells in enumerate(scanned):
        if len(cells) < 2 or any(_looks_numeric(cell) for cell in cells):
            continue
        # Only the first non-blank row after the candidate decides: if it
        # is also all-text, the candidate was preamble and scanning goes on.
        following = next((later for later in scanned[position + 1:] if later), None)
        if following is not None and any(_looks_numeric(cell) for cell in following):
            return delimiter, position
    return delimiter, 0
def _validate_mapping(
    mapping: dict, headers: list[str], first_row: list[str],
) -> None:
    """Verify the LLM-returned column mapping is sane.

    Checks, in order:

    - ``ticker_col`` and ``qty_col`` are present and non-empty.
    - Every column named by a required or optional key is a string that
      exists in ``headers`` (null optional keys are ignored).
    - The ``qty_col`` cell of ``first_row`` parses as a number; a cell
      beyond the end of ``first_row`` counts as empty and fails.
    - The ``cost_col`` cell of ``first_row``, when the key is set and the
      cell is non-empty, parses as a number.

    Raises:
        LLMParseError: on any failure, with a message naming the specific
            problem (helpful for log forensics and for the user-facing
            400).
    """
    for key in _REQUIRED_MAPPING_KEYS:
        if not mapping.get(key):
            raise LLMParseError(
                f"LLM mapping missing required column: {key.replace('_col', '')}"
            )

    known_headers = set(headers)
    for key in _REQUIRED_MAPPING_KEYS + _OPTIONAL_MAPPING_KEYS:
        col = mapping.get(key)
        if col is None:
            continue
        # The mapping is untrusted LLM output: a non-string value (e.g. a
        # JSON list) is unhashable and would make the set lookup raise
        # TypeError, so reject it explicitly before testing membership.
        if not isinstance(col, str) or col not in known_headers:
            raise LLMParseError(
                f"LLM mapping references unknown column: {col!r}"
            )

    header_position = {header: pos for pos, header in enumerate(headers)}

    def first_row_value(column: str) -> str:
        """Return the first-row cell under ``column`` ("" if the row is short)."""
        pos = header_position[column]
        return first_row[pos] if pos < len(first_row) else ""

    # Numeric sanity check: qty and (if present) cost must parse on row 1.
    qty_col = mapping["qty_col"]
    qty_value = first_row_value(qty_col)
    if not _looks_numeric(qty_value):
        raise LLMParseError(
            f"LLM mapping qty_col={qty_col!r} maps to non-numeric value {qty_value!r}"
        )

    cost_col = mapping.get("cost_col")
    if cost_col is not None:
        cost_value = first_row_value(cost_col)
        # An empty cost cell is tolerated; only a non-empty, non-numeric
        # one shows the column was mis-identified.
        if cost_value and not _looks_numeric(cost_value):
            raise LLMParseError(
                f"LLM mapping cost_col={cost_col!r} maps to non-numeric value {cost_value!r}"
            )
def _parse_number(value: str) -> float | None:
    """Permissively parse ``value`` as a finite float.

    Surrounding whitespace, thousands separators (``,``), currency
    symbols (dollar, euro, pound) and percent signs are removed before
    calling ``float``. Commas are always treated as thousands
    separators, so a decimal-comma value such as ``"1,5"`` parses as
    ``15.0``.

    Returns:
        The parsed number, or None when the cleaned text is empty, is
        not a valid float, or is non-finite (``nan``/``inf``), so that
        callers can decide whether to skip or raise.
    """
    # The euro and pound signs are written as escapes so a lossy
    # re-encoding of this source file cannot silently drop them.
    cleaned = value.strip()
    for decoration in (",", "$", "\u20ac", "\u00a3", "%"):
        cleaned = cleaned.replace(decoration, "")
    if not cleaned:
        return None
    try:
        number = float(cleaned)
    except ValueError:
        return None
    # float() accepts "nan" and "inf"; NaN slips past ``<= 0`` comparisons
    # and poisons sums, so treat non-finite results as unparseable.
    return number if math.isfinite(number) else None
def _apply_mapping(
    headers: list[str],
    data_rows: list[list[str]],
    mapping: dict,
) -> ParsedPie:
    """Iterate ``data_rows`` and produce a ``ParsedPie``.

    Args:
        headers: The header row; mapping values name entries of this list.
        data_rows: The rows following the header, one list of cells each.
        mapping: Column mapping with required ``ticker_col``/``qty_col``
            and optional ``name_col``/``cost_col`` entries.

    Returns:
        A ``ParsedPie`` with one ``ParsedPosition`` per accepted row.
        Each position's ``invested_value`` is quantity times the cost
        cell when that cell parses to a finite number, else None. The
        pie's ``invested`` is the sum of those values, or None if no row
        had a usable cost.

    Rows that are entirely blank, have an empty ticker, or lack a
    parseable quantity (blank, non-numeric, non-finite, zero or
    negative) are silently skipped — broker exports often include
    summary or placeholder rows after the position list. Cells beyond
    the end of a short row are treated as empty. The position name falls
    back to the ticker symbol when ``name_col`` is null or the cell is
    empty.

    Raises:
        LLMParseError: if a required mapping key is missing or empty, or
            if any mapped column is not present in ``headers``. This is
            checked up front, so it is raised even when ``data_rows`` is
            empty.
    """
    header_position = {header: pos for pos, header in enumerate(headers)}

    def locate(key: str, *, required: bool = False) -> int | None:
        """Return the header position mapped to ``key`` (None when unset)."""
        column = mapping.get(key)
        if required and not column:
            raise LLMParseError(
                f"LLM mapping missing required column: {key.replace('_col', '')}"
            )
        if column is None:
            return None
        try:
            return header_position[column]
        except (KeyError, TypeError):
            # Unknown or unhashable column name: surface it as an import
            # error instead of a bare KeyError/TypeError mid-iteration.
            raise LLMParseError(
                f"LLM mapping references unknown column: {column!r}"
            ) from None

    def cell(row: list[str], pos: int | None) -> str:
        """Return ``row[pos]``, or "" when unmapped or the row is short."""
        return row[pos] if pos is not None and pos < len(row) else ""

    # Resolve every column once, before the loop: the mapping is
    # loop-invariant and a bad mapping should fail fast.
    ticker_pos = locate("ticker_col", required=True)
    qty_pos = locate("qty_col", required=True)
    name_pos = locate("name_col")
    cost_pos = locate("cost_col")

    parsed_positions: list[ParsedPosition] = []
    invested_total = 0.0
    invested_seen = False
    for row in data_rows:
        if not any(c.strip() for c in row):
            continue
        ticker = cell(row, ticker_pos).strip().upper()
        if not ticker:
            continue
        qty = _parse_number(cell(row, qty_pos))
        # NaN compares False against everything, so ``qty <= 0`` alone
        # would let a "nan" quantity through; check finiteness explicitly.
        if qty is None or not math.isfinite(qty) or qty <= 0:
            continue

        avg_cost = _parse_number(cell(row, cost_pos))
        if avg_cost is not None and not math.isfinite(avg_cost):
            avg_cost = None
        invested_value: float | None = None
        if avg_cost is not None:
            invested_value = qty * avg_cost
            invested_total += invested_value
            invested_seen = True

        parsed_positions.append(ParsedPosition(
            slice=ticker,
            name=cell(row, name_pos).strip() or ticker,
            invested_value=invested_value,
            current_value=None,
            result=None,
            quantity=qty,
        ))
    return ParsedPie(
        name=None,
        positions=tuple(parsed_positions),
        invested=(invested_total if invested_seen else None),
        value=None,
        result=None,
    )