"""JSON / HTMX-partial endpoints. Each route content-negotiates via `?as=html`: bare requests return Pydantic JSON, `as=html` renders the matching partial. The bearer-token dependency applies to all routes here. """ from __future__ import annotations import calendar as _cal import re from datetime import date, datetime, timedelta, timezone from typing import Literal from fastapi import APIRouter, Depends, File, Form, HTTPException, Query, Request, UploadFile from fastapi.responses import HTMLResponse, JSONResponse from sqlalchemy import desc, func, select from sqlalchemy.ext.asyncio import AsyncSession from collections import defaultdict import httpx from pydantic import BaseModel, Field from app.auth import require_token, maybe_current_user, CurrentUser from app.services.i18n import ACTIVE_LANGUAGES from app.config import get_settings from app.db import get_session, utcnow from app.jobs._market_context import REFERENCE_LINE from app.services.llm_prompts import ( PROMPT_VERSION, build_chat_system_prompt, ) from app.services.openrouter import ( call_llm, month_start, ) from app.templates_env import templates from app.models import ( AICall, Headline, IndicatorSummary, IndicatorSummaryTranslation, JobRun, Quote, StrategicLog, StrategicLogTranslation, User, ) from app.schemas import ( HealthOut, HeadlineOut, JobStatus, QuoteOut, StrategicLogOut, ) router = APIRouter(dependencies=[Depends(require_token)]) JOB_NAMES = ("market_job", "news_job", "ai_log_job", "rollup_job", "indicator_summary_job", "universe_flush_job", "email_digest_job") JOB_STALE_HOURS = 2.0 # job is "warn" if its last success was >2h ago # Per-group expected freshness — bonds and intraday tape want daily data, # macro/economy/valuation are monthly/quarterly by nature. Older than this # many days from today → row gets a "stale" badge. 
_STALE_DAYS_BY_GROUP = {
    "bonds": 21,
    "rates": 7,
    "equity": 7,
    "mag7": 7,
    "commodities": 7,
    "fx": 7,
    "tech_ai": 7,
    "financials": 7,
    "bubble_watch": 21,
    "valuation": 60,
    "macro": 60,
    "economy": 90,
}
_STALE_DAYS_DEFAULT = 90


# --- Small helpers -----------------------------------------------------------

def _as_utc(d: datetime) -> datetime:
    """MariaDB returns naive datetimes — tag them UTC so arithmetic with
    tz-aware utcnow() doesn't blow up."""
    return d if d.tzinfo is not None else d.replace(tzinfo=timezone.utc)


def _age_seconds(now: datetime, when: datetime | None) -> float | None:
    """Return seconds elapsed between ``when`` and ``now``; None if ``when`` is None."""
    if when is None:
        return None
    return (_as_utc(now) - _as_utc(when)).total_seconds()


def _fmt_age(now: datetime, when: datetime | None) -> str:
    """Format the age of ``when`` as a compact ``12s`` / ``5m`` / ``3h`` / ``2d``
    string, or an em dash when ``when`` is None."""
    secs = _age_seconds(now, when)
    if secs is None:
        return "—"
    if secs < 60:
        return f"{int(secs)}s"
    if secs < 3600:
        return f"{int(secs // 60)}m"
    if secs < 86400:
        return f"{int(secs // 3600)}h"
    return f"{int(secs // 86400)}d"


_MD_HEADER = re.compile(r"^(#{1,3})\s+(.+)$", re.MULTILINE)
_MD_BOLD = re.compile(r"\*\*([^*]+)\*\*")


def _md_to_html(text: str) -> str:
    """Tiny markdown subset for log output — headers, bold, paragraphs.

    The input is model-generated text, so it is HTML-escaped before any tags
    are added; only the markup produced here reaches the page.

    NOTE(review): the tag names (<h2>-<h4>, <strong>, <p>, <br>) were
    reconstructed from a markup-stripped copy of this function — confirm them
    against the partial templates / CSS.
    """
    # Function-scope import keeps this block independent of the module header.
    from html import escape

    def header_sub(m: re.Match[str]) -> str:
        level = min(3, len(m.group(1))) + 1  # h2/h3/h4
        return f"<h{level}>{m.group(2).strip()}</h{level}>"

    # quote=False: the text lands in element bodies, never in attributes.
    out = escape(text, quote=False)
    out = _MD_HEADER.sub(header_sub, out)
    out = _MD_BOLD.sub(r"<strong>\1</strong>", out)

    # Convert blank-line-separated paragraphs to <p> blocks.
    rendered: list[str] = []
    for block in re.split(r"\n\s*\n", out.strip()):
        if block.startswith("<h"):
            # Header blocks are emitted as-is, not wrapped in a paragraph.
            rendered.append(block)
        else:
            body = block.strip().replace("\n", "<br>")
            rendered.append(f"<p>{body}</p>")
    return "\n".join(rendered)


