Two changes that together cut OpenRouter spend by ~50% and give the daily log temporal awareness. 1. CadencePolicy (app/services/cadence.py): expensive AI jobs fire hourly only during the EU/US active window (Mon-Fri 07-21 UTC). Off-hours on weekdays they throttle to every 4h; on weekends, to every 12h. ai_log_job and indicator_summary_job both consult the policy before doing real work; market/news/portfolio ingest jobs stay hourly (cheap, no API cost). Skipped runs land in job_runs with status 'skipped' and the throttle reason in error. 2. Update mode for ai_log_job: when an earlier log exists for the current UTC day, the most recent one is passed to the model as 'Earlier log from today (generated HH:MM UTC)'. The system prompt gains an Update mode section instructing the model to revise — not restart — and anchor on what has CHANGED since the earlier draft. The TL;DR leads with intra-day change when meaningful; the watch list evolves rather than restarts. PROMPT_VERSION bumped to 5. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
200 lines
6.7 KiB
Python
200 lines
6.7 KiB
Python
"""AI strategic-log generator: hourly during EU/US active hours, throttled
off-hours by the cadence policy. Pulls already-persisted market data and
headlines from the DB (no live fetches), calls OpenRouter, and persists the
log plus a row in the cost ledger."""
|
|
from __future__ import annotations
|
|
|
|
import asyncio
|
|
from collections import defaultdict
|
|
from datetime import timedelta
|
|
|
|
import httpx
|
|
from sqlalchemy import desc, func, select
|
|
|
|
from app.config import get_settings
|
|
from app.db import utcnow
|
|
from app.jobs._helpers import job_lifecycle, log
|
|
from app.models import AICall, Headline, JobRun, Quote, StrategicLog
|
|
from app.services.cadence import DEFAULT_POLICY
|
|
from app.services.openrouter import (
|
|
PROMPT_VERSION,
|
|
build_system_prompt,
|
|
build_user_prompt,
|
|
call_openrouter,
|
|
month_start,
|
|
)
|
|
|
|
|
|
# Fixed one-line market snapshot handed to build_user_prompt as
# `reference_line`. NOTE(review): values are hard-coded here, not read from
# the DB — confirm how and when this is meant to be refreshed.
REFERENCE_LINE = (
    "S&P 7,501 (ATH) · VIX 18.0 · US 10y 4.45% · "
    "HY OAS 279bps · Brent $109/bbl · Gold $4,651/oz · CPI 3.8% YoY"
)
|
|
|
|
|
|
async def _latest_quotes_by_group(session) -> dict[str, list[dict]]:
    """Latest quote per (group, symbol), grouped by ``group_name``.

    Error rows (``price`` is NULL) are ignored when choosing the latest row,
    so a symbol whose most recent fetch failed falls back to its last good
    quote rather than surfacing a null price. Symbols that have never had a
    non-null price are omitted.

    Returns a mapping ``group_name -> [quote dict, ...]``, each list ordered
    by symbol. Every dict carries symbol, source, label, note (always ""),
    price, currency, as_of and changes.
    """
    has_price = Quote.price.is_not(None)
    latest = (
        select(
            Quote.group_name,
            Quote.symbol,
            func.max(Quote.fetched_at).label("mx"),
        )
        # Only successful fetches compete for "latest".
        .where(has_price)
        .group_by(Quote.group_name, Quote.symbol)
        .subquery()
    )
    stmt = (
        select(Quote)
        .join(
            latest,
            (Quote.group_name == latest.c.group_name)
            & (Quote.symbol == latest.c.symbol)
            & (Quote.fetched_at == latest.c.mx),
        )
        # Re-apply the filter: an error row could share the winning timestamp.
        .where(has_price)
        .order_by(Quote.group_name, Quote.symbol)
    )
    rows = (await session.execute(stmt)).scalars().all()

    by_group: dict[str, list[dict]] = defaultdict(list)
    for q in rows:
        by_group[q.group_name].append(dict(
            symbol=q.symbol, source=q.source, label=q.label,
            note="", price=q.price, currency=q.currency,
            as_of=q.as_of, changes=q.changes,
        ))
    return by_group
|
|
|
|
|
|
async def _recent_headlines_by_bucket(session, hours: float = 24) -> dict[str, list[dict]]:
    """Headlines published within the last *hours* hours, grouped by category.

    The query reads at most the 400 newest headlines in the window. Each
    category then keeps only its 40 newest, bounding prompt size (the target
    is a prompt under ~40KB).

    NOTE(review): the 400-row limit is applied before bucketing, so a single
    very busy category can crowd quieter categories out entirely.
    """
    since = utcnow() - timedelta(hours=hours)
    query = (
        select(Headline)
        .where(Headline.published_at >= since)
        .order_by(desc(Headline.published_at))
        .limit(400)
    )
    headlines = (await session.execute(query)).scalars().all()

    buckets: dict[str, list[dict]] = defaultdict(list)
    for item in headlines:
        bucket = buckets[item.category]
        if len(bucket) < 40:
            bucket.append(dict(
                when=item.published_at.isoformat(),
                source=item.source,
                title=item.title,
            ))
    return buckets
|
|
|
|
|
|
async def _month_spend(session) -> float:
    """Sum of ``AICall.cost_usd`` for calls made since ``month_start()``.

    Returns 0.0 when no calls have been recorded for the period.
    """
    query = (
        select(func.coalesce(func.sum(AICall.cost_usd), 0.0))
        .where(AICall.called_at >= month_start())
    )
    result = await session.execute(query)
    total = result.scalar()
    return float(total) if total else 0.0
