Adds an 8-byte HKDF fingerprint of the current pepper to portfolio_sync rows. On fetch, a mismatch surfaces as 410 Gone (distinct from genuine GCM corruption → 500), and the UI silently cleans up the dead row and shows a soft "please re-import" notice instead of a confusing PIN re-prompt. Legacy rows (pepper_fp NULL) are probed optimistically and backfilled on success. Also fixes a latent bug in unwrap(): AESGCM.decrypt args were swapped (ct, nonce instead of nonce, ct), so restore-from-cloud always failed even when the pepper was correct. Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
143 lines
4.7 KiB
Python
143 lines
4.7 KiB
Python
"""Encrypted-pie cloud sync — endpoints behind the paid-tier gate.
|
||
|
||
The blob field is base64 because JSON can't carry raw bytes. The server
treats it as opaque: we only need to know its length (to reject obviously
oversized payloads) and to hand it back as-is on GET. All crypto for the
inner layer happens in the browser; we just add the outer wrap in
app.services.portfolio_sync.
|
||
"""
|
||
from __future__ import annotations
|
||
|
||
import base64
|
||
from datetime import datetime
|
||
|
||
from fastapi import APIRouter, Depends, HTTPException, status
|
||
from pydantic import BaseModel, Field
|
||
from sqlalchemy.ext.asyncio import AsyncSession
|
||
|
||
from app.auth import CurrentUser
|
||
from app.db import get_session
|
||
from app.logging import get_logger
|
||
from app.services import portfolio_sync as svc
|
||
from app.services.access import require_paid
|
||
|
||
|
||
# Module logger; handlers below emit named events with keyword fields.
log = get_logger("portfolio_sync_router")

# Every route in this module is mounted under /api/portfolio/sync.
router = APIRouter(prefix="/api/portfolio/sync")

# Upper bound on the decoded blob: 256 KiB, roughly 200x the serialized
# size of a typical pie. The slack leaves room for AI analysis blobs the
# client may bundle later.
MAX_BLOB_BYTES = 256 * 1024
|
||
|
||
|
||
# Request body for the upload endpoint.
class SyncBlobIn(BaseModel):
    # Client-side ciphertext as base64 text; required (no default).
    blob: str = Field(..., description="base64url of the client-side ciphertext")
|
||
|
||
|
||
# Response body for the download endpoint.
class SyncBlobOut(BaseModel):
    # Ciphertext as url-safe base64 text (the download handler strips the
    # trailing "=" padding before filling this in).
    blob: str
    # Timestamp returned by the sync service alongside the blob.
    updated_at: datetime
|
||
|
||
|
||
# Response body for the status endpoint; all three values are passed
# through from svc.fetch_status.
class SyncStatusOut(BaseModel):
    # Whether a synced row is present for the caller.
    exists: bool
    # NOTE(review): presumably true when the stored row can no longer be
    # unwrapped by the server (same condition the download endpoint reports
    # as 410) -- confirm against svc.fetch_status.
    orphaned: bool = False
    # Timestamp of the stored row; defaults to None.
    updated_at: datetime | None = None
|
||
|
||
|
||
# Response body for the upload endpoint.
class SyncWriteOut(BaseModel):
    # Timestamp returned by svc.upsert for the write that just happened.
    updated_at: datetime
|
||
|
||
|
||
def _decode_blob(b64: str) -> bytes:
|
||
"""Tolerates url-safe and standard alphabets, with or without padding."""
|
||
try:
|
||
s = b64.strip()
|
||
# Pad to multiple of 4 — base64 in browsers commonly omits it.
|
||
s += "=" * (-len(s) % 4)
|
||
return base64.urlsafe_b64decode(s)
|
||
except Exception:
|
||
# Last-ditch: try standard alphabet too.
|
||
try:
|
||
return base64.b64decode(b64, validate=False)
|
||
except Exception:
|
||
raise HTTPException(
|
||
status_code=status.HTTP_400_BAD_REQUEST,
|
||
detail="blob must be base64",
|
||
)
|
||
|
||
|
||
@router.get("/status", response_model=SyncStatusOut)
async def get_status(
    principal: CurrentUser = Depends(require_paid),
    session: AsyncSession = Depends(get_session),
) -> SyncStatusOut:
    """Report whether the caller has a synced blob, without returning it.

    Access is gated by the ``require_paid`` dependency. The three values
    produced by ``svc.fetch_status`` are forwarded unchanged.
    """
    state = await svc.fetch_status(session, principal.id)
    exists, orphaned, updated_at = state
    return SyncStatusOut(
        exists=exists,
        orphaned=orphaned,
        updated_at=updated_at,
    )