# --- Indicators --------------------------------------------------------------

async def _latest_indicator_summary(
    session: AsyncSession,
    group_name: str,
    tone: str,
) -> IndicatorSummary | None:
    """Return the newest summary for ``group_name`` in ``tone``.

    Fallback during rollout: when no row exists for the requested tone, the
    newest summary of any tone for that group is returned instead.
    """
    newest_first = (
        select(IndicatorSummary)
        .where(IndicatorSummary.group_name == group_name)
        .order_by(desc(IndicatorSummary.generated_at))
        .limit(1)
    )
    row = (await session.execute(
        newest_first.where(IndicatorSummary.tone == tone)
    )).scalar_one_or_none()
    if row is None:
        row = (await session.execute(newest_first)).scalar_one_or_none()
    return row


@router.get("/indicators/{group}")
async def indicators(
    group: str,
    request: Request,
    as_: str | None = Query(default=None, alias="as"),
    tone: str | None = Query(default=None),
    session: AsyncSession = Depends(get_session),
    principal: CurrentUser | None = Depends(maybe_current_user),
):
    """Latest quote per symbol for ``group``.

    Bare requests return a list of ``QuoteOut``; ``?as=html`` renders
    ``partials/indicators.html`` with the group summary, per-symbol notes and
    stale markers.
    """
    # Latest fetched_at per symbol, joined back to pick the full row.
    sub = (
        select(Quote.symbol, func.max(Quote.fetched_at).label("mx"))
        .where(Quote.group_name == group)
        .group_by(Quote.symbol)
        .subquery()
    )
    rows = (await session.execute(
        select(Quote)
        .join(sub, (Quote.symbol == sub.c.symbol) & (Quote.fetched_at == sub.c.mx))
        .where(Quote.group_name == group)
        .order_by(Quote.symbol)
    )).scalars().all()

    if as_ != "html":
        return [QuoteOut.model_validate(r, from_attributes=True) for r in rows]

    from app.config import load_groups
    from app.services.market import parse_symbol

    s_ = get_settings()
    # Build the set of symbols currently configured for this group AND a
    # notes lookup keyed by the post-parse identifier (the form stored in
    # the DB).
    notes: dict[str, str] = {}
    configured: set[str] = set()
    for sym, _lab, note in load_groups(s_.BASELINE_TOML, s_.PORTFOLIO_TOML).get(group, []):
        _fn, ident = parse_symbol(sym)
        notes[ident] = note
        configured.add(ident)

    # Drop ghost rows: symbols that used to be in this group but were
    # removed from the TOML — their last quote still sits in the DB until
    # rollup prunes it, but we shouldn't show them.
    rows = [r for r in rows if r.symbol in configured]
    has_anchor = any((r.changes or {}).get("anchor") is not None for r in rows)

    wanted_tone = _resolve_tone_param(tone)
    summary = await _latest_indicator_summary(session, group, wanted_tone)

    # Mark rows whose `as_of` is older than the group-specific threshold.
    # Daily-tape groups (bonds, rates, equity, ...) flag stale earlier
    # than monthly groups (economy, macro, valuation).
    today = utcnow().date()
    threshold = _STALE_DAYS_BY_GROUP.get(group, _STALE_DAYS_DEFAULT)
    stale_symbols: set[str] = set()
    for r in rows:
        try:
            as_of_d = datetime.strptime(r.as_of, "%Y-%m-%d").date() if r.as_of else None
        except ValueError:
            # Unparseable as_of: treat as unknown rather than stale.
            as_of_d = None
        if as_of_d and (today - as_of_d).days > threshold:
            stale_symbols.add(r.symbol)

    await _apply_localized_summary(session, summary, principal)
    return templates.TemplateResponse(
        request,
        "partials/indicators.html",
        {
            "quotes": rows,
            "has_anchor": has_anchor,
            "summary": summary,
            "notes": notes,
            "stale_symbols": stale_symbols,
            "tone": wanted_tone,
        },
    )


# --- News --------------------------------------------------------------------

def _split_tag_param(s: str | None) -> set[str]:
    """Parse a comma-separated tags query param, lowercase + trim."""
    if not s:
        return set()
    return {t.strip().lower() for t in s.split(",") if t.strip()}


