read.markets/docs/superpowers/plans/2026-05-27-llm-csv-fallback-parser.md
Giorgio Gilestro 08b4dddcdd docs: implementation plan for LLM-fallback CSV parser
12 TDD tasks covering model + migration, fingerprint, dialect detection,
mapping validation/application, LLM extraction (mocked in tests), cache
orchestration, route wiring + paid gate, UI copy tweaks, and final
manual smoke.
2026-05-27 11:41:44 +02:00

LLM-fallback CSV Parser Implementation Plan

For agentic workers: REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (- [ ]) syntax for tracking.

Goal: Add an LLM-fallback CSV parser so non-T212 portfolio uploads succeed by extracting a column mapping (not data) via the LLM, caching that mapping globally by header fingerprint, and replaying it deterministically on every subsequent upload of the same broker format.

Architecture: New service app/services/llm_csv_parser.py wraps openrouter.call_llm for one-time format discovery, persists results to a new csv_format_templates table, and produces the same ParsedPie shape as parse_t212_csv. The route /api/portfolio/parse in app/routers/universe.py gains a try/except fall-through: T212 first, LLM-cache lookup second, LLM call only on first encounter of a new format. The cache table stores headers + one anonymous data row + the JSON mapping; no user_id is ever recorded against the row.
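A minimal sketch of the fall-through order described above, using in-memory stand-ins. Everything here is hypothetical scaffolding rather than the real service API: `parse_t212` and its placeholder format check, the `_TEMPLATES` dict (standing in for the csv_format_templates table), and `discover_mapping` (standing in for the LLM call) are illustrative names only.

```python
import hashlib


class CSVImportError(Exception):
    """Stand-in for app.services.csv_import.CSVImportError."""


# Stand-in for the csv_format_templates table: fingerprint -> mapping.
_TEMPLATES: dict[str, dict] = {}
# Records each simulated LLM call so the caching behaviour is observable.
llm_calls: list[list[str]] = []


def parse_t212(raw: bytes) -> dict:
    # Placeholder check only; the real parser's detection logic is not shown here.
    if not raw.startswith(b"T212-PLACEHOLDER"):
        raise CSVImportError("not a recognised Trading 212 export")
    return {"source": "t212"}


def discover_mapping(headers: list[str]) -> dict:
    # Hypothetical stand-in for the one-time LLM format discovery.
    llm_calls.append(headers)
    return {"ticker_col": headers[0], "qty_col": headers[1]}


def parse_upload(raw: bytes) -> dict:
    try:
        return parse_t212(raw)  # 1. deterministic parser first
    except CSVImportError:
        pass
    headers = [h.strip() for h in raw.decode("utf-8").splitlines()[0].split(",")]
    key = hashlib.sha256("|".join(h.lower() for h in headers).encode("utf-8")).hexdigest()
    mapping = _TEMPLATES.get(key)  # 2. cache lookup by header fingerprint
    if mapping is not None:
        return {"source": "cache", "mapping": mapping}
    mapping = discover_mapping(headers)  # 3. LLM only on first encounter
    _TEMPLATES[key] = mapping
    return {"source": "llm", "mapping": mapping}


first = parse_upload(b"Symbol,Quantity\nAAPL,100\n")
second = parse_upload(b"Symbol,Quantity\nMSFT,50\n")
```

The second upload shares the header row with the first, so it is served from the cache and the simulated LLM is called only once.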

Tech Stack: FastAPI · SQLAlchemy 2.0 async · Alembic · MariaDB (prod) / aiosqlite (tests) · existing openrouter.call_llm (provider fallback + AICall ledger)

Spec: docs/superpowers/specs/2026-05-27-llm-csv-fallback-parser-design.md


File Structure

Create:

  • app/services/llm_csv_parser.py — the new service; parse_with_llm, LLMParseError, helpers
  • alembic/versions/0021_csv_format_template.py — hand-rolled migration (matches the style of 0020_trial_end.py)
  • tests/test_llm_csv_parser.py — unit + integration tests for the service
  • tests/fixtures/ibkr_sample.csv — fabricated IBKR-shaped CSV (no real holdings)

Modify:

  • app/models.py — add CsvFormatTemplate class
  • app/routers/universe.py — add Depends(require_paid) to /portfolio/parse; wrap parse_t212_csv in a try/except that falls through to parse_with_llm
  • app/templates/settings.html — soften "Trading 212 CSV" copy to broker-agnostic

Reuse without modification:

  • app/services/openrouter.py::call_llm, llm_configured, LogResult
  • app/services/csv_import.py::ParsedPie, ParsedPosition, CSVImportError
  • app/services/access.py::require_paid
  • app/db::Base, utcnow
  • tests/conftest.py env setup (in-memory aiosqlite, CASSANDRA_MOCK=1)
  • Session/engine bootstrap pattern from tests/test_referral_conversion.py::_build_session_factory

Test Conventions

All tests must be runnable inside the test container:

docker compose -f docker-compose.test.yml run --rm test pytest tests/test_llm_csv_parser.py -v

DB-touching tests use the same _build_session_factory(tmp_path) pattern as tests/test_referral_conversion.py — a fresh per-test sqlite file, schema created via Base.metadata.create_all. Do NOT introduce a shared fixture across tests; per-test isolation matches the existing pattern.

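Network-touching tests (the LLM) MUST mock call_llm; no real HTTP. Because the service imports call_llm directly from app.services.openrouter, patch the name where the service looks it up (app.services.llm_csv_parser.call_llm, as the Task 8 tests do). Use unittest.mock.AsyncMock.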


Task 1: Add CsvFormatTemplate model

Files:

  • Modify: app/models.py (append after the Referral class around line 270+)

  • Test: tests/test_llm_csv_parser.py

  • Step 1: Write the failing test

Create the test file with the import + schema test:

"""Unit + integration tests for the LLM-fallback CSV parser."""
from __future__ import annotations

import pytest


def test_csv_format_template_model_columns():
    """Model exposes every column the spec requires, with correct types."""
    from sqlalchemy import inspect

    from app.models import CsvFormatTemplate

    cols = {c.name: c for c in inspect(CsvFormatTemplate).columns}
    assert "fingerprint" in cols
    assert "headers" in cols
    assert "sample_row" in cols
    assert "mapping" in cols
    assert "preamble_rows" in cols
    assert "delimiter" in cols
    assert "broker_label" in cols
    assert "first_seen_at" in cols
    assert "use_count" in cols
    assert "last_used_at" in cols
    assert "llm_model" in cols
    assert "llm_cost_usd" in cols
    # Crucially, no user attribution.
    assert "user_id" not in cols
    assert "first_seen_user_id" not in cols
    # Fingerprint is the cache key.
    assert cols["fingerprint"].unique is True
    assert cols["fingerprint"].nullable is False

  • Step 2: Run the test to verify it fails

docker compose -f docker-compose.test.yml run --rm test pytest tests/test_llm_csv_parser.py::test_csv_format_template_model_columns -v

Expected: FAIL with ImportError: cannot import name 'CsvFormatTemplate'.

  • Step 3: Add the model in app/models.py

Append after the existing Referral class (around line 270+, before any trailing module helpers):

class CsvFormatTemplate(Base):
    """Cached column-mapping for a single broker CSV format.

    Populated on the first upload of a previously-unseen format via the
    LLM-fallback parser. Subsequent uploads of the same format
    (identified by ``fingerprint``, a sha256 of the normalised header
    row) replay ``mapping`` deterministically with no LLM call.

    The table holds the actual ``headers`` and one anonymous ``sample_row``
    from the originating upload — there is no ``user_id`` column, no link
    back to the uploader. The sample exists so the operator has concrete
    material to look at when hand-writing future native parsers; the
    system never auto-generates or modifies parser code from this data.
    """
    __tablename__ = "csv_format_templates"

    id: Mapped[int] = mapped_column(_PK, primary_key=True, autoincrement=True)
    fingerprint: Mapped[str] = mapped_column(String(64), unique=True, nullable=False)
    headers: Mapped[list] = mapped_column(JSON, nullable=False)
    sample_row: Mapped[list] = mapped_column(JSON, nullable=False)
    mapping: Mapped[dict] = mapped_column(JSON, nullable=False)
    preamble_rows: Mapped[int] = mapped_column(Integer, nullable=False, default=0)
    delimiter: Mapped[str] = mapped_column(String(1), nullable=False, default=",")
    broker_label: Mapped[str | None] = mapped_column(String(128))
    first_seen_at: Mapped[datetime] = mapped_column(
        DateTime(timezone=True), nullable=False, default=utcnow,
    )
    use_count: Mapped[int] = mapped_column(Integer, nullable=False, default=1)
    last_used_at: Mapped[datetime] = mapped_column(
        DateTime(timezone=True), nullable=False, default=utcnow,
    )
    llm_model: Mapped[str | None] = mapped_column(String(64))
    llm_cost_usd: Mapped[float | None] = mapped_column(Float)

  • Step 4: Run the test to verify it passes

docker compose -f docker-compose.test.yml run --rm test pytest tests/test_llm_csv_parser.py::test_csv_format_template_model_columns -v

Expected: PASS.

  • Step 5: Commit
git add app/models.py tests/test_llm_csv_parser.py
git commit -m "csv-parser: add CsvFormatTemplate model"

Task 2: Add Alembic migration 0021

Files:

  • Create: alembic/versions/0021_csv_format_template.py

  • Step 1: Write the migration

Create alembic/versions/0021_csv_format_template.py:

"""csv format templates table — LLM-fallback parser cache.

Revision ID: 0021
Revises: 0020
Create Date: 2026-05-27
"""
from typing import Sequence, Union

import sqlalchemy as sa
from alembic import op


revision: str = "0021"
down_revision: Union[str, None] = "0020"
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None


def upgrade() -> None:
    op.create_table(
        "csv_format_templates",
        sa.Column("id", sa.BigInteger(), primary_key=True, autoincrement=True),
        sa.Column("fingerprint", sa.String(length=64), nullable=False),
        sa.Column("headers", sa.JSON(), nullable=False),
        sa.Column("sample_row", sa.JSON(), nullable=False),
        sa.Column("mapping", sa.JSON(), nullable=False),
        sa.Column("preamble_rows", sa.Integer(), nullable=False, server_default="0"),
        sa.Column("delimiter", sa.String(length=1), nullable=False, server_default=","),
        sa.Column("broker_label", sa.String(length=128), nullable=True),
        sa.Column("first_seen_at", sa.DateTime(timezone=True), nullable=False),
        sa.Column("use_count", sa.Integer(), nullable=False, server_default="1"),
        sa.Column("last_used_at", sa.DateTime(timezone=True), nullable=False),
        sa.Column("llm_model", sa.String(length=64), nullable=True),
        sa.Column("llm_cost_usd", sa.Float(), nullable=True),
        sa.UniqueConstraint("fingerprint", name="uq_csv_format_templates_fingerprint"),
    )


def downgrade() -> None:
    op.drop_table("csv_format_templates")

  • Step 2: Verify the migration applies cleanly in the test container

docker compose -f docker-compose.test.yml run --rm test alembic upgrade head
docker compose -f docker-compose.test.yml run --rm test alembic downgrade -1
docker compose -f docker-compose.test.yml run --rm test alembic upgrade head

Expected: each command exits 0; the second leaves us at 0020, the third returns us to 0021.

If the test container doesn't have an alembic entrypoint, run the migration check via Python instead:

docker compose -f docker-compose.test.yml run --rm test python -c "
from alembic.config import Config
from alembic import command
cfg = Config('alembic.ini')
command.upgrade(cfg, 'head')
command.downgrade(cfg, '-1')
command.upgrade(cfg, 'head')
print('OK')
"

Expected: prints OK.

  • Step 3: Commit
git add alembic/versions/0021_csv_format_template.py
git commit -m "alembic: add 0021 csv_format_templates"

Task 3: Create the fabricated IBKR test fixture

Files:

  • Create: tests/fixtures/ibkr_sample.csv

  • Step 1: Write the fixture file

This is a fabricated IBKR-style activity statement — column names and shape are realistic, but the values are made up (no real holdings, no real account):

Statement,Header,Field Name,Field Value
Statement,Data,BrokerName,Interactive Brokers LLC
Statement,Data,Title,Activity Statement
Statement,Data,Period,"January 1, 2026 - January 31, 2026"
Symbol,Quantity,Avg Price,Currency,Description
AAPL,100,150.25,USD,Apple Inc
MSFT,50,310.00,USD,Microsoft Corp
NVDA,40,425.50,USD,NVIDIA Corp
VOD.L,2000,0.74,GBP,Vodafone Group Plc
ASML.AS,10,650.00,EUR,ASML Holding NV

Note: lines 1-4 are a preamble (IBKR's exports often have multi-line headers). The actual data table starts at line 5 (Symbol,Quantity,Avg Price,Currency,Description).
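As a quick illustration of the preamble offset, the sketch below reads the fixture content with the standard `csv` module and skips the four preamble rows. The constant name `PREAMBLE_ROWS` is an assumption chosen for this example; the fixture text is copied from Step 1.

```python
import csv
import io

FIXTURE = """Statement,Header,Field Name,Field Value
Statement,Data,BrokerName,Interactive Brokers LLC
Statement,Data,Title,Activity Statement
Statement,Data,Period,"January 1, 2026 - January 31, 2026"
Symbol,Quantity,Avg Price,Currency,Description
AAPL,100,150.25,USD,Apple Inc
MSFT,50,310.00,USD,Microsoft Corp
NVDA,40,425.50,USD,NVIDIA Corp
VOD.L,2000,0.74,GBP,Vodafone Group Plc
ASML.AS,10,650.00,EUR,ASML Holding NV
"""

PREAMBLE_ROWS = 4  # lines 1-4 of the fixture

rows = list(csv.reader(io.StringIO(FIXTURE)))
headers = rows[PREAMBLE_ROWS]          # the real table header (line 5)
data_rows = rows[PREAMBLE_ROWS + 1:]   # the five fabricated positions
period_cell = rows[3][3]               # quoted field keeps its commas
```

Note that the quoted Period value on line 4 contains commas but is still parsed as a single cell, which is why the dialect detector in Task 5 should go through `csv.reader` rather than splitting on the delimiter by hand.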

  • Step 2: Commit
git add tests/fixtures/ibkr_sample.csv
git commit -m "tests: add fabricated IBKR fixture for LLM parser"

Task 4: _fingerprint helper

Files:

  • Create: app/services/llm_csv_parser.py (initial scaffold)

  • Test: tests/test_llm_csv_parser.py

  • Step 1: Write failing test

Append to tests/test_llm_csv_parser.py:

def test_fingerprint_stable_across_case_and_whitespace():
    from app.services.llm_csv_parser import _fingerprint

    a = _fingerprint(["Symbol", "Quantity", "Avg Price"])
    b = _fingerprint(["symbol", "quantity", "avg price"])
    c = _fingerprint(["  SYMBOL ", "Quantity", " AVG PRICE"])
    assert a == b == c


def test_fingerprint_differs_for_different_columns():
    from app.services.llm_csv_parser import _fingerprint

    a = _fingerprint(["Symbol", "Quantity"])
    b = _fingerprint(["Symbol", "Quantity", "Avg Price"])
    assert a != b


def test_fingerprint_is_sha256_hex_64_chars():
    from app.services.llm_csv_parser import _fingerprint

    f = _fingerprint(["Symbol", "Quantity"])
    assert len(f) == 64
    assert all(c in "0123456789abcdef" for c in f)

  • Step 2: Run tests to verify they fail

docker compose -f docker-compose.test.yml run --rm test pytest tests/test_llm_csv_parser.py -k fingerprint -v

Expected: FAIL with ImportError.

  • Step 3: Create the service scaffold + _fingerprint

Create app/services/llm_csv_parser.py:

"""LLM-fallback CSV parser.

When the deterministic Trading 212 parser (``csv_import.parse_t212_csv``)
raises ``CSVImportError`` on an unrecognised format, this service kicks
in:

1. Detect the CSV dialect (delimiter, preamble offset).
2. Compute a fingerprint of the normalised header row.
3. Look up ``CsvFormatTemplate`` by fingerprint. On hit, replay the
   cached column-mapping deterministically. On miss, ask the LLM for a
   mapping, validate it, persist a new template, and apply it.

The LLM sees only headers + the first 3-5 sample rows. It returns a
column-mapping JSON, never transcribed numbers. The system never
auto-promotes a learned format to a hand-written parser — the operator
does that by inspecting collected ``sample_row`` values.
"""
from __future__ import annotations

import hashlib

from app.services.csv_import import CSVImportError


class LLMParseError(CSVImportError):
    """Raised when the LLM call fails or returns an unusable mapping.

    Inherits from ``CSVImportError`` so route-level error handling can
    treat both deterministic and LLM-path failures uniformly when
    desired."""


def _fingerprint(headers: list[str]) -> str:
    """Stable hash of the header row.

    Lowercases each header, strips surrounding whitespace, joins with
    ``|`` (a character extremely unlikely to appear inside a real
    header), and returns the sha256 hex digest. Whitespace/case drift
    in the same broker's export does not change the fingerprint;
    adding or removing a column does."""
    normalised = "|".join(h.strip().lower() for h in headers)
    return hashlib.sha256(normalised.encode("utf-8")).hexdigest()

  • Step 4: Run tests to verify they pass

docker compose -f docker-compose.test.yml run --rm test pytest tests/test_llm_csv_parser.py -k fingerprint -v

Expected: 3 PASS.
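For reference, a standalone sketch of what the fingerprint does and does not normalise. The function body is copied from Step 3 and renamed `fingerprint` so the snippet runs on its own; the comparison variable names are illustrative.

```python
import hashlib


def fingerprint(headers: list[str]) -> str:
    # Same normalisation as _fingerprint: strip, lowercase, join with "|".
    normalised = "|".join(h.strip().lower() for h in headers)
    return hashlib.sha256(normalised.encode("utf-8")).hexdigest()


base = fingerprint(["Symbol", "Quantity"])

# Case and surrounding whitespace are normalised away.
same_after_drift = base == fingerprint([" symbol", "QUANTITY "])

# Column order is part of the fingerprint.
same_after_reorder = base == fingerprint(["Quantity", "Symbol"])

# Whitespace INSIDE a header is not collapsed, only stripped at the ends.
same_with_inner_space = fingerprint(["Avg Price"]) == fingerprint(["Avg  Price"])
```

So a broker that reorders columns, or changes spacing inside a header name, produces a new fingerprint and therefore a fresh format discovery.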

  • Step 5: Commit
git add app/services/llm_csv_parser.py tests/test_llm_csv_parser.py
git commit -m "csv-parser: add _fingerprint helper"

Task 5: _detect_dialect helper

Files:

  • Modify: app/services/llm_csv_parser.py

  • Test: tests/test_llm_csv_parser.py

  • Step 1: Write failing tests

Append to tests/test_llm_csv_parser.py:

def test_detect_dialect_no_preamble_comma():
    from app.services.llm_csv_parser import _detect_dialect

    raw = b"Symbol,Quantity,Avg Price\nAAPL,100,150.25\nMSFT,50,310.00\n"
    delimiter, preamble = _detect_dialect(raw)
    assert delimiter == ","
    assert preamble == 0


def test_detect_dialect_with_preamble():
    from app.services.llm_csv_parser import _detect_dialect

    raw = (
        b"Statement,Header,Field Name,Field Value\n"
        b"Statement,Data,BrokerName,Interactive Brokers LLC\n"
        b"Statement,Data,Title,Activity Statement\n"
        b"Statement,Data,Period,\"January 1, 2026 - January 31, 2026\"\n"
        b"Symbol,Quantity,Avg Price,Currency,Description\n"
        b"AAPL,100,150.25,USD,Apple Inc\n"
    )
    delimiter, preamble = _detect_dialect(raw)
    assert delimiter == ","
    # The data-row header line is the FIFTH line (index 4); preamble = 4.
    assert preamble == 4


def test_detect_dialect_tab_delimited():
    from app.services.llm_csv_parser import _detect_dialect

    raw = b"Symbol\tQuantity\tAvg Price\nAAPL\t100\t150.25\n"
    delimiter, preamble = _detect_dialect(raw)
    assert delimiter == "\t"
    assert preamble == 0


def test_detect_dialect_empty_raises():
    from app.services.llm_csv_parser import LLMParseError, _detect_dialect

    with pytest.raises(LLMParseError):
        _detect_dialect(b"")

  • Step 2: Run tests to verify they fail

docker compose -f docker-compose.test.yml run --rm test pytest tests/test_llm_csv_parser.py -k detect_dialect -v

Expected: 4 FAIL with ImportError for _detect_dialect.

  • Step 3: Implement _detect_dialect

Append to app/services/llm_csv_parser.py:

import csv
import io


# Cap for how many leading lines we'll scan looking for the header row.
# Real broker preambles are typically 1-10 lines.
_MAX_PREAMBLE_SCAN = 30


def _decode_raw(raw: bytes) -> str:
    """Best-effort UTF-8 decode with BOM strip and lossy fallback."""
    text = raw.decode("utf-8-sig", errors="replace")
    return text


def _detect_dialect(raw: bytes) -> tuple[str, int]:
    """Detect (delimiter, preamble_rows).

    ``preamble_rows`` is the number of lines BEFORE the row we identify
    as the actual table header. The header row is the first line that
    has at least two non-empty tokens, none of them numeric, AND whose
    next non-blank line contains at least one numeric token (so
    "Symbol,Quantity" followed by "AAPL,100" is a header, while an
    all-text preamble line followed by more text is not). Falls back to
    assuming the first line is the header if no such line is found
    within the scan window.

    Raises ``LLMParseError`` on empty input."""
    if not raw or not raw.strip():
        raise LLMParseError("empty CSV")

    text = _decode_raw(raw)
    # csv.Sniffer is happy with ~4KB. Anything more and it gets slow.
    sample = text[:4096]
    try:
        dialect = csv.Sniffer().sniff(sample, delimiters=",;\t|")
        delimiter = dialect.delimiter
    except csv.Error:
        # Most broker exports are comma-delimited; default rather than
        # error out. The caller will still validate column shapes.
        delimiter = ","

    # Read one row past the scan window so the last candidate still has
    # a following row to look ahead to.
    rows: list[list[str]] = []
    for i, row in enumerate(csv.reader(io.StringIO(text), delimiter=delimiter)):
        if i > _MAX_PREAMBLE_SCAN:
            break
        rows.append(row)

    for i, row in enumerate(rows[:_MAX_PREAMBLE_SCAN]):
        non_empty = [c.strip() for c in row if c.strip()]
        # A header needs multiple tokens, none of them numeric.
        if len(non_empty) < 2 or any(_looks_numeric(c) for c in non_empty):
            continue
        # Preamble lines (e.g. "Statement,Header,...") are all-text too,
        # so require the next non-blank row to look like data.
        following = next((r for r in rows[i + 1:] if any(c.strip() for c in r)), None)
        if following is not None and any(_looks_numeric(c) for c in following):
            return delimiter, i
    return delimiter, 0


def _looks_numeric(value: str) -> bool:
    """True if ``value`` parses as a number after stripping common
    decoration (thousands separators, currency symbols, percent signs)."""
    s = value.strip().replace(",", "").replace("$", "").replace("€", "")
    s = s.replace("£", "").replace("%", "").lstrip("-+")
    if not s:
        return False
    try:
        float(s)
        return True
    except ValueError:
        return False

  • Step 4: Run tests to verify they pass

docker compose -f docker-compose.test.yml run --rm test pytest tests/test_llm_csv_parser.py -k detect_dialect -v

Expected: 4 PASS.
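To make the numeric heuristic concrete, the snippet below repeats the body of `_looks_numeric` from Step 3 (renamed `looks_numeric` so it runs standalone) and records how it classifies a few representative cells. The sample values are made up for illustration.

```python
def looks_numeric(value: str) -> bool:
    # Same logic as _looks_numeric: strip decoration, then try float().
    s = value.strip().replace(",", "").replace("$", "").replace("€", "")
    s = s.replace("£", "").replace("%", "").lstrip("-+")
    if not s:
        return False
    try:
        float(s)
        return True
    except ValueError:
        return False


classified = {
    cell: looks_numeric(cell)
    for cell in ["1,234.50", "$100", "-5%", "AAPL", "", "Avg Price", "INF", "NaN"]
}
```

One edge case worth keeping in mind: Python's `float()` accepts "inf", "infinity" and "nan" in any letter case, so a cell such as "INF" or "NaN" is classified as numeric by this helper. Whether that matters depends on the tickers and headers seen in practice; it is noted here only as a possible follow-up.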

  • Step 5: Commit
git add app/services/llm_csv_parser.py tests/test_llm_csv_parser.py
git commit -m "csv-parser: add _detect_dialect helper"

Task 6: _validate_mapping helper

Files:

  • Modify: app/services/llm_csv_parser.py

  • Test: tests/test_llm_csv_parser.py

  • Step 1: Write failing tests

Append to tests/test_llm_csv_parser.py:

def test_validate_mapping_accepts_well_formed():
    from app.services.llm_csv_parser import _validate_mapping

    headers = ["Symbol", "Quantity", "Avg Price", "Currency"]
    first_row = ["AAPL", "100", "150.25", "USD"]
    mapping = {
        "ticker_col": "Symbol",
        "qty_col": "Quantity",
        "cost_col": "Avg Price",
        "currency_col": "Currency",
        "name_col": None,
    }
    _validate_mapping(mapping, headers, first_row)  # no raise


def test_validate_mapping_missing_ticker_raises():
    from app.services.llm_csv_parser import LLMParseError, _validate_mapping

    headers = ["Symbol", "Quantity"]
    first_row = ["AAPL", "100"]
    mapping = {"ticker_col": None, "qty_col": "Quantity"}
    with pytest.raises(LLMParseError, match="ticker"):
        _validate_mapping(mapping, headers, first_row)


def test_validate_mapping_missing_qty_raises():
    from app.services.llm_csv_parser import LLMParseError, _validate_mapping

    headers = ["Symbol", "Quantity"]
    first_row = ["AAPL", "100"]
    mapping = {"ticker_col": "Symbol", "qty_col": None}
    with pytest.raises(LLMParseError, match="qty"):
        _validate_mapping(mapping, headers, first_row)


def test_validate_mapping_unknown_column_raises():
    from app.services.llm_csv_parser import LLMParseError, _validate_mapping

    headers = ["Symbol", "Quantity"]
    first_row = ["AAPL", "100"]
    mapping = {"ticker_col": "Symbol", "qty_col": "NotARealColumn"}
    with pytest.raises(LLMParseError, match="NotARealColumn"):
        _validate_mapping(mapping, headers, first_row)


def test_validate_mapping_non_numeric_qty_raises():
    from app.services.llm_csv_parser import LLMParseError, _validate_mapping

    headers = ["Symbol", "Description"]
    first_row = ["AAPL", "Apple Inc"]
    # Mapping says qty is "Description", but "Apple Inc" can't parse as a number.
    mapping = {"ticker_col": "Symbol", "qty_col": "Description"}
    with pytest.raises(LLMParseError, match="numeric"):
        _validate_mapping(mapping, headers, first_row)

  • Step 2: Run tests to verify they fail

docker compose -f docker-compose.test.yml run --rm test pytest tests/test_llm_csv_parser.py -k validate_mapping -v

Expected: 5 FAIL with ImportError.

  • Step 3: Implement _validate_mapping

Append to app/services/llm_csv_parser.py:

_REQUIRED_MAPPING_KEYS = ("ticker_col", "qty_col")
_OPTIONAL_MAPPING_KEYS = ("name_col", "cost_col", "currency_col")


def _validate_mapping(
    mapping: dict, headers: list[str], first_row: list[str],
) -> None:
    """Verify the LLM-returned mapping is sane.

    - ``ticker_col`` and ``qty_col`` are required (non-null).
    - Every named column must exist in ``headers``.
    - The value at ``qty_col`` on ``first_row`` must parse as a number.
    - The value at ``cost_col`` on ``first_row`` (if present) must parse
      as a number.

    Raises ``LLMParseError`` on any failure, with a message that names
    the specific problem (helpful for log forensics and for the
    user-facing 400)."""
    for key in _REQUIRED_MAPPING_KEYS:
        if not mapping.get(key):
            raise LLMParseError(
                f"LLM mapping missing required column: {key.replace('_col', '')}"
            )

    headers_set = set(headers)
    for key in _REQUIRED_MAPPING_KEYS + _OPTIONAL_MAPPING_KEYS:
        col = mapping.get(key)
        if col is not None and col not in headers_set:
            raise LLMParseError(
                f"LLM mapping references unknown column: {col!r}"
            )

    # Numeric sanity check: qty and (if present) cost must parse on row 1.
    header_index = {h: i for i, h in enumerate(headers)}
    qty_col = mapping["qty_col"]
    qty_value = first_row[header_index[qty_col]] if header_index[qty_col] < len(first_row) else ""
    if not _looks_numeric(qty_value):
        raise LLMParseError(
            f"LLM mapping qty_col={qty_col!r} maps to non-numeric value {qty_value!r}"
        )

    cost_col = mapping.get("cost_col")
    if cost_col is not None:
        cost_value = first_row[header_index[cost_col]] if header_index[cost_col] < len(first_row) else ""
        if cost_value and not _looks_numeric(cost_value):
            raise LLMParseError(
                f"LLM mapping cost_col={cost_col!r} maps to non-numeric value {cost_value!r}"
            )

  • Step 4: Run tests to verify they pass

docker compose -f docker-compose.test.yml run --rm test pytest tests/test_llm_csv_parser.py -k validate_mapping -v

Expected: 5 PASS.

  • Step 5: Commit
git add app/services/llm_csv_parser.py tests/test_llm_csv_parser.py
git commit -m "csv-parser: add _validate_mapping helper"

Task 7: _apply_mapping helper

Files:

  • Modify: app/services/llm_csv_parser.py

  • Test: tests/test_llm_csv_parser.py

  • Step 1: Write failing tests

Append to tests/test_llm_csv_parser.py:

def test_apply_mapping_builds_parsed_pie():
    from app.services.csv_import import ParsedPie, ParsedPosition
    from app.services.llm_csv_parser import _apply_mapping

    headers = ["Symbol", "Quantity", "Avg Price", "Currency", "Description"]
    data_rows = [
        ["AAPL", "100", "150.25", "USD", "Apple Inc"],
        ["MSFT", "50", "310.00", "USD", "Microsoft Corp"],
    ]
    mapping = {
        "ticker_col": "Symbol",
        "qty_col": "Quantity",
        "cost_col": "Avg Price",
        "currency_col": "Currency",
        "name_col": "Description",
    }

    pie = _apply_mapping(headers, data_rows, mapping)

    assert isinstance(pie, ParsedPie)
    assert len(pie.positions) == 2
    p0 = pie.positions[0]
    assert isinstance(p0, ParsedPosition)
    assert p0.slice == "AAPL"
    assert p0.name == "Apple Inc"
    assert p0.quantity == 100.0
    # invested_value = qty * avg_cost = 100 * 150.25 = 15025
    assert p0.invested_value == pytest.approx(15025.0)
    # pie.invested is the sum of invested_value over all positions.
    assert pie.invested == pytest.approx(15025.0 + 50 * 310.00)


def test_apply_mapping_handles_missing_optional_columns():
    from app.services.llm_csv_parser import _apply_mapping

    headers = ["Symbol", "Quantity"]
    data_rows = [["AAPL", "100"]]
    mapping = {
        "ticker_col": "Symbol",
        "qty_col": "Quantity",
        "cost_col": None,
        "currency_col": None,
        "name_col": None,
    }

    pie = _apply_mapping(headers, data_rows, mapping)
    p = pie.positions[0]
    assert p.slice == "AAPL"
    assert p.quantity == 100.0
    assert p.invested_value is None
    assert p.name == "AAPL"  # falls back to ticker when name_col absent


def test_apply_mapping_skips_blank_and_unparseable_rows():
    from app.services.llm_csv_parser import _apply_mapping

    headers = ["Symbol", "Quantity"]
    data_rows = [
        ["AAPL", "100"],
        ["", ""],                   # blank
        ["MSFT", "not-a-number"],   # bad qty
        ["NVDA", "40"],
    ]
    mapping = {"ticker_col": "Symbol", "qty_col": "Quantity"}

    pie = _apply_mapping(headers, data_rows, mapping)
    assert [p.slice for p in pie.positions] == ["AAPL", "NVDA"]

  • Step 2: Run tests to verify they fail

docker compose -f docker-compose.test.yml run --rm test pytest tests/test_llm_csv_parser.py -k apply_mapping -v

Expected: 3 FAIL with ImportError.

  • Step 3: Implement _apply_mapping

Append to app/services/llm_csv_parser.py:

from app.services.csv_import import ParsedPie, ParsedPosition


def _parse_number(value: str) -> float | None:
    """Permissive float parse: strips thousands separators, currency
    symbols, percent signs. Returns None on failure (so callers can
    decide whether to skip or raise)."""
    s = value.strip().replace(",", "").replace("$", "")
    s = s.replace("€", "").replace("£", "").replace("%", "")
    if not s:
        return None
    try:
        return float(s)
    except ValueError:
        return None


def _apply_mapping(
    headers: list[str],
    data_rows: list[list[str]],
    mapping: dict,
) -> ParsedPie:
    """Iterate ``data_rows`` and produce a ``ParsedPie``.

    Rows that lack a parseable positive quantity (blank, non-numeric,
    zero, or negative) are silently skipped, because broker exports
    often include summary or placeholder rows after the position list.
    The position name falls back to the ticker symbol when ``name_col``
    is null or the cell is blank."""
    idx = {h: i for i, h in enumerate(headers)}
    ticker_col = mapping["ticker_col"]
    qty_col = mapping["qty_col"]
    name_col = mapping.get("name_col")
    cost_col = mapping.get("cost_col")

    positions: list[ParsedPosition] = []
    invested_total = 0.0
    invested_seen = False

    for row in data_rows:
        if not any(c.strip() for c in row):
            continue
        ticker_raw = row[idx[ticker_col]] if idx[ticker_col] < len(row) else ""
        ticker = ticker_raw.strip().upper()
        if not ticker:
            continue
        qty_raw = row[idx[qty_col]] if idx[qty_col] < len(row) else ""
        qty = _parse_number(qty_raw)
        if qty is None or qty <= 0:
            continue
        avg_cost: float | None = None
        if cost_col is not None and idx[cost_col] < len(row):
            avg_cost = _parse_number(row[idx[cost_col]])
        invested_value: float | None = None
        if avg_cost is not None:
            invested_value = qty * avg_cost
            invested_total += invested_value
            invested_seen = True
        name = ""
        if name_col is not None and idx[name_col] < len(row):
            name = row[idx[name_col]].strip()
        if not name:
            name = ticker
        positions.append(ParsedPosition(
            slice=ticker,
            name=name,
            invested_value=invested_value,
            current_value=None,
            result=None,
            quantity=qty,
        ))

    return ParsedPie(
        name=None,
        positions=tuple(positions),
        invested=(invested_total if invested_seen else None),
        value=None,
        result=None,
    )

  • Step 4: Run tests to verify they pass

docker compose -f docker-compose.test.yml run --rm test pytest tests/test_llm_csv_parser.py -k apply_mapping -v

Expected: 3 PASS.
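A short worked example of the number parsing and the invested-value arithmetic. The function body repeats `_parse_number` from Step 3 (renamed `parse_number` so it runs standalone); the input strings are illustrative. It also shows a limitation that follows directly from stripping every comma: values written with a decimal comma are not rejected, they are silently rescaled.

```python
from __future__ import annotations


def parse_number(value: str) -> float | None:
    # Same logic as _parse_number: drop separators and symbols, then float().
    s = value.strip().replace(",", "").replace("$", "")
    s = s.replace("€", "").replace("£", "").replace("%", "")
    if not s:
        return None
    try:
        return float(s)
    except ValueError:
        return None


# Thousands separators and currency symbols are handled as intended.
us_style = parse_number("$1,234.50")

# invested_value = quantity * average cost, as in _apply_mapping.
invested = parse_number("100") * parse_number("150.25")

# Decimal-comma input: the comma is treated as a thousands separator.
decimal_comma = parse_number("150,25")
continental = parse_number("1.234,56")

unparseable = parse_number("n/a")
```

If European-locale exports are expected, this is a case the operator may want to handle explicitly; the plan as written does not cover it.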

  • Step 5: Commit
git add app/services/llm_csv_parser.py tests/test_llm_csv_parser.py
git commit -m "csv-parser: add _apply_mapping helper"

Task 8: _extract_mapping_via_llm helper

Files:

  • Modify: app/services/llm_csv_parser.py

  • Test: tests/test_llm_csv_parser.py

  • Step 1: Write failing tests

Append to tests/test_llm_csv_parser.py:

@pytest.mark.asyncio
async def test_extract_mapping_via_llm_parses_valid_json(monkeypatch):
    from unittest.mock import AsyncMock, MagicMock
    from app.services.llm_csv_parser import _extract_mapping_via_llm
    from app.services.openrouter import LogResult

    fake_result = LogResult(
        content='{"ticker_col": "Symbol", "qty_col": "Quantity", '
                '"cost_col": "Avg Price", "currency_col": "Currency", '
                '"name_col": null, "broker_label": "IBKR Activity Statement"}',
        model="deepseek/deepseek-v4-flash",
        prompt_tokens=100,
        completion_tokens=50,
        cost_usd=0.0001,
    )
    fake_client = MagicMock()
    fake_call_llm = AsyncMock(return_value=fake_result)

    import app.services.llm_csv_parser as mod
    # Patch via the fixture so the real call_llm is restored after the test.
    monkeypatch.setattr(mod, "call_llm", fake_call_llm)

    headers = ["Symbol", "Quantity", "Avg Price", "Currency"]
    samples = [["AAPL", "100", "150.25", "USD"]]
    mapping, log = await _extract_mapping_via_llm(fake_client, headers, samples)

    assert mapping["ticker_col"] == "Symbol"
    assert mapping["qty_col"] == "Quantity"
    assert mapping["broker_label"] == "IBKR Activity Statement"
    assert log.model == "deepseek/deepseek-v4-flash"
    fake_call_llm.assert_awaited_once()


@pytest.mark.asyncio
async def test_extract_mapping_via_llm_malformed_json_raises(monkeypatch):
    from unittest.mock import AsyncMock, MagicMock
    from app.services.llm_csv_parser import LLMParseError, _extract_mapping_via_llm
    from app.services.openrouter import LogResult

    fake_result = LogResult(
        content="Sure thing — here is the mapping!  ticker=Symbol",
        model="deepseek/deepseek-v4-flash",
        prompt_tokens=10,
        completion_tokens=20,
        cost_usd=0.00005,
    )
    fake_client = MagicMock()
    fake_call_llm = AsyncMock(return_value=fake_result)

    import app.services.llm_csv_parser as mod
    # Patch via the fixture so the real call_llm is restored after the test.
    monkeypatch.setattr(mod, "call_llm", fake_call_llm)

    with pytest.raises(LLMParseError, match="JSON"):
        await _extract_mapping_via_llm(fake_client, ["Symbol"], [["AAPL"]])


@pytest.mark.asyncio
async def test_extract_mapping_via_llm_provider_failure_wraps(monkeypatch):
    from unittest.mock import AsyncMock, MagicMock
    from app.services.llm_csv_parser import LLMParseError, _extract_mapping_via_llm

    fake_client = MagicMock()
    fake_call_llm = AsyncMock(side_effect=RuntimeError("provider down"))

    import app.services.llm_csv_parser as mod
    # Patch via the fixture so the real call_llm is restored after the test.
    monkeypatch.setattr(mod, "call_llm", fake_call_llm)

    with pytest.raises(LLMParseError, match="provider"):
        await _extract_mapping_via_llm(fake_client, ["Symbol"], [["AAPL"]])

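NOTE: These tests rely on pytest-asyncio. Check the top of tests/test_referral_conversion.py for how existing async tests are marked and replicate it: in the plugin's default strict mode every async test needs @pytest.mark.asyncio, whereas with asyncio_mode = auto set in pytest.ini the marker is optional. If pytest-asyncio is not installed in the test container, add it to the test dependencies first; neither the marker nor the asyncio_mode setting has any effect without the plugin.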

  • Step 2: Run tests to verify they fail
docker compose -f docker-compose.test.yml run --rm test pytest tests/test_llm_csv_parser.py -k extract_mapping_via_llm -v

Expected: 3 FAIL with ImportError for _extract_mapping_via_llm.

  • Step 3: Implement _extract_mapping_via_llm

Append to app/services/llm_csv_parser.py:

import json

import httpx

from app.services.openrouter import LogResult, call_llm


# Hard caps on what we send to the LLM, so prompt cost stays bounded.
_LLM_SAMPLES = 5
_LLM_MAX_TOKENS = 400


_SYSTEM_PROMPT = """\
You are an expert at recognising broker portfolio CSV formats.

You will be given the header row and 3-5 sample data rows from a CSV.
Identify which column contains each field. Return ONLY a single JSON
object, no prose, no markdown fences.

Schema (use the EXACT header string from the input; use null if no
column is a good match):

{
  "ticker_col":   "<header name or null>",
  "qty_col":      "<header name or null>",
  "name_col":     "<header name or null>",
  "cost_col":     "<header name or null>",   // average price per share
  "currency_col": "<header name or null>",
  "broker_label": "<short identifier like 'IBKR Activity Statement' or null>"
}

Rules:
- ticker_col and qty_col are required. If either is missing, return all nulls.
- Use the EXACT header string as it appears in the input — do not paraphrase.
- Output JSON ONLY. No prose, no code fences.
"""


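def _build_user_prompt(headers: list[str], samples: list[list[str]]) -> str:
    # JSON-encode each sample row, like the headers, so cells that contain
    # commas or quotes (e.g. "1,234.50") cannot shift column positions.
    lines = ["headers: " + json.dumps(headers)]
    lines.append("samples:")
    for s in samples[:_LLM_SAMPLES]:
        lines.append("  " + json.dumps(s))
    return "\n".join(lines)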


async def _extract_mapping_via_llm(
    client: httpx.AsyncClient,
    headers: list[str],
    samples: list[list[str]],
) -> tuple[dict, LogResult]:
    """Single LLM call returning ``(mapping_dict, LogResult)``.

    The LLM is asked for a strict JSON object (no markdown). We attempt
    to parse the returned content; ``LLMParseError`` wraps any failure
    in a way callers can surface to the user."""
    messages = [
        {"role": "system", "content": _SYSTEM_PROMPT},
        {"role": "user", "content": _build_user_prompt(headers, samples)},
    ]
    try:
        result = await call_llm(client, messages, max_tokens=_LLM_MAX_TOKENS)
    except Exception as e:
        raise LLMParseError(f"LLM provider failed: {e}") from e

    content = (result.content or "").strip()
    # Strip code fences if the model added them despite instructions.
    if content.startswith("```"):
        content = content.strip("`")
        # Drop optional 'json' language tag.
        if content.lstrip().lower().startswith("json"):
            content = content.lstrip()[4:]
        content = content.strip()
    try:
        mapping = json.loads(content)
    except json.JSONDecodeError as e:
        raise LLMParseError(f"LLM did not return valid JSON: {e}") from e
    if not isinstance(mapping, dict):
        raise LLMParseError("LLM JSON was not an object")
    return mapping, result
  • Step 4: Run tests to verify they pass
docker compose -f docker-compose.test.yml run --rm test pytest tests/test_llm_csv_parser.py -k extract_mapping_via_llm -v

Expected: 3 PASS.
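
The fence-stripping and JSON-validation step inside _extract_mapping_via_llm can also be checked in isolation. The following is a minimal sketch, not the service code: parse_llm_json is a hypothetical helper name, and it raises ValueError in place of LLMParseError so that it runs without the app package.

```python
import json

FENCE = "`" * 3  # built at runtime so this example needs no literal fence


def parse_llm_json(content: str) -> dict:
    """Strip optional Markdown fences, then parse a single JSON object.

    Hypothetical stand-in for the tail of _extract_mapping_via_llm.
    """
    content = (content or "").strip()
    if content.startswith(FENCE):
        # Remove the backticks on both ends, then an optional 'json' tag.
        content = content.strip("`")
        if content.lstrip().lower().startswith("json"):
            content = content.lstrip()[4:]
        content = content.strip()
    try:
        mapping = json.loads(content)
    except json.JSONDecodeError as e:
        raise ValueError(f"LLM did not return valid JSON: {e}") from e
    if not isinstance(mapping, dict):
        raise ValueError("LLM JSON was not an object")
    return mapping


bare = '{"ticker_col": "Symbol", "qty_col": "Quantity"}'
fenced = FENCE + "json\n" + bare + "\n" + FENCE
```

Both the bare and the fenced reply parse to the same dict, while a prose reply such as the one in the malformed-JSON test is rejected.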

  • Step 5: Commit
git add app/services/llm_csv_parser.py tests/test_llm_csv_parser.py
git commit -m "csv-parser: add _extract_mapping_via_llm with provider-failure wrapping"

Task 9: Public parse_with_llm orchestration

Files:

  • Modify: app/services/llm_csv_parser.py

  • Test: tests/test_llm_csv_parser.py

  • Step 1: Write the per-test session factory helper (copied from test_referral_conversion.py)

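Insert near the top of tests/test_llm_csv_parser.py (after the existing imports):

def _build_session_factory(tmp_path):
    """Create a fresh SQLite database file under tmp_path and return
    (engine, factory, setup). ``setup`` is a coroutine function that
    creates the schema; tests must await it before using the factory.
    Matches the pattern used in tests/test_referral_conversion.py."""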
    from sqlalchemy.ext.asyncio import async_sessionmaker, create_async_engine

    from app import db as db_mod
    from app.db import Base
    import app.models  # noqa: F401 — registers models on Base.metadata

    engine = create_async_engine(f"sqlite+aiosqlite:///{tmp_path}/csv.db")
    factory = async_sessionmaker(engine, expire_on_commit=False)
    db_mod._engine = engine
    db_mod._session_factory = factory

    async def _setup():
        async with engine.begin() as conn:
            await conn.run_sync(Base.metadata.create_all)

    return engine, factory, _setup

Tests will await setup() themselves before using the factory.

  • Step 2: Write failing tests for cache miss + cache hit

Append to tests/test_llm_csv_parser.py:

@pytest.mark.asyncio
async def test_parse_with_llm_cache_miss_inserts_template(tmp_path):
    from unittest.mock import AsyncMock
    from sqlalchemy import select

    from app.models import CsvFormatTemplate
    from app.services.llm_csv_parser import parse_with_llm
    from app.services.openrouter import LogResult

    _, factory, setup = _build_session_factory(tmp_path)
    await setup()

    raw = (
        b"Symbol,Quantity,Avg Price,Currency\n"
        b"AAPL,100,150.25,USD\n"
        b"MSFT,50,310.00,USD\n"
    )

    import app.services.llm_csv_parser as mod
    mod.call_llm = AsyncMock(return_value=LogResult(
        content='{"ticker_col":"Symbol","qty_col":"Quantity",'
                '"cost_col":"Avg Price","currency_col":"Currency",'
                '"name_col":null,"broker_label":"Generic broker"}',
        model="deepseek/deepseek-v4-flash",
        prompt_tokens=120, completion_tokens=40, cost_usd=0.0002,
    ))

    async with factory() as session:
        pie = await parse_with_llm(raw, session)

    assert len(pie.positions) == 2
    assert pie.positions[0].slice == "AAPL"

    async with factory() as session:
        rows = (await session.execute(select(CsvFormatTemplate))).scalars().all()
    assert len(rows) == 1
    tmpl = rows[0]
    assert tmpl.headers == ["Symbol", "Quantity", "Avg Price", "Currency"]
    assert tmpl.sample_row == ["AAPL", "100", "150.25", "USD"]
    assert tmpl.mapping["ticker_col"] == "Symbol"
    assert tmpl.broker_label == "Generic broker"
    assert tmpl.use_count == 1
    assert tmpl.llm_cost_usd == pytest.approx(0.0002)
    # The crucial PII guarantee:
    assert not hasattr(tmpl, "user_id"), "sample row must not be linked to a user"


@pytest.mark.asyncio
async def test_parse_with_llm_cache_hit_skips_llm(tmp_path):
    from unittest.mock import AsyncMock
    from sqlalchemy import select

    from app.db import utcnow
    from app.models import CsvFormatTemplate
    from app.services.llm_csv_parser import _fingerprint, parse_with_llm

    _, factory, setup = _build_session_factory(tmp_path)
    await setup()

    headers = ["Symbol", "Quantity", "Avg Price", "Currency"]
    fp = _fingerprint(headers)

    # Pre-populate a cache hit row.
    async with factory() as session:
        session.add(CsvFormatTemplate(
            fingerprint=fp,
            headers=headers,
            sample_row=["AAPL", "100", "150.25", "USD"],
            mapping={
                "ticker_col": "Symbol", "qty_col": "Quantity",
                "cost_col": "Avg Price", "currency_col": "Currency",
                "name_col": None,
            },
            preamble_rows=0,
            delimiter=",",
            broker_label="Cached broker",
            first_seen_at=utcnow(),
            last_used_at=utcnow(),
            use_count=1,
            llm_model="seed",
            llm_cost_usd=0.0,
        ))
        await session.commit()

    raw = (
        b"Symbol,Quantity,Avg Price,Currency\n"
        b"NVDA,40,425.50,USD\n"
    )

    import app.services.llm_csv_parser as mod
    mod.call_llm = AsyncMock(side_effect=AssertionError("call_llm must NOT be called on cache hit"))

    async with factory() as session:
        pie = await parse_with_llm(raw, session)

    assert pie.positions[0].slice == "NVDA"

    async with factory() as session:
        rows = (await session.execute(select(CsvFormatTemplate))).scalars().all()
    assert len(rows) == 1
    assert rows[0].use_count == 2


@pytest.mark.asyncio
async def test_parse_with_llm_stale_mapping_raises_but_does_not_evict(tmp_path):
    from unittest.mock import AsyncMock
    from sqlalchemy import select

    from app.db import utcnow
    from app.models import CsvFormatTemplate
    from app.services.llm_csv_parser import LLMParseError, _fingerprint, parse_with_llm

    _, factory, setup = _build_session_factory(tmp_path)
    await setup()

    headers = ["Symbol", "Quantity"]
    fp = _fingerprint(headers)
    # Cached mapping says qty is in column "Symbol" — clearly wrong; will
    # never produce a parseable row.
    async with factory() as session:
        session.add(CsvFormatTemplate(
            fingerprint=fp, headers=headers,
            sample_row=["AAPL", "100"],
            mapping={"ticker_col": "Symbol", "qty_col": "Symbol"},
            preamble_rows=0, delimiter=",", broker_label=None,
            first_seen_at=utcnow(), last_used_at=utcnow(), use_count=1,
            llm_model="seed", llm_cost_usd=0.0,
        ))
        await session.commit()

    raw = b"Symbol,Quantity\nAAPL,100\nMSFT,50\n"

    import app.services.llm_csv_parser as mod
    mod.call_llm = AsyncMock(side_effect=AssertionError("must not be called"))

    async with factory() as session:
        with pytest.raises(LLMParseError):
            await parse_with_llm(raw, session)

    # Stale template must NOT have been auto-deleted (operator owns eviction).
    async with factory() as session:
        rows = (await session.execute(select(CsvFormatTemplate))).scalars().all()
    assert len(rows) == 1
  • Step 3: Run tests to verify they fail
docker compose -f docker-compose.test.yml run --rm test pytest tests/test_llm_csv_parser.py -k parse_with_llm -v

Expected: 3 FAIL with ImportError for parse_with_llm.

  • Step 4: Implement parse_with_llm

Append to app/services/llm_csv_parser.py:

from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession

from app.db import utcnow
from app.logging import get_logger
from app.models import CsvFormatTemplate


log = get_logger("llm_csv_parser")


# Hard cap shared with /api/portfolio/parse — bytes-level, mirrors T212 path.
_MAX_CSV_BYTES = 1_048_576


async def parse_with_llm(raw: bytes, session: AsyncSession) -> ParsedPie:
    """Cache-first LLM-fallback CSV parse.

    On cache hit, applies the stored mapping deterministically and
    increments ``use_count``. On cache miss, calls the LLM, validates
    the returned mapping against the first data row, and persists a
    new ``CsvFormatTemplate``. Raises ``LLMParseError`` on any
    failure; the caller (route layer) maps that to a 400."""
    if len(raw) > _MAX_CSV_BYTES:
        raise LLMParseError("CSV too large (1 MB max)")
    if not raw or not raw.strip():
        raise LLMParseError("empty CSV")

    delimiter, preamble_rows = _detect_dialect(raw)
    text = _decode_raw(raw)

    reader = csv.reader(io.StringIO(text), delimiter=delimiter)
    rows = list(reader)
    if preamble_rows >= len(rows):
        raise LLMParseError("no header row found in CSV")
    headers = [c.strip() for c in rows[preamble_rows]]
    data_rows = rows[preamble_rows + 1:]
    if not headers:
        raise LLMParseError("empty header row")

    first_data_row = next(
        (r for r in data_rows if any(c.strip() for c in r)), None,
    )
    if first_data_row is None:
        raise LLMParseError("CSV contains a header but no data rows")

    fp = _fingerprint(headers)
    existing = (await session.execute(
        select(CsvFormatTemplate).where(CsvFormatTemplate.fingerprint == fp)
    )).scalar_one_or_none()

    if existing is not None:
        log.info("csv.format.cache_hit", fingerprint=fp,
                 broker_label=existing.broker_label, use_count=existing.use_count)
        pie = _apply_mapping(headers, data_rows, existing.mapping)
        if not pie.positions:
            raise LLMParseError(
                "cached mapping produced no positions — the broker may have "
                "changed their CSV shape; ask the operator to evict the "
                "stale template"
            )
        existing.use_count += 1
        existing.last_used_at = utcnow()
        await session.commit()
        return pie

    log.info("csv.format.cache_miss", fingerprint=fp,
             header_count=len(headers))
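    # Drop blank rows before taking the first _LLM_SAMPLES, so leading
    # blank lines cannot leave the LLM with no sample rows.
    samples = [r for r in data_rows if any(c.strip() for c in r)][:_LLM_SAMPLES]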
    async with httpx.AsyncClient(follow_redirects=True, timeout=30) as client:
        mapping, llm_log = await _extract_mapping_via_llm(client, headers, samples)
    _validate_mapping(mapping, headers, first_data_row)

    pie = _apply_mapping(headers, data_rows, mapping)
    if not pie.positions:
        raise LLMParseError(
            "LLM mapping validated but produced no positions — the file "
            "may not contain portfolio data"
        )

    now = utcnow()
    session.add(CsvFormatTemplate(
        fingerprint=fp,
        headers=headers,
        sample_row=first_data_row,
        mapping=mapping,
        preamble_rows=preamble_rows,
        delimiter=delimiter,
        broker_label=mapping.get("broker_label"),
        first_seen_at=now,
        last_used_at=now,
        use_count=1,
        llm_model=llm_log.model,
        llm_cost_usd=llm_log.cost_usd,
    ))
    await session.commit()
    return pie
  • Step 5: Run tests to verify they pass
docker compose -f docker-compose.test.yml run --rm test pytest tests/test_llm_csv_parser.py -v

Expected: every test passes (including everything from earlier tasks).
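
The cache-first flow that parse_with_llm implements can be illustrated without a database. This is a rough sketch under stated assumptions: TemplateCache is a hypothetical in-memory stand-in for the csv_format_templates table, fake_discover stands in for the LLM call, and the header normalisation in fingerprint (strip plus lowercase) is a guess, since the real _fingerprint is defined in Task 4.

```python
import hashlib


def fingerprint(headers: list[str]) -> str:
    # ASSUMPTION: strip + lowercase normalisation; the real _fingerprint
    # (Task 4) may normalise differently.
    normalised = "\x1f".join(h.strip().lower() for h in headers)
    return hashlib.sha256(normalised.encode("utf-8")).hexdigest()


class TemplateCache:
    """Hypothetical in-memory stand-in for csv_format_templates."""

    def __init__(self) -> None:
        self.rows: dict[str, dict] = {}
        self.discover_calls = 0

    def get_mapping(self, headers: list[str], discover) -> dict:
        fp = fingerprint(headers)
        row = self.rows.get(fp)
        if row is not None:
            # Cache hit: replay the stored mapping, no discovery call.
            row["use_count"] += 1
            return row["mapping"]
        # Cache miss: pay for discovery once, then persist the result.
        self.discover_calls += 1
        mapping = discover(headers)
        self.rows[fp] = {"mapping": mapping, "use_count": 1}
        return mapping


def fake_discover(headers: list[str]) -> dict:
    # Stand-in for the LLM: pretend the first two columns are ticker/qty.
    return {"ticker_col": headers[0], "qty_col": headers[1]}


cache = TemplateCache()
headers = ["Symbol", "Quantity"]
first = cache.get_mapping(headers, fake_discover)
second = cache.get_mapping(headers, fake_discover)
```

After the two lookups, discovery has run once and the single cached row has a use_count of 2, which is the behaviour the cache-miss and cache-hit tests assert. Unlike the sketch, the real function only increments use_count after the cached mapping has produced at least one position.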

  • Step 6: Commit
git add app/services/llm_csv_parser.py tests/test_llm_csv_parser.py
git commit -m "csv-parser: add public parse_with_llm with cache hit/miss orchestration"

Task 10: Wire parse_with_llm into the route and add require_paid

Files:

  • Modify: app/routers/universe.py:192-214 (the parse_portfolio route + decorator)

  • Test: tests/test_llm_csv_parser.py

  • Step 1: Write the route-level integration test (direct function call, no HTTP layer)

Calling parse_portfolio directly with a fake UploadFile and a real session sidesteps the pytest-asyncio + TestClient event-loop awkwardness. Depends(require_paid) is declared at decorator level, so it is not invoked when we call the function directly. That is what we want here: paid gating is mechanical and trusted to FastAPI, and Step 1b below verifies by inspection that the dependency is attached.

Append to tests/test_llm_csv_parser.py:

@pytest.mark.asyncio
async def test_parse_portfolio_route_falls_through_to_llm(tmp_path, monkeypatch):
    """End-to-end: T212 parser raises CSVImportError, LLM fallback runs,
    response shape matches the existing JSON contract."""
    from io import BytesIO
    from types import SimpleNamespace
    from unittest.mock import AsyncMock

    from fastapi import UploadFile

    _, factory, setup = _build_session_factory(tmp_path)
    await setup()

    import app.services.llm_csv_parser as mod
    from app.services.openrouter import LogResult
    fake_call_llm = AsyncMock(return_value=LogResult(
        content='{"ticker_col":"Symbol","qty_col":"Quantity",'
                '"cost_col":"Avg Price","currency_col":"Currency",'
                '"name_col":"Description",'
                '"broker_label":"IBKR Activity Statement"}',
        model="deepseek/deepseek-v4-flash",
        prompt_tokens=150, completion_tokens=60, cost_usd=0.0003,
    ))
    # Use monkeypatch so the real call_llm is restored after the test.
    monkeypatch.setattr(mod, "call_llm", fake_call_llm)

    # The route's inline Yahoo-fetch block would otherwise hit the network.
    # Patch market.fetch to return a benign placeholder per ticker.
    from app.services import market as market_mod

    async def _fake_fetch(client, symbol, label, group, anchor):
        return SimpleNamespace(
            symbol=symbol, source="test", label=label,
            price=None, currency="USD", as_of="2026-05-27",
            changes=None, error=None,
        )
    monkeypatch.setattr(market_mod, "fetch", _fake_fetch)

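    # Resolve the fixture relative to this test file so the test does not
    # depend on the working directory, and avoid leaking a file handle.
    from pathlib import Path
    raw = (Path(__file__).parent / "fixtures" / "ibkr_sample.csv").read_bytes()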
    upload = UploadFile(filename="ibkr.csv", file=BytesIO(raw))

    from app.routers.universe import parse_portfolio
    async with factory() as session:
        result = await parse_portfolio(file=upload, session=session)

    assert result["base_currency"] == "GBP"
    # At least the AAPL/MSFT/NVDA rows should be present; resolve_slice may
    # filter some if there's no InstrumentMap row, which is fine for this
    # test — we just want to confirm the LLM fallback ran end-to-end.
    assert isinstance(result["positions"], list)
    # LLM was called exactly once (cache miss).
    assert mod.call_llm.await_count == 1
  • Step 1b: Add a paid-gate inspection test (no HTTP needed)
def test_parse_portfolio_route_requires_paid():
    """Static check that the /portfolio/parse route is gated by require_paid."""
    from app.routers.universe import router
    from app.services.access import require_paid

    parse_route = next(
        r for r in router.routes
        if getattr(r, "path", "") == "/portfolio/parse"
    )
    dep_callables = [d.dependency for d in parse_route.dependant.dependencies]
    assert require_paid in dep_callables, (
        "The /portfolio/parse route must have Depends(require_paid)"
    )
  • Step 2: Run the test to verify it fails
docker compose -f docker-compose.test.yml run --rm test pytest tests/test_llm_csv_parser.py::test_parse_portfolio_route_falls_through_to_llm -v

Expected: FAIL, most likely with an HTTPException (status 400): the T212 parser raises CSVImportError and the route, which has no LLM fallback yet, converts it to a 400. That failure confirms the wiring is missing.

  • Step 3: Wire parse_with_llm into the route

In app/routers/universe.py, find the parse_portfolio definition (search async def parse_portfolio). Make these two changes:

Change A: Add require_paid to the route decorator. Find the existing line:

@router.post("/portfolio/parse")
async def parse_portfolio(
    file: UploadFile = File(...),
    session: AsyncSession = Depends(get_session),
) -> dict:

Replace with:

@router.post("/portfolio/parse", dependencies=[Depends(require_paid)])
async def parse_portfolio(
    file: UploadFile = File(...),
    session: AsyncSession = Depends(get_session),
) -> dict:

Change B: Add the LLM fallback. Find the existing block:

    try:
        pie = parse_t212_csv(raw)
    except CSVImportError as e:
        raise HTTPException(status_code=400, detail=str(e))

Replace with:

    try:
        pie = parse_t212_csv(raw)
    except CSVImportError:
        # Unrecognised format — try the LLM-fallback parser. It hits a
        # global format-fingerprint cache first; only the very first
        # upload of each broker format pays an LLM call.
        from app.services.llm_csv_parser import LLMParseError, parse_with_llm
        try:
            pie = await parse_with_llm(raw, session)
        except LLMParseError as e:
            raise HTTPException(status_code=400, detail=str(e))
  • Step 4: Run the integration test to verify it passes
docker compose -f docker-compose.test.yml run --rm test pytest tests/test_llm_csv_parser.py::test_parse_portfolio_route_falls_through_to_llm -v

Expected: PASS.

  • Step 5: Run the full test file + the existing T212 tests to confirm no regression
docker compose -f docker-compose.test.yml run --rm test pytest tests/test_llm_csv_parser.py tests/test_csv_import.py -v

Expected: all PASS. Confirms the T212 happy path is untouched.

  • Step 6: Commit
git add app/routers/universe.py tests/test_llm_csv_parser.py
git commit -m "universe: paid-gate + LLM fallback on /portfolio/parse"

Task 11: UI copy tweaks

Files:

  • Modify: app/templates/settings.html (search the file for "Trading 212 CSV", "T212 pie CSV")

  • Step 1: Update the section heading

Find:

<summary class="settings-section__head">Import portfolio (Trading 212 CSV)</summary>

Replace with:

<summary class="settings-section__head">Import portfolio (CSV)</summary>
  • Step 2: Update the drop-zone label

Find:

<div class="dz__label">Drop a T212 pie CSV here</div>

Replace with:

<div class="dz__label">Drop your broker's portfolio CSV here</div>
  • Step 3: Update the drop-zone hint

Find:

<div class="dz__hint">or <a href="#" id="browse-link">browse</a> &middot; max 1 MB</div>

Replace with:

<div class="dz__hint">or <a href="#" id="browse-link">browse</a> &middot; max 1 MB &middot; T212, IBKR and others auto-detected</div>
  • Step 4: Soften the help paragraph

If there is a paragraph above or beside the drop-zone that begins with "Export your pie from T212", change its opening phrase from declarative to conditional. For example, if the current line is:

<p>Export your pie from T212 as CSV ...</p>

Replace with:

<p>If you use Trading 212, export your pie as CSV ...</p>

(Search the file for Export your pie from T212 to locate the exact paragraph; preserve any surrounding markup.)

  • Step 5: Commit
git add app/templates/settings.html
git commit -m "settings: soften import copy to be broker-agnostic"

Task 12: Final regression run + manual smoke

Files:

  • (no code changes — verification only)

  • Step 1: Full test suite

docker compose -f docker-compose.test.yml run --rm test pytest tests/ -v

Expected: every existing test still passes; the new tests in tests/test_llm_csv_parser.py all pass.

  • Step 2: Apply the migration against the dev DB and confirm the table exists
docker compose exec app alembic upgrade head
docker compose exec app python -c "
import asyncio
from sqlalchemy import inspect
from app.db import get_engine

async def main():
    eng = get_engine()
    async with eng.connect() as conn:
        names = await conn.run_sync(lambda c: inspect(c).get_table_names())
        assert 'csv_format_templates' in names, names
        print('csv_format_templates table present:', sorted(names))
asyncio.run(main())
"

Expected: prints the table name in the list. NOTE: this touches the prod DB on this host — only run when the user has explicitly approved this deploy.

  • Step 3: Restart the app container
docker compose restart app
docker compose logs app --tail 30 | grep -E "(Uvicorn|startup complete|error)" || true

Expected: clean startup; no tracebacks.

  • Step 4: Manual smoke — re-import a T212 CSV

Through the browser at /settings, drop a known T212 CSV. The dashboard should load as it always has. (Confirms zero regression on the happy path.)

  • Step 5: Manual smoke — first IBKR-shaped upload

Through the browser at /settings, drop tests/fixtures/ibkr_sample.csv (or a real IBKR statement). The dashboard should load with the IBKR positions. Then query the DB to confirm the template was cached:

docker compose exec app python -c "
import asyncio
from sqlalchemy import select
from app.db import get_session_factory
from app.models import CsvFormatTemplate

async def main():
    factory = get_session_factory()
    async with factory() as s:
        rows = (await s.execute(select(CsvFormatTemplate))).scalars().all()
        for r in rows:
            print(r.fingerprint[:12], r.broker_label, 'use_count=', r.use_count,
                  'cost=', r.llm_cost_usd)
asyncio.run(main())
"

NOTE: this is a prod DB read; only run with explicit user approval.

Expected: a single row, with use_count=1 and a small positive llm_cost_usd.

  • Step 6: Manual smoke — second IBKR-shaped upload (cache hit)

Drop the same fixture again. The dashboard should load identically, and the DB row should now show use_count=2. AICall ledger should NOT have a new row for this second upload (only the first paid the LLM cost).

  • Step 7: Manual smoke — paid gating

In a free-tier browser session, attempt the upload. Expect a 402 response visible in network tools / surfaced as an "upgrade required" message in the UI.


Self-Review

Spec coverage walkthrough:

  • Trigger: transparent fallback → Task 10 (route try/except)
  • Cache for reuse → Task 9 (cache-hit branch in parse_with_llm)
  • Paid-only → Task 10 Step 3 Change A (adds Depends(require_paid))
  • LLM column-mapping only → Tasks 6-8 (_validate_mapping, _extract_mapping_via_llm, no full-CSV extraction anywhere)
  • Global cache → Tasks 1, 2 (no user_id column); Task 9 cache lookup is global
  • sample_row is a real first data row, anonymous → Task 9 Step 4 (the INSERT uses first_data_row); Task 1 test asserts user_id absent
  • No self-heal / no auto-eviction → Task 9 stale-mapping test asserts row is NOT deleted
  • No code authoring → out of scope by construction (no code writes anywhere in the plan)
  • fingerprint = sha256(normalised headers) → Task 4
  • Preamble detection → Task 5
  • Drop-zone + heading copy softened → Task 11
  • Error handling (LLM down → 502 in spirit; nonsense → 400; non-numeric qty → 400) → Tasks 6, 8, 10. The route raises HTTPException(400) on LLMParseError, and _extract_mapping_via_llm wraps provider failures in LLMParseError, so provider-down also surfaces as 400 rather than the 502 the spec implied. For a 502/400 split, separate the exception types in Task 8 and branch in Task 10.
  • Migration 0021 → Task 2
  • Fabricated IBKR fixture → Task 3
  • Test pattern matches test_referral_conversion.py → Task 9 Step 1 (copies the factory pattern)

One spec-vs-plan deviation worth flagging to the engineer: the spec error-handling table says "LLM provider down → 502". This plan returns 400 for all LLMParseError cases including provider-down, because the wrapping is uniform. If you want a 502/400 split, the simplest fix is to introduce a sibling LLMProviderError(LLMParseError) raised inside _extract_mapping_via_llm's exception path, and branch on it in Task 10. Two-line change. Either behaviour is defensible — flagging so it's a conscious choice.
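
If the 502/400 split is wanted, it might look roughly like the sketch below. LLMProviderError and status_for are hypothetical names that do not exist in the plan, and LLMParseError is redefined locally only so the example runs without the app package.

```python
class LLMParseError(Exception):
    """Local stand-in for app.services.llm_csv_parser.LLMParseError."""


class LLMProviderError(LLMParseError):
    """Hypothetical subclass: the provider call itself failed."""


def status_for(exc: LLMParseError) -> int:
    # Check the subclass first; every other LLMParseError is a bad upload.
    if isinstance(exc, LLMProviderError):
        return 502
    return 400


provider_down = LLMProviderError("LLM provider failed: provider down")
bad_mapping = LLMParseError("LLM did not return valid JSON")
```

Because LLMProviderError subclasses LLMParseError, the existing except LLMParseError clause in the route and the Task 8 test that expects LLMParseError with a "provider" message would keep working; only the status code chosen in the route would change.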