phase B (1/4): CSV parser + InstrumentMap (T212 shortcode → Yahoo ticker)

First two slices of the multi-user roadmap (Phase B). Validates the
core onboarding mechanic against the user's real T212 export before
paying any auth/tenancy tax.

CSV parser (app/services/csv_import.py):
  - Header-name matched (survives T212 reordering columns between
    exports), tolerant of UTF-8 BOM, dash/N/A/empty markers, thousand-
    separator commas, blank rows, zero-quantity stubs, missing Total row.
  - Returns ParsedPie(name, positions, invested, value, result) with
    derived avg_price + current_price per share in account currency.
  - 14 tests covering happy path on the real CSV + 13 edge cases.

InstrumentMap (migration 0006 + app/services/instrument_map.py):
  - Catalogue table mapping T212 ticker → Yahoo ticker, populated by
    sync_from_t212() against the dev's read-only API key. Manual rows
    (manual=True) are protected from auto-overwrite.
  - Pure t212_ticker_to_yahoo() handles both suffix forms: single
    trailing exchange letter (l/a/p/d/m/s/...) and country code (US,
    DE, FR, IT, CA, ...). All 13 of the user's holdings + 15 case-
    coverage tests pass.
  - Live sync against T212 ingests 17,050 instruments (~2.2% unmappable
    on exotic exchanges; can extend the suffix map later).
  - resolve_slice() picks the right listing per shortName using a
    UK-friendly currency preference (GBX > GBP > EUR > USD). Resolved
    correctly for all 13 of the user's positions, including TTE on
    Paris vs the NYSE dual-listing.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Giorgio Gilestro 2026-05-16 10:53:08 +01:00
parent 6dac8a2c7f
commit 16e9f5f0cc
7 changed files with 840 additions and 0 deletions

View file

@ -187,6 +187,33 @@ class Position(Base):
snapshot: Mapped[PortfolioSnapshot] = relationship(back_populates="positions")
class InstrumentMap(Base):
    """Catalogue row linking one Trading 212 listing to a Yahoo Finance ticker.

    The table exists so prices can be refreshed via Yahoo after a user
    uploads a T212 pie CSV. It is synced periodically from T212's
    /equity/metadata/instruments endpoint via the admin's read-only API key.
    Each row is one T212 listing. Multiple rows can share a shortName
    (e.g. SHEL on LSE in GBX vs SHEL on NYSE in USD); the resolver picks
    the right one per user.
    """

    __tablename__ = "instrument_map"

    # Surrogate primary key.
    id: Mapped[int] = mapped_column(BigInteger, primary_key=True, autoincrement=True)
    # T212's full listing ticker; unique across the table (uq_imap_t212_ticker).
    t212_ticker: Mapped[str] = mapped_column(String(64), nullable=False)
    # T212 short name; indexed, not unique (several listings may share it).
    t212_shortname: Mapped[str] = mapped_column(String(32), nullable=False)
    # Yahoo symbol for this listing; nullable, so a row may have no mapping.
    yahoo_ticker: Mapped[str | None] = mapped_column(String(32))
    # Instrument display name.
    name: Mapped[str] = mapped_column(String(128), nullable=False)
    # Currency code of the listing, when known.
    currency: Mapped[str | None] = mapped_column(String(8))
    # ISIN of the instrument, when known; indexed for lookups.
    isin: Mapped[str | None] = mapped_column(String(16))
    # Instrument type string, when known.
    instrument_type: Mapped[str | None] = mapped_column(String(16))
    # Flags a hand-maintained row; defaults to False.
    manual: Mapped[bool] = mapped_column(Boolean, default=False)
    # Timestamp of the last refresh of this row; defaults to utcnow().
    last_verified_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), default=utcnow)

    __table_args__ = (
        UniqueConstraint("t212_ticker", name="uq_imap_t212_ticker"),
        Index("ix_imap_shortname", "t212_shortname"),
        Index("ix_imap_isin", "isin"),
    )
class JobRun(Base):
"""One row per scheduled-job invocation; powers /api/health + the ops footer."""
__tablename__ = "job_runs"

199
app/services/csv_import.py Normal file
View file