@router.get("/news")
async def news_list(
    request: Request,
    session: AsyncSession = Depends(get_session),
    category: str | None = Query(None),
    since_hours: float = Query(24.0, ge=0.1, le=720.0),
    limit: int = Query(50, ge=1, le=500),
    tags: str | None = Query(None, description="comma-separated include list"),
    exclude_tags: str | None = Query(None, description="comma-separated exclude list"),
    principal: CurrentUser | None = Depends(maybe_current_user),
    as_: str | None = Query(default=None, alias="as"),
):
    """Recent headlines, optionally filtered by category and tags.

    Principals that are not paid-active have the look-back window capped at
    ``FREE_NEWS_WINDOW_HOURS``. Bare requests return ``HeadlineOut`` items;
    ``?as=html`` renders ``partials/news.html``.
    """
    from app.services.news_tagging import TAG_LABELS, TAG_VOCABULARY
    from app.services.access import FREE_NEWS_WINDOW_HOURS, is_paid_active

    effective_hours = since_hours
    capped = not is_paid_active(principal)
    if capped:
        effective_hours = min(since_hours, FREE_NEWS_WINDOW_HOURS)
    cutoff = utcnow() - timedelta(hours=effective_hours)

    stmt = select(Headline).where(Headline.published_at >= cutoff)
    if category:
        stmt = stmt.where(Headline.category == category)
    # Fetch a wider window than `limit` because tag filtering happens in
    # Python, after the query. JSON column filters in MariaDB are doable but
    # messy; in-Python is simple at our scale.
    stmt = stmt.order_by(desc(Headline.published_at)).limit(max(limit * 3, 200))
    rows = (await session.execute(stmt)).scalars().all()

    include = _split_tag_param(tags)
    exclude = _split_tag_param(exclude_tags)

    def _keep(h: Headline) -> bool:
        ts = set(h.tags or [])
        if include and not (ts & include):
            return False
        if exclude and (ts & exclude):
            return False
        return True

    filtered = [h for h in rows if _keep(h)][:limit]

    if as_ == "html":
        now = utcnow()
        items = []
        for h in filtered:
            when = _as_utc(h.published_at) if h.published_at else None
            items.append({
                "age": _fmt_age(now, h.published_at),
                "source": h.source,
                "title": h.title,
                "url": h.url,
                "iso": when.isoformat() if when else None,
                "utc_short": (when.strftime("%d %b %H:%M") + "Z") if when else "",
                "tags": h.tags or [],
            })
        return templates.TemplateResponse(
            request,
            "partials/news.html",
            {
                "headlines": items,
                "tag_vocabulary": TAG_VOCABULARY,
                "tag_labels": TAG_LABELS,
                "active_include": sorted(include),
                "active_exclude": sorted(exclude),
                "capped": capped,
                "window_hours": effective_hours,
            },
        )
    return [HeadlineOut.model_validate(r, from_attributes=True) for r in filtered]


# --- Strategic log -----------------------------------------------------------

def _log_partial_payload(
    row: StrategicLog | None,
    content_override: str | None = None,
) -> dict | None:
    """Build the template context for ``partials/log.html``.

    ``content_override`` (e.g. a translation) replaces ``row.content`` when it
    is not None. Returns None when there is no row.
    """
    if row is None:
        return None
    content = content_override if content_override is not None else row.content
    return {
        "content_html": _md_to_html(content),
        "generated_at": row.generated_at,
        "model": row.model,
        "tone": row.tone,
        "analysis": row.analysis,
        "prompt_version": row.prompt_version,
        "cost_usd": row.cost_usd,
        "prompt_tokens": row.prompt_tokens,
        "completion_tokens": row.completion_tokens,
    }


def _non_english_lang(principal: CurrentUser | None) -> str | None:
    """Return the principal's language when it is set and not ``"en"``, else None."""
    if principal is None or principal.user is None:
        return None
    lang = principal.user.lang or "en"
    return None if lang == "en" else lang


async def _localized_content(
    session: AsyncSession,
    row: StrategicLog | None,
    principal: CurrentUser | None,
) -> str | None:
    """Return the translated content for ``row`` when the principal has a
    non-English lang preference and a matching translation row exists.

    Returns None to signal 'use row.content as-is' (the default English path).
    """
    lang = _non_english_lang(principal)
    if row is None or lang is None:
        return None
    t = (await session.execute(
        select(StrategicLogTranslation)
        .where(StrategicLogTranslation.log_id == row.id)
        .where(StrategicLogTranslation.lang == lang)
    )).scalar_one_or_none()
    return t.content if t is not None else None


async def _apply_localized_summary(
    session: AsyncSession,
    row: IndicatorSummary | None,
    principal: CurrentUser | None,
) -> None:
    """If ``row`` has a matching translation for ``principal.user.lang``,
    overwrite the in-memory ``content`` attribute so the template renders the
    localized version.

    This function itself never commits; the mutation is only meant to live
    for the current GET request.
    """
    lang = _non_english_lang(principal)
    if row is None or lang is None:
        return
    t = (await session.execute(
        select(IndicatorSummaryTranslation)
        .where(IndicatorSummaryTranslation.summary_id == row.id)
        .where(IndicatorSummaryTranslation.lang == lang)
    )).scalar_one_or_none()
    if t is not None:
        row.content = t.content


