"""Encrypted-pie cloud-sync service.

The client encrypts the pie locally with a PIN-derived AES-GCM key — we
never see that key. We add a second AES-GCM layer with a per-user key
derived from the env-only PORTFOLIO_SYNC_PEPPER, so a DB-only leak yields
nothing usable. Stored bytes are opaque from the inside (the inner
ciphertext) and from the outside (the outer wrap).

Threat-model summary:

- DB leaks, env intact: safe (outer key still secret).
- env leaks, DB intact: safe (no rows to decrypt).
- Both leak: attacker still brute-forces PBKDF2(600k) over the PIN; the
  fetch endpoint is rate-limited to bound online attempts.
- PIN forgotten: unrecoverable. Re-upload the CSV.

The service is pure: no FastAPI deps, no logging. The router wires it up.
"""

from __future__ import annotations

import os
from datetime import datetime, timedelta, timezone

from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.ciphers.aead import AESGCM
from cryptography.hazmat.primitives.kdf.hkdf import HKDF
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession

from app.config import get_settings
from app.models import PortfolioSync

# AES-GCM standard nonce is 12 bytes; AES-256 needs a 32-byte key.
_NONCE_LEN = 12
_KEY_LEN = 32

# Rate limit on GET (the brute-force vector): at most RATE_LIMIT_MAX fetches
# per RATE_LIMIT_WINDOW. The window is *fixed*, not sliding: it is anchored at
# the first fetch after the previous window expired (see
# consume_fetch_budget), so up to 2 * RATE_LIMIT_MAX fetches can land close
# together when they straddle a window boundary.
#
# 6 / 60s is 8,640 fetches/day, i.e. about 3.15M/year.
# NOTE(review): this comment previously claimed "≈ 100k attempts/year" and
# "a decade" for a 6-digit PIN. That does not match these constants: 10^6
# PINs / 8,640 per day is roughly 116 days. Tighten the constants if the
# decade figure is the real requirement.
RATE_LIMIT_WINDOW = timedelta(seconds=60)
RATE_LIMIT_MAX = 6


class SyncCryptoError(Exception):
    """Outer-wrap decryption failed even though the pepper fingerprint
    matched — i.e. genuine corruption or tampering. The router maps this
    to a 500."""


class SyncOrphanedError(Exception):
    """The row was wrapped with a different pepper than the one currently
    configured (typically: dev-time pepper rotation).

    The data is permanently unrecoverable, but this is a *known* state, not
    a server fault — the router maps this to a 410 Gone."""


def _utcnow() -> datetime:
    """Return the current time as a timezone-aware UTC datetime."""
    return datetime.now(timezone.utc)


def _pepper_bytes() -> bytes:
    """Return the configured pepper, UTF-8 encoded.

    Empty in dev / tests; that weakens the outer wrap (key is now derived
    from user_id alone) but keeps everything functional."""
    return get_settings().PORTFOLIO_SYNC_PEPPER.encode("utf-8")


def _server_key(user_id: int) -> bytes:
    """Derive the per-user 32-byte AES key from PORTFOLIO_SYNC_PEPPER.

    HKDF binds the user_id into the `salt` so two users with the same
    pepper get independent keys, and includes a versioned info string so we
    can rotate the derivation later without breaking old rows."""
    return HKDF(
        algorithm=hashes.SHA256(),
        length=_KEY_LEN,
        salt=str(user_id).encode("utf-8"),
        info=b"portfolio-sync-v1",
    ).derive(_pepper_bytes())


# Length in bytes of the stored pepper fingerprint.
_FP_LEN = 8


def current_pepper_fp() -> bytes:
    """Return the 8-byte HKDF-derived fingerprint of the current pepper.

    Doesn't leak the pepper itself (HKDF is one-way). At 64 bits, two
    distinct peppers collide with probability about 2^-64, and a birthday
    collision needs on the order of 2^32 rotations — effectively zero for
    a few-row dev install."""
    return HKDF(
        algorithm=hashes.SHA256(),
        length=_FP_LEN,
        salt=b"portfolio-sync-pepper-fp",
        info=b"v1",
    ).derive(_pepper_bytes())


def wrap(user_id: int, inner_blob: bytes) -> tuple[bytes, bytes]:
    """Encrypt the client-side ciphertext (`inner_blob`) for storage.

    Returns (outer_ct, outer_nonce). The nonce is random per write; no
    associated data is bound into the GCM tag."""
    nonce = os.urandom(_NONCE_LEN)
    ct = AESGCM(_server_key(user_id)).encrypt(nonce, inner_blob, None)
    return ct, nonce


def unwrap(user_id: int, outer_ct: bytes, outer_nonce: bytes) -> bytes:
    """Inverse of wrap(). Raises SyncCryptoError if decryption fails.

    AESGCM.decrypt takes (nonce, data, associated_data) — not
    (data, nonce). The original implementation had the arguments swapped,
    which meant restore-from-cloud always failed even when the pepper was
    correct.
    """
    try:
        return AESGCM(_server_key(user_id)).decrypt(outer_nonce, outer_ct, None)
    except Exception as exc:
        # Deliberately broad: InvalidTag, malformed ciphertext, etc. all
        # surface to callers as the single SyncCryptoError type. The
        # underlying cause stays attached via `from exc`.
        raise SyncCryptoError("outer wrap unwrap failed") from exc


