From e4982cdc047db9ac78d1c23a5d9c2844509fcfe8 Mon Sep 17 00:00:00 2001 From: Giorgio Gilestro Date: Wed, 27 May 2026 16:57:06 +0200 Subject: [PATCH] ai-log-job: translate strategic log for active non-en languages Co-Authored-By: Claude Opus 4.7 --- app/jobs/ai_log_job.py | 61 +++++++++++- tests/test_localization_integration.py | 123 +++++++++++++++++++++++++ 2 files changed, 181 insertions(+), 3 deletions(-) diff --git a/app/jobs/ai_log_job.py b/app/jobs/ai_log_job.py index bc8b488..c0635a7 100644 --- a/app/jobs/ai_log_job.py +++ b/app/jobs/ai_log_job.py @@ -17,8 +17,9 @@ from app.jobs._market_context import ( month_spend, recent_headlines_by_bucket, ) -from app.models import AICall, JobRun, StrategicLog +from app.models import AICall, JobRun, StrategicLog, StrategicLogTranslation, User from app.services.cadence import DEFAULT_POLICY +from app.services.i18n import ACTIVE_LANGUAGES from app.services.openrouter import ( PROMPT_VERSION, active_model, @@ -27,6 +28,58 @@ from app.services.openrouter import ( call_llm, llm_configured, ) +from app.services.translation import translate + + +async def translate_log_for_active_languages(session, log_id: int) -> None: + """Fan out per-language translations for the strategic log identified + by ``log_id``. + + Reads ``users.lang`` (deduplicated, restricted to ACTIVE_LANGUAGES + minus English), one translation call per language in parallel via + ``asyncio.gather``, persists each successful result as a + ``StrategicLogTranslation`` row. Per-language failures are logged + but never raise — the strategic log itself is already committed at + this point and translation is a best-effort enhancement. + + The job orchestrator calls this AFTER the English ``StrategicLog`` + row is committed; pass the row's ``id`` in. + """ + target_langs = sorted({l for l in ACTIVE_LANGUAGES if l != "en"}) + if not target_langs: + return + + active_langs = (await session.execute( + select(User.lang).distinct().where(User.lang.in_(target_langs)) + )).scalars().all() + if not active_langs: + return + + log_row = await session.get(StrategicLog, log_id) + if log_row is None: + log.warning("log.translate.missing_log", log_id=log_id) + return + + async with httpx.AsyncClient(follow_redirects=True, timeout=60) as client: + results = await asyncio.gather(*[ + translate(client, log_row.content, lang) + for lang in active_langs + ], return_exceptions=True) + + for lang, result in zip(active_langs, results): + if isinstance(result, Exception): + log.warning("log.translate.failed", lang=lang, log_id=log_id, + error=str(result)[:200]) + continue + translated_md, llm_result = result + session.add(StrategicLogTranslation( + log_id=log_id, lang=lang, + content_md=translated_md, + generated_at=utcnow(), + llm_model=llm_result.model, + llm_cost_usd=llm_result.cost_usd, + )) + await session.commit() async def run() -> None: @@ -126,7 +179,7 @@ async def run() -> None: tone=tone, analysis=analysis, error=str(e)[:200]) continue - session.add(StrategicLog( + slog = StrategicLog( generated_at=utcnow(), model=result.model, anchor_date=anchor, @@ -137,7 +190,8 @@ async def run() -> None: prompt_tokens=result.prompt_tokens, completion_tokens=result.completion_tokens, cost_usd=result.cost_usd, - )) + ) + session.add(slog) session.add(AICall( model=result.model, prompt_tokens=result.prompt_tokens, @@ -146,6 +200,7 @@ async def run() -> None: status="ok", )) await session.commit() + await translate_log_for_active_languages(session, slog.id) written += 1 log.info("ai_log.variant_done", tone=tone, analysis=analysis, diff --git a/tests/test_localization_integration.py b/tests/test_localization_integration.py index 1079d66..6aac9f9 100644 --- a/tests/test_localization_integration.py +++ b/tests/test_localization_integration.py @@ -53,3 +53,126 @@ def test_strategic_log_translation_model_columns(): assert cols["log_id"].nullable is False assert cols["lang"].nullable is False assert cols["content_md"].nullable is False + + +@pytest.mark.asyncio +async def test_log_translation_fanout_no_active_non_en_users(tmp_path, monkeypatch): + """When no users have an active non-en lang, the fan-out makes no + translation calls and no rows are inserted.""" + from unittest.mock import AsyncMock + from sqlalchemy import select + + from app.db import utcnow + from app.models import StrategicLog, StrategicLogTranslation, User + from app.jobs import ai_log_job + + _, factory, setup = _build_session_factory(tmp_path) + await setup() + + fake_translate = AsyncMock() + monkeypatch.setattr(ai_log_job, "translate", fake_translate) + + # Seed an English user (no non-en users). + async with factory() as session: + session.add(User(id=1, email="en@x", tier="paid", lang="en")) + slog = StrategicLog( + generated_at=utcnow(), content="# Open\n\nDown 0.4%.", + model="test-model", + tone="INTERMEDIATE", analysis="NORMAL", + ) + session.add(slog) + await session.commit() + log_id = slog.id + + async with factory() as session: + await ai_log_job.translate_log_for_active_languages(session, log_id) + + fake_translate.assert_not_awaited() + async with factory() as session: + rows = (await session.execute(select(StrategicLogTranslation))).scalars().all() + assert rows == [] + + +@pytest.mark.asyncio +async def test_log_translation_fanout_italian_user(tmp_path, monkeypatch): + """One user at lang=it triggers one translation; the row lands with + the right lang and log_id.""" + from sqlalchemy import select + + from app.db import utcnow + from app.models import StrategicLog, StrategicLogTranslation, User + from app.services.openrouter import LogResult + from app.jobs import ai_log_job + + _, factory, setup = _build_session_factory(tmp_path) + await setup() + + async def _fake_translate(client, text, target_lang): + assert target_lang == "it" + return "# Apertura\n\nIn calo 0,4%.", LogResult( + content="# Apertura\n\nIn calo 0,4%.", + model="deepseek/deepseek-v4-flash", + prompt_tokens=300, completion_tokens=80, cost_usd=0.00002, + ) + monkeypatch.setattr(ai_log_job, "translate", _fake_translate) + + async with factory() as session: + session.add(User(id=2, email="it@x", tier="paid", lang="it")) + slog = StrategicLog( + generated_at=utcnow(), content="# Open\n\nDown 0.4%.", + model="test-model", + tone="INTERMEDIATE", analysis="NORMAL", + ) + session.add(slog) + await session.commit() + log_id = slog.id + + async with factory() as session: + await ai_log_job.translate_log_for_active_languages(session, log_id) + + async with factory() as session: + rows = (await session.execute(select(StrategicLogTranslation))).scalars().all() + assert len(rows) == 1 + row = rows[0] + assert row.log_id == log_id + assert row.lang == "it" + assert row.content_md.startswith("# Apertura") + assert row.llm_model == "deepseek/deepseek-v4-flash" + assert row.llm_cost_usd == pytest.approx(0.00002) + + +@pytest.mark.asyncio +async def test_log_translation_fanout_per_language_failure_isolated(tmp_path, monkeypatch): + """If one language's translation fails, the others (if any) still land + and the job does not raise.""" + from sqlalchemy import select + + from app.db import utcnow + from app.models import StrategicLog, StrategicLogTranslation, User + from app.jobs import ai_log_job + + _, factory, setup = _build_session_factory(tmp_path) + await setup() + + async def _fake_translate(client, text, target_lang): + raise RuntimeError("upstream down") + monkeypatch.setattr(ai_log_job, "translate", _fake_translate) + + async with factory() as session: + session.add(User(id=3, email="it@x", tier="paid", lang="it")) + slog = StrategicLog( + generated_at=utcnow(), content="# Open", + model="test-model", + tone="INTERMEDIATE", analysis="NORMAL", + ) + session.add(slog) + await session.commit() + log_id = slog.id + + # Must NOT raise. + async with factory() as session: + await ai_log_job.translate_log_for_active_languages(session, log_id) + + async with factory() as session: + rows = (await session.execute(select(StrategicLogTranslation))).scalars().all() + assert rows == []