def _resolve_tone_param(tone: str | None) -> str:
    """Normalise a query-param tone to one of the two valid values.

    A missing/empty param falls back to the configured ``CASSANDRA_TONE``.
    PRO is silently mapped to INTERMEDIATE (see openrouter.PROMPT_VERSION 6),
    as is any other unrecognised value.
    """
    if not tone:
        return get_settings().CASSANDRA_TONE.upper()
    upper = tone.upper().strip()
    if upper in ("NOVICE", "INTERMEDIATE"):
        return upper
    return "INTERMEDIATE"


def _free_tier_hour_filter():
    """Free-tier cadence filter for the strategic log: restrict matches to logs
    generated at one of the 6-hour boundary hours (00, 06, 12, 18 UTC).

    The job itself runs at :20 every hour, so this effectively gives free
    users a fresh log roughly every six hours.
    """
    from app.services.access import FREE_LOG_HOURS_UTC

    # `func.extract` works on both MariaDB and SQLite.
    return func.extract("hour", StrategicLog.generated_at).in_(FREE_LOG_HOURS_UTC)


async def _latest_strategic_log(
    session: AsyncSession,
    *,
    tone: str,
    free_only: bool,
    day: date | None = None,
) -> StrategicLog | None:
    """Return the newest strategic log in ``tone``, optionally limited to ``day``.

    Fallback during rollout: if the requested tone isn't produced yet, serve
    whatever is latest (still tier- and day-filtered) rather than nothing.
    """
    def _query(with_tone: bool):
        stmt = select(StrategicLog)
        if day is not None:
            stmt = stmt.where(func.date(StrategicLog.generated_at) == day)
        if with_tone:
            stmt = stmt.where(StrategicLog.tone == tone)
        if free_only:
            stmt = stmt.where(_free_tier_hour_filter())
        return stmt.order_by(desc(StrategicLog.generated_at)).limit(1)

    row = (await session.execute(_query(True))).scalar_one_or_none()
    if row is None:
        row = (await session.execute(_query(False))).scalar_one_or_none()
    return row


@router.get("/log/latest")
async def log_latest(
    request: Request,
    session: AsyncSession = Depends(get_session),
    as_: str | None = Query(default=None, alias="as"),
    tone: str | None = Query(default=None),
    principal: CurrentUser | None = Depends(maybe_current_user),
):
    """Most recent strategic log for the requested tone.

    ``?as=html`` always renders ``partials/log.html`` (with ``log`` set to None
    when nothing exists); the JSON path raises 404 when no log exists.
    """
    from app.services.access import is_paid_active

    free_only = not is_paid_active(principal)
    wanted_tone = _resolve_tone_param(tone)
    row = await _latest_strategic_log(session, tone=wanted_tone, free_only=free_only)

    if as_ == "html":
        content_override = await _localized_content(session, row, principal)
        return templates.TemplateResponse(
            request,
            "partials/log.html",
            {
                "log": _log_partial_payload(row, content_override=content_override),
                "tone": wanted_tone,
                "paid": not free_only,
            },
        )
    if row is None:
        raise HTTPException(status_code=404, detail="No strategic log generated yet")
    return StrategicLogOut.model_validate(row, from_attributes=True)


@router.get("/log/by-date/{day}")
async def log_by_date(
    request: Request,
    day: str,
    session: AsyncSession = Depends(get_session),
    as_: str | None = Query(default=None, alias="as"),
    tone: str | None = Query(default=None),
    principal: CurrentUser | None = Depends(maybe_current_user),
):
    """Canonical log for a given day = MAX(generated_at) within that day,
    filtered by tone (NOVICE | INTERMEDIATE; default from settings).

    Free-tier users only see logs generated at the 6-hour boundary slots.
    Raises 400 for a malformed ``day`` and, on the JSON path, 404 when no log
    exists for that date.
    """
    try:
        target = datetime.strptime(day, "%Y-%m-%d").date()
    except ValueError:
        raise HTTPException(status_code=400, detail="day must be YYYY-MM-DD") from None

    from app.services.access import is_paid_active

    free_only = not is_paid_active(principal)
    wanted_tone = _resolve_tone_param(tone)
    row = await _latest_strategic_log(
        session, tone=wanted_tone, free_only=free_only, day=target,
    )

    if as_ == "html":
        content_override = await _localized_content(session, row, principal)
        return templates.TemplateResponse(
            request,
            "partials/log.html",
            {
                "log": _log_partial_payload(row, content_override=content_override),
                "tone": wanted_tone,
                "paid": not free_only,
            },
        )
    if row is None:
        raise HTTPException(status_code=404, detail="No log on this date")
    return StrategicLogOut.model_validate(row, from_attributes=True)


