"""User authentication primitives: password hashing, signup, login. Argon2id for password hashing (argon2-cffi). itsdangerous for signed session cookies. Tier-aware user creation; phase D adds the actual tier-based feature gating. """ from __future__ import annotations import re from dataclasses import dataclass from argon2 import PasswordHasher from argon2.exceptions import VerifyMismatchError, InvalidHashError from email_validator import EmailNotValidError, validate_email from sqlalchemy import select from sqlalchemy.ext.asyncio import AsyncSession from app.db import utcnow from app.models import User # Argon2 default parameters are sensible; we let it pick. _HASHER = PasswordHasher() # Reasonable floor. Real password policy lives in Phase E. MIN_PASSWORD_LENGTH = 8 MAX_PASSWORD_LENGTH = 256 class AuthError(Exception): """Raised when signup/login validation fails. The message is safe to surface to the user as-is.""" def hash_password(plain: str) -> str: return _HASHER.hash(plain) def verify_password(plain: str, hashed: str) -> bool: try: _HASHER.verify(hashed, plain) return True except (VerifyMismatchError, InvalidHashError): return False except Exception: return False def _validate_email_or_raise(email: str) -> str: try: info = validate_email(email, check_deliverability=False) return info.normalized except EmailNotValidError as e: raise AuthError(f"Invalid email: {e}") def _validate_password_or_raise(password: str) -> None: if not isinstance(password, str): raise AuthError("Password must be a string") if len(password) < MIN_PASSWORD_LENGTH: raise AuthError( f"Password must be at least {MIN_PASSWORD_LENGTH} characters" ) if len(password) > MAX_PASSWORD_LENGTH: raise AuthError("Password too long") async def create_user( session: AsyncSession, email: str, password: str, *, tier: str = "free", ) -> User: """Create a new user. Raises AuthError on bad input or duplicate email.""" email = _validate_email_or_raise(email).lower() _validate_password_or_raise(password) existing = (await session.execute( select(User).where(User.email == email) )).scalar_one_or_none() if existing: raise AuthError("An account with this email already exists") user = User( email=email, password_hash=hash_password(password), tier=tier, email_verified=False, # phase E enforces verification settings_json={}, created_at=utcnow(), ) session.add(user) await session.commit() await session.refresh(user) return user async def authenticate( session: AsyncSession, email: str, password: str, ) -> User: """Return the User if credentials match. Raises AuthError on miss. Uses the same generic message for unknown-email and wrong-password to avoid a username-enumeration oracle.""" email = email.strip().lower() user = (await session.execute( select(User).where(User.email == email) )).scalar_one_or_none() # Always run a hash verification even on unknown-email to keep timing # similar (mitigates timing-based user enumeration). if user is None: verify_password(password, "$argon2id$v=19$m=65536,t=3,p=4$" + "a" * 22 + "$" + "b" * 43) raise AuthError("Invalid email or password") if not verify_password(password, user.password_hash): raise AuthError("Invalid email or password") user.last_login_at = utcnow() await session.commit() return user async def get_user(session: AsyncSession, user_id: int) -> User | None: return (await session.execute( select(User).where(User.id == user_id) )).scalar_one_or_none()