async def upsert(session: AsyncSession, user_id: int, inner_blob: bytes) -> datetime:
    """Insert or replace this user's sync row. Returns the new updated_at.

    The blob is re-wrapped with a fresh nonce and stamped with the current
    pepper fingerprint on every call; the session is committed before
    returning."""
    outer_ct, outer_nonce = wrap(user_id, inner_blob)
    fp = current_pepper_fp()
    now = _utcnow()

    row = await session.get(PortfolioSync, user_id)
    if row is None:
        row = PortfolioSync(
            user_id=user_id,
            outer_ciphertext=outer_ct,
            outer_nonce=outer_nonce,
            version=1,
            created_at=now,
            updated_at=now,
            pepper_fp=fp,
        )
        session.add(row)
    else:
        row.outer_ciphertext = outer_ct
        row.outer_nonce = outer_nonce
        row.updated_at = now
        # Bump version field forward if we ever change the wrap scheme.
        row.version = 1
        row.pepper_fp = fp

    await session.commit()
    return now


def _is_orphaned(row: PortfolioSync) -> bool:
    """Return True when the row's stored pepper fingerprint is present and
    differs from the current pepper's fingerprint.

    NULL fingerprint (rows from before the pepper_fp column existed) is
    treated optimistically: we don't know whether the pepper rotated, so we
    let the fetch path probe with a real unwrap and self-heal on success.
    Status returns orphaned=False for NULL so the user is offered the
    Restore form; if unwrap then fails, the GET path returns 410 and the UI
    flips to the stale state."""
    return row.pepper_fp is not None and row.pepper_fp != current_pepper_fp()


async def fetch_status(
    session: AsyncSession,
    user_id: int,
) -> tuple[bool, bool, datetime | None]:
    """Cheap existence check — does NOT decrypt.

    Returns (exists, orphaned, updated_at); (False, False, None) when the
    user has no row. Used by the dashboard to decide whether to show the
    restore prompt vs the "stale, re-upload" prompt.
    """
    row = await session.get(PortfolioSync, user_id)
    if row is None:
        return False, False, None
    return True, _is_orphaned(row), row.updated_at


async def fetch(
    session: AsyncSession,
    user_id: int,
) -> tuple[bytes, datetime] | None:
    """Returns (inner_blob, updated_at) or None if sync disabled.

    Raises SyncOrphanedError if the row's pepper fingerprint mismatches the
    current pepper, OR if a fingerprint-less legacy row fails to unwrap
    (which can only mean a pepper rotation, since the arg-order bug fix
    landed alongside the fingerprint column).

    Raises SyncCryptoError if the fingerprint matched but the outer wrap
    still failed (genuine corruption or tampering).

    On a successful unwrap of a fingerprint-less legacy row, the current
    pepper's fingerprint is backfilled so subsequent status checks
    correctly report healthy (and future rotations are detectable).
    """
    row = await session.get(PortfolioSync, user_id)
    if row is None:
        return None

    if _is_orphaned(row):
        raise SyncOrphanedError("pepper fingerprint mismatch")

    legacy = row.pepper_fp is None
    try:
        inner = unwrap(user_id, row.outer_ciphertext, row.outer_nonce)
    except SyncCryptoError as exc:
        if legacy:
            # Legacy row + decrypt fails = pepper rotated before the
            # fingerprint column existed. Same observable state as a
            # post-fingerprint orphan; report it that way, keeping the
            # crypto failure attached as the explicit cause.
            raise SyncOrphanedError("legacy row, decrypt failed") from exc
        raise

    if legacy:
        # Self-heal: the unwrap just proved the current pepper is the
        # right one, so record its fingerprint on the row.
        row.pepper_fp = current_pepper_fp()
        await session.commit()

    return inner, row.updated_at


async def delete(session: AsyncSession, user_id: int) -> bool:
    """Delete this user's sync row and commit.

    Returns True if a row was deleted, False if none existed."""
    row = await session.get(PortfolioSync, user_id)
    if row is None:
        return False
    await session.delete(row)
    await session.commit()
    return True


async def consume_fetch_budget(session: AsyncSession, user_id: int) -> bool:
    """Fixed-window rate-limiter for GET.

    Returns True if the call is within budget (caller proceeds), False if
    over (caller returns 429). Each True result for an existing row is
    counted and committed; a False result writes nothing.

    The window is anchored at the first fetch after the previous window
    expired and lasts RATE_LIMIT_WINDOW; at most RATE_LIMIT_MAX fetches are
    allowed inside it. Window state lives on the row itself — no need for
    Redis. On no-row, the GET handler will 404 anyway; we return True so it
    can fall through to that handler.

    NOTE(review): the count is a read-modify-write on the ORM row with no
    locking visible here, so concurrent fetches for the same user may be
    undercounted — confirm whether the caller or the DB isolation level
    serialises them.
    """
    row = await session.get(PortfolioSync, user_id)
    if row is None:
        return True

    now = _utcnow()
    start = row.fetch_window_start
    # Normalise: aiomysql sometimes returns naive datetimes. Treat them as
    # UTC so the subtraction below never mixes naive and aware values.
    if start is not None and start.tzinfo is None:
        start = start.replace(tzinfo=timezone.utc)

    if start is None or now - start >= RATE_LIMIT_WINDOW:
        # No window yet, or the previous one has expired: open a new
        # window with this call as its first fetch.
        row.fetch_window_start = now
        row.fetch_count = 1
        await session.commit()
        return True

    if row.fetch_count >= RATE_LIMIT_MAX:
        # Don't bump — over budget. Window expires on its own; no commit.
        return False

    row.fetch_count += 1
    await session.commit()
    return True