# --- Calendar archive --------------------------------------------------------

def _month_grid(year: int, month: int) -> list[list[int | None]]:
    """6 weeks × 7 days, Monday-first; None for cells outside the month."""
    weeks = _cal.Calendar(firstweekday=0).monthdayscalendar(year, month)
    # Pad short months so the widget always has six rows.
    while len(weeks) < 6:
        weeks.append([0] * 7)
    return [[d or None for d in w] for w in weeks]


def _shift_month(year: int, month: int, delta: int) -> tuple[int, int]:
    """Return ``(year, month)`` moved by ``delta`` months, carrying across years."""
    m = month + delta
    y = year + (m - 1) // 12
    m = ((m - 1) % 12) + 1
    return y, m


@router.get("/log/days")
async def log_days(
    request: Request,
    month: str = Query(..., pattern=r"^\d{4}-\d{2}$"),
    selected: str | None = Query(None),
    session: AsyncSession = Depends(get_session),
    as_: str | None = Query(default=None, alias="as"),
):
    """Return the calendar widget for a given month with the days that have
    a strategic log highlighted.

    HTML is the default; ``?as=json`` returns the highlighted days plus the
    previous/next month keys. Raises 400 when ``month`` matches the YYYY-MM
    pattern but is not a real calendar month (e.g. ``2024-13``).
    """
    year, mnum = (int(p) for p in month.split("-"))
    # The query pattern only checks the shape, so month 00/13+ or year 0000
    # still reach this point; turn the resulting ValueError into a 400.
    try:
        first_day = date(year, mnum, 1)
        next_y, next_m = _shift_month(year, mnum, 1)
        next_first_day = date(next_y, next_m, 1)
    except ValueError:
        raise HTTPException(
            status_code=400, detail="month must be a valid YYYY-MM",
        ) from None

    rows = (await session.execute(
        select(func.distinct(func.date(StrategicLog.generated_at)))
        .where(StrategicLog.generated_at >= first_day)
        .where(StrategicLog.generated_at < next_first_day)
    )).all()
    # SQLAlchemy returns date or string depending on dialect; normalise to ints.
    days_with_logs: set[int] = set()
    for (d,) in rows:
        if isinstance(d, str):
            d = datetime.strptime(d, "%Y-%m-%d").date()
        days_with_logs.add(d.day)

    prev_y, prev_m = _shift_month(year, mnum, -1)
    sel_date: date | None = None
    if selected:
        try:
            sel_date = datetime.strptime(selected, "%Y-%m-%d").date()
        except ValueError:
            # A malformed selection just means "nothing selected".
            sel_date = None

    payload = {
        "year": year,
        "month": mnum,
        "month_name": _cal.month_name[mnum],
        "grid": _month_grid(year, mnum),
        "days_with_logs": days_with_logs,
        "selected": sel_date,
        "today": datetime.now(timezone.utc).date(),
        "prev_month": f"{prev_y:04d}-{prev_m:02d}",
        "next_month": f"{next_y:04d}-{next_m:02d}",
    }
    if as_ == "json":
        return JSONResponse({
            "month": month,
            "days_with_logs": sorted(days_with_logs),
            "prev_month": payload["prev_month"],
            "next_month": payload["next_month"],
        })
    return templates.TemplateResponse(request, "partials/calendar.html", payload)


# --- Aggregate summary + market status (dashboard header) -------------------

AGGREGATE_GROUP_NAME = "__all__"


@router.get("/summary/aggregate")
async def aggregate_summary(
    request: Request,
    session: AsyncSession = Depends(get_session),
    as_: str | None = Query(default=None, alias="as"),
    tone: str | None = Query(default=None),
    principal: CurrentUser | None = Depends(maybe_current_user),
):
    """Cross-group summary plus per-market open/close status.

    ``?as=html`` renders ``partials/dashboard_header.html``; otherwise a JSON
    object with ``summary`` (or null) and ``markets`` is returned.
    """
    wanted_tone = _resolve_tone_param(tone)
    row = await _latest_indicator_summary(session, AGGREGATE_GROUP_NAME, wanted_tone)

    from app.services.markets import all_statuses

    statuses = all_statuses()

    if as_ == "html":
        await _apply_localized_summary(session, row, principal)
        return templates.TemplateResponse(
            request,
            "partials/dashboard_header.html",
            {"summary": row, "markets": statuses, "tone": wanted_tone},
        )
    return {
        "summary": (
            {
                "content": row.content,
                "generated_at": row.generated_at.isoformat(),
                "model": row.model,
            }
            if row else None
        ),
        "markets": [
            {**m, "until": m["until"].isoformat()} for m in statuses
        ],
    }


