read.markets/app/services/market.py
Giorgio Gilestro 4e7e4981e3 add ECB Data Portal source; group-aware stale thresholds
ECB Data Portal (successor to the Statistical Data Warehouse) joins as a 5th data source — open API,
no key, daily euro-area yield curve data. Symbol format
'ECB:dataset/series_key', e.g. 'ECB:YC/B.U2.EUR.4F.G_N_A.SV_C_YM.SR_10Y'
for daily 10y AAA spot rate.

Bonds tab adds ECB EZ 10y AAA + 2y AAA so there's at least some
currently-fresh European sovereign data alongside the US Treasuries.
Country-specific yields (Bund/OAT/BTP/Gilt/JGB) remain on Eurostat/FRED
monthly mirrors — no free daily source exists for those.

Stale threshold is now per-group instead of a flat 90 days. Daily-tape
groups (bonds, rates, equity, etc.) flag stale after one to three weeks;
monthly groups (economy, macro, valuation) stay at 60-90 days. The
bonds tab will now correctly show 30-60 day-old country yields as
stale next to the daily US/ECB ones.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-15 23:13:58 +01:00

591 lines
20 KiB
Python

"""Market data fetchers — Yahoo Finance (no auth), FRED (FRED_API_KEY
required), and the keyless Eurostat, UK ONS and ECB Data Portal APIs.
Ported from /home/gg/ownCloud/Family/Finances/Wealth/market_pulse.py.
Logic preserved verbatim where possible; HTTP switched to httpx.AsyncClient.
"""
from __future__ import annotations

from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import Awaitable, Callable

import httpx

from app.config import get_settings
# Endpoint URLs. The {placeholders} are filled via str.format by the fetchers
# below; FRED instead passes its series id as a query parameter.
# YAHOO_CHART also serves FX-pair lookups in _get_fx_rate.
YAHOO_CHART = "https://query1.finance.yahoo.com/v8/finance/chart/{symbol}"
FRED_API = "https://api.stlouisfed.org/fred/series/observations"
EUROSTAT_API = "https://ec.europa.eu/eurostat/api/dissemination/statistics/1.0/data/{dataset}"
ONS_API = "https://www.ons.gov.uk/{topic}/timeseries/{cdid}/{dataset}/data"
ECB_API = "https://data-api.ecb.europa.eu/service/data/{dataset}/{series_key}"
# User-Agent header attached to every outgoing request made in this module.
UA = {"User-Agent": "Mozilla/5.0 (cassandra) Python/httpx"}
# --- In-flight data shape (services return these; jobs persist to DB) --------
@dataclass
class Quote:
    """Snapshot of one series, as returned by every fetcher in this module.

    Fetchers report failures by returning a Quote with ``error`` set (and
    ``price`` None) rather than raising. Several call sites in this module
    construct Quote positionally, so the field order matters.
    """

    # Ident handed to the fetcher (fetch() removes a recognised "SOURCE:" prefix).
    symbol: str
    source: str  # "yahoo", "fred", "eurostat", "ons" or "ecb"
    label: str  # caller-supplied, passed through unchanged
    note: str  # caller-supplied, passed through unchanged
    price: float | None  # latest value; FRED CPI series carry YoY % here; None on error
    currency: str | None  # Yahoo's reported currency, "%" for FRED CPI, otherwise None
    as_of: str | None  # "YYYY-MM-DD" of the latest observation
    # Changes keyed "1d"/"1m"/"1y", plus "anchor" when an anchor was requested.
    # Mostly percentage changes; FRED CPI's "anchor" is a difference of YoY values.
    changes: dict[str, float | None] = field(default_factory=dict)
    price_base: float | None = None  # price converted by normalise_to_base
    currency_base: str | None = None  # base currency set by normalise_to_base
    anchor_date: str | None = None  # date reported for the "anchor" change, if any
    error: str | None = None  # failure message; None on success
def _pct(old: float | None, new: float | None) -> float | None:
if old is None or new is None or old == 0:
return None
return (new - old) / old * 100.0
def _parse_date(s: str) -> datetime:
    """Parse a ``YYYY-MM-DD`` string into a naive ``datetime`` at midnight.

    Raises ValueError for any other layout.
    """
    iso_day_format = "%Y-%m-%d"
    return datetime.strptime(s, iso_day_format)
