read.markets/app/jobs/ai_log_job.py
Giorgio Gilestro 6e7f57c6b2 phase G: data minimisation + passwordless auth + DeepSeek-first LLM
Server no longer holds portfolios. Holdings live in the browser
(localStorage); the server publishes an anonymous ticker_universe and a
gzipped /api/universe payload identical for every authenticated user, so
access patterns can't betray which tickers a user holds. AI commentary
is generated ephemerally from the browser-supplied pie and the cost
ledger row records no positions. Migrations 0009-0011 added the
universe table and dropped positions / portfolio_snapshots /
portfolios.

Authentication is now e-mail OTP only. Migration 0010 dropped
password_hash and email_verified (every active session is by
construction proof of email control). The /signup endpoint is gone;
signup and login share a single email-entry page. Email rendering is
HTML+plain-text multipart with a shared brand palette (app/branding.py)
asserted in sync with the CSS by a drift-detection test.

LLM provider defaults to DeepSeek-direct (cheaper, api.deepseek.com)
with OpenRouter as automatic fallback if DeepSeek fails. ai_log_job and
indicator_summary_job now iterate the two tones (NOVICE, INTERMEDIATE)
per cycle so the dashboard's tone toggle is instant; PROMPT_VERSION
bumped to 6 with an educational anti-TA / anti-gambling stance baked
into _CORE. NOVICE mode renders a curated glossary inline (CBOE VIX,
yield curve, HY OAS, etc.) with JS-positioned tooltips that survive
viewport edges and sticky bars. Model name and tokens hidden from the
user UI; still recorded in StrategicLog.model and AICall for admin.

Layout adds a sticky top nav, a sticky bottom markets bar (one chip per
exchange with status LED + headline index + 1d change), and
Phase H feedback reporting is queued in tasks/todo.md.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-18 14:16:57 +01:00

226 lines
8.1 KiB
Python

