- Move news_job from hourly to 3x/hour (cron 10,30,50), with a CadencePolicy gate that throttles to active hours (07-21 UTC weekdays at 20 min), off-hours (3 h), weekends (6 h). Keeps the daytime feed fresh without spamming RSS sources overnight. - Tag each headline on ingestion via DeepSeek (BATCH_SIZE=25, max_tokens=4000, json.JSONDecoder().raw_decode + per-row regex recovery for resilient parsing). Vocabulary: 16 tags including new EU / USA / AI / Conflict. NULL tags are picked up automatically on the next news_job run, so back-tagging is implicit rather than a separate migration step. - Tag UI: pill bar above the feed with off → include → exclude cycle on click; shift-click jumps straight to exclude. State persists in localStorage and is injected into /api/news requests via htmx:configRequest. Per-row chips sit to the right of the headline (new 5-column grid: age | source | title | tags | UTC) so vertical density stays high. - Strategic log header bug: model was hallucinating "(Updated 21:30 UTC)" in future tense. Bumped PROMPT_VERSION 6→7, added explicit ban on time-of-day clauses, and supply the actual current UTC time in the user prompt so the model has no need to invent one. Migration 0012 adds headlines.tags (JSON, nullable). Tests cover vocabulary integrity, validation/normalisation, and the JSON-recovery parser (17 tests).
802 lines
27 KiB
Python
802 lines
27 KiB
Python
"""JSON / HTMX-partial endpoints. Each route content-negotiates via `?as=html`:
|
||
bare requests return Pydantic JSON, `as=html` renders the matching partial.
|
||
|
||
The bearer-token dependency applies to all routes here.
|
||
"""
|
||
from __future__ import annotations
|
||
|
||
import calendar as _cal
|
||
import re
|
||
from datetime import date, datetime, timedelta, timezone
|
||
|
||
from fastapi import APIRouter, Depends, File, Form, HTTPException, Query, Request, UploadFile
|
||
from fastapi.responses import HTMLResponse, JSONResponse
|
||
from sqlalchemy import desc, func, select
|
||
from sqlalchemy.ext.asyncio import AsyncSession
|
||
|
||
from collections import defaultdict
|
||
|
||
import httpx
|
||
from pydantic import BaseModel, Field
|
||
|
||
from app.auth import require_token
|
||
from app.config import get_settings
|
||
from app.db import get_session, utcnow
|
||
from app.services.openrouter import (
|
||
PROMPT_VERSION,
|
||
build_chat_system_prompt,
|
||
call_openrouter,
|
||
month_start,
|
||
)
|
||
from app.templates_env import templates
|
||
from app.models import (
|
||
AICall,
|
||
Headline,
|
||
IndicatorSummary,
|
||
JobRun,
|
||
Quote,
|
||
StrategicLog,
|
||
)
|
||
from app.schemas import (
|
||
HealthOut,
|
||
HeadlineOut,
|
||
JobStatus,
|
||
QuoteOut,
|
||
StrategicLogOut,
|
||
)
|
||
|
||
|
||
router = APIRouter(dependencies=[Depends(require_token)])
|
||
|
||
JOB_NAMES = ("market_job", "news_job", "ai_log_job", "rollup_job",
|
||
"indicator_summary_job", "universe_flush_job")
|
||
JOB_STALE_HOURS = 2.0 # job is "warn" if its last success was >2h ago
|
||
|
||
# Per-group expected freshness — bonds and intraday tape want daily data,
|
||
# macro/economy/valuation are monthly/quarterly by nature. Older than this
|
||
# many days from today → row gets a "stale" badge.
|
||
_STALE_DAYS_BY_GROUP = {
|
||
"bonds": 21,
|
||
"rates": 7,
|
||
"equity": 7,
|
||
"mag7": 7,
|
||
"commodities": 7,
|
||
"fx": 7,
|
||
"tech_ai": 7,
|
||
"financials": 7,
|
||
"bubble_watch":21,
|
||
"valuation": 60,
|
||
"macro": 60,
|
||
"economy": 90,
|
||
}
|
||
_STALE_DAYS_DEFAULT = 90
|
||
|
||
|
||
# --- Small helpers -----------------------------------------------------------
|
||
|
||
|
||
def _as_utc(d: datetime) -> datetime:
|
||
"""MariaDB returns naive datetimes — tag them UTC so arithmetic with
|
||
tz-aware utcnow() doesn't blow up."""
|
||
return d if d.tzinfo is not None else d.replace(tzinfo=timezone.utc)
|
||
|
||
|
||
def _age_seconds(now: datetime, when: datetime | None) -> float | None:
|
||
if when is None:
|
||
return None
|
||
return (_as_utc(now) - _as_utc(when)).total_seconds()
|
||
|
||
|
||
def _fmt_age(now: datetime, when: datetime | None) -> str:
|
||
secs = _age_seconds(now, when)
|
||
if secs is None:
|
||
return "—"
|
||
if secs < 60:
|
||
return f"{int(secs)}s"
|
||
if secs < 3600:
|
||
return f"{int(secs // 60)}m"
|
||
if secs < 86400:
|
||
return f"{int(secs // 3600)}h"
|
||
return f"{int(secs // 86400)}d"
|
||
|
||
|
||
_MD_HEADER = re.compile(r"^(#{1,3})\s+(.+)$", re.MULTILINE)
|
||
_MD_BOLD = re.compile(r"\*\*([^*]+)\*\*")
|
||
|
||
|
||
def _md_to_html(text: str) -> str:
|
||
"""Tiny markdown subset for log output — headers, bold, paragraphs."""
|
||
def header_sub(m):
|
||
level = min(3, len(m.group(1))) + 1 # h2/h3/h4
|
||
return f"<h{level}>{m.group(2).strip()}</h{level}>"
|
||
out = _MD_HEADER.sub(header_sub, text)
|
||
out = _MD_BOLD.sub(r"<strong>\1</strong>", out)
|
||
# Convert blank-line-separated paragraphs to <p> blocks.
|
||
blocks = re.split(r"\n\s*\n", out.strip())
|
||
rendered: list[str] = []
|
||
for b in blocks:
|
||
if b.startswith("<h"):
|
||
rendered.append(b)
|
||
else:
|
||
rendered.append(f"<p>{b.strip().replace(chr(10), '<br>')}</p>")
|
||
return "\n".join(rendered)
|
||
|
||
|
||
# --- Indicators --------------------------------------------------------------
|
||
|
||
|
||
@router.get("/indicators/{group}")
|
||
async def indicators(
|
||
group: str,
|
||
request: Request,
|
||
as_: str | None = Query(default=None, alias="as"),
|
||
tone: str | None = Query(default=None),
|
||
session: AsyncSession = Depends(get_session),
|
||
):
|
||
sub = (
|
||
select(Quote.symbol, func.max(Quote.fetched_at).label("mx"))
|
||
.where(Quote.group_name == group)
|
||
.group_by(Quote.symbol)
|
||
.subquery()
|
||
)
|
||
rows = (await session.execute(
|
||
select(Quote)
|
||
.join(sub,
|
||
(Quote.symbol == sub.c.symbol) & (Quote.fetched_at == sub.c.mx))
|
||
.where(Quote.group_name == group)
|
||
.order_by(Quote.symbol)
|
||
)).scalars().all()
|
||
|
||
if as_ == "html":
|
||
from app.config import get_settings, load_groups
|
||
from app.services.market import parse_symbol
|
||
s_ = get_settings()
|
||
|
||
# Build the set of symbols currently configured for this group AND a
|
||
# notes lookup keyed by the post-parse identifier (the form stored in
|
||
# the DB).
|
||
notes: dict[str, str] = {}
|
||
configured: set[str] = set()
|
||
for sym, _lab, note in load_groups(s_.BASELINE_TOML, s_.PORTFOLIO_TOML).get(group, []):
|
||
_fn, ident = parse_symbol(sym)
|
||
notes[ident] = note
|
||
configured.add(ident)
|
||
|
||
# Drop ghost rows: symbols that used to be in this group but were
|
||
# removed from the TOML — their last quote still sits in the DB until
|
||
# rollup prunes it, but we shouldn't show them.
|
||
rows = [r for r in rows if r.symbol in configured]
|
||
|
||
has_anchor = any((r.changes or {}).get("anchor") is not None for r in rows)
|
||
wanted_tone = _resolve_tone_param(tone)
|
||
summary = (await session.execute(
|
||
select(IndicatorSummary)
|
||
.where(IndicatorSummary.group_name == group)
|
||
.where(IndicatorSummary.tone == wanted_tone)
|
||
.order_by(desc(IndicatorSummary.generated_at))
|
||
.limit(1)
|
||
)).scalar_one_or_none()
|
||
if summary is None:
|
||
# Fallback during rollout: any tone for this group.
|
||
summary = (await session.execute(
|
||
select(IndicatorSummary)
|
||
.where(IndicatorSummary.group_name == group)
|
||
.order_by(desc(IndicatorSummary.generated_at))
|
||
.limit(1)
|
||
)).scalar_one_or_none()
|
||
|
||
# Mark rows whose `as_of` is older than the group-specific threshold.
|
||
# Daily-tape groups (bonds, rates, equity, ...) flag stale earlier
|
||
# than monthly groups (economy, macro, valuation).
|
||
today = utcnow().date()
|
||
threshold = _STALE_DAYS_BY_GROUP.get(group, _STALE_DAYS_DEFAULT)
|
||
stale_symbols: set[str] = set()
|
||
for r in rows:
|
||
try:
|
||
as_of_d = datetime.strptime(r.as_of, "%Y-%m-%d").date() if r.as_of else None
|
||
except ValueError:
|
||
as_of_d = None
|
||
if as_of_d and (today - as_of_d).days > threshold:
|
||
stale_symbols.add(r.symbol)
|
||
|
||
return templates.TemplateResponse(
|
||
request, "partials/indicators.html",
|
||
{"quotes": rows, "has_anchor": has_anchor,
|
||
"summary": summary, "notes": notes,
|
||
"stale_symbols": stale_symbols,
|
||
"tone": wanted_tone},
|
||
)
|
||
return [QuoteOut.model_validate(r, from_attributes=True) for r in rows]
|
||
|
||
|
||
# --- News --------------------------------------------------------------------
|
||
|
||
|
||
def _split_tag_param(s: str | None) -> set[str]:
|
||
"""Parse a comma-separated tags query param, lowercase + trim."""
|
||
if not s:
|
||
return set()
|
||
return {t.strip().lower() for t in s.split(",") if t.strip()}
|
||
|
||
|
||
@router.get("/news")
|
||
async def news_list(
|
||
request: Request,
|
||
session: AsyncSession = Depends(get_session),
|
||
category: str | None = Query(None),
|
||
since_hours: float = Query(24.0, ge=0.1, le=720.0),
|
||
limit: int = Query(50, ge=1, le=500),
|
||
tags: str | None = Query(None, description="comma-separated include list"),
|
||
exclude_tags: str | None = Query(None, description="comma-separated exclude list"),
|
||
as_: str | None = Query(default=None, alias="as"),
|
||
):
|
||
from app.services.news_tagging import TAG_LABELS, TAG_VOCABULARY
|
||
|
||
cutoff = utcnow() - timedelta(hours=since_hours)
|
||
stmt = select(Headline).where(Headline.published_at >= cutoff)
|
||
if category:
|
||
stmt = stmt.where(Headline.category == category)
|
||
# Fetch a wider window than `limit` because we tag-filter client-of-DB.
|
||
# JSON column filters in MariaDB are doable but messy; in-Python is
|
||
# simple at our scale.
|
||
stmt = stmt.order_by(desc(Headline.published_at)).limit(max(limit * 3, 200))
|
||
rows = (await session.execute(stmt)).scalars().all()
|
||
|
||
include = _split_tag_param(tags)
|
||
exclude = _split_tag_param(exclude_tags)
|
||
|
||
def _keep(h: Headline) -> bool:
|
||
ts = set(h.tags or [])
|
||
if include and not (ts & include):
|
||
return False
|
||
if exclude and (ts & exclude):
|
||
return False
|
||
return True
|
||
|
||
filtered = [h for h in rows if _keep(h)][:limit]
|
||
|
||
if as_ == "html":
|
||
now = utcnow()
|
||
items = []
|
||
for h in filtered:
|
||
when = _as_utc(h.published_at) if h.published_at else None
|
||
items.append({
|
||
"age": _fmt_age(now, h.published_at),
|
||
"source": h.source,
|
||
"title": h.title,
|
||
"url": h.url,
|
||
"iso": when.isoformat() if when else None,
|
||
"utc_short": when.strftime("%d %b %H:%M") + "Z" if when else "",
|
||
"tags": h.tags or [],
|
||
})
|
||
return templates.TemplateResponse(
|
||
request, "partials/news.html",
|
||
{"headlines": items,
|
||
"tag_vocabulary": TAG_VOCABULARY,
|
||
"tag_labels": TAG_LABELS,
|
||
"active_include": sorted(include),
|
||
"active_exclude": sorted(exclude)},
|
||
)
|
||
return [HeadlineOut.model_validate(r, from_attributes=True) for r in filtered]
|
||
|
||
|
||
# --- Strategic log -----------------------------------------------------------
|
||
|
||
|
||
def _log_partial_payload(row: StrategicLog | None) -> dict | None:
|
||
if row is None:
|
||
return None
|
||
return {
|
||
"content_html": _md_to_html(row.content),
|
||
"generated_at": row.generated_at,
|
||
"model": row.model,
|
||
"tone": row.tone,
|
||
"analysis": row.analysis,
|
||
"prompt_version": row.prompt_version,
|
||
"cost_usd": row.cost_usd,
|
||
"prompt_tokens": row.prompt_tokens,
|
||
"completion_tokens": row.completion_tokens,
|
||
}
|
||
|
||
|
||
def _resolve_tone_param(tone: str | None) -> str:
|
||
"""Normalise a query-param tone to one of the two valid values.
|
||
PRO is silently mapped to INTERMEDIATE (see openrouter.PROMPT_VERSION 6)."""
|
||
if not tone:
|
||
return get_settings().CASSANDRA_TONE.upper()
|
||
upper = tone.upper().strip()
|
||
if upper in ("NOVICE", "INTERMEDIATE"):
|
||
return upper
|
||
return "INTERMEDIATE"
|
||
|
||
|
||
@router.get("/log/latest")
|
||
async def log_latest(
|
||
request: Request,
|
||
session: AsyncSession = Depends(get_session),
|
||
as_: str | None = Query(default=None, alias="as"),
|
||
tone: str | None = Query(default=None),
|
||
):
|
||
wanted_tone = _resolve_tone_param(tone)
|
||
row = (await session.execute(
|
||
select(StrategicLog)
|
||
.where(StrategicLog.tone == wanted_tone)
|
||
.order_by(desc(StrategicLog.generated_at))
|
||
.limit(1)
|
||
)).scalar_one_or_none()
|
||
# Fallback during rollout: if the requested tone isn't produced yet,
|
||
# serve whatever is latest rather than 404 the panel.
|
||
if row is None:
|
||
row = (await session.execute(
|
||
select(StrategicLog).order_by(desc(StrategicLog.generated_at)).limit(1)
|
||
)).scalar_one_or_none()
|
||
|
||
if as_ == "html":
|
||
return templates.TemplateResponse(
|
||
request, "partials/log.html",
|
||
{"log": _log_partial_payload(row), "tone": wanted_tone},
|
||
)
|
||
|
||
if row is None:
|
||
raise HTTPException(status_code=404, detail="No strategic log generated yet")
|
||
return StrategicLogOut.model_validate(row, from_attributes=True)
|
||
|
||
|
||
@router.get("/log/by-date/{day}")
|
||
async def log_by_date(
|
||
request: Request,
|
||
day: str,
|
||
session: AsyncSession = Depends(get_session),
|
||
as_: str | None = Query(default=None, alias="as"),
|
||
tone: str | None = Query(default=None),
|
||
):
|
||
"""Canonical log for a given day = MAX(generated_at) within that day,
|
||
filtered by tone (NOVICE | INTERMEDIATE; default from settings)."""
|
||
try:
|
||
target = datetime.strptime(day, "%Y-%m-%d").date()
|
||
except ValueError:
|
||
raise HTTPException(status_code=400, detail="day must be YYYY-MM-DD")
|
||
wanted_tone = _resolve_tone_param(tone)
|
||
row = (await session.execute(
|
||
select(StrategicLog)
|
||
.where(func.date(StrategicLog.generated_at) == target)
|
||
.where(StrategicLog.tone == wanted_tone)
|
||
.order_by(desc(StrategicLog.generated_at))
|
||
.limit(1)
|
||
)).scalar_one_or_none()
|
||
if row is None:
|
||
# Fallback: any tone for that day.
|
||
row = (await session.execute(
|
||
select(StrategicLog)
|
||
.where(func.date(StrategicLog.generated_at) == target)
|
||
.order_by(desc(StrategicLog.generated_at))
|
||
.limit(1)
|
||
)).scalar_one_or_none()
|
||
|
||
if as_ == "html":
|
||
return templates.TemplateResponse(
|
||
request, "partials/log.html",
|
||
{"log": _log_partial_payload(row), "tone": wanted_tone},
|
||
)
|
||
if row is None:
|
||
raise HTTPException(status_code=404, detail="No log on this date")
|
||
return StrategicLogOut.model_validate(row, from_attributes=True)
|
||
|
||
|
||
# --- Calendar archive --------------------------------------------------------
|
||
|
||
|
||
def _month_grid(year: int, month: int) -> list[list[int | None]]:
|
||
"""6 weeks × 7 days, Monday-first; None for cells outside the month."""
|
||
weeks = _cal.Calendar(firstweekday=0).monthdayscalendar(year, month)
|
||
while len(weeks) < 6:
|
||
weeks.append([0] * 7)
|
||
return [[d or None for d in w] for w in weeks]
|
||
|
||
|
||
def _shift_month(year: int, month: int, delta: int) -> tuple[int, int]:
|
||
m = month + delta
|
||
y = year + (m - 1) // 12
|
||
m = ((m - 1) % 12) + 1
|
||
return y, m
|
||
|
||
|
||
@router.get("/log/days")
|
||
async def log_days(
|
||
request: Request,
|
||
month: str = Query(..., pattern=r"^\d{4}-\d{2}$"),
|
||
selected: str | None = Query(None),
|
||
session: AsyncSession = Depends(get_session),
|
||
as_: str | None = Query(default=None, alias="as"),
|
||
):
|
||
"""Return the calendar widget for a given month with the days that have
|
||
a strategic log highlighted."""
|
||
year, mnum = (int(p) for p in month.split("-"))
|
||
month_start = date(year, mnum, 1)
|
||
next_y, next_m = _shift_month(year, mnum, 1)
|
||
month_end = date(next_y, next_m, 1)
|
||
|
||
rows = (await session.execute(
|
||
select(func.distinct(func.date(StrategicLog.generated_at)))
|
||
.where(StrategicLog.generated_at >= month_start)
|
||
.where(StrategicLog.generated_at < month_end)
|
||
)).all()
|
||
# SQLAlchemy returns date or string depending on dialect; normalise to ints.
|
||
days_with_logs: set[int] = set()
|
||
for (d,) in rows:
|
||
if isinstance(d, str):
|
||
d = datetime.strptime(d, "%Y-%m-%d").date()
|
||
days_with_logs.add(d.day)
|
||
|
||
prev_y, prev_m = _shift_month(year, mnum, -1)
|
||
sel_date: date | None = None
|
||
if selected:
|
||
try:
|
||
sel_date = datetime.strptime(selected, "%Y-%m-%d").date()
|
||
except ValueError:
|
||
sel_date = None
|
||
|
||
payload = {
|
||
"year": year,
|
||
"month": mnum,
|
||
"month_name": _cal.month_name[mnum],
|
||
"grid": _month_grid(year, mnum),
|
||
"days_with_logs": days_with_logs,
|
||
"selected": sel_date,
|
||
"today": datetime.now(timezone.utc).date(),
|
||
"prev_month": f"{prev_y:04d}-{prev_m:02d}",
|
||
"next_month": f"{next_y:04d}-{next_m:02d}",
|
||
}
|
||
|
||
if as_ == "json":
|
||
return JSONResponse({
|
||
"month": month,
|
||
"days_with_logs": sorted(days_with_logs),
|
||
"prev_month": payload["prev_month"],
|
||
"next_month": payload["next_month"],
|
||
})
|
||
return templates.TemplateResponse(request, "partials/calendar.html", payload)
|
||
|
||
|
||
# Portfolio endpoints moved to app/routers/universe.py (Phase G). The
|
||
# server no longer persists per-user portfolio data; holdings live in
|
||
# the browser's localStorage and prices come from /api/universe.
|
||
|
||
|
||
# --- Health / ops footer -----------------------------------------------------
|
||
|
||
|
||
# --- Aggregate summary + market status (dashboard header) -------------------
|
||
|
||
|
||
AGGREGATE_GROUP_NAME = "__all__"
|
||
|
||
|
||
@router.get("/summary/aggregate")
|
||
async def aggregate_summary(
|
||
request: Request,
|
||
session: AsyncSession = Depends(get_session),
|
||
as_: str | None = Query(default=None, alias="as"),
|
||
tone: str | None = Query(default=None),
|
||
):
|
||
wanted_tone = _resolve_tone_param(tone)
|
||
row = (await session.execute(
|
||
select(IndicatorSummary)
|
||
.where(IndicatorSummary.group_name == AGGREGATE_GROUP_NAME)
|
||
.where(IndicatorSummary.tone == wanted_tone)
|
||
.order_by(desc(IndicatorSummary.generated_at))
|
||
.limit(1)
|
||
)).scalar_one_or_none()
|
||
if row is None:
|
||
row = (await session.execute(
|
||
select(IndicatorSummary)
|
||
.where(IndicatorSummary.group_name == AGGREGATE_GROUP_NAME)
|
||
.order_by(desc(IndicatorSummary.generated_at))
|
||
.limit(1)
|
||
)).scalar_one_or_none()
|
||
|
||
from app.services.markets import all_statuses
|
||
statuses = all_statuses()
|
||
|
||
if as_ == "html":
|
||
return templates.TemplateResponse(
|
||
request, "partials/dashboard_header.html",
|
||
{"summary": row, "markets": statuses, "tone": wanted_tone},
|
||
)
|
||
return {
|
||
"summary": (
|
||
{"content": row.content,
|
||
"generated_at": row.generated_at.isoformat(),
|
||
"model": row.model}
|
||
if row else None
|
||
),
|
||
"markets": [
|
||
{**m, "until": m["until"].isoformat()} for m in statuses
|
||
],
|
||
}
|
||
|
||
|
||
# Market → headline index mapping for the sticky bottom bar. Symbols must
|
||
# be present in config/default.toml so market_job populates `quotes`.
|
||
_MARKET_INDEX = {
|
||
"NYSE": ("^GSPC", "S&P 500"),
|
||
"LSE": ("^FTSE", "FTSE 100"),
|
||
# XETRA → Euro Stoxx 50 rather than ^GDAXI: Yahoo's DAX ticker is
|
||
# patchy via the chart endpoint, and ^STOXX50E is already tracked in
|
||
# config/default.toml's equity group.
|
||
"XETRA": ("^STOXX50E", "STOXX 50"),
|
||
"JPX": ("^N225", "Nikkei 225"),
|
||
"HKEX": ("^HSI", "Hang Seng"),
|
||
"SSE": ("000300.SS", "CSI 300"),
|
||
}
|
||
|
||
|
||
def _fmt_price(p: float | None) -> str:
|
||
if p is None:
|
||
return "—"
|
||
if abs(p) >= 1000:
|
||
return f"{p:,.0f}"
|
||
if abs(p) >= 100:
|
||
return f"{p:,.1f}"
|
||
return f"{p:,.2f}"
|
||
|
||
|
||
@router.get("/markets-bar", response_class=HTMLResponse, include_in_schema=False)
|
||
async def markets_bar(
|
||
request: Request,
|
||
session: AsyncSession = Depends(get_session),
|
||
as_: str | None = Query(default=None, alias="as"),
|
||
):
|
||
"""The sticky bottom-bar payload: per-market open/close status with the
|
||
market's headline index price + 1d change. Refreshed by HTMX every 60s.
|
||
"""
|
||
from app.services.markets import all_statuses
|
||
|
||
statuses = all_statuses()
|
||
# Latest quote per headline-index symbol in one query.
|
||
wanted_syms = [sym for sym, _ in _MARKET_INDEX.values()]
|
||
sub = (
|
||
select(Quote.symbol, func.max(Quote.fetched_at).label("mx"))
|
||
.where(Quote.symbol.in_(wanted_syms))
|
||
.group_by(Quote.symbol)
|
||
.subquery()
|
||
)
|
||
rows = (await session.execute(
|
||
select(Quote).join(
|
||
sub,
|
||
(Quote.symbol == sub.c.symbol) & (Quote.fetched_at == sub.c.mx),
|
||
)
|
||
)).scalars().all()
|
||
by_sym = {q.symbol: q for q in rows}
|
||
|
||
markets: list[dict] = []
|
||
for st in statuses:
|
||
sym, label = _MARKET_INDEX.get(st["code"], (None, None))
|
||
q = by_sym.get(sym) if sym else None
|
||
idx = None
|
||
if q is not None and q.price is not None:
|
||
idx = {
|
||
"symbol": q.symbol,
|
||
"label": label,
|
||
"price_fmt": _fmt_price(q.price),
|
||
"change_1d_pct": (q.changes or {}).get("1d"),
|
||
}
|
||
markets.append({
|
||
"code": st["code"],
|
||
"label": st["label"],
|
||
"open": st["open"],
|
||
"until_iso": st["until"].isoformat(),
|
||
"until_hhmm": st["until"].strftime("%H:%M"),
|
||
"index": idx,
|
||
})
|
||
|
||
return templates.TemplateResponse(
|
||
request, "partials/markets_bar.html",
|
||
{"markets": markets},
|
||
)
|
||
|
||
|
||
@router.get("/health", response_class=HTMLResponse, include_in_schema=False)
|
||
async def health_html(
|
||
request: Request,
|
||
session: AsyncSession = Depends(get_session),
|
||
as_: str | None = Query(default=None, alias="as"),
|
||
):
|
||
"""Returns an HTML fragment by default (the ops footer); ?as=json returns the
|
||
structured object. The default is HTML because that's how the dashboard
|
||
consumes it; CLI/curl users will pass ?as=json."""
|
||
try:
|
||
await session.execute(select(func.now()))
|
||
db_ok = True
|
||
except Exception:
|
||
db_ok = False
|
||
|
||
now = utcnow()
|
||
jobs: list[dict] = []
|
||
structured: list[JobStatus] = []
|
||
for name in JOB_NAMES:
|
||
row = (await session.execute(
|
||
select(JobRun).where(JobRun.name == name)
|
||
.order_by(desc(JobRun.started_at)).limit(1)
|
||
)).scalar_one_or_none()
|
||
if row is None:
|
||
jobs.append({"name": name, "led": "idle", "age": "—",
|
||
"last_finished": None})
|
||
structured.append(JobStatus(name=name))
|
||
continue
|
||
if row.status == "success":
|
||
secs = _age_seconds(now, row.finished_at or row.started_at) or 0
|
||
led = "ok" if secs < JOB_STALE_HOURS * 3600 else "warn"
|
||
elif row.status == "skipped":
|
||
led = "warn"
|
||
elif row.status == "running":
|
||
led = "warn"
|
||
else:
|
||
led = "err"
|
||
jobs.append({
|
||
"name": name, "led": led,
|
||
"age": _fmt_age(now, row.finished_at or row.started_at),
|
||
"last_finished": row.finished_at,
|
||
})
|
||
structured.append(JobStatus(
|
||
name=name, last_started=row.started_at,
|
||
last_finished=row.finished_at, status=row.status,
|
||
error=row.error, items_written=row.items_written,
|
||
))
|
||
|
||
if as_ == "json":
|
||
return JSONResponse(
|
||
HealthOut(db="ok" if db_ok else "down", jobs=structured).model_dump(mode="json")
|
||
)
|
||
return templates.TemplateResponse(
|
||
request, "partials/ops_footer.html",
|
||
{"db_ok": db_ok, "jobs": jobs},
|
||
)
|
||
|
||
|
||
# --- Chat -------------------------------------------------------------------
|
||
|
||
|
||
class ChatMessage(BaseModel):
|
||
role: str = Field(pattern="^(user|assistant)$")
|
||
content: str
|
||
|
||
|
||
class ChatRequest(BaseModel):
|
||
messages: list[ChatMessage]
|
||
|
||
|
||
CHAT_REFERENCE_LINE = (
|
||
"S&P 7,501 (ATH) · VIX 18.0 · US 10y 4.45% · HY OAS 279bps · "
|
||
"Brent $109/bbl · Gold $4,651/oz · CPI 3.8% YoY"
|
||
)
|
||
THESIS_KEYWORDS_FALLBACK = [
|
||
"hormuz", "iran", "opec", "brent", "wti", "crude", "oil",
|
||
"china", "taiwan", "yuan", "fed", "inflation", "cpi", "yield",
|
||
"gold", "dollar", "yen", "saudi", "russia", "ukraine", "israel",
|
||
"nato", "defence", "defense",
|
||
]
|
||
|
||
|
||
async def _latest_quotes_by_group_chat(session: AsyncSession) -> dict[str, list[dict]]:
|
||
sub = (
|
||
select(Quote.group_name, Quote.symbol,
|
||
func.max(Quote.fetched_at).label("mx"))
|
||
.group_by(Quote.group_name, Quote.symbol)
|
||
.subquery()
|
||
)
|
||
rows = (await session.execute(
|
||
select(Quote).join(
|
||
sub,
|
||
(Quote.group_name == sub.c.group_name)
|
||
& (Quote.symbol == sub.c.symbol)
|
||
& (Quote.fetched_at == sub.c.mx),
|
||
).order_by(Quote.group_name, Quote.symbol)
|
||
)).scalars().all()
|
||
by_group: dict[str, list[dict]] = defaultdict(list)
|
||
for q in rows:
|
||
by_group[q.group_name].append({
|
||
"symbol": q.symbol, "label": q.label,
|
||
"price": q.price, "currency": q.currency,
|
||
"as_of": q.as_of, "changes": q.changes,
|
||
})
|
||
return by_group
|
||
|
||
|
||
async def _thesis_headlines_for_chat(session: AsyncSession, limit: int = 50) -> list[dict]:
|
||
cutoff = utcnow() - timedelta(hours=24)
|
||
rows = (await session.execute(
|
||
select(Headline)
|
||
.where(Headline.published_at >= cutoff)
|
||
.order_by(desc(Headline.published_at))
|
||
.limit(300)
|
||
)).scalars().all()
|
||
out = []
|
||
for h in rows:
|
||
if any(kw in h.title.lower() for kw in THESIS_KEYWORDS_FALLBACK):
|
||
out.append({"source": h.source, "title": h.title})
|
||
if len(out) >= limit:
|
||
break
|
||
return out
|
||
|
||
|
||
async def _month_spend(session: AsyncSession) -> float:
|
||
total = (await session.execute(
|
||
select(func.coalesce(func.sum(AICall.cost_usd), 0.0))
|
||
.where(AICall.called_at >= month_start())
|
||
)).scalar()
|
||
return float(total or 0.0)
|
||
|
||
|
||
@router.post("/chat")
|
||
async def chat(
|
||
body: ChatRequest,
|
||
session: AsyncSession = Depends(get_session),
|
||
):
|
||
"""Answer one user turn given the conversation so far. Grounded on the
|
||
latest strategic log + market data + thesis-filtered headlines.
|
||
Ephemeral — the conversation lives entirely in the client; the endpoint
|
||
just records each call's cost in `ai_calls`."""
|
||
s = get_settings()
|
||
if not s.OPENROUTER_API_KEY:
|
||
raise HTTPException(status_code=503, detail="OPENROUTER_API_KEY not set")
|
||
|
||
# Monthly cost cap — same one the log job respects.
|
||
spent = await _month_spend(session)
|
||
if spent >= s.OPENROUTER_MONTHLY_CAP_USD:
|
||
raise HTTPException(
|
||
status_code=429,
|
||
detail=f"Monthly OpenRouter cap reached (${spent:.2f})",
|
||
)
|
||
|
||
# Trim runaway conversations: keep last 20 turns.
|
||
history = body.messages[-20:]
|
||
if not history or history[-1].role != "user":
|
||
raise HTTPException(status_code=400, detail="Last message must be user")
|
||
|
||
# Gather grounding context.
|
||
log_row = (await session.execute(
|
||
select(StrategicLog).order_by(desc(StrategicLog.generated_at)).limit(1)
|
||
)).scalar_one_or_none()
|
||
quotes = await _latest_quotes_by_group_chat(session)
|
||
headlines = await _thesis_headlines_for_chat(session)
|
||
|
||
system_prompt = build_chat_system_prompt(
|
||
s.CASSANDRA_TONE, s.CASSANDRA_ANALYSIS,
|
||
log_content=log_row.content if log_row else None,
|
||
log_generated_at=log_row.generated_at if log_row else None,
|
||
quotes_by_group=quotes,
|
||
headlines=headlines,
|
||
reference_line=CHAT_REFERENCE_LINE,
|
||
)
|
||
|
||
msgs = [{"role": "system", "content": system_prompt}]
|
||
for m in history:
|
||
msgs.append({"role": m.role, "content": m.content})
|
||
|
||
try:
|
||
async with httpx.AsyncClient(follow_redirects=True) as client:
|
||
result = await call_openrouter(client, msgs, model=s.OPENROUTER_MODEL)
|
||
except Exception as e:
|
||
session.add(AICall(
|
||
model=s.OPENROUTER_MODEL, status="error", error=str(e)[:500],
|
||
))
|
||
await session.commit()
|
||
raise HTTPException(status_code=502, detail=f"OpenRouter error: {e}")
|
||
|
||
session.add(AICall(
|
||
model=result.model,
|
||
prompt_tokens=result.prompt_tokens,
|
||
completion_tokens=result.completion_tokens,
|
||
cost_usd=result.cost_usd,
|
||
status="ok",
|
||
))
|
||
await session.commit()
|
||
|
||
return {
|
||
"role": "assistant",
|
||
"content": result.content,
|
||
"content_html": _md_to_html(result.content),
|
||
"prompt_tokens": result.prompt_tokens,
|
||
"completion_tokens": result.completion_tokens,
|
||
}
|