diff --git a/app/jobs/ai_log_job.py b/app/jobs/ai_log_job.py index 4524bfe..eacc703 100644 --- a/app/jobs/ai_log_job.py +++ b/app/jobs/ai_log_job.py @@ -13,7 +13,8 @@ from sqlalchemy import desc, func, select from app.config import get_settings from app.db import utcnow from app.jobs._helpers import job_lifecycle, log -from app.models import AICall, Headline, Quote, StrategicLog +from app.models import AICall, Headline, JobRun, Quote, StrategicLog +from app.services.cadence import DEFAULT_POLICY from app.services.openrouter import ( PROMPT_VERSION, build_system_prompt, @@ -102,6 +103,20 @@ async def run() -> None: jr.status = "skipped" return + # Cadence: hourly during EU/US active hours; throttled off-hours. + last_success = (await session.execute( + select(func.max(JobRun.finished_at)).where( + JobRun.name == "ai_log_job", + JobRun.status == "success", + ) + )).scalar() + should_run, reason = DEFAULT_POLICY.should_run(last_success) + if not should_run: + log.info("ai_log.cadence_skip", reason=reason) + jr.status = "skipped" + jr.error = reason + return + spent = await _month_spend(session) if spent >= s.OPENROUTER_MONTHLY_CAP_USD: log.warning("ai_log.cap_reached", spent=spent, @@ -117,6 +132,17 @@ async def run() -> None: jr.status = "skipped" return + # Look up the most recent log generated today (UTC) so the model can + # update it rather than start from scratch. This gives the model + # temporal awareness — "since this morning's read, X has changed". + today_start = utcnow().replace(hour=0, minute=0, second=0, microsecond=0) + previous_log = (await session.execute( + select(StrategicLog) + .where(StrategicLog.generated_at >= today_start) + .order_by(desc(StrategicLog.generated_at)) + .limit(1) + )).scalar_one_or_none() + anchor = s.CASSANDRA_ANCHOR_DATE or None user_prompt = build_user_prompt( today=utcnow(), @@ -124,6 +150,7 @@ async def run() -> None: quotes_by_group=quotes, headlines_by_bucket=news, reference_line=REFERENCE_LINE, + previous_log=previous_log, ) system_prompt = build_system_prompt(s.CASSANDRA_TONE, s.CASSANDRA_ANALYSIS) diff --git a/app/jobs/indicator_summary_job.py b/app/jobs/indicator_summary_job.py index efefd20..63da4e1 100644 --- a/app/jobs/indicator_summary_job.py +++ b/app/jobs/indicator_summary_job.py @@ -13,7 +13,8 @@ from sqlalchemy import desc, func, select from app.config import get_settings, load_groups from app.db import utcnow from app.jobs._helpers import job_lifecycle, log -from app.models import AICall, IndicatorSummary, Quote +from app.models import AICall, IndicatorSummary, JobRun, Quote +from app.services.cadence import DEFAULT_POLICY from app.services.openrouter import ( PROMPT_VERSION, build_aggregate_summary_system_prompt, @@ -234,6 +235,21 @@ async def run() -> None: jr.status = "skipped" return + # Cadence — same policy as ai_log_job: hourly during EU/US active, + # throttled off-hours and weekends. + last_success = (await session.execute( + select(func.max(JobRun.finished_at)).where( + JobRun.name == "indicator_summary_job", + JobRun.status == "success", + ) + )).scalar() + should_run, reason = DEFAULT_POLICY.should_run(last_success) + if not should_run: + log.info("ind_summary.cadence_skip", reason=reason) + jr.status = "skipped" + jr.error = reason + return + spent = await _month_spend(session) if spent >= s.OPENROUTER_MONTHLY_CAP_USD: jr.status = "skipped" diff --git a/app/services/cadence.py b/app/services/cadence.py new file mode 100644 index 0000000..866b4c9 --- /dev/null +++ b/app/services/cadence.py @@ -0,0 +1,66 @@ +"""When should expensive AI jobs fire? + +Markets matter. The scheduler wakes every hour, but there's no point spending +OpenRouter tokens at 03:00 UTC on a Sunday when nothing has moved. This module +encodes a single policy: weekday active hours (LSE open through NYSE close, +roughly 07:00-21:00 UTC) get the full hourly cadence; off-hours and weekends +get throttled. + +Used by ai_log_job and indicator_summary_job to decide whether to run NOW or +skip until enough time has passed since the last successful run. Market / +news / portfolio ingestion jobs keep running hourly — they're cheap. +""" +from __future__ import annotations + +from dataclasses import dataclass +from datetime import datetime, timezone + + +@dataclass(frozen=True) +class CadencePolicy: + # Active window in UTC. LSE opens 07:00 BST → 07:00 UTC summer, 08:00 UTC + # winter. NYSE closes 16:00 ET → 21:00 UTC summer, 21:00 UTC winter. The + # combined EU/US trading window is well covered by 07:00-21:00 UTC. + active_start_hour: int = 7 + active_end_hour: int = 21 + # Minimum gap between successful runs outside the active window. + off_hours_gap_h: float = 4.0 + weekend_gap_h: float = 12.0 + + def is_active_window(self, now: datetime | None = None) -> bool: + now = now or datetime.now(timezone.utc) + if now.weekday() >= 5: # Saturday / Sunday + return False + return self.active_start_hour <= now.hour < self.active_end_hour + + def min_gap_hours(self, now: datetime | None = None) -> float: + now = now or datetime.now(timezone.utc) + if now.weekday() >= 5: + return self.weekend_gap_h + if self.is_active_window(now): + return 0.0 # always run during the active window + return self.off_hours_gap_h + + def should_run( + self, + last_success_at: datetime | None, + now: datetime | None = None, + ) -> tuple[bool, str]: + """Returns (should_run, reason). The reason is human-readable for logs + and the job_runs.error column when a run is skipped.""" + now = now or datetime.now(timezone.utc) + if self.is_active_window(now): + return True, "active window" + min_gap = self.min_gap_hours(now) + if last_success_at is None: + return True, "no prior successful run" + # Normalise tz; DB returns naive but we treat it as UTC. + if last_success_at.tzinfo is None: + last_success_at = last_success_at.replace(tzinfo=timezone.utc) + age_h = (now - last_success_at).total_seconds() / 3600.0 + if age_h >= min_gap: + return True, f"off-hours but last run {age_h:.1f}h ago (≥ {min_gap}h)" + return False, f"off-hours throttled — last run {age_h:.1f}h ago (< {min_gap}h)" + + +DEFAULT_POLICY = CadencePolicy() diff --git a/app/services/openrouter.py b/app/services/openrouter.py index 7ddf2fd..e49197a 100644 --- a/app/services/openrouter.py +++ b/app/services/openrouter.py @@ -20,7 +20,7 @@ OPENROUTER_URL = "https://openrouter.ai/api/v1/chat/completions" # Bump when the composed prompt changes meaningfully. Stored on every # StrategicLog row so historical logs can be linked to the prompt that produced # them. -PROMPT_VERSION = 4 +PROMPT_VERSION = 5 # --- Core: invariant across tone/analysis settings ---------------------------- @@ -99,7 +99,23 @@ Close the log with a single sentence on a line of its own, formatted exactly: This is the line a reader who only sees the watch list scrolls down to. Make \ it earn its place: cite real signals (HY OAS, breadth, VIX, valuation, real \ -yields), not vibes.""" +yields), not vibes. + +# Update mode (when an earlier log from today is provided) +If the user message includes a section labelled "Earlier log from today \ +(generated HH:MM UTC)", treat that as YOUR OWN earlier draft. You are \ +UPDATING it for the current data, not starting from scratch. +- Don't restate context that hasn't changed. Anchor on what's moved SINCE \ +that timestamp: confirmations, refutations, new emergent patterns. +- The TL;DR should lead with the move since the earlier read when there \ +was a meaningful intra-day change ("Since this morning's read, …") — \ +otherwise stay regime-level. +- The watch list should evolve: drop items that triggered or settled, add \ +items that emerged. Keep items still load-bearing. +- Preserve any insights from the earlier draft that remain valid; sharpen \ +or revise the ones that don't. Avoid contradicting yourself silently — if \ +you change a stance, name it briefly ("Earlier I read X; with Y now, the \ +read shifts to Z").""" # --- Tone: audience-shaping block -------------------------------------------- @@ -312,8 +328,11 @@ def build_user_prompt( quotes_by_group: dict[str, list[dict]], headlines_by_bucket: dict[str, list[dict]], reference_line: str | None = None, + previous_log: object | None = None, ) -> str: - """Assemble the user message from already-fetched-and-persisted data.""" + """Assemble the user message from already-fetched-and-persisted data. + If `previous_log` is a StrategicLog from earlier today, it's included + as 'Update mode' context — the model will revise rather than restart.""" parts = [f"# Strategic log request — {today.strftime('%Y-%m-%d')}"] if anchor: parts.append(f"Anchor reference date: {anchor}") @@ -322,6 +341,20 @@ def build_user_prompt( "\n## Reference snapshot (when the macro thesis was authored)" f"\n{reference_line}\nCompare live readings against it." ) + + if previous_log is not None: + gen = getattr(previous_log, "generated_at", None) + ts = gen.strftime("%H:%M UTC") if gen else "earlier today" + parts.append( + f"\n## Earlier log from today (generated {ts})\n" + "Treat this as YOUR OWN earlier draft for today. Update it for\n" + "the current data — don't restate unchanged context. See the\n" + "'Update mode' section of the system prompt for how to handle it.\n" + "```markdown\n" + f"{previous_log.content}\n" + "```" + ) + parts.append("\n## Live market data (per group)") parts.append("```json\n" + json.dumps(quotes_by_group, indent=2, default=str) + "\n```") parts.append("\n## News flow (last 24h, filtered by bucket)") @@ -331,11 +364,20 @@ def build_user_prompt( parts.append(f"\n### {label.upper()}") for h in items[:30]: parts.append(f"- [{h['when'][:16].replace('T',' ')}] [{h['source']}] {h['title']}") - parts.append( + + task_line = ( "\n## Task\nWrite the daily strategic log in ~800 words, following " "the discipline in the system prompt. No preamble; begin directly " "with the date header." ) + if previous_log is not None: + task_line = ( + "\n## Task\nUpdate the earlier log above for the current data. " + "Keep the same structure (date header, TL;DR, sections, watch " + "list, system temperature) but anchor on what has CHANGED since " + "the earlier draft's timestamp. ~800 words. No preamble." + ) + parts.append(task_line) return "\n".join(parts)