read.markets/tests/test_llm_csv_parser.py
Giorgio Gilestro bc55ab7d26 csv-parser: keep LLM-mapped tickers; don't pass them through T212 mapping
The route's resolve-slice loop is T212-specific — it looks tickers up
against the InstrumentMap, which only has T212's universe. For the LLM
path the ticker is already Yahoo-ready (e.g. VOD.L, ASML.AS), so
sending it through resolve_slice produced spurious "could not be
resolved" warnings and dropped the positions.

Fix: ParsedPie gains a ``tickers_resolved`` flag (default False for
T212 backward-compat); _apply_mapping in the LLM path sets it True
and also extracts currency from the LLM-mapped currency_col into a
new ``ParsedPosition.currency`` field. The route branches on the flag:
LLM-path positions are kept verbatim with a best-effort InstrumentMap
lookup for nicer name/currency overrides, never dropped.

Integration test tightened to assert all 5 IBKR fixture positions
round-trip with the right currencies (USD / GBP / EUR).

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-05-27 12:48:27 +02:00

548 lines
19 KiB
Python

"""Unit + integration tests for the LLM-fallback CSV parser."""
from __future__ import annotations
import pytest
def _build_session_factory(tmp_path):
    """Create a throwaway SQLite database under ``tmp_path`` for one test.

    Builds an async engine and session factory for the SQLite file
    ``<tmp_path>/csv.db`` and installs both on ``app.db``'s module-level
    ``_engine`` / ``_session_factory`` attributes.

    Returns a three-tuple ``(engine, factory, setup)``. ``setup`` is a
    coroutine function; await it once to create every table registered
    on ``Base.metadata``.

    Matches the pattern used in tests/test_referral_conversion.py.
    """
    from sqlalchemy.ext.asyncio import async_sessionmaker, create_async_engine

    from app import db as db_mod
    from app.db import Base
    import app.models  # noqa: F401 — registers models on Base.metadata

    db_url = f"sqlite+aiosqlite:///{tmp_path}/csv.db"
    engine = create_async_engine(db_url)
    factory = async_sessionmaker(engine, expire_on_commit=False)

    # Point the application's module-level handles at the test database.
    db_mod._engine, db_mod._session_factory = engine, factory

    async def _setup():
        # Schema creation is deferred so the caller decides when to await it.
        async with engine.begin() as conn:
            await conn.run_sync(Base.metadata.create_all)

    return engine, factory, _setup
def test_csv_format_template_model_columns():
    """CsvFormatTemplate has every expected column and no user linkage.

    Also checks that ``fingerprint`` is declared unique and non-nullable.
    """
    from sqlalchemy import inspect

    from app.models import CsvFormatTemplate

    columns = {col.name: col for col in inspect(CsvFormatTemplate).columns}

    required = (
        "fingerprint",
        "headers",
        "sample_row",
        "mapping",
        "preamble_rows",
        "delimiter",
        "broker_label",
        "first_seen_at",
        "use_count",
        "last_used_at",
        "llm_model",
        "llm_cost_usd",
    )
    for column_name in required:
        assert column_name in columns

    # Crucially, no user attribution.
    for forbidden in ("user_id", "first_seen_user_id"):
        assert forbidden not in columns

    # Fingerprint is the cache key.
    fingerprint_col = columns["fingerprint"]
    assert fingerprint_col.unique is True
    assert fingerprint_col.nullable is False
def test_fingerprint_stable_across_case_and_whitespace():
    """Header case and surrounding whitespace must not change the fingerprint."""
    from app.services.llm_csv_parser import _fingerprint

    header_variants = (
        ["Symbol", "Quantity", "Avg Price"],
        ["symbol", "quantity", "avg price"],
        [" SYMBOL ", "Quantity", " AVG PRICE"],
    )
    canonical, lowered, padded = (_fingerprint(h) for h in header_variants)

    assert canonical == lowered == padded
