"""Async DB engine, session factory, declarative base, UTC helper.""" from __future__ import annotations from datetime import datetime, timezone from typing import AsyncIterator from sqlalchemy.ext.asyncio import ( AsyncSession, async_sessionmaker, create_async_engine, ) from sqlalchemy.orm import DeclarativeBase from app.config import get_settings class Base(DeclarativeBase): pass _engine = None _session_factory: async_sessionmaker[AsyncSession] | None = None def utcnow() -> datetime: """All timestamps in Cassandra are tz-aware UTC by convention.""" return datetime.now(timezone.utc) def get_engine(): global _engine if _engine is None: s = get_settings() # NB: pool_pre_ping is intentionally OFF. aiomysql 0.3.x made # Connection.ping()'s `reconnect` arg mandatory, but SQLAlchemy's # MySQL pre-ping (2.0.49) calls it without that arg — so every # reused pooled connection raises TypeError, surfacing as an # intermittent 500 (502 behind the proxy). pool_recycle below # (1h, well under MariaDB's 8h wait_timeout) keeps connections # fresh without needing a ping. # # isolation_level READ COMMITTED: under MariaDB's default # REPEATABLE READ, the "invalidate prior unused codes" UPDATE in # otp_service.issue() takes next-key/gap locks on the # (email, created_at) index even when it matches no rows; # concurrent OTP INSERTs then deadlock (errno 1213). READ # COMMITTED drops those gap locks — appropriate here since every # request is a short, self-contained transaction. SQLite (the # test sentinel backend) rejects this level, so set it only for # the real server backends. kwargs: dict = {"pool_recycle": 3600, "future": True} if not s.DATABASE_URL.startswith("sqlite"): kwargs["isolation_level"] = "READ COMMITTED" _engine = create_async_engine(s.DATABASE_URL, **kwargs) return _engine def get_session_factory() -> async_sessionmaker[AsyncSession]: global _session_factory if _session_factory is None: _session_factory = async_sessionmaker( get_engine(), expire_on_commit=False, class_=AsyncSession ) return _session_factory async def get_session() -> AsyncIterator[AsyncSession]: """FastAPI dependency yielding an async session.""" async with get_session_factory()() as session: yield session