def _yahoo_range_covering(anchor: str | None) -> str:
if not anchor:
return "1y"
days = (datetime.now(timezone.utc).date() - _parse_date(anchor).date()).days
if days <= 360:
return "1y"
if days <= 1800:
return "5y"
if days <= 3600:
return "10y"
return "max"
# --- Fetchers -----------------------------------------------------------------
async def fetch_yahoo(
    client: httpx.AsyncClient,
    symbol: str,
    label: str,
    note: str,
    anchor: str | None = None,
) -> Quote:
    """Latest close + %1d / %1m / %1y / (optional) %anchor from Yahoo's chart endpoint.

    The history range requested widens with the anchor's age (see
    ``_yahoo_range_covering``), so the 1y change is located by timestamp
    instead of being taken from the first bar of whatever range came back.
    Any failure (HTTP, parsing, empty series) is returned as a Quote with
    ``error`` set instead of being raised.
    """
    try:
        r = await client.get(
            YAHOO_CHART.format(symbol=symbol),
            params={"interval": "1d", "range": _yahoo_range_covering(anchor),
                    "includePrePost": "false"},
            headers=UA,
            timeout=15,
        )
        r.raise_for_status()
        result = r.json()["chart"]["result"]
        if not result:
            raise ValueError("empty result")
        res = result[0]
        meta = res["meta"]
        price = meta.get("regularMarketPrice")
        prev_session = meta.get("previousClose")
        timestamps = res.get("timestamp") or []
        closes = (res.get("indicators", {}).get("quote") or [{}])[0].get("close") or []
        # Keep only sessions that actually have a close.
        series = [(t, c) for t, c in zip(timestamps, closes) if c is not None]
        if not series:
            raise ValueError("no closes in series")
        last_ts = series[-1][0]
        if prev_session is None and len(series) >= 2:
            prev_session = series[-2][1]
        # 21 sessions back from the latest close.
        chg_1m = _pct(series[-22][1], price) if len(series) >= 22 else None
        # With an old anchor the range may be 5y/10y/max, so series[0] can be
        # years old. Use the newest close at or before last_ts - 365 days; when
        # the fetched window doesn't reach that far (range=1y), fall back to the
        # earliest close, as before.
        chg_1y = None
        if len(series) >= 2:
            year_ago_ts = last_ts - 365 * 86400
            year_ago_close = next(
                (c for t, c in reversed(series) if t <= year_ago_ts), series[0][1]
            )
            chg_1y = _pct(year_ago_close, price)
        changes: dict[str, float | None] = {
            "1d": _pct(prev_session, price),
            "1m": chg_1m,
            "1y": chg_1y,
        }
        anchor_used: str | None = None
        if anchor:
            anchor_ts = int(_parse_date(anchor).replace(tzinfo=timezone.utc).timestamp())
            # First session on or after the anchor date (single pass).
            hit = next(((t, c) for t, c in series if t >= anchor_ts), None)
            changes["anchor"] = _pct(hit[1], price) if hit is not None else None
            if hit is not None:
                anchor_used = datetime.fromtimestamp(
                    hit[0], timezone.utc
                ).strftime("%Y-%m-%d")
        return Quote(
            symbol=symbol,
            source="yahoo",
            label=label,
            note=note,
            price=price,
            currency=meta.get("currency"),
            as_of=datetime.fromtimestamp(last_ts, timezone.utc).strftime("%Y-%m-%d"),
            changes=changes,
            anchor_date=anchor_used,
        )
    except Exception as e:
        return Quote(symbol, "yahoo", label, note, None, None, None, error=str(e))
