read.markets/app/jobs/ai_log_job.py
Giorgio Gilestro a10409c02b initial commit — cassandra v0.1
Containerised macro-strategy dashboard: 4-panel web UI (indicators,
portfolio, flash news, AI strategic log), MariaDB store, hourly
ingestion jobs, OpenRouter-backed AI analysis.

Ports the four prototype scripts in the parent dir (market_pulse,
flash_news, trading212, strategic_log) into async services backed by a
persistent DB and served via FastAPI + Jinja2 + HTMX. APScheduler runs
as a separate compose service for crash-safety and easier restarts.

Portfolio composition + position names come live from Trading 212;
news per-ticker headlines reuse those names. Tone (NOVICE/INTERMEDIATE/
PRO) and analysis style (DRY/SPECULATIVE) are env-configurable and
stored on each log row so historical entries show what produced them.

Default model is deepseek/deepseek-v4-flash (overridable via env).
Light/dark theme toggle, sans-serif for prose surfaces, monospace for
data. Bearer-token auth, OpenRouter monthly cost cap, RSS feeds auto-
disabled on consecutive failures.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-15 21:56:10 +01:00

173 lines
5.6 KiB
Python

"""Hourly AI strategic-log generator. Pulls already-persisted market data and
headlines from the DB (no live fetches), calls OpenRouter, persists the log
and a row in the cost ledger."""
from __future__ import annotations
import asyncio
from collections import defaultdict
from datetime import timedelta
import httpx
from sqlalchemy import desc, func, select
from app.config import get_settings
from app.db import utcnow
from app.jobs._helpers import job_lifecycle, log
from app.models import AICall, Headline, Quote, StrategicLog
from app.services.openrouter import (
PROMPT_VERSION,
build_system_prompt,
build_user_prompt,
call_openrouter,
month_start,
)
# Fixed macro snapshot handed to build_user_prompt() as ``reference_line``.
# NOTE(review): these figures are hard-coded literals and are never refreshed
# at runtime — presumably they describe the market at the anchor date;
# confirm, and update them whenever the anchor changes.
REFERENCE_LINE = (
    "S&P 7,501 (ATH) · VIX 18.0 · US 10y 4.45% · HY OAS 279bps · "
    "Brent $109/bbl · Gold $4,651/oz · CPI 3.8% YoY"
)
async def _latest_quotes_by_group(session) -> dict[str, list[dict]]:
    """Return the most recent priced quote per (group, symbol), keyed by group.

    Rows whose ``price`` is NULL (error rows) are skipped, so a symbol whose
    newest fetch failed falls back to its last priced row instead of
    surfacing a ``None`` price to the caller.

    Args:
        session: Async DB session used to execute the query.

    Returns:
        A ``defaultdict(list)`` mapping group name to quote dicts with the
        keys ``symbol``, ``source``, ``label``, ``note``, ``price``,
        ``currency``, ``as_of`` and ``changes``, ordered by group then symbol.
    """
    has_price = Quote.price.is_not(None)

    # Newest fetch timestamp per (group, symbol), looking at priced rows only.
    latest = (
        select(
            Quote.group_name,
            Quote.symbol,
            func.max(Quote.fetched_at).label("mx"),
        )
        .where(has_price)
        .group_by(Quote.group_name, Quote.symbol)
        .subquery()
    )
    stmt = (
        select(Quote)
        .join(
            latest,
            (Quote.group_name == latest.c.group_name)
            & (Quote.symbol == latest.c.symbol)
            & (Quote.fetched_at == latest.c.mx),
        )
        # Filter again: an error row could share the winning timestamp.
        .where(has_price)
        .order_by(Quote.group_name, Quote.symbol)
    )
    rows = (await session.execute(stmt)).scalars().all()

    by_group: dict[str, list[dict]] = defaultdict(list)
    for q in rows:
        by_group[q.group_name].append(dict(
            symbol=q.symbol, source=q.source, label=q.label,
            # ``note`` is always emitted empty by this job.
            note="", price=q.price, currency=q.currency,
            as_of=q.as_of, changes=q.changes,
        ))
    return by_group