@ -0,0 +1,199 @@
"""Defensive parser for Trading 212 pie-export CSVs.
T212 has changed column order between exports historically; matching on header
NAME rather than column index makes this robust. We also explicitly skip the
'Total' aggregate row (it has slice='Total' and quantity='-').
Pure function: no DB, no HTTP. Persisting into PortfolioSnapshot/Position is
done by the upload endpoint after mapping each Slice to a Yahoo ticker via the
InstrumentMap service.
"""
from __future__ import annotations
import csv
import io
from dataclasses import dataclass
class CSVImportError(ValueError):
    """Signals a CSV that cannot be parsed or that lacks a required column."""
# Header name -> normalised key used in the parsed dict. Lowercase, ignore
# leading/trailing whitespace, treat case-insensitively. Extra columns are
# silently ignored.
_HEADER_MAP = {
"slice": "slice",
"name": "name",
"invested value": "invested_value",
"value": "current_value",
"result": "result",
"owned quantity": "quantity",
"dividends gained": "dividends_gained",
"dividends cash": "dividends_cash",
"dividends reinvested": "dividends_reinvested",
}
# These must be present for the import to be meaningful at all.
_REQUIRED_FIELDS = ("slice", "quantity")
@dataclass(frozen=True)
class ParsedPosition:
slice: str # T212 shortcode, e.g. "SGLN"
name: str
invested_value: float | None
current_value: float | None
result: float | None # P/L in pie currency
quantity: float
dividends_gained: float | None = None
dividends_cash: float | None = None
dividends_reinvested: float | None = None
@property
def average_price(self) -> float | None:
if self.invested_value is None or not self.quantity:
return None
return self.invested_value / self.quantity
@property
def current_price(self) -> float | None:
if self.current_value is None or not self.quantity:
return None
return self.current_value / self.quantity
@dataclass(frozen=True)
class ParsedPie:
name: str | None # from the Total row's Name column
positions: tuple[ParsedPosition, ...]
invested: float | None # totals from the Total row
value: float | None
result: float | None
def _normalise_header(h: str) -> str:
return h.strip().lower()
def _parse_num(raw: str | None) -> float | None:
"""Empty / 'N/A' / '-' / '' → None. Otherwise float."""
if raw is None:
return None
s = raw.strip()
if not s or s in {"-", "", "N/A", "n/a", "NA"}:
return None
# T212 occasionally exports with thousand-comma. Strip safely.
s = s.replace(",", "")
try:
return float(s)
except ValueError:
return None
def parse_t212_csv(content: str | bytes) -> ParsedPie:
    """Parse a Trading 212 pie-export CSV into a ParsedPie.

    Columns are located by header name (trimmed, case-insensitive), so the
    column order does not matter and unknown columns are ignored. Blank
    rows, rows without a slice code, and rows whose quantity is missing or
    zero are skipped. A row whose slice is "Total" supplies the pie name
    and the aggregate totals instead of a position; if several such rows
    exist the last one wins.

    Args:
        content: Raw export file contents. Bytes are decoded as UTF-8 (a
            BOM is tolerated) with a latin-1 fallback. A leading BOM on an
            already-decoded str is removed too.

    Returns:
        ParsedPie with the position rows in file order and the totals from
        the Total row (None when the file has no Total row).

    Raises:
        CSVImportError: if the file is empty, cannot be tokenised as CSV,
            is missing a required header, or has no usable position row.
    """
    if isinstance(content, bytes):
        try:
            content = content.decode("utf-8-sig")  # handle Excel BOM
        except UnicodeDecodeError:
            content = content.decode("latin-1")
    else:
        # Text decoded upstream as plain UTF-8 keeps the BOM, which would
        # otherwise stick to the first header name and break matching.
        content = content.lstrip("\ufeff")

    # newline="" leaves line-ending handling to the csv module; tokenising
    # failures are reported through the module's own error type.
    try:
        rows = list(csv.reader(io.StringIO(content, newline="")))
    except csv.Error as exc:
        raise CSVImportError(f"Malformed CSV: {exc}") from exc
    if not rows:
        raise CSVImportError("Empty CSV file")
    header_row, data_rows = rows[0], rows[1:]

    # Map column index -> normalised field name. Unknown headers are ignored.
    field_by_index: dict[int, str] = {}
    for index, header in enumerate(header_row):
        field = _HEADER_MAP.get(_normalise_header(header))
        if field:
            field_by_index[index] = field

    present = set(field_by_index.values())
    missing = [f for f in _REQUIRED_FIELDS if f not in present]
    if missing:
        raise CSVImportError(
            f"CSV missing required column(s): {', '.join(missing)}. "
            f"Found headers: {header_row}"
        )

    text_fields = ("slice", "name")
    positions: list[ParsedPosition] = []
    pie_name: str | None = None
    total_invested: float | None = None
    total_value: float | None = None
    total_result: float | None = None

    for row in data_rows:
        if not any(cell.strip() for cell in row):
            continue  # skip blank lines

        # Keep text and numeric cells apart so each keeps a precise type.
        text: dict[str, str] = {}
        numbers: dict[str, float | None] = {}
        for index, field in field_by_index.items():
            cell = row[index] if index < len(row) else ""  # short row
            if field in text_fields:
                text[field] = cell.strip()
            else:
                numbers[field] = _parse_num(cell)

        slice_code = text.get("slice", "")
        if not slice_code:
            continue  # malformed; skip silently rather than abort

        # The 'Total' row uses slice='Total' and quantity='-': capture it
        # for aggregate totals but don't list it as a position.
        if slice_code.lower() == "total":
            pie_name = text.get("name") or None
            total_invested = numbers.get("invested_value")
            total_value = numbers.get("current_value")
            total_result = numbers.get("result")
            continue

        quantity = numbers.get("quantity")
        if not quantity:
            # Position row with no usable quantity: skip rather than fail.
            continue

        positions.append(ParsedPosition(
            slice=slice_code,
            name=text.get("name", ""),
            invested_value=numbers.get("invested_value"),
            current_value=numbers.get("current_value"),
            result=numbers.get("result"),
            quantity=quantity,
            dividends_gained=numbers.get("dividends_gained"),
            dividends_cash=numbers.get("dividends_cash"),
            dividends_reinvested=numbers.get("dividends_reinvested"),
        ))

    if not positions:
        raise CSVImportError(
            "CSV contained no parseable position rows. "
            "Expected at least one row with a Slice code and quantity."
        )

    return ParsedPie(
        name=pie_name,
        positions=tuple(positions),
        invested=total_invested,
        value=total_value,
        result=total_result,
    )