async def fetch_fred(
    client: httpx.AsyncClient,
    symbol: str,
    label: str,
    note: str,
    anchor: str | None = None,
) -> Quote:
    """Latest value + 1d/1m/1y deltas from a FRED series. Requires FRED_API_KEY.

    CPI level series (CPIAUCSL, CPILFESL) are reported as year-over-year
    inflation in percent. Their ``anchor`` change is the difference between
    the latest YoY figure and the YoY figure at the observation on/before the
    anchor. All other series report percentage changes of the level.
    Failures are returned on the Quote (with the API key scrubbed from the
    message), never raised.
    """
    api_key = get_settings().FRED_API_KEY
    if not api_key:
        return Quote(symbol, "fred", label, note, None, None, None,
                     error="FRED_API_KEY not set")
    try:
        r = await client.get(
            FRED_API,
            params={
                "series_id": symbol,
                "api_key": api_key,
                "file_type": "json",
                "sort_order": "desc",
                # Fetch more history only when an anchor has to be located.
                "limit": 5000 if anchor else 800,
            },
            headers=UA,
            timeout=20,
        )
        r.raise_for_status()
        obs = r.json().get("observations", [])
        # FRED marks missing observations with "."; skip them.
        data = [
            (o["date"], float(o["value"]))
            for o in obs if o.get("value") not in (".", "", None)
        ]
        if not data:
            raise ValueError("no observations")
        # Newest first, because of sort_order=desc.
        last_date, last_val = data[0]
        # CPI levels reported as YoY%.
        if symbol in ("CPIAUCSL", "CPILFESL") and len(data) >= 13:
            # 12 observations back: one year earlier for these monthly series.
            yoy = _pct(data[12][1], last_val)
            changes: dict[str, float | None] = {"1d": None, "1m": None, "1y": None}
            anchor_used: str | None = None
            if anchor:
                anchor_dt = _parse_date(anchor)
                anchor_idx = next(
                    (i for i, (d, _) in enumerate(data) if _parse_date(d) <= anchor_dt),
                    None,
                )
                if anchor_idx is not None and anchor_idx + 12 < len(data):
                    yoy_then = _pct(data[anchor_idx + 12][1], data[anchor_idx][1])
                    # Don't invent a difference when either YoY figure is undefined.
                    changes["anchor"] = (
                        None if yoy is None or yoy_then is None else yoy - yoy_then
                    )
                    # Report the observation actually used, like the level branch.
                    anchor_used = data[anchor_idx][0]
            return Quote(symbol, "fred", label, note, yoy, "%", last_date,
                         changes=changes, anchor_date=anchor_used)
        last_dt = _parse_date(last_date)

        def find_back(min_days: int) -> float | None:
            # Newest value at least min_days older than the latest observation.
            for d, v in data[1:]:
                if (last_dt - _parse_date(d)).days >= min_days:
                    return v
            return None

        prev_val = data[1][1] if len(data) >= 2 else None
        changes = {
            "1d": _pct(prev_val, last_val),
            "1m": _pct(find_back(28), last_val),
            "1y": _pct(find_back(360), last_val),
        }
        anchor_used = None
        if anchor:
            anchor_dt = _parse_date(anchor)
            # Newest observation on or before the anchor date.
            anchor_obs = next(
                ((d, v) for d, v in data if _parse_date(d) <= anchor_dt), None
            )
            if anchor_obs:
                changes["anchor"] = _pct(anchor_obs[1], last_val)
                anchor_used = anchor_obs[0]
        return Quote(
            symbol=symbol, source="fred", label=label, note=note,
            price=last_val, currency=None, as_of=last_date,
            changes=changes, anchor_date=anchor_used,
        )
    except Exception as e:
        # httpx error messages embed the request URL, and FRED's URL carries
        # api_key as a query parameter; scrub it before it lands on the Quote.
        return Quote(symbol, "fred", label, note, None, None, None,
                     error=str(e).replace(api_key, "***"))
# --- Eurostat (no API key needed) -------------------------------------------
def _eurostat_time_to_iso(t: str) -> str:
    """Convert a Eurostat/ECB time code into an ISO ``YYYY-MM-DD`` date.

    Each period maps to its first day, so results sort and compare correctly:

    * annual    ``2026``                -> ``2026-01-01``
    * semester  ``2026-S2`` / ``2026S2``  -> ``2026-07-01``
    * quarterly ``2026-Q1`` / ``2026Q1``  -> ``2026-01-01``
    * monthly   ``2026-03`` / ``2026M03`` -> ``2026-03-01``
    * daily     ``2026-03-15``          -> unchanged

    Unrecognised codes are returned stripped but otherwise unchanged; callers
    that parse the result as ``%Y-%m-%d`` will then raise on them.
    """
    t = t.strip()
    if len(t) == 10:  # daily: "2026-03-15"
        return t
    year, tail = t[:4], t[4:]
    if len(year) != 4 or not year.isdigit():
        return t  # fall through; caller may flag
    if not tail:  # annual
        return f"{year}-01-01"
    dashed = tail.startswith("-")
    period = tail[1:] if dashed else tail
    if len(period) == 2 and period[0] == "Q" and period[1] in "1234":
        return f"{year}-{(int(period[1]) - 1) * 3 + 1:02d}-01"
    if len(period) == 2 and period[0] == "S" and period[1] in "12":
        return f"{year}-{(int(period[1]) - 1) * 6 + 1:02d}-01"
    if dashed and len(period) == 2 and period.isdigit():  # "2026-03"
        return f"{year}-{period}-01"
    if not dashed and len(period) == 3 and period[0] == "M" and period[1:].isdigit():
        return f"{year}-{period[1:]}-01"  # legacy "2026M03"
    return t  # fall through; caller may flag