|
||
|
||
|
||
@router.post("", response_model=SyncWriteOut)
async def upload_blob(
    body: SyncBlobIn,
    principal: CurrentUser = Depends(require_paid),
    session: AsyncSession = Depends(get_session),
) -> SyncWriteOut:
    """Decode the uploaded blob, bounds-check it, and hand it to the service.

    Raises:
        HTTPException: 400 when the blob is not base64 or decodes to zero
            bytes; 413 when the decoded size exceeds ``MAX_BLOB_BYTES``.
    """
    payload = _decode_blob(body.blob)
    size = len(payload)

    # The limit applies to the decoded bytes, not to the base64 text.
    if size > MAX_BLOB_BYTES:
        raise HTTPException(
            status_code=status.HTTP_413_REQUEST_ENTITY_TOO_LARGE,
            detail=f"blob exceeds {MAX_BLOB_BYTES} bytes",
        )
    if size == 0:
        raise HTTPException(status_code=400, detail="blob is empty")

    stored_at = await svc.upsert(session, principal.id, payload)
    log.info("portfolio_sync.upserted", user_id=principal.id, bytes=size)
    return SyncWriteOut(updated_at=stored_at)
|
||
|
||
|
||
@router.get("", response_model=SyncBlobOut)
async def download_blob(
    principal: CurrentUser = Depends(require_paid),
    session: AsyncSession = Depends(get_session),
) -> SyncBlobOut:
    """Return the caller's stored blob as unpadded url-safe base64.

    Raises:
        HTTPException: 429 when the fetch budget is exhausted; 410
            (``stale_blob``) when the service reports the row as orphaned;
            500 when the service cannot unwrap the blob; 404 when nothing
            is stored for the caller.
    """
    within_budget = await svc.consume_fetch_budget(session, principal.id)
    if not within_budget:
        raise HTTPException(
            status_code=status.HTTP_429_TOO_MANY_REQUESTS,
            detail="too many fetches; try again in a minute",
        )

    try:
        fetched = await svc.fetch(session, principal.id)
    except svc.SyncOrphanedError:
        # Expected condition (the pepper was rotated), not a server fault,
        # hence INFO rather than ERROR. The frontend keys off the 410 to
        # replace the restore form with a "stale -- re-upload" prompt.
        log.info("portfolio_sync.orphaned", user_id=principal.id)
        raise HTTPException(
            status_code=status.HTTP_410_GONE,
            detail="stale_blob",
        )
    except svc.SyncCryptoError:
        log.error("portfolio_sync.unwrap_failed", user_id=principal.id)
        raise HTTPException(
            status_code=500,
            detail="server failed to read the encrypted blob",
        )

    if fetched is None:
        raise HTTPException(status_code=404, detail="no synced portfolio")

    ciphertext, stamp = fetched
    encoded = base64.urlsafe_b64encode(ciphertext).decode("ascii")
    # Trailing "=" padding is stripped from the text sent to the client.
    return SyncBlobOut(blob=encoded.rstrip("="), updated_at=stamp)
|
||
|
||
|
||
@router.delete("")
async def delete_blob(
    principal: CurrentUser = Depends(require_paid),
    session: AsyncSession = Depends(get_session),
) -> dict:
    """Ask the service to delete the caller's blob and report the outcome.

    Returns:
        A dict with ``ok`` set to ``True`` and ``removed`` carrying whatever
        ``svc.delete`` returned.
    """
    outcome = await svc.delete(session, principal.id)
    log.info("portfolio_sync.deleted", user_id=principal.id, removed=outcome)
    return dict(ok=True, removed=outcome)
|