diff --git a/docs/superpowers/plans/2026-05-27-llm-csv-fallback-parser.md b/docs/superpowers/plans/2026-05-27-llm-csv-fallback-parser.md new file mode 100644 index 0000000..9b8ac1e --- /dev/null +++ b/docs/superpowers/plans/2026-05-27-llm-csv-fallback-parser.md @@ -0,0 +1,1732 @@ +# LLM-fallback CSV Parser Implementation Plan + +> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking. + +**Goal:** Add an LLM-fallback CSV parser so non-T212 portfolio uploads succeed by extracting a column mapping (not data) via the LLM, caching that mapping globally by header fingerprint, and replaying it deterministically on every subsequent upload of the same broker format. + +**Architecture:** New service `app/services/llm_csv_parser.py` wraps `openrouter.call_llm` for one-time format discovery, persists results to a new `csv_format_templates` table, and produces the same `ParsedPie` shape as `parse_t212_csv`. The route `/api/portfolio/parse` in `app/routers/universe.py` gains a try/except fall-through: T212 first, LLM-cache lookup second, LLM call only on first encounter of a new format. The cache table stores headers + one anonymous data row + the JSON mapping; no `user_id` is ever recorded against the row. + +**Tech Stack:** FastAPI · SQLAlchemy 2.0 async · Alembic · MariaDB (prod) / aiosqlite (tests) · existing `openrouter.call_llm` (provider fallback + AICall ledger) + +**Spec:** `docs/superpowers/specs/2026-05-27-llm-csv-fallback-parser-design.md` + +--- + +## File Structure + +**Create:** +- `app/services/llm_csv_parser.py` — the new service; `parse_with_llm`, `LLMParseError`, helpers +- `alembic/versions/0021_csv_format_template.py` — hand-rolled migration (matches the style of `0020_trial_end.py`) +- `tests/test_llm_csv_parser.py` — unit + integration tests for the service +- `tests/fixtures/ibkr_sample.csv` — fabricated IBKR-shaped CSV (no real holdings) + +**Modify:** +- `app/models.py` — add `CsvFormatTemplate` class +- `app/routers/universe.py` — add `Depends(require_paid)` to `/portfolio/parse`; wrap `parse_t212_csv` in a try/except that falls through to `parse_with_llm` +- `app/templates/settings.html` — soften "Trading 212 CSV" copy to broker-agnostic + +**Reuse without modification:** +- `app/services/openrouter.py::call_llm, llm_configured, LogResult` +- `app/services/csv_import.py::ParsedPie, ParsedPosition, CSVImportError` +- `app/services/access.py::require_paid` +- `app/db::Base, utcnow` +- `tests/conftest.py` env setup (in-memory aiosqlite, `CASSANDRA_MOCK=1`) +- Session/engine bootstrap pattern from `tests/test_referral_conversion.py::_build_session_factory` + +--- + +## Test Conventions + +All tests must be runnable inside the test container: + +```bash +docker compose -f docker-compose.test.yml run --rm test pytest tests/test_llm_csv_parser.py -v +``` + +DB-touching tests use the same `_build_session_factory(tmp_path)` pattern as `tests/test_referral_conversion.py` — a fresh per-test sqlite file, schema created via `Base.metadata.create_all`. Do NOT introduce a shared fixture across tests; per-test isolation matches the existing pattern. + +Network-touching tests (the LLM) MUST mock `app.services.openrouter.call_llm` — no real HTTP. Use `unittest.mock.AsyncMock`. + +--- + +### Task 1: Add `CsvFormatTemplate` model + +**Files:** +- Modify: `app/models.py` (append after the `Referral` class around line 270+) +- Test: `tests/test_llm_csv_parser.py` + +- [ ] **Step 1: Write the failing test** + +Create the test file with the import + schema test: + +```python +"""Unit + integration tests for the LLM-fallback CSV parser.""" +from __future__ import annotations + +import pytest + + +def test_csv_format_template_model_columns(): + """Model exposes every column the spec requires, with correct types.""" + from sqlalchemy import inspect + + from app.models import CsvFormatTemplate + + cols = {c.name: c for c in inspect(CsvFormatTemplate).columns} + assert "fingerprint" in cols + assert "headers" in cols + assert "sample_row" in cols + assert "mapping" in cols + assert "preamble_rows" in cols + assert "delimiter" in cols + assert "broker_label" in cols + assert "first_seen_at" in cols + assert "use_count" in cols + assert "last_used_at" in cols + assert "llm_model" in cols + assert "llm_cost_usd" in cols + # Crucially, no user attribution. + assert "user_id" not in cols + assert "first_seen_user_id" not in cols + # Fingerprint is the cache key. + assert cols["fingerprint"].unique is True + assert cols["fingerprint"].nullable is False +``` + +- [ ] **Step 2: Run the test to verify it fails** + +```bash +docker compose -f docker-compose.test.yml run --rm test pytest tests/test_llm_csv_parser.py::test_csv_format_template_model_columns -v +``` + +Expected: FAIL with `ImportError: cannot import name 'CsvFormatTemplate'`. + +- [ ] **Step 3: Add the model in `app/models.py`** + +Append after the existing `Referral` class (around line 270+, before any trailing module helpers): + +```python +class CsvFormatTemplate(Base): + """Cached column-mapping for a single broker CSV format. + + Populated on the first upload of a previously-unseen format via the + LLM-fallback parser. Subsequent uploads of the same format + (identified by ``fingerprint``, a sha256 of the normalised header + row) replay ``mapping`` deterministically with no LLM call. + + The table holds the actual ``headers`` and one anonymous ``sample_row`` + from the originating upload — there is no ``user_id`` column, no link + back to the uploader. The sample exists so the operator has concrete + material to look at when hand-writing future native parsers; the + system never auto-generates or modifies parser code from this data. + """ + __tablename__ = "csv_format_templates" + + id: Mapped[int] = mapped_column(_PK, primary_key=True, autoincrement=True) + fingerprint: Mapped[str] = mapped_column(String(64), unique=True, nullable=False) + headers: Mapped[list] = mapped_column(JSON, nullable=False) + sample_row: Mapped[list] = mapped_column(JSON, nullable=False) + mapping: Mapped[dict] = mapped_column(JSON, nullable=False) + preamble_rows: Mapped[int] = mapped_column(Integer, nullable=False, default=0) + delimiter: Mapped[str] = mapped_column(String(1), nullable=False, default=",") + broker_label: Mapped[str | None] = mapped_column(String(128)) + first_seen_at: Mapped[datetime] = mapped_column( + DateTime(timezone=True), nullable=False, default=utcnow, + ) + use_count: Mapped[int] = mapped_column(Integer, nullable=False, default=1) + last_used_at: Mapped[datetime] = mapped_column( + DateTime(timezone=True), nullable=False, default=utcnow, + ) + llm_model: Mapped[str | None] = mapped_column(String(64)) + llm_cost_usd: Mapped[float | None] = mapped_column(Float) +``` + +- [ ] **Step 4: Run the test to verify it passes** + +```bash +docker compose -f docker-compose.test.yml run --rm test pytest tests/test_llm_csv_parser.py::test_csv_format_template_model_columns -v +``` + +Expected: PASS. + +- [ ] **Step 5: Commit** + +```bash +git add app/models.py tests/test_llm_csv_parser.py +git commit -m "csv-parser: add CsvFormatTemplate model" +``` + +--- + +### Task 2: Add Alembic migration `0021` + +**Files:** +- Create: `alembic/versions/0021_csv_format_template.py` + +- [ ] **Step 1: Write the migration** + +Create `alembic/versions/0021_csv_format_template.py`: + +```python +"""csv format templates table — LLM-fallback parser cache. + +Revision ID: 0021 +Revises: 0020 +Create Date: 2026-05-27 +""" +from typing import Sequence, Union + +import sqlalchemy as sa +from alembic import op + + +revision: str = "0021" +down_revision: Union[str, None] = "0020" +branch_labels: Union[str, Sequence[str], None] = None +depends_on: Union[str, Sequence[str], None] = None + + +def upgrade() -> None: + op.create_table( + "csv_format_templates", + sa.Column("id", sa.BigInteger(), primary_key=True, autoincrement=True), + sa.Column("fingerprint", sa.String(length=64), nullable=False), + sa.Column("headers", sa.JSON(), nullable=False), + sa.Column("sample_row", sa.JSON(), nullable=False), + sa.Column("mapping", sa.JSON(), nullable=False), + sa.Column("preamble_rows", sa.Integer(), nullable=False, server_default="0"), + sa.Column("delimiter", sa.String(length=1), nullable=False, server_default=","), + sa.Column("broker_label", sa.String(length=128), nullable=True), + sa.Column("first_seen_at", sa.DateTime(timezone=True), nullable=False), + sa.Column("use_count", sa.Integer(), nullable=False, server_default="1"), + sa.Column("last_used_at", sa.DateTime(timezone=True), nullable=False), + sa.Column("llm_model", sa.String(length=64), nullable=True), + sa.Column("llm_cost_usd", sa.Float(), nullable=True), + sa.UniqueConstraint("fingerprint", name="uq_csv_format_templates_fingerprint"), + ) + + +def downgrade() -> None: + op.drop_table("csv_format_templates") +``` + +- [ ] **Step 2: Verify the migration applies cleanly in the test container** + +```bash +docker compose -f docker-compose.test.yml run --rm test alembic upgrade head +docker compose -f docker-compose.test.yml run --rm test alembic downgrade -1 +docker compose -f docker-compose.test.yml run --rm test alembic upgrade head +``` + +Expected: each command exits 0; the second leaves us at `0020`, the third returns us to `0021`. + +If the test container doesn't have an alembic entrypoint, run the migration check via Python instead: + +```bash +docker compose -f docker-compose.test.yml run --rm test python -c " +from alembic.config import Config +from alembic import command +cfg = Config('alembic.ini') +command.upgrade(cfg, 'head') +command.downgrade(cfg, '-1') +command.upgrade(cfg, 'head') +print('OK') +" +``` + +Expected: prints `OK`. + +- [ ] **Step 3: Commit** + +```bash +git add alembic/versions/0021_csv_format_template.py +git commit -m "alembic: add 0021 csv_format_templates" +``` + +--- + +### Task 3: Create the fabricated IBKR test fixture + +**Files:** +- Create: `tests/fixtures/ibkr_sample.csv` + +- [ ] **Step 1: Write the fixture file** + +This is a fabricated IBKR-style activity statement — column names and shape are realistic, but the values are made up (no real holdings, no real account): + +```csv +Statement,Header,Field Name,Field Value +Statement,Data,BrokerName,Interactive Brokers LLC +Statement,Data,Title,Activity Statement +Statement,Data,Period,"January 1, 2026 - January 31, 2026" +Symbol,Quantity,Avg Price,Currency,Description +AAPL,100,150.25,USD,Apple Inc +MSFT,50,310.00,USD,Microsoft Corp +NVDA,40,425.50,USD,NVIDIA Corp +VOD.L,2000,0.74,GBP,Vodafone Group Plc +ASML.AS,10,650.00,EUR,ASML Holding NV +``` + +Note: lines 1-4 are a preamble (IBKR's exports often have multi-line headers). The actual data table starts at line 5 (`Symbol,Quantity,Avg Price,Currency,Description`). + +- [ ] **Step 2: Commit** + +```bash +git add tests/fixtures/ibkr_sample.csv +git commit -m "tests: add fabricated IBKR fixture for LLM parser" +``` + +--- + +### Task 4: `_fingerprint` helper + +**Files:** +- Create: `app/services/llm_csv_parser.py` (initial scaffold) +- Test: `tests/test_llm_csv_parser.py` + +- [ ] **Step 1: Write failing test** + +Append to `tests/test_llm_csv_parser.py`: + +```python +def test_fingerprint_stable_across_case_and_whitespace(): + from app.services.llm_csv_parser import _fingerprint + + a = _fingerprint(["Symbol", "Quantity", "Avg Price"]) + b = _fingerprint(["symbol", "quantity", "avg price"]) + c = _fingerprint([" SYMBOL ", "Quantity", " AVG PRICE"]) + assert a == b == c + + +def test_fingerprint_differs_for_different_columns(): + from app.services.llm_csv_parser import _fingerprint + + a = _fingerprint(["Symbol", "Quantity"]) + b = _fingerprint(["Symbol", "Quantity", "Avg Price"]) + assert a != b + + +def test_fingerprint_is_sha256_hex_64_chars(): + from app.services.llm_csv_parser import _fingerprint + + f = _fingerprint(["Symbol", "Quantity"]) + assert len(f) == 64 + assert all(c in "0123456789abcdef" for c in f) +``` + +- [ ] **Step 2: Run tests to verify they fail** + +```bash +docker compose -f docker-compose.test.yml run --rm test pytest tests/test_llm_csv_parser.py -k fingerprint -v +``` + +Expected: FAIL with `ImportError`. + +- [ ] **Step 3: Create the service scaffold + `_fingerprint`** + +Create `app/services/llm_csv_parser.py`: + +```python +"""LLM-fallback CSV parser. + +When the deterministic Trading 212 parser (``csv_import.parse_t212_csv``) +raises ``CSVImportError`` on an unrecognised format, this service kicks +in: + +1. Detect the CSV dialect (delimiter, preamble offset). +2. Compute a fingerprint of the normalised header row. +3. Look up ``CsvFormatTemplate`` by fingerprint. On hit, replay the + cached column-mapping deterministically. On miss, ask the LLM for a + mapping, validate it, persist a new template, and apply it. + +The LLM sees only headers + the first 3-5 sample rows. It returns a +column-mapping JSON, never transcribed numbers. The system never +auto-promotes a learned format to a hand-written parser — the operator +does that by inspecting collected ``sample_row`` values. +""" +from __future__ import annotations + +import hashlib + +from app.services.csv_import import CSVImportError + + +class LLMParseError(CSVImportError): + """Raised when the LLM call fails or returns an unusable mapping. + + Inherits from ``CSVImportError`` so route-level error handling can + treat both deterministic and LLM-path failures uniformly when + desired.""" + + +def _fingerprint(headers: list[str]) -> str: + """Stable hash of the header row. + + Lowercases each header, strips surrounding whitespace, joins with + ``|`` (a character extremely unlikely to appear inside a real + header), and returns the sha256 hex digest. Whitespace/case drift + in the same broker's export does not change the fingerprint; + adding or removing a column does.""" + normalised = "|".join(h.strip().lower() for h in headers) + return hashlib.sha256(normalised.encode("utf-8")).hexdigest() +``` + +- [ ] **Step 4: Run tests to verify they pass** + +```bash +docker compose -f docker-compose.test.yml run --rm test pytest tests/test_llm_csv_parser.py -k fingerprint -v +``` + +Expected: 3 PASS. + +- [ ] **Step 5: Commit** + +```bash +git add app/services/llm_csv_parser.py tests/test_llm_csv_parser.py +git commit -m "csv-parser: add _fingerprint helper" +``` + +--- + +### Task 5: `_detect_dialect` helper + +**Files:** +- Modify: `app/services/llm_csv_parser.py` +- Test: `tests/test_llm_csv_parser.py` + +- [ ] **Step 1: Write failing tests** + +Append to `tests/test_llm_csv_parser.py`: + +```python +def test_detect_dialect_no_preamble_comma(): + from app.services.llm_csv_parser import _detect_dialect + + raw = b"Symbol,Quantity,Avg Price\nAAPL,100,150.25\nMSFT,50,310.00\n" + delimiter, preamble = _detect_dialect(raw) + assert delimiter == "," + assert preamble == 0 + + +def test_detect_dialect_with_preamble(): + from app.services.llm_csv_parser import _detect_dialect + + raw = ( + b"Statement,Header,Field Name,Field Value\n" + b"Statement,Data,BrokerName,Interactive Brokers LLC\n" + b"Statement,Data,Title,Activity Statement\n" + b"Statement,Data,Period,\"January 1, 2026 - January 31, 2026\"\n" + b"Symbol,Quantity,Avg Price,Currency,Description\n" + b"AAPL,100,150.25,USD,Apple Inc\n" + ) + delimiter, preamble = _detect_dialect(raw) + assert delimiter == "," + # The data-row header line is the FIFTH line (index 4); preamble = 4. + assert preamble == 4 + + +def test_detect_dialect_tab_delimited(): + from app.services.llm_csv_parser import _detect_dialect + + raw = b"Symbol\tQuantity\tAvg Price\nAAPL\t100\t150.25\n" + delimiter, preamble = _detect_dialect(raw) + assert delimiter == "\t" + assert preamble == 0 + + +def test_detect_dialect_empty_raises(): + from app.services.llm_csv_parser import LLMParseError, _detect_dialect + + with pytest.raises(LLMParseError): + _detect_dialect(b"") +``` + +- [ ] **Step 2: Run tests to verify they fail** + +```bash +docker compose -f docker-compose.test.yml run --rm test pytest tests/test_llm_csv_parser.py -k detect_dialect -v +``` + +Expected: 4 FAIL with `ImportError` for `_detect_dialect`. + +- [ ] **Step 3: Implement `_detect_dialect`** + +Append to `app/services/llm_csv_parser.py`: + +```python +import csv +import io + + +# Cap for how many leading lines we'll scan looking for the header row. +# Real broker preambles are typically 1-10 lines. +_MAX_PREAMBLE_SCAN = 30 + + +def _decode_raw(raw: bytes) -> str: + """Best-effort UTF-8 decode with BOM strip and lossy fallback.""" + text = raw.decode("utf-8-sig", errors="replace") + return text + + +def _detect_dialect(raw: bytes) -> tuple[str, int]: + """Detect (delimiter, preamble_rows). + + ``preamble_rows`` is the number of lines BEFORE the row we identify + as the actual table header. The header row is the first line whose + tokens are all non-numeric (so "Symbol,Quantity" is a header but + "AAPL,100" is data). Falls back to assuming the first line is the + header if no clear non-numeric line is found within the scan + window. + + Raises ``LLMParseError`` on empty input.""" + if not raw or not raw.strip(): + raise LLMParseError("empty CSV") + + text = _decode_raw(raw) + # csv.Sniffer is happy with ~4KB. Anything more and it gets slow. + sample = text[:4096] + try: + dialect = csv.Sniffer().sniff(sample, delimiters=",;\t|") + delimiter = dialect.delimiter + except csv.Error: + # Most broker exports are comma-delimited; default rather than + # error out — the caller will still validate column shapes. + delimiter = "," + + reader = csv.reader(io.StringIO(text), delimiter=delimiter) + preamble = 0 + for i, row in enumerate(reader): + if i >= _MAX_PREAMBLE_SCAN: + break + if not row: + continue + # Skip rows that are obviously preamble: <2 tokens, or any token + # is purely numeric. The header row should have multiple + # alphabetical tokens. + non_empty = [c.strip() for c in row if c.strip()] + if len(non_empty) < 2: + continue + all_alpha = all(not _looks_numeric(c) for c in non_empty) + if all_alpha: + preamble = i + return delimiter, preamble + return delimiter, 0 + + +def _looks_numeric(value: str) -> bool: + """True if ``value`` parses as a number after stripping common + decoration (thousands separators, currency symbols, percent signs).""" + s = value.strip().replace(",", "").replace("$", "").replace("€", "") + s = s.replace("£", "").replace("%", "").lstrip("-+") + if not s: + return False + try: + float(s) + return True + except ValueError: + return False +``` + +- [ ] **Step 4: Run tests to verify they pass** + +```bash +docker compose -f docker-compose.test.yml run --rm test pytest tests/test_llm_csv_parser.py -k detect_dialect -v +``` + +Expected: 4 PASS. + +- [ ] **Step 5: Commit** + +```bash +git add app/services/llm_csv_parser.py tests/test_llm_csv_parser.py +git commit -m "csv-parser: add _detect_dialect helper" +``` + +--- + +### Task 6: `_validate_mapping` helper + +**Files:** +- Modify: `app/services/llm_csv_parser.py` +- Test: `tests/test_llm_csv_parser.py` + +- [ ] **Step 1: Write failing tests** + +Append to `tests/test_llm_csv_parser.py`: + +```python +def test_validate_mapping_accepts_well_formed(): + from app.services.llm_csv_parser import _validate_mapping + + headers = ["Symbol", "Quantity", "Avg Price", "Currency"] + first_row = ["AAPL", "100", "150.25", "USD"] + mapping = { + "ticker_col": "Symbol", + "qty_col": "Quantity", + "cost_col": "Avg Price", + "currency_col": "Currency", + "name_col": None, + } + _validate_mapping(mapping, headers, first_row) # no raise + + +def test_validate_mapping_missing_ticker_raises(): + from app.services.llm_csv_parser import LLMParseError, _validate_mapping + + headers = ["Symbol", "Quantity"] + first_row = ["AAPL", "100"] + mapping = {"ticker_col": None, "qty_col": "Quantity"} + with pytest.raises(LLMParseError, match="ticker"): + _validate_mapping(mapping, headers, first_row) + + +def test_validate_mapping_missing_qty_raises(): + from app.services.llm_csv_parser import LLMParseError, _validate_mapping + + headers = ["Symbol", "Quantity"] + first_row = ["AAPL", "100"] + mapping = {"ticker_col": "Symbol", "qty_col": None} + with pytest.raises(LLMParseError, match="qty"): + _validate_mapping(mapping, headers, first_row) + + +def test_validate_mapping_unknown_column_raises(): + from app.services.llm_csv_parser import LLMParseError, _validate_mapping + + headers = ["Symbol", "Quantity"] + first_row = ["AAPL", "100"] + mapping = {"ticker_col": "Symbol", "qty_col": "NotARealColumn"} + with pytest.raises(LLMParseError, match="NotARealColumn"): + _validate_mapping(mapping, headers, first_row) + + +def test_validate_mapping_non_numeric_qty_raises(): + from app.services.llm_csv_parser import LLMParseError, _validate_mapping + + headers = ["Symbol", "Description"] + first_row = ["AAPL", "Apple Inc"] + # Mapping says qty is "Description", but "Apple Inc" can't parse as a number. + mapping = {"ticker_col": "Symbol", "qty_col": "Description"} + with pytest.raises(LLMParseError, match="numeric"): + _validate_mapping(mapping, headers, first_row) +``` + +- [ ] **Step 2: Run tests to verify they fail** + +```bash +docker compose -f docker-compose.test.yml run --rm test pytest tests/test_llm_csv_parser.py -k validate_mapping -v +``` + +Expected: 5 FAIL with `ImportError`. + +- [ ] **Step 3: Implement `_validate_mapping`** + +Append to `app/services/llm_csv_parser.py`: + +```python +_REQUIRED_MAPPING_KEYS = ("ticker_col", "qty_col") +_OPTIONAL_MAPPING_KEYS = ("name_col", "cost_col", "currency_col") + + +def _validate_mapping( + mapping: dict, headers: list[str], first_row: list[str], +) -> None: + """Verify the LLM-returned mapping is sane. + + - ``ticker_col`` and ``qty_col`` are required (non-null). + - Every named column must exist in ``headers``. + - The value at ``qty_col`` on ``first_row`` must parse as a number. + - The value at ``cost_col`` on ``first_row`` (if present) must parse + as a number. + + Raises ``LLMParseError`` on any failure, with a message that names + the specific problem (helpful for log forensics and for the + user-facing 400).""" + for key in _REQUIRED_MAPPING_KEYS: + if not mapping.get(key): + raise LLMParseError( + f"LLM mapping missing required column: {key.replace('_col', '')}" + ) + + headers_set = set(headers) + for key in _REQUIRED_MAPPING_KEYS + _OPTIONAL_MAPPING_KEYS: + col = mapping.get(key) + if col is not None and col not in headers_set: + raise LLMParseError( + f"LLM mapping references unknown column: {col!r}" + ) + + # Numeric sanity check: qty and (if present) cost must parse on row 1. + header_index = {h: i for i, h in enumerate(headers)} + qty_col = mapping["qty_col"] + qty_value = first_row[header_index[qty_col]] if header_index[qty_col] < len(first_row) else "" + if not _looks_numeric(qty_value): + raise LLMParseError( + f"LLM mapping qty_col={qty_col!r} maps to non-numeric value {qty_value!r}" + ) + + cost_col = mapping.get("cost_col") + if cost_col is not None: + cost_value = first_row[header_index[cost_col]] if header_index[cost_col] < len(first_row) else "" + if cost_value and not _looks_numeric(cost_value): + raise LLMParseError( + f"LLM mapping cost_col={cost_col!r} maps to non-numeric value {cost_value!r}" + ) +``` + +- [ ] **Step 4: Run tests to verify they pass** + +```bash +docker compose -f docker-compose.test.yml run --rm test pytest tests/test_llm_csv_parser.py -k validate_mapping -v +``` + +Expected: 5 PASS. + +- [ ] **Step 5: Commit** + +```bash +git add app/services/llm_csv_parser.py tests/test_llm_csv_parser.py +git commit -m "csv-parser: add _validate_mapping helper" +``` + +--- + +### Task 7: `_apply_mapping` helper + +**Files:** +- Modify: `app/services/llm_csv_parser.py` +- Test: `tests/test_llm_csv_parser.py` + +- [ ] **Step 1: Write failing tests** + +Append to `tests/test_llm_csv_parser.py`: + +```python +def test_apply_mapping_builds_parsed_pie(): + from app.services.csv_import import ParsedPie, ParsedPosition + from app.services.llm_csv_parser import _apply_mapping + + headers = ["Symbol", "Quantity", "Avg Price", "Currency", "Description"] + data_rows = [ + ["AAPL", "100", "150.25", "USD", "Apple Inc"], + ["MSFT", "50", "310.00", "USD", "Microsoft Corp"], + ] + mapping = { + "ticker_col": "Symbol", + "qty_col": "Quantity", + "cost_col": "Avg Price", + "currency_col": "Currency", + "name_col": "Description", + } + + pie = _apply_mapping(headers, data_rows, mapping) + + assert isinstance(pie, ParsedPie) + assert len(pie.positions) == 2 + p0 = pie.positions[0] + assert isinstance(p0, ParsedPosition) + assert p0.slice == "AAPL" + assert p0.name == "Apple Inc" + assert p0.quantity == 100.0 + assert p0.invested_value == pytest.approx(15025.0) + # invested = qty * avg_cost = 100 * 150.25 = 15025 + assert pie.invested == pytest.approx(15025.0 + 50 * 310.00) + + +def test_apply_mapping_handles_missing_optional_columns(): + from app.services.llm_csv_parser import _apply_mapping + + headers = ["Symbol", "Quantity"] + data_rows = [["AAPL", "100"]] + mapping = { + "ticker_col": "Symbol", + "qty_col": "Quantity", + "cost_col": None, + "currency_col": None, + "name_col": None, + } + + pie = _apply_mapping(headers, data_rows, mapping) + p = pie.positions[0] + assert p.slice == "AAPL" + assert p.quantity == 100.0 + assert p.invested_value is None + assert p.name == "AAPL" # falls back to ticker when name_col absent + + +def test_apply_mapping_skips_blank_and_unparseable_rows(): + from app.services.llm_csv_parser import _apply_mapping + + headers = ["Symbol", "Quantity"] + data_rows = [ + ["AAPL", "100"], + ["", ""], # blank + ["MSFT", "not-a-number"], # bad qty + ["NVDA", "40"], + ] + mapping = {"ticker_col": "Symbol", "qty_col": "Quantity"} + + pie = _apply_mapping(headers, data_rows, mapping) + assert [p.slice for p in pie.positions] == ["AAPL", "NVDA"] +``` + +- [ ] **Step 2: Run tests to verify they fail** + +```bash +docker compose -f docker-compose.test.yml run --rm test pytest tests/test_llm_csv_parser.py -k apply_mapping -v +``` + +Expected: 3 FAIL with `ImportError`. + +- [ ] **Step 3: Implement `_apply_mapping`** + +Append to `app/services/llm_csv_parser.py`: + +```python +from app.services.csv_import import ParsedPie, ParsedPosition + + +def _parse_number(value: str) -> float | None: + """Permissive float parse: strips thousands separators, currency + symbols, percent signs. Returns None on failure (so callers can + decide whether to skip or raise).""" + s = value.strip().replace(",", "").replace("$", "") + s = s.replace("€", "").replace("£", "").replace("%", "") + if not s: + return None + try: + return float(s) + except ValueError: + return None + + +def _apply_mapping( + headers: list[str], + data_rows: list[list[str]], + mapping: dict, +) -> ParsedPie: + """Iterate ``data_rows`` and produce a ``ParsedPie``. + + Rows that lack a parseable quantity (blank, non-numeric, zero) are + silently skipped — broker exports often include summary or + placeholder rows after the position list. ``name_col`` falls back + to the ticker symbol when null.""" + idx = {h: i for i, h in enumerate(headers)} + ticker_col = mapping["ticker_col"] + qty_col = mapping["qty_col"] + name_col = mapping.get("name_col") + cost_col = mapping.get("cost_col") + + positions: list[ParsedPosition] = [] + invested_total = 0.0 + invested_seen = False + + for row in data_rows: + if not any(c.strip() for c in row): + continue + ticker_raw = row[idx[ticker_col]] if idx[ticker_col] < len(row) else "" + ticker = ticker_raw.strip().upper() + if not ticker: + continue + qty_raw = row[idx[qty_col]] if idx[qty_col] < len(row) else "" + qty = _parse_number(qty_raw) + if qty is None or qty <= 0: + continue + avg_cost: float | None = None + if cost_col is not None and idx[cost_col] < len(row): + avg_cost = _parse_number(row[idx[cost_col]]) + invested_value: float | None = None + if avg_cost is not None: + invested_value = qty * avg_cost + invested_total += invested_value + invested_seen = True + name = "" + if name_col is not None and idx[name_col] < len(row): + name = row[idx[name_col]].strip() + if not name: + name = ticker + positions.append(ParsedPosition( + slice=ticker, + name=name, + invested_value=invested_value, + current_value=None, + result=None, + quantity=qty, + )) + + return ParsedPie( + name=None, + positions=tuple(positions), + invested=(invested_total if invested_seen else None), + value=None, + result=None, + ) +``` + +- [ ] **Step 4: Run tests to verify they pass** + +```bash +docker compose -f docker-compose.test.yml run --rm test pytest tests/test_llm_csv_parser.py -k apply_mapping -v +``` + +Expected: 3 PASS. + +- [ ] **Step 5: Commit** + +```bash +git add app/services/llm_csv_parser.py tests/test_llm_csv_parser.py +git commit -m "csv-parser: add _apply_mapping helper" +``` + +--- + +### Task 8: `_extract_mapping_via_llm` helper + +**Files:** +- Modify: `app/services/llm_csv_parser.py` +- Test: `tests/test_llm_csv_parser.py` + +- [ ] **Step 1: Write failing tests** + +Append to `tests/test_llm_csv_parser.py`: + +```python +@pytest.mark.asyncio +async def test_extract_mapping_via_llm_parses_valid_json(): + from unittest.mock import AsyncMock, MagicMock + from app.services.llm_csv_parser import _extract_mapping_via_llm + from app.services.openrouter import LogResult + + fake_result = LogResult( + content='{"ticker_col": "Symbol", "qty_col": "Quantity", ' + '"cost_col": "Avg Price", "currency_col": "Currency", ' + '"name_col": null, "broker_label": "IBKR Activity Statement"}', + model="deepseek/deepseek-v4-flash", + prompt_tokens=100, + completion_tokens=50, + cost_usd=0.0001, + ) + fake_client = MagicMock() + fake_call_llm = AsyncMock(return_value=fake_result) + + import app.services.llm_csv_parser as mod + mod.call_llm = fake_call_llm # monkeypatch + + headers = ["Symbol", "Quantity", "Avg Price", "Currency"] + samples = [["AAPL", "100", "150.25", "USD"]] + mapping, log = await _extract_mapping_via_llm(fake_client, headers, samples) + + assert mapping["ticker_col"] == "Symbol" + assert mapping["qty_col"] == "Quantity" + assert mapping["broker_label"] == "IBKR Activity Statement" + assert log.model == "deepseek/deepseek-v4-flash" + fake_call_llm.assert_awaited_once() + + +@pytest.mark.asyncio +async def test_extract_mapping_via_llm_malformed_json_raises(): + from unittest.mock import AsyncMock, MagicMock + from app.services.llm_csv_parser import LLMParseError, _extract_mapping_via_llm + from app.services.openrouter import LogResult + + fake_result = LogResult( + content="Sure thing — here is the mapping! ticker=Symbol", + model="deepseek/deepseek-v4-flash", + prompt_tokens=10, + completion_tokens=20, + cost_usd=0.00005, + ) + fake_client = MagicMock() + fake_call_llm = AsyncMock(return_value=fake_result) + + import app.services.llm_csv_parser as mod + mod.call_llm = fake_call_llm + + with pytest.raises(LLMParseError, match="JSON"): + await _extract_mapping_via_llm(fake_client, ["Symbol"], [["AAPL"]]) + + +@pytest.mark.asyncio +async def test_extract_mapping_via_llm_provider_failure_wraps(): + from unittest.mock import AsyncMock, MagicMock + from app.services.llm_csv_parser import LLMParseError, _extract_mapping_via_llm + + fake_client = MagicMock() + fake_call_llm = AsyncMock(side_effect=RuntimeError("provider down")) + + import app.services.llm_csv_parser as mod + mod.call_llm = fake_call_llm + + with pytest.raises(LLMParseError, match="provider"): + await _extract_mapping_via_llm(fake_client, ["Symbol"], [["AAPL"]]) +``` + +NOTE: If `pytest-asyncio` is not installed in the test container, the engineer must add `asyncio_mode = "auto"` to `pytest.ini` or use the existing decorator pattern from `tests/test_referral_conversion.py`. Check that file's top for `@pytest.mark.asyncio` usage and replicate it. + +- [ ] **Step 2: Run tests to verify they fail** + +```bash +docker compose -f docker-compose.test.yml run --rm test pytest tests/test_llm_csv_parser.py -k extract_mapping_via_llm -v +``` + +Expected: 3 FAIL with `ImportError` for `_extract_mapping_via_llm`. + +- [ ] **Step 3: Implement `_extract_mapping_via_llm`** + +Append to `app/services/llm_csv_parser.py`: + +```python +import json + +import httpx + +from app.services.openrouter import LogResult, call_llm + + +# Hard caps on what we send to the LLM, so prompt cost stays bounded. +_LLM_SAMPLES = 5 +_LLM_MAX_TOKENS = 400 + + +_SYSTEM_PROMPT = """\ +You are an expert at recognising broker portfolio CSV formats. + +You will be given the header row and 3-5 sample data rows from a CSV. +Identify which column contains each field. Return ONLY a single JSON +object, no prose, no markdown fences. + +Schema (use the EXACT header string from the input; use null if no +column is a good match): + +{ + "ticker_col": "
", + "qty_col": "
", + "name_col": "
", + "cost_col": "
", // average price per share + "currency_col": "
", + "broker_label": "" +} + +Rules: +- ticker_col and qty_col are required. If either is missing, return all nulls. +- Use the EXACT header string as it appears in the input — do not paraphrase. +- Output JSON ONLY. No prose, no code fences. +""" + + +def _build_user_prompt(headers: list[str], samples: list[list[str]]) -> str: + lines = ["headers: " + json.dumps(headers)] + lines.append("samples:") + for s in samples[:_LLM_SAMPLES]: + lines.append(" " + ",".join(s)) + return "\n".join(lines) + + +async def _extract_mapping_via_llm( + client: httpx.AsyncClient, + headers: list[str], + samples: list[list[str]], +) -> tuple[dict, LogResult]: + """Single LLM call returning ``(mapping_dict, LogResult)``. + + The LLM is asked for a strict JSON object (no markdown). We attempt + to parse the returned content; ``LLMParseError`` wraps any failure + in a way callers can surface to the user.""" + messages = [ + {"role": "system", "content": _SYSTEM_PROMPT}, + {"role": "user", "content": _build_user_prompt(headers, samples)}, + ] + try: + result = await call_llm(client, messages, max_tokens=_LLM_MAX_TOKENS) + except Exception as e: + raise LLMParseError(f"LLM provider failed: {e}") from e + + content = (result.content or "").strip() + # Strip code fences if the model added them despite instructions. + if content.startswith("```"): + content = content.strip("`") + # Drop optional 'json' language tag. + if content.lstrip().lower().startswith("json"): + content = content.lstrip()[4:] + content = content.strip() + try: + mapping = json.loads(content) + except json.JSONDecodeError as e: + raise LLMParseError(f"LLM did not return valid JSON: {e}") from e + if not isinstance(mapping, dict): + raise LLMParseError("LLM JSON was not an object") + return mapping, result +``` + +- [ ] **Step 4: Run tests to verify they pass** + +```bash +docker compose -f docker-compose.test.yml run --rm test pytest tests/test_llm_csv_parser.py -k extract_mapping_via_llm -v +``` + +Expected: 3 PASS. + +- [ ] **Step 5: Commit** + +```bash +git add app/services/llm_csv_parser.py tests/test_llm_csv_parser.py +git commit -m "csv-parser: add _extract_mapping_via_llm with provider-failure wrapping" +``` + +--- + +### Task 9: Public `parse_with_llm` orchestration + +**Files:** +- Modify: `app/services/llm_csv_parser.py` +- Test: `tests/test_llm_csv_parser.py` + +- [ ] **Step 1: Write the per-test session factory helper (copied from `test_referral_conversion.py`)** + +Append to the **top** of `tests/test_llm_csv_parser.py` (after the existing imports): + +```python +def _build_session_factory(tmp_path): + """Spin up a fresh in-memory schema and return (engine, factory). + Matches the pattern used in tests/test_referral_conversion.py.""" + from sqlalchemy.ext.asyncio import async_sessionmaker, create_async_engine + + from app import db as db_mod + from app.db import Base + import app.models # noqa: F401 — registers models on Base.metadata + + engine = create_async_engine(f"sqlite+aiosqlite:///{tmp_path}/csv.db") + factory = async_sessionmaker(engine, expire_on_commit=False) + db_mod._engine = engine + db_mod._session_factory = factory + + async def _setup(): + async with engine.begin() as conn: + await conn.run_sync(Base.metadata.create_all) + + return engine, factory, _setup +``` + +Tests will `await setup()` themselves before using the factory. + +- [ ] **Step 2: Write failing tests for cache miss + cache hit** + +Append to `tests/test_llm_csv_parser.py`: + +```python +@pytest.mark.asyncio +async def test_parse_with_llm_cache_miss_inserts_template(tmp_path): + from unittest.mock import AsyncMock + from sqlalchemy import select + + from app.models import CsvFormatTemplate + from app.services.llm_csv_parser import parse_with_llm + from app.services.openrouter import LogResult + + _, factory, setup = _build_session_factory(tmp_path) + await setup() + + raw = ( + b"Symbol,Quantity,Avg Price,Currency\n" + b"AAPL,100,150.25,USD\n" + b"MSFT,50,310.00,USD\n" + ) + + import app.services.llm_csv_parser as mod + mod.call_llm = AsyncMock(return_value=LogResult( + content='{"ticker_col":"Symbol","qty_col":"Quantity",' + '"cost_col":"Avg Price","currency_col":"Currency",' + '"name_col":null,"broker_label":"Generic broker"}', + model="deepseek/deepseek-v4-flash", + prompt_tokens=120, completion_tokens=40, cost_usd=0.0002, + )) + + async with factory() as session: + pie = await parse_with_llm(raw, session) + + assert len(pie.positions) == 2 + assert pie.positions[0].slice == "AAPL" + + async with factory() as session: + rows = (await session.execute(select(CsvFormatTemplate))).scalars().all() + assert len(rows) == 1 + tmpl = rows[0] + assert tmpl.headers == ["Symbol", "Quantity", "Avg Price", "Currency"] + assert tmpl.sample_row == ["AAPL", "100", "150.25", "USD"] + assert tmpl.mapping["ticker_col"] == "Symbol" + assert tmpl.broker_label == "Generic broker" + assert tmpl.use_count == 1 + assert tmpl.llm_cost_usd == pytest.approx(0.0002) + # The crucial PII guarantee: + assert not hasattr(tmpl, "user_id"), "sample row must not be linked to a user" + + +@pytest.mark.asyncio +async def test_parse_with_llm_cache_hit_skips_llm(tmp_path): + from unittest.mock import AsyncMock + from sqlalchemy import select + + from app.db import utcnow + from app.models import CsvFormatTemplate + from app.services.llm_csv_parser import _fingerprint, parse_with_llm + + _, factory, setup = _build_session_factory(tmp_path) + await setup() + + headers = ["Symbol", "Quantity", "Avg Price", "Currency"] + fp = _fingerprint(headers) + + # Pre-populate a cache hit row. + async with factory() as session: + session.add(CsvFormatTemplate( + fingerprint=fp, + headers=headers, + sample_row=["AAPL", "100", "150.25", "USD"], + mapping={ + "ticker_col": "Symbol", "qty_col": "Quantity", + "cost_col": "Avg Price", "currency_col": "Currency", + "name_col": None, + }, + preamble_rows=0, + delimiter=",", + broker_label="Cached broker", + first_seen_at=utcnow(), + last_used_at=utcnow(), + use_count=1, + llm_model="seed", + llm_cost_usd=0.0, + )) + await session.commit() + + raw = ( + b"Symbol,Quantity,Avg Price,Currency\n" + b"NVDA,40,425.50,USD\n" + ) + + import app.services.llm_csv_parser as mod + mod.call_llm = AsyncMock(side_effect=AssertionError("call_llm must NOT be called on cache hit")) + + async with factory() as session: + pie = await parse_with_llm(raw, session) + + assert pie.positions[0].slice == "NVDA" + + async with factory() as session: + rows = (await session.execute(select(CsvFormatTemplate))).scalars().all() + assert len(rows) == 1 + assert rows[0].use_count == 2 + + +@pytest.mark.asyncio +async def test_parse_with_llm_stale_mapping_raises_but_does_not_evict(tmp_path): + from unittest.mock import AsyncMock + from sqlalchemy import select + + from app.db import utcnow + from app.models import CsvFormatTemplate + from app.services.llm_csv_parser import LLMParseError, _fingerprint, parse_with_llm + + _, factory, setup = _build_session_factory(tmp_path) + await setup() + + headers = ["Symbol", "Quantity"] + fp = _fingerprint(headers) + # Cached mapping says qty is in column "Symbol" — clearly wrong; will + # never produce a parseable row. + async with factory() as session: + session.add(CsvFormatTemplate( + fingerprint=fp, headers=headers, + sample_row=["AAPL", "100"], + mapping={"ticker_col": "Symbol", "qty_col": "Symbol"}, + preamble_rows=0, delimiter=",", broker_label=None, + first_seen_at=utcnow(), last_used_at=utcnow(), use_count=1, + llm_model="seed", llm_cost_usd=0.0, + )) + await session.commit() + + raw = b"Symbol,Quantity\nAAPL,100\nMSFT,50\n" + + import app.services.llm_csv_parser as mod + mod.call_llm = AsyncMock(side_effect=AssertionError("must not be called")) + + async with factory() as session: + with pytest.raises(LLMParseError): + await parse_with_llm(raw, session) + + # Stale template must NOT have been auto-deleted (operator owns eviction). + async with factory() as session: + rows = (await session.execute(select(CsvFormatTemplate))).scalars().all() + assert len(rows) == 1 +``` + +- [ ] **Step 3: Run tests to verify they fail** + +```bash +docker compose -f docker-compose.test.yml run --rm test pytest tests/test_llm_csv_parser.py -k parse_with_llm -v +``` + +Expected: 3 FAIL with `ImportError` for `parse_with_llm`. + +- [ ] **Step 4: Implement `parse_with_llm`** + +Append to `app/services/llm_csv_parser.py`: + +```python +from sqlalchemy import select +from sqlalchemy.ext.asyncio import AsyncSession + +from app.db import utcnow +from app.logging import get_logger +from app.models import CsvFormatTemplate + + +log = get_logger("llm_csv_parser") + + +# Hard cap shared with /api/portfolio/parse — bytes-level, mirrors T212 path. +_MAX_CSV_BYTES = 1_048_576 + + +async def parse_with_llm(raw: bytes, session: AsyncSession) -> ParsedPie: + """Cache-first LLM-fallback CSV parse. + + On cache hit, applies the stored mapping deterministically and + increments ``use_count``. On cache miss, calls the LLM, validates + the returned mapping against the first data row, and persists a + new ``CsvFormatTemplate``. Raises ``LLMParseError`` on any + failure; the caller (route layer) maps that to a 400.""" + if len(raw) > _MAX_CSV_BYTES: + raise LLMParseError("CSV too large (1 MB max)") + if not raw or not raw.strip(): + raise LLMParseError("empty CSV") + + delimiter, preamble_rows = _detect_dialect(raw) + text = _decode_raw(raw) + + reader = csv.reader(io.StringIO(text), delimiter=delimiter) + rows = list(reader) + if preamble_rows >= len(rows): + raise LLMParseError("no header row found in CSV") + headers = [c.strip() for c in rows[preamble_rows]] + data_rows = rows[preamble_rows + 1:] + if not headers: + raise LLMParseError("empty header row") + + first_data_row = next( + (r for r in data_rows if any(c.strip() for c in r)), None, + ) + if first_data_row is None: + raise LLMParseError("CSV contains a header but no data rows") + + fp = _fingerprint(headers) + existing = (await session.execute( + select(CsvFormatTemplate).where(CsvFormatTemplate.fingerprint == fp) + )).scalar_one_or_none() + + if existing is not None: + log.info("csv.format.cache_hit", fingerprint=fp, + broker_label=existing.broker_label, use_count=existing.use_count) + pie = _apply_mapping(headers, data_rows, existing.mapping) + if not pie.positions: + raise LLMParseError( + "cached mapping produced no positions — the broker may have " + "changed their CSV shape; ask the operator to evict the " + "stale template" + ) + existing.use_count += 1 + existing.last_used_at = utcnow() + await session.commit() + return pie + + log.info("csv.format.cache_miss", fingerprint=fp, + header_count=len(headers)) + samples = [r for r in data_rows[:_LLM_SAMPLES] if any(c.strip() for c in r)] + async with httpx.AsyncClient(follow_redirects=True, timeout=30) as client: + mapping, llm_log = await _extract_mapping_via_llm(client, headers, samples) + _validate_mapping(mapping, headers, first_data_row) + + pie = _apply_mapping(headers, data_rows, mapping) + if not pie.positions: + raise LLMParseError( + "LLM mapping validated but produced no positions — the file " + "may not contain portfolio data" + ) + + now = utcnow() + session.add(CsvFormatTemplate( + fingerprint=fp, + headers=headers, + sample_row=first_data_row, + mapping=mapping, + preamble_rows=preamble_rows, + delimiter=delimiter, + broker_label=mapping.get("broker_label"), + first_seen_at=now, + last_used_at=now, + use_count=1, + llm_model=llm_log.model, + llm_cost_usd=llm_log.cost_usd, + )) + await session.commit() + return pie +``` + +- [ ] **Step 5: Run tests to verify they pass** + +```bash +docker compose -f docker-compose.test.yml run --rm test pytest tests/test_llm_csv_parser.py -v +``` + +Expected: every test passes (including everything from earlier tasks). + +- [ ] **Step 6: Commit** + +```bash +git add app/services/llm_csv_parser.py tests/test_llm_csv_parser.py +git commit -m "csv-parser: add public parse_with_llm with cache hit/miss orchestration" +``` + +--- + +### Task 10: Wire `parse_with_llm` into the route and add `require_paid` + +**Files:** +- Modify: `app/routers/universe.py:192-214` (the `parse_portfolio` route + decorator) +- Test: `tests/test_llm_csv_parser.py` + +- [ ] **Step 1: Write the route-level integration test (direct function call, no HTTP layer)** + +Calling `parse_portfolio` directly with a fake `UploadFile` and a real session sidesteps the pytest-asyncio + `TestClient` event-loop awkwardness. `Depends(require_paid)` is decorator-level and is not invoked when we call the function directly — which is what we want (paid gating is mechanical and trusted to FastAPI; we verify it by inspection in Step 2 of the next stage). + +Append to `tests/test_llm_csv_parser.py`: + +```python +@pytest.mark.asyncio +async def test_parse_portfolio_route_falls_through_to_llm(tmp_path, monkeypatch): + """End-to-end: T212 parser raises CSVImportError, LLM fallback runs, + response shape matches the existing JSON contract.""" + from io import BytesIO + from types import SimpleNamespace + from unittest.mock import AsyncMock + + from fastapi import UploadFile + + _, factory, setup = _build_session_factory(tmp_path) + await setup() + + import app.services.llm_csv_parser as mod + from app.services.openrouter import LogResult + mod.call_llm = AsyncMock(return_value=LogResult( + content='{"ticker_col":"Symbol","qty_col":"Quantity",' + '"cost_col":"Avg Price","currency_col":"Currency",' + '"name_col":"Description",' + '"broker_label":"IBKR Activity Statement"}', + model="deepseek/deepseek-v4-flash", + prompt_tokens=150, completion_tokens=60, cost_usd=0.0003, + )) + + # The route's inline Yahoo-fetch block would otherwise hit the network. + # Patch market.fetch to return a benign placeholder per ticker. + from app.services import market as market_mod + + async def _fake_fetch(client, symbol, label, group, anchor): + return SimpleNamespace( + symbol=symbol, source="test", label=label, + price=None, currency="USD", as_of="2026-05-27", + changes=None, error=None, + ) + monkeypatch.setattr(market_mod, "fetch", _fake_fetch) + + raw = open("tests/fixtures/ibkr_sample.csv", "rb").read() + upload = UploadFile(filename="ibkr.csv", file=BytesIO(raw)) + + from app.routers.universe import parse_portfolio + async with factory() as session: + result = await parse_portfolio(file=upload, session=session) + + assert result["base_currency"] == "GBP" + # At least the AAPL/MSFT/NVDA rows should be present; resolve_slice may + # filter some if there's no InstrumentMap row, which is fine for this + # test — we just want to confirm the LLM fallback ran end-to-end. + assert isinstance(result["positions"], list) + # LLM was called exactly once (cache miss). + assert mod.call_llm.await_count == 1 +``` + +- [ ] **Step 1b: Add a paid-gate inspection test (no HTTP needed)** + +```python +def test_parse_portfolio_route_requires_paid(): + """Static check that the /portfolio/parse route is gated by require_paid.""" + from app.routers.universe import router + from app.services.access import require_paid + + parse_route = next( + r for r in router.routes + if getattr(r, "path", "") == "/portfolio/parse" + ) + dep_callables = [d.dependency for d in parse_route.dependant.dependencies] + assert require_paid in dep_callables, ( + "The /portfolio/parse route must have Depends(require_paid)" + ) +``` + +- [ ] **Step 2: Run the test to verify it fails** + +```bash +docker compose -f docker-compose.test.yml run --rm test pytest tests/test_llm_csv_parser.py::test_parse_portfolio_route_falls_through_to_llm -v +``` + +Expected: FAIL — probably with the T212 parser raising `CSVImportError` because the route does not yet have the LLM fallback wired up. The exact failure message confirms we need the wiring. + +- [ ] **Step 3: Wire `parse_with_llm` into the route** + +In `app/routers/universe.py`, find the `parse_portfolio` definition (search `async def parse_portfolio`). Make these two changes: + +**Change A: Add `require_paid` to the route decorator.** Find the existing line: + +```python +@router.post("/portfolio/parse") +async def parse_portfolio( + file: UploadFile = File(...), + session: AsyncSession = Depends(get_session), +) -> dict: +``` + +Replace with: + +```python +@router.post("/portfolio/parse", dependencies=[Depends(require_paid)]) +async def parse_portfolio( + file: UploadFile = File(...), + session: AsyncSession = Depends(get_session), +) -> dict: +``` + +**Change B: Add the LLM fallback.** Find the existing block: + +```python + try: + pie = parse_t212_csv(raw) + except CSVImportError as e: + raise HTTPException(status_code=400, detail=str(e)) +``` + +Replace with: + +```python + try: + pie = parse_t212_csv(raw) + except CSVImportError: + # Unrecognised format — try the LLM-fallback parser. It hits a + # global format-fingerprint cache first; only the very first + # upload of each broker format pays an LLM call. + from app.services.llm_csv_parser import LLMParseError, parse_with_llm + try: + pie = await parse_with_llm(raw, session) + except LLMParseError as e: + raise HTTPException(status_code=400, detail=str(e)) +``` + +- [ ] **Step 4: Run the integration test to verify it passes** + +```bash +docker compose -f docker-compose.test.yml run --rm test pytest tests/test_llm_csv_parser.py::test_parse_portfolio_route_falls_through_to_llm -v +``` + +Expected: PASS. + +- [ ] **Step 5: Run the full test file + the existing T212 tests to confirm no regression** + +```bash +docker compose -f docker-compose.test.yml run --rm test pytest tests/test_llm_csv_parser.py tests/test_csv_import.py -v +``` + +Expected: all PASS. Confirms the T212 happy path is untouched. + +- [ ] **Step 6: Commit** + +```bash +git add app/routers/universe.py tests/test_llm_csv_parser.py +git commit -m "universe: paid-gate + LLM fallback on /portfolio/parse" +``` + +--- + +### Task 11: UI copy tweaks + +**Files:** +- Modify: `app/templates/settings.html` (search the file for "Trading 212 CSV", "T212 pie CSV") + +- [ ] **Step 1: Update the section heading** + +Find: + +```html +Import portfolio (Trading 212 CSV) +``` + +Replace with: + +```html +Import portfolio (CSV) +``` + +- [ ] **Step 2: Update the drop-zone label** + +Find: + +```html +
Drop a T212 pie CSV here
+``` + +Replace with: + +```html +
Drop your broker's portfolio CSV here
+``` + +- [ ] **Step 3: Update the drop-zone hint** + +Find: + +```html +
or browse · max 1 MB
+``` + +Replace with: + +```html +
or browse · max 1 MB · T212, IBKR and others auto-detected
+``` + +- [ ] **Step 4: Soften the help paragraph** + +If there is a paragraph above or beside the drop-zone that begins with "Export your pie from T212", change its opening phrase from declarative to conditional. For example, if the current line is: + +```html +

