initial commit — cassandra v0.1

Containerised macro-strategy dashboard: 4-panel web UI (indicators,
portfolio, flash news, AI strategic log), MariaDB store, hourly
ingestion jobs, OpenRouter-backed AI analysis.

Ports the four prototype scripts in the parent dir (market_pulse,
flash_news, trading212, strategic_log) into async services backed by a
persistent DB and served via FastAPI + Jinja2 + HTMX. APScheduler runs
as a separate compose service for crash-safety and easier restarts.

Portfolio composition + position names come live from Trading 212;
news per-ticker headlines reuse those names. Tone (NOVICE/INTERMEDIATE/
PRO) and analysis style (DRY/SPECULATIVE) are env-configurable and
stored on each log row so historical entries show what produced them.

Default model is deepseek/deepseek-v4-flash (overridable via env).
Light/dark theme toggle, sans-serif for prose surfaces, monospace for
data. Bearer-token auth, OpenRouter monthly cost cap, RSS feeds auto-
disabled on consecutive failures.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Giorgio Gilestro 2026-05-15 21:56:10 +01:00
commit a10409c02b
61 changed files with 4890 additions and 0 deletions

0
app/jobs/__init__.py Normal file
View file

58
app/jobs/_helpers.py Normal file
View file

@ -0,0 +1,58 @@
"""Shared job machinery: job_runs lifecycle, MariaDB advisory lock,
mock-mode short-circuits."""
from __future__ import annotations
from contextlib import asynccontextmanager
from typing import AsyncIterator
from sqlalchemy import text
from sqlalchemy.ext.asyncio import AsyncSession
from app.db import get_session_factory, utcnow
from app.logging import get_logger
from app.models import JobRun
log = get_logger("jobs")
@asynccontextmanager
async def job_lifecycle(name: str) -> AsyncIterator[tuple[AsyncSession, JobRun]]:
"""Wraps a job invocation. Creates a JobRun row on entry; updates it on
exit. Failures are caught by the caller's try/except; this context only
handles the bookkeeping.
A MariaDB GET_LOCK(name, 0) is acquired to prevent concurrent runs of the
same job across processes. If the lock is busy, we skip the run."""
factory = get_session_factory()
async with factory() as session:
# Try lock; skip if held.
got = (await session.execute(
text("SELECT GET_LOCK(:n, 0)"), {"n": f"cassandra_{name}"}
)).scalar()
if not got:
log.warning("job.skipped_locked", name=name)
yield session, JobRun(name=name, started_at=utcnow(), status="skipped")
return
run = JobRun(name=name, started_at=utcnow(), status="running")
session.add(run)
await session.commit()
await session.refresh(run)
try:
yield session, run
run.status = run.status if run.status not in ("running",) else "success"
run.finished_at = utcnow()
await session.commit()
log.info("job.finished",
name=name, status=run.status, items=run.items_written)
except Exception as e:
run.status = "failed"
run.error = str(e)[:1000]
run.finished_at = utcnow()
await session.commit()
log.error("job.failed", name=name, error=str(e))
raise
finally:
await session.execute(text("SELECT RELEASE_LOCK(:n)"),
{"n": f"cassandra_{name}"})
await session.commit()

173
app/jobs/ai_log_job.py Normal file
View file

