"""LLM-fallback CSV parser. When the deterministic Trading 212 parser (``csv_import.parse_t212_csv``) raises ``CSVImportError`` on an unrecognised format, this service kicks in: 1. Detect the CSV dialect (delimiter, preamble offset). 2. Compute a fingerprint of the normalised header row. 3. Look up ``CsvFormatTemplate`` by fingerprint. On hit, replay the cached column-mapping deterministically. On miss, ask the LLM for a mapping, validate it, persist a new template, and apply it. The LLM sees only headers + the first 3-5 sample rows. It returns a column-mapping JSON, never transcribed numbers. The system never auto-promotes a learned format to a hand-written parser — the operator does that by inspecting collected ``sample_row`` values. """ from __future__ import annotations import csv import hashlib import io from app.services.csv_import import CSVImportError # --------------------------------------------------------------------------- # Module-level constants # --------------------------------------------------------------------------- # Cap for how many leading lines we'll scan looking for the header row. # Real broker preambles are typically 1-10 lines. _MAX_PREAMBLE_SCAN = 30 class LLMParseError(CSVImportError): """Raised when the LLM call fails or returns an unusable mapping. Inherits from ``CSVImportError`` so route-level error handling can treat both deterministic and LLM-path failures uniformly when desired.""" def _fingerprint(headers: list[str]) -> str: """Stable hash of the header row. Lowercases each header, strips surrounding whitespace, joins with ``|`` (a character extremely unlikely to appear inside a real header), and returns the sha256 hex digest. Whitespace/case drift in the same broker's export does not change the fingerprint; adding or removing a column does.""" normalised = "|".join(h.strip().lower() for h in headers) return hashlib.sha256(normalised.encode("utf-8")).hexdigest() def _decode_raw(raw: bytes) -> str: """Best-effort UTF-8 decode with BOM strip and lossy fallback.""" return raw.decode("utf-8-sig", errors="replace") def _looks_numeric(value: str) -> bool: """True if ``value`` parses as a number after stripping common decoration (thousands separators, currency symbols, percent signs).""" s = value.strip().replace(",", "").replace("$", "").replace("€", "") s = s.replace("£", "").replace("%", "").lstrip("-+") if not s: return False try: float(s) return True except ValueError: return False def _detect_dialect(raw: bytes) -> tuple[str, int]: """Detect (delimiter, preamble_rows). ``preamble_rows`` is the number of lines BEFORE the row we identify as the actual table header. The header row is the first line whose tokens are all non-numeric (so "Symbol,Quantity" is a header but "AAPL,100" is data). Falls back to assuming the first line is the header if no clear non-numeric line is found within the scan window. Raises ``LLMParseError`` on empty input.""" if not raw or not raw.strip(): raise LLMParseError("empty CSV") text = _decode_raw(raw) # csv.Sniffer is happy with ~4KB. Anything more and it gets slow. sample = text[:4096] try: dialect = csv.Sniffer().sniff(sample, delimiters=",;\t|") delimiter = dialect.delimiter except csv.Error: # Most broker exports are comma-delimited; default rather than # error out — the caller will still validate column shapes. delimiter = "," rows = list(csv.reader(io.StringIO(text), delimiter=delimiter)) # Build a flat list of (index, non_empty_tokens) for rows within scan limit parsed = [] for i, row in enumerate(rows): if i >= _MAX_PREAMBLE_SCAN: break non_empty = [c.strip() for c in row if c.strip()] parsed.append((i, non_empty)) # Find the first all-alpha candidate row that is followed by a data # row (one that contains at least one numeric token). This # distinguishes real header rows from preamble metadata rows that # also happen to be all-text. for idx, (i, non_empty) in enumerate(parsed): if len(non_empty) < 2: continue all_alpha = all(not _looks_numeric(c) for c in non_empty) if not all_alpha: continue # Check whether the next non-empty row looks like data (has a numeric) for _, next_non_empty in parsed[idx + 1:]: if not next_non_empty: continue if any(_looks_numeric(c) for c in next_non_empty): return delimiter, i # Next row is also all-alpha → keep scanning break return delimiter, 0