async def _recent_headlines_by_bucket(session, hours: float = 24) -> dict[str, list[dict]]:
    """Collect headlines published in the last ``hours`` hours, grouped by category.

    The query returns at most 400 rows, newest first, and each category keeps
    only the first 40 of them; the caps keep the prompt under ~40KB.
    """
    oldest_allowed = utcnow() - timedelta(hours=hours)
    query = select(Headline).where(Headline.published_at >= oldest_allowed)
    query = query.order_by(desc(Headline.published_at)).limit(400)
    result = await session.execute(query)

    grouped: dict[str, list[dict]] = defaultdict(list)
    for headline in result.scalars().all():
        bucket = grouped[headline.category]
        # Rows arrive newest first, so a full bucket already holds the most
        # recent entries for its category.
        if len(bucket) < 40:
            bucket.append({
                "when": headline.published_at.isoformat(),
                "source": headline.source,
                "title": headline.title,
            })
    return grouped
async def _month_spend(session) -> float:
    """Sum ``AICall.cost_usd`` over calls made on or after ``month_start()``.

    Returns 0.0 when the query yields nothing to sum.
    """
    since = month_start()
    summed_cost = func.coalesce(func.sum(AICall.cost_usd), 0.0)
    spend_stmt = select(summed_cost).where(AICall.called_at >= since)
    outcome = await session.execute(spend_stmt)
    total = outcome.scalar()
    if not total:
        return 0.0
    return float(total)
async def run() -> None:
    """Generate one strategic-log entry from persisted data, if allowed.

    The job is marked ``skipped`` and returns without calling the model when
    the lifecycle already reports it skipped, when no OpenRouter API key is
    configured, when this month's recorded spend has reached the cap, or when
    there are neither quotes nor headlines. Otherwise it builds the prompts,
    calls OpenRouter, and commits a ``StrategicLog`` row together with an
    ``AICall`` ledger row. A failed call is recorded as an ``AICall`` with
    ``status="error"`` and the exception is re-raised.
    """
    async with job_lifecycle("ai_log_job") as (session, jr):
        if jr.status == "skipped":
            return

        cfg = get_settings()
        if not cfg.OPENROUTER_API_KEY:
            log.warning("ai_log.skipped_no_key")
            jr.status = "skipped"
            return

        # Budget gate: stop before any prompt is built or sent.
        spent = await _month_spend(session)
        cap = cfg.OPENROUTER_MONTHLY_CAP_USD
        if spent >= cap:
            log.warning("ai_log.cap_reached", spent=spent, cap=cap)
            jr.status = "skipped"
            jr.error = f"monthly cost cap reached (${spent:.2f})"
            return

        quotes = await _latest_quotes_by_group(session)
        news = await _recent_headlines_by_bucket(session)
        if not (quotes or news):
            log.warning("ai_log.no_data_yet")
            jr.status = "skipped"
            return

        # An empty anchor setting is normalised to None.
        anchor = cfg.CASSANDRA_ANCHOR_DATE or None
        user_prompt = build_user_prompt(
            today=utcnow(),
            anchor=anchor,
            quotes_by_group=quotes,
            headlines_by_bucket=news,
            reference_line=REFERENCE_LINE,
        )
        system_prompt = build_system_prompt(
            cfg.CASSANDRA_TONE, cfg.CASSANDRA_ANALYSIS
        )
        messages = [
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": user_prompt},
        ]

        try:
            async with httpx.AsyncClient(follow_redirects=True) as client:
                reply = await call_openrouter(
                    client, messages, model=cfg.OPENROUTER_MODEL,
                )
        except Exception as exc:
            # Record the failed attempt in the ledger before propagating.
            failure = AICall(
                model=cfg.OPENROUTER_MODEL,
                status="error",
                error=str(exc)[:500],
            )
            session.add(failure)
            await session.commit()
            raise

        usage = {
            "prompt_tokens": reply.prompt_tokens,
            "completion_tokens": reply.completion_tokens,
        }
        # Tone and analysis style are stored on the row alongside the content.
        session.add(StrategicLog(
            generated_at=utcnow(),
            model=reply.model,
            anchor_date=anchor,
            prompt_version=PROMPT_VERSION,
            tone=cfg.CASSANDRA_TONE.upper(),
            analysis=cfg.CASSANDRA_ANALYSIS.upper(),
            content=reply.content,
            cost_usd=reply.cost_usd,
            **usage,
        ))
        session.add(AICall(
            model=reply.model,
            cost_usd=reply.cost_usd,
            status="ok",
            **usage,
        ))
        await session.commit()

        jr.items_written = 1
        log.info("ai_log.done", model=reply.model, **usage)
if __name__ == "__main__":
    # Running the module directly executes a single job pass and exits.
    asyncio.run(run())