ai-log-job: translate strategic log for active non-en languages

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
Giorgio Gilestro 2026-05-27 16:57:06 +02:00
parent e190d0e35b
commit e4982cdc04
2 changed files with 181 additions and 3 deletions

View file

@ -17,8 +17,9 @@ from app.jobs._market_context import (
month_spend,
recent_headlines_by_bucket,
)
from app.models import AICall, JobRun, StrategicLog
from app.models import AICall, JobRun, StrategicLog, StrategicLogTranslation, User
from app.services.cadence import DEFAULT_POLICY
from app.services.i18n import ACTIVE_LANGUAGES
from app.services.openrouter import (
PROMPT_VERSION,
active_model,
@ -27,6 +28,58 @@ from app.services.openrouter import (
call_llm,
llm_configured,
)
from app.services.translation import translate
async def translate_log_for_active_languages(session, log_id: int) -> None:
"""Fan out per-language translations for the strategic log identified
by ``log_id``.
Reads ``users.lang`` (deduplicated, restricted to ACTIVE_LANGUAGES
minus English), one translation call per language in parallel via
``asyncio.gather``, persists each successful result as a
``StrategicLogTranslation`` row. Per-language failures are logged
but never raise the strategic log itself is already committed at
this point and translation is a best-effort enhancement.
The job orchestrator calls this AFTER the English ``StrategicLog``
row is committed; pass the row's ``id`` in.
"""
target_langs = sorted({l for l in ACTIVE_LANGUAGES if l != "en"})
if not target_langs:
return
active_langs = (await session.execute(
select(User.lang).distinct().where(User.lang.in_(target_langs))
)).scalars().all()
if not active_langs:
return
log_row = await session.get(StrategicLog, log_id)
if log_row is None:
log.warning("log.translate.missing_log", log_id=log_id)
return
async with httpx.AsyncClient(follow_redirects=True, timeout=60) as client:
results = await asyncio.gather(*[
translate(client, log_row.content, lang)
for lang in active_langs
], return_exceptions=True)
for lang, result in zip(active_langs, results):
if isinstance(result, Exception):
log.warning("log.translate.failed", lang=lang, log_id=log_id,
error=str(result)[:200])
continue
translated_md, llm_result = result
session.add(StrategicLogTranslation(
log_id=log_id, lang=lang,
content_md=translated_md,
generated_at=utcnow(),
llm_model=llm_result.model,
llm_cost_usd=llm_result.cost_usd,
))
await session.commit()
async def run() -> None:
@ -126,7 +179,7 @@ async def run() -> None:
tone=tone, analysis=analysis, error=str(e)[:200])
continue
session.add(StrategicLog(
slog = StrategicLog(
generated_at=utcnow(),
model=result.model,
anchor_date=anchor,
@ -137,7 +190,8 @@ async def run() -> None:
prompt_tokens=result.prompt_tokens,
completion_tokens=result.completion_tokens,
cost_usd=result.cost_usd,
))
)
session.add(slog)
session.add(AICall(
model=result.model,
prompt_tokens=result.prompt_tokens,
@ -146,6 +200,7 @@ async def run() -> None:
status="ok",
))
await session.commit()
await translate_log_for_active_languages(session, slog.id)
written += 1
log.info("ai_log.variant_done",
tone=tone, analysis=analysis,

View file

@ -53,3 +53,126 @@ def test_strategic_log_translation_model_columns():
assert cols["log_id"].nullable is False
assert cols["lang"].nullable is False
assert cols["content_md"].nullable is False
@pytest.mark.asyncio
async def test_log_translation_fanout_no_active_non_en_users(tmp_path, monkeypatch):
"""When no users have an active non-en lang, the fan-out makes no
translation calls and no rows are inserted."""
from unittest.mock import AsyncMock
from sqlalchemy import select
from app.db import utcnow
from app.models import StrategicLog, StrategicLogTranslation, User
from app.jobs import ai_log_job
_, factory, setup = _build_session_factory(tmp_path)
await setup()
fake_translate = AsyncMock()
monkeypatch.setattr(ai_log_job, "translate", fake_translate)
# Seed an English user (no non-en users).
async with factory() as session:
session.add(User(id=1, email="en@x", tier="paid", lang="en"))
slog = StrategicLog(
generated_at=utcnow(), content="# Open\n\nDown 0.4%.",
model="test-model",
tone="INTERMEDIATE", analysis="NORMAL",
)
session.add(slog)
await session.commit()
log_id = slog.id
async with factory() as session:
await ai_log_job.translate_log_for_active_languages(session, log_id)
fake_translate.assert_not_awaited()
async with factory() as session:
rows = (await session.execute(select(StrategicLogTranslation))).scalars().all()
assert rows == []
@pytest.mark.asyncio
async def test_log_translation_fanout_italian_user(tmp_path, monkeypatch):
"""One user at lang=it triggers one translation; the row lands with
the right lang and log_id."""
from sqlalchemy import select
from app.db import utcnow
from app.models import StrategicLog, StrategicLogTranslation, User
from app.services.openrouter import LogResult
from app.jobs import ai_log_job
_, factory, setup = _build_session_factory(tmp_path)
await setup()
async def _fake_translate(client, text, target_lang):
assert target_lang == "it"
return "# Apertura\n\nIn calo 0,4%.", LogResult(
content="# Apertura\n\nIn calo 0,4%.",
model="deepseek/deepseek-v4-flash",
prompt_tokens=300, completion_tokens=80, cost_usd=0.00002,
)
monkeypatch.setattr(ai_log_job, "translate", _fake_translate)
async with factory() as session:
session.add(User(id=2, email="it@x", tier="paid", lang="it"))
slog = StrategicLog(
generated_at=utcnow(), content="# Open\n\nDown 0.4%.",
model="test-model",
tone="INTERMEDIATE", analysis="NORMAL",
)
session.add(slog)
await session.commit()
log_id = slog.id
async with factory() as session:
await ai_log_job.translate_log_for_active_languages(session, log_id)
async with factory() as session:
rows = (await session.execute(select(StrategicLogTranslation))).scalars().all()
assert len(rows) == 1
row = rows[0]
assert row.log_id == log_id
assert row.lang == "it"
assert row.content_md.startswith("# Apertura")
assert row.llm_model == "deepseek/deepseek-v4-flash"
assert row.llm_cost_usd == pytest.approx(0.00002)
@pytest.mark.asyncio
async def test_log_translation_fanout_per_language_failure_isolated(tmp_path, monkeypatch):
"""If one language's translation fails, the others (if any) still land
and the job does not raise."""
from sqlalchemy import select
from app.db import utcnow
from app.models import StrategicLog, StrategicLogTranslation, User
from app.jobs import ai_log_job
_, factory, setup = _build_session_factory(tmp_path)
await setup()
async def _fake_translate(client, text, target_lang):
raise RuntimeError("upstream down")
monkeypatch.setattr(ai_log_job, "translate", _fake_translate)
async with factory() as session:
session.add(User(id=3, email="it@x", tier="paid", lang="it"))
slog = StrategicLog(
generated_at=utcnow(), content="# Open",
model="test-model",
tone="INTERMEDIATE", analysis="NORMAL",
)
session.add(slog)
await session.commit()
log_id = slog.id
# Must NOT raise.
async with factory() as session:
await ai_log_job.translate_log_for_active_languages(session, log_id)
async with factory() as session:
rows = (await session.execute(select(StrategicLogTranslation))).scalars().all()
assert rows == []