View file

@ -0,0 +1,251 @@
"""T212 shortcode → Yahoo Finance ticker resolution.
The CSV path gives us only T212's `Slice` (short name like "SGLN", "TTE").
To refresh prices via Yahoo we need the proper Yahoo symbol (`SGLN.L`,
`TTE.PA`, ...). The mapping comes from T212's own catalogue
(/equity/metadata/instruments), synced into the `instrument_map` table
via the admin's read-only API key. The resolver then picks the right
listing per user using currency preference.
This module has three responsibilities:
1. **Pure translation** — turn a T212 ticker like `SGLNl_EQ` into a Yahoo
   symbol like `SGLN.L` from suffix rules. No DB, no HTTP.
2. **Catalogue sync** — pull every T212 instrument and upsert into
   `instrument_map`. Hand-edited rows (`manual=True`) are never overwritten.
3. **Slice resolution** — given a CSV `Slice` like "SHEL", find the best
   matching `instrument_map` row using configurable currency preference.
"""
from __future__ import annotations
from dataclasses import dataclass
from datetime import timedelta
import httpx
from sqlalchemy import select
from sqlalchemy.dialects.mysql import insert as mysql_insert
from sqlalchemy.ext.asyncio import AsyncSession
from app.db import utcnow
from app.models import InstrumentMap
from app.services.trading212 import Trading212
# --- Pure translation: T212 ticker → Yahoo symbol ---------------------------
# Single trailing letter before "_EQ" → exchange suffix.
# These conventions come from observing T212's instrument catalogue.
_T212_LETTER_TO_YAHOO = {
"l": ".L", # London (LSE)
"a": ".AS", # Amsterdam (Euronext)
"p": ".PA", # Paris (Euronext)
"d": ".DE", # Frankfurt (Xetra)
"m": ".MI", # Milan (Borsa Italiana)
"s": ".SW", # Swiss (SIX)
"b": ".BR", # Brussels (Euronext)
"i": ".IR", # Ireland (Euronext Dublin)
"h": ".HE", # Helsinki (Nasdaq Nordic)
"c": ".CO", # Copenhagen
"o": ".OL", # Oslo
}
# Country-code suffix (e.g. _US_EQ, _CA_EQ) → Yahoo suffix.
_T212_COUNTRY_TO_YAHOO = {
"US": "", # NYSE/NASDAQ — Yahoo bare ticker
"CA": ".TO", # Toronto
"DE": ".DE",
"FR": ".PA",
"GB": ".L",
"IT": ".MI",
"ES": ".MC",
"NL": ".AS",
"BE": ".BR",
"IE": ".IR",
"FI": ".HE",
"CH": ".SW",
"NO": ".OL",
"DK": ".CO",
"SE": ".ST",
}
def t212_ticker_to_yahoo(t212_ticker: str, short_name: str) -> str | None:
"""Translate a T212 ticker like 'SGLNl_EQ' or 'EQNR_US_EQ' to its
Yahoo Finance symbol. Returns None when the pattern isn't recognised.
Rules, in order:
'XXXX_<CC>_EQ' (country code) short_name + country suffix
e.g. EQNR_US_EQ EQNR
SHEL_US_EQ SHEL
'XXXX<x>_EQ' (single trailing lowercase letter) short_name + suffix
e.g. SGLNl_EQ SGLN.L
SHELLa_EQ SHELL.AS
FPp_EQ FP.PA
"""
if not t212_ticker.endswith("_EQ"):
return None
body = t212_ticker[:-3] # strip "_EQ"
# Country-code form: '..._XX' where XX is a 2-letter country code
if len(body) >= 3 and body[-3] == "_" and body[-2:].isupper():
cc = body[-2:]
suffix = _T212_COUNTRY_TO_YAHOO.get(cc)
if suffix is None:
return None
return f"{short_name}{suffix}"
# Single-letter form: '...x' where x is a recognised exchange letter
if body and body[-1] in _T212_LETTER_TO_YAHOO:
return f"{short_name}{_T212_LETTER_TO_YAHOO[body[-1]]}"
return None
# --- Catalogue sync ---------------------------------------------------------
@dataclass
class SyncResult:
    """Counters summarising one catalogue sync run."""

    fetched: int  # instruments received from T212
    upserted: int  # rows sent to the database
    unmappable: int  # instruments with no derivable Yahoo symbol
    skipped_manual: int  # instruments left alone because the row is manual
