"""Defensive parser for Trading 212 pie-export CSVs + writer that persists the parsed pie into PortfolioSnapshot/Position rows. The parser is pure: no DB, no HTTP, no I/O. The writer (`persist_pie`) takes a ParsedPie and resolves each position's Slice via InstrumentMap to find its Yahoo ticker + canonical name before persisting. """ from __future__ import annotations import csv import io from dataclasses import dataclass from typing import TYPE_CHECKING if TYPE_CHECKING: from sqlalchemy.ext.asyncio import AsyncSession class CSVImportError(ValueError): """Raised when the CSV is unparseable or missing required columns.""" # Header name -> normalised key used in the parsed dict. Lowercase, ignore # leading/trailing whitespace, treat case-insensitively. Extra columns are # silently ignored. _HEADER_MAP = { "slice": "slice", "name": "name", "invested value": "invested_value", "value": "current_value", "result": "result", "owned quantity": "quantity", "dividends gained": "dividends_gained", "dividends cash": "dividends_cash", "dividends reinvested": "dividends_reinvested", } # These must be present for the import to be meaningful at all. _REQUIRED_FIELDS = ("slice", "quantity") @dataclass(frozen=True) class ParsedPosition: slice: str # T212 shortcode, e.g. "SGLN" name: str invested_value: float | None current_value: float | None result: float | None # P/L in pie currency quantity: float dividends_gained: float | None = None dividends_cash: float | None = None dividends_reinvested: float | None = None @property def average_price(self) -> float | None: if self.invested_value is None or not self.quantity: return None return self.invested_value / self.quantity @property def current_price(self) -> float | None: if self.current_value is None or not self.quantity: return None return self.current_value / self.quantity @dataclass(frozen=True) class ParsedPie: name: str | None # from the Total row's Name column positions: tuple[ParsedPosition, ...] invested: float | None # totals from the Total row value: float | None result: float | None def _normalise_header(h: str) -> str: return h.strip().lower() def _parse_num(raw: str | None) -> float | None: """Empty / 'N/A' / '-' / '—' → None. Otherwise float.""" if raw is None: return None s = raw.strip() if not s or s in {"-", "—", "N/A", "n/a", "NA"}: return None # T212 occasionally exports with thousand-comma. Strip safely. s = s.replace(",", "") try: return float(s) except ValueError: return None def parse_t212_csv(content: str | bytes) -> ParsedPie: """Parse a T212 pie-export CSV. Args: content: bytes or str containing the CSV (raw export file contents). Returns: ParsedPie with positions list and aggregate totals. Raises: CSVImportError: if the file is empty, missing required headers, or contains no usable rows. """ if isinstance(content, bytes): try: content = content.decode("utf-8-sig") # handle Excel BOM except UnicodeDecodeError: content = content.decode("latin-1") reader = csv.reader(io.StringIO(content)) try: header_row = next(reader) except StopIteration: raise CSVImportError("Empty CSV file") # Map column index -> normalised field name. Unknown headers are ignored. field_by_index: dict[int, str] = {} for i, h in enumerate(header_row): key = _HEADER_MAP.get(_normalise_header(h)) if key: field_by_index[i] = key missing = [f for f in _REQUIRED_FIELDS if f not in field_by_index.values()] if missing: raise CSVImportError( f"CSV missing required column(s): {', '.join(missing)}. " f"Found headers: {header_row}" ) positions: list[ParsedPosition] = [] total: ParsedPosition | None = None pie_name: str | None = None for row_num, row in enumerate(reader, start=2): if not row or not any(cell.strip() for cell in row): continue # skip blank lines record: dict[str, object] = {} for idx, field in field_by_index.items(): raw = row[idx] if idx < len(row) else "" if field in {"slice", "name"}: record[field] = raw.strip() else: record[field] = _parse_num(raw) slice_code = record.get("slice") or "" if not slice_code: continue # malformed; skip silently rather than abort # The 'Total' row uses slice='Total' and quantity='-' — capture it # for aggregate totals but don't list it as a position. if slice_code.lower() == "total": pie_name = (record.get("name") or "").strip() or None total = ParsedPosition( slice=slice_code, name=pie_name or "Total", invested_value=record.get("invested_value"), current_value=record.get("current_value"), result=record.get("result"), quantity=0.0, dividends_gained=record.get("dividends_gained"), dividends_cash=record.get("dividends_cash"), dividends_reinvested=record.get("dividends_reinvested"), ) continue qty = record.get("quantity") if qty is None or qty == 0: # Position row with no usable quantity — skip rather than fail. continue positions.append(ParsedPosition( slice=slice_code, name=(record.get("name") or "").strip(), invested_value=record.get("invested_value"), current_value=record.get("current_value"), result=record.get("result"), quantity=qty, dividends_gained=record.get("dividends_gained"), dividends_cash=record.get("dividends_cash"), dividends_reinvested=record.get("dividends_reinvested"), )) if not positions: raise CSVImportError( "CSV contained no parseable position rows. " "Expected at least one row with a Slice code and quantity." ) return ParsedPie( name=pie_name, positions=tuple(positions), invested=total.invested_value if total else None, value=total.current_value if total else None, result=total.result if total else None, ) # --- Persist parsed pie into portfolio/snapshot/positions ------------------- @dataclass class PersistResult: portfolio_id: int snapshot_id: int positions_written: int unmapped_slices: list[str] # slices we couldn't resolve to a Yahoo ticker portfolio_name: str is_new_portfolio: bool async def persist_pie( session: "AsyncSession", pie: ParsedPie, *, portfolio_name: str | None = None, source: str = "t212-csv", currency: str = "GBP", ) -> PersistResult: """Write a ParsedPie into Portfolio/PortfolioSnapshot/Position. - Portfolio is created on first sight of a given name; subsequent uploads stack as new snapshots under the same portfolio. - Each position's Slice is resolved to a T212 ticker + name via the InstrumentMap. Unmapped slices still get stored using their raw CSV values; we collect them in `unmapped_slices` for the UI to surface. """ # Late imports keep this module dependency-light for unit tests. from sqlalchemy import select from app.db import utcnow from app.models import Portfolio, PortfolioSnapshot, Position from app.services.instrument_map import resolve_slice name = portfolio_name or pie.name or "Imported pie" name = name.strip()[:64] portfolio = (await session.execute( select(Portfolio).where(Portfolio.name == name) )).scalar_one_or_none() is_new = portfolio is None if portfolio is None: portfolio = Portfolio(name=name, source=source, currency=currency) session.add(portfolio) await session.flush() snap = PortfolioSnapshot( portfolio_id=portfolio.id, snapshot_at=utcnow(), total_value=pie.value, cash=None, invested=pie.invested, raw_json={ "source": source, "pie_name": pie.name, "result": pie.result, }, ) session.add(snap) await session.flush() unmapped: list[str] = [] for p in pie.positions: resolved = await resolve_slice(session, p.slice) if resolved and resolved.t212_ticker: ticker = resolved.t212_ticker position_name = resolved.name or p.name else: ticker = p.slice position_name = p.name unmapped.append(p.slice) session.add(Position( snapshot_id=snap.id, ticker=ticker, name=position_name[:128] if position_name else None, quantity=p.quantity, average_price=p.average_price, current_price=p.current_price, ppl=p.result, )) await session.commit() return PersistResult( portfolio_id=portfolio.id, snapshot_id=snap.id, positions_written=len(pie.positions), unmapped_slices=unmapped, portfolio_name=name, is_new_portfolio=is_new, )