async def fetch_eurostat(
    client: httpx.AsyncClient,
    symbol: str,
    label: str,
    note: str,
    anchor: str | None = None,
) -> Quote:
    """Fetch a Eurostat time series. `symbol` format:
        DATASET?dim1=val1&dim2=val2
    e.g. 'irt_lt_mcby_m?geo=DE&int_rt=MCBY' for German 10y bond yield.
    Eurostat's API is open (no key), uses JSON-stat 2.0.

    The query must narrow every dimension except time to a single value.
    Values are looked up by time position alone, which is only correct for a
    single series, so a response spanning several series is reported as an
    error. All failures are returned as a Quote with ``error`` set.
    """
    import urllib.parse

    try:
        if "?" in symbol:
            dataset, query = symbol.split("?", 1)
            params = dict(urllib.parse.parse_qsl(query))
        else:
            dataset, params = symbol, {}
        params.setdefault("format", "JSON")
        params.setdefault("lang", "EN")
        r = await client.get(
            EUROSTAT_API.format(dataset=dataset),
            params=params, headers=UA, timeout=20,
        )
        r.raise_for_status()
        data = r.json()
        # JSON-stat 2.0: "id" names the dimensions and "size" gives their
        # lengths. A non-time dimension longer than 1 means several series are
        # interleaved in "value", so the time position alone is not an index.
        unpinned = [
            dim
            for dim, size in zip(data.get("id") or [], data.get("size") or [])
            if dim != "time" and size > 1
        ]
        if unpinned:
            raise ValueError(
                "query matches more than one series; pin dimension(s): "
                + ", ".join(unpinned)
            )
        time_cat = data["dimension"]["time"]["category"]
        # JSON-stat 2.0: {"index": {timecode: pos}, "label": {timecode: human}};
        # "index" may also be an ordered list of time codes.
        time_index = time_cat["index"]
        if isinstance(time_index, list):
            time_index = {code: pos for pos, code in enumerate(time_index)}
        # "value" is either a sparse {"pos": v} object or a dense [v, ...] array.
        values = data.get("value") or {}
        if isinstance(values, list):
            values = {str(pos): v for pos, v in enumerate(values)}
        # Build (iso_date, value) pairs, sorted ascending in time.
        rows: list[tuple[str, float]] = []
        for tcode, pos in sorted(time_index.items(), key=lambda kv: kv[1]):
            raw = values.get(str(pos))
            if raw is None:
                continue
            try:
                rows.append((_eurostat_time_to_iso(tcode), float(raw)))
            except (TypeError, ValueError):
                continue
        if not rows:
            raise ValueError("no observations")
        last_date, last_val = rows[-1]

        def _find_back(min_days: int) -> float | None:
            # Newest value at least min_days older than the latest observation.
            ref = datetime.strptime(last_date, "%Y-%m-%d").date()
            for d, v in reversed(rows[:-1]):
                if (ref - datetime.strptime(d, "%Y-%m-%d").date()).days >= min_days:
                    return v
            return None

        prev_val = rows[-2][1] if len(rows) >= 2 else None
        changes = {
            "1d": _pct(prev_val, last_val),
            "1m": _pct(_find_back(28), last_val),
            "1y": _pct(_find_back(360), last_val),
        }
        anchor_used: str | None = None
        if anchor:
            anchor_d = _parse_date(anchor).date()
            # Newest observation on or before the anchor date.
            for d, v in reversed(rows):
                if datetime.strptime(d, "%Y-%m-%d").date() <= anchor_d:
                    changes["anchor"] = _pct(v, last_val)
                    anchor_used = d
                    break
        return Quote(
            symbol=symbol, source="eurostat", label=label, note=note,
            price=last_val, currency=None, as_of=last_date,
            changes=changes, anchor_date=anchor_used,
        )
    except Exception as e:
        return Quote(symbol, "eurostat", label, note, None, None, None, error=str(e))