async def sync_from_t212(
    session: AsyncSession,
    client: httpx.AsyncClient,
    t212: Trading212 | None = None,
) -> SyncResult:
    """Pull every T212 instrument and upsert it into instrument_map.

    Rows flagged manual=True at the start of the run are never overwritten.
    Instruments whose Yahoo symbol cannot be derived are still stored, with
    yahoo_ticker NULL, and counted as unmappable.

    Args:
        session: Async DB session; committed once after all chunks are sent.
        client: HTTP client handed to the T212 wrapper.
        t212: T212 API wrapper; a default Trading212() is built when omitted.

    Returns:
        SyncResult with fetched / upserted / unmappable / skipped counters.
    """
    t212 = t212 or Trading212()
    instruments = await t212.instruments(client) or []

    # Only the ticker is needed to protect hand-edited rows, so select that
    # single column instead of hydrating full ORM objects.
    manual_result = await session.execute(
        select(InstrumentMap.t212_ticker).where(InstrumentMap.manual.is_(True))
    )
    manual_tickers = set(manual_result.scalars().all())

    now = utcnow()
    rows = []
    unmappable = 0
    skipped_manual = 0
    for inst in instruments:
        tkr = inst.get("ticker")
        if not tkr:
            continue
        if tkr in manual_tickers:
            skipped_manual += 1
            continue
        sn = inst.get("shortName") or ""
        name = inst.get("name") or sn or tkr
        # Without a short name there is no stem to build a symbol from;
        # treat it as unmappable rather than storing a bare suffix.
        yahoo = t212_ticker_to_yahoo(tkr, sn) if sn else None
        if yahoo is None:
            unmappable += 1
        rows.append({
            "t212_ticker": tkr,
            "t212_shortname": sn,
            "yahoo_ticker": yahoo,
            "name": name[:128],  # column is String(128)
            "currency": inst.get("currencyCode"),
            "isin": inst.get("isin"),
            "instrument_type": inst.get("type"),
            "manual": False,
            "last_verified_at": now,
        })

    if not rows:
        return SyncResult(fetched=len(instruments), upserted=0,
                          unmappable=unmappable, skipped_manual=skipped_manual)

    # Bulk upsert. MySQL: ON DUPLICATE KEY UPDATE on t212_ticker unique key.
    # Chunk to avoid hitting MySQL's max_allowed_packet on 17k+ rows.
    chunk = 500
    upserted = 0
    for start in range(0, len(rows), chunk):
        batch = rows[start:start + chunk]
        stmt = mysql_insert(InstrumentMap).values(batch)
        # `manual` is deliberately absent from the update list.
        stmt = stmt.on_duplicate_key_update(
            yahoo_ticker=stmt.inserted.yahoo_ticker,
            t212_shortname=stmt.inserted.t212_shortname,
            name=stmt.inserted.name,
            currency=stmt.inserted.currency,
            isin=stmt.inserted.isin,
            instrument_type=stmt.inserted.instrument_type,
            last_verified_at=stmt.inserted.last_verified_at,
        )
        await session.execute(stmt)
        upserted += len(batch)
    await session.commit()

    return SyncResult(
        fetched=len(instruments), upserted=upserted,
        unmappable=unmappable, skipped_manual=skipped_manual,
    )