# Market → headline index mapping for the sticky bottom bar. Symbols must
# be present in config/default.toml so market_job populates `quotes`.
_MARKET_INDEX = {
    "NYSE": ("^GSPC", "S&P 500"),
    "LSE": ("^FTSE", "FTSE 100"),
    # XETRA → Euro Stoxx 50 rather than ^GDAXI: Yahoo's DAX ticker is
    # patchy via the chart endpoint, and ^STOXX50E is already tracked in
    # config/default.toml's equity group.
    "XETRA": ("^STOXX50E", "STOXX 50"),
    "JPX": ("^N225", "Nikkei 225"),
    "HKEX": ("^HSI", "Hang Seng"),
    "SSE": ("000300.SS", "CSI 300"),
}


def _fmt_price(p: float | None) -> str:
    """Format a price with fewer decimals as the magnitude grows; em dash for None."""
    if p is None:
        return "—"
    if abs(p) >= 1000:
        return f"{p:,.0f}"
    if abs(p) >= 100:
        return f"{p:,.1f}"
    return f"{p:,.2f}"


@router.get("/markets-bar", response_class=HTMLResponse, include_in_schema=False)
async def markets_bar(
    request: Request,
    session: AsyncSession = Depends(get_session),
    as_: str | None = Query(default=None, alias="as"),
):
    """The sticky bottom-bar payload: per-market open/close status with the
    market's headline index price + 1d change. Refreshed by HTMX every 60s.

    Always renders HTML; ``as_`` is accepted but not consulted.
    """
    from app.services.markets import all_statuses

    statuses = all_statuses()

    # Latest quote per headline-index symbol in one query.
    wanted_syms = [sym for sym, _ in _MARKET_INDEX.values()]
    sub = (
        select(Quote.symbol, func.max(Quote.fetched_at).label("mx"))
        .where(Quote.symbol.in_(wanted_syms))
        .group_by(Quote.symbol)
        .subquery()
    )
    rows = (await session.execute(
        select(Quote).join(
            sub,
            (Quote.symbol == sub.c.symbol) & (Quote.fetched_at == sub.c.mx),
        )
    )).scalars().all()
    by_sym = {q.symbol: q for q in rows}

    markets: list[dict] = []
    for st in statuses:
        sym, label = _MARKET_INDEX.get(st["code"], (None, None))
        q = by_sym.get(sym) if sym else None
        idx = None
        if q is not None and q.price is not None:
            idx = {
                "symbol": q.symbol,
                "label": label,
                "price_fmt": _fmt_price(q.price),
                "change_1d_pct": (q.changes or {}).get("1d"),
            }
        markets.append({
            "code": st["code"],
            "label": st["label"],
            "open": st["open"],
            "until_iso": st["until"].isoformat(),
            "until_hhmm": st["until"].strftime("%H:%M"),
            "index": idx,
        })

    return templates.TemplateResponse(
        request, "partials/markets_bar.html", {"markets": markets},
    )


# --- Health / ops footer -----------------------------------------------------

@router.get("/health", response_class=HTMLResponse, include_in_schema=False)
async def health_html(
    request: Request,
    session: AsyncSession = Depends(get_session),
    as_: str | None = Query(default=None, alias="as"),
):
    """Returns an HTML fragment by default (the ops footer); ?as=json returns
    the structured object.

    The default is HTML because that's how the dashboard consumes it;
    CLI/curl users will pass ?as=json.
    """
    # Deliberately broad: any failure talking to the DB is reported as
    # "down" in the footer instead of failing the health route itself.
    try:
        await session.execute(select(func.now()))
        db_ok = True
    except Exception:
        db_ok = False

    now = utcnow()
    jobs: list[dict] = []
    structured: list[JobStatus] = []
    for name in JOB_NAMES:
        row = (await session.execute(
            select(JobRun).where(JobRun.name == name)
            .order_by(desc(JobRun.started_at)).limit(1)
        )).scalar_one_or_none()
        if row is None:
            # The job has never run.
            jobs.append({"name": name, "led": "idle", "age": "—", "last_finished": None})
            structured.append(JobStatus(name=name))
            continue

        if row.status == "success":
            secs = _age_seconds(now, row.finished_at or row.started_at) or 0
            led = "ok" if secs < JOB_STALE_HOURS * 3600 else "warn"
        elif row.status in ("skipped", "running"):
            led = "warn"
        else:
            led = "err"
        jobs.append({
            "name": name,
            "led": led,
            "age": _fmt_age(now, row.finished_at or row.started_at),
            "last_finished": row.finished_at,
        })
        structured.append(JobStatus(
            name=name,
            last_started=row.started_at,
            last_finished=row.finished_at,
            status=row.status,
            error=row.error,
            items_written=row.items_written,
        ))

    if as_ == "json":
        return JSONResponse(
            HealthOut(db="ok" if db_ok else "down", jobs=structured).model_dump(mode="json")
        )
    return templates.TemplateResponse(
        request, "partials/ops_footer.html", {"db_ok": db_ok, "jobs": jobs},
    )


