read.markets/app/jobs/_helpers.py
Giorgio Gilestro a113a7f3ce test+fix: make the suite run cleanly in the test container
Five fixes uncovered by actually running the suite in docker-compose.test.yml:

1. (real prod bug) PATCH /api/settings/digest mutated principal.user which
   require_token had loaded in a now-closed session — the commit on the
   handler's session persisted nothing. Re-fetch the user via the active
   session before writing.

2. Portable PK type. SQLite only auto-fills `INTEGER PRIMARY KEY`; plain
   BIGINT requires explicit values. Define a `_PK` alias of
   `BigInteger().with_variant(Integer(), "sqlite")` and use it for all 10
   autoincrement primary keys in app/models.py. No prod-schema change
   (MariaDB still gets BIGINT).

3. job_lifecycle's MariaDB GET_LOCK / RELEASE_LOCK is now gated behind
   `dialect.name == "mysql"`, so the test SQLite engine doesn't trip on
   the missing function. Single-process test runs can't race themselves.

4. tests/test_news_window.py seeded Headline rows without `fingerprint`,
   which is NOT NULL — added an `fp-{title}` value per row.

5. tests/test_email_digest_job.py now also patches `llm_configured` to
   True so the job doesn't short-circuit on the missing API key.

6. (test container hygiene) Drop `COPY tests ./tests` from the test stage
   in the Dockerfile — .dockerignore excludes `tests/` (correct: prod
   image must not bake tests), and docker-compose.test.yml bind-mounts
   ./tests at run time anyway.

Suite now: 198 passed, 5 skipped, 1 pre-existing failure
(test_default_groups_present — Phase G dropped the "pie" group from
config/default.toml but the assertion wasn't updated; unrelated to this
branch).
2026-05-26 00:11:18 +02:00

63 lines
2.4 KiB
Python

"""Shared job machinery: job_runs lifecycle, MariaDB advisory lock,
mock-mode short-circuits."""
from __future__ import annotations
from contextlib import asynccontextmanager
from typing import AsyncIterator
from sqlalchemy import text
from sqlalchemy.ext.asyncio import AsyncSession
from app.db import get_session_factory, utcnow
from app.logging import get_logger
from app.models import JobRun
log = get_logger("jobs")
@asynccontextmanager
async def job_lifecycle(name: str) -> AsyncIterator[tuple[AsyncSession, JobRun]]:
"""Wraps a job invocation. Creates a JobRun row on entry; updates it on
exit. Failures are caught by the caller's try/except; this context only
handles the bookkeeping.
A MariaDB GET_LOCK(name, 0) is acquired to prevent concurrent runs of the
same job across processes. If the lock is busy, we skip the run.
The lock dance is MariaDB-specific; on SQLite (used in tests) it's a
no-op, since the single-process test runner can't race itself."""
factory = get_session_factory()
async with factory() as session:
bind = session.get_bind()
use_lock = bind is not None and bind.dialect.name == "mysql"
if use_lock:
got = (await session.execute(
text("SELECT GET_LOCK(:n, 0)"), {"n": f"cassandra_{name}"}
)).scalar()
if not got:
log.warning("job.skipped_locked", name=name)
yield session, JobRun(name=name, started_at=utcnow(), status="skipped")
return
run = JobRun(name=name, started_at=utcnow(), status="running")
session.add(run)
await session.commit()
await session.refresh(run)
try:
yield session, run
run.status = run.status if run.status not in ("running",) else "success"
run.finished_at = utcnow()
await session.commit()
log.info("job.finished",
name=name, status=run.status, items=run.items_written)
except Exception as e:
run.status = "failed"
run.error = str(e)[:1000]
run.finished_at = utcnow()
await session.commit()
log.error("job.failed", name=name, error=str(e))
raise
finally:
if use_lock:
await session.execute(text("SELECT RELEASE_LOCK(:n)"),
{"n": f"cassandra_{name}"})
await session.commit()