"""Market data fetchers — Yahoo Finance (no auth) and FRED (key required). Ported from /home/gg/ownCloud/Family/Finances/Wealth/market_pulse.py. Logic preserved verbatim where possible; HTTP switched to httpx.AsyncClient. """ from __future__ import annotations from dataclasses import dataclass, field from datetime import datetime, timezone from typing import Callable import httpx from app.config import get_settings YAHOO_CHART = "https://query1.finance.yahoo.com/v8/finance/chart/{symbol}" FRED_API = "https://api.stlouisfed.org/fred/series/observations" EUROSTAT_API = "https://ec.europa.eu/eurostat/api/dissemination/statistics/1.0/data/{dataset}" ONS_API = "https://www.ons.gov.uk/{topic}/timeseries/{cdid}/{dataset}/data" ECB_API = "https://data-api.ecb.europa.eu/service/data/{dataset}/{series_key}" UA = {"User-Agent": "Mozilla/5.0 (cassandra) Python/httpx"} # --- In-flight data shape (services return these; jobs persist to DB) -------- @dataclass class Quote: symbol: str source: str label: str note: str price: float | None currency: str | None as_of: str | None changes: dict[str, float | None] = field(default_factory=dict) price_base: float | None = None currency_base: str | None = None anchor_date: str | None = None error: str | None = None def _pct(old: float | None, new: float | None) -> float | None: if old is None or new is None or old == 0: return None return (new - old) / old * 100.0 def _parse_date(s: str) -> datetime: return datetime.strptime(s, "%Y-%m-%d") def _yahoo_range_covering(anchor: str | None) -> str: if not anchor: return "1y" days = (datetime.now(timezone.utc).date() - _parse_date(anchor).date()).days if days <= 360: return "1y" if days <= 1800: return "5y" if days <= 3600: return "10y" return "max" # --- Fetchers ----------------------------------------------------------------- async def fetch_yahoo( client: httpx.AsyncClient, symbol: str, label: str, note: str, anchor: str | None = None, ) -> Quote: """Latest close + %1d / %1m / %1y / (optional) %anchor from Yahoo's chart endpoint.""" try: r = await client.get( YAHOO_CHART.format(symbol=symbol), params={"interval": "1d", "range": _yahoo_range_covering(anchor), "includePrePost": "false"}, headers=UA, timeout=15, ) r.raise_for_status() result = r.json()["chart"]["result"] if not result: raise ValueError("empty result") res = result[0] meta = res["meta"] price = meta.get("regularMarketPrice") prev_session = meta.get("previousClose") timestamps = res.get("timestamp") or [] closes = (res.get("indicators", {}).get("quote") or [{}])[0].get("close") or [] series = [(t, c) for t, c in zip(timestamps, closes) if c is not None] if not series: raise ValueError("no closes in series") last_ts = series[-1][0] if prev_session is None and len(series) >= 2: prev_session = series[-2][1] chg_1m = _pct(series[-22][1], price) if len(series) >= 22 else None chg_1y = _pct(series[0][1], price) if len(series) >= 2 else None changes: dict[str, float | None] = { "1d": _pct(prev_session, price), "1m": chg_1m, "1y": chg_1y, } anchor_used: str | None = None if anchor: anchor_ts = int(_parse_date(anchor).replace(tzinfo=timezone.utc).timestamp()) anchor_close = next((c for t, c in series if t >= anchor_ts), None) anchor_actual_ts = next((t for t, c in series if t >= anchor_ts), None) changes["anchor"] = _pct(anchor_close, price) if anchor_actual_ts: anchor_used = datetime.fromtimestamp( anchor_actual_ts, timezone.utc ).strftime("%Y-%m-%d") return Quote( symbol=symbol, source="yahoo", label=label, note=note, price=price, currency=meta.get("currency"), as_of=datetime.fromtimestamp(last_ts, timezone.utc).strftime("%Y-%m-%d"), changes=changes, anchor_date=anchor_used, ) except Exception as e: return Quote(symbol, "yahoo", label, note, None, None, None, error=str(e)) async def fetch_fred( client: httpx.AsyncClient, symbol: str, label: str, note: str, anchor: str | None = None, ) -> Quote: """Latest value + 1d/1m/1y deltas from a FRED series. Requires FRED_API_KEY.""" api_key = get_settings().FRED_API_KEY if not api_key: return Quote(symbol, "fred", label, note, None, None, None, error="FRED_API_KEY not set") try: r = await client.get( FRED_API, params={ "series_id": symbol, "api_key": api_key, "file_type": "json", "sort_order": "desc", "limit": 5000 if anchor else 800, }, headers=UA, timeout=20, ) r.raise_for_status() obs = r.json().get("observations", []) data = [ (o["date"], float(o["value"])) for o in obs if o.get("value") not in (".", "", None) ] if not data: raise ValueError("no observations") last_date, last_val = data[0] # CPI levels reported as YoY%. if symbol in ("CPIAUCSL", "CPILFESL") and len(data) >= 13: yoy = _pct(data[12][1], last_val) changes: dict[str, float | None] = {"1d": None, "1m": None, "1y": None} anchor_used: str | None = None if anchor: anchor_dt = _parse_date(anchor) anchor_idx = next( (i for i, (d, _) in enumerate(data) if _parse_date(d) <= anchor_dt), None, ) if anchor_idx is not None and anchor_idx + 12 < len(data): yoy_then = _pct(data[anchor_idx + 12][1], data[anchor_idx][1]) changes["anchor"] = (yoy or 0) - (yoy_then or 0) anchor_used = anchor return Quote(symbol, "fred", label, note, yoy, "%", last_date, changes=changes, anchor_date=anchor_used) last_dt = _parse_date(last_date) def find_back(min_days: int) -> float | None: for d, v in data[1:]: if (last_dt - _parse_date(d)).days >= min_days: return v return None prev_val = data[1][1] if len(data) >= 2 else None changes = { "1d": _pct(prev_val, last_val), "1m": _pct(find_back(28), last_val), "1y": _pct(find_back(360), last_val), } anchor_used = None if anchor: anchor_dt = _parse_date(anchor) anchor_obs = next( ((d, v) for d, v in data if _parse_date(d) <= anchor_dt), None ) if anchor_obs: changes["anchor"] = _pct(anchor_obs[1], last_val) anchor_used = anchor_obs[0] return Quote( symbol=symbol, source="fred", label=label, note=note, price=last_val, currency=None, as_of=last_date, changes=changes, anchor_date=anchor_used, ) except Exception as e: return Quote(symbol, "fred", label, note, None, None, None, error=str(e)) # --- Eurostat (no API key needed) ------------------------------------------- def _eurostat_time_to_iso(t: str) -> str: """Convert Eurostat time codes into ISO-style dates so they sort and compare correctly. Accepts YYYY-MM, YYYY-Qn, YYYY, and YYYY-MM-DD.""" t = t.strip() if len(t) == 4 and t.isdigit(): # annual: "2026" return f"{t}-01-01" if len(t) == 6 and t[4] == "Q": # quarterly: "2026Q1" q = int(t[5]) return f"{t[:4]}-{(q - 1) * 3 + 1:02d}-01" if len(t) == 7 and t[4] == "-": # monthly: "2026-03" return f"{t}-01" if len(t) == 10: # daily: "2026-03-15" return t return t # fall through; caller may flag async def fetch_eurostat( client: httpx.AsyncClient, symbol: str, label: str, note: str, anchor: str | None = None, ) -> Quote: """Fetch a Eurostat time series. `symbol` format: DATASET?dim1=val1&dim2=val2 e.g. 'irt_lt_mcby_m?geo=DE&int_rt=MCBY' for German 10y bond yield. Eurostat's API is open (no key), uses JSON-stat 2.0.""" import urllib.parse try: if "?" in symbol: dataset, query = symbol.split("?", 1) params = dict(urllib.parse.parse_qsl(query)) else: dataset, params = symbol, {} params.setdefault("format", "JSON") params.setdefault("lang", "EN") r = await client.get( EUROSTAT_API.format(dataset=dataset), params=params, headers=UA, timeout=20, ) r.raise_for_status() data = r.json() time_cat = data["dimension"]["time"]["category"] # JSON-stat 2.0: {"index": {timecode: pos}, "label": {timecode: human}} time_index = time_cat["index"] values = data.get("value") or {} # Build (iso_date, value) pairs, sorted ascending in time. rows: list[tuple[str, float]] = [] for tcode, pos in sorted(time_index.items(), key=lambda kv: kv[1]): raw = values.get(str(pos)) if raw is None: continue try: rows.append((_eurostat_time_to_iso(tcode), float(raw))) except (TypeError, ValueError): continue if not rows: raise ValueError("no observations") last_date, last_val = rows[-1] def _find_back(min_days: int) -> float | None: ref = datetime.strptime(last_date, "%Y-%m-%d").date() for d, v in reversed(rows[:-1]): if (ref - datetime.strptime(d, "%Y-%m-%d").date()).days >= min_days: return v return None prev_val = rows[-2][1] if len(rows) >= 2 else None changes = { "1d": _pct(prev_val, last_val), "1m": _pct(_find_back(28), last_val), "1y": _pct(_find_back(360), last_val), } anchor_used: str | None = None if anchor: anchor_d = _parse_date(anchor).date() for d, v in reversed(rows): if datetime.strptime(d, "%Y-%m-%d").date() <= anchor_d: changes["anchor"] = _pct(v, last_val) anchor_used = d break return Quote( symbol=symbol, source="eurostat", label=label, note=note, price=last_val, currency=None, as_of=last_date, changes=changes, anchor_date=anchor_used, ) except Exception as e: return Quote(symbol, "eurostat", label, note, None, None, None, error=str(e)) # --- UK ONS (Office for National Statistics, no API key needed) ------------- _ONS_MONTH = { "JAN": 1, "FEB": 2, "MAR": 3, "APR": 4, "MAY": 5, "JUN": 6, "JUL": 7, "AUG": 8, "SEP": 9, "OCT": 10, "NOV": 11, "DEC": 12, } def _ons_date_to_iso(s: str) -> str | None: """ONS date formats: monthly '2026 MAR', quarterly '2026 Q1', annual '2025'.""" s = s.strip().upper() parts = s.split() try: if len(parts) == 1 and parts[0].isdigit(): return f"{parts[0]}-01-01" if len(parts) == 2: year = int(parts[0]) tag = parts[1] if tag in _ONS_MONTH: return f"{year:04d}-{_ONS_MONTH[tag]:02d}-01" if tag.startswith("Q") and tag[1:].isdigit(): q = int(tag[1:]) return f"{year:04d}-{(q - 1) * 3 + 1:02d}-01" except (ValueError, IndexError): pass return None async def fetch_ons( client: httpx.AsyncClient, symbol: str, label: str, note: str, anchor: str | None = None, ) -> Quote: """Fetch a UK ONS time series. `symbol` format: // e.g. 'economy/inflationandpriceindices/d7g7/mm23' for UK CPI YoY. ONS publishes via www.ons.gov.uk; no auth, JSON when Accept header set.""" try: parts = symbol.split("/") if len(parts) < 3: raise ValueError("ONS symbol must be topic/cdid/dataset") dataset = parts[-1] cdid = parts[-2] topic = "/".join(parts[:-2]) r = await client.get( ONS_API.format(topic=topic, cdid=cdid, dataset=dataset), headers={**UA, "Accept": "application/json"}, timeout=20, ) r.raise_for_status() data = r.json() # Use the most granular series available: months > quarters > years. for key in ("months", "quarters", "years"): raw_seq = data.get(key) or [] if raw_seq: break if not raw_seq: raise ValueError("no observations") rows: list[tuple[str, float]] = [] for entry in raw_seq: iso = _ons_date_to_iso(entry.get("date", "")) v = entry.get("value") if iso is None or v in (None, "", "."): continue try: rows.append((iso, float(v))) except (TypeError, ValueError): continue if not rows: raise ValueError("no parseable observations") last_date, last_val = rows[-1] def _find_back(min_days: int) -> float | None: ref = datetime.strptime(last_date, "%Y-%m-%d").date() for d, v in reversed(rows[:-1]): if (ref - datetime.strptime(d, "%Y-%m-%d").date()).days >= min_days: return v return None prev_val = rows[-2][1] if len(rows) >= 2 else None changes = { "1d": _pct(prev_val, last_val), "1m": _pct(_find_back(28), last_val), "1y": _pct(_find_back(360), last_val), } anchor_used: str | None = None if anchor: anchor_d = _parse_date(anchor).date() for d, v in reversed(rows): if datetime.strptime(d, "%Y-%m-%d").date() <= anchor_d: changes["anchor"] = _pct(v, last_val) anchor_used = d break return Quote( symbol=symbol, source="ons", label=label, note=note, price=last_val, currency=None, as_of=last_date, changes=changes, anchor_date=anchor_used, ) except Exception as e: return Quote(symbol, "ons", label, note, None, None, None, error=str(e)) # --- ECB Data Portal (no API key, daily euro-area data) --------------------- async def fetch_ecb( client: httpx.AsyncClient, symbol: str, label: str, note: str, anchor: str | None = None, ) -> Quote: """Fetch an ECB Statistical Data Warehouse series. `symbol` format: / e.g. 'YC/B.U2.EUR.4F.G_N_A.SV_C_YM.SR_10Y' for 10y EZ AAA yield (daily). ECB returns SDMX-JSON; we extract the single observation series.""" try: if "/" not in symbol: raise ValueError("ECB symbol must be dataset/series_key") dataset, series_key = symbol.split("/", 1) r = await client.get( ECB_API.format(dataset=dataset, series_key=series_key), params={"format": "jsondata"}, headers={**UA, "Accept": "application/json"}, timeout=20, ) r.raise_for_status() data = r.json() # SDMX-JSON: dataSets[0].series[key].observations is { "": [value, ...] } # where indexes into structure.dimensions.observation[0].values for time. ds = (data.get("dataSets") or [{}])[0] series_map = ds.get("series") or {} if not series_map: raise ValueError("no series in response") ser = next(iter(series_map.values())) obs = ser.get("observations") or {} times = [ v["id"] for v in data["structure"]["dimensions"]["observation"][0]["values"] ] rows: list[tuple[str, float]] = [] for idx_str in sorted(obs, key=int): v = obs[idx_str][0] if v is None: continue t = times[int(idx_str)] iso = _eurostat_time_to_iso(t) # reuse: ECB time codes match Eurostat try: rows.append((iso, float(v))) except (TypeError, ValueError): continue if not rows: raise ValueError("no observations") last_date, last_val = rows[-1] def _find_back(min_days: int) -> float | None: ref = datetime.strptime(last_date, "%Y-%m-%d").date() for d, v in reversed(rows[:-1]): if (ref - datetime.strptime(d, "%Y-%m-%d").date()).days >= min_days: return v return None prev_val = rows[-2][1] if len(rows) >= 2 else None changes = { "1d": _pct(prev_val, last_val), "1m": _pct(_find_back(28), last_val), "1y": _pct(_find_back(360), last_val), } anchor_used: str | None = None if anchor: anchor_d = _parse_date(anchor).date() for d, v in reversed(rows): if datetime.strptime(d, "%Y-%m-%d").date() <= anchor_d: changes["anchor"] = _pct(v, last_val) anchor_used = d break return Quote( symbol=symbol, source="ecb", label=label, note=note, price=last_val, currency=None, as_of=last_date, changes=changes, anchor_date=anchor_used, ) except Exception as e: return Quote(symbol, "ecb", label, note, None, None, None, error=str(e)) # --- Source registry ---------------------------------------------------------- FetcherFn = Callable[..., "Quote"] SOURCES: dict[str, FetcherFn] = { "yahoo": fetch_yahoo, "FRED": fetch_fred, "EUROSTAT": fetch_eurostat, "ONS": fetch_ons, "ECB": fetch_ecb, } def parse_symbol(symbol: str) -> tuple[FetcherFn, str]: if ":" in symbol: prefix, _, ident = symbol.partition(":") if prefix in SOURCES: return SOURCES[prefix], ident return SOURCES["yahoo"], symbol async def fetch( client: httpx.AsyncClient, symbol: str, label: str, note: str, anchor: str | None = None, ) -> Quote: fn, ident = parse_symbol(symbol) return await fn(client, ident, label, note, anchor) # --- Currency normalisation --------------------------------------------------- async def _get_fx_rate( client: httpx.AsyncClient, from_ccy: str, to_ccy: str, cache: dict[tuple[str, str], float | None], ) -> float | None: if from_ccy == to_ccy: return 1.0 if from_ccy == "GBp": # LSE pence gbp = await _get_fx_rate(client, "GBP", to_ccy, cache) return None if gbp is None else 0.01 * gbp key = (from_ccy, to_ccy) if key in cache: return cache[key] pair = f"{from_ccy}{to_ccy}=X" try: r = await client.get( YAHOO_CHART.format(symbol=pair), params={"interval": "1d", "range": "5d"}, headers=UA, timeout=10, ) r.raise_for_status() rate = r.json()["chart"]["result"][0]["meta"].get("regularMarketPrice") cache[key] = rate return rate except Exception: cache[key] = None return None async def normalise_to_base( client: httpx.AsyncClient, quotes: list[Quote], base: str ) -> None: cache: dict[tuple[str, str], float | None] = {} base = base.upper() for q in quotes: if q.price is None or not q.currency: continue rate = await _get_fx_rate(client, q.currency, base, cache) if rate is None: continue q.price_base = q.price * rate q.currency_base = base