read.markets/docs/superpowers/plans/2026-05-27-llm-csv-fallback-parser.md
Giorgio Gilestro 08b4dddcdd docs: implementation plan for LLM-fallback CSV parser
12 TDD tasks covering model + migration, fingerprint, dialect detection,
mapping validation/application, LLM extraction (mocked in tests), cache
orchestration, route wiring + paid gate, UI copy tweaks, and final
manual smoke.
2026-05-27 11:41:44 +02:00


# LLM-fallback CSV Parser Implementation Plan

> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking.

**Goal:** Add an LLM-fallback CSV parser so non-T212 portfolio uploads succeed by extracting a column mapping (not data) via the LLM, caching that mapping globally by header fingerprint, and replaying it deterministically on every subsequent upload of the same broker format.

**Architecture:** New service `app/services/llm_csv_parser.py` wraps `openrouter.call_llm` for one-time format discovery, persists results to a new `csv_format_templates` table, and produces the same `ParsedPie` shape as `parse_t212_csv`. The route `/api/portfolio/parse` in `app/routers/universe.py` gains a try/except fall-through: T212 first, LLM-cache lookup second, LLM call only on first encounter of a new format. The cache table stores headers + one anonymous data row + the JSON mapping; no `user_id` is ever recorded against the row.

**Tech Stack:** FastAPI · SQLAlchemy 2.0 async · Alembic · MariaDB (prod) / aiosqlite (tests) · existing `openrouter.call_llm` (provider fallback + AICall ledger)

**Spec:** `docs/superpowers/specs/2026-05-27-llm-csv-fallback-parser-design.md`

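The fall-through order described under **Architecture** can be sketched in isolation. This is a minimal, synchronous illustration under stated assumptions, not the planned implementation: `parse_native`, `fake_llm`, and the in-memory `cache` dict are hypothetical stand-ins for `parse_t212_csv`, the `call_llm` wrapper, and the `csv_format_templates` table. It shows that a second upload with the same headers is served from the cache without another LLM call.

```python
import hashlib


class CSVImportError(Exception):
    """Stand-in for app.services.csv_import.CSVImportError."""


def fingerprint(headers):
    # Same normalisation the plan describes: strip, lowercase, join, sha256.
    joined = "|".join(h.strip().lower() for h in headers)
    return hashlib.sha256(joined.encode("utf-8")).hexdigest()


def parse_native(headers):
    # Hypothetical native parser: recognises exactly one header layout.
    if headers != ["Slice", "Name", "Invested value"]:
        raise CSVImportError("not a recognised native format")
    return {"source": "native"}


def parse_upload(headers, cache, ask_llm_for_mapping):
    """Native parser first, cached mapping second, LLM only on a cache miss."""
    try:
        return parse_native(headers)
    except CSVImportError:
        pass
    key = fingerprint(headers)
    if key not in cache:
        cache[key] = ask_llm_for_mapping(headers)  # first encounter only
    return {"source": "llm-cache", "mapping": cache[key]}


calls = []


def fake_llm(headers):
    # Records each invocation so the caching behaviour is observable.
    calls.append(headers)
    return {"ticker_col": headers[0], "qty_col": headers[1]}


cache = {}
first = parse_upload(["Symbol", "Quantity"], cache, fake_llm)
second = parse_upload(["Symbol", "Quantity"], cache, fake_llm)
native = parse_upload(["Slice", "Name", "Invested value"], cache, fake_llm)
```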
---
## File Structure

**Create:**

- `app/services/llm_csv_parser.py`: the new service; `parse_with_llm`, `LLMParseError`, helpers
- `alembic/versions/0021_csv_format_template.py`: hand-rolled migration (matches the style of `0020_trial_end.py`)
- `tests/test_llm_csv_parser.py`: unit + integration tests for the service
- `tests/fixtures/ibkr_sample.csv`: fabricated IBKR-shaped CSV (no real holdings)

**Modify:**

- `app/models.py`: add `CsvFormatTemplate` class
- `app/routers/universe.py`: add `Depends(require_paid)` to `/portfolio/parse`; wrap `parse_t212_csv` in a try/except that falls through to `parse_with_llm`
- `app/templates/settings.html`: soften "Trading 212 CSV" copy to broker-agnostic

**Reuse without modification:**

- `app/services/openrouter.py::call_llm, llm_configured, LogResult`
- `app/services/csv_import.py::ParsedPie, ParsedPosition, CSVImportError`
- `app/services/access.py::require_paid`
- `app/db::Base, utcnow`
- `tests/conftest.py` env setup (in-memory aiosqlite, `CASSANDRA_MOCK=1`)
- Session/engine bootstrap pattern from `tests/test_referral_conversion.py::_build_session_factory`

---
## Test Conventions
All tests must be runnable inside the test container:
```bash
docker compose -f docker-compose.test.yml run --rm test pytest tests/test_llm_csv_parser.py -v
```
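
DB-touching tests use the same `_build_session_factory(tmp_path)` pattern as `tests/test_referral_conversion.py`: a fresh per-test sqlite file, with the schema created via `Base.metadata.create_all`. Do NOT introduce a shared fixture across tests; per-test isolation matches the existing pattern.

Network-touching tests (the LLM) MUST mock `call_llm`; no real HTTP. Use `unittest.mock.AsyncMock`, and patch the name where it is looked up: the service imports it with `from app.services.openrouter import call_llm`, so tests replace `app.services.llm_csv_parser.call_llm`. Patching only `app.services.openrouter.call_llm` would leave the service's already-bound reference untouched.

---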
### Task 1: Add `CsvFormatTemplate` model
**Files:**
- Modify: `app/models.py` (append after the `Referral` class around line 270+)
- Test: `tests/test_llm_csv_parser.py`
- [ ] **Step 1: Write the failing test**
Create the test file with the import + schema test:
```python
"""Unit + integration tests for the LLM-fallback CSV parser."""
from __future__ import annotations

import pytest


def test_csv_format_template_model_columns():
    """Model exposes every column the spec requires, with correct types."""
    from sqlalchemy import inspect

    from app.models import CsvFormatTemplate

    cols = {c.name: c for c in inspect(CsvFormatTemplate).columns}
    assert "fingerprint" in cols
    assert "headers" in cols
    assert "sample_row" in cols
    assert "mapping" in cols
    assert "preamble_rows" in cols
    assert "delimiter" in cols
    assert "broker_label" in cols
    assert "first_seen_at" in cols
    assert "use_count" in cols
    assert "last_used_at" in cols
    assert "llm_model" in cols
    assert "llm_cost_usd" in cols
    # Crucially, no user attribution.
    assert "user_id" not in cols
    assert "first_seen_user_id" not in cols
    # Fingerprint is the cache key.
    assert cols["fingerprint"].unique is True
    assert cols["fingerprint"].nullable is False
```
- [ ] **Step 2: Run the test to verify it fails**
```bash
docker compose -f docker-compose.test.yml run --rm test pytest tests/test_llm_csv_parser.py::test_csv_format_template_model_columns -v
```
Expected: FAIL with `ImportError: cannot import name 'CsvFormatTemplate'`.
- [ ] **Step 3: Add the model in `app/models.py`**
Append after the existing `Referral` class (around line 270+, before any trailing module helpers):
```python
class CsvFormatTemplate(Base):
    """Cached column-mapping for a single broker CSV format.

    Populated on the first upload of a previously-unseen format via the
    LLM-fallback parser. Subsequent uploads of the same format
    (identified by ``fingerprint``, a sha256 of the normalised header
    row) replay ``mapping`` deterministically with no LLM call.

    The table holds the actual ``headers`` and one anonymous ``sample_row``
    from the originating upload; there is no ``user_id`` column, no link
    back to the uploader. The sample exists so the operator has concrete
    material to look at when hand-writing future native parsers; the
    system never auto-generates or modifies parser code from this data.
    """

    __tablename__ = "csv_format_templates"

    id: Mapped[int] = mapped_column(_PK, primary_key=True, autoincrement=True)
    fingerprint: Mapped[str] = mapped_column(String(64), unique=True, nullable=False)
    headers: Mapped[list] = mapped_column(JSON, nullable=False)
    sample_row: Mapped[list] = mapped_column(JSON, nullable=False)
    mapping: Mapped[dict] = mapped_column(JSON, nullable=False)
    preamble_rows: Mapped[int] = mapped_column(Integer, nullable=False, default=0)
    delimiter: Mapped[str] = mapped_column(String(1), nullable=False, default=",")
    broker_label: Mapped[str | None] = mapped_column(String(128))
    first_seen_at: Mapped[datetime] = mapped_column(
        DateTime(timezone=True), nullable=False, default=utcnow,
    )
    use_count: Mapped[int] = mapped_column(Integer, nullable=False, default=1)
    last_used_at: Mapped[datetime] = mapped_column(
        DateTime(timezone=True), nullable=False, default=utcnow,
    )
    llm_model: Mapped[str | None] = mapped_column(String(64))
    llm_cost_usd: Mapped[float | None] = mapped_column(Float)
```
- [ ] **Step 4: Run the test to verify it passes**
```bash
docker compose -f docker-compose.test.yml run --rm test pytest tests/test_llm_csv_parser.py::test_csv_format_template_model_columns -v
```
Expected: PASS.
- [ ] **Step 5: Commit**
```bash
git add app/models.py tests/test_llm_csv_parser.py
git commit -m "csv-parser: add CsvFormatTemplate model"
```
---
### Task 2: Add Alembic migration `0021`
**Files:**
- Create: `alembic/versions/0021_csv_format_template.py`
- [ ] **Step 1: Write the migration**
Create `alembic/versions/0021_csv_format_template.py`:
```python
"""csv format templates table: LLM-fallback parser cache.

Revision ID: 0021
Revises: 0020
Create Date: 2026-05-27
"""
from typing import Sequence, Union

import sqlalchemy as sa
from alembic import op

revision: str = "0021"
down_revision: Union[str, None] = "0020"
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None


def upgrade() -> None:
    op.create_table(
        "csv_format_templates",
        sa.Column("id", sa.BigInteger(), primary_key=True, autoincrement=True),
        sa.Column("fingerprint", sa.String(length=64), nullable=False),
        sa.Column("headers", sa.JSON(), nullable=False),
        sa.Column("sample_row", sa.JSON(), nullable=False),
        sa.Column("mapping", sa.JSON(), nullable=False),
        sa.Column("preamble_rows", sa.Integer(), nullable=False, server_default="0"),
        sa.Column("delimiter", sa.String(length=1), nullable=False, server_default=","),
        sa.Column("broker_label", sa.String(length=128), nullable=True),
        sa.Column("first_seen_at", sa.DateTime(timezone=True), nullable=False),
        sa.Column("use_count", sa.Integer(), nullable=False, server_default="1"),
        sa.Column("last_used_at", sa.DateTime(timezone=True), nullable=False),
        sa.Column("llm_model", sa.String(length=64), nullable=True),
        sa.Column("llm_cost_usd", sa.Float(), nullable=True),
        sa.UniqueConstraint("fingerprint", name="uq_csv_format_templates_fingerprint"),
    )


def downgrade() -> None:
    op.drop_table("csv_format_templates")
```
- [ ] **Step 2: Verify the migration applies cleanly in the test container**
```bash
docker compose -f docker-compose.test.yml run --rm test alembic upgrade head
docker compose -f docker-compose.test.yml run --rm test alembic downgrade -1
docker compose -f docker-compose.test.yml run --rm test alembic upgrade head
```
Expected: each command exits 0; the second leaves us at `0020`, the third returns us to `0021`.
If the test container doesn't have an alembic entrypoint, run the migration check via Python instead:
```bash
docker compose -f docker-compose.test.yml run --rm test python -c "
from alembic.config import Config
from alembic import command
cfg = Config('alembic.ini')
command.upgrade(cfg, 'head')
command.downgrade(cfg, '-1')
command.upgrade(cfg, 'head')
print('OK')
"
```
Expected: prints `OK`.
- [ ] **Step 3: Commit**
```bash
git add alembic/versions/0021_csv_format_template.py
git commit -m "alembic: add 0021 csv_format_templates"
```
---
### Task 3: Create the fabricated IBKR test fixture
**Files:**
- Create: `tests/fixtures/ibkr_sample.csv`
- [ ] **Step 1: Write the fixture file**
This is a fabricated IBKR-style activity statement — column names and shape are realistic, but the values are made up (no real holdings, no real account):
```csv
Statement,Header,Field Name,Field Value
Statement,Data,BrokerName,Interactive Brokers LLC
Statement,Data,Title,Activity Statement
Statement,Data,Period,"January 1, 2026 - January 31, 2026"
Symbol,Quantity,Avg Price,Currency,Description
AAPL,100,150.25,USD,Apple Inc
MSFT,50,310.00,USD,Microsoft Corp
NVDA,40,425.50,USD,NVIDIA Corp
VOD.L,2000,0.74,GBP,Vodafone Group Plc
ASML.AS,10,650.00,EUR,ASML Holding NV
```
Note: lines 1-4 are a preamble (IBKR's exports often have multi-line headers). The actual data table starts at line 5 (`Symbol,Quantity,Avg Price,Currency,Description`).
- [ ] **Step 2: Commit**
```bash
git add tests/fixtures/ibkr_sample.csv
git commit -m "tests: add fabricated IBKR fixture for LLM parser"
```
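To make the preamble offset in the fixture concrete, here is a small sketch that reads an inline copy of its first rows with the standard `csv` module and skips the four statement rows by hand. `PREAMBLE_ROWS` is hard-coded purely for illustration; in the plan that number is meant to come from `_detect_dialect`. Note that the quoted `Period` value contains commas and still arrives as a single cell.

```python
import csv
import io

FIXTURE = '''\
Statement,Header,Field Name,Field Value
Statement,Data,BrokerName,Interactive Brokers LLC
Statement,Data,Title,Activity Statement
Statement,Data,Period,"January 1, 2026 - January 31, 2026"
Symbol,Quantity,Avg Price,Currency,Description
AAPL,100,150.25,USD,Apple Inc
MSFT,50,310.00,USD,Microsoft Corp
'''

PREAMBLE_ROWS = 4  # illustrative constant; detected automatically in the service

rows = list(csv.reader(io.StringIO(FIXTURE)))
headers = rows[PREAMBLE_ROWS]          # the real table header
data_rows = rows[PREAMBLE_ROWS + 1:]   # everything after it is position data
records = [dict(zip(headers, row)) for row in data_rows]
```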
---
### Task 4: `_fingerprint` helper
**Files:**
- Create: `app/services/llm_csv_parser.py` (initial scaffold)
- Test: `tests/test_llm_csv_parser.py`
- [ ] **Step 1: Write failing test**
Append to `tests/test_llm_csv_parser.py`:
```python
def test_fingerprint_stable_across_case_and_whitespace():
    from app.services.llm_csv_parser import _fingerprint

    a = _fingerprint(["Symbol", "Quantity", "Avg Price"])
    b = _fingerprint(["symbol", "quantity", "avg price"])
    c = _fingerprint([" SYMBOL ", "Quantity", " AVG PRICE"])
    assert a == b == c


def test_fingerprint_differs_for_different_columns():
    from app.services.llm_csv_parser import _fingerprint

    a = _fingerprint(["Symbol", "Quantity"])
    b = _fingerprint(["Symbol", "Quantity", "Avg Price"])
    assert a != b


def test_fingerprint_is_sha256_hex_64_chars():
    from app.services.llm_csv_parser import _fingerprint

    f = _fingerprint(["Symbol", "Quantity"])
    assert len(f) == 64
    assert all(c in "0123456789abcdef" for c in f)
```
- [ ] **Step 2: Run tests to verify they fail**
```bash
docker compose -f docker-compose.test.yml run --rm test pytest tests/test_llm_csv_parser.py -k fingerprint -v
```
Expected: FAIL with `ImportError`.
- [ ] **Step 3: Create the service scaffold + `_fingerprint`**
Create `app/services/llm_csv_parser.py`:
```python
"""LLM-fallback CSV parser.

When the deterministic Trading 212 parser (``csv_import.parse_t212_csv``)
raises ``CSVImportError`` on an unrecognised format, this service kicks
in:

1. Detect the CSV dialect (delimiter, preamble offset).
2. Compute a fingerprint of the normalised header row.
3. Look up ``CsvFormatTemplate`` by fingerprint. On hit, replay the
   cached column-mapping deterministically. On miss, ask the LLM for a
   mapping, validate it, persist a new template, and apply it.

The LLM sees only headers + the first 3-5 sample rows. It returns a
column-mapping JSON, never transcribed numbers. The system never
auto-promotes a learned format to a hand-written parser; the operator
does that by inspecting collected ``sample_row`` values.
"""
from __future__ import annotations

import hashlib

from app.services.csv_import import CSVImportError


class LLMParseError(CSVImportError):
    """Raised when the LLM call fails or returns an unusable mapping.

    Inherits from ``CSVImportError`` so route-level error handling can
    treat both deterministic and LLM-path failures uniformly when
    desired."""


def _fingerprint(headers: list[str]) -> str:
    """Stable hash of the header row.

    Lowercases each header, strips surrounding whitespace, joins with
    ``|`` (a character extremely unlikely to appear inside a real
    header), and returns the sha256 hex digest. Whitespace/case drift
    in the same broker's export does not change the fingerprint;
    adding or removing a column does."""
    normalised = "|".join(h.strip().lower() for h in headers)
    return hashlib.sha256(normalised.encode("utf-8")).hexdigest()
```
- [ ] **Step 4: Run tests to verify they pass**
```bash
docker compose -f docker-compose.test.yml run --rm test pytest tests/test_llm_csv_parser.py -k fingerprint -v
```
Expected: 3 PASS.
- [ ] **Step 5: Commit**
```bash
git add app/services/llm_csv_parser.py tests/test_llm_csv_parser.py
git commit -m "csv-parser: add _fingerprint helper"
```
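One property of `_fingerprint` that the tests above do not exercise is order sensitivity: because the headers are joined in sequence before hashing, a broker that reorders its columns produces a different fingerprint and therefore a new template. The sketch below re-declares the helper as a standalone `fingerprint` function so it runs on its own. Separately, since the fingerprint ignores case and surrounding whitespace while a stored mapping names headers verbatim, replaying a cached mapping presumably has to match headers under the same normalisation; that is an assumption about the later orchestration task, not something this plan section states.

```python
import hashlib


def fingerprint(headers):
    # Standalone copy of the helper: strip, lowercase, join with "|", sha256.
    normalised = "|".join(h.strip().lower() for h in headers)
    return hashlib.sha256(normalised.encode("utf-8")).hexdigest()


# Case and whitespace drift collapse to the same key.
same = fingerprint(["Symbol", "Quantity"]) == fingerprint([" symbol", "QUANTITY "])

# Reordering columns changes the joined string, hence the digest.
reordered = fingerprint(["Symbol", "Quantity"]) == fingerprint(["Quantity", "Symbol"])
```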
---
### Task 5: `_detect_dialect` helper
**Files:**
- Modify: `app/services/llm_csv_parser.py`
- Test: `tests/test_llm_csv_parser.py`
- [ ] **Step 1: Write failing tests**
Append to `tests/test_llm_csv_parser.py`:
```python
def test_detect_dialect_no_preamble_comma():
    from app.services.llm_csv_parser import _detect_dialect

    raw = b"Symbol,Quantity,Avg Price\nAAPL,100,150.25\nMSFT,50,310.00\n"
    delimiter, preamble = _detect_dialect(raw)
    assert delimiter == ","
    assert preamble == 0


def test_detect_dialect_with_preamble():
    from app.services.llm_csv_parser import _detect_dialect

    raw = (
        b"Statement,Header,Field Name,Field Value\n"
        b"Statement,Data,BrokerName,Interactive Brokers LLC\n"
        b"Statement,Data,Title,Activity Statement\n"
        b"Statement,Data,Period,\"January 1, 2026 - January 31, 2026\"\n"
        b"Symbol,Quantity,Avg Price,Currency,Description\n"
        b"AAPL,100,150.25,USD,Apple Inc\n"
    )
    delimiter, preamble = _detect_dialect(raw)
    assert delimiter == ","
    # The data-row header line is the FIFTH line (index 4); preamble = 4.
    assert preamble == 4


def test_detect_dialect_tab_delimited():
    from app.services.llm_csv_parser import _detect_dialect

    raw = b"Symbol\tQuantity\tAvg Price\nAAPL\t100\t150.25\n"
    delimiter, preamble = _detect_dialect(raw)
    assert delimiter == "\t"
    assert preamble == 0


def test_detect_dialect_empty_raises():
    from app.services.llm_csv_parser import LLMParseError, _detect_dialect

    with pytest.raises(LLMParseError):
        _detect_dialect(b"")
```
- [ ] **Step 2: Run tests to verify they fail**
```bash
docker compose -f docker-compose.test.yml run --rm test pytest tests/test_llm_csv_parser.py -k detect_dialect -v
```
Expected: 4 FAIL with `ImportError` for `_detect_dialect`.
- [ ] **Step 3: Implement `_detect_dialect`**
Append to `app/services/llm_csv_parser.py`:
```python
import csv
import io
import itertools

# Cap for how many leading lines we'll scan looking for the header row.
# Real broker preambles are typically 1-10 lines.
_MAX_PREAMBLE_SCAN = 30


def _decode_raw(raw: bytes) -> str:
    """Best-effort UTF-8 decode with BOM strip and lossy fallback."""
    return raw.decode("utf-8-sig", errors="replace")


def _looks_numeric(value: str) -> bool:
    """True if ``value`` parses as a number after stripping common
    decoration (thousands separators, currency symbols, percent signs)."""
    s = value.strip().replace(",", "").replace("$", "").replace("€", "")
    s = s.replace("£", "").replace("%", "").lstrip("-+")
    if not s:
        return False
    try:
        float(s)
        return True
    except ValueError:
        return False


def _is_header_candidate(row: list[str]) -> bool:
    """True if ``row`` has at least two non-empty cells, none of them numeric."""
    non_empty = [c.strip() for c in row if c.strip()]
    return len(non_empty) >= 2 and not any(_looks_numeric(c) for c in non_empty)


def _detect_dialect(raw: bytes) -> tuple[str, int]:
    """Detect (delimiter, preamble_rows).

    ``preamble_rows`` is the number of rows BEFORE the row we identify
    as the actual table header. The header row is the first row whose
    cells are all non-numeric AND whose next non-blank row contains at
    least one numeric cell (so "Symbol,Quantity" followed by "AAPL,100"
    is a header, while all-text metadata rows such as
    "Statement,Data,Title,Activity Statement" are skipped as preamble).
    Falls back to assuming the first line is the header if no such row
    is found within the scan window.

    Raises ``LLMParseError`` on empty input."""
    if not raw or not raw.strip():
        raise LLMParseError("empty CSV")
    text = _decode_raw(raw)
    # csv.Sniffer is happy with ~4KB. Anything more and it gets slow.
    sample = text[:4096]
    try:
        delimiter = csv.Sniffer().sniff(sample, delimiters=",;\t|").delimiter
    except csv.Error:
        # Most broker exports are comma-delimited; default rather than
        # error out. The caller will still validate column shapes.
        delimiter = ","
    reader = csv.reader(io.StringIO(text), delimiter=delimiter)
    # Read one extra row so the last candidate in the window can look ahead.
    rows = list(itertools.islice(reader, _MAX_PREAMBLE_SCAN + 1))
    for i, row in enumerate(rows[:_MAX_PREAMBLE_SCAN]):
        if not _is_header_candidate(row):
            continue
        following = next(
            (r for r in rows[i + 1:] if any(c.strip() for c in r)), None,
        )
        if following is not None and any(_looks_numeric(c) for c in following):
            return delimiter, i
    return delimiter, 0
```
- [ ] **Step 4: Run tests to verify they pass**
```bash
docker compose -f docker-compose.test.yml run --rm test pytest tests/test_llm_csv_parser.py -k detect_dialect -v
```
Expected: 4 PASS.
- [ ] **Step 5: Commit**
```bash
git add app/services/llm_csv_parser.py tests/test_llm_csv_parser.py
git commit -m "csv-parser: add _detect_dialect helper"
```
---
### Task 6: `_validate_mapping` helper
**Files:**
- Modify: `app/services/llm_csv_parser.py`
- Test: `tests/test_llm_csv_parser.py`
- [ ] **Step 1: Write failing tests**
Append to `tests/test_llm_csv_parser.py`:
```python
def test_validate_mapping_accepts_well_formed():
    from app.services.llm_csv_parser import _validate_mapping

    headers = ["Symbol", "Quantity", "Avg Price", "Currency"]
    first_row = ["AAPL", "100", "150.25", "USD"]
    mapping = {
        "ticker_col": "Symbol",
        "qty_col": "Quantity",
        "cost_col": "Avg Price",
        "currency_col": "Currency",
        "name_col": None,
    }
    _validate_mapping(mapping, headers, first_row)  # no raise


def test_validate_mapping_missing_ticker_raises():
    from app.services.llm_csv_parser import LLMParseError, _validate_mapping

    headers = ["Symbol", "Quantity"]
    first_row = ["AAPL", "100"]
    mapping = {"ticker_col": None, "qty_col": "Quantity"}
    with pytest.raises(LLMParseError, match="ticker"):
        _validate_mapping(mapping, headers, first_row)


def test_validate_mapping_missing_qty_raises():
    from app.services.llm_csv_parser import LLMParseError, _validate_mapping

    headers = ["Symbol", "Quantity"]
    first_row = ["AAPL", "100"]
    mapping = {"ticker_col": "Symbol", "qty_col": None}
    with pytest.raises(LLMParseError, match="qty"):
        _validate_mapping(mapping, headers, first_row)


def test_validate_mapping_unknown_column_raises():
    from app.services.llm_csv_parser import LLMParseError, _validate_mapping

    headers = ["Symbol", "Quantity"]
    first_row = ["AAPL", "100"]
    mapping = {"ticker_col": "Symbol", "qty_col": "NotARealColumn"}
    with pytest.raises(LLMParseError, match="NotARealColumn"):
        _validate_mapping(mapping, headers, first_row)


def test_validate_mapping_non_numeric_qty_raises():
    from app.services.llm_csv_parser import LLMParseError, _validate_mapping

    headers = ["Symbol", "Description"]
    first_row = ["AAPL", "Apple Inc"]
    # Mapping says qty is "Description", but "Apple Inc" can't parse as a number.
    mapping = {"ticker_col": "Symbol", "qty_col": "Description"}
    with pytest.raises(LLMParseError, match="numeric"):
        _validate_mapping(mapping, headers, first_row)
```
- [ ] **Step 2: Run tests to verify they fail**
```bash
docker compose -f docker-compose.test.yml run --rm test pytest tests/test_llm_csv_parser.py -k validate_mapping -v
```
Expected: 5 FAIL with `ImportError`.
- [ ] **Step 3: Implement `_validate_mapping`**
Append to `app/services/llm_csv_parser.py`:
```python
_REQUIRED_MAPPING_KEYS = ("ticker_col", "qty_col")
_OPTIONAL_MAPPING_KEYS = ("name_col", "cost_col", "currency_col")


def _validate_mapping(
    mapping: dict, headers: list[str], first_row: list[str],
) -> None:
    """Verify the LLM-returned mapping is sane.

    - ``ticker_col`` and ``qty_col`` are required (non-null).
    - Every named column must be a string that exists in ``headers``.
    - The value at ``qty_col`` on ``first_row`` must parse as a number.
    - The value at ``cost_col`` on ``first_row`` (if present) must parse
      as a number.

    Raises ``LLMParseError`` on any failure, with a message that names
    the specific problem (helpful for log forensics and for the
    user-facing 400)."""
    for key in _REQUIRED_MAPPING_KEYS:
        if not mapping.get(key):
            raise LLMParseError(
                f"LLM mapping missing required column: {key.replace('_col', '')}"
            )
    headers_set = set(headers)
    for key in _REQUIRED_MAPPING_KEYS + _OPTIONAL_MAPPING_KEYS:
        col = mapping.get(key)
        # A non-string value (e.g. a list) can never name a header; reject it
        # here instead of letting the set lookup raise TypeError.
        if col is not None and (not isinstance(col, str) or col not in headers_set):
            raise LLMParseError(
                f"LLM mapping references unknown column: {col!r}"
            )
    # Numeric sanity check: qty and (if present) cost must parse on row 1.
    header_index = {h: i for i, h in enumerate(headers)}
    qty_col = mapping["qty_col"]
    qty_idx = header_index[qty_col]
    qty_value = first_row[qty_idx] if qty_idx < len(first_row) else ""
    if not _looks_numeric(qty_value):
        raise LLMParseError(
            f"LLM mapping qty_col={qty_col!r} maps to non-numeric value {qty_value!r}"
        )
    cost_col = mapping.get("cost_col")
    if cost_col is not None:
        cost_idx = header_index[cost_col]
        cost_value = first_row[cost_idx] if cost_idx < len(first_row) else ""
        if cost_value and not _looks_numeric(cost_value):
            raise LLMParseError(
                f"LLM mapping cost_col={cost_col!r} maps to non-numeric value {cost_value!r}"
            )
```
- [ ] **Step 4: Run tests to verify they pass**
```bash
docker compose -f docker-compose.test.yml run --rm test pytest tests/test_llm_csv_parser.py -k validate_mapping -v
```
Expected: 5 PASS.
- [ ] **Step 5: Commit**
```bash
git add app/services/llm_csv_parser.py tests/test_llm_csv_parser.py
git commit -m "csv-parser: add _validate_mapping helper"
```
---
### Task 7: `_apply_mapping` helper
**Files:**
- Modify: `app/services/llm_csv_parser.py`
- Test: `tests/test_llm_csv_parser.py`
- [ ] **Step 1: Write failing tests**
Append to `tests/test_llm_csv_parser.py`:
```python
def test_apply_mapping_builds_parsed_pie():
    from app.services.csv_import import ParsedPie, ParsedPosition
    from app.services.llm_csv_parser import _apply_mapping

    headers = ["Symbol", "Quantity", "Avg Price", "Currency", "Description"]
    data_rows = [
        ["AAPL", "100", "150.25", "USD", "Apple Inc"],
        ["MSFT", "50", "310.00", "USD", "Microsoft Corp"],
    ]
    mapping = {
        "ticker_col": "Symbol",
        "qty_col": "Quantity",
        "cost_col": "Avg Price",
        "currency_col": "Currency",
        "name_col": "Description",
    }
    pie = _apply_mapping(headers, data_rows, mapping)
    assert isinstance(pie, ParsedPie)
    assert len(pie.positions) == 2
    p0 = pie.positions[0]
    assert isinstance(p0, ParsedPosition)
    assert p0.slice == "AAPL"
    assert p0.name == "Apple Inc"
    assert p0.quantity == 100.0
    # invested = qty * avg_cost = 100 * 150.25 = 15025
    assert p0.invested_value == pytest.approx(15025.0)
    assert pie.invested == pytest.approx(15025.0 + 50 * 310.00)


def test_apply_mapping_handles_missing_optional_columns():
    from app.services.llm_csv_parser import _apply_mapping

    headers = ["Symbol", "Quantity"]
    data_rows = [["AAPL", "100"]]
    mapping = {
        "ticker_col": "Symbol",
        "qty_col": "Quantity",
        "cost_col": None,
        "currency_col": None,
        "name_col": None,
    }
    pie = _apply_mapping(headers, data_rows, mapping)
    p = pie.positions[0]
    assert p.slice == "AAPL"
    assert p.quantity == 100.0
    assert p.invested_value is None
    assert p.name == "AAPL"  # falls back to ticker when name_col absent


def test_apply_mapping_skips_blank_and_unparseable_rows():
    from app.services.llm_csv_parser import _apply_mapping

    headers = ["Symbol", "Quantity"]
    data_rows = [
        ["AAPL", "100"],
        ["", ""],                  # blank
        ["MSFT", "not-a-number"],  # bad qty
        ["NVDA", "40"],
    ]
    mapping = {"ticker_col": "Symbol", "qty_col": "Quantity"}
    pie = _apply_mapping(headers, data_rows, mapping)
    assert [p.slice for p in pie.positions] == ["AAPL", "NVDA"]
```
- [ ] **Step 2: Run tests to verify they fail**
```bash
docker compose -f docker-compose.test.yml run --rm test pytest tests/test_llm_csv_parser.py -k apply_mapping -v
```
Expected: 3 FAIL with `ImportError`.
- [ ] **Step 3: Implement `_apply_mapping`**
Append to `app/services/llm_csv_parser.py`:
```python
from app.services.csv_import import ParsedPie, ParsedPosition


def _parse_number(value: str) -> float | None:
    """Permissive float parse: strips thousands separators, currency
    symbols, percent signs. Returns None on failure (so callers can
    decide whether to skip or raise)."""
    s = value.strip().replace(",", "").replace("$", "")
    s = s.replace("€", "").replace("£", "").replace("%", "")
    if not s:
        return None
    try:
        return float(s)
    except ValueError:
        return None


def _apply_mapping(
    headers: list[str],
    data_rows: list[list[str]],
    mapping: dict,
) -> ParsedPie:
    """Iterate ``data_rows`` and produce a ``ParsedPie``.

    Rows that lack a usable quantity (blank, non-numeric, zero, or
    negative) are silently skipped, since broker exports often include
    summary or placeholder rows after the position list. ``name_col``
    falls back to the ticker symbol when null."""
    idx = {h: i for i, h in enumerate(headers)}
    ticker_col = mapping["ticker_col"]
    qty_col = mapping["qty_col"]
    name_col = mapping.get("name_col")
    cost_col = mapping.get("cost_col")
    positions: list[ParsedPosition] = []
    invested_total = 0.0
    invested_seen = False
    for row in data_rows:
        # Skip rows that are entirely blank.
        if not any(c.strip() for c in row):
            continue
        ticker_raw = row[idx[ticker_col]] if idx[ticker_col] < len(row) else ""
        ticker = ticker_raw.strip().upper()
        if not ticker:
            continue
        qty_raw = row[idx[qty_col]] if idx[qty_col] < len(row) else ""
        qty = _parse_number(qty_raw)
        if qty is None or qty <= 0:
            continue
        avg_cost: float | None = None
        if cost_col is not None and idx[cost_col] < len(row):
            avg_cost = _parse_number(row[idx[cost_col]])
        invested_value: float | None = None
        if avg_cost is not None:
            invested_value = qty * avg_cost
            invested_total += invested_value
            invested_seen = True
        name = ""
        if name_col is not None and idx[name_col] < len(row):
            name = row[idx[name_col]].strip()
        if not name:
            name = ticker
        positions.append(ParsedPosition(
            slice=ticker,
            name=name,
            invested_value=invested_value,
            current_value=None,
            result=None,
            quantity=qty,
        ))
    return ParsedPie(
        name=None,
        positions=tuple(positions),
        invested=(invested_total if invested_seen else None),
        value=None,
        result=None,
    )
```
- [ ] **Step 4: Run tests to verify they pass**
```bash
docker compose -f docker-compose.test.yml run --rm test pytest tests/test_llm_csv_parser.py -k apply_mapping -v
```
Expected: 3 PASS.
- [ ] **Step 5: Commit**
```bash
git add app/services/llm_csv_parser.py tests/test_llm_csv_parser.py
git commit -m "csv-parser: add _apply_mapping helper"
```
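A short worked example of the arithmetic in `_apply_mapping`, plus one caveat that follows directly from how `_parse_number` treats commas. The function below is a standalone copy named `parse_number` so the snippet runs on its own. Because every comma is dropped as a thousands separator, a decimal-comma value such as `150,25` would be read as `15025.0`; whether any target broker exports numbers that way is not stated in this plan, so treat this as something to check rather than a known defect.

```python
def parse_number(value):
    """Standalone copy of the permissive parse, for illustration only."""
    s = value.strip().replace(",", "").replace("$", "")
    s = s.replace("€", "").replace("£", "").replace("%", "")
    if not s:
        return None
    try:
        return float(s)
    except ValueError:
        return None


# Thousands separators and currency symbols are stripped before parsing.
us_style = parse_number("$1,250.50")

# invested_value = quantity * average cost, as in the first test above.
invested = parse_number("100") * parse_number("150.25")

# Caveat: a decimal comma is indistinguishable from a thousands separator.
decimal_comma = parse_number("150,25")

# Unparseable and blank cells come back as None so the caller can skip the row.
unparseable = parse_number("not-a-number")
blank = parse_number("   ")
```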
---
### Task 8: `_extract_mapping_via_llm` helper
**Files:**
- Modify: `app/services/llm_csv_parser.py`
- Test: `tests/test_llm_csv_parser.py`
- [ ] **Step 1: Write failing tests**
Append to `tests/test_llm_csv_parser.py`:
```python
@pytest.mark.asyncio
async def test_extract_mapping_via_llm_parses_valid_json(monkeypatch):
    from unittest.mock import AsyncMock, MagicMock

    import app.services.llm_csv_parser as mod
    from app.services.llm_csv_parser import _extract_mapping_via_llm
    from app.services.openrouter import LogResult

    fake_result = LogResult(
        content='{"ticker_col": "Symbol", "qty_col": "Quantity", '
                '"cost_col": "Avg Price", "currency_col": "Currency", '
                '"name_col": null, "broker_label": "IBKR Activity Statement"}',
        model="deepseek/deepseek-v4-flash",
        prompt_tokens=100,
        completion_tokens=50,
        cost_usd=0.0001,
    )
    fake_client = MagicMock()
    fake_call_llm = AsyncMock(return_value=fake_result)
    # monkeypatch restores the real call_llm after the test, so the mock
    # cannot leak into other tests.
    monkeypatch.setattr(mod, "call_llm", fake_call_llm)
    headers = ["Symbol", "Quantity", "Avg Price", "Currency"]
    samples = [["AAPL", "100", "150.25", "USD"]]
    mapping, log = await _extract_mapping_via_llm(fake_client, headers, samples)
    assert mapping["ticker_col"] == "Symbol"
    assert mapping["qty_col"] == "Quantity"
    assert mapping["broker_label"] == "IBKR Activity Statement"
    assert log.model == "deepseek/deepseek-v4-flash"
    fake_call_llm.assert_awaited_once()


@pytest.mark.asyncio
async def test_extract_mapping_via_llm_malformed_json_raises(monkeypatch):
    from unittest.mock import AsyncMock, MagicMock

    import app.services.llm_csv_parser as mod
    from app.services.llm_csv_parser import LLMParseError, _extract_mapping_via_llm
    from app.services.openrouter import LogResult

    fake_result = LogResult(
        content="Sure thing, here is the mapping! ticker=Symbol",
        model="deepseek/deepseek-v4-flash",
        prompt_tokens=10,
        completion_tokens=20,
        cost_usd=0.00005,
    )
    fake_client = MagicMock()
    fake_call_llm = AsyncMock(return_value=fake_result)
    monkeypatch.setattr(mod, "call_llm", fake_call_llm)
    with pytest.raises(LLMParseError, match="JSON"):
        await _extract_mapping_via_llm(fake_client, ["Symbol"], [["AAPL"]])


@pytest.mark.asyncio
async def test_extract_mapping_via_llm_provider_failure_wraps(monkeypatch):
    from unittest.mock import AsyncMock, MagicMock

    import app.services.llm_csv_parser as mod
    from app.services.llm_csv_parser import LLMParseError, _extract_mapping_via_llm

    fake_client = MagicMock()
    fake_call_llm = AsyncMock(side_effect=RuntimeError("provider down"))
    monkeypatch.setattr(mod, "call_llm", fake_call_llm)
    with pytest.raises(LLMParseError, match="provider"):
        await _extract_mapping_via_llm(fake_client, ["Symbol"], [["AAPL"]])
```
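NOTE: `@pytest.mark.asyncio` requires the `pytest-asyncio` plugin. If it is not installed in the test container, add it to the test dependencies first. Setting `asyncio_mode = auto` in `pytest.ini` is optional and only removes the need for the per-test decorator; it has no effect without the plugin. Check the top of `tests/test_referral_conversion.py` for how the existing async tests are marked and replicate that pattern.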
- [ ] **Step 2: Run tests to verify they fail**
```bash
docker compose -f docker-compose.test.yml run --rm test pytest tests/test_llm_csv_parser.py -k extract_mapping_via_llm -v
```
Expected: 3 FAIL with `ImportError` for `_extract_mapping_via_llm`.
- [ ] **Step 3: Implement `_extract_mapping_via_llm`**
Append to `app/services/llm_csv_parser.py`:
```python
import json

import httpx

from app.services.openrouter import LogResult, call_llm

# Hard caps on what we send to the LLM, so prompt cost stays bounded.
_LLM_SAMPLES = 5
_LLM_MAX_TOKENS = 400

_SYSTEM_PROMPT = """\
You are an expert at recognising broker portfolio CSV formats.
You will be given the header row and up to 5 sample data rows from a CSV.
Identify which column contains each field. Return ONLY a single JSON
object, no prose, no markdown fences.

Schema (use the EXACT header string from the input; use null if no
column is a good match):
{
  "ticker_col": "<header name or null>",
  "qty_col": "<header name or null>",
  "name_col": "<header name or null>",
  "cost_col": "<header name or null>",  // average price per share
  "currency_col": "<header name or null>",
  "broker_label": "<short identifier like 'IBKR Activity Statement' or null>"
}

Rules:
- ticker_col and qty_col are required. If either is missing, return all nulls.
- Use the EXACT header string as it appears in the input; do not paraphrase.
- Output JSON ONLY. No prose, no code fences.
"""


def _build_user_prompt(headers: list[str], samples: list[list[str]]) -> str:
    # Headers go out as a JSON list; at most _LLM_SAMPLES rows follow.
    lines = ["headers: " + json.dumps(headers)]
    lines.append("samples:")
    for s in samples[:_LLM_SAMPLES]:
        lines.append("  " + ",".join(s))
    return "\n".join(lines)


async def _extract_mapping_via_llm(
    client: httpx.AsyncClient,
    headers: list[str],
    samples: list[list[str]],
) -> tuple[dict, LogResult]:
    """Single LLM call returning ``(mapping_dict, LogResult)``.

    The LLM is asked for a strict JSON object (no markdown). We attempt
    to parse the returned content; ``LLMParseError`` wraps any failure
    in a way callers can surface to the user."""
    messages = [
        {"role": "system", "content": _SYSTEM_PROMPT},
        {"role": "user", "content": _build_user_prompt(headers, samples)},
    ]
    try:
        result = await call_llm(client, messages, max_tokens=_LLM_MAX_TOKENS)
    except Exception as e:
        raise LLMParseError(f"LLM provider failed: {e}") from e

    content = (result.content or "").strip()
    # Strip code fences if the model added them despite instructions.
    if content.startswith("```"):
        content = content.strip("`")
        # Drop optional 'json' language tag.
        if content.lstrip().lower().startswith("json"):
            content = content.lstrip()[4:]
        content = content.strip()

    try:
        mapping = json.loads(content)
    except json.JSONDecodeError as e:
        raise LLMParseError(f"LLM did not return valid JSON: {e}") from e
    if not isinstance(mapping, dict):
        raise LLMParseError("LLM JSON was not an object")
    return mapping, result
```
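The fence-stripping step above can be exercised on its own. The following is a minimal sketch, assuming the same logic is lifted into a hypothetical helper named `strip_code_fences` (that name is not part of the plan); it shows that a fenced reply and a bare reply decode to the same mapping.

```python
import json

# Built programmatically so this example contains no literal fence line.
FENCE = "`" * 3


def strip_code_fences(content: str) -> str:
    """Hypothetical helper: drop a surrounding Markdown fence and 'json' tag."""
    content = content.strip()
    if content.startswith(FENCE):
        # str.strip("`") removes every leading and trailing backtick.
        content = content.strip("`")
        # Drop an optional language tag such as 'json'.
        if content.lstrip().lower().startswith("json"):
            content = content.lstrip()[4:]
    return content.strip()


bare = '{"ticker_col": "Symbol", "qty_col": "Quantity"}'
fenced = FENCE + "json\n" + bare + "\n" + FENCE

# Both forms should decode to the same mapping.
mapping_from_fenced = json.loads(strip_code_fences(fenced))
mapping_from_bare = json.loads(strip_code_fences(bare))
```

Keeping this logic in a small pure function would make it testable without mocking `call_llm`, though the plan as written keeps it inline.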
- [ ] **Step 4: Run tests to verify they pass**
```bash
docker compose -f docker-compose.test.yml run --rm test pytest tests/test_llm_csv_parser.py -k extract_mapping_via_llm -v
```
Expected: 3 PASS.
- [ ] **Step 5: Commit**
```bash
git add app/services/llm_csv_parser.py tests/test_llm_csv_parser.py
git commit -m "csv-parser: add _extract_mapping_via_llm with provider-failure wrapping"
```
---
### Task 9: Public `parse_with_llm` orchestration
**Files:**
- Modify: `app/services/llm_csv_parser.py`
- Test: `tests/test_llm_csv_parser.py`
- [ ] **Step 1: Write the per-test session factory helper (copied from `test_referral_conversion.py`)**
Insert near the **top** of `tests/test_llm_csv_parser.py` (after the existing imports):
```python
def _build_session_factory(tmp_path):
    """Create a fresh SQLite database under ``tmp_path`` and return
    ``(engine, factory, setup)``. ``setup`` is a coroutine function that
    creates the schema; tests must await it before using the factory.

    Matches the pattern used in tests/test_referral_conversion.py."""
    from sqlalchemy.ext.asyncio import async_sessionmaker, create_async_engine
    from app import db as db_mod
    from app.db import Base
    import app.models  # noqa: F401 (registers models on Base.metadata)

    engine = create_async_engine(f"sqlite+aiosqlite:///{tmp_path}/csv.db")
    factory = async_sessionmaker(engine, expire_on_commit=False)
    # Point the app's module-level engine/factory at the test database.
    db_mod._engine = engine
    db_mod._session_factory = factory

    async def _setup():
        async with engine.begin() as conn:
            await conn.run_sync(Base.metadata.create_all)

    return engine, factory, _setup
```
Tests will `await setup()` themselves before using the factory.
- [ ] **Step 2: Write failing tests for cache miss, cache hit, and stale mapping**
Append to `tests/test_llm_csv_parser.py`:
```python
@pytest.mark.asyncio
async def test_parse_with_llm_cache_miss_inserts_template(tmp_path):
    from unittest.mock import AsyncMock
    from sqlalchemy import select
    from app.models import CsvFormatTemplate
    from app.services.llm_csv_parser import parse_with_llm
    from app.services.openrouter import LogResult

    _, factory, setup = _build_session_factory(tmp_path)
    await setup()

    raw = (
        b"Symbol,Quantity,Avg Price,Currency\n"
        b"AAPL,100,150.25,USD\n"
        b"MSFT,50,310.00,USD\n"
    )

    import app.services.llm_csv_parser as mod
    mod.call_llm = AsyncMock(return_value=LogResult(
        content='{"ticker_col":"Symbol","qty_col":"Quantity",'
                '"cost_col":"Avg Price","currency_col":"Currency",'
                '"name_col":null,"broker_label":"Generic broker"}',
        model="deepseek/deepseek-v4-flash",
        prompt_tokens=120, completion_tokens=40, cost_usd=0.0002,
    ))

    async with factory() as session:
        pie = await parse_with_llm(raw, session)
    assert len(pie.positions) == 2
    assert pie.positions[0].slice == "AAPL"

    async with factory() as session:
        rows = (await session.execute(select(CsvFormatTemplate))).scalars().all()
    assert len(rows) == 1
    tmpl = rows[0]
    assert tmpl.headers == ["Symbol", "Quantity", "Avg Price", "Currency"]
    assert tmpl.sample_row == ["AAPL", "100", "150.25", "USD"]
    assert tmpl.mapping["ticker_col"] == "Symbol"
    assert tmpl.broker_label == "Generic broker"
    assert tmpl.use_count == 1
    assert tmpl.llm_cost_usd == pytest.approx(0.0002)
    # The crucial PII guarantee:
    assert not hasattr(tmpl, "user_id"), "sample row must not be linked to a user"

@pytest.mark.asyncio
async def test_parse_with_llm_cache_hit_skips_llm(tmp_path):
    from unittest.mock import AsyncMock
    from sqlalchemy import select
    from app.db import utcnow
    from app.models import CsvFormatTemplate
    from app.services.llm_csv_parser import _fingerprint, parse_with_llm

    _, factory, setup = _build_session_factory(tmp_path)
    await setup()

    headers = ["Symbol", "Quantity", "Avg Price", "Currency"]
    fp = _fingerprint(headers)
    # Pre-populate a cache hit row.
    async with factory() as session:
        session.add(CsvFormatTemplate(
            fingerprint=fp,
            headers=headers,
            sample_row=["AAPL", "100", "150.25", "USD"],
            mapping={
                "ticker_col": "Symbol", "qty_col": "Quantity",
                "cost_col": "Avg Price", "currency_col": "Currency",
                "name_col": None,
            },
            preamble_rows=0,
            delimiter=",",
            broker_label="Cached broker",
            first_seen_at=utcnow(),
            last_used_at=utcnow(),
            use_count=1,
            llm_model="seed",
            llm_cost_usd=0.0,
        ))
        await session.commit()

    raw = (
        b"Symbol,Quantity,Avg Price,Currency\n"
        b"NVDA,40,425.50,USD\n"
    )

    import app.services.llm_csv_parser as mod
    mod.call_llm = AsyncMock(side_effect=AssertionError("call_llm must NOT be called on cache hit"))

    async with factory() as session:
        pie = await parse_with_llm(raw, session)
    assert pie.positions[0].slice == "NVDA"

    async with factory() as session:
        rows = (await session.execute(select(CsvFormatTemplate))).scalars().all()
    assert len(rows) == 1
    assert rows[0].use_count == 2

@pytest.mark.asyncio
async def test_parse_with_llm_stale_mapping_raises_but_does_not_evict(tmp_path):
    from unittest.mock import AsyncMock
    from sqlalchemy import select
    from app.db import utcnow
    from app.models import CsvFormatTemplate
    from app.services.llm_csv_parser import LLMParseError, _fingerprint, parse_with_llm

    _, factory, setup = _build_session_factory(tmp_path)
    await setup()

    headers = ["Symbol", "Quantity"]
    fp = _fingerprint(headers)
    # Cached mapping says qty is in column "Symbol", which is clearly wrong
    # and will never produce a parseable row.
    async with factory() as session:
        session.add(CsvFormatTemplate(
            fingerprint=fp, headers=headers,
            sample_row=["AAPL", "100"],
            mapping={"ticker_col": "Symbol", "qty_col": "Symbol"},
            preamble_rows=0, delimiter=",", broker_label=None,
            first_seen_at=utcnow(), last_used_at=utcnow(), use_count=1,
            llm_model="seed", llm_cost_usd=0.0,
        ))
        await session.commit()

    raw = b"Symbol,Quantity\nAAPL,100\nMSFT,50\n"

    import app.services.llm_csv_parser as mod
    mod.call_llm = AsyncMock(side_effect=AssertionError("must not be called"))

    async with factory() as session:
        with pytest.raises(LLMParseError):
            await parse_with_llm(raw, session)

    # Stale template must NOT have been auto-deleted (operator owns eviction).
    async with factory() as session:
        rows = (await session.execute(select(CsvFormatTemplate))).scalars().all()
    assert len(rows) == 1
```
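The cache-hit and stale-mapping tests rely on an `AsyncMock` whose `side_effect` is an `AssertionError`, so any unexpected await fails the test loudly. Here is a small standalone sketch of that guard pattern; `lookup` and the cache dict are hypothetical stand-ins, not the plan's real functions.

```python
import asyncio
from unittest.mock import AsyncMock


async def lookup(cache: dict, key: str, discover) -> str:
    """Hypothetical cache-first lookup: await ``discover`` only on a miss."""
    if key in cache:
        return cache[key]
    cache[key] = await discover(key)
    return cache[key]


async def demo():
    # Awaiting this mock raises AssertionError instead of returning a value.
    guard = AsyncMock(side_effect=AssertionError("must not be called"))

    # Hit path: the guard is never awaited.
    hit = await lookup({"fp": "cached-mapping"}, "fp", guard)
    awaits_after_hit = guard.await_count

    # Miss path: the guard trips as soon as it is awaited.
    try:
        await lookup({}, "fp", guard)
        tripped = False
    except AssertionError:
        tripped = True
    return hit, awaits_after_hit, tripped


hit, awaits_after_hit, tripped = asyncio.run(demo())
```

The guard documents intent better than a bare `assert mock.await_count == 0` at the end of a test, because the failure surfaces at the exact call site.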
- [ ] **Step 3: Run tests to verify they fail**
```bash
docker compose -f docker-compose.test.yml run --rm test pytest tests/test_llm_csv_parser.py -k parse_with_llm -v
```
Expected: 3 FAIL with `ImportError` for `parse_with_llm`.
- [ ] **Step 4: Implement `parse_with_llm`**
Append to `app/services/llm_csv_parser.py`:
```python
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession

from app.db import utcnow
from app.logging import get_logger
from app.models import CsvFormatTemplate

log = get_logger("llm_csv_parser")

# Hard cap shared with /api/portfolio/parse; bytes-level, mirrors T212 path.
_MAX_CSV_BYTES = 1_048_576


async def parse_with_llm(raw: bytes, session: AsyncSession) -> ParsedPie:
    """Cache-first LLM-fallback CSV parse.

    On cache hit, applies the stored mapping deterministically and
    increments ``use_count``. On cache miss, calls the LLM, validates
    the returned mapping against the first data row, and persists a
    new ``CsvFormatTemplate``. Raises ``LLMParseError`` on any
    failure; the caller (route layer) maps that to a 400."""
    if len(raw) > _MAX_CSV_BYTES:
        raise LLMParseError("CSV too large (1 MB max)")
    if not raw or not raw.strip():
        raise LLMParseError("empty CSV")

    delimiter, preamble_rows = _detect_dialect(raw)
    text = _decode_raw(raw)
    reader = csv.reader(io.StringIO(text), delimiter=delimiter)
    rows = list(reader)
    if preamble_rows >= len(rows):
        raise LLMParseError("no header row found in CSV")
    headers = [c.strip() for c in rows[preamble_rows]]
    data_rows = rows[preamble_rows + 1:]
    if not headers:
        raise LLMParseError("empty header row")
    # First non-blank data row; used for validation and stored as sample_row.
    first_data_row = next(
        (r for r in data_rows if any(c.strip() for c in r)), None,
    )
    if first_data_row is None:
        raise LLMParseError("CSV contains a header but no data rows")

    fp = _fingerprint(headers)
    existing = (await session.execute(
        select(CsvFormatTemplate).where(CsvFormatTemplate.fingerprint == fp)
    )).scalar_one_or_none()

    if existing is not None:
        log.info("csv.format.cache_hit", fingerprint=fp,
                 broker_label=existing.broker_label, use_count=existing.use_count)
        pie = _apply_mapping(headers, data_rows, existing.mapping)
        if not pie.positions:
            raise LLMParseError(
                "cached mapping produced no positions; the broker may have "
                "changed their CSV shape; ask the operator to evict the "
                "stale template"
            )
        existing.use_count += 1
        existing.last_used_at = utcnow()
        await session.commit()
        return pie

    log.info("csv.format.cache_miss", fingerprint=fp,
             header_count=len(headers))
    samples = [r for r in data_rows[:_LLM_SAMPLES] if any(c.strip() for c in r)]
    async with httpx.AsyncClient(follow_redirects=True, timeout=30) as client:
        mapping, llm_log = await _extract_mapping_via_llm(client, headers, samples)
    _validate_mapping(mapping, headers, first_data_row)
    pie = _apply_mapping(headers, data_rows, mapping)
    if not pie.positions:
        raise LLMParseError(
            "LLM mapping validated but produced no positions; the file "
            "may not contain portfolio data"
        )

    now = utcnow()
    session.add(CsvFormatTemplate(
        fingerprint=fp,
        headers=headers,
        sample_row=first_data_row,
        mapping=mapping,
        preamble_rows=preamble_rows,
        delimiter=delimiter,
        broker_label=mapping.get("broker_label"),
        first_seen_at=now,
        last_used_at=now,
        use_count=1,
        llm_model=llm_log.model,
        llm_cost_usd=llm_log.cost_usd,
    ))
    await session.commit()
    return pie
```
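Stripped of the database and the LLM, the control flow of `parse_with_llm` is a fingerprint-keyed cache in front of an expensive discovery call. The sketch below models that with a plain dict. The normalisation inside `fingerprint` (trim, lowercase, join) is an assumption for illustration only; the plan's real `_fingerprint` comes from Task 4 and may differ, and `Template`, `get_mapping` and `discover` are hypothetical names.

```python
import hashlib
from dataclasses import dataclass


@dataclass
class Template:
    """Hypothetical in-memory stand-in for a CsvFormatTemplate row."""
    mapping: dict
    use_count: int = 1


def fingerprint(headers: list[str]) -> str:
    # ASSUMPTION: trim + lowercase + unit-separator join; not the plan's rule.
    normalised = "\x1f".join(h.strip().lower() for h in headers)
    return hashlib.sha256(normalised.encode("utf-8")).hexdigest()


def get_mapping(headers: list[str], cache: dict, discover) -> dict:
    """Return the cached mapping, calling ``discover`` only on first sight."""
    fp = fingerprint(headers)
    tmpl = cache.get(fp)
    if tmpl is not None:
        tmpl.use_count += 1  # cache hit: replay, no discovery call
        return tmpl.mapping
    mapping = discover(headers)  # cache miss: one expensive call
    cache[fp] = Template(mapping=mapping)
    return mapping


calls: list[list[str]] = []


def discover(headers: list[str]) -> dict:
    calls.append(headers)
    return {"ticker_col": headers[0], "qty_col": headers[1]}


cache: dict[str, Template] = {}
first = get_mapping(["Symbol", "Quantity"], cache, discover)
second = get_mapping(["Symbol", "Quantity"], cache, discover)
```

After the two calls, `discover` has run once and the single cached template carries `use_count == 2`, which mirrors what the cache-hit test asserts against the real table.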
- [ ] **Step 5: Run tests to verify they pass**
```bash
docker compose -f docker-compose.test.yml run --rm test pytest tests/test_llm_csv_parser.py -v
```
Expected: every test passes (including everything from earlier tasks).
- [ ] **Step 6: Commit**
```bash
git add app/services/llm_csv_parser.py tests/test_llm_csv_parser.py
git commit -m "csv-parser: add public parse_with_llm with cache hit/miss orchestration"
```
---
### Task 10: Wire `parse_with_llm` into the route and add `require_paid`
**Files:**
- Modify: `app/routers/universe.py:192-214` (the `parse_portfolio` route + decorator)
- Test: `tests/test_llm_csv_parser.py`
- [ ] **Step 1: Write the route-level integration test (direct function call, no HTTP layer)**
Calling `parse_portfolio` directly with a fake `UploadFile` and a real session sidesteps the pytest-asyncio + `TestClient` event-loop awkwardness. `Depends(require_paid)` is declared on the route decorator, so it is not invoked when we call the function directly. That is what we want here: paid gating is mechanical and trusted to FastAPI, and Step 1b below verifies by inspection that the dependency is attached.
Append to `tests/test_llm_csv_parser.py`:
```python
@pytest.mark.asyncio
async def test_parse_portfolio_route_falls_through_to_llm(tmp_path, monkeypatch):
    """End-to-end: T212 parser raises CSVImportError, LLM fallback runs,
    response shape matches the existing JSON contract."""
    from io import BytesIO
    from types import SimpleNamespace
    from unittest.mock import AsyncMock
    from fastapi import UploadFile

    _, factory, setup = _build_session_factory(tmp_path)
    await setup()

    import app.services.llm_csv_parser as mod
    from app.services.openrouter import LogResult
    mod.call_llm = AsyncMock(return_value=LogResult(
        content='{"ticker_col":"Symbol","qty_col":"Quantity",'
                '"cost_col":"Avg Price","currency_col":"Currency",'
                '"name_col":"Description",'
                '"broker_label":"IBKR Activity Statement"}',
        model="deepseek/deepseek-v4-flash",
        prompt_tokens=150, completion_tokens=60, cost_usd=0.0003,
    ))

    # The route's inline Yahoo-fetch block would otherwise hit the network.
    # Patch market.fetch to return a benign placeholder per ticker.
    from app.services import market as market_mod

    async def _fake_fetch(client, symbol, label, group, anchor):
        return SimpleNamespace(
            symbol=symbol, source="test", label=label,
            price=None, currency="USD", as_of="2026-05-27",
            changes=None, error=None,
        )
    monkeypatch.setattr(market_mod, "fetch", _fake_fetch)

    # Read the fixture with a context manager so the handle is closed.
    with open("tests/fixtures/ibkr_sample.csv", "rb") as fh:
        raw = fh.read()
    upload = UploadFile(filename="ibkr.csv", file=BytesIO(raw))

    from app.routers.universe import parse_portfolio
    async with factory() as session:
        result = await parse_portfolio(file=upload, session=session)

    assert result["base_currency"] == "GBP"
    # At least the AAPL/MSFT/NVDA rows should be present; resolve_slice may
    # filter some if there's no InstrumentMap row, which is fine for this
    # test. We just want to confirm the LLM fallback ran end-to-end.
    assert isinstance(result["positions"], list)
    # LLM was called exactly once (cache miss).
    assert mod.call_llm.await_count == 1
```
- [ ] **Step 1b: Add a paid-gate inspection test (no HTTP needed)**
```python
def test_parse_portfolio_route_requires_paid():
    """Static check that the /portfolio/parse route is gated by require_paid."""
    from app.routers.universe import router
    from app.services.access import require_paid

    # endswith() tolerates a router-level prefix such as "/api".
    parse_route = next(
        r for r in router.routes
        if getattr(r, "path", "").endswith("/portfolio/parse")
    )
    # Each entry is a FastAPI sub-Dependant; the callable lives on ``.call``.
    dep_callables = [d.call for d in parse_route.dependant.dependencies]
    assert require_paid in dep_callables, (
        "The /portfolio/parse route must have Depends(require_paid)"
    )
```
- [ ] **Step 2: Run the test to verify it fails**
```bash
docker compose -f docker-compose.test.yml run --rm test pytest tests/test_llm_csv_parser.py::test_parse_portfolio_route_falls_through_to_llm -v
```
Expected: FAIL, most likely with an `HTTPException` (status 400), because the route still converts the T212 parser's `CSVImportError` straight into a 400 and has no LLM fallback yet. The exact failure message confirms we need the wiring.
- [ ] **Step 3: Wire `parse_with_llm` into the route**
In `app/routers/universe.py`, find the `parse_portfolio` definition (search `async def parse_portfolio`). Make these two changes:
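**Change A: Add `require_paid` to the route decorator.** If `universe.py` does not already import it, add `from app.services.access import require_paid` alongside the other imports. Then find the existing definition: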
```python
@router.post("/portfolio/parse")
async def parse_portfolio(
    file: UploadFile = File(...),
    session: AsyncSession = Depends(get_session),
) -> dict:
```
Replace with:
```python
@router.post("/portfolio/parse", dependencies=[Depends(require_paid)])
async def parse_portfolio(
    file: UploadFile = File(...),
    session: AsyncSession = Depends(get_session),
) -> dict:
```
**Change B: Add the LLM fallback.** Find the existing block:
```python
    try:
        pie = parse_t212_csv(raw)
    except CSVImportError as e:
        raise HTTPException(status_code=400, detail=str(e))
```
Replace with:
```python
    try:
        pie = parse_t212_csv(raw)
    except CSVImportError:
        # Unrecognised format: try the LLM-fallback parser. It hits a
        # global format-fingerprint cache first; only the very first
        # upload of each broker format pays an LLM call.
        from app.services.llm_csv_parser import LLMParseError, parse_with_llm
        try:
            pie = await parse_with_llm(raw, session)
        except LLMParseError as e:
            raise HTTPException(status_code=400, detail=str(e))
```
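The shape of Change B is a two-stage parser chain: the strict parser runs first, and only its "unrecognised format" error triggers the fallback. The sketch below shows that ordering with stand-in parsers. Everything here is hypothetical: the exception classes merely share names with the real ones, and the rule "a T212 file starts with `Slice`" is an arbitrary stand-in, not the real detection logic.

```python
class CSVImportError(Exception):
    """Stand-in for the strict parser's error."""


class LLMParseError(Exception):
    """Stand-in for the fallback parser's error."""


def parse_strict(raw: bytes) -> str:
    # ASSUMPTION: arbitrary rule used only to make the sketch runnable.
    if not raw.startswith(b"Slice"):
        raise CSVImportError("not a recognised T212 export")
    return "strict"


def parse_fallback(raw: bytes) -> str:
    if not raw.strip():
        raise LLMParseError("empty CSV")
    return "fallback"


def parse_any(raw: bytes) -> tuple[int, str]:
    """Return (status, detail), trying the strict parser before the fallback."""
    try:
        return 200, parse_strict(raw)
    except CSVImportError:
        # Only an unrecognised format falls through; other errors propagate.
        try:
            return 200, parse_fallback(raw)
        except LLMParseError as e:
            return 400, str(e)


strict_result = parse_any(b"Slice,Name\nAAPL,Apple\n")
fallback_result = parse_any(b"Symbol,Quantity\nAAPL,100\n")
error_result = parse_any(b"   ")
```

Because the fallback sits inside the `except CSVImportError` branch, a file the strict parser accepts never reaches the second stage, which is why the T212 happy path stays untouched.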
- [ ] **Step 4: Run the integration test to verify it passes**
```bash
docker compose -f docker-compose.test.yml run --rm test pytest tests/test_llm_csv_parser.py::test_parse_portfolio_route_falls_through_to_llm -v
```
Expected: PASS.
- [ ] **Step 5: Run the full test file + the existing T212 tests to confirm no regression**
```bash
docker compose -f docker-compose.test.yml run --rm test pytest tests/test_llm_csv_parser.py tests/test_csv_import.py -v
```
Expected: all PASS. Confirms the T212 happy path is untouched.
- [ ] **Step 6: Commit**
```bash
git add app/routers/universe.py tests/test_llm_csv_parser.py
git commit -m "universe: paid-gate + LLM fallback on /portfolio/parse"
```
---
### Task 11: UI copy tweaks
**Files:**
- Modify: `app/templates/settings.html` (search the file for "Trading 212 CSV", "T212 pie CSV")
- [ ] **Step 1: Update the section heading**
Find:
```html
<summary class="settings-section__head">Import portfolio (Trading 212 CSV)</summary>
```
Replace with:
```html
<summary class="settings-section__head">Import portfolio (CSV)</summary>
```
- [ ] **Step 2: Update the drop-zone label**
Find:
```html
<div class="dz__label">Drop a T212 pie CSV here</div>
```
Replace with:
```html
<div class="dz__label">Drop your broker's portfolio CSV here</div>
```
- [ ] **Step 3: Update the drop-zone hint**
Find:
```html
<div class="dz__hint">or <a href="#" id="browse-link">browse</a> &middot; max 1 MB</div>
```
Replace with:
```html
<div class="dz__hint">or <a href="#" id="browse-link">browse</a> &middot; max 1 MB &middot; T212, IBKR and others auto-detected</div>
```
- [ ] **Step 4: Soften the help paragraph**
If there is a paragraph above or beside the drop-zone that begins with "Export your pie from T212", change its opening phrase from declarative to conditional. For example, if the current line is:
```html
<p>Export your pie from T212 as CSV ...</p>
```
Replace with:
```html
<p>If you use Trading 212, export your pie as CSV ...</p>
```
(Search the file for `Export your pie from T212` to locate the exact paragraph; preserve any surrounding markup.)
- [ ] **Step 5: Commit**
```bash
git add app/templates/settings.html
git commit -m "settings: soften import copy to be broker-agnostic"
```
---
### Task 12: Final regression run + manual smoke
**Files:**
- (no code changes — verification only)
- [ ] **Step 1: Full test suite**
```bash
docker compose -f docker-compose.test.yml run --rm test pytest tests/ -v
```
Expected: every existing test still passes; the new tests in `tests/test_llm_csv_parser.py` all pass.
- [ ] **Step 2: Apply the migration against the dev DB and confirm the table exists**
```bash
docker compose exec app alembic upgrade head
docker compose exec app python -c "
import asyncio
from sqlalchemy import inspect
from app.db import get_engine

async def main():
    eng = get_engine()
    async with eng.connect() as conn:
        names = await conn.run_sync(lambda c: inspect(c).get_table_names())
        assert 'csv_format_templates' in names, names
        print('csv_format_templates table present:', sorted(names))

asyncio.run(main())
"
```
Expected: prints the sorted table list, including `csv_format_templates`.

NOTE: this touches the prod DB on this host; only run it when the user has explicitly approved this deploy.
- [ ] **Step 3: Restart the app container**
```bash
docker compose restart app
docker compose logs app --tail 30 | grep -E "(Uvicorn|startup complete|error)" || true
```
Expected: clean startup; no tracebacks.
- [ ] **Step 4: Manual smoke — re-import a T212 CSV**
Through the browser at `/settings`, drop a known T212 CSV. The dashboard should load as it always has. (Confirms zero regression on the happy path.)
- [ ] **Step 5: Manual smoke — first IBKR-shaped upload**
Through the browser at `/settings`, drop `tests/fixtures/ibkr_sample.csv` (or a real IBKR statement). The dashboard should load with the IBKR positions. Then query the DB to confirm the template was cached:
```bash
docker compose exec app python -c "
import asyncio
from sqlalchemy import select
from app.db import get_session_factory
from app.models import CsvFormatTemplate

async def main():
    factory = get_session_factory()
    async with factory() as s:
        rows = (await s.execute(select(CsvFormatTemplate))).scalars().all()
        for r in rows:
            print(r.fingerprint[:12], r.broker_label, 'use_count=', r.use_count,
                  'cost=', r.llm_cost_usd)

asyncio.run(main())
"
```
NOTE: this is a prod DB read; only run with explicit user approval.
Expected: a single row, with `use_count=1` and a small positive `llm_cost_usd`.
- [ ] **Step 6: Manual smoke — second IBKR-shaped upload (cache hit)**
Drop the same fixture again. The dashboard should load identically, and the DB row should now show `use_count=2`. The AICall ledger should NOT have a new row for this second upload (only the first upload paid the LLM cost).
- [ ] **Step 7: Manual smoke — paid gating**
In a free-tier browser session, attempt the upload. Expect a 402 response visible in network tools / surfaced as an "upgrade required" message in the UI.
---
## Self-Review
Spec coverage walkthrough:
- **Trigger: transparent fallback** → Task 10 (route try/except)
- **Cache for reuse** → Task 9 (cache-hit branch in `parse_with_llm`)
- **Paid-only** → Task 10 Step 3 Change A (adds `Depends(require_paid)`)
- **LLM column-mapping only** → Tasks 6–8 (`_validate_mapping`, `_extract_mapping_via_llm`, no full-CSV extraction anywhere)
- **Global cache** → Tasks 1, 2 (no `user_id` column); Task 9 cache lookup is global
- **`sample_row` is a real first data row, anonymous** → Task 9 Step 4 (the `INSERT` uses `first_data_row`); Task 1 test asserts `user_id` absent
- **No self-heal / no auto-eviction** → Task 9 stale-mapping test asserts row is NOT deleted
- **No code authoring** → out of scope by construction (no code writes anywhere in the plan)
- **`fingerprint` = sha256(normalised headers)** → Task 4
- **Preamble detection** → Task 5
- **Drop-zone + heading copy softened** → Task 11
- **Error handling** (nonsense mapping → 400; non-numeric qty → 400; LLM provider down → 400 here, where the spec implied 502) → Tasks 6, 8, 10. `_extract_mapping_via_llm` wraps provider failures in `LLMParseError`, and the route raises `HTTPException(400)` for every `LLMParseError`. To get 502 for provider-down and 400 for a bad mapping, split the exception types in Task 8 and branch in Task 10.
- **Migration 0021** → Task 2
- **Fabricated IBKR fixture** → Task 3
- **Test pattern matches `test_referral_conversion.py`** → Task 9 Step 1 (copies the factory pattern)
One spec-vs-plan deviation worth flagging to the engineer: the spec error-handling table says "LLM provider down → 502". This plan returns 400 for all `LLMParseError` cases including provider-down, because the wrapping is uniform. If you want a 502/400 split, the simplest fix is to introduce a sibling `LLMProviderError(LLMParseError)` raised inside `_extract_mapping_via_llm`'s exception path, and branch on it in Task 10. That is a small change. Either behaviour is defensible; it is flagged here so that it is a conscious choice.
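If the 502/400 split is wanted, it could look roughly like the sketch below. `LLMProviderError`, `status_for` and `call_provider` are hypothetical names introduced for illustration, and `LLMParseError` is redefined locally as a stand-in so the block runs on its own.

```python
class LLMParseError(Exception):
    """Stand-in for the plan's LLMParseError."""


class LLMProviderError(LLMParseError):
    """Hypothetical subclass: the upstream LLM provider itself failed."""


def call_provider(fn):
    """Run ``fn``; wrap any failure the way Task 8's except-branch would."""
    try:
        return fn()
    except Exception as e:
        raise LLMProviderError(f"LLM provider failed: {e}") from e


def status_for(exc: LLMParseError) -> int:
    # Check the subclass first; every other LLMParseError stays a 400.
    if isinstance(exc, LLMProviderError):
        return 502
    return 400


def _boom():
    raise RuntimeError("provider down")


try:
    call_provider(_boom)
    provider_status = None
except LLMParseError as e:  # still caught by existing handlers
    provider_status = status_for(e)
    provider_message = str(e)

mapping_status = status_for(LLMParseError("LLM did not return valid JSON"))
```

Because the new class subclasses `LLMParseError`, the existing `pytest.raises(LLMParseError, match="provider")` test in Task 8 would keep passing, and callers that only catch `LLMParseError` would be unaffected.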