jobs: per-row savepoint + aggregate logging in translation fan-out

Previously translate_log_for_active_languages and
translate_summary_for_active_languages added every successful
translation to the session and called session.commit() once at the
end. A single bad row (DB error, constraint violation, encoding
mismatch) rolled back the whole batch — losing all the languages that
had succeeded.

Wrap each row in session.begin_nested() so a per-row failure only
loses that one row. Track succeeded/failed counts and log them at the
end — escalating to error if zero succeeded out of N attempted, so
total failure surfaces in monitoring instead of just N warning lines.
This commit is contained in:
Giorgio Gilestro 2026-05-28 12:37:06 +02:00
parent 7348055d72
commit c5fb4525f3
2 changed files with 59 additions and 25 deletions

View file

@ -40,9 +40,9 @@ async def translate_log_for_active_languages(session, log_id: int) -> None:
Reads ``users.lang`` (deduplicated, restricted to ACTIVE_LANGUAGES Reads ``users.lang`` (deduplicated, restricted to ACTIVE_LANGUAGES
minus English), one translation call per language in parallel via minus English), one translation call per language in parallel via
``asyncio.gather``, persists each successful result as a ``asyncio.gather``, persists each successful result as a
``StrategicLogTranslation`` row. Per-language failures are logged ``StrategicLogTranslation`` row. Each row is committed in its own
but never raise the strategic log itself is already committed at savepoint so a per-language LLM error or DB error doesn't roll back
this point and translation is a best-effort enhancement. the languages that already succeeded.
The job orchestrator calls this AFTER the English ``StrategicLog`` The job orchestrator calls this AFTER the English ``StrategicLog``
row is committed; pass the row's ``id`` in. row is committed; pass the row's ``id`` in.
@ -68,12 +68,17 @@ async def translate_log_for_active_languages(session, log_id: int) -> None:
for lang in active_langs for lang in active_langs
], return_exceptions=True) ], return_exceptions=True)
succeeded = 0
failed = 0
for lang, result in zip(active_langs, results): for lang, result in zip(active_langs, results):
if isinstance(result, Exception): if isinstance(result, Exception):
log.warning("log.translate.failed", lang=lang, log_id=log_id, log.warning("log.translate.failed", lang=lang, log_id=log_id,
error=str(result)[:200]) error=str(result)[:200])
failed += 1
continue continue
translated_md, llm_result = result translated_md, llm_result = result
try:
async with session.begin_nested():
session.add(StrategicLogTranslation( session.add(StrategicLogTranslation(
log_id=log_id, lang=lang, log_id=log_id, lang=lang,
content=translated_md, content=translated_md,
@ -84,6 +89,18 @@ async def translate_log_for_active_languages(session, log_id: int) -> None:
cost_usd=llm_result.cost_usd, cost_usd=llm_result.cost_usd,
)) ))
await session.commit() await session.commit()
succeeded += 1
except Exception as exc:
log.warning("log.translate.persist_failed",
lang=lang, log_id=log_id, error=str(exc)[:200])
failed += 1
if failed and succeeded == 0:
log.error("log.translate.all_failed",
log_id=log_id, attempted=len(active_langs))
else:
log.info("log.translate.done",
log_id=log_id, succeeded=succeeded, failed=failed)
async def run() -> None: async def run() -> None:

View file

@ -47,8 +47,8 @@ async def translate_summary_for_active_languages(session, summary_id: int) -> No
Mirrors ``ai_log_job.translate_log_for_active_languages``: reads the Mirrors ``ai_log_job.translate_log_for_active_languages``: reads the
distinct non-en ``users.lang`` set, translates the English content distinct non-en ``users.lang`` set, translates the English content
once per active language in parallel via ``asyncio.gather``, and once per active language in parallel via ``asyncio.gather``, and
persists each result as an ``IndicatorSummaryTranslation`` row. persists each result as an ``IndicatorSummaryTranslation`` row in
Per-language failures are logged but never raise. its own savepoint so one bad row doesn't lose the rest.
""" """
target_langs = sorted({l for l in ACTIVE_LANGUAGES if l != "en"}) target_langs = sorted({l for l in ACTIVE_LANGUAGES if l != "en"})
if not target_langs: if not target_langs:
@ -70,13 +70,18 @@ async def translate_summary_for_active_languages(session, summary_id: int) -> No
for lang in active_langs for lang in active_langs
], return_exceptions=True) ], return_exceptions=True)
succeeded = 0
failed = 0
for lang, result in zip(active_langs, results): for lang, result in zip(active_langs, results):
if isinstance(result, Exception): if isinstance(result, Exception):
log.warning("ind_summary.translate.failed", log.warning("ind_summary.translate.failed",
lang=lang, summary_id=summary_id, lang=lang, summary_id=summary_id,
error=str(result)[:200]) error=str(result)[:200])
failed += 1
continue continue
translated_md, llm_result = result translated_md, llm_result = result
try:
async with session.begin_nested():
session.add(IndicatorSummaryTranslation( session.add(IndicatorSummaryTranslation(
summary_id=summary_id, lang=lang, summary_id=summary_id, lang=lang,
content=translated_md, content=translated_md,
@ -87,6 +92,18 @@ async def translate_summary_for_active_languages(session, summary_id: int) -> No
cost_usd=llm_result.cost_usd, cost_usd=llm_result.cost_usd,
)) ))
await session.commit() await session.commit()
succeeded += 1
except Exception as exc:
log.warning("ind_summary.translate.persist_failed",
lang=lang, summary_id=summary_id, error=str(exc)[:200])
failed += 1
if failed and succeeded == 0:
log.error("ind_summary.translate.all_failed",
summary_id=summary_id, attempted=len(active_langs))
else:
log.info("ind_summary.translate.done",
summary_id=summary_id, succeeded=succeeded, failed=failed)
# Strip known meta-commentary openers the model sometimes leaks despite the # Strip known meta-commentary openers the model sometimes leaks despite the