# --- UK ONS (Office for National Statistics, no API key needed) -------------
# ONS month abbreviation (as it appears in labels like "2026 MAR") -> month number.
_ONS_MONTH = {
    abbr: number
    for number, abbr in enumerate(
        ("JAN", "FEB", "MAR", "APR", "MAY", "JUN",
         "JUL", "AUG", "SEP", "OCT", "NOV", "DEC"),
        start=1,
    )
}
def _ons_date_to_iso(s: str) -> str | None:
"""ONS date formats: monthly '2026 MAR', quarterly '2026 Q1', annual '2025'."""
s = s.strip().upper()
parts = s.split()
try:
if len(parts) == 1 and parts[0].isdigit():
return f"{parts[0]}-01-01"
if len(parts) == 2:
year = int(parts[0])
tag = parts[1]
if tag in _ONS_MONTH:
return f"{year:04d}-{_ONS_MONTH[tag]:02d}-01"
if tag.startswith("Q") and tag[1:].isdigit():
q = int(tag[1:])
return f"{year:04d}-{(q - 1) * 3 + 1:02d}-01"
except (ValueError, IndexError):
pass
return None
async def fetch_ons(
    client: httpx.AsyncClient,
    symbol: str,
    label: str,
    note: str,
    anchor: str | None = None,
) -> Quote:
    """Fetch a UK ONS time series. `symbol` format:
        <topic_path>/<cdid>/<dataset>
    e.g. 'economy/inflationandpriceindices/d7g7/mm23' for UK CPI YoY.
    ONS publishes via www.ons.gov.uk; no auth, JSON when Accept header set.

    Uses the most granular frequency present (months, then quarters, then
    years). Observations are sorted by date before use, so "latest" does not
    depend on the order of the response. Failures are returned as a Quote with
    ``error`` set.
    """
    try:
        parts = symbol.split("/")
        if len(parts) < 3:
            raise ValueError("ONS symbol must be topic/cdid/dataset")
        # The last two segments are cdid and dataset; the rest is the topic path.
        *topic_parts, cdid, dataset = parts
        topic = "/".join(topic_parts)
        r = await client.get(
            ONS_API.format(topic=topic, cdid=cdid, dataset=dataset),
            headers={**UA, "Accept": "application/json"},
            timeout=20,
        )
        r.raise_for_status()
        data = r.json()
        # Use the most granular series available: months > quarters > years.
        raw_seq: list = []
        for key in ("months", "quarters", "years"):
            raw_seq = data.get(key) or []
            if raw_seq:
                break
        if not raw_seq:
            raise ValueError("no observations")
        rows: list[tuple[str, float]] = []
        for entry in raw_seq:
            iso = _ons_date_to_iso(entry.get("date", ""))
            v = entry.get("value")
            if iso is None or v in (None, "", "."):
                continue
            try:
                rows.append((iso, float(v)))
            except (TypeError, ValueError):
                continue
        if not rows:
            raise ValueError("no parseable observations")
        # ISO dates sort chronologically as strings; stable sort keeps ties in
        # response order. Everything below assumes ascending order.
        rows.sort(key=lambda row: row[0])
        last_date, last_val = rows[-1]

        def _find_back(min_days: int) -> float | None:
            # Newest value at least min_days older than the latest observation.
            ref = datetime.strptime(last_date, "%Y-%m-%d").date()
            for d, v in reversed(rows[:-1]):
                if (ref - datetime.strptime(d, "%Y-%m-%d").date()).days >= min_days:
                    return v
            return None

        prev_val = rows[-2][1] if len(rows) >= 2 else None
        changes = {
            "1d": _pct(prev_val, last_val),
            "1m": _pct(_find_back(28), last_val),
            "1y": _pct(_find_back(360), last_val),
        }
        anchor_used: str | None = None
        if anchor:
            anchor_d = _parse_date(anchor).date()
            # Newest observation on or before the anchor date.
            for d, v in reversed(rows):
                if datetime.strptime(d, "%Y-%m-%d").date() <= anchor_d:
                    changes["anchor"] = _pct(v, last_val)
                    anchor_used = d
                    break
        return Quote(
            symbol=symbol, source="ons", label=label, note=note,
            price=last_val, currency=None, as_of=last_date,
            changes=changes, anchor_date=anchor_used,
        )
    except Exception as e:
        return Quote(symbol, "ons", label, note, None, None, None, error=str(e))