# --- Chat -------------------------------------------------------------------

class ChatMessage(BaseModel):
    """One conversation turn; ``role`` must be ``user`` or ``assistant``."""

    role: str = Field(pattern="^(user|assistant)$")
    content: str


class ChatRequest(BaseModel):
    """Request body for ``POST /chat``: the conversation so far, oldest first."""

    messages: list[ChatMessage]


THESIS_KEYWORDS_FALLBACK = [
    "hormuz", "iran", "opec", "brent", "wti", "crude", "oil",
    "china", "taiwan", "yuan", "fed", "inflation", "cpi", "yield",
    "gold", "dollar", "yen", "saudi", "russia", "ukraine", "israel",
    "nato", "defence", "defense",
]


async def _latest_quotes_by_group_chat(session: AsyncSession) -> dict[str, list[dict]]:
    """Return the latest quote per (group, symbol) as plain dicts keyed by group."""
    sub = (
        select(Quote.group_name, Quote.symbol, func.max(Quote.fetched_at).label("mx"))
        .group_by(Quote.group_name, Quote.symbol)
        .subquery()
    )
    rows = (await session.execute(
        select(Quote).join(
            sub,
            (Quote.group_name == sub.c.group_name)
            & (Quote.symbol == sub.c.symbol)
            & (Quote.fetched_at == sub.c.mx),
        ).order_by(Quote.group_name, Quote.symbol)
    )).scalars().all()
    by_group: dict[str, list[dict]] = defaultdict(list)
    for q in rows:
        by_group[q.group_name].append({
            "symbol": q.symbol,
            "label": q.label,
            "price": q.price,
            "currency": q.currency,
            "as_of": q.as_of,
            "changes": q.changes,
        })
    return by_group


async def _thesis_headlines_for_chat(session: AsyncSession, limit: int = 50) -> list[dict]:
    """Return up to ``limit`` headlines from the last 24h whose title contains
    one of ``THESIS_KEYWORDS_FALLBACK`` (newest first, scanning at most 300)."""
    cutoff = utcnow() - timedelta(hours=24)
    rows = (await session.execute(
        select(Headline)
        .where(Headline.published_at >= cutoff)
        .order_by(desc(Headline.published_at))
        .limit(300)
    )).scalars().all()
    out: list[dict] = []
    for h in rows:
        # Guard against a null title instead of crashing the whole chat call.
        title = (h.title or "").lower()
        if any(kw in title for kw in THESIS_KEYWORDS_FALLBACK):
            out.append({"source": h.source, "title": h.title})
            if len(out) >= limit:
                break
    return out


async def _month_spend(session: AsyncSession) -> float:
    """Sum of ``AICall.cost_usd`` for calls made since ``month_start()``."""
    total = (await session.execute(
        select(func.coalesce(func.sum(AICall.cost_usd), 0.0))
        .where(AICall.called_at >= month_start())
    )).scalar()
    return float(total or 0.0)


