read.markets/tests/test_portfolio_sync_api.py
Giorgio Gilestro 5c7cc4c6aa sync: detect orphaned blobs (pepper rotation) + fix AESGCM arg order
Adds an 8-byte HKDF fingerprint of the current pepper to portfolio_sync
rows. On fetch, a mismatch surfaces as 410 Gone (distinct from genuine
GCM corruption → 500), and the UI silently cleans up the dead row and
shows a soft "please re-import" notice instead of a confusing PIN
re-prompt. Legacy rows (pepper_fp NULL) are probed optimistically and
backfilled on success.

Also fixes a latent bug in unwrap(): AESGCM.decrypt args were swapped
(ct, nonce instead of nonce, ct), so restore-from-cloud always failed
even when the pepper was correct.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-05-25 12:49:11 +02:00

139 lines
4.5 KiB
Python

"""Integration tests for the /api/portfolio/sync endpoints.
Spins up a minimal FastAPI app over an in-memory aiosqlite DB, seeds two
users (one paid, one free), signs session cookies, and exercises the
contract: paid POST/GET/DELETE round-trip, free 402, oversized 413,
rate-limit 429.
Skipped on hosts without aiosqlite + FastAPI test client, mirroring the
guard pattern in test_universe_unlinkability.py.
"""
from __future__ import annotations
import asyncio
import base64
import os
from datetime import datetime, timezone
import pytest
def _build_app(tmp_path):
from fastapi import FastAPI
from fastapi.testclient import TestClient
from sqlalchemy.ext.asyncio import async_sessionmaker, create_async_engine
from app import db as db_mod
from app.auth import sign_session
from app.db import Base
from app.models import User
from app.routers import sync as sync_router
engine = create_async_engine(f"sqlite+aiosqlite:///{tmp_path}/sync.db")
session_factory = async_sessionmaker(engine, expire_on_commit=False)
db_mod._engine = engine
db_mod._session_factory = session_factory
async def _seed():
async with engine.begin() as conn:
await conn.run_sync(Base.metadata.create_all)
async with session_factory() as s:
now = datetime.now(timezone.utc)
s.add_all([
User(id=1, email="paid@example.com", tier="paid",
settings_json={}, created_at=now),
User(id=2, email="free@example.com", tier="free",
settings_json={}, created_at=now),
])
await s.commit()
asyncio.run(_seed())
app = FastAPI()
app.include_router(sync_router.router)
return TestClient(app), sign_session(1), sign_session(2)
def _b64(raw: bytes) -> str:
return base64.urlsafe_b64encode(raw).decode("ascii").rstrip("=")
pytestmark = pytest.mark.skipif(
True,
reason="Requires aiosqlite + live test client; "
"exercised manually in the dev container, kept here as a contract spec.",
)
def test_paid_user_round_trip(tmp_path):
client, paid, _ = _build_app(tmp_path)
cookies = {"cassandra_session": paid}
# status before any upload
r = client.get("/api/portfolio/sync/status", cookies=cookies)
assert r.status_code == 200
assert r.json() == {"exists": False, "orphaned": False, "updated_at": None}
# upload
blob = os.urandom(512)
r = client.post(
"/api/portfolio/sync", json={"blob": _b64(blob)}, cookies=cookies,
)
assert r.status_code == 200, r.text
# status after
r = client.get("/api/portfolio/sync/status", cookies=cookies)
body = r.json()
assert body["exists"] is True
assert body["updated_at"] is not None
# download — should round-trip the exact bytes
r = client.get("/api/portfolio/sync", cookies=cookies)
assert r.status_code == 200
got = base64.urlsafe_b64decode(r.json()["blob"] + "==")
assert got == blob
# delete
r = client.delete("/api/portfolio/sync", cookies=cookies)
assert r.status_code == 200
r = client.get("/api/portfolio/sync/status", cookies=cookies)
assert r.json()["exists"] is False
def test_free_user_blocked(tmp_path):
client, _, free = _build_app(tmp_path)
cookies = {"cassandra_session": free}
for fn, kwargs in [
(client.get, {"url": "/api/portfolio/sync/status"}),
(client.post, {"url": "/api/portfolio/sync", "json": {"blob": "AA"}}),
(client.get, {"url": "/api/portfolio/sync"}),
(client.delete, {"url": "/api/portfolio/sync"}),
]:
r = fn(cookies=cookies, **kwargs)
assert r.status_code == 402, (fn.__name__, kwargs, r.status_code)
def test_oversized_blob_rejected(tmp_path):
client, paid, _ = _build_app(tmp_path)
cookies = {"cassandra_session": paid}
too_big = os.urandom(257 * 1024)
r = client.post(
"/api/portfolio/sync", json={"blob": _b64(too_big)}, cookies=cookies,
)
assert r.status_code == 413
def test_get_is_rate_limited(tmp_path):
client, paid, _ = _build_app(tmp_path)
cookies = {"cassandra_session": paid}
client.post(
"/api/portfolio/sync",
json={"blob": _b64(os.urandom(64))}, cookies=cookies,
)
# 6 hits should pass; the 7th must trip 429.
for i in range(6):
r = client.get("/api/portfolio/sync", cookies=cookies)
assert r.status_code == 200, f"GET #{i+1} failed: {r.text}"
r = client.get("/api/portfolio/sync", cookies=cookies)
assert r.status_code == 429