read.markets/app/services/portfolio_sync.py
Giorgio Gilestro f326b41a08 sync: encrypted cloud backup for portfolios + settings UX rework
Adds opt-in client-side-encrypted portfolio sync (paid). Browser
PBKDF2(PIN) → AES-GCM, server HKDF(pepper, user_id) outer wrap;
server stores opaque bytes only. Sliding-window rate limit on GET.

  - new portfolio_sync table (migration 0015)
  - POST/GET/DELETE /api/portfolio/sync + /status
  - app/services/portfolio_sync.py crypto + rate limit
  - app/routers/sync.py paid-gated
  - app/static/js/portfolio-sync.js WebCrypto wrapper
  - settings page: enable/disable + PIN modal
  - PORTFOLIO_SYNC_PEPPER setting (warn on startup if missing)

Settings + import rework:

  - /upload merged into /settings#import (legacy route 302s)
  - drop CSV → auto-parse → preview → Import only / Import & sync
  - nav slimmed to Dashboard / News / Log
  - Settings + Logout moved to a user dropdown
  - brand logo links to /

Collateral fixes:

  - settings 500: re-fetch User in current session before mutating
    referral_code (assign_code_if_missing was refreshing a User
    loaded in the auth dep's now-closed session)
  - csv_import: distinct error for unfunded T212 pies (all qty=0)
  - db.py: drop pool_pre_ping (aiomysql 0.3.2 incompat); pin
    isolation_level=READ COMMITTED to avoid gap-lock deadlocks
  - alembic env: disable_existing_loggers=False so in-process
    migrations don't silence uvicorn's loggers
  - docker-compose.override.yml: dev-only volume mount + --reload

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-05-23 16:15:54 +02:00

178 lines
6.3 KiB
Python

"""Encrypted-pie cloud-sync service.
The client encrypts the pie locally with a PIN-derived AES-GCM key — we
never see that key. We add a second AES-GCM layer with a per-user key
derived from the env-only PORTFOLIO_SYNC_PEPPER, so a DB-only leak yields
nothing usable. Stored bytes are opaque from the inside (the inner
ciphertext) and from the outside (the outer wrap).
Threat-model summary:
- DB leaks, env intact: safe (outer key still secret).
- env leaks, DB intact: safe (no rows to decrypt).
- Both leak: attacker still brute-forces PBKDF2(600k)
over the PIN; the fetch endpoint is
rate-limited to bound online attempts.
- PIN forgotten: unrecoverable. Re-upload the CSV.
The service is pure: no FastAPI deps, no logging. The router wires it up.
"""
from __future__ import annotations
import os
from datetime import datetime, timedelta, timezone
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.ciphers.aead import AESGCM
from cryptography.hazmat.primitives.kdf.hkdf import HKDF
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from app.config import get_settings
from app.models import PortfolioSync
# AES-GCM standard nonce is 12 bytes; AES-256 needs a 32-byte key.
_NONCE_LEN = 12
_KEY_LEN = 32
# Rate limit on GET (the brute-force vector): at most RATE_LIMIT_MAX
# fetches per RATE_LIMIT_WINDOW. The window is anchored at the first
# fetch after the previous window expired (see consume_fetch_budget),
# so it is a resetting counter rather than a true sliding window.
# 6 / 60s is 8,640 attempts/day, i.e. ~3.15M attempts/year.
# NOTE(review): an earlier version of this comment claimed ≈100k
# attempts/year and "a decade" for a 6-digit PIN. With these values a
# 10^6 PIN space is covered in roughly 116 days — confirm whether the
# window/max should be tightened to meet the intended target.
RATE_LIMIT_WINDOW = timedelta(seconds=60)
RATE_LIMIT_MAX = 6
class SyncCryptoError(Exception):
    """Signals that the server-side (outer) AES-GCM layer could not be removed.

    Typical causes are a changed pepper or a corrupted stored row. Per the
    module's design notes, the router is expected to turn this into a 500.
    """
def _utcnow() -> datetime:
return datetime.now(timezone.utc)
def _pepper_bytes() -> bytes:
    """Return the PORTFOLIO_SYNC_PEPPER setting encoded as UTF-8 bytes.

    When the pepper is an empty string (reportedly the case in dev and
    tests) the result is ``b""``: the outer key then depends on user_id
    alone, which weakens the wrap but keeps the service working.
    """
    pepper = get_settings().PORTFOLIO_SYNC_PEPPER
    return pepper.encode("utf-8")
def _server_key(user_id: int) -> bytes:
    """Derive the per-user AES key for the outer wrap.

    HKDF-SHA256 is run over the pepper bytes with the decimal user_id as
    salt, so users sharing one pepper still get distinct keys. The fixed
    ``info`` label carries a version suffix so the derivation can be
    changed later without colliding with keys for existing rows.

    Returns:
        ``_KEY_LEN`` bytes of key material.
    """
    per_user_salt = str(user_id).encode("utf-8")
    kdf = HKDF(
        algorithm=hashes.SHA256(),
        length=_KEY_LEN,
        salt=per_user_salt,
        info=b"portfolio-sync-v1",
    )
    return kdf.derive(_pepper_bytes())
def wrap(user_id: int, inner_blob: bytes) -> tuple[bytes, bytes]:
    """Apply the server-side AES-GCM layer to the client's ciphertext.

    Args:
        user_id: selects the per-user key via ``_server_key``.
        inner_blob: the already client-encrypted payload to protect.

    Returns:
        ``(outer_ct, outer_nonce)``. A fresh random nonce of
        ``_NONCE_LEN`` bytes is generated on every call.
    """
    fresh_nonce = os.urandom(_NONCE_LEN)
    cipher = AESGCM(_server_key(user_id))
    # No associated data is bound to the ciphertext.
    sealed = cipher.encrypt(fresh_nonce, inner_blob, None)
    return sealed, fresh_nonce
def unwrap(user_id: int, outer_ct: bytes, outer_nonce: bytes) -> bytes:
    """Inverse of wrap(): remove the server-side AES-GCM layer.

    Args:
        user_id: owner of the row; selects the per-user key via
            ``_server_key``.
        outer_ct: the ciphertext produced by ``wrap`` for this user.
        outer_nonce: the nonce ``wrap`` returned alongside ``outer_ct``.

    Returns:
        The client-side ciphertext that was originally passed to ``wrap``.

    Raises:
        SyncCryptoError: if anything goes wrong while deriving the key or
            decrypting (GCM tag mismatch, malformed nonce/ciphertext, ...).
            The original exception is chained as ``__cause__``.
    """
    try:
        # AESGCM.decrypt takes (nonce, data, associated_data), the same
        # order wrap() uses for encrypt(). Passing the ciphertext first
        # would feed it in as the nonce and make every unwrap fail.
        return AESGCM(_server_key(user_id)).decrypt(outer_nonce, outer_ct, None)
    except Exception as exc:  # InvalidTag, malformed ciphertext, etc.
        # Deliberately broad: callers only distinguish "row unreadable".
        raise SyncCryptoError("outer wrap unwrap failed") from exc
async def upsert(session: AsyncSession, user_id: int, inner_blob: bytes) -> datetime:
    """Create or overwrite the sync row for ``user_id`` and commit.

    The client blob is wrapped with the server-side layer first; the
    resulting ciphertext and nonce are what get stored.

    Returns:
        The timestamp written to ``updated_at`` (and to ``created_at``
        when the row is new).
    """
    sealed, nonce = wrap(user_id, inner_blob)
    stamp = _utcnow()

    existing = await session.get(PortfolioSync, user_id)
    if existing is not None:
        # Overwrite in place; created_at is left as it was.
        existing.outer_ciphertext = sealed
        existing.outer_nonce = nonce
        existing.updated_at = stamp
        # Bump this forward if the wrap scheme ever changes.
        existing.version = 1
    else:
        session.add(
            PortfolioSync(
                user_id=user_id,
                outer_ciphertext=sealed,
                outer_nonce=nonce,
                version=1,
                created_at=stamp,
                updated_at=stamp,
            )
        )

    await session.commit()
    return stamp
async def fetch_status(
    session: AsyncSession, user_id: int,
) -> tuple[bool, datetime | None]:
    """Report whether a sync row exists, without decrypting anything.

    Returns:
        ``(True, updated_at)`` when the user has a row, otherwise
        ``(False, None)``.
    """
    record = await session.get(PortfolioSync, user_id)
    has_row = record is not None
    last_updated = record.updated_at if has_row else None
    return has_row, last_updated
async def fetch(
    session: AsyncSession, user_id: int,
) -> tuple[bytes, datetime] | None:
    """Load the user's row and strip the server-side layer.

    Returns:
        ``(inner_blob, updated_at)``, or ``None`` when the user has no
        sync row.

    Raises:
        SyncCryptoError: the row exists but ``unwrap`` could not read it
            (for example after a pepper change without re-encryption).
    """
    record = await session.get(PortfolioSync, user_id)
    if record is not None:
        client_blob = unwrap(user_id, record.outer_ciphertext, record.outer_nonce)
        return client_blob, record.updated_at
    return None
async def delete(session: AsyncSession, user_id: int) -> bool:
    """Remove the user's sync row, committing only if one was found.

    Returns:
        True when a row existed and was deleted, False otherwise.
    """
    record = await session.get(PortfolioSync, user_id)
    found = record is not None
    if found:
        await session.delete(record)
        await session.commit()
    return found
async def consume_fetch_budget(session: AsyncSession, user_id: int) -> bool:
    """Charge one GET against the user's fetch budget.

    The counter and window start are stored on the sync row itself. A
    window opens at the first fetch after the previous one has run for
    ``RATE_LIMIT_WINDOW``; up to ``RATE_LIMIT_MAX`` fetches are allowed
    per window.

    Returns:
        True if the caller may proceed, False if the budget is exhausted
        (the caller is expected to answer 429). When the user has no row
        this returns True without touching the database, leaving the
        "not found" response to the caller.
    """
    record = await session.get(PortfolioSync, user_id)
    if record is None:
        return True

    current = _utcnow()
    window_opened = record.fetch_window_start
    # The driver may hand back naive datetimes; treat those as UTC.
    if window_opened is not None and window_opened.tzinfo is None:
        window_opened = window_opened.replace(tzinfo=timezone.utc)

    window_live = (
        window_opened is not None
        and current - window_opened < RATE_LIMIT_WINDOW
    )

    if window_live and record.fetch_count >= RATE_LIMIT_MAX:
        # Over budget: nothing is written, the window simply runs out.
        return False

    if window_live:
        record.fetch_count += 1
    else:
        # First fetch ever, or the previous window has expired.
        record.fetch_window_start = current
        record.fetch_count = 1
    await session.commit()
    return True