def test_fingerprint_differs_for_different_columns():
    """Adding a column to the header list must produce a different fingerprint."""
    from app.services.llm_csv_parser import _fingerprint

    two_columns = _fingerprint(["Symbol", "Quantity"])
    three_columns = _fingerprint(["Symbol", "Quantity", "Avg Price"])

    assert two_columns != three_columns
def test_fingerprint_is_sha256_hex_64_chars():
    """The fingerprint is 64 characters long and uses only lowercase hex digits."""
    from app.services.llm_csv_parser import _fingerprint

    digest = _fingerprint(["Symbol", "Quantity"])

    assert len(digest) == 64
    # Every character must come from the lowercase hexadecimal alphabet.
    assert set(digest) <= set("0123456789abcdef")
def test_detect_dialect_no_preamble_comma():
    """A plain comma-separated file reports ',' and zero preamble rows."""
    from app.services.llm_csv_parser import _detect_dialect

    payload = b"Symbol,Quantity,Avg Price\nAAPL,100,150.25\nMSFT,50,310.00\n"

    separator, skipped_rows = _detect_dialect(payload)

    assert (separator, skipped_rows) == (",", 0)
def test_detect_dialect_with_preamble():
    """Four statement-metadata lines ahead of the real header count as preamble."""
    from app.services.llm_csv_parser import _detect_dialect

    lines = [
        b"Statement,Header,Field Name,Field Value",
        b"Statement,Data,BrokerName,Interactive Brokers LLC",
        b"Statement,Data,Title,Activity Statement",
        b'Statement,Data,Period,"January 1, 2026 - January 31, 2026"',
        b"Symbol,Quantity,Avg Price,Currency,Description",
        b"AAPL,100,150.25,USD,Apple Inc",
    ]
    # Every line, including the last, is newline-terminated.
    payload = b"".join(line + b"\n" for line in lines)

    separator, skipped_rows = _detect_dialect(payload)

    assert separator == ","
    # The data-row header line is the FIFTH line (index 4); preamble = 4.
    assert skipped_rows == 4
def test_detect_dialect_tab_delimited():
    """A tab-separated file reports '\\t' and zero preamble rows."""
    from app.services.llm_csv_parser import _detect_dialect

    payload = b"Symbol\tQuantity\tAvg Price\nAAPL\t100\t150.25\n"

    separator, skipped_rows = _detect_dialect(payload)

    assert (separator, skipped_rows) == ("\t", 0)
def test_detect_dialect_empty_raises():
    """Zero-byte input is rejected with LLMParseError."""
    from app.services.llm_csv_parser import LLMParseError, _detect_dialect

    empty_upload = b""

    with pytest.raises(LLMParseError):
        _detect_dialect(empty_upload)
def test_validate_mapping_accepts_well_formed():
    """A mapping that names real headers and a numeric qty column passes silently."""
    from app.services.llm_csv_parser import _validate_mapping

    # Header -> first-row value, kept together so the two lists stay aligned.
    first_record = {
        "Symbol": "AAPL",
        "Quantity": "100",
        "Avg Price": "150.25",
        "Currency": "USD",
    }
    headers = list(first_record)
    first_row = list(first_record.values())
    mapping = dict(
        ticker_col="Symbol",
        qty_col="Quantity",
        cost_col="Avg Price",
        currency_col="Currency",
        name_col=None,
    )

    _validate_mapping(mapping, headers, first_row)  # no raise
def test_validate_mapping_missing_ticker_raises():
    """A null ticker_col is rejected with an error message mentioning 'ticker'."""
    from app.services.llm_csv_parser import LLMParseError, _validate_mapping

    headers, first_row = ["Symbol", "Quantity"], ["AAPL", "100"]
    mapping_without_ticker = dict(ticker_col=None, qty_col="Quantity")

    with pytest.raises(LLMParseError, match="ticker"):
        _validate_mapping(mapping_without_ticker, headers, first_row)