|
|
|
|
|
|
async def _last_success_at(session):
    """Return ``finished_at`` of the newest successful ai_log_job run, or None."""
    return (await session.execute(
        select(func.max(JobRun.finished_at)).where(
            JobRun.name == "ai_log_job",
            JobRun.status == "success",
        )
    )).scalar()


async def _latest_log_since(session, since):
    """Return the newest StrategicLog with ``generated_at >= since``, or None."""
    return (await session.execute(
        select(StrategicLog)
        .where(StrategicLog.generated_at >= since)
        .order_by(desc(StrategicLog.generated_at))
        .limit(1)
    )).scalar_one_or_none()


def _mark_skipped(jr, reason: str | None) -> None:
    """Mark the job run as skipped and record *reason* in ``jr.error``."""
    jr.status = "skipped"
    jr.error = reason


async def run() -> None:
    """Generate and persist one strategic log unless a guard says to skip.

    The guards are checked in order:

    - the run is already marked skipped on entry;
    - no OPENROUTER_API_KEY is configured;
    - DEFAULT_POLICY (cadence) declines the run;
    - the monthly OpenRouter cost cap has been reached;
    - there are no quotes or headlines in the DB yet.

    Each guard after the first marks the run ``skipped`` with a short reason
    in ``jr.error``.

    If a log already exists for the current UTC day, the newest one is passed
    to build_user_prompt as ``previous_log``. This lets the model revise it
    instead of starting over.

    On success, one StrategicLog row and one AICall ledger row are written.
    If the OpenRouter call raises, an error AICall row is committed and the
    exception is re-raised.
    """
    async with job_lifecycle("ai_log_job") as (session, jr):
        if jr.status == "skipped":
            return

        s = get_settings()
        if not s.OPENROUTER_API_KEY:
            log.warning("ai_log.skipped_no_key")
            _mark_skipped(jr, "OPENROUTER_API_KEY not set")
            return

        # Cadence: hourly during EU/US active hours; throttled off-hours.
        last_success = await _last_success_at(session)
        should_run, reason = DEFAULT_POLICY.should_run(last_success)
        if not should_run:
            log.info("ai_log.cadence_skip", reason=reason)
            _mark_skipped(jr, reason)
            return

        cap = s.OPENROUTER_MONTHLY_CAP_USD
        spent = await _month_spend(session)
        if spent >= cap:
            log.warning("ai_log.cap_reached", spent=spent, cap=cap)
            _mark_skipped(jr, f"monthly cost cap reached (${spent:.2f})")
            return

        quotes = await _latest_quotes_by_group(session)
        news = await _recent_headlines_by_bucket(session)
        if not quotes and not news:
            log.warning("ai_log.no_data_yet")
            _mark_skipped(jr, "no market data or headlines yet")
            return

        # One timestamp for both the start-of-day cutoff and the prompt's
        # "today", so they agree even if the run straddles midnight UTC.
        now = utcnow()
        today_start = now.replace(hour=0, minute=0, second=0, microsecond=0)
        # Newest log from today (UTC), if any: the model updates it rather
        # than starting from scratch ("since this morning's read, X changed").
        previous_log = await _latest_log_since(session, today_start)

        anchor = s.CASSANDRA_ANCHOR_DATE or None
        user_prompt = build_user_prompt(
            today=now,
            anchor=anchor,
            quotes_by_group=quotes,
            headlines_by_bucket=news,
            reference_line=REFERENCE_LINE,
            previous_log=previous_log,
        )
        system_prompt = build_system_prompt(s.CASSANDRA_TONE, s.CASSANDRA_ANALYSIS)

        try:
            async with httpx.AsyncClient(follow_redirects=True) as client:
                result = await call_openrouter(
                    client,
                    [{"role": "system", "content": system_prompt},
                     {"role": "user", "content": user_prompt}],
                    model=s.OPENROUTER_MODEL,
                )
        except Exception as e:
            # Record the failed call in the cost ledger, then re-raise. Some
            # exceptions stringify to "", so fall back to the type name to
            # keep the ledger row informative.
            session.add(AICall(
                model=s.OPENROUTER_MODEL,
                status="error",
                error=(str(e) or type(e).__name__)[:500],
            ))
            await session.commit()
            raise

        session.add(StrategicLog(
            generated_at=utcnow(),
            model=result.model,
            anchor_date=anchor,
            prompt_version=PROMPT_VERSION,
            tone=s.CASSANDRA_TONE.upper(),
            analysis=s.CASSANDRA_ANALYSIS.upper(),
            content=result.content,
            prompt_tokens=result.prompt_tokens,
            completion_tokens=result.completion_tokens,
            cost_usd=result.cost_usd,
        ))
        session.add(AICall(
            model=result.model,
            prompt_tokens=result.prompt_tokens,
            completion_tokens=result.completion_tokens,
            cost_usd=result.cost_usd,
            status="ok",
        ))
        await session.commit()
        jr.items_written = 1
        log.info("ai_log.done",
                 model=result.model,
                 prompt_tokens=result.prompt_tokens,
                 completion_tokens=result.completion_tokens)
|
|
|
|
|
|
# Entry point when the module is executed directly: run the job once.
if __name__ == "__main__":
    asyncio.run(run())
|