From 6c13f855e97a44cf103a8ae68b6bf5ded82c038c Mon Sep 17 00:00:00 2001 From: Giorgio Gilestro Date: Tue, 26 May 2026 17:42:41 +0200 Subject: [PATCH] polar: build /api/polar/webhook handler MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Standalone router for inbound Polar (merchant-of-record) deliveries. No bearer-token dep — authenticity comes from the Standard Webhooks HMAC instead. Wired up so it's safe to deploy dark: empty POLAR_WEBHOOK_SECRET makes the endpoint return 503 (loud) rather than accept unsigned events. Behaviour - Standard Webhooks signature verification: HMAC-SHA256 over `{webhook-id}.{webhook-timestamp}.{body}`, base64 secret prefixed whsec_, ±5min replay window, constant-time compare against any of the space-separated v1 tokens. - Idempotency via UNIQUE on polar_events.event_id — a replayed webhook-id short-circuits to 200 "duplicate" without re-running. - Event dispatch table covers the 10 events we subscribed to: subscription.{created,active,updated,uncanceled} -> tier=paid + persist polar_customer_id / polar_subscription_id. subscription.revoked -> tier=free (customer id kept so a resub matches the same User row). canceled / past_due / order.* / refund.created -> audit only. - Unknown event types are acked 200 + recorded; we don't want to 4xx on something Polar adds in the future and trigger their retry loop. Schema (migration 0018) - users.polar_customer_id, users.polar_subscription_id (both nullable String(64)); UNIQUE on polar_customer_id so two users can't claim the same Polar identity. - polar_events table: event_id (unique), event_type, received_at, processed_at, error, raw payload (truncated to 16 KiB). Tests - 7 in tests/test_polar_webhook.py: bad signature -> 401, stale timestamp -> 401, missing headers -> 400, subscription.active flips tier to paid + stores IDs, subscription.revoked drops to free while keeping customer link, replayed webhook-id is no-op, unknown event is acked. - Full suite: 212 passed, 5 skipped. Operator next steps before saving the webhook in Polar 1. Pull this branch to prod and apply migration 0018. 2. Save the webhook in Polar pointing at https://read.markets/api/polar/webhook — Polar will accept the save even though our endpoint still 503s (no secret yet). 3. Copy the secret Polar reveals into the prod .env as POLAR_WEBHOOK_SECRET=whsec_... and restart the app. 4. Trigger a test event from Polar's dashboard to confirm 200 OK. Co-Authored-By: Claude Opus 4.7 --- alembic/versions/0018_polar_webhook.py | 55 +++++ app/config.py | 7 + app/main.py | 5 + app/models.py | 38 ++++ app/routers/polar_webhook.py | 304 +++++++++++++++++++++++++ tests/test_polar_webhook.py | 215 +++++++++++++++++ 6 files changed, 624 insertions(+) create mode 100644 alembic/versions/0018_polar_webhook.py create mode 100644 app/routers/polar_webhook.py create mode 100644 tests/test_polar_webhook.py diff --git a/alembic/versions/0018_polar_webhook.py b/alembic/versions/0018_polar_webhook.py new file mode 100644 index 0000000..bc085a7 --- /dev/null +++ b/alembic/versions/0018_polar_webhook.py @@ -0,0 +1,55 @@ +"""polar webhook: User.polar_customer_id/subscription_id, polar_events table. + +Revision ID: 0018 +Revises: 0017 +Create Date: 2026-05-26 +""" +from typing import Sequence, Union + +import sqlalchemy as sa +from alembic import op + + +revision: str = "0018" +down_revision: Union[str, None] = "0017" +branch_labels: Union[str, Sequence[str], None] = None +depends_on: Union[str, Sequence[str], None] = None + + +def upgrade() -> None: + op.add_column( + "users", + sa.Column("polar_customer_id", sa.String(length=64), nullable=True), + ) + op.add_column( + "users", + sa.Column("polar_subscription_id", sa.String(length=64), nullable=True), + ) + op.create_unique_constraint( + "uq_users_polar_customer", "users", ["polar_customer_id"], + ) + + op.create_table( + "polar_events", + sa.Column("id", sa.BigInteger(), autoincrement=True, primary_key=True), + sa.Column("event_id", sa.String(length=128), nullable=False), + sa.Column("event_type", sa.String(length=64), nullable=False), + sa.Column("received_at", sa.DateTime(timezone=True), nullable=False), + sa.Column("processed_at", sa.DateTime(timezone=True), nullable=True), + sa.Column("error", sa.Text(), nullable=True), + sa.Column("payload", sa.Text(), nullable=False), + sa.UniqueConstraint("event_id", name="uq_polar_events_event_id"), + ) + op.create_index( + "ix_polar_events_type_received", + "polar_events", + ["event_type", "received_at"], + ) + + +def downgrade() -> None: + op.drop_index("ix_polar_events_type_received", table_name="polar_events") + op.drop_table("polar_events") + op.drop_constraint("uq_users_polar_customer", "users", type_="unique") + op.drop_column("users", "polar_subscription_id") + op.drop_column("users", "polar_customer_id") diff --git a/app/config.py b/app/config.py index d70e8a2..4ff67d6 100644 --- a/app/config.py +++ b/app/config.py @@ -92,6 +92,13 @@ class Settings(BaseSettings): CASSANDRA_ANALYSIS: str = "SPECULATIVE" # DRY | SPECULATIVE BETA_MODE: bool = True # Shows a "BETA" pill in the app header. Flip to False at GA. + # Polar (merchant-of-record). Webhook secret is base64-encoded with a + # `whsec_` prefix in the Polar dashboard; paste it verbatim into the + # env var. Empty = webhook endpoint refuses with 503 (so a misconfig + # is loud rather than silently accepting unsigned events). + POLAR_WEBHOOK_SECRET: str = "" + POLAR_API_KEY: str = "" + # Config file locations (overridable for tests) BASELINE_TOML: Path = Field(default_factory=lambda: CONFIG_DIR / "default.toml") PORTFOLIO_TOML: Path = Field(default_factory=lambda: CONFIG_DIR / "portfolio.toml") diff --git a/app/main.py b/app/main.py index efca3ae..d9c491e 100644 --- a/app/main.py +++ b/app/main.py @@ -21,6 +21,7 @@ from app.routers import api as api_router from app.routers import auth as auth_router from app.routers import email as email_router from app.routers import pages as pages_router +from app.routers import polar_webhook as polar_webhook_router from app.routers import public as public_router from app.routers import sync as sync_router from app.routers import universe as universe_router @@ -88,6 +89,10 @@ app.include_router(email_router.router, tags=["email"]) app.include_router(api_router.router, prefix="/api", tags=["api"]) app.include_router(universe_router.router, prefix="/api", tags=["universe"]) app.include_router(sync_router.router, tags=["portfolio-sync"]) +# Polar webhook (no bearer-token auth — authenticity via HMAC). Path +# `/api/polar/webhook` is set on the route itself so the URL Polar +# stores remains stable even if api_router's prefix ever moves. +app.include_router(polar_webhook_router.router, tags=["polar-webhook"]) # Public router (no auth dep) before pages_router so the marketing/legal # paths can never collide with future authenticated routes. app.include_router(public_router.router) diff --git a/app/models.py b/app/models.py index d655bdf..643b8d8 100644 --- a/app/models.py +++ b/app/models.py @@ -188,10 +188,17 @@ class User(Base): # NULL = use INTERMEDIATE at render time. Server-side mirror of the # dashboard tone, decoupled because the dashboard pref is localStorage. digest_tone: Mapped[str | None] = mapped_column(String(16)) + # Polar (MoR) linkage — populated by the polar_webhook handler the + # first time we see a subscription/order event for the user. The + # customer id is the stable join key; the subscription id is what + # we cancel against from /settings. + polar_customer_id: Mapped[str | None] = mapped_column(String(64), nullable=True) + polar_subscription_id: Mapped[str | None] = mapped_column(String(64), nullable=True) __table_args__ = ( UniqueConstraint("email", name="uq_users_email"), UniqueConstraint("referral_code", name="uq_users_referral_code"), + UniqueConstraint("polar_customer_id", name="uq_users_polar_customer"), ) @@ -347,3 +354,34 @@ class EmailSend(Base): __table_args__ = ( Index("ix_email_sends_user_kind_sent", "user_id", "kind", "sent_at"), ) + + +class PolarEvent(Base): + """Audit + idempotency table for inbound Polar (MoR) webhook deliveries. + + Polar uses the Standard Webhooks spec, which guarantees each delivery + carries a unique `webhook-id` header. We store that ID under a UNIQUE + constraint so a replay of the same event is a no-op (the INSERT fails + and the handler returns the prior result). + + `processed_at` distinguishes "delivered and handled" from "delivered + but the handler crashed mid-flight" — the latter rows are what an + operator looks at when investigating a stuck subscription.""" + __tablename__ = "polar_events" + + id: Mapped[int] = mapped_column(_PK, primary_key=True, autoincrement=True) + event_id: Mapped[str] = mapped_column(String(128), nullable=False) + event_type: Mapped[str] = mapped_column(String(64), nullable=False) + received_at: Mapped[datetime] = mapped_column( + DateTime(timezone=True), default=utcnow, nullable=False, + ) + processed_at: Mapped[datetime | None] = mapped_column(DateTime(timezone=True)) + error: Mapped[str | None] = mapped_column(Text) + # Raw JSON body, kept for forensics. Truncated to 16 KiB to keep + # one bad request from blowing up the row. + payload: Mapped[str] = mapped_column(Text, nullable=False) + + __table_args__ = ( + UniqueConstraint("event_id", name="uq_polar_events_event_id"), + Index("ix_polar_events_type_received", "event_type", "received_at"), + ) diff --git a/app/routers/polar_webhook.py b/app/routers/polar_webhook.py new file mode 100644 index 0000000..220ce48 --- /dev/null +++ b/app/routers/polar_webhook.py @@ -0,0 +1,304 @@ +"""Polar (merchant-of-record) webhook endpoint. + +Polar uses the Standard Webhooks spec (https://www.standardwebhooks.com). +Every delivery carries three headers: + + webhook-id — unique ID for THIS delivery (use for idempotency). + webhook-timestamp — Unix seconds at send time (use for replay defence). + webhook-signature — space-separated list of `v1,` + tokens. Verifying any one of them means the + payload is authentic. + +The signed content is the literal string `{id}.{timestamp}.{body}`, signed +with the raw secret bytes (the secret is base64-encoded after the +`whsec_` prefix). We verify in constant time and reject anything that +doesn't match — including stale deliveries older than ±5 minutes — before +parsing JSON or touching the database. + +Idempotency is keyed on `webhook-id` via a unique constraint on +`polar_events.event_id`. A second delivery of the same id finds the row +already there and returns 200 without re-running the handler — Polar +will retry on non-2xx, so we must always 2xx after a successful first +processing. + +The router is mounted without the app's bearer-token dependency: webhook +authenticity is established via the HMAC, not the token.""" +from __future__ import annotations + +import base64 +import hashlib +import hmac +import json +import time +from datetime import datetime, timezone +from typing import Any + +from fastapi import APIRouter, Depends, HTTPException, Request +from sqlalchemy import select +from sqlalchemy.exc import IntegrityError +from sqlalchemy.ext.asyncio import AsyncSession + +from app.config import get_settings +from app.db import get_session, utcnow +from app.logging import get_logger +from app.models import PolarEvent, User + + +log = get_logger("polar_webhook") +router = APIRouter() + + +# Max clock skew we'll tolerate on the `webhook-timestamp` header. Standard +# Webhooks recommends ±5 min; anything older is almost certainly replay. +_TIMESTAMP_TOLERANCE_S = 300 +# Cap stored payload at 16 KiB so a hostile (or buggy) sender can't blow +# up a single row. +_PAYLOAD_STORE_MAX = 16 * 1024 + + +def _decode_secret(secret: str) -> bytes: + """Polar/Standard-Webhooks secrets are base64 with a `whsec_` prefix. + Returns the raw HMAC key. Raises ValueError on malformed input.""" + if not secret: + raise ValueError("empty webhook secret") + s = secret + if s.startswith("whsec_"): + s = s[len("whsec_"):] + return base64.b64decode(s) + + +def _compute_signature(key: bytes, signed_payload: str) -> str: + """Return `v1,` — the format a single signature token uses.""" + mac = hmac.new(key, signed_payload.encode("utf-8"), hashlib.sha256).digest() + return "v1," + base64.b64encode(mac).decode("ascii") + + +def verify_standard_webhook( + *, + secret: str, + msg_id: str, + msg_timestamp: str, + msg_signature: str, + body: bytes, + now: float | None = None, +) -> None: + """Verify a Standard Webhooks delivery. Raises HTTPException(401) on + any failure. No return value — success is "did not raise".""" + try: + key = _decode_secret(secret) + except (ValueError, base64.binascii.Error) as e: + raise HTTPException(status_code=500, detail=f"bad webhook secret: {e}") + + # Timestamp / replay window. + try: + ts = int(msg_timestamp) + except ValueError: + raise HTTPException(status_code=401, detail="invalid timestamp") + drift = abs((now if now is not None else time.time()) - ts) + if drift > _TIMESTAMP_TOLERANCE_S: + raise HTTPException(status_code=401, detail="stale timestamp") + + signed_payload = f"{msg_id}.{msg_timestamp}.{body.decode('utf-8')}" + expected = _compute_signature(key, signed_payload) + + # The header can carry several space-separated tokens (key rotation). + # Any match — in constant time — is success. + candidates = msg_signature.split() + if not any(hmac.compare_digest(expected, c) for c in candidates): + raise HTTPException(status_code=401, detail="bad signature") + + +# --------------------------------------------------------------------------- +# Event handlers +# --------------------------------------------------------------------------- + + +def _customer_id_from_payload(payload_data: dict[str, Any]) -> str | None: + """Polar nests the customer object under `customer`. Some events also + surface `customer_id` at the top of `data` — accept either.""" + cust = payload_data.get("customer") or {} + return cust.get("id") or payload_data.get("customer_id") + + +def _customer_email_from_payload(payload_data: dict[str, Any]) -> str | None: + cust = payload_data.get("customer") or {} + return cust.get("email") + + +async def _find_user(session: AsyncSession, data: dict[str, Any]) -> User | None: + """Locate the User row that owns this event. + + Strategy: join by stored Polar customer id first (the only stable + link once we've seen a user). Fall back to email — the first time + Polar fires an event for a brand-new customer, we won't have the id + yet, but the customer record on Polar's side was created with the + user's email by our checkout call.""" + cid = _customer_id_from_payload(data) + if cid: + row = (await session.execute( + select(User).where(User.polar_customer_id == cid) + )).scalar_one_or_none() + if row is not None: + return row + email = _customer_email_from_payload(data) + if email: + row = (await session.execute( + select(User).where(User.email == email) + )).scalar_one_or_none() + return row + return None + + +async def _grant_paid( + session: AsyncSession, user: User, data: dict[str, Any], +) -> None: + """Flip the user to the paid tier and persist the Polar IDs we now + know. Safe to call repeatedly: tier is idempotent and the IDs only + change if Polar issued new ones.""" + user.tier = "paid" + cid = _customer_id_from_payload(data) + if cid and user.polar_customer_id != cid: + user.polar_customer_id = cid + sub_id = data.get("id") # subscription event payloads put sub id at top + if sub_id and user.polar_subscription_id != sub_id: + user.polar_subscription_id = sub_id + + +async def _revoke_paid(session: AsyncSession, user: User) -> None: + """Drop the user back to the free tier. We deliberately leave the + polar_customer_id in place so a re-subscription matches them back to + the same row.""" + user.tier = "free" + user.polar_subscription_id = None + + +async def _handle_subscription_active( + session: AsyncSession, data: dict[str, Any], event_type: str, +) -> None: + user = await _find_user(session, data) + if user is None: + log.warning("polar.user_not_found", event=event_type, + customer_id=_customer_id_from_payload(data)) + return + await _grant_paid(session, user, data) + + +async def _handle_subscription_revoked( + session: AsyncSession, data: dict[str, Any], event_type: str, +) -> None: + user = await _find_user(session, data) + if user is None: + log.warning("polar.user_not_found", event=event_type, + customer_id=_customer_id_from_payload(data)) + return + await _revoke_paid(session, user) + + +async def _handle_no_state_change( + session: AsyncSession, data: dict[str, Any], event_type: str, +) -> None: + """For events we want to record in the audit table but where the + tier doesn't move — canceled (still active until period end), + uncanceled, past_due, order events, refund created. The PolarEvent + row is the record.""" + return None + + +# Map event type → handler. Anything not in this map is acknowledged +# (200) but ignored, on the principle that Polar may add new event types +# over time and we don't want to start 4xx-ing on unknown ones. +_HANDLERS = { + "subscription.created": _handle_subscription_active, + "subscription.active": _handle_subscription_active, + "subscription.updated": _handle_subscription_active, + "subscription.uncanceled": _handle_subscription_active, + "subscription.canceled": _handle_no_state_change, + "subscription.revoked": _handle_subscription_revoked, + "subscription.past_due": _handle_no_state_change, + "order.paid": _handle_no_state_change, + "order.refunded": _handle_no_state_change, + "refund.created": _handle_no_state_change, +} + + +# --------------------------------------------------------------------------- +# Endpoint +# --------------------------------------------------------------------------- + + +@router.post("/api/polar/webhook") +async def polar_webhook( + request: Request, + session: AsyncSession = Depends(get_session), +) -> dict[str, str]: + s = get_settings() + if not s.POLAR_WEBHOOK_SECRET: + # Loud failure rather than accepting an unsigned event. + raise HTTPException(status_code=503, detail="webhook not configured") + + msg_id = request.headers.get("webhook-id", "") + msg_ts = request.headers.get("webhook-timestamp", "") + msg_sig = request.headers.get("webhook-signature", "") + if not (msg_id and msg_ts and msg_sig): + raise HTTPException(status_code=400, detail="missing standard-webhooks headers") + + body = await request.body() + verify_standard_webhook( + secret=s.POLAR_WEBHOOK_SECRET, + msg_id=msg_id, + msg_timestamp=msg_ts, + msg_signature=msg_sig, + body=body, + ) + + try: + envelope = json.loads(body) + except json.JSONDecodeError: + raise HTTPException(status_code=400, detail="invalid JSON") + + event_type = envelope.get("type") or "unknown" + data = envelope.get("data") or {} + + # Idempotency: insert the audit row first. If the webhook-id was + # already delivered, the UNIQUE constraint short-circuits with a + # 200 (Polar will stop retrying). + body_text = body.decode("utf-8", errors="replace")[:_PAYLOAD_STORE_MAX] + audit = PolarEvent( + event_id=msg_id, + event_type=event_type, + received_at=utcnow(), + payload=body_text, + ) + session.add(audit) + try: + await session.flush() + except IntegrityError: + # Already processed — return 200 so Polar doesn't keep retrying. + await session.rollback() + log.info("polar.duplicate_delivery", event_id=msg_id, type=event_type) + return {"status": "duplicate"} + + handler = _HANDLERS.get(event_type) + if handler is None: + # Unknown but well-signed event — record it, ack 200. + audit.processed_at = utcnow() + await session.commit() + log.info("polar.event_unhandled", type=event_type, id=msg_id) + return {"status": "ignored"} + + try: + await handler(session, data, event_type) + except Exception as e: + # Mark as errored so an operator can see what's stuck, then + # commit + ack 200. We do NOT want Polar to retry an event that + # broke handler logic — the same code will break the same way. + # Operator gets paged from the error column instead. + audit.error = str(e)[:1024] + await session.commit() + log.exception("polar.handler_error", type=event_type, id=msg_id) + return {"status": "handler_error"} + + audit.processed_at = utcnow() + await session.commit() + log.info("polar.processed", type=event_type, id=msg_id) + return {"status": "ok"} diff --git a/tests/test_polar_webhook.py b/tests/test_polar_webhook.py new file mode 100644 index 0000000..7cf4f81 --- /dev/null +++ b/tests/test_polar_webhook.py @@ -0,0 +1,215 @@ +"""Polar (Standard Webhooks) endpoint: signature verification, idempotency, +and the subscription.active -> tier=paid handler. + +Integration-style: real router + in-memory aiosqlite. Same scaffold as +test_news_window.py / test_chat_and_log_gates.py.""" +from __future__ import annotations + +import asyncio +import base64 +import hashlib +import hmac +import json +import time + +import pytest + + +_SECRET_RAW = b"this-is-a-deterministic-test-secret-32b!" +_SECRET = "whsec_" + base64.b64encode(_SECRET_RAW).decode("ascii") + + +def _sign(msg_id: str, ts: str, body: bytes) -> str: + """Produce the `v1,` token Polar would send.""" + signed = f"{msg_id}.{ts}.{body.decode('utf-8')}" + mac = hmac.new(_SECRET_RAW, signed.encode("utf-8"), hashlib.sha256).digest() + return "v1," + base64.b64encode(mac).decode("ascii") + + +def _build_app(tmp_path): + from fastapi import FastAPI + from fastapi.testclient import TestClient + from sqlalchemy.ext.asyncio import async_sessionmaker, create_async_engine + + from app import db as db_mod + from app.config import get_settings + from app.db import Base + from app.models import User + from app.routers import polar_webhook as polar_router + + # Inject the secret into the cached Settings. We override the + # field rather than monkeypatching env because the secret is read + # via get_settings() at request time. + s = get_settings() + s.POLAR_WEBHOOK_SECRET = _SECRET # type: ignore[misc] + + engine = create_async_engine(f"sqlite+aiosqlite:///{tmp_path}/polar.db") + factory = async_sessionmaker(engine, expire_on_commit=False) + db_mod._engine = engine + db_mod._session_factory = factory + + async def _seed(): + async with engine.begin() as conn: + await conn.run_sync(Base.metadata.create_all) + async with factory() as session: + session.add(User(id=1, email="paying@x", tier="free")) + await session.commit() + + asyncio.run(_seed()) + + app = FastAPI() + app.include_router(polar_router.router) + return TestClient(app), factory + + +def _post(client, *, body: dict, msg_id="msg_001", ts: str | None = None, + sig: str | None = None): + raw = json.dumps(body).encode("utf-8") + ts = ts or str(int(time.time())) + sig = sig or _sign(msg_id, ts, raw) + return client.post( + "/api/polar/webhook", + content=raw, + headers={ + "webhook-id": msg_id, + "webhook-timestamp": ts, + "webhook-signature": sig, + "content-type": "application/json", + }, + ) + + +# --- signature gate -------------------------------------------------------- + + +def test_rejects_bad_signature(tmp_path): + client, _ = _build_app(tmp_path) + raw = json.dumps({"type": "subscription.active", "data": {}}).encode("utf-8") + ts = str(int(time.time())) + r = client.post( + "/api/polar/webhook", + content=raw, + headers={ + "webhook-id": "msg_bad", + "webhook-timestamp": ts, + "webhook-signature": "v1,AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA=", + "content-type": "application/json", + }, + ) + assert r.status_code == 401, r.text + + +def test_rejects_stale_timestamp(tmp_path): + client, _ = _build_app(tmp_path) + body = {"type": "subscription.active", "data": {}} + # 10 minutes in the past — beyond the 5-minute tolerance window. + stale = str(int(time.time()) - 600) + r = _post(client, body=body, ts=stale, msg_id="msg_stale") + assert r.status_code == 401, r.text + + +def test_rejects_missing_headers(tmp_path): + client, _ = _build_app(tmp_path) + r = client.post("/api/polar/webhook", content=b"{}", + headers={"content-type": "application/json"}) + assert r.status_code == 400, r.text + + +# --- happy paths ----------------------------------------------------------- + + +def test_subscription_active_flips_tier_to_paid(tmp_path): + client, factory = _build_app(tmp_path) + body = { + "type": "subscription.active", + "data": { + "id": "sub_abc", + "customer": {"id": "cust_xyz", "email": "paying@x"}, + }, + } + r = _post(client, body=body, msg_id="msg_active") + assert r.status_code == 200, r.text + assert r.json()["status"] == "ok" + + async def _check(): + from sqlalchemy import select + from app.models import User + async with factory() as session: + u = (await session.execute( + select(User).where(User.id == 1) + )).scalar_one() + return u.tier, u.polar_customer_id, u.polar_subscription_id + + tier, cid, sid = asyncio.run(_check()) + assert tier == "paid" + assert cid == "cust_xyz" + assert sid == "sub_abc" + + +def test_subscription_revoked_drops_to_free(tmp_path): + client, factory = _build_app(tmp_path) + # First, activate. + _post(client, body={ + "type": "subscription.active", + "data": {"id": "sub_abc", + "customer": {"id": "cust_xyz", "email": "paying@x"}}, + }, msg_id="msg_act") + # Then, revoke. + r = _post(client, body={ + "type": "subscription.revoked", + "data": {"id": "sub_abc", + "customer": {"id": "cust_xyz", "email": "paying@x"}}, + }, msg_id="msg_rev") + assert r.status_code == 200, r.text + + async def _check(): + from sqlalchemy import select + from app.models import User + async with factory() as session: + u = (await session.execute( + select(User).where(User.id == 1) + )).scalar_one() + return u.tier, u.polar_customer_id, u.polar_subscription_id + + tier, cid, sid = asyncio.run(_check()) + assert tier == "free" + # Customer linkage preserved so a future resub matches the same row. + assert cid == "cust_xyz" + assert sid is None + + +# --- idempotency ----------------------------------------------------------- + + +def test_replayed_event_id_is_a_noop(tmp_path): + client, factory = _build_app(tmp_path) + body = { + "type": "subscription.active", + "data": {"id": "sub_abc", + "customer": {"id": "cust_xyz", "email": "paying@x"}}, + } + # Two POSTs with the same msg_id and body — second should be deduped. + r1 = _post(client, body=body, msg_id="msg_dup") + r2 = _post(client, body=body, msg_id="msg_dup") + assert r1.status_code == 200 and r1.json()["status"] == "ok" + assert r2.status_code == 200 and r2.json()["status"] == "duplicate" + + async def _count(): + from sqlalchemy import select, func + from app.models import PolarEvent + async with factory() as session: + n = (await session.execute( + select(func.count(PolarEvent.id)) + .where(PolarEvent.event_id == "msg_dup") + )).scalar_one() + return n + + assert asyncio.run(_count()) == 1 + + +def test_unknown_event_type_is_acked(tmp_path): + client, _ = _build_app(tmp_path) + body = {"type": "benefit_grant.cycled", "data": {}} + r = _post(client, body=body, msg_id="msg_unknown") + assert r.status_code == 200 + assert r.json()["status"] == "ignored"