def test_validate_mapping_missing_qty_raises():
    """A null qty_col is rejected with an error message mentioning 'qty'."""
    from app.services.llm_csv_parser import LLMParseError, _validate_mapping

    headers, first_row = ["Symbol", "Quantity"], ["AAPL", "100"]
    mapping_without_qty = dict(ticker_col="Symbol", qty_col=None)

    with pytest.raises(LLMParseError, match="qty"):
        _validate_mapping(mapping_without_qty, headers, first_row)
def test_validate_mapping_unknown_column_raises():
    """A mapping that names a header absent from the file is rejected by name."""
    from app.services.llm_csv_parser import LLMParseError, _validate_mapping

    headers, first_row = ["Symbol", "Quantity"], ["AAPL", "100"]
    mapping_with_bogus_column = dict(ticker_col="Symbol", qty_col="NotARealColumn")

    with pytest.raises(LLMParseError, match="NotARealColumn"):
        _validate_mapping(mapping_with_bogus_column, headers, first_row)
def test_validate_mapping_non_numeric_qty_raises():
    """A qty column whose first value is not a number is rejected as non-numeric."""
    from app.services.llm_csv_parser import LLMParseError, _validate_mapping

    headers, first_row = ["Symbol", "Description"], ["AAPL", "Apple Inc"]
    # Mapping says qty is "Description", but "Apple Inc" can't parse as a number.
    mapping_with_text_qty = dict(ticker_col="Symbol", qty_col="Description")

    with pytest.raises(LLMParseError, match="numeric"):
        _validate_mapping(mapping_with_text_qty, headers, first_row)
def test_apply_mapping_builds_parsed_pie():
    """A complete mapping yields a ParsedPie holding one ParsedPosition per row."""
    from app.services.csv_import import ParsedPie, ParsedPosition
    from app.services.llm_csv_parser import _apply_mapping

    headers = ["Symbol", "Quantity", "Avg Price", "Currency", "Description"]
    rows = [
        ["AAPL", "100", "150.25", "USD", "Apple Inc"],
        ["MSFT", "50", "310.00", "USD", "Microsoft Corp"],
    ]
    mapping = dict(
        ticker_col="Symbol",
        qty_col="Quantity",
        cost_col="Avg Price",
        currency_col="Currency",
        name_col="Description",
    )

    pie = _apply_mapping(headers, rows, mapping)

    assert isinstance(pie, ParsedPie)
    assert len(pie.positions) == 2

    first = pie.positions[0]
    assert isinstance(first, ParsedPosition)
    assert first.slice == "AAPL"
    assert first.name == "Apple Inc"
    assert first.quantity == 100.0
    # invested = qty * avg_cost = 100 * 150.25 = 15025
    assert first.invested_value == pytest.approx(15025.0)
    # The pie total is the sum over both rows.
    assert pie.invested == pytest.approx(15025.0 + 50 * 310.00)
def test_apply_mapping_handles_missing_optional_columns():
    """With only ticker and qty mapped, cost is None and name defaults to the ticker."""
    from app.services.llm_csv_parser import _apply_mapping

    headers = ["Symbol", "Quantity"]
    rows = [["AAPL", "100"]]
    mapping = dict(
        ticker_col="Symbol",
        qty_col="Quantity",
        cost_col=None,
        currency_col=None,
        name_col=None,
    )

    (position,) = _apply_mapping(headers, rows, mapping).positions[:1]

    assert position.slice == "AAPL"
    assert position.quantity == 100.0
    assert position.invested_value is None
    assert position.name == "AAPL"  # falls back to ticker when name_col absent
