"""Encrypted-pie cloud sync — endpoints behind the paid-tier gate. The blob field is base64 because JSON can't carry raw bytes. The server treats it as opaque: we only need to know its length (to reject obviously oversized payloads) and to hand it back as-is on GET. All crypto for the inner layer happens in the browser; we just add the outer wrap in app.services.portfolio_sync. """ from __future__ import annotations import base64 from datetime import datetime from fastapi import APIRouter, Depends, HTTPException, status from pydantic import BaseModel, Field from sqlalchemy.ext.asyncio import AsyncSession from app.auth import CurrentUser from app.db import get_session from app.logging import get_logger from app.services import portfolio_sync as svc from app.services.access import require_paid log = get_logger("portfolio_sync_router") router = APIRouter(prefix="/api/portfolio/sync") # A 256 KB cap is ~200× a typical pie's serialized size — generous # headroom for AI analysis blobs the client may bundle later. MAX_BLOB_BYTES = 256 * 1024 class SyncBlobIn(BaseModel): blob: str = Field(..., description="base64url of the client-side ciphertext") class SyncBlobOut(BaseModel): blob: str updated_at: datetime class SyncStatusOut(BaseModel): exists: bool orphaned: bool = False updated_at: datetime | None = None class SyncWriteOut(BaseModel): updated_at: datetime def _decode_blob(b64: str) -> bytes: """Tolerates url-safe and standard alphabets, with or without padding.""" try: s = b64.strip() # Pad to multiple of 4 — base64 in browsers commonly omits it. s += "=" * (-len(s) % 4) return base64.urlsafe_b64decode(s) except Exception: # Last-ditch: try standard alphabet too. try: return base64.b64decode(b64, validate=False) except Exception: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail="blob must be base64", ) @router.get("/status", response_model=SyncStatusOut) async def get_status( principal: CurrentUser = Depends(require_paid), session: AsyncSession = Depends(get_session), ) -> SyncStatusOut: exists, orphaned, updated_at = await svc.fetch_status(session, principal.id) return SyncStatusOut(exists=exists, orphaned=orphaned, updated_at=updated_at) @router.post("", response_model=SyncWriteOut) async def upload_blob( body: SyncBlobIn, principal: CurrentUser = Depends(require_paid), session: AsyncSession = Depends(get_session), ) -> SyncWriteOut: raw = _decode_blob(body.blob) if len(raw) > MAX_BLOB_BYTES: raise HTTPException( status_code=status.HTTP_413_REQUEST_ENTITY_TOO_LARGE, detail=f"blob exceeds {MAX_BLOB_BYTES} bytes", ) if not raw: raise HTTPException(status_code=400, detail="blob is empty") updated_at = await svc.upsert(session, principal.id, raw) log.info("portfolio_sync.upserted", user_id=principal.id, bytes=len(raw)) return SyncWriteOut(updated_at=updated_at) @router.get("", response_model=SyncBlobOut) async def download_blob( principal: CurrentUser = Depends(require_paid), session: AsyncSession = Depends(get_session), ) -> SyncBlobOut: if not await svc.consume_fetch_budget(session, principal.id): raise HTTPException( status_code=status.HTTP_429_TOO_MANY_REQUESTS, detail="too many fetches; try again in a minute", ) try: result = await svc.fetch(session, principal.id) except svc.SyncOrphanedError: # Known state: pepper rotated. The frontend uses 410 to swap the # restore form for a "stale — re-upload" CTA. Logged at INFO, # not ERROR, because this isn't a server fault. log.info("portfolio_sync.orphaned", user_id=principal.id) raise HTTPException( status_code=status.HTTP_410_GONE, detail="stale_blob", ) except svc.SyncCryptoError: log.error("portfolio_sync.unwrap_failed", user_id=principal.id) raise HTTPException( status_code=500, detail="server failed to read the encrypted blob", ) if result is None: raise HTTPException(status_code=404, detail="no synced portfolio") inner, updated_at = result return SyncBlobOut( blob=base64.urlsafe_b64encode(inner).decode("ascii").rstrip("="), updated_at=updated_at, ) @router.delete("") async def delete_blob( principal: CurrentUser = Depends(require_paid), session: AsyncSession = Depends(get_session), ) -> dict: removed = await svc.delete(session, principal.id) log.info("portfolio_sync.deleted", user_id=principal.id, removed=removed) return {"ok": True, "removed": removed}