read.markets/app/routers/api.py

690 lines
24 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""JSON / HTMX-partial endpoints. Each route content-negotiates via `?as=html`:
bare requests return Pydantic JSON, `as=html` renders the matching partial.
The bearer-token dependency applies to all routes here.
"""
from __future__ import annotations
import calendar as _cal
import re
from datetime import date, datetime, timedelta, timezone
from typing import Literal
from fastapi import APIRouter, Depends, HTTPException, Query, Request
from fastapi.responses import JSONResponse
from sqlalchemy import desc, func, select
from sqlalchemy.ext.asyncio import AsyncSession
from pydantic import BaseModel
from app.auth import require_token, maybe_current_user, CurrentUser
from app.services.i18n import ACTIVE_LANGUAGES
from app.config import get_settings
from app.db import get_session, utcnow
from app.logging import get_logger
log = get_logger("api_router")
from app.templates_env import templates
from app.models import (
Headline,
IndicatorSummary,
IndicatorSummaryTranslation,
Quote,
StrategicLog,
StrategicLogTranslation,
User,
)
from app.schemas import (
HeadlineOut,
QuoteOut,
StrategicLogOut,
)
router = APIRouter(dependencies=[Depends(require_token)])
# Per-group expected freshness — bonds and intraday tape want daily data,
# macro/economy/valuation are monthly/quarterly by nature. Older than this
# many days from today → row gets a "stale" badge.
_STALE_DAYS_BY_GROUP = {
"bonds": 21,
"rates": 7,
"equity": 7,
"mag7": 7,
"commodities": 7,
"fx": 7,
"tech_ai": 7,
"financials": 7,
"bubble_watch":21,
"valuation": 60,
"macro": 60,
"economy": 90,
}
_STALE_DAYS_DEFAULT = 90
# --- Small helpers -----------------------------------------------------------
def _as_utc(d: datetime) -> datetime:
"""MariaDB returns naive datetimes — tag them UTC so arithmetic with
tz-aware utcnow() doesn't blow up."""
return d if d.tzinfo is not None else d.replace(tzinfo=timezone.utc)
def _age_seconds(now: datetime, when: datetime | None) -> float | None:
if when is None:
return None
return (_as_utc(now) - _as_utc(when)).total_seconds()
def _fmt_age(now: datetime, when: datetime | None) -> str:
secs = _age_seconds(now, when)
if secs is None:
return ""
if secs < 60:
return f"{int(secs)}s"
if secs < 3600:
return f"{int(secs // 60)}m"
if secs < 86400:
return f"{int(secs // 3600)}h"
return f"{int(secs // 86400)}d"
_MD_HEADER = re.compile(r"^(#{1,3})\s+(.+)$", re.MULTILINE)
_MD_BOLD = re.compile(r"\*\*([^*]+)\*\*")
def _md_to_html(text: str) -> str:
"""Tiny markdown subset for log output — headers, bold, paragraphs."""
def header_sub(m):
level = min(3, len(m.group(1))) + 1 # h2/h3/h4
return f"<h{level}>{m.group(2).strip()}</h{level}>"
out = _MD_HEADER.sub(header_sub, text)
out = _MD_BOLD.sub(r"<strong>\1</strong>", out)
# Convert blank-line-separated paragraphs to <p> blocks.
blocks = re.split(r"\n\s*\n", out.strip())
rendered: list[str] = []
for b in blocks:
if b.startswith("<h"):
rendered.append(b)
else:
rendered.append(f"<p>{b.strip().replace(chr(10), '<br>')}</p>")
return "\n".join(rendered)
# --- Indicators --------------------------------------------------------------
@router.get("/indicators/{group}")
async def indicators(
group: str,
request: Request,
as_: str | None = Query(default=None, alias="as"),
tone: str | None = Query(default=None),
session: AsyncSession = Depends(get_session),
principal: CurrentUser | None = Depends(maybe_current_user),
):
sub = (
select(Quote.symbol, func.max(Quote.fetched_at).label("mx"))
.where(Quote.group_name == group)
.group_by(Quote.symbol)
.subquery()
)
rows = (await session.execute(
select(Quote)
.join(sub,
(Quote.symbol == sub.c.symbol) & (Quote.fetched_at == sub.c.mx))
.where(Quote.group_name == group)
.order_by(Quote.symbol)
)).scalars().all()
if as_ == "html":
from app.config import get_settings, load_groups
from app.services.market import parse_symbol
s_ = get_settings()
# Build the set of symbols currently configured for this group AND a
# notes lookup keyed by the post-parse identifier (the form stored in
# the DB).
notes: dict[str, str] = {}
configured: set[str] = set()
for sym, _lab, note in load_groups(s_.BASELINE_TOML, s_.PORTFOLIO_TOML).get(group, []):
_fn, ident = parse_symbol(sym)
notes[ident] = note
configured.add(ident)
# Drop ghost rows: symbols that used to be in this group but were
# removed from the TOML — their last quote still sits in the DB until
# rollup prunes it, but we shouldn't show them.
rows = [r for r in rows if r.symbol in configured]
has_anchor = any((r.changes or {}).get("anchor") is not None for r in rows)
wanted_tone = _resolve_tone_param(tone)
summary = (await session.execute(
select(IndicatorSummary)
.where(IndicatorSummary.group_name == group)
.where(IndicatorSummary.tone == wanted_tone)
.order_by(desc(IndicatorSummary.generated_at))
.limit(1)
)).scalar_one_or_none()
if summary is None:
# Fallback during rollout: any tone for this group.
summary = (await session.execute(
select(IndicatorSummary)
.where(IndicatorSummary.group_name == group)
.order_by(desc(IndicatorSummary.generated_at))
.limit(1)
)).scalar_one_or_none()
# Mark rows whose `as_of` is older than the group-specific threshold.
# Daily-tape groups (bonds, rates, equity, ...) flag stale earlier
# than monthly groups (economy, macro, valuation).
today = utcnow().date()
threshold = _STALE_DAYS_BY_GROUP.get(group, _STALE_DAYS_DEFAULT)
stale_symbols: set[str] = set()
for r in rows:
try:
as_of_d = datetime.strptime(r.as_of, "%Y-%m-%d").date() if r.as_of else None
except ValueError:
as_of_d = None
if as_of_d and (today - as_of_d).days > threshold:
stale_symbols.add(r.symbol)
await _apply_localized_summary(session, summary, principal)
return templates.TemplateResponse(
request, "partials/indicators.html",
{"quotes": rows, "has_anchor": has_anchor,
"summary": summary, "notes": notes,
"stale_symbols": stale_symbols,
"tone": wanted_tone},
)
return [QuoteOut.model_validate(r, from_attributes=True) for r in rows]
# --- News --------------------------------------------------------------------
def _split_tag_param(s: str | None) -> set[str]:
"""Parse a comma-separated tags query param, lowercase + trim."""
if not s:
return set()
return {t.strip().lower() for t in s.split(",") if t.strip()}
@router.get("/news")
async def news_list(
request: Request,
session: AsyncSession = Depends(get_session),
category: str | None = Query(None),
since_hours: float = Query(24.0, ge=0.1, le=720.0),
limit: int = Query(50, ge=1, le=500),
tags: str | None = Query(None, description="comma-separated include list"),
exclude_tags: str | None = Query(None, description="comma-separated exclude list"),
principal: CurrentUser | None = Depends(maybe_current_user),
as_: str | None = Query(default=None, alias="as"),
):
from app.services.news_tagging import TAG_LABELS, TAG_VOCABULARY
from app.services.access import FREE_NEWS_WINDOW_HOURS, is_paid_active
effective_hours = since_hours
capped = not is_paid_active(principal)
if capped:
effective_hours = min(since_hours, FREE_NEWS_WINDOW_HOURS)
cutoff = utcnow() - timedelta(hours=effective_hours)
stmt = select(Headline).where(Headline.published_at >= cutoff)
if category:
stmt = stmt.where(Headline.category == category)
# Fetch a wider window than `limit` because we tag-filter client-of-DB.
# JSON column filters in MariaDB are doable but messy; in-Python is
# simple at our scale.
stmt = stmt.order_by(desc(Headline.published_at)).limit(max(limit * 3, 200))
rows = (await session.execute(stmt)).scalars().all()
include = _split_tag_param(tags)
exclude = _split_tag_param(exclude_tags)
def _keep(h: Headline) -> bool:
ts = set(h.tags or [])
if include and not (ts & include):
return False
if exclude and (ts & exclude):
return False
return True
filtered = [h for h in rows if _keep(h)][:limit]
if as_ == "html":
now = utcnow()
items = []
for h in filtered:
when = _as_utc(h.published_at) if h.published_at else None
items.append({
"age": _fmt_age(now, h.published_at),
"source": h.source,
"title": h.title,
"url": h.url,
"iso": when.isoformat() if when else None,
"utc_short": when.strftime("%d %b %H:%M") + "Z" if when else "",
"tags": h.tags or [],
})
return templates.TemplateResponse(
request, "partials/news.html",
{"headlines": items,
"tag_vocabulary": TAG_VOCABULARY,
"tag_labels": TAG_LABELS,
"active_include": sorted(include),
"active_exclude": sorted(exclude),
"capped": capped,
"window_hours": effective_hours},
)
return [HeadlineOut.model_validate(r, from_attributes=True) for r in filtered]
# --- Strategic log -----------------------------------------------------------
def _log_partial_payload(
row: StrategicLog | None,
content_override: str | None = None,
) -> dict | None:
if row is None:
return None
content = content_override if content_override is not None else row.content
return {
"content_html": _md_to_html(content),
"generated_at": row.generated_at,
"model": row.model,
"tone": row.tone,
"analysis": row.analysis,
"prompt_version": row.prompt_version,
"cost_usd": row.cost_usd,
"prompt_tokens": row.prompt_tokens,
"completion_tokens": row.completion_tokens,
}
async def _localized_content(
session: AsyncSession,
row: StrategicLog | None,
principal: CurrentUser | None,
) -> str | None:
"""Return the translated content for ``row`` when the principal has
a non-English lang preference and a matching translation row exists.
Returns None to signal 'use row.content as-is' (the default English
path)."""
if row is None or principal is None or principal.user is None:
log.info("i18n.log.skip", reason="no_row_or_principal",
row_id=getattr(row, "id", None),
has_principal=principal is not None,
has_user=(principal.user is not None) if principal else False)
return None
lang = (principal.user.lang or "en")
log.info("i18n.log.lookup", row_id=row.id, lang=lang,
user_id=principal.user.id)
if lang == "en":
return None
t = (await session.execute(
select(StrategicLogTranslation)
.where(StrategicLogTranslation.log_id == row.id)
.where(StrategicLogTranslation.lang == lang)
)).scalar_one_or_none()
log.info("i18n.log.result", row_id=row.id, lang=lang,
found=(t is not None),
content_preview=(t.content[:60] if t is not None else None))
return t.content if t is not None else None
async def _apply_localized_summary(
session: AsyncSession,
row: IndicatorSummary | None,
principal: CurrentUser | None,
) -> None:
"""If ``row`` has a matching translation for ``principal.user.lang``,
overwrite the in-memory ``content`` attribute so the template renders
the localized version. No DB write happens — the mutation lives only
for the lifetime of this GET request.
"""
if row is None or principal is None or principal.user is None:
log.info("i18n.summary.skip", reason="no_row_or_principal",
row_id=getattr(row, "id", None),
has_principal=principal is not None,
has_user=(principal.user is not None) if principal else False)
return
lang = (principal.user.lang or "en")
log.info("i18n.summary.lookup", row_id=row.id, lang=lang,
user_id=principal.user.id)
if lang == "en":
return
t = (await session.execute(
select(IndicatorSummaryTranslation)
.where(IndicatorSummaryTranslation.summary_id == row.id)
.where(IndicatorSummaryTranslation.lang == lang)
)).scalar_one_or_none()
log.info("i18n.summary.result", row_id=row.id, lang=lang,
found=(t is not None),
content_preview=(t.content[:60] if t is not None else None))
if t is not None:
row.content = t.content
def _resolve_tone_param(tone: str | None) -> str:
"""Normalise a query-param tone to one of the two valid values.
PRO is silently mapped to INTERMEDIATE (see openrouter.PROMPT_VERSION 6)."""
if not tone:
return get_settings().CASSANDRA_TONE.upper()
upper = tone.upper().strip()
if upper in ("NOVICE", "INTERMEDIATE"):
return upper
return "INTERMEDIATE"
def _free_tier_hour_filter():
"""Free-tier cadence filter for the strategic log: restrict matches to
logs generated at one of the 6-hour boundary hours (00, 06, 12, 18
UTC). The job itself runs at :20 every hour, so this effectively gives
free users a fresh log roughly every six hours."""
from app.services.access import FREE_LOG_HOURS_UTC
# `func.extract` works on both MariaDB and SQLite.
return func.extract("hour", StrategicLog.generated_at).in_(FREE_LOG_HOURS_UTC)
@router.get("/log/latest")
async def log_latest(
request: Request,
session: AsyncSession = Depends(get_session),
as_: str | None = Query(default=None, alias="as"),
tone: str | None = Query(default=None),
principal: CurrentUser | None = Depends(maybe_current_user),
):
from app.services.access import is_paid_active
free_only = not is_paid_active(principal)
wanted_tone = _resolve_tone_param(tone)
stmt = (
select(StrategicLog)
.where(StrategicLog.tone == wanted_tone)
.order_by(desc(StrategicLog.generated_at))
.limit(1)
)
if free_only:
stmt = stmt.where(_free_tier_hour_filter())
row = (await session.execute(stmt)).scalar_one_or_none()
# Fallback during rollout: if the requested tone isn't produced yet,
# serve whatever is latest rather than 404 the panel.
if row is None:
fallback = (
select(StrategicLog)
.order_by(desc(StrategicLog.generated_at))
.limit(1)
)
if free_only:
fallback = fallback.where(_free_tier_hour_filter())
row = (await session.execute(fallback)).scalar_one_or_none()
if as_ == "html":
content_override = await _localized_content(session, row, principal)
return templates.TemplateResponse(
request, "partials/log.html",
{"log": _log_partial_payload(row, content_override=content_override),
"tone": wanted_tone, "paid": not free_only},
)
if row is None:
raise HTTPException(status_code=404, detail="No strategic log generated yet")
return StrategicLogOut.model_validate(row, from_attributes=True)
@router.get("/log/by-date/{day}")
async def log_by_date(
request: Request,
day: str,
session: AsyncSession = Depends(get_session),
as_: str | None = Query(default=None, alias="as"),
tone: str | None = Query(default=None),
principal: CurrentUser | None = Depends(maybe_current_user),
):
"""Canonical log for a given day = MAX(generated_at) within that day,
filtered by tone (NOVICE | INTERMEDIATE; default from settings).
Free-tier users only see logs generated at the 6-hour boundary slots."""
try:
target = datetime.strptime(day, "%Y-%m-%d").date()
except ValueError:
raise HTTPException(status_code=400, detail="day must be YYYY-MM-DD")
from app.services.access import is_paid_active
free_only = not is_paid_active(principal)
wanted_tone = _resolve_tone_param(tone)
stmt = (
select(StrategicLog)
.where(func.date(StrategicLog.generated_at) == target)
.where(StrategicLog.tone == wanted_tone)
.order_by(desc(StrategicLog.generated_at))
.limit(1)
)
if free_only:
stmt = stmt.where(_free_tier_hour_filter())
row = (await session.execute(stmt)).scalar_one_or_none()
if row is None:
# Fallback: any tone for that day (still tier-filtered).
fallback = (
select(StrategicLog)
.where(func.date(StrategicLog.generated_at) == target)
.order_by(desc(StrategicLog.generated_at))
.limit(1)
)
if free_only:
fallback = fallback.where(_free_tier_hour_filter())
row = (await session.execute(fallback)).scalar_one_or_none()
if as_ == "html":
content_override = await _localized_content(session, row, principal)
return templates.TemplateResponse(
request, "partials/log.html",
{"log": _log_partial_payload(row, content_override=content_override),
"tone": wanted_tone, "paid": not free_only},
)
if row is None:
raise HTTPException(status_code=404, detail="No log on this date")
return StrategicLogOut.model_validate(row, from_attributes=True)
# --- Calendar archive --------------------------------------------------------
def _month_grid(year: int, month: int) -> list[list[int | None]]:
"""6 weeks × 7 days, Monday-first; None for cells outside the month."""
weeks = _cal.Calendar(firstweekday=0).monthdayscalendar(year, month)
while len(weeks) < 6:
weeks.append([0] * 7)
return [[d or None for d in w] for w in weeks]
def _shift_month(year: int, month: int, delta: int) -> tuple[int, int]:
m = month + delta
y = year + (m - 1) // 12
m = ((m - 1) % 12) + 1
return y, m
@router.get("/log/days")
async def log_days(
request: Request,
month: str = Query(..., pattern=r"^\d{4}-\d{2}$"),
selected: str | None = Query(None),
session: AsyncSession = Depends(get_session),
as_: str | None = Query(default=None, alias="as"),
):
"""Return the calendar widget for a given month with the days that have
a strategic log highlighted."""
year, mnum = (int(p) for p in month.split("-"))
month_start = date(year, mnum, 1)
next_y, next_m = _shift_month(year, mnum, 1)
month_end = date(next_y, next_m, 1)
rows = (await session.execute(
select(func.distinct(func.date(StrategicLog.generated_at)))
.where(StrategicLog.generated_at >= month_start)
.where(StrategicLog.generated_at < month_end)
)).all()
# SQLAlchemy returns date or string depending on dialect; normalise to ints.
days_with_logs: set[int] = set()
for (d,) in rows:
if isinstance(d, str):
d = datetime.strptime(d, "%Y-%m-%d").date()
days_with_logs.add(d.day)
prev_y, prev_m = _shift_month(year, mnum, -1)
sel_date: date | None = None
if selected:
try:
sel_date = datetime.strptime(selected, "%Y-%m-%d").date()
except ValueError:
sel_date = None
payload = {
"year": year,
"month": mnum,
"month_name": _cal.month_name[mnum],
"grid": _month_grid(year, mnum),
"days_with_logs": days_with_logs,
"selected": sel_date,
"today": datetime.now(timezone.utc).date(),
"prev_month": f"{prev_y:04d}-{prev_m:02d}",
"next_month": f"{next_y:04d}-{next_m:02d}",
}
if as_ == "json":
return JSONResponse({
"month": month,
"days_with_logs": sorted(days_with_logs),
"prev_month": payload["prev_month"],
"next_month": payload["next_month"],
})
return templates.TemplateResponse(request, "partials/calendar.html", payload)
# --- Aggregate summary + market status (dashboard header) -------------------
AGGREGATE_GROUP_NAME = "__all__"
@router.get("/summary/aggregate")
async def aggregate_summary(
request: Request,
session: AsyncSession = Depends(get_session),
as_: str | None = Query(default=None, alias="as"),
tone: str | None = Query(default=None),
principal: CurrentUser | None = Depends(maybe_current_user),
):
wanted_tone = _resolve_tone_param(tone)
row = (await session.execute(
select(IndicatorSummary)
.where(IndicatorSummary.group_name == AGGREGATE_GROUP_NAME)
.where(IndicatorSummary.tone == wanted_tone)
.order_by(desc(IndicatorSummary.generated_at))
.limit(1)
)).scalar_one_or_none()
if row is None:
row = (await session.execute(
select(IndicatorSummary)
.where(IndicatorSummary.group_name == AGGREGATE_GROUP_NAME)
.order_by(desc(IndicatorSummary.generated_at))
.limit(1)
)).scalar_one_or_none()
from app.services.markets import all_statuses
statuses = all_statuses()
if as_ == "html":
await _apply_localized_summary(session, row, principal)
return templates.TemplateResponse(
request, "partials/dashboard_header.html",
{"summary": row, "markets": statuses, "tone": wanted_tone},
)
return {
"summary": (
{"content": row.content,
"generated_at": row.generated_at.isoformat(),
"model": row.model}
if row else None
),
"markets": [
{**m, "until": m["until"].isoformat()} for m in statuses
],
}
# ---------------------------------------------------------------------------
# Settings — digest preferences
# ---------------------------------------------------------------------------
class DigestPrefsIn(BaseModel):
opt_in: bool
tone: Literal["NOVICE", "INTERMEDIATE"]
class DigestPrefsOut(BaseModel):
opt_in: bool
tone: str
@router.patch("/settings/digest", response_model=DigestPrefsOut)
async def patch_digest_prefs(
payload: DigestPrefsIn,
principal: CurrentUser = Depends(require_token),
session: AsyncSession = Depends(get_session),
) -> DigestPrefsOut:
if principal.user is None:
# Admin bearer-token path — no per-user row to persist to.
raise HTTPException(status_code=400, detail="no_user_context")
# require_token loads `principal.user` in its own short-lived session.
# By the time this handler runs, that session is closed; mutating the
# detached object and committing via `session` would persist nothing.
# Re-fetch in the active session before writing.
user = await session.get(User, principal.user.id)
if user is None:
raise HTTPException(status_code=404, detail="user_not_found")
user.email_digest_opt_in = payload.opt_in
user.digest_tone = payload.tone
await session.commit()
return DigestPrefsOut(opt_in=payload.opt_in, tone=payload.tone)
# ---------------------------------------------------------------------------
# Settings — language preference
# ---------------------------------------------------------------------------
class LanguagePrefsIn(BaseModel):
lang: str
class LanguagePrefsOut(BaseModel):
lang: str
@router.patch("/settings/language", response_model=LanguagePrefsOut)
async def patch_language_prefs(
payload: LanguagePrefsIn,
principal: CurrentUser = Depends(require_token),
session: AsyncSession = Depends(get_session),
) -> LanguagePrefsOut:
if principal.user is None:
raise HTTPException(status_code=400, detail="no_user_context")
lang = (payload.lang or "").strip().lower()
if lang not in ACTIVE_LANGUAGES:
raise HTTPException(
status_code=400,
detail=f"unsupported language: {payload.lang!r}",
)
user = await session.get(User, principal.user.id)
if user is None:
raise HTTPException(status_code=404, detail="user_not_found")
user.lang = lang
await session.commit()
return LanguagePrefsOut(lang=lang)