Export your pie from T212 as CSV ...

+``` + +Replace with: + +```html +

If you use Trading 212, export your pie as CSV ...

+``` + +(Search the file for `Export your pie from T212` to locate the exact paragraph; preserve any surrounding markup.) + +- [ ] **Step 5: Commit** + +```bash +git add app/templates/settings.html +git commit -m "settings: soften import copy to be broker-agnostic" +``` + +--- + +### Task 12: Final regression run + manual smoke + +**Files:** +- (no code changes — verification only) + +- [ ] **Step 1: Full test suite** + +```bash +docker compose -f docker-compose.test.yml run --rm test pytest tests/ -v +``` + +Expected: every existing test still passes; the new tests in `tests/test_llm_csv_parser.py` all pass. + +- [ ] **Step 2: Apply the migration against the dev DB and confirm the table exists** + +```bash +docker compose exec app alembic upgrade head +docker compose exec app python -c " +import asyncio +from sqlalchemy import inspect +from app.db import get_engine + +async def main(): + eng = get_engine() + async with eng.connect() as conn: + names = await conn.run_sync(lambda c: inspect(c).get_table_names()) + assert 'csv_format_templates' in names, names + print('csv_format_templates table present:', sorted(names)) +asyncio.run(main()) +" +``` + +Expected: prints the table name in the list. NOTE: this touches the prod DB on this host — only run when the user has explicitly approved this deploy. + +- [ ] **Step 3: Restart the app container** + +```bash +docker compose restart app +docker compose logs app --tail 30 | grep -E "(Uvicorn|startup complete|error)" || true +``` + +Expected: clean startup; no tracebacks. + +- [ ] **Step 4: Manual smoke — re-import a T212 CSV** + +Through the browser at `/settings`, drop a known T212 CSV. The dashboard should load as it always has. (Confirms zero regression on the happy path.) + +- [ ] **Step 5: Manual smoke — first IBKR-shaped upload** + +Through the browser at `/settings`, drop `tests/fixtures/ibkr_sample.csv` (or a real IBKR statement). The dashboard should load with the IBKR positions. Then query the DB to confirm the template was cached: + +```bash +docker compose exec app python -c " +import asyncio +from sqlalchemy import select +from app.db import get_session_factory +from app.models import CsvFormatTemplate + +async def main(): + factory = get_session_factory() + async with factory() as s: + rows = (await s.execute(select(CsvFormatTemplate))).scalars().all() + for r in rows: + print(r.fingerprint[:12], r.broker_label, 'use_count=', r.use_count, + 'cost=', r.llm_cost_usd) +asyncio.run(main()) +" +``` + +NOTE: this is a prod DB read; only run with explicit user approval. + +Expected: a single row, with `use_count=1` and a small positive `llm_cost_usd`. + +- [ ] **Step 6: Manual smoke — second IBKR-shaped upload (cache hit)** + +Drop the same fixture again. The dashboard should load identically, and the DB row should now show `use_count=2`. AICall ledger should NOT have a new row for this second upload (only the first paid the LLM cost). + +- [ ] **Step 7: Manual smoke — paid gating** + +In a free-tier browser session, attempt the upload. Expect a 402 response visible in network tools / surfaced as an "upgrade required" message in the UI. + +--- + +## Self-Review + +Spec coverage walkthrough: + +- **Trigger: transparent fallback** → Task 10 (route try/except) +- **Cache for reuse** → Task 9 (cache-hit branch in `parse_with_llm`) +- **Paid-only** → Task 10 Step 3 Change A (adds `Depends(require_paid)`) +- **LLM column-mapping only** → Tasks 6–8 (`_validate_mapping`, `_extract_mapping_via_llm`, no full-CSV extraction anywhere) +- **Global cache** → Tasks 1, 2 (no `user_id` column); Task 9 cache lookup is global +- **`sample_row` is a real first data row, anonymous** → Task 9 Step 4 (the `INSERT` uses `first_data_row`); Task 1 test asserts `user_id` absent +- **No self-heal / no auto-eviction** → Task 9 stale-mapping test asserts row is NOT deleted +- **No code authoring** → out of scope by construction (no code writes anywhere in the plan) +- **`fingerprint` = sha256(normalised headers)** → Task 4 +- **Preamble detection** → Task 5 +- **Drop-zone + heading copy softened** → Task 11 +- **Error handling** (LLM down → 502 in spirit; nonsense → 400; non-numeric qty → 400) → Tasks 6, 8, 10 (the route raises `HTTPException(400)` on `LLMParseError`; LLM provider failure is wrapped in `LLMParseError` by `_extract_mapping_via_llm`, which the route surfaces as 400 — note this is 400 in the implementation, not 502 as the spec implied; if you specifically want 502 for provider-down vs 400 for mapping-bad, split the exception types in Task 8 and branch in Task 10) +- **Migration 0021** → Task 2 +- **Fabricated IBKR fixture** → Task 3 +- **Test pattern matches `test_referral_conversion.py`** → Task 9 Step 1 (copies the factory pattern) + +One spec-vs-plan deviation worth flagging to the engineer: the spec error-handling table says "LLM provider down → 502". This plan returns 400 for all `LLMParseError` cases including provider-down, because the wrapping is uniform. If you want a 502/400 split, the simplest fix is to introduce a sibling `LLMProviderError(LLMParseError)` raised inside `_extract_mapping_via_llm`'s exception path, and branch on it in Task 10. Two-line change. Either behaviour is defensible — flagging so it's a conscious choice.