"""Shared job machinery: job_runs lifecycle, MariaDB advisory lock, mock-mode short-circuits.""" from __future__ import annotations from contextlib import asynccontextmanager from typing import AsyncIterator from sqlalchemy import text from sqlalchemy.ext.asyncio import AsyncSession from app.db import get_session_factory, utcnow from app.logging import get_logger from app.models import JobRun log = get_logger("jobs") @asynccontextmanager async def job_lifecycle(name: str) -> AsyncIterator[tuple[AsyncSession, JobRun]]: """Wraps a job invocation. Creates a JobRun row on entry; updates it on exit. Failures are caught by the caller's try/except; this context only handles the bookkeeping. A MariaDB GET_LOCK(name, 0) is acquired to prevent concurrent runs of the same job across processes. If the lock is busy, we skip the run.""" factory = get_session_factory() async with factory() as session: # Try lock; skip if held. got = (await session.execute( text("SELECT GET_LOCK(:n, 0)"), {"n": f"cassandra_{name}"} )).scalar() if not got: log.warning("job.skipped_locked", name=name) yield session, JobRun(name=name, started_at=utcnow(), status="skipped") return run = JobRun(name=name, started_at=utcnow(), status="running") session.add(run) await session.commit() await session.refresh(run) try: yield session, run run.status = run.status if run.status not in ("running",) else "success" run.finished_at = utcnow() await session.commit() log.info("job.finished", name=name, status=run.status, items=run.items_written) except Exception as e: run.status = "failed" run.error = str(e)[:1000] run.finished_at = utcnow() await session.commit() log.error("job.failed", name=name, error=str(e)) raise finally: await session.execute(text("SELECT RELEASE_LOCK(:n)"), {"n": f"cassandra_{name}"}) await session.commit()