# --- ECB Data Portal (no API key, daily euro-area data) ---------------------
async def fetch_ecb(
    client: httpx.AsyncClient,
    symbol: str,
    label: str,
    note: str,
    anchor: str | None = None,
) -> Quote:
    """Fetch one series from the ECB Data Portal API. `symbol` format:
        <dataset>/<series_key>
    e.g. 'YC/B.U2.EUR.4F.G_N_A.SV_C_YM.SR_10Y' for 10y EZ AAA yield (daily).

    ECB returns SDMX-JSON. The key must identify exactly one series; a
    wildcarded key matching several series is reported as an error rather
    than silently using whichever one comes first. Failures are returned as
    a Quote with ``error`` set.
    """
    try:
        if "/" not in symbol:
            raise ValueError("ECB symbol must be dataset/series_key")
        dataset, series_key = symbol.split("/", 1)
        r = await client.get(
            ECB_API.format(dataset=dataset, series_key=series_key),
            params={"format": "jsondata"},
            headers={**UA, "Accept": "application/json"},
            timeout=20,
        )
        r.raise_for_status()
        data = r.json()
        # SDMX-JSON: dataSets[0].series[key].observations is { "<idx>": [value, ...] }
        # where <idx> indexes into structure.dimensions.observation[0].values for time.
        ds = (data.get("dataSets") or [{}])[0]
        series_map = ds.get("series") or {}
        if not series_map:
            raise ValueError("no series in response")
        if len(series_map) > 1:
            raise ValueError(
                f"series key matches {len(series_map)} series; expected exactly one"
            )
        ser = next(iter(series_map.values()))
        obs = ser.get("observations") or {}
        times = [
            v["id"]
            for v in data["structure"]["dimensions"]["observation"][0]["values"]
        ]
        rows: list[tuple[str, float]] = []
        for idx_str in sorted(obs, key=int):
            payload = obs[idx_str]
            v = payload[0] if payload else None  # tolerate an empty attribute list
            if v is None:
                continue
            # ECB time codes follow the same conventions as Eurostat's.
            iso = _eurostat_time_to_iso(times[int(idx_str)])
            try:
                rows.append((iso, float(v)))
            except (TypeError, ValueError):
                continue
        if not rows:
            raise ValueError("no observations")
        # Order by date rather than trusting index order; stable for ties.
        rows.sort(key=lambda row: row[0])
        last_date, last_val = rows[-1]

        def _find_back(min_days: int) -> float | None:
            # Newest value at least min_days older than the latest observation.
            ref = datetime.strptime(last_date, "%Y-%m-%d").date()
            for d, v in reversed(rows[:-1]):
                if (ref - datetime.strptime(d, "%Y-%m-%d").date()).days >= min_days:
                    return v
            return None

        prev_val = rows[-2][1] if len(rows) >= 2 else None
        changes = {
            "1d": _pct(prev_val, last_val),
            "1m": _pct(_find_back(28), last_val),
            "1y": _pct(_find_back(360), last_val),
        }
        anchor_used: str | None = None
        if anchor:
            anchor_d = _parse_date(anchor).date()
            # Newest observation on or before the anchor date.
            for d, v in reversed(rows):
                if datetime.strptime(d, "%Y-%m-%d").date() <= anchor_d:
                    changes["anchor"] = _pct(v, last_val)
                    anchor_used = d
                    break
        return Quote(
            symbol=symbol, source="ecb", label=label, note=note,
            price=last_val, currency=None, as_of=last_date,
            changes=changes, anchor_date=anchor_used,
        )
    except Exception as e:
        return Quote(symbol, "ecb", label, note, None, None, None, error=str(e))