@ -0,0 +1,173 @@
"""Hourly AI strategic-log generator. Pulls already-persisted market data and
headlines from the DB (no live fetches), calls OpenRouter, persists the log
and a row in the cost ledger."""
from __future__ import annotations
import asyncio
from collections import defaultdict
from datetime import timedelta
import httpx
from sqlalchemy import desc, func, select
from app.config import get_settings
from app.db import utcnow
from app.jobs._helpers import job_lifecycle, log
from app.models import AICall, Headline, Quote, StrategicLog
from app.services.openrouter import (
PROMPT_VERSION,
build_system_prompt,
build_user_prompt,
call_openrouter,
month_start,
)
REFERENCE_LINE = (
"S&P 7,501 (ATH) · VIX 18.0 · US 10y 4.45% · HY OAS 279bps · "
"Brent $109/bbl · Gold $4,651/oz · CPI 3.8% YoY"
)
async def _latest_quotes_by_group(session) -> dict[str, list[dict]]:
"""Latest quote per (group, symbol). Skips error rows where price is null."""
sub = (
select(
Quote.group_name,
Quote.symbol,
func.max(Quote.fetched_at).label("mx"),
)
.group_by(Quote.group_name, Quote.symbol)
.subquery()
)
stmt = (
select(Quote)
.join(
sub,
(Quote.group_name == sub.c.group_name)
& (Quote.symbol == sub.c.symbol)
& (Quote.fetched_at == sub.c.mx),
)
.order_by(Quote.group_name, Quote.symbol)
)
rows = (await session.execute(stmt)).scalars().all()
by_group: dict[str, list[dict]] = defaultdict(list)
for q in rows:
by_group[q.group_name].append(dict(
symbol=q.symbol, source=q.source, label=q.label,
note="", price=q.price, currency=q.currency,
as_of=q.as_of, changes=q.changes,
))
return by_group
async def _recent_headlines_by_bucket(session, hours: float = 24) -> dict[str, list[dict]]:
"""Last N hours of headlines, bucketed by category. Hard cap per bucket
to keep the prompt under ~40KB."""
cutoff = utcnow() - timedelta(hours=hours)
stmt = (
select(Headline)
.where(Headline.published_at >= cutoff)
.order_by(desc(Headline.published_at))
.limit(400)
)
rows = (await session.execute(stmt)).scalars().all()
by_bucket: dict[str, list[dict]] = defaultdict(list)
for h in rows:
if len(by_bucket[h.category]) >= 40:
continue
by_bucket[h.category].append(dict(
when=h.published_at.isoformat(),
source=h.source, title=h.title,
))
return by_bucket
async def _month_spend(session) -> float:
start = month_start()
total = (await session.execute(
select(func.coalesce(func.sum(AICall.cost_usd), 0.0))
.where(AICall.called_at >= start)
)).scalar()
return float(total or 0.0)
async def run() -> None:
async with job_lifecycle("ai_log_job") as (session, jr):
if jr.status == "skipped":
return
s = get_settings()
if not s.OPENROUTER_API_KEY:
log.warning("ai_log.skipped_no_key")
jr.status = "skipped"
return
spent = await _month_spend(session)
if spent >= s.OPENROUTER_MONTHLY_CAP_USD:
log.warning("ai_log.cap_reached", spent=spent,
cap=s.OPENROUTER_MONTHLY_CAP_USD)
jr.status = "skipped"
jr.error = f"monthly cost cap reached (${spent:.2f})"
return
quotes = await _latest_quotes_by_group(session)
news = await _recent_headlines_by_bucket(session)
if not quotes and not news:
log.warning("ai_log.no_data_yet")
jr.status = "skipped"
return
anchor = s.CASSANDRA_ANCHOR_DATE or None
user_prompt = build_user_prompt(
today=utcnow(),
anchor=anchor,
quotes_by_group=quotes,
headlines_by_bucket=news,
reference_line=REFERENCE_LINE,
)
system_prompt = build_system_prompt(s.CASSANDRA_TONE, s.CASSANDRA_ANALYSIS)
try:
async with httpx.AsyncClient(follow_redirects=True) as client:
result = await call_openrouter(
client,
[{"role": "system", "content": system_prompt},
{"role": "user", "content": user_prompt}],
model=s.OPENROUTER_MODEL,
)
except Exception as e:
session.add(AICall(
model=s.OPENROUTER_MODEL, status="error", error=str(e)[:500],
))
await session.commit()
raise
session.add(StrategicLog(
generated_at=utcnow(),
model=result.model,
anchor_date=anchor,
prompt_version=PROMPT_VERSION,
tone=s.CASSANDRA_TONE.upper(),
analysis=s.CASSANDRA_ANALYSIS.upper(),
content=result.content,
prompt_tokens=result.prompt_tokens,
completion_tokens=result.completion_tokens,
cost_usd=result.cost_usd,
))
session.add(AICall(
model=result.model,
prompt_tokens=result.prompt_tokens,
completion_tokens=result.completion_tokens,
cost_usd=result.cost_usd,
status="ok",
))
await session.commit()
jr.items_written = 1
log.info("ai_log.done",
model=result.model,
prompt_tokens=result.prompt_tokens,
completion_tokens=result.completion_tokens)
if __name__ == "__main__":
asyncio.run(run())

63
app/jobs/market_job.py Normal file
View file