@router.post("/chat")
async def chat(
    body: ChatRequest,
    session: AsyncSession = Depends(get_session),
    principal: CurrentUser | None = Depends(maybe_current_user),
):
    """Answer one user turn given the conversation so far.

    Grounded on the latest strategic log + market data + thesis-filtered
    headlines. Ephemeral — the conversation lives entirely in the client; the
    endpoint just records each call's cost in `ai_calls`.

    Raises 402 for non-paid principals, 503 when no API key is configured,
    429 once the monthly cap is reached, 400 when the last message is not a
    user turn, and 502 when the upstream call fails.
    """
    # Paid-only feature. Free users get the static log but not the
    # interactive chat (see /pricing).
    from app.services.access import is_paid_active

    if not is_paid_active(principal):
        raise HTTPException(
            status_code=402,
            detail={"code": "paid_required",
                    "message": "Follow-up chat is a paid-tier feature."},
        )

    s = get_settings()
    if not s.OPENROUTER_API_KEY:
        raise HTTPException(status_code=503, detail="OPENROUTER_API_KEY not set")

    # Monthly cost cap — same one the log job respects.
    spent = await _month_spend(session)
    if spent >= s.OPENROUTER_MONTHLY_CAP_USD:
        raise HTTPException(
            status_code=429,
            detail=f"Monthly OpenRouter cap reached (${spent:.2f})",
        )

    # Trim runaway conversations: keep last 20 turns.
    history = body.messages[-20:]
    if not history or history[-1].role != "user":
        raise HTTPException(status_code=400, detail="Last message must be user")

    # Gather grounding context.
    log_row = (await session.execute(
        select(StrategicLog).order_by(desc(StrategicLog.generated_at)).limit(1)
    )).scalar_one_or_none()
    quotes = await _latest_quotes_by_group_chat(session)
    headlines = await _thesis_headlines_for_chat(session)

    system_prompt = build_chat_system_prompt(
        s.CASSANDRA_TONE,
        s.CASSANDRA_ANALYSIS,
        log_content=log_row.content if log_row else None,
        log_generated_at=log_row.generated_at if log_row else None,
        quotes_by_group=quotes,
        headlines=headlines,
        reference_line=REFERENCE_LINE,
    )

    msgs = [{"role": "system", "content": system_prompt}]
    msgs.extend({"role": m.role, "content": m.content} for m in history)

    try:
        async with httpx.AsyncClient(follow_redirects=True) as client:
            result = await call_llm(client, msgs)
    except Exception as e:
        # Record the failed call before surfacing it as a 502.
        session.add(AICall(
            model=s.OPENROUTER_MODEL,
            status="error",
            error=str(e)[:500],
        ))
        await session.commit()
        raise HTTPException(status_code=502, detail=f"OpenRouter error: {e}") from e

    session.add(AICall(
        model=result.model,
        prompt_tokens=result.prompt_tokens,
        completion_tokens=result.completion_tokens,
        cost_usd=result.cost_usd,
        status="ok",
    ))
    await session.commit()

    return {
        "role": "assistant",
        "content": result.content,
        "content_html": _md_to_html(result.content),
        "prompt_tokens": result.prompt_tokens,
        "completion_tokens": result.completion_tokens,
    }


# ---------------------------------------------------------------------------
# Settings — digest preferences
# ---------------------------------------------------------------------------

class DigestPrefsIn(BaseModel):
    """Request body for ``PATCH /settings/digest``."""

    opt_in: bool
    tone: Literal["NOVICE", "INTERMEDIATE"]


class DigestPrefsOut(BaseModel):
    """Response body for ``PATCH /settings/digest``: the values just stored."""

    opt_in: bool
    tone: str


@router.patch("/settings/digest", response_model=DigestPrefsOut)
async def patch_digest_prefs(
    payload: DigestPrefsIn,
    principal: CurrentUser = Depends(require_token),
    session: AsyncSession = Depends(get_session),
) -> DigestPrefsOut:
    """Persist the caller's email-digest opt-in and tone.

    Raises 400 when the principal has no user row and 404 when the user can
    no longer be found.
    """
    if principal.user is None:
        # Admin bearer-token path — no per-user row to persist to.
        raise HTTPException(status_code=400, detail="no_user_context")
    # require_token loads `principal.user` in its own short-lived session.
    # By the time this handler runs, that session is closed; mutating the
    # detached object and committing via `session` would persist nothing.
    # Re-fetch in the active session before writing.
    user = await session.get(User, principal.user.id)
    if user is None:
        raise HTTPException(status_code=404, detail="user_not_found")
    user.email_digest_opt_in = payload.opt_in
    user.digest_tone = payload.tone
    await session.commit()
    return DigestPrefsOut(opt_in=payload.opt_in, tone=payload.tone)


# ---------------------------------------------------------------------------
# Settings — language preference
# ---------------------------------------------------------------------------

class LanguagePrefsIn(BaseModel):
    """Request body for ``PATCH /settings/language``."""

    lang: str


class LanguagePrefsOut(BaseModel):
    """Response body for ``PATCH /settings/language``: the normalised language."""

    lang: str


@router.patch("/settings/language", response_model=LanguagePrefsOut)
async def patch_language_prefs(
    payload: LanguagePrefsIn,
    principal: CurrentUser = Depends(require_token),
    session: AsyncSession = Depends(get_session),
) -> LanguagePrefsOut:
    """Persist the caller's UI language after trimming and lowercasing it.

    Raises 400 when the principal has no user row or the language is not in
    ``ACTIVE_LANGUAGES``, and 404 when the user can no longer be found.
    """
    if principal.user is None:
        raise HTTPException(status_code=400, detail="no_user_context")
    lang = (payload.lang or "").strip().lower()
    if lang not in ACTIVE_LANGUAGES:
        raise HTTPException(
            status_code=400,
            detail=f"unsupported language: {payload.lang!r}",
        )
    # Re-fetch in the active session; `principal.user` is not attached to it.
    user = await session.get(User, principal.user.id)
    if user is None:
        raise HTTPException(status_code=404, detail="user_not_found")
    user.lang = lang
    await session.commit()
    return LanguagePrefsOut(lang=lang)