# --- Resolution: CSV Slice → preferred Yahoo ticker -------------------------
# Currency preference for users in a UK account. Listings denominated in the
# user's account currency or its smaller-unit (GBX = pence) come first. EUR
# ranks ABOVE USD because UK retail brokers (incl. T212) typically default
# users to the London + Euronext listings; the NYSE dual-listing only wins
# when no European listing exists (e.g. EQNR isn't on T212's Oslo book).
_DEFAULT_CCY_PREFERENCE = ("GBX", "GBP", "EUR", "USD", "CHF", "JPY")
@dataclass
class ResolvedInstrument:
t212_ticker: str
yahoo_ticker: str | None
name: str
currency: str | None
isin: str | None
async def resolve_slice(
    session: AsyncSession,
    slice_code: str,
    currency_preference: tuple[str, ...] = _DEFAULT_CCY_PREFERENCE,
) -> ResolvedInstrument | None:
    """Choose the catalogue listing that best matches a CSV Slice.

    Candidates are the rows whose short name equals `slice_code`. Manual
    rows beat automatic ones; within each group the listing whose currency
    appears earliest in `currency_preference` wins, and currencies not in
    the preference list rank last. Returns None for an empty slice code or
    when no row matches.
    """
    if not slice_code:
        return None

    query = select(InstrumentMap).where(InstrumentMap.t212_shortname == slice_code)
    candidates = (await session.execute(query)).scalars().all()
    if not candidates:
        return None

    unlisted = len(currency_preference)

    def sort_key(candidate: InstrumentMap) -> tuple[int, int]:
        ccy = candidate.currency or ""
        if ccy in currency_preference:
            ccy_position = currency_preference.index(ccy)
        else:
            ccy_position = unlisted
        return (0 if candidate.manual else 1, ccy_position)

    # min() keeps the first of equally ranked rows, like a stable sort would.
    best = min(candidates, key=sort_key)
    return ResolvedInstrument(
        t212_ticker=best.t212_ticker,
        yahoo_ticker=best.yahoo_ticker,
        name=best.name,
        currency=best.currency,
        isin=best.isin,
    )
def is_stale(row: InstrumentMap, max_age: timedelta = timedelta(days=14)) -> bool:
    """Report whether the row's last refresh is older than `max_age`.

    Args:
        row: Catalogue row to check.
        max_age: Maximum tolerated age of `last_verified_at`.

    Returns:
        True when the row is older than `max_age`, or when it has no
        verification timestamp at all.
    """
    from datetime import timezone  # function-scope: module imports only timedelta

    verified = row.last_verified_at
    if verified is None:
        return True  # never verified counts as stale

    now = utcnow()
    # NOTE(review): MySQL DATETIME has no timezone, so values read back may
    # be naive even though the column is declared timezone=True. Subtracting
    # naive from aware raises TypeError, so treat the naive side as UTC.
    # Confirm whether app.db.utcnow() returns an aware datetime.
    if verified.tzinfo is None and now.tzinfo is not None:
        verified = verified.replace(tzinfo=timezone.utc)
    elif verified.tzinfo is not None and now.tzinfo is None:
        now = now.replace(tzinfo=timezone.utc)
    return now - verified > max_age