@ -0,0 +1,63 @@
"""Hourly market ingestion: fetch every (symbol, group) defined in TOML and
insert one Quote row per fetch."""
from __future__ import annotations
import asyncio
import httpx
from app.config import get_settings, load_groups
from app.db import utcnow
from app.jobs._helpers import job_lifecycle, log
from app.models import Quote
from app.services.market import fetch
async def run() -> None:
async with job_lifecycle("market_job") as (session, run):
if run.status == "skipped":
return
s = get_settings()
groups = load_groups(s.BASELINE_TOML, s.PORTFOLIO_TOML)
anchor = s.CASSANDRA_ANCHOR_DATE or None
async with httpx.AsyncClient(follow_redirects=True) as client:
tasks = [
fetch(client, sym, lab, note, anchor)
for group, items in groups.items()
for sym, lab, note in items
]
# Run in parallel but bounded — Yahoo can throttle if we hammer.
sem = asyncio.Semaphore(16)
async def bounded(t):
async with sem:
return await t
quotes = await asyncio.gather(*(bounded(t) for t in tasks))
# Re-index quotes back to their group for persistence.
items_flat = [
(group, sym)
for group, items in groups.items()
for sym, _, _ in items
]
now = utcnow()
for (group, _sym), q in zip(items_flat, quotes):
session.add(Quote(
symbol=q.symbol,
source=q.source,
label=q.label,
group_name=group,
price=q.price,
currency=q.currency,
as_of=q.as_of,
changes=q.changes or None,
error=q.error,
fetched_at=now,
))
await session.commit()
run.items_written = len(quotes)
log.info("market_job.done", count=len(quotes))
if __name__ == "__main__":
asyncio.run(run())

99
app/jobs/news_job.py Normal file
View file

@ -0,0 +1,99 @@
"""Hourly news ingestion. Reads enabled feeds from the DB (not TOML — DB has
the authoritative enabled/failure state). Per-ticker Yahoo news pulled for
each symbol in the default portfolio group ('pie')."""
from __future__ import annotations
import asyncio
import httpx
from sqlalchemy import desc, select
from sqlalchemy.dialects.mysql import insert as mysql_insert
from app.db import utcnow
from app.jobs._helpers import job_lifecycle, log
from app.models import Feed, Headline, Portfolio, PortfolioSnapshot, Position
from app.services.news import dedupe, fetch_feed, fetch_yahoo_news
AUTO_DISABLE_AT = 5
async def _process_feed(client: httpx.AsyncClient, feed: Feed) -> tuple[Feed, list]:
try:
items = await fetch_feed(client, feed.name, feed.category, feed.url)
feed.consecutive_failures = 0
feed.last_success_at = utcnow()
return feed, items
except Exception as e:
feed.consecutive_failures += 1
if feed.consecutive_failures >= AUTO_DISABLE_AT:
feed.enabled = False
log.warning("feed.fetch_failed", name=feed.name,
fails=feed.consecutive_failures, error=str(e))
return feed, []
async def run() -> None:
async with job_lifecycle("news_job") as (session, run):
if run.status == "skipped":
return
feeds = (
await session.execute(select(Feed).where(Feed.enabled == True))
).scalars().all()
# Portfolio tickers + names now come from the latest T212 snapshot,
# not from TOML. The (ticker, name) pair lets fetch_yahoo_news skip
# the chart-meta round-trip and use the proper company name directly.
latest_snap_id = (await session.execute(
select(PortfolioSnapshot.id)
.order_by(desc(PortfolioSnapshot.snapshot_at))
.limit(1)
)).scalar_one_or_none()
ticker_pairs: list[tuple[str, str]] = []
if latest_snap_id is not None:
positions = (await session.execute(
select(Position).where(Position.snapshot_id == latest_snap_id)
)).scalars().all()
ticker_pairs = [(p.ticker, p.name or p.ticker) for p in positions]
async with httpx.AsyncClient(follow_redirects=True) as client:
feed_results = await asyncio.gather(
*(_process_feed(client, f) for f in feeds)
)
ticker_results = await asyncio.gather(
*(fetch_yahoo_news(client, t, query_override=n)
for t, n in ticker_pairs)
)
all_headlines = []
for _feed, items in feed_results:
all_headlines.extend(items)
for items in ticker_results:
all_headlines.extend(items)
headlines = dedupe(all_headlines)
# Bulk INSERT IGNORE (fingerprint UNIQUE de-dupes across runs).
if headlines:
stmt = mysql_insert(Headline).values([
dict(
source=h.source,
category=h.category,
title=h.title[:512],
url=h.url[:1024],
published_at=h.when,
fetched_at=utcnow(),
fingerprint=h.fingerprint,
)
for h in headlines
]).prefix_with("IGNORE")
await session.execute(stmt)
await session.commit()
run.items_written = len(headlines)
log.info("news_job.done", fetched=len(all_headlines), kept=len(headlines))
if __name__ == "__main__":
asyncio.run(run())

90
app/jobs/portfolio_job.py Normal file
View file

