Three new data sources hooked into the existing SOURCES registry (only two,
EUROSTAT and ONS, are described below). All use open APIs with no keys:
- EUROSTAT: prefix EUROSTAT:dataset?dim=val&... — current EU bond
yields (Bund/OAT/BTP/EZ) and Eurozone economic indicators that
FRED's OECD-mirror series stopped updating in 2022-2023.
- ONS: prefix ONS:topic/cdid/dataset — current UK CPI, unemployment,
GDP, industrial production. Replaces the 5+ month-stale FRED
LRHUTTTTGBM156S mirror.
New indicator groups in default.toml feed the strategic/fundamental
lens we converged on: valuation (CAPE/Buffett anchors), bubble_watch
(SKEW/VVIX/RSP vs SPY/HYG vs TLT/IPO/crypto), economy (multi-region,
every series either current or flagged stale), bonds (UK/EU/US/JPN sovereign yields).
Indicator panel now opens with an AI "read" interpretation per group
(generated hourly at :07 UTC alongside an aggregate cross-group read
shown in the dashboard header). The aggregate is grounded by a markets
strip — NYSE/LSE/Frankfurt/Tokyo/HK/Shanghai with open/closed LEDs and
next-open countdown, computed locally from each exchange's tz.
Other UX bits: indicator-row tooltips populated from TOML notes;
rows whose last observation is >90 days old get a 'stale' chip;
ghost symbols (in DB but no longer in TOML) filtered out of the
panel; Eurostat/ONS symbols display as short codes rather than the
full API path.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
237 lines · 8.7 KiB · Python
"""Hourly per-group indicator summaries — a short AI read at the top of each
|
||
Indicators tab. Costs ~$0.0003 per call on DeepSeek V4 Flash, so 10+ groups
|
||
hourly stays comfortably under the monthly cap."""
|
||
from __future__ import annotations
|
||
|
||
import asyncio
|
||
import re
|
||
from collections import defaultdict
|
||
|
||
import httpx
|
||
from sqlalchemy import desc, func, select
|
||
|
||
from app.config import get_settings, load_groups
|
||
from app.db import utcnow
|
||
from app.jobs._helpers import job_lifecycle, log
|
||
from app.models import AICall, IndicatorSummary, Quote
|
||
from app.services.openrouter import (
|
||
PROMPT_VERSION,
|
||
build_aggregate_summary_system_prompt,
|
||
build_aggregate_summary_user_prompt,
|
||
build_summary_system_prompt,
|
||
build_summary_user_prompt,
|
||
call_openrouter,
|
||
month_start,
|
||
)
|
||
|
||
|
||
# group_name under which the cross-group aggregate read is stored.
AGGREGATE_GROUP_NAME = "__all__"


# Tail shared by the sentence-level patterns below: everything up to and
# including the first sentence-ending full stop, plus trailing whitespace.
# A "." immediately followed by a digit is a decimal point ("4.3%"), not a
# sentence end — without this, a leaked opener quoting a yield would be cut
# mid-number and leave e.g. "3% ..." at the front of the summary.
_SENTENCE = r"(?:[^.]|\.(?=\d))*\.\s*"

# Strip known meta-commentary openers the model sometimes leaks despite the
# prompt's hard constraints. Patterns are anchored at the very start of the
# text (no MULTILINE), so ``sub(count=1)`` removes at most one opener each.
_LEAK_PATTERNS = [
    re.compile(p, re.IGNORECASE | re.DOTALL)
    for p in (
        # First-person meta — "I need to / I'll / I have to / I'm going to ..."
        # Contractions attach directly to the pronoun (no space), hence the
        # second branch; models emit both straight and curly apostrophes.
        rf"^i(?:\s+(?:need|have|must|should|am going|['’]ll|will|shall|can|am)"
        rf"|['’](?:ll|m|ve)){_SENTENCE}",
        # "We need / we're / we are asked / we will ..."
        rf"^we(?:\s+(?:need|are|['’]re|will|shall|can|should|must|have)"
        rf"|['’](?:re|ll|ve)){_SENTENCE}",
        # "Let me / let us / let's ..."
        rf"^let(?:\s+(?:me|us|['’]?s)|['’]s){_SENTENCE}",
        rf"^here['’]s{_SENTENCE}",
        rf"^sure[,!]?\s{_SENTENCE}",
        rf"^looking at{_SENTENCE}",
        rf"^based on{_SENTENCE}",
        rf"^to (?:address|answer|write|summarise|summarize){_SENTENCE}",
        rf"^first[,]?\s{_SENTENCE}",
        rf"^the (?:user|data shows|reader|task|request){_SENTENCE}",
        r"^summary[:.]\s*",
        r"^okay[,]?\s+",
        r"^alright[,]?\s+",
        rf"^thinking{_SENTENCE}",
    )
]


