email: one-click unsubscribe endpoint w/ signed token
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
parent
a4e585fbfb
commit
a292289dc6
3 changed files with 186 additions and 0 deletions
83
tests/test_email_unsubscribe.py
Normal file
83
tests/test_email_unsubscribe.py
Normal file
|
|
@ -0,0 +1,83 @@
|
|||
"""Unsubscribe token roundtrip + endpoint."""
|
||||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
|
||||
|
||||
def _build_app(tmp_path, secret="rt-secret-32-bytes-or-so-padding-here"):
|
||||
import os
|
||||
os.environ["CASSANDRA_SESSION_SECRET"] = secret
|
||||
|
||||
from fastapi import FastAPI
|
||||
from fastapi.testclient import TestClient
|
||||
from sqlalchemy.ext.asyncio import async_sessionmaker, create_async_engine
|
||||
|
||||
from app import db as db_mod
|
||||
from app.db import Base
|
||||
from app.config import get_settings
|
||||
from app.models import User
|
||||
from app.routers import email as email_router
|
||||
get_settings.cache_clear() # pick up the new env var
|
||||
|
||||
engine = create_async_engine(f"sqlite+aiosqlite:///{tmp_path}/u.db")
|
||||
factory = async_sessionmaker(engine, expire_on_commit=False)
|
||||
db_mod._engine = engine
|
||||
db_mod._session_factory = factory
|
||||
|
||||
async def _seed():
|
||||
async with engine.begin() as conn:
|
||||
await conn.run_sync(Base.metadata.create_all)
|
||||
async with factory() as s:
|
||||
s.add(User(id=42, email="u@x", tier="paid", email_digest_opt_in=True))
|
||||
await s.commit()
|
||||
|
||||
asyncio.run(_seed())
|
||||
|
||||
app = FastAPI()
|
||||
app.include_router(email_router.router)
|
||||
return TestClient(app)
|
||||
|
||||
|
||||
def test_sign_and_verify_token_roundtrip(monkeypatch):
|
||||
monkeypatch.setenv("CASSANDRA_SESSION_SECRET", "rt-secret-32-bytes-or-so-padding-here")
|
||||
from app.config import get_settings
|
||||
get_settings.cache_clear()
|
||||
from app.routers.email import sign_unsubscribe_token, verify_unsubscribe_token
|
||||
tok = sign_unsubscribe_token(42)
|
||||
assert verify_unsubscribe_token(tok) == 42
|
||||
assert verify_unsubscribe_token("garbage") is None
|
||||
|
||||
|
||||
def test_get_unsubscribe_flips_flag(tmp_path):
|
||||
client = _build_app(tmp_path)
|
||||
from app.routers.email import sign_unsubscribe_token
|
||||
tok = sign_unsubscribe_token(42)
|
||||
r = client.get(f"/email/unsubscribe?token={tok}")
|
||||
assert r.status_code == 200
|
||||
assert "unsubscribed" in r.text.lower()
|
||||
|
||||
async def _check():
|
||||
from app import db as db_mod
|
||||
from app.models import User
|
||||
async with db_mod._session_factory() as s:
|
||||
u = await s.get(User, 42)
|
||||
assert u.email_digest_opt_in is False
|
||||
asyncio.run(_check())
|
||||
|
||||
|
||||
def test_get_unsubscribe_invalid_token_returns_generic_page(tmp_path):
|
||||
client = _build_app(tmp_path)
|
||||
r = client.get("/email/unsubscribe?token=garbage")
|
||||
# We don't 4xx — that would leak token validity. Show the generic page.
|
||||
assert r.status_code == 200
|
||||
assert "unsubscribed" in r.text.lower() or "preferences" in r.text.lower()
|
||||
|
||||
|
||||
def test_replay_is_idempotent(tmp_path):
|
||||
client = _build_app(tmp_path)
|
||||
from app.routers.email import sign_unsubscribe_token
|
||||
tok = sign_unsubscribe_token(42)
|
||||
r1 = client.get(f"/email/unsubscribe?token={tok}")
|
||||
r2 = client.get(f"/email/unsubscribe?token={tok}")
|
||||
assert r1.status_code == 200
|
||||
assert r2.status_code == 200
|
||||
Loading…
Add table
Add a link
Reference in a new issue