@ -0,0 +1,90 @@
"""Hourly Trading 212 snapshot. One Portfolio row per portfolio name
(currently just 'pie'); one PortfolioSnapshot per run; N Position rows."""
from __future__ import annotations
import asyncio
import httpx
from sqlalchemy import select
from app.config import get_settings
from app.db import utcnow
from app.jobs._helpers import job_lifecycle, log
from app.models import Portfolio, PortfolioSnapshot, Position
from app.services.trading212 import Trading212
PORTFOLIO_NAME = "pie" # only one for now; multi-portfolio extension is schema-ready
async def run() -> None:
async with job_lifecycle("portfolio_job") as (session, jr):
if jr.status == "skipped":
return
s = get_settings()
if not (s.API_KEY and s.SECRET_KEY):
log.warning("portfolio_job.skipped_no_creds")
jr.status = "skipped"
return
t212 = Trading212()
async with httpx.AsyncClient(follow_redirects=True) as client:
summary = await t212.summary(client)
positions = await t212.positions(client)
# The instruments call is heavy (~5 MB / 17k rows) but it's our
# only path to a human-readable name per ticker. Once per hour is
# fine; later we could cache to disk.
try:
instruments = await t212.instruments(client)
name_by_ticker = {
i["ticker"]: i.get("name") or i.get("shortName") or i["ticker"]
for i in (instruments or [])
}
except Exception:
name_by_ticker = {}
portfolio = (
await session.execute(
select(Portfolio).where(Portfolio.name == PORTFOLIO_NAME)
)
).scalar_one_or_none()
if portfolio is None:
portfolio = Portfolio(
name=PORTFOLIO_NAME, source="trading212",
currency=summary.get("currency", "GBP"),
)
session.add(portfolio)
await session.flush() # need id for FK
cash = (summary.get("cash") or {})
investments = (summary.get("investments") or {})
snap = PortfolioSnapshot(
portfolio_id=portfolio.id,
snapshot_at=utcnow(),
total_value=summary.get("totalValue"),
cash=cash.get("availableToTrade"),
invested=investments.get("currentValue"),
raw_json=summary,
)
session.add(snap)
await session.flush()
for p in positions or []:
tkr = p.get("ticker", "")
session.add(Position(
snapshot_id=snap.id,
ticker=tkr,
name=name_by_ticker.get(tkr),
quantity=p.get("quantity"),
average_price=p.get("averagePrice"),
current_price=p.get("currentPrice"),
ppl=p.get("ppl"),
))
await session.commit()
jr.items_written = len(positions or []) + 1
log.info("portfolio_job.done", positions=len(positions or []))
if __name__ == "__main__":
asyncio.run(run())

70
app/jobs/rollup_job.py Normal file
View file

@ -0,0 +1,70 @@
"""Daily rollup: collapse `quotes` into `quotes_daily` (one row per
(symbol, date) using the latest fetched_at of the day) and prune `quotes`
older than 30 days. Sparklines read from `quotes_daily`."""
from __future__ import annotations
import asyncio
from datetime import date, timedelta
from sqlalchemy import delete, func, select
from sqlalchemy.dialects.mysql import insert as mysql_insert
from app.db import utcnow
from app.jobs._helpers import job_lifecycle, log
from app.models import Quote, QuoteDaily
PRUNE_DAYS = 30
async def run() -> None:
async with job_lifecycle("rollup_job") as (session, jr):
if jr.status == "skipped":
return
# 1. Rollup: latest fetched_at of each (symbol, date) gets stored as close.
sub = (
select(
Quote.symbol.label("symbol"),
func.date(Quote.fetched_at).label("date"),
func.max(Quote.fetched_at).label("mx"),
)
.where(Quote.price.is_not(None))
.group_by(Quote.symbol, func.date(Quote.fetched_at))
.subquery()
)
rows = (await session.execute(
select(
Quote.symbol,
func.date(Quote.fetched_at).label("d"),
Quote.price,
Quote.source,
)
.join(sub,
(Quote.symbol == sub.c.symbol)
& (Quote.fetched_at == sub.c.mx))
)).all()
if rows:
stmt = mysql_insert(QuoteDaily).values([
dict(symbol=r.symbol, date=r.d, close=r.price,
high=r.price, low=r.price, source=r.source)
for r in rows
])
stmt = stmt.on_duplicate_key_update(close=stmt.inserted.close)
await session.execute(stmt)
# 2. Prune raw quotes older than PRUNE_DAYS.
cutoff = utcnow() - timedelta(days=PRUNE_DAYS)
result = await session.execute(
delete(Quote).where(Quote.fetched_at < cutoff)
)
await session.commit()
pruned = result.rowcount or 0
jr.items_written = len(rows)
log.info("rollup_job.done", rolled=len(rows), pruned=pruned)
if __name__ == "__main__":
asyncio.run(run())