def clean_summary(text: str) -> str:
    """Strip leading meta-commentary from a model reply.

    Makes up to two passes over ``_LEAK_PATTERNS``. Each pass applies every
    pattern once (removing at most one leading match per pattern), and the
    loop stops early once a pass changes nothing.

    If cleaning leaves fewer than 60 characters of a reply longer than 120
    (suggesting the model emitted reasoning then ran out of tokens), fall
    back to the last non-empty paragraph of the raw output — that's usually
    where the actual answer ended up.

    A ``None`` reply is treated as empty and yields ``""``.
    """
    # NOTE(review): reasoning models may return null content when they exhaust
    # max_tokens — guard instead of crashing on .strip().
    raw = (text or "").strip()
    out = raw
    for _ in range(2):
        before = out
        for pat in _LEAK_PATTERNS:
            out = pat.sub("", out, count=1).lstrip()
        if out == before:
            break
    if len(out) < 60 and len(raw) > 120:
        # Cleaning ate too much; take the last non-empty paragraph of raw.
        paragraphs = [p.strip() for p in re.split(r"\n\s*\n", raw) if p.strip()]
        if paragraphs:
            out = paragraphs[-1]
    return out
|
||
|
||
|
||
async def _latest_quotes_by_group(session) -> dict[str, list[dict]]:
    """Latest non-null quote per (group, symbol), grouped by group name.

    Returns a mapping of group name -> list of quote dicts with keys
    ``symbol``, ``label``, ``price``, ``currency``, ``as_of`` and
    ``changes``, ordered by symbol within each group. Error rows (NULL
    price) are never returned.
    """
    # Filter NULL prices *before* taking max(fetched_at): filtering only after
    # the join would drop a symbol entirely whenever its most recent fetch
    # failed, instead of falling back to its last good quote. ``as_of`` is
    # passed through, so the age of such a fallback quote stays visible.
    latest = (
        select(
            Quote.group_name,
            Quote.symbol,
            func.max(Quote.fetched_at).label("mx"),
        )
        .where(Quote.price.is_not(None))
        .group_by(Quote.group_name, Quote.symbol)
        .subquery()
    )
    stmt = (
        select(Quote)
        .join(
            latest,
            (Quote.group_name == latest.c.group_name)
            & (Quote.symbol == latest.c.symbol)
            & (Quote.fetched_at == latest.c.mx),
        )
        # Still needed: an error row could share the winning fetched_at.
        .where(Quote.price.is_not(None))
        .order_by(Quote.group_name, Quote.symbol)
    )
    rows = (await session.execute(stmt)).scalars().all()

    by_group: dict[str, list[dict]] = defaultdict(list)
    for q in rows:
        by_group[q.group_name].append({
            "symbol": q.symbol, "label": q.label,
            "price": q.price, "currency": q.currency,
            "as_of": q.as_of, "changes": q.changes,
        })
    return by_group
|
||
|
||
|
||
async def _month_spend(session) -> float:
    """Sum of ``AICall.cost_usd`` for calls made since ``month_start()``.

    Returns 0.0 when no calls (or only zero-cost calls) are recorded.
    """
    spend_query = select(
        func.coalesce(func.sum(AICall.cost_usd), 0.0)
    ).where(AICall.called_at >= month_start())
    outcome = await session.execute(spend_query)
    total = outcome.scalar()
    return float(total) if total else 0.0
|
||
|
||
|
||
async def _generate_one(
    session, client: httpx.AsyncClient, group: str, quotes: list[dict],
    system_prompt: str, model: str, tone: str, analysis: str,
) -> bool:
    """Generate + persist one group's summary.

    Adds (does not commit) one ``AICall`` row per attempt: status "error" if
    the OpenRouter call raises, "ok" otherwise. An ``IndicatorSummary`` row
    is added only when the cleaned model output is non-empty.

    Returns True only when a summary row was added.
    """
    user_prompt = build_summary_user_prompt(group, quotes)
    try:
        result = await call_openrouter(
            client,
            [{"role": "system", "content": system_prompt},
             {"role": "user", "content": user_prompt}],
            model=model,
            max_tokens=800,  # DeepSeek sometimes spends 300+ on internal reasoning
        )
    except Exception as e:
        # Best-effort per group: record the failure and let the caller move on.
        session.add(AICall(model=model, status="error", error=str(e)[:500]))
        log.warning("ind_summary.failed", group=group, error=str(e)[:120])
        return False

    content = clean_summary(result.content)
    if content:
        session.add(IndicatorSummary(
            group_name=group,
            generated_at=utcnow(),
            model=result.model,
            tone=tone,
            analysis=analysis,
            prompt_version=PROMPT_VERSION,
            content=content,
            prompt_tokens=result.prompt_tokens,
            completion_tokens=result.completion_tokens,
            cost_usd=result.cost_usd,
        ))
    else:
        # e.g. the model spent its whole token budget on reasoning. Storing an
        # empty row would presumably surface as a blank read for this group.
        log.warning("ind_summary.empty", group=group)

    # Always record the completed call so its cost counts toward the monthly
    # spend (_month_spend sums AICall.cost_usd), even if the output was unusable.
    session.add(AICall(
        model=result.model,
        prompt_tokens=result.prompt_tokens,
        completion_tokens=result.completion_tokens,
        cost_usd=result.cost_usd,
        status="ok",
    ))
    return bool(content)
