215 lines
8.1 KiB
Python
215 lines
8.1 KiB
Python
"""Hourly AI strategic-log generator. Pulls already-persisted market data and
|
|
headlines from the DB (no live fetches), calls OpenRouter, persists the log
|
|
and a row in the cost ledger."""
|
|
from __future__ import annotations
|
|
|
|
import asyncio
|
|
|
|
import httpx
|
|
from sqlalchemy import desc, func, select
|
|
|
|
from app.config import get_settings
|
|
from app.db import utcnow
|
|
from app.jobs._helpers import job_lifecycle, log
|
|
from app.jobs._market_context import (
|
|
REFERENCE_LINE,
|
|
latest_quotes_by_group,
|
|
month_spend,
|
|
recent_headlines_by_bucket,
|
|
)
|
|
from app.models import AICall, JobRun, StrategicLog, StrategicLogTranslation, User
|
|
from app.services.cadence import DEFAULT_POLICY
|
|
from app.services.i18n import ACTIVE_LANGUAGES
|
|
from app.services.openrouter import (
|
|
PROMPT_VERSION,
|
|
active_model,
|
|
build_system_prompt,
|
|
build_user_prompt,
|
|
call_llm,
|
|
llm_configured,
|
|
)
|
|
from app.services.translation import translate
|
|
|
|
|
|
async def translate_log_for_active_languages(session, log_id: int) -> None:
    """Fan out per-language translations for the strategic log identified
    by ``log_id``.

    Reads ``users.lang`` (deduplicated, restricted to ACTIVE_LANGUAGES
    minus English), issues one translation call per language in parallel
    via ``asyncio.gather``, and persists each successful result as a
    ``StrategicLogTranslation`` row. Per-language failures are logged
    but never raise — the strategic log itself is already committed at
    this point and translation is a best-effort enhancement. A failure
    while persisting the translated rows is likewise logged and rolled
    back rather than propagated.

    The job orchestrator calls this AFTER the English ``StrategicLog``
    row is committed; pass the row's ``id`` in.

    Args:
        session: Async DB session used for the lookups and for persisting
            the translation rows.
        log_id: Primary key of the ``StrategicLog`` row to translate.
    """
    target_langs = sorted({code for code in ACTIVE_LANGUAGES if code != "en"})
    if not target_langs:
        return

    # Only translate into languages that at least one user has selected.
    active_langs = (await session.execute(
        select(User.lang).distinct().where(User.lang.in_(target_langs))
    )).scalars().all()
    if not active_langs:
        return

    log_row = await session.get(StrategicLog, log_id)
    if log_row is None:
        log.warning("log.translate.missing_log", log_id=log_id)
        return

    async with httpx.AsyncClient(follow_redirects=True, timeout=60) as client:
        results = await asyncio.gather(*[
            translate(client, log_row.content, lang)
            for lang in active_langs
        ], return_exceptions=True)

    for lang, result in zip(active_langs, results):
        # ``gather(return_exceptions=True)`` can also hand back
        # ``BaseException`` instances (e.g. ``asyncio.CancelledError``),
        # which are not ``Exception`` subclasses; treat them as failures
        # too instead of trying to unpack them below.
        if isinstance(result, BaseException):
            log.warning("log.translate.failed", lang=lang, log_id=log_id,
                        error=str(result)[:200])
            continue
        translated_md, llm_result = result
        session.add(StrategicLogTranslation(
            log_id=log_id, lang=lang,
            content_md=translated_md,
            generated_at=utcnow(),
            llm_model=llm_result.model,
            llm_cost_usd=llm_result.cost_usd,
        ))

    try:
        await session.commit()
    except Exception as exc:
        # Best-effort boundary: keep the session usable for the caller and
        # report the failure instead of propagating it.
        await session.rollback()
        log.warning("log.translate.persist_failed", log_id=log_id,
                    error=str(exc)[:200])
