phase D milestones 1+2: referral system + paid-access gate
Lays the billing-prep spine before Paddle lands in D.3.
D.1 — referrals
- users.referral_code: unique 8-char URL-safe code (alphabet excludes the
ambiguous 0/O/1/I/L). Generated lazily on first /settings hit so existing
accounts pick one up without a backfill migration.
- users.referred_by_user_id + new referrals audit table (referrer,
referred, created_at, converted_at, credited_at). converted_at /
credited_at stay null until D.3 fills them via the Paddle webhook.
- POST /login accepts ?ref=<code>; the code rides on the signed
pending-verify cookie so it survives the GET → POST → /verify hop.
- /settings page: email, tier badge, referral code chip + invite link
with one-click copy, pending/converted/active-credits stats grid.
Settings nav link added to the top bar.
Reward shape: when the referred user makes their first paid Paddle
subscription, both they and the referrer get 50% off for 3 months.
(D.3 wires the actual credit application via the Paddle webhook.)
D.2 — paid-access gate
- users.credit_until: timestamp until which a free-tier account has
paid-tier access. Null = no credit. Populated by admin CLI now and the
D.3 webhook later.
- app.services.access exposes paid_status(user) → PaidStatus dataclass
(active / source / expires_at / days_remaining), is_paid_active() with
admin-bearer-token bypass, and a require_paid FastAPI dependency that
raises 402 Payment Required for free-tier callers.
- POST /api/analyze (portfolio AI commentary) gated behind require_paid.
- Settings page surfaces credit window when active ("free · credit · N
day(s) remaining (expires YYYY-MM-DD)") and the upgrade hint when not.
- Admin CLI: python -m app.cli {grant-credit,revoke-credit,show-status}.
grant-credit is idempotent — extends from max(now, current expiry) so
re-running the command never erodes an existing grant.
Migrations 0013 (referrals) and 0014 (credit_until). Tests cover the
paid-status truth table, code generation + normalisation, CLI argument
parsing, and the pending-cookie ref roundtrip (29 new tests).
This commit is contained in:
parent
2013bfa8cc
commit
9759080134
18 changed files with 1159 additions and 21 deletions
133
tests/test_access.py
Normal file
133
tests/test_access.py
Normal file
|
|
@ -0,0 +1,133 @@
|
|||
"""Unit tests for app.services.access — the paid-tier gate.
|
||||
|
||||
No DB; we hand-construct ``User`` rows and ``CurrentUser`` principals
|
||||
directly. The point is to nail down the truth table:
|
||||
|
||||
tier | credit_until | active | source
|
||||
-------------|-------------------|--------|--------
|
||||
free | None | False | None
|
||||
free | past | False | None
|
||||
free | future | True | credit
|
||||
paid | None | True | tier
|
||||
paid | future | True | tier (tier wins)
|
||||
enterprise | None | True | tier
|
||||
admin bearer | n/a | True | (bypass)
|
||||
"""
|
||||
from __future__ import annotations
|
||||
|
||||
from datetime import datetime, timedelta, timezone
|
||||
from types import SimpleNamespace
|
||||
|
||||
import pytest
|
||||
|
||||
from app.auth import CurrentUser
|
||||
from app.services.access import is_paid_active, paid_status
|
||||
|
||||
|
||||
def _utcnow() -> datetime:
|
||||
return datetime.now(timezone.utc)
|
||||
|
||||
|
||||
def _make_user(*, tier: str = "free", credit_until: datetime | None = None):
|
||||
"""Build something User-shaped without touching SQLAlchemy."""
|
||||
return SimpleNamespace(tier=tier, credit_until=credit_until)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# paid_status — the truth table
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def test_paid_status_free_no_credit():
|
||||
st = paid_status(_make_user(tier="free"))
|
||||
assert st.active is False
|
||||
assert st.source is None
|
||||
assert st.expires_at is None
|
||||
assert st.days_remaining is None
|
||||
|
||||
|
||||
def test_paid_status_free_expired_credit():
|
||||
st = paid_status(_make_user(tier="free", credit_until=_utcnow() - timedelta(days=1)))
|
||||
assert st.active is False
|
||||
assert st.source is None
|
||||
|
||||
|
||||
def test_paid_status_free_future_credit():
|
||||
expiry = _utcnow() + timedelta(days=45)
|
||||
st = paid_status(_make_user(tier="free", credit_until=expiry))
|
||||
assert st.active is True
|
||||
assert st.source == "credit"
|
||||
assert st.expires_at == expiry
|
||||
# Allow ±1 day slack for clock drift; integer-days floors.
|
||||
assert 44 <= st.days_remaining <= 45
|
||||
|
||||
|
||||
def test_paid_status_paid_tier_no_credit():
|
||||
st = paid_status(_make_user(tier="paid"))
|
||||
assert st.active is True
|
||||
assert st.source == "tier"
|
||||
assert st.expires_at is None
|
||||
|
||||
|
||||
def test_paid_status_paid_tier_wins_over_credit():
|
||||
"""A paid subscription dominates — we surface 'tier' even if a
|
||||
credit row also exists. Avoids confusing the user with 'X days
|
||||
remaining' when they're actually on a rolling subscription."""
|
||||
st = paid_status(_make_user(tier="paid", credit_until=_utcnow() + timedelta(days=10)))
|
||||
assert st.source == "tier"
|
||||
assert st.days_remaining is None
|
||||
|
||||
|
||||
def test_paid_status_enterprise_tier():
|
||||
st = paid_status(_make_user(tier="enterprise"))
|
||||
assert st.active is True
|
||||
assert st.source == "tier"
|
||||
|
||||
|
||||
def test_paid_status_none_user():
|
||||
"""No DB row → no paid status. Admin bearer-token hits this path."""
|
||||
st = paid_status(None)
|
||||
assert st.active is False
|
||||
assert st.source is None
|
||||
|
||||
|
||||
def test_paid_status_handles_naive_datetime():
|
||||
"""MariaDB+aiomysql sometimes returns DateTime(timezone=True) as a
|
||||
naive datetime. The helper must normalise rather than raising
|
||||
'can't compare offset-naive and offset-aware'."""
|
||||
naive_future = (_utcnow() + timedelta(days=5)).replace(tzinfo=None)
|
||||
st = paid_status(_make_user(credit_until=naive_future))
|
||||
assert st.active is True
|
||||
assert st.source == "credit"
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# is_paid_active — sugar + admin bypass
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def test_is_paid_active_admin_bearer_bypass():
|
||||
"""Admin bearer-token (is_admin=True, user=None) always passes — the
|
||||
dev/CLI path must not be artificially gated."""
|
||||
principal = CurrentUser(is_admin=True, user=None)
|
||||
assert is_paid_active(principal) is True
|
||||
|
||||
|
||||
def test_is_paid_active_free_user_principal():
|
||||
principal = CurrentUser(is_admin=False, user=_make_user(tier="free"))
|
||||
assert is_paid_active(principal) is False
|
||||
|
||||
|
||||
def test_is_paid_active_paid_user_principal():
|
||||
principal = CurrentUser(is_admin=False, user=_make_user(tier="paid"))
|
||||
assert is_paid_active(principal) is True
|
||||
|
||||
|
||||
def test_is_paid_active_accepts_bare_user():
|
||||
"""Sugar: accepts a User row directly, not just a CurrentUser."""
|
||||
assert is_paid_active(_make_user(tier="paid")) is True
|
||||
assert is_paid_active(_make_user(tier="free")) is False
|
||||
|
||||
|
||||
def test_is_paid_active_none():
|
||||
assert is_paid_active(None) is False
|
||||
49
tests/test_cli.py
Normal file
49
tests/test_cli.py
Normal file
|
|
@ -0,0 +1,49 @@
|
|||
"""Unit tests for app.cli.
|
||||
|
||||
Sub-command parsing only — the DB-touching paths (`grant_credit`,
|
||||
`revoke_credit`, `show_status`) are exercised manually inside the dev
|
||||
container. The parser-level tests are enough to catch the common
|
||||
shapes: bad args, missing args, unknown sub-command."""
|
||||
from __future__ import annotations
|
||||
|
||||
import pytest
|
||||
|
||||
from app.cli import build_parser
|
||||
|
||||
|
||||
def test_grant_credit_parses():
|
||||
args = build_parser().parse_args(["grant-credit", "user@example.com", "3"])
|
||||
assert args.cmd == "grant-credit"
|
||||
assert args.email == "user@example.com"
|
||||
assert args.months == 3.0
|
||||
|
||||
|
||||
def test_grant_credit_accepts_fractional_months():
|
||||
args = build_parser().parse_args(["grant-credit", "user@x.com", "0.5"])
|
||||
assert args.months == 0.5
|
||||
|
||||
|
||||
def test_revoke_credit_parses():
|
||||
args = build_parser().parse_args(["revoke-credit", "user@example.com"])
|
||||
assert args.cmd == "revoke-credit"
|
||||
assert args.email == "user@example.com"
|
||||
|
||||
|
||||
def test_show_status_parses():
|
||||
args = build_parser().parse_args(["show-status", "user@example.com"])
|
||||
assert args.cmd == "show-status"
|
||||
|
||||
|
||||
def test_grant_credit_requires_months():
|
||||
with pytest.raises(SystemExit):
|
||||
build_parser().parse_args(["grant-credit", "user@example.com"])
|
||||
|
||||
|
||||
def test_unknown_command_rejected():
|
||||
with pytest.raises(SystemExit):
|
||||
build_parser().parse_args(["bogus-cmd"])
|
||||
|
||||
|
||||
def test_no_command_rejected():
|
||||
with pytest.raises(SystemExit):
|
||||
build_parser().parse_args([])
|
||||
|
|
@ -13,7 +13,15 @@ from app import auth
|
|||
def test_pending_cookie_roundtrip():
|
||||
cookie = auth.sign_pending("user@example.com", 42)
|
||||
out = auth.verify_pending(cookie)
|
||||
assert out == {"email": "user@example.com", "uid": 42}
|
||||
assert out == {"email": "user@example.com", "uid": 42, "ref": None}
|
||||
|
||||
|
||||
def test_pending_cookie_roundtrip_with_ref():
|
||||
"""Referral code captured at signup (Phase D.1) rides on the
|
||||
pending cookie so it survives the POST /login → /verify hop."""
|
||||
cookie = auth.sign_pending("user@example.com", 42, ref="ABCD1234")
|
||||
out = auth.verify_pending(cookie)
|
||||
assert out == {"email": "user@example.com", "uid": 42, "ref": "ABCD1234"}
|
||||
|
||||
|
||||
def test_pending_cookie_rejects_garbage():
|
||||
|
|
|
|||
80
tests/test_referral.py
Normal file
80
tests/test_referral.py
Normal file
|
|
@ -0,0 +1,80 @@
|
|||
"""Unit tests for the deterministic half of referral_service: code
|
||||
generation, normalisation, and lookup helpers. DB-backed linkage logic
|
||||
is exercised manually via the dev container."""
|
||||
from __future__ import annotations
|
||||
|
||||
import pytest
|
||||
|
||||
from app.services.referral_service import (
|
||||
_ALPHABET,
|
||||
_CODE_LEN,
|
||||
generate_code,
|
||||
normalise_code,
|
||||
)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Code generation
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def test_generate_code_length():
|
||||
code = generate_code()
|
||||
assert len(code) == _CODE_LEN
|
||||
|
||||
|
||||
def test_generate_code_alphabet():
|
||||
"""Every character must come from the unambiguous alphabet."""
|
||||
for _ in range(50):
|
||||
code = generate_code()
|
||||
for ch in code:
|
||||
assert ch in _ALPHABET, f"unexpected char {ch!r} in {code!r}"
|
||||
|
||||
|
||||
def test_generate_code_no_ambiguous_chars():
|
||||
"""0, O, 1, I, L are excluded to avoid dictation errors."""
|
||||
for _ in range(200):
|
||||
code = generate_code()
|
||||
assert not (set(code) & set("01IOL"))
|
||||
|
||||
|
||||
def test_generate_code_diversity():
|
||||
"""Two consecutive generations should almost never collide
|
||||
(sanity check on the RNG)."""
|
||||
a, b = generate_code(), generate_code()
|
||||
assert a != b
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# normalise_code
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def test_normalise_uppercases():
|
||||
assert normalise_code("abcdefgh") == "ABCDEFGH"
|
||||
|
||||
|
||||
def test_normalise_strips_disallowed_chars():
|
||||
"""Users may paste with spaces / dashes / quotes — strip those."""
|
||||
assert normalise_code(" ABCD-EFGH ") == "ABCDEFGH"
|
||||
assert normalise_code('"ABCDEFGH"') == "ABCDEFGH"
|
||||
|
||||
|
||||
def test_normalise_rejects_wrong_length():
|
||||
"""If too short / too long after cleaning, return None — bogus."""
|
||||
assert normalise_code("ABC") is None
|
||||
assert normalise_code("ABCDEFGHX") is None
|
||||
# Long enough but ambiguous chars stripped → still wrong length:
|
||||
assert normalise_code("ABCDEFG0") is None # 0 stripped → 7 chars
|
||||
|
||||
|
||||
def test_normalise_rejects_none_and_empty():
|
||||
assert normalise_code(None) is None
|
||||
assert normalise_code("") is None
|
||||
assert normalise_code(" ") is None
|
||||
|
||||
|
||||
def test_normalise_preserves_valid_code():
|
||||
"""A code that's already canonical should pass through unchanged."""
|
||||
code = generate_code()
|
||||
assert normalise_code(code) == code
|
||||
Loading…
Add table
Add a link
Reference in a new issue