def test_apply_mapping_skips_blank_and_unparseable_rows():
    """Blank rows and rows with a non-numeric quantity are left out of the pie."""
    from app.services.llm_csv_parser import _apply_mapping

    headers = ["Symbol", "Quantity"]
    good_first = ["AAPL", "100"]
    blank = ["", ""]
    bad_quantity = ["MSFT", "not-a-number"]
    good_last = ["NVDA", "40"]
    rows = [good_first, blank, bad_quantity, good_last]
    mapping = dict(ticker_col="Symbol", qty_col="Quantity")

    pie = _apply_mapping(headers, rows, mapping)

    surviving_tickers = [position.slice for position in pie.positions]
    assert surviving_tickers == ["AAPL", "NVDA"]
@pytest.mark.asyncio
async def test_extract_mapping_via_llm_parses_valid_json():
    """A well-formed JSON reply is decoded into the mapping dict.

    The fake ``call_llm`` returns a ``LogResult`` whose ``content`` is a
    JSON object; the test checks that the mapping keys and the log's
    model name come back and that the LLM was awaited exactly once.
    """
    from unittest.mock import AsyncMock, MagicMock, patch

    import app.services.llm_csv_parser as mod
    from app.services.llm_csv_parser import _extract_mapping_via_llm
    from app.services.openrouter import LogResult

    fake_result = LogResult(
        content='{"ticker_col": "Symbol", "qty_col": "Quantity", '
                '"cost_col": "Avg Price", "currency_col": "Currency", '
                '"name_col": null, "broker_label": "IBKR Activity Statement"}',
        model="deepseek/deepseek-v4-flash",
        prompt_tokens=100,
        completion_tokens=50,
        cost_usd=0.0001,
    )
    fake_client = MagicMock()
    fake_call_llm = AsyncMock(return_value=fake_result)

    headers = ["Symbol", "Quantity", "Avg Price", "Currency"]
    samples = [["AAPL", "100", "150.25", "USD"]]

    # patch.object restores the real call_llm on exit, so the fake cannot
    # leak into tests that run after this one.
    with patch.object(mod, "call_llm", fake_call_llm):
        mapping, log = await _extract_mapping_via_llm(fake_client, headers, samples)

    assert mapping["ticker_col"] == "Symbol"
    assert mapping["qty_col"] == "Quantity"
    assert mapping["broker_label"] == "IBKR Activity Statement"
    assert log.model == "deepseek/deepseek-v4-flash"
    fake_call_llm.assert_awaited_once()
@pytest.mark.asyncio
async def test_extract_mapping_via_llm_malformed_json_raises():
    """A chatty, non-JSON reply raises LLMParseError mentioning 'JSON'."""
    from unittest.mock import AsyncMock, MagicMock, patch

    import app.services.llm_csv_parser as mod
    from app.services.llm_csv_parser import LLMParseError, _extract_mapping_via_llm
    from app.services.openrouter import LogResult

    fake_result = LogResult(
        content="Sure thing — here is the mapping! ticker=Symbol",
        model="deepseek/deepseek-v4-flash",
        prompt_tokens=10,
        completion_tokens=20,
        cost_usd=0.00005,
    )
    fake_client = MagicMock()
    fake_call_llm = AsyncMock(return_value=fake_result)

    # patch.object restores the real call_llm on exit, so the fake cannot
    # leak into tests that run after this one.
    with patch.object(mod, "call_llm", fake_call_llm):
        with pytest.raises(LLMParseError, match="JSON"):
            await _extract_mapping_via_llm(fake_client, ["Symbol"], [["AAPL"]])
@pytest.mark.asyncio
async def test_extract_mapping_via_llm_provider_failure_wraps():
    """An exception raised by call_llm surfaces as LLMParseError matching 'provider'."""
    from unittest.mock import AsyncMock, MagicMock, patch

    import app.services.llm_csv_parser as mod
    from app.services.llm_csv_parser import LLMParseError, _extract_mapping_via_llm

    fake_client = MagicMock()
    fake_call_llm = AsyncMock(side_effect=RuntimeError("provider down"))

    # patch.object restores the real call_llm on exit, so the failing fake
    # cannot leak into tests that run after this one.
    with patch.object(mod, "call_llm", fake_call_llm):
        with pytest.raises(LLMParseError, match="provider"):
            await _extract_mapping_via_llm(fake_client, ["Symbol"], [["AAPL"]])