|
|
|
|
|
|
async def run() -> None:
    """Generate and persist the strategic-log variants for this job run.

    Flow:

    1. Return early — marking the job run ``skipped`` where applicable —
       when the lifecycle already skipped the run, no LLM is configured,
       the cadence policy declines, the monthly cost cap is reached, or
       there are neither quotes nor headlines to work from.
    2. Build a single user prompt from the persisted quotes/headlines plus
       the most recent log generated today (UTC), if any.
    3. For each tone variant: re-check the cost cap, call the LLM, persist
       a ``StrategicLog`` row and an ``AICall`` ledger row, then fan out
       translations.

    A failed LLM call is recorded as an ``AICall`` with status ``"error"``
    and the loop continues with the next variant. A failed translation
    fan-out is logged and does not abort the run, since the English log is
    already committed by then. ``jr.items_written`` is set to the number of
    variants whose log was committed.
    """
    async with job_lifecycle("ai_log_job") as (session, jr):
        if jr.status == "skipped":
            return
        s = get_settings()
        if not llm_configured():
            log.warning("ai_log.skipped_no_key", provider=s.LLM_PROVIDER)
            jr.status = "skipped"
            return

        # Cadence: hourly during EU/US active hours; throttled off-hours.
        last_success = (await session.execute(
            select(func.max(JobRun.finished_at)).where(
                JobRun.name == "ai_log_job",
                JobRun.status == "success",
            )
        )).scalar()
        should_run, reason = DEFAULT_POLICY.should_run(last_success)
        if not should_run:
            log.info("ai_log.cadence_skip", reason=reason)
            jr.status = "skipped"
            jr.error = reason
            return

        spent = await month_spend(session)
        if spent >= s.OPENROUTER_MONTHLY_CAP_USD:
            log.warning("ai_log.cap_reached", spent=spent,
                        cap=s.OPENROUTER_MONTHLY_CAP_USD)
            jr.status = "skipped"
            jr.error = f"monthly cost cap reached (${spent:.2f})"
            return

        quotes = await latest_quotes_by_group(session)
        news = await recent_headlines_by_bucket(session)
        if not quotes and not news:
            log.warning("ai_log.no_data_yet")
            jr.status = "skipped"
            return

        # Look up the most recent log generated today (UTC) so the model can
        # update it rather than start from scratch. This gives the model
        # temporal awareness — "since this morning's read, X has changed".
        # Sample the clock once so "today" and its midnight boundary cannot
        # fall on different days.
        now = utcnow()
        today_start = now.replace(hour=0, minute=0, second=0, microsecond=0)
        previous_log = (await session.execute(
            select(StrategicLog)
            .where(StrategicLog.generated_at >= today_start)
            .order_by(desc(StrategicLog.generated_at))
            .limit(1)
        )).scalar_one_or_none()

        anchor = s.CASSANDRA_ANCHOR_DATE or None
        user_prompt = build_user_prompt(
            today=now,
            anchor=anchor,
            quotes_by_group=quotes,
            headlines_by_bucket=news,
            reference_line=REFERENCE_LINE,
            previous_log=previous_log,
        )

        # Phase 2 voice pivot (PROMPT_VERSION 6): generate both tones per
        # run so the dashboard toggle is instant. Analysis stays on the
        # operator-configured default (DRY|SPECULATIVE is a system-wide
        # preference, not a per-user toggle). PRO was dropped.
        analysis = (s.CASSANDRA_ANALYSIS or "SPECULATIVE").upper()
        variants = [
            ("NOVICE", analysis),
            ("INTERMEDIATE", analysis),
        ]
        written = 0
        async with httpx.AsyncClient(follow_redirects=True) as client:
            for tone, variant_analysis in variants:
                # Re-check cost cap between variants so a runaway run is
                # bounded.
                spent = await month_spend(session)
                if spent >= s.OPENROUTER_MONTHLY_CAP_USD:
                    log.warning("ai_log.cap_reached_midrun",
                                spent=spent, completed=written)
                    break

                system_prompt = build_system_prompt(tone, variant_analysis)
                try:
                    result = await call_llm(
                        client,
                        [{"role": "system", "content": system_prompt},
                         {"role": "user", "content": user_prompt}],
                    )
                except Exception as e:
                    # Record the failure in the cost ledger and move on to
                    # the next variant.
                    session.add(AICall(
                        model=active_model(), status="error",
                        error=f"{tone}/{variant_analysis}: {str(e)[:480]}",
                    ))
                    await session.commit()
                    log.error("ai_log.variant_failed",
                              tone=tone, analysis=variant_analysis,
                              error=str(e)[:200])
                    continue

                slog = StrategicLog(
                    generated_at=utcnow(),
                    model=result.model,
                    anchor_date=anchor,
                    prompt_version=PROMPT_VERSION,
                    tone=tone,
                    analysis=variant_analysis,
                    content=result.content,
                    prompt_tokens=result.prompt_tokens,
                    completion_tokens=result.completion_tokens,
                    cost_usd=result.cost_usd,
                )
                session.add(slog)
                session.add(AICall(
                    model=result.model,
                    prompt_tokens=result.prompt_tokens,
                    completion_tokens=result.completion_tokens,
                    cost_usd=result.cost_usd,
                    status="ok",
                ))
                # Flush first and capture the primary key while the row is
                # still loaded: reading ``slog.id`` after ``commit()`` may
                # hit an expired attribute, which an async session cannot
                # lazily reload.
                await session.flush()
                slog_id = slog.id
                await session.commit()
                # The log is durable from here on, so count it before the
                # best-effort translation step.
                written += 1

                try:
                    await translate_log_for_active_languages(session, slog_id)
                except Exception as e:
                    # Translation is an enhancement on top of the committed
                    # log; reset the session and continue with the next
                    # variant rather than failing the whole run.
                    await session.rollback()
                    log.warning("ai_log.translate_failed",
                                log_id=slog_id, error=str(e)[:200])

                log.info("ai_log.variant_done",
                         tone=tone, analysis=variant_analysis,
                         prompt_tokens=result.prompt_tokens,
                         completion_tokens=result.completion_tokens)

        jr.items_written = written
        log.info("ai_log.done", variants=written, total=len(variants))
|
|
|
|
|
|
if __name__ == "__main__":
    # Executed as a script: drive the async job to completion once.
    asyncio.run(run())
|