"""Request-level authentication. Two paths accepted: 1. **Session cookie** (`cassandra_session`) — set by /login. Signed with `CASSANDRA_SESSION_SECRET` via itsdangerous; carries just the user id. On each request we deserialise, then load the User from the DB so the tier value is always fresh. 2. **Bearer token** (`Authorization: Bearer …`) — the legacy single-user path kept as an admin/dev escape hatch and for programmatic API access (CLI, curl, scripts). Matches `CASSANDRA_TOKEN` if set. If neither matches: - HTML requests get 303 → /login - API / curl-style requests get 401 For backwards-compat, `require_token` is an alias for `require_auth` so existing routers that do `dependencies=[Depends(require_token)]` keep working without edit. """ from __future__ import annotations import secrets from dataclasses import dataclass from fastapi import Header, HTTPException, Request, status from itsdangerous import BadSignature, SignatureExpired, URLSafeTimedSerializer from app.config import get_settings from app.db import get_session_factory from app.models import User from app.services.auth_service import get_user SESSION_COOKIE_NAME = "cassandra_session" SESSION_TTL_SECONDS = 14 * 24 * 60 * 60 # 14 days # Short-lived cookie set during signup / unverified-login. Carries the email # under verification so the /verify page knows who's verifying without making # the user retype the address. NOT an auth cookie — never grants access to # anything beyond /verify and /verify/resend. PENDING_COOKIE_NAME = "cassandra_pending" PENDING_TTL_SECONDS = 60 * 60 # 1 hour @dataclass class CurrentUser: """The authenticated principal for the current request. `user` is None when the bearer token was used (admin/dev path with no matching DB row). Routes that need per-user scoping should check `is_admin` and either use a sentinel admin scope or 403.""" is_admin: bool user: User | None @property def id(self) -> int | None: return self.user.id if self.user else None @property def email(self) -> str | None: return self.user.email if self.user else None def _serializer() -> URLSafeTimedSerializer: s = get_settings() secret = s.CASSANDRA_SESSION_SECRET or s.CASSANDRA_TOKEN or "dev-insecure-secret" return URLSafeTimedSerializer(secret, salt="cassandra-session-v1") def sign_session(user_id: int) -> str: return _serializer().dumps({"uid": int(user_id)}) def verify_session(cookie: str) -> int | None: try: data = _serializer().loads(cookie, max_age=SESSION_TTL_SECONDS) return int(data["uid"]) except (BadSignature, SignatureExpired, KeyError, TypeError, ValueError): return None def _pending_serializer() -> URLSafeTimedSerializer: s = get_settings() secret = s.CASSANDRA_SESSION_SECRET or s.CASSANDRA_TOKEN or "dev-insecure-secret" return URLSafeTimedSerializer(secret, salt="cassandra-pending-v1") def sign_pending(email: str, user_id: int) -> str: return _pending_serializer().dumps({"email": email, "uid": int(user_id)}) def verify_pending(cookie: str) -> dict | None: """Returns {"email": str, "uid": int} or None if signature/expiry bad.""" try: data = _pending_serializer().loads(cookie, max_age=PENDING_TTL_SECONDS) return {"email": str(data["email"]), "uid": int(data["uid"])} except (BadSignature, SignatureExpired, KeyError, TypeError, ValueError): return None def _wants_html(request: Request) -> bool: accept = request.headers.get("accept", "").lower() # Treat a missing Accept header as HTML for browser navigations. if not accept: return True return "text/html" in accept and "application/json" not in accept async def require_auth( request: Request, authorization: str | None = Header(default=None), ) -> CurrentUser: """Resolve the current authenticated principal. Raises HTTPException on failure (303 redirect to /login for HTML, 401 for API).""" s = get_settings() # --- 1) Bearer token (admin / dev / scripts) --- if s.CASSANDRA_TOKEN and authorization and authorization.lower().startswith("bearer "): provided = authorization.split(" ", 1)[1].strip() if secrets.compare_digest(provided.encode(), s.CASSANDRA_TOKEN.encode()): principal = CurrentUser(is_admin=True, user=None) request.state.current_user = principal return principal # --- 2) Session cookie (browser) --- cookie = request.cookies.get(SESSION_COOKIE_NAME) if cookie: uid = verify_session(cookie) if uid is not None: async with get_session_factory()() as db_session: user = await get_user(db_session, uid) if user is not None: principal = CurrentUser(is_admin=False, user=user) request.state.current_user = principal return principal # --- 3) Unauthenticated --- if _wants_html(request): # Preserve the originally-requested path so /login can redirect back. path = request.url.path if request.url.query: path += "?" + request.url.query return _raise_redirect_to_login(next_path=path) raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Authentication required", headers={"WWW-Authenticate": "Bearer"}, ) def _raise_redirect_to_login(next_path: str = "/") -> None: # Some pages (login itself) are paths a redirect loop would be silly # to send back to. The auth router opts out of this dependency # entirely, so we don't need to filter here. raise HTTPException( status_code=status.HTTP_303_SEE_OTHER, detail="Login required", headers={"Location": f"/login?next={next_path}"}, ) # Backwards compatibility: every existing router uses Depends(require_token). require_token = require_auth