@pytest.mark.asyncio
async def test_parse_with_llm_cache_miss_inserts_template(tmp_path):
    """On a cache miss the LLM mapping is applied and one template row is stored.

    Starts from an empty database, parses a two-row CSV through a faked
    ``call_llm``, then reads the ``CsvFormatTemplate`` table back and
    checks the stored headers, sample row, mapping, label, use count and
    cost. It also checks that the row has no ``user_id`` attribute.
    """
    from unittest.mock import AsyncMock, patch

    from sqlalchemy import select

    import app.services.llm_csv_parser as mod
    from app.models import CsvFormatTemplate
    from app.services.llm_csv_parser import parse_with_llm
    from app.services.openrouter import LogResult

    _, factory, setup = _build_session_factory(tmp_path)
    await setup()

    raw = (
        b"Symbol,Quantity,Avg Price,Currency\n"
        b"AAPL,100,150.25,USD\n"
        b"MSFT,50,310.00,USD\n"
    )
    fake_call_llm = AsyncMock(return_value=LogResult(
        content='{"ticker_col":"Symbol","qty_col":"Quantity",'
                '"cost_col":"Avg Price","currency_col":"Currency",'
                '"name_col":null,"broker_label":"Generic broker"}',
        model="deepseek/deepseek-v4-flash",
        prompt_tokens=120, completion_tokens=40, cost_usd=0.0002,
    ))

    # patch.object restores the real call_llm on exit, so the fake cannot
    # leak into tests that run after this one.
    with patch.object(mod, "call_llm", fake_call_llm):
        async with factory() as session:
            pie = await parse_with_llm(raw, session)

    assert len(pie.positions) == 2
    assert pie.positions[0].slice == "AAPL"

    # Read back through a fresh session to see what was actually persisted.
    async with factory() as session:
        rows = (await session.execute(select(CsvFormatTemplate))).scalars().all()

    assert len(rows) == 1
    tmpl = rows[0]
    assert tmpl.headers == ["Symbol", "Quantity", "Avg Price", "Currency"]
    assert tmpl.sample_row == ["AAPL", "100", "150.25", "USD"]
    assert tmpl.mapping["ticker_col"] == "Symbol"
    assert tmpl.broker_label == "Generic broker"
    assert tmpl.use_count == 1
    assert tmpl.llm_cost_usd == pytest.approx(0.0002)
    # The crucial PII guarantee:
    assert not hasattr(tmpl, "user_id"), "sample row must not be linked to a user"
@pytest.mark.asyncio
async def test_parse_with_llm_cache_hit_skips_llm(tmp_path):
    """A stored template for the same headers is reused without calling the LLM.

    Seeds one ``CsvFormatTemplate`` row keyed by the headers' fingerprint,
    parses a CSV with those headers while ``call_llm`` is rigged to fail
    if awaited, and checks that ``use_count`` went from 1 to 2.
    """
    from unittest.mock import AsyncMock, patch

    from sqlalchemy import select

    import app.services.llm_csv_parser as mod
    from app.db import utcnow
    from app.models import CsvFormatTemplate
    from app.services.llm_csv_parser import _fingerprint, parse_with_llm

    _, factory, setup = _build_session_factory(tmp_path)
    await setup()

    headers = ["Symbol", "Quantity", "Avg Price", "Currency"]
    fp = _fingerprint(headers)

    # Pre-populate a cache hit row.
    async with factory() as session:
        session.add(CsvFormatTemplate(
            fingerprint=fp,
            headers=headers,
            sample_row=["AAPL", "100", "150.25", "USD"],
            mapping={
                "ticker_col": "Symbol", "qty_col": "Quantity",
                "cost_col": "Avg Price", "currency_col": "Currency",
                "name_col": None,
            },
            preamble_rows=0,
            delimiter=",",
            broker_label="Cached broker",
            first_seen_at=utcnow(),
            last_used_at=utcnow(),
            use_count=1,
            llm_model="seed",
            llm_cost_usd=0.0,
        ))
        await session.commit()

    raw = (
        b"Symbol,Quantity,Avg Price,Currency\n"
        b"NVDA,40,425.50,USD\n"
    )
    tripwire = AsyncMock(
        side_effect=AssertionError("call_llm must NOT be called on cache hit")
    )

    # patch.object restores the real call_llm on exit; without that, the
    # tripwire would stay installed and fail any later test that reaches
    # the LLM.
    with patch.object(mod, "call_llm", tripwire):
        async with factory() as session:
            pie = await parse_with_llm(raw, session)

    assert pie.positions[0].slice == "NVDA"

    async with factory() as session:
        rows = (await session.execute(select(CsvFormatTemplate))).scalars().all()

    assert len(rows) == 1
    assert rows[0].use_count == 2
