diff --git a/alembic/versions/0024_ind_summary_translations.py b/alembic/versions/0024_ind_summary_translations.py new file mode 100644 index 0000000..cde61bc --- /dev/null +++ b/alembic/versions/0024_ind_summary_translations.py @@ -0,0 +1,38 @@ +"""indicator_summary_translations table. + +Revision ID: 0024 +Revises: 0023 +Create Date: 2026-05-27 +""" +from typing import Sequence, Union + +import sqlalchemy as sa +from alembic import op + + +revision: str = "0024" +down_revision: Union[str, None] = "0023" +branch_labels: Union[str, Sequence[str], None] = None +depends_on: Union[str, Sequence[str], None] = None + + +def upgrade() -> None: + op.create_table( + "indicator_summary_translations", + sa.Column("id", sa.BigInteger(), primary_key=True, autoincrement=True), + sa.Column("summary_id", sa.BigInteger(), nullable=False), + sa.Column("lang", sa.String(length=8), nullable=False), + sa.Column("content_md", sa.Text(), nullable=False), + sa.Column("generated_at", sa.DateTime(timezone=True), nullable=False), + sa.Column("llm_model", sa.String(length=64), nullable=True), + sa.Column("llm_cost_usd", sa.Float(), nullable=True), + sa.ForeignKeyConstraint( + ["summary_id"], ["indicator_summaries.id"], + ondelete="CASCADE", name="fk_ist_summary", + ), + sa.UniqueConstraint("summary_id", "lang", name="uq_ist_summary_lang"), + ) + + +def downgrade() -> None: + op.drop_table("indicator_summary_translations") diff --git a/app/jobs/indicator_summary_job.py b/app/jobs/indicator_summary_job.py index 5513a5c..829077b 100644 --- a/app/jobs/indicator_summary_job.py +++ b/app/jobs/indicator_summary_job.py @@ -13,8 +13,15 @@ from app.config import get_settings, load_groups from app.db import utcnow from app.jobs._helpers import job_lifecycle, log from app.jobs._market_context import latest_quotes_by_group, month_spend -from app.models import AICall, IndicatorSummary, JobRun +from app.models import ( + AICall, + IndicatorSummary, + IndicatorSummaryTranslation, + JobRun, + User, +) from app.services.cadence import DEFAULT_POLICY +from app.services.i18n import ACTIVE_LANGUAGES from app.services.openrouter import ( PROMPT_VERSION, active_model, @@ -26,11 +33,58 @@ from app.services.openrouter import ( llm_configured, month_start, ) +from app.services.translation import translate AGGREGATE_GROUP_NAME = "__all__" +async def translate_summary_for_active_languages(session, summary_id: int) -> None: + """Fan out per-language translations for one IndicatorSummary row. + + Mirrors ``ai_log_job.translate_log_for_active_languages``: reads the + distinct non-en ``users.lang`` set, translates the English content + once per active language in parallel via ``asyncio.gather``, and + persists each result as an ``IndicatorSummaryTranslation`` row. + Per-language failures are logged but never raise. + """ + target_langs = sorted({l for l in ACTIVE_LANGUAGES if l != "en"}) + if not target_langs: + return + active_langs = (await session.execute( + select(User.lang).distinct().where(User.lang.in_(target_langs)) + )).scalars().all() + if not active_langs: + return + + summary_row = await session.get(IndicatorSummary, summary_id) + if summary_row is None: + log.warning("ind_summary.translate.missing_summary", summary_id=summary_id) + return + + async with httpx.AsyncClient(follow_redirects=True, timeout=60) as client: + results = await asyncio.gather(*[ + translate(client, summary_row.content, lang) + for lang in active_langs + ], return_exceptions=True) + + for lang, result in zip(active_langs, results): + if isinstance(result, Exception): + log.warning("ind_summary.translate.failed", + lang=lang, summary_id=summary_id, + error=str(result)[:200]) + continue + translated_md, llm_result = result + session.add(IndicatorSummaryTranslation( + summary_id=summary_id, lang=lang, + content_md=translated_md, + generated_at=utcnow(), + llm_model=llm_result.model, + llm_cost_usd=llm_result.cost_usd, + )) + await session.commit() + + # Strip known meta-commentary openers the model sometimes leaks despite the # prompt's hard constraints. Each pattern matches one leading sentence. _LEAK_PATTERNS = [ @@ -140,8 +194,10 @@ def clean_summary(text: str) -> str: async def _generate_one( session, client: httpx.AsyncClient, group: str, quotes: list[dict], system_prompt: str, model: str, tone: str, analysis: str, -) -> bool: - """Generate + persist one group's summary. Returns True on success. +) -> IndicatorSummary | None: + """Generate + persist one group's summary. Returns the new row on + success (so the caller can fan out localized translations after + the commit picks up its id) or None on failure. `model` is retained for ledger labelling but call_llm now picks the active-provider model itself.""" user_prompt = build_summary_user_prompt(group, quotes) @@ -155,7 +211,7 @@ async def _generate_one( except Exception as e: session.add(AICall(model=active_model(), status="error", error=str(e)[:500])) log.warning("ind_summary.failed", group=group, error=str(e)[:120]) - return False + return None cleaned = clean_summary(result.content) if looks_like_leakage(cleaned) or len(cleaned) < 40: @@ -171,9 +227,9 @@ async def _generate_one( cost_usd=result.cost_usd, status="leaked", )) - return False + return None - session.add(IndicatorSummary( + summary = IndicatorSummary( group_name=group, generated_at=utcnow(), model=result.model, @@ -184,7 +240,8 @@ async def _generate_one( prompt_tokens=result.prompt_tokens, completion_tokens=result.completion_tokens, cost_usd=result.cost_usd, - )) + ) + session.add(summary) session.add(AICall( model=result.model, prompt_tokens=result.prompt_tokens, @@ -192,7 +249,7 @@ async def _generate_one( cost_usd=result.cost_usd, status="ok", )) - return True + return summary async def run() -> None: @@ -249,17 +306,20 @@ async def run() -> None: for tone in tones: system_prompt = build_summary_system_prompt(tone, analysis) for group, quotes in groups.items(): - ok = await _generate_one( + summary = await _generate_one( session, client, group, quotes, system_prompt, active_model(), tone, analysis, ) - if ok: + if summary is not None: written += 1 await session.commit() # partial progress survives mid-job error + if summary is not None: + await translate_summary_for_active_languages(session, summary.id) # One aggregate read across all groups, stored under __all__. agg_system = build_aggregate_summary_system_prompt(tone, analysis) agg_user = build_aggregate_summary_user_prompt(groups) + agg_summary: IndicatorSummary | None = None try: result = await call_llm( client, @@ -267,7 +327,7 @@ async def run() -> None: {"role": "user", "content": agg_user}], max_tokens=1500, # room for reasoning + 80-word output ) - session.add(IndicatorSummary( + agg_summary = IndicatorSummary( group_name=AGGREGATE_GROUP_NAME, generated_at=utcnow(), model=result.model, @@ -278,7 +338,8 @@ async def run() -> None: prompt_tokens=result.prompt_tokens, completion_tokens=result.completion_tokens, cost_usd=result.cost_usd, - )) + ) + session.add(agg_summary) session.add(AICall( model=result.model, prompt_tokens=result.prompt_tokens, @@ -294,6 +355,8 @@ async def run() -> None: log.warning("ind_summary.agg_failed", tone=tone, error=str(e)[:120]) await session.commit() + if agg_summary is not None: + await translate_summary_for_active_languages(session, agg_summary.id) jr.items_written = written log.info("ind_summary.done", diff --git a/app/models.py b/app/models.py index 7b0a2ef..4416501 100644 --- a/app/models.py +++ b/app/models.py @@ -172,6 +172,37 @@ class IndicatorSummary(Base): __table_args__ = (Index("ix_indsumm_group_generated", "group_name", "generated_at"),) +class IndicatorSummaryTranslation(Base): + """Cached translation of a single IndicatorSummary row. + + Same pattern as StrategicLogTranslation: one row per + (summary_id, lang). Populated by indicator_summary_job after the + English row is committed. The dashboard / indicators endpoints + swap in the matching translation when a user with a non-en + lang preference loads them, falling back silently to the English + source when no row exists yet. + """ + __tablename__ = "indicator_summary_translations" + + id: Mapped[int] = mapped_column(_PK, primary_key=True, autoincrement=True) + summary_id: Mapped[int] = mapped_column( + BigInteger().with_variant(Integer(), "sqlite"), + ForeignKey("indicator_summaries.id", ondelete="CASCADE"), + nullable=False, + ) + lang: Mapped[str] = mapped_column(String(8), nullable=False) + content_md: Mapped[str] = mapped_column(Text, nullable=False) + generated_at: Mapped[datetime] = mapped_column( + DateTime(timezone=True), nullable=False, default=utcnow, + ) + llm_model: Mapped[str | None] = mapped_column(String(64)) + llm_cost_usd: Mapped[float | None] = mapped_column(Float) + + __table_args__ = ( + UniqueConstraint("summary_id", "lang", name="uq_ist_summary_lang"), + ) + + class AICall(Base): """Cost ledger for OpenRouter calls. Feeds the monthly cap check.""" __tablename__ = "ai_calls" diff --git a/app/routers/api.py b/app/routers/api.py index 7751ed9..30c1c62 100644 --- a/app/routers/api.py +++ b/app/routers/api.py @@ -36,6 +36,7 @@ from app.models import ( AICall, Headline, IndicatorSummary, + IndicatorSummaryTranslation, JobRun, Quote, StrategicLog, @@ -138,6 +139,7 @@ async def indicators( as_: str | None = Query(default=None, alias="as"), tone: str | None = Query(default=None), session: AsyncSession = Depends(get_session), + principal: CurrentUser | None = Depends(maybe_current_user), ): sub = ( select(Quote.symbol, func.max(Quote.fetched_at).label("mx")) @@ -205,6 +207,7 @@ async def indicators( if as_of_d and (today - as_of_d).days > threshold: stale_symbols.add(r.symbol) + await _apply_localized_summary(session, summary, principal) return templates.TemplateResponse( request, "partials/indicators.html", {"quotes": rows, "has_anchor": has_anchor, @@ -340,6 +343,30 @@ async def _localized_content( return t.content_md if t is not None else None +async def _apply_localized_summary( + session: AsyncSession, + row: IndicatorSummary | None, + principal: CurrentUser | None, +) -> None: + """If ``row`` has a matching translation for ``principal.user.lang``, + overwrite the in-memory ``content`` attribute so the template renders + the localized version. No DB write happens — the mutation lives only + for the lifetime of this GET request. + """ + if row is None or principal is None or principal.user is None: + return + lang = (principal.user.lang or "en") + if lang == "en": + return + t = (await session.execute( + select(IndicatorSummaryTranslation) + .where(IndicatorSummaryTranslation.summary_id == row.id) + .where(IndicatorSummaryTranslation.lang == lang) + )).scalar_one_or_none() + if t is not None: + row.content = t.content_md + + def _resolve_tone_param(tone: str | None) -> str: """Normalise a query-param tone to one of the two valid values. PRO is silently mapped to INTERMEDIATE (see openrouter.PROMPT_VERSION 6).""" @@ -552,6 +579,7 @@ async def aggregate_summary( session: AsyncSession = Depends(get_session), as_: str | None = Query(default=None, alias="as"), tone: str | None = Query(default=None), + principal: CurrentUser | None = Depends(maybe_current_user), ): wanted_tone = _resolve_tone_param(tone) row = (await session.execute( @@ -573,6 +601,7 @@ async def aggregate_summary( statuses = all_statuses() if as_ == "html": + await _apply_localized_summary(session, row, principal) return templates.TemplateResponse( request, "partials/dashboard_header.html", {"summary": row, "markets": statuses, "tone": wanted_tone},