"""Hourly AI strategic-log generator. Pulls already-persisted market data and
headlines from the DB (no live fetches), calls OpenRouter, persists the log
and a row in the cost ledger."""
from __future__ import annotations
import asyncio
from collections import defaultdict
from datetime import timedelta
import httpx
from sqlalchemy import desc, func, select
from app.config import get_settings
from app.db import utcnow
from app.jobs._helpers import job_lifecycle, log
from app.models import AICall, Headline, JobRun, Quote, StrategicLog
from app.services.cadence import DEFAULT_POLICY
from app.services.openrouter import (
PROMPT_VERSION,
active_model,
build_system_prompt,
build_user_prompt,
call_llm,
llm_configured,
month_start,
)
REFERENCE_LINE = (
"S&P 7,501 (ATH) · VIX 18.0 · US 10y 4.45% · HY OAS 279bps · "
"Brent $109/bbl · Gold $4,651/oz · CPI 3.8% YoY"
)
async def _latest_quotes_by_group(session) -> dict[str, list[dict]]:
"""Latest quote per (group, symbol). Skips error rows where price is null."""
sub = (
select(
Quote.group_name,
Quote.symbol,
func.max(Quote.fetched_at).label("mx"),
)
.group_by(Quote.group_name, Quote.symbol)
.subquery()
)
stmt = (
select(Quote)
.join(
sub,
(Quote.group_name == sub.c.group_name)
& (Quote.symbol == sub.c.symbol)
& (Quote.fetched_at == sub.c.mx),
)
.order_by(Quote.group_name, Quote.symbol)
)
rows = (await session.execute(stmt)).scalars().all()
by_group: dict[str, list[dict]] = defaultdict(list)
for q in rows:
by_group[q.group_name].append(dict(
symbol=q.symbol, source=q.source, label=q.label,
note="", price=q.price, currency=q.currency,
as_of=q.as_of, changes=q.changes,
))
return by_group
async def _recent_headlines_by_bucket(session, hours: float = 24) -> dict[str, list[dict]]:
"""Last N hours of headlines, bucketed by category. Hard cap per bucket
to keep the prompt under ~40KB."""
cutoff = utcnow() - timedelta(hours=hours)
stmt = (
select(Headline)
.where(Headline.published_at >= cutoff)
.order_by(desc(Headline.published_at))
.limit(400)
)
rows = (await session.execute(stmt)).scalars().all()
by_bucket: dict[str, list[dict]] = defaultdict(list)
for h in rows:
if len(by_bucket[h.category]) >= 40:
continue
by_bucket[h.category].append(dict(
when=h.published_at.isoformat(),
source=h.source, title=h.title,
))
return by_bucket
async def _month_spend(session) -> float:
start = month_start()
total = (await session.execute(
select(func.coalesce(func.sum(AICall.cost_usd), 0.0))
.where(AICall.called_at >= start)
)).scalar()
return float(total or 0.0)
async def run() -> None:
async with job_lifecycle("ai_log_job") as (session, jr):
if jr.status == "skipped":
return
s = get_settings()
if not llm_configured():
log.warning("ai_log.skipped_no_key", provider=s.LLM_PROVIDER)
jr.status = "skipped"
return
# Cadence: hourly during EU/US active hours; throttled off-hours.
last_success = (await session.execute(
select(func.max(JobRun.finished_at)).where(
JobRun.name == "ai_log_job",
JobRun.status == "success",
)
)).scalar()
should_run, reason = DEFAULT_POLICY.should_run(last_success)
if not should_run:
log.info("ai_log.cadence_skip", reason=reason)
jr.status = "skipped"
jr.error = reason
return
spent = await _month_spend(session)
if spent >= s.OPENROUTER_MONTHLY_CAP_USD:
log.warning("ai_log.cap_reached", spent=spent,
cap=s.OPENROUTER_MONTHLY_CAP_USD)
jr.status = "skipped"
jr.error = f"monthly cost cap reached (${spent:.2f})"
return
quotes = await _latest_quotes_by_group(session)
news = await _recent_headlines_by_bucket(session)
if not quotes and not news:
log.warning("ai_log.no_data_yet")
jr.status = "skipped"
return
# Look up the most recent log generated today (UTC) so the model can
# update it rather than start from scratch. This gives the model
# temporal awareness — "since this morning's read, X has changed".
today_start = utcnow().replace(hour=0, minute=0, second=0, microsecond=0)
previous_log = (await session.execute(
select(StrategicLog)
.where(StrategicLog.generated_at >= today_start)
.order_by(desc(StrategicLog.generated_at))
.limit(1)
)).scalar_one_or_none()
anchor = s.CASSANDRA_ANCHOR_DATE or None
user_prompt = build_user_prompt(
today=utcnow(),
anchor=anchor,
quotes_by_group=quotes,
headlines_by_bucket=news,
reference_line=REFERENCE_LINE,
previous_log=previous_log,
)
# Phase 2 voice pivot (PROMPT_VERSION 6): generate both tones per
# run so the dashboard toggle is instant. Analysis stays on the
# operator-configured default (DRY|SPECULATIVE is a system-wide
# preference, not a per-user toggle). PRO was dropped.
analysis = (s.CASSANDRA_ANALYSIS or "SPECULATIVE").upper()
variants = [
("NOVICE", analysis),
("INTERMEDIATE", analysis),
]
written = 0
async with httpx.AsyncClient(follow_redirects=True) as client:
for tone, analysis in variants:
# Re-check cost cap between variants so a runaway run is
# bounded.
spent = await _month_spend(session)
if spent >= s.OPENROUTER_MONTHLY_CAP_USD:
log.warning("ai_log.cap_reached_midrun",
spent=spent, completed=written)
break
system_prompt = build_system_prompt(tone, analysis)
try:
result = await call_llm(
client,
[{"role": "system", "content": system_prompt},
{"role": "user", "content": user_prompt}],
)
except Exception as e:
session.add(AICall(
model=active_model(), status="error",
error=f"{tone}/{analysis}: {str(e)[:480]}",
))
await session.commit()
log.error("ai_log.variant_failed",
tone=tone, analysis=analysis, error=str(e)[:200])
continue
session.add(StrategicLog(
generated_at=utcnow(),
model=result.model,
anchor_date=anchor,
prompt_version=PROMPT_VERSION,
tone=tone,
analysis=analysis,
content=result.content,
prompt_tokens=result.prompt_tokens,
completion_tokens=result.completion_tokens,
cost_usd=result.cost_usd,
))
session.add(AICall(
model=result.model,
prompt_tokens=result.prompt_tokens,
completion_tokens=result.completion_tokens,
cost_usd=result.cost_usd,
status="ok",
))
await session.commit()
written += 1
log.info("ai_log.variant_done",
tone=tone, analysis=analysis,
prompt_tokens=result.prompt_tokens,
completion_tokens=result.completion_tokens)
jr.items_written = written
log.info("ai_log.done", variants=written, total=len(variants))
if __name__ == "__main__":
asyncio.run(run())