@pytest.mark.asyncio
async def test_parse_with_llm_stale_mapping_raises_but_does_not_evict(tmp_path):
    """A cached mapping that parses no rows raises but leaves the template in place.

    Seeds a deliberately wrong template (qty mapped to the ticker column),
    expects ``parse_with_llm`` to raise ``LLMParseError`` without calling
    the LLM, then confirms the template row is still in the table.
    """
    from unittest.mock import AsyncMock, patch

    from sqlalchemy import select

    import app.services.llm_csv_parser as mod
    from app.db import utcnow
    from app.models import CsvFormatTemplate
    from app.services.llm_csv_parser import LLMParseError, _fingerprint, parse_with_llm

    _, factory, setup = _build_session_factory(tmp_path)
    await setup()

    headers = ["Symbol", "Quantity"]
    fp = _fingerprint(headers)

    # Cached mapping says qty is in column "Symbol" — clearly wrong; will
    # never produce a parseable row.
    async with factory() as session:
        session.add(CsvFormatTemplate(
            fingerprint=fp, headers=headers,
            sample_row=["AAPL", "100"],
            mapping={"ticker_col": "Symbol", "qty_col": "Symbol"},
            preamble_rows=0, delimiter=",", broker_label=None,
            first_seen_at=utcnow(), last_used_at=utcnow(), use_count=1,
            llm_model="seed", llm_cost_usd=0.0,
        ))
        await session.commit()

    raw = b"Symbol,Quantity\nAAPL,100\nMSFT,50\n"
    tripwire = AsyncMock(side_effect=AssertionError("must not be called"))

    # patch.object restores the real call_llm on exit, so the tripwire
    # cannot leak into tests that run after this one.
    with patch.object(mod, "call_llm", tripwire):
        async with factory() as session:
            with pytest.raises(LLMParseError):
                await parse_with_llm(raw, session)

    # Stale template must NOT have been auto-deleted (operator owns eviction).
    async with factory() as session:
        rows = (await session.execute(select(CsvFormatTemplate))).scalars().all()

    assert len(rows) == 1