|
||
|
||
|
||
async def _cap_reached(session, cap: float) -> bool:
    """True when recorded AI spend since ``month_start()`` has reached ``cap``."""
    return await _month_spend(session) >= cap


async def _generate_aggregate(
    session, client: httpx.AsyncClient, groups: dict[str, list[dict]],
    model: str, tone: str, analysis: str,
) -> bool:
    """Generate + persist the cross-group read under AGGREGATE_GROUP_NAME.

    Adds (does not commit) one ``AICall`` row per attempt and an
    ``IndicatorSummary`` row only when the cleaned output is non-empty.
    Returns True only when a summary row was added.
    """
    agg_system = build_aggregate_summary_system_prompt(tone, analysis)
    agg_user = build_aggregate_summary_user_prompt(groups)
    try:
        result = await call_openrouter(
            client,
            [{"role": "system", "content": agg_system},
             {"role": "user", "content": agg_user}],
            model=model,
            max_tokens=1500,  # room for reasoning + 80-word output
        )
    except Exception as e:
        # Best-effort: a failed aggregate must not sink the per-group reads.
        session.add(AICall(model=model, status="error", error=str(e)[:500]))
        log.warning("ind_summary.agg_failed", error=str(e)[:120])
        return False

    content = clean_summary(result.content)
    if content:
        session.add(IndicatorSummary(
            group_name=AGGREGATE_GROUP_NAME,
            generated_at=utcnow(),
            model=result.model,
            tone=tone,
            analysis=analysis,
            prompt_version=PROMPT_VERSION,
            content=content,
            prompt_tokens=result.prompt_tokens,
            completion_tokens=result.completion_tokens,
            cost_usd=result.cost_usd,
        ))
    else:
        log.warning("ind_summary.agg_empty")
    # Record the completed call even if its output was unusable, so the
    # spend it incurred counts toward the monthly cap.
    session.add(AICall(
        model=result.model,
        prompt_tokens=result.prompt_tokens,
        completion_tokens=result.completion_tokens,
        cost_usd=result.cost_usd,
        status="ok",
    ))
    return bool(content)


async def run() -> None:
    """Write one AI summary per configured indicator group, then one
    aggregate read across all groups stored under AGGREGATE_GROUP_NAME.

    Skips when no OpenRouter key is set, when the monthly spend cap is
    already reached, or when no configured group has quotes. The cap is
    re-checked before every model call, so a run that starts just under the
    cap stops instead of overshooting it.
    """
    async with job_lifecycle("indicator_summary_job") as (session, jr):
        if jr.status == "skipped":
            return
        s = get_settings()
        if not s.OPENROUTER_API_KEY:
            jr.status = "skipped"
            return

        cap = s.OPENROUTER_MONTHLY_CAP_USD
        spent = await _month_spend(session)
        if spent >= cap:
            jr.status = "skipped"
            jr.error = f"monthly cap reached (${spent:.2f})"
            return

        groups = await _latest_quotes_by_group(session)
        # Only summarise groups currently configured in TOML — drops stale
        # group names (e.g. an old "pie" before T212 sourcing) that still have
        # quotes in the table but no UI presence.
        configured = set(load_groups(s.BASELINE_TOML, s.PORTFOLIO_TOML).keys())
        groups = {g: q for g, q in groups.items() if g in configured}
        if not groups:
            jr.status = "skipped"
            return

        tone = s.CASSANDRA_TONE.upper()
        analysis = s.CASSANDRA_ANALYSIS.upper()
        model = s.OPENROUTER_MODEL
        system_prompt = build_summary_system_prompt(tone, analysis)

        written = 0
        capped = False
        async with httpx.AsyncClient(follow_redirects=True) as client:
            # Sequential rather than parallel — OpenRouter free tiers can
            # throttle bursts; total work is small (~12 calls × ~5s each).
            for group, quotes in groups.items():
                # Re-check before each paid call; the pre-flight check alone
                # would let this run keep spending past the cap.
                if await _cap_reached(session, cap):
                    capped = True
                    break
                if await _generate_one(
                    session, client, group, quotes,
                    system_prompt, model, tone, analysis,
                ):
                    written += 1
                await session.commit()  # partial progress survives mid-job error

            if not capped and not await _cap_reached(session, cap):
                # One aggregate read across all groups, stored under __all__.
                if await _generate_aggregate(
                    session, client, groups, model, tone, analysis,
                ):
                    written += 1
                await session.commit()
            else:
                log.warning("ind_summary.cap_reached", written=written)

        jr.items_written = written
        log.info("ind_summary.done", groups=len(groups), written=written)
|
||
|
||
|
||
def _main() -> None:
    """Run a single pass of the indicator-summary job from the command line."""
    asyncio.run(run())


if __name__ == "__main__":
    _main()
|