# --- Source registry ----------------------------------------------------------
# Every fetcher is an ``async def`` taking (client, ident, label, note, anchor),
# so calling one returns an awaitable that resolves to a Quote.
FetcherFn = Callable[..., Awaitable[Quote]]
# Symbol prefix -> fetcher. Symbols without a known prefix go to "yahoo"
# (see parse_symbol).
SOURCES: dict[str, FetcherFn] = {
    "yahoo": fetch_yahoo,
    "FRED": fetch_fred,
    "EUROSTAT": fetch_eurostat,
    "ONS": fetch_ons,
    "ECB": fetch_ecb,
}
def parse_symbol(symbol: str) -> tuple[FetcherFn, str]:
    """Split ``PREFIX:ident`` into ``(fetcher, ident)``.

    The prefix is matched against SOURCES case-insensitively, so 'fred:DGS10'
    and 'FRED:DGS10' both route to FRED (SOURCES itself mixes "yahoo" with
    upper-case keys). A symbol without a recognised prefix is returned
    unchanged together with the Yahoo fetcher.
    """
    prefix, sep, ident = symbol.partition(":")
    if sep:
        by_prefix = {name.upper(): fn for name, fn in SOURCES.items()}
        fn = by_prefix.get(prefix.upper())
        if fn is not None:
            return fn, ident
    return SOURCES["yahoo"], symbol
async def fetch(
    client: httpx.AsyncClient,
    symbol: str,
    label: str,
    note: str,
    anchor: str | None = None,
) -> Quote:
    """Fetch *symbol* from whichever source its prefix selects.

    Routing is delegated to parse_symbol. The chosen fetcher receives the
    ident it returns, so the resulting Quote's ``symbol`` is that ident
    rather than the prefixed input.
    """
    fetcher, source_ident = parse_symbol(symbol)
    quote = await fetcher(client, source_ident, label, note, anchor)
    return quote
# --- Currency normalisation ---------------------------------------------------
async def _get_fx_rate(
    client: httpx.AsyncClient,
    from_ccy: str,
    to_ccy: str,
    cache: dict[tuple[str, str], float | None],
) -> float | None:
    """Return how many units of *to_ccy* one unit of *from_ccy* is worth, or None.

    Rates come from Yahoo's ``<FROM><TO>=X`` chart (``regularMarketPrice``).
    Yahoo prices some listings in minor units — LSE pence ("GBp"), JSE cents
    ("ZAc"), TASE agorot ("ILA") — and these are converted via their major
    currency at 1/100. Every lookup, including failures (stored as None), is
    memoised in *cache*, so each pair is requested at most once per cache.
    Network and parsing errors are swallowed deliberately; callers treat None
    as "leave unconverted".
    """
    if from_ccy == to_ccy:
        return 1.0
    # Minor-unit code -> ISO code of its major currency (1 minor = 0.01 major).
    minor_units = {"GBp": "GBP", "ZAc": "ZAR", "ILA": "ILS"}
    major = minor_units.get(from_ccy)
    if major is not None:
        major_rate = await _get_fx_rate(client, major, to_ccy, cache)
        return None if major_rate is None else 0.01 * major_rate
    key = (from_ccy, to_ccy)
    if key in cache:
        return cache[key]
    pair = f"{from_ccy}{to_ccy}=X"
    try:
        r = await client.get(
            YAHOO_CHART.format(symbol=pair),
            params={"interval": "1d", "range": "5d"},
            headers=UA, timeout=10,
        )
        r.raise_for_status()
        rate = r.json()["chart"]["result"][0]["meta"].get("regularMarketPrice")
    except Exception:
        rate = None
    cache[key] = rate
    return rate
async def normalise_to_base(
    client: httpx.AsyncClient, quotes: list[Quote], base: str
) -> None:
    """Fill ``price_base``/``currency_base`` on each quote, converting into *base*.

    Mutates *quotes* in place; *base* is upper-cased first. Quotes are left
    unchanged when they have no price, have no currency, carry a
    "currency" that is not a three-letter code (FRED CPI quotes use "%"),
    or when no FX rate can be obtained. One FX cache is shared across the
    whole list, so each currency pair is looked up at most once.
    """
    cache: dict[tuple[str, str], float | None] = {}
    base = base.upper()
    for q in quotes:
        ccy = q.currency
        # Only three-letter codes ("USD", "GBp", ...) can form a Yahoo FX pair;
        # unit markers like "%" would just produce a malformed, doomed request.
        if q.price is None or not ccy or len(ccy) != 3 or not ccy.isalpha():
            continue
        rate = await _get_fx_rate(client, ccy, base, cache)
        if rate is None:
            continue
        q.price_base = q.price * rate
        q.currency_base = base