@pytest.mark.asyncio
async def test_parse_portfolio_route_falls_through_to_llm(tmp_path, monkeypatch):
    """End-to-end: T212 parser raises CSVImportError, LLM fallback runs,
    response shape matches the existing JSON contract.

    Every external dependency is replaced through ``monkeypatch`` so it is
    undone automatically at teardown: the LLM call, the market fetch, and
    the two ticker-universe writers.
    """
    from io import BytesIO
    from pathlib import Path
    from types import SimpleNamespace
    from unittest.mock import AsyncMock

    from fastapi import UploadFile

    _, factory, setup = _build_session_factory(tmp_path)
    await setup()

    import app.services.llm_csv_parser as mod
    from app.services.openrouter import LogResult

    fake_call_llm = AsyncMock(return_value=LogResult(
        content='{"ticker_col":"Symbol","qty_col":"Quantity",'
                '"cost_col":"Avg Price","currency_col":"Currency",'
                '"name_col":"Description",'
                '"broker_label":"IBKR Activity Statement"}',
        model="deepseek/deepseek-v4-flash",
        prompt_tokens=150, completion_tokens=60, cost_usd=0.0003,
    ))
    # setattr via monkeypatch (not bare assignment) so the real call_llm is
    # restored after the test.
    monkeypatch.setattr(mod, "call_llm", fake_call_llm)

    # The route's inline Yahoo-fetch block would otherwise hit the network.
    # Patch market.fetch to return a benign placeholder per ticker.
    from app.services import market as market_mod

    async def _fake_fetch(client, symbol, label, group, anchor):
        return SimpleNamespace(
            symbol=symbol, source="test", label=label,
            price=None, currency="USD", as_of="2026-05-27",
            changes=None, error=None,
        )

    monkeypatch.setattr(market_mod, "fetch", _fake_fetch)

    # ticker_universe.upsert_tickers uses MySQL ON DUPLICATE KEY UPDATE
    # which SQLite doesn't compile. Mock the two universe-side effects;
    # neither contributes to the JSON contract we're testing here.
    from app.services import ticker_universe as tu_mod

    async def _fake_upsert(session, tickers):
        return len(list(tickers))

    async def _fake_buffer(tickers):
        return len(list(tickers))

    monkeypatch.setattr(tu_mod, "upsert_tickers", _fake_upsert)
    monkeypatch.setattr(tu_mod, "buffer_tickers", _fake_buffer)

    # Resolve the fixture relative to this test file so the test does not
    # depend on the working directory; read_bytes() also closes the file.
    fixture = Path(__file__).resolve().parent / "fixtures" / "ibkr_sample.csv"
    raw = fixture.read_bytes()
    upload = UploadFile(filename="ibkr.csv", file=BytesIO(raw))

    from app.routers.universe import parse_portfolio

    async with factory() as session:
        result = await parse_portfolio(file=upload, session=session)

    assert result["base_currency"] == "GBP"

    # All 5 IBKR positions should round-trip — the LLM path trusts the
    # Yahoo-ready tickers from the file and does NOT drop on a
    # resolve_slice miss (that's the T212 path's behaviour).
    tickers = {p["yahoo_ticker"] for p in result["positions"]}
    assert tickers == {"AAPL", "MSFT", "NVDA", "VOD.L", "ASML.AS"}

    # LLM was called exactly once (cache miss).
    assert fake_call_llm.await_count == 1

    # Currency comes from the LLM-mapped currency_col, falling back to
    # USD only when neither InstrumentMap nor the file specified one.
    by_t = {p["yahoo_ticker"]: p["currency"] for p in result["positions"]}
    assert by_t["VOD.L"] == "GBP"
    assert by_t["ASML.AS"] == "EUR"
def test_parse_portfolio_route_requires_paid():
    """Static check that the /portfolio/parse route is gated by require_paid."""
    from app.routers.universe import router
    from app.services.access import require_paid

    # A default of None turns a missing route into a readable assertion
    # failure instead of a bare StopIteration.
    parse_route = next(
        (
            r for r in router.routes
            if getattr(r, "path", "") == "/portfolio/parse"
        ),
        None,
    )
    assert parse_route is not None, (
        "No route with path /portfolio/parse is registered on the router"
    )

    # FastAPI stores each Depends(...) as a Dependant whose `.call` attribute
    # is the wrapped callable (`.dependency` is the older name, removed in
    # recent FastAPI versions).
    dep_callables = [d.call for d in parse_route.dependant.dependencies]
    assert require_paid in dep_callables, (
        "The /portfolio/parse route must have Depends(require_paid)"
    )