read.markets/docs/superpowers/plans/2026-05-25-beta-mode-and-paid-gap.md
Giorgio Gilestro 8bc546220d docs: implementation plan for beta + paid-gap rollout
Twelve-task plan covering the BETA chip, free-tier 6h news cap, daily
+ Sunday digest job, one-click unsubscribe, settings UI, sign-up
checkbox, pricing copy, and an admin send-test-digest CLI. Each task
is TDD where feasible.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-05-25 18:28:39 +02:00


# Beta Mode + Paid/Free Gap Implementation Plan
> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking.
**Goal:** Ship the closed-beta launch package: a visible BETA chip in the app chrome, a 6-hour free-tier news cap, and email digests (daily for paid Mon-Sat, Sunday weekly for everyone), all gated by an opt-in flag.
**Architecture:** Three coordinated changes layered onto existing FastAPI + APScheduler + MariaDB + OpenRouter + SMTP infrastructure. No new external services. New persisted state: two columns on `users` (`email_digest_opt_in`, `digest_tone`) and one new table (`email_sends`). One new job (`email_digest_job`), one new router (`email`), two new prompt builders.
**Tech Stack:** FastAPI · SQLAlchemy 2.0 (async) · Alembic · APScheduler · aiosmtplib · itsdangerous · pytest + aiosqlite (tests) · Jinja2 templates · vanilla JS + HTMX.
**Spec:** `docs/superpowers/specs/2026-05-25-beta-mode-and-paid-gap-design.md`
---
## File Map
**New files**
- `app/jobs/email_digest_job.py` — orchestrator (daily Mon-Sat for paid, weekly Sunday for all opt-in)
- `app/routers/email.py` — `GET /email/unsubscribe?token=…`
- `alembic/versions/0017_email_digest.py` — migration
- `tests/test_news_window.py` — news-cap regression
- `tests/test_email_digest_job.py` — job recipient selection, idempotency
- `tests/test_email_unsubscribe.py` — token roundtrip + endpoint
- `tests/test_email_render.py` — digest email renderer
- `tests/test_digest_prompts.py` — prompt builder unit tests
- `tests/test_settings_digest_api.py` — PATCH endpoint
- `tests/test_verify_subscribe.py` — sign-up checkbox
**Modified files**
- `app/config.py` — add `BETA_MODE` flag
- `app/templates_env.py` — expose `BETA_MODE` to templates
- `app/templates/base.html` — render the chip
- `app/static/css/cassandra.css` — `.beta-chip` styling
- `app/services/access.py` — `FREE_NEWS_WINDOW_HOURS` constant
- `app/routers/api.py` — `news_list` clamps `since_hours` for non-paid; PATCH endpoint for digest prefs
- `app/templates/partials/news.html` — capped-footer marker
- `app/models.py` — `User.email_digest_opt_in`, `User.digest_tone`, `EmailSend` model
- `app/services/openrouter.py` — `build_daily_digest_prompt`, `build_weekly_digest_prompt`, bump `PROMPT_VERSION`
- `app/services/email_service.py` — `render_digest_email`, `send_digest`
- `app/scheduler_main.py` — register the digest job at 06:30 UTC
- `app/templates/settings.html` — Email digests section
- `app/templates/verify.html` — subscribe checkbox
- `app/routers/auth.py` — read `subscribe_to_digests` on verify POST
- `app/templates/pricing.html` — updated tier copy
- `app/cli.py` — `send-test-digest` command
- `app/main.py` — include new email router
---
## Task 1: BETA chip
**Files:**
- Modify: `app/config.py`
- Modify: `app/templates_env.py`
- Modify: `app/templates/base.html` (line ~140)
- Modify: `app/static/css/cassandra.css`
- [ ] **Step 1: Add `BETA_MODE` to config**
Open `app/config.py`. Find the `Settings` class. Add:
```python
BETA_MODE: bool = True # Shows a "BETA" pill in the app header. Flip to False at GA.
```
Place it adjacent to other display/flag-style settings (alongside `CASSANDRA_TONE` or the LLM caps — whichever cluster fits).
- [ ] **Step 2: Expose the flag to templates**
Open `app/templates_env.py`. After the existing `templates.env.globals[...]` block (around line 75), add:
```python
from app.config import get_settings as _get_settings # if not already imported
templates.env.globals["BETA_MODE"] = _get_settings().BETA_MODE
```
If `get_settings` is already imported in the file under another alias, reuse it.
- [ ] **Step 3: Render the chip in the app header**
Open `app/templates/base.html`. Find the brand link (line ~140):
```html
<a href="/" class="brand" aria-label="Dashboard">{{ BRAND_NAME }}</a>
```
Insert immediately after it:
```html
{% if BETA_MODE %}<span class="beta-chip" title="Beta — feedback welcome at hello@read.markets">BETA</span>{% endif %}
```
- [ ] **Step 4: Style the chip**
Open `app/static/css/cassandra.css`. Append at the bottom of the file:
```css
/* BETA indicator pill in the app header — see app/templates/base.html. */
.beta-chip {
  display: inline-block;
  margin-left: 8px;
  padding: 2px 7px;
  font-size: 10px;
  font-weight: 700;
  letter-spacing: 0.14em;
  font-family: var(--font-mono);
  color: var(--bg);
  background: var(--accent);
  border-radius: 2px;
  vertical-align: middle;
  user-select: none;
}
```
- [ ] **Step 5: Manual visual check**
Run the app locally (`docker compose restart app` — the new volume mount picks up changes). Load any logged-in page and confirm the `BETA` pill renders next to the brand. Confirm the chip does NOT appear on `/pricing` (which uses `public_base.html`, not `base.html`).
- [ ] **Step 6: Commit**
```bash
git add app/config.py app/templates_env.py app/templates/base.html app/static/css/cassandra.css
git commit -m "beta: header chip flagged by BETA_MODE config (default on)"
```
---
## Task 2: Free-tier news window cap
**Files:**
- Modify: `app/services/access.py`
- Modify: `app/routers/api.py` (lines 222-280, `news_list`)
- Modify: `app/templates/partials/news.html`
- Create: `tests/test_news_window.py`
- [ ] **Step 1: Write the failing test**
Create `tests/test_news_window.py`:
```python
"""Free vs paid window clamp on /api/news."""
from __future__ import annotations

import asyncio
from datetime import datetime, timedelta, timezone

import pytest


def _build_app(tmp_path):
    from fastapi import FastAPI
    from fastapi.testclient import TestClient
    from sqlalchemy.ext.asyncio import async_sessionmaker, create_async_engine

    from app import db as db_mod
    from app.auth import sign_session
    from app.db import Base
    from app.models import Headline, User
    from app.routers import api as api_router

    engine = create_async_engine(f"sqlite+aiosqlite:///{tmp_path}/news.db")
    factory = async_sessionmaker(engine, expire_on_commit=False)
    db_mod._engine = engine
    db_mod._session_factory = factory
    now = datetime.now(timezone.utc)

    async def _seed():
        async with engine.begin() as conn:
            await conn.run_sync(Base.metadata.create_all)
        async with factory() as s:
            s.add(User(id=1, email="free@x", tier="free"))
            s.add(User(id=2, email="paid@x", tier="paid"))
            # Headlines: one 1h old, one 12h old, one 20h old.
            for hours_old, title in ((1, "fresh"), (12, "mid"), (20, "old")):
                s.add(Headline(
                    source="test", title=title, url=f"https://e/{title}",
                    category="general",
                    published_at=now - timedelta(hours=hours_old),
                    fetched_at=now,
                    tags=[],
                ))
            await s.commit()

    asyncio.run(_seed())
    app = FastAPI()
    app.include_router(api_router.router, prefix="/api")
    client = TestClient(app)
    return client, sign_session(1), sign_session(2)


@pytest.mark.skipif(False, reason="requires aiosqlite + httpx")
def test_free_user_clamped_to_6h(tmp_path):
    client, free_sess, _ = _build_app(tmp_path)
    r = client.get("/api/news?since_hours=24",
                   cookies={"cassandra_session": free_sess})
    assert r.status_code == 200
    titles = [h["title"] for h in r.json()]
    assert "fresh" in titles
    assert "mid" not in titles  # 12h ago, beyond 6h
    assert "old" not in titles


def test_paid_user_full_24h(tmp_path):
    client, _, paid_sess = _build_app(tmp_path)
    r = client.get("/api/news?since_hours=24",
                   cookies={"cassandra_session": paid_sess})
    assert r.status_code == 200
    titles = [h["title"] for h in r.json()]
    assert {"fresh", "mid", "old"} <= set(titles)


def test_anonymous_clamped_to_6h(tmp_path):
    client, _, _ = _build_app(tmp_path)
    r = client.get("/api/news?since_hours=24")
    assert r.status_code == 200
    titles = [h["title"] for h in r.json()]
    assert "fresh" in titles
    assert "mid" not in titles
```
- [ ] **Step 2: Run the failing test**
```bash
pytest tests/test_news_window.py -v
```
Expected: FAIL — the clamp doesn't exist yet, so the free/anonymous calls return 12h and 20h headlines too.
- [ ] **Step 3: Add the constant**
Open `app/services/access.py`. Below the imports / above `_utcnow`, add:
```python
# How many hours of news the free tier sees. Paid sees whatever the
# endpoint's `since_hours` param requests (up to its own max).
FREE_NEWS_WINDOW_HOURS = 6.0
```
- [ ] **Step 4: Clamp the endpoint**
Open `app/routers/api.py`. Find `news_list` (line 222). Update:
```python
@router.get("/news")
async def news_list(
    request: Request,
    session: AsyncSession = Depends(get_session),
    principal: CurrentUser | None = Depends(maybe_current_user),
    category: str | None = Query(None),
    since_hours: float = Query(24.0, ge=0.1, le=720.0),
    limit: int = Query(50, ge=1, le=500),
    tags: str | None = Query(None, description="comma-separated include list"),
    exclude_tags: str | None = Query(None, description="comma-separated exclude list"),
    as_: str | None = Query(default=None, alias="as"),
):
    from app.services.news_tagging import TAG_LABELS, TAG_VOCABULARY
    from app.services.access import FREE_NEWS_WINDOW_HOURS, is_paid_active

    effective_hours = since_hours
    capped = not is_paid_active(principal)
    if capped:
        effective_hours = min(since_hours, FREE_NEWS_WINDOW_HOURS)
    cutoff = utcnow() - timedelta(hours=effective_hours)
    # ... rest of the function unchanged through `filtered = ...` ...
```
Also add `maybe_current_user` to the imports at the top of `app/routers/api.py`:
```python
from app.auth import maybe_current_user
```
(The existing `from app.auth import …` line may already import other names — extend it.)
In the `as_ == "html"` branch, extend the template context with `capped` and `window_hours`:
```python
return templates.TemplateResponse(
    request, "partials/news.html",
    {"headlines": items,
     "tag_vocabulary": TAG_VOCABULARY,
     "tag_labels": TAG_LABELS,
     "active_include": sorted(include),
     "active_exclude": sorted(exclude),
     "capped": capped,
     "window_hours": effective_hours},
)
```
The JSON branch is unchanged — the test asserts via the JSON shape.
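The clamp rule itself can be sketched in isolation. This is a minimal illustration, not the endpoint code: `effective_news_window` is a hypothetical helper, and the `paid` boolean stands in for whatever `is_paid_active(principal)` returns in the real app.

```python
FREE_NEWS_WINDOW_HOURS = 6.0  # mirrors the constant added in Step 3


def effective_news_window(since_hours: float, paid: bool) -> tuple[float, bool]:
    """Return (hours actually served, whether the caller is on the capped tier).

    Hypothetical helper: `paid` is an assumption standing in for
    is_paid_active(principal).
    """
    capped = not paid
    if capped:
        # Free and anonymous callers never see more than the free window,
        # but a narrower request is honoured as-is.
        return min(since_hours, FREE_NEWS_WINDOW_HOURS), True
    return since_hours, False


print(effective_news_window(24.0, paid=False))  # (6.0, True)
print(effective_news_window(2.0, paid=False))   # (2.0, True)
print(effective_news_window(24.0, paid=True))   # (24.0, False)
```

Note that `capped` follows the tier, not the request: a free user asking for 2 hours is still flagged as capped, so the footer note added in Step 6 would render for them too.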
- [ ] **Step 5: Run the test, expect PASS**
```bash
pytest tests/test_news_window.py -v
```
Expected: all three tests pass.
- [ ] **Step 6: Add the partial-template footer**
Open `app/templates/partials/news.html`. At the bottom of the file (after the last existing item rendering and any close tags), add:
```html
{% if capped %}
<div class="news-capped-note" style="margin-top:14px; padding:10px 12px; border:1px dashed var(--border); color:var(--muted); font-size:12px; line-height:1.55;">
Free tier — showing the last {{ window_hours|int }} hours of news.
<a href="/pricing" style="color:var(--accent);">Upgrade</a>
for the full 24-hour feed plus daily and weekly email digests.
</div>
{% endif %}
```
- [ ] **Step 7: Run the full suite to catch regressions**
```bash
pytest tests/ -x -q
```
Expected: no regressions (the existing `test_news_*` tests don't depend on the clamp).
- [ ] **Step 8: Commit**
```bash
git add app/services/access.py app/routers/api.py app/templates/partials/news.html tests/test_news_window.py
git commit -m "news: clamp free + anonymous to last 6h; paid keeps 24h"
```
---
## Task 3: DB model + Alembic migration
**Files:**
- Modify: `app/models.py`
- Create: `alembic/versions/0017_email_digest.py`
- [ ] **Step 1: Add columns to `User`**
Open `app/models.py`. Find the `User` class (search for `class User(Base)`). Add two columns alongside the existing tier/credit fields:
```python
    email_digest_opt_in: Mapped[bool] = mapped_column(
        Boolean, nullable=False, default=True, server_default=text("1"),
    )
    # NULL = use INTERMEDIATE at render time. Server-side mirror of the
    # dashboard tone, decoupled because the dashboard pref is localStorage.
    digest_tone: Mapped[str | None] = mapped_column(String(16))
```
If `Boolean` or `text` aren't already imported, extend the SQLAlchemy import line at the top of the file:
```python
from sqlalchemy import (..., Boolean, text, ...)
```
- [ ] **Step 2: Add the `EmailSend` model**
In the same file, after the existing model definitions, add:
```python
class EmailSend(Base):
    """Audit row per digest email send. Used for idempotency (don't send
    twice on the same UTC day) and for surfacing 'last delivery' on the
    Settings page."""

    __tablename__ = "email_sends"

    id: Mapped[int] = mapped_column(BigInteger, primary_key=True, autoincrement=True)
    user_id: Mapped[int] = mapped_column(
        ForeignKey("users.id", ondelete="CASCADE"), nullable=False, index=True,
    )
    kind: Mapped[str] = mapped_column(String(16), nullable=False)  # "daily" | "weekly"
    sent_at: Mapped[datetime] = mapped_column(
        DateTime(timezone=True), default=utcnow, nullable=False,
    )
    status: Mapped[str] = mapped_column(String(16), nullable=False)  # "sent" | "skipped" | "error"
    error: Mapped[str | None] = mapped_column(String(255))

    __table_args__ = (
        Index("ix_email_sends_user_kind_sent", "user_id", "kind", "sent_at"),
    )
```
Add any missing imports (`BigInteger`, `ForeignKey`, `Index`) at the top.
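The "don't send twice on the same UTC day" rule from the `EmailSend` docstring could be checked along these lines. This is a sketch under assumptions: `utc_day_bounds` and `already_sent_today` are hypothetical names, plain dicts stand in for `EmailSend` rows, and the real job would presumably run an equivalent query against `email_sends`.

```python
from datetime import datetime, time, timedelta, timezone


def utc_day_bounds(now: datetime) -> tuple[datetime, datetime]:
    """Half-open [start, end) interval covering the UTC day containing `now`."""
    day = now.astimezone(timezone.utc).date()
    start = datetime.combine(day, time.min, tzinfo=timezone.utc)
    return start, start + timedelta(days=1)


def already_sent_today(sends: list[dict], user_id: int, kind: str, now: datetime) -> bool:
    """True if a 'sent' row of this kind exists for the user in today's UTC window.

    `sends` is a list of dicts mimicking EmailSend rows (an assumption for
    illustration; the real rows are ORM objects).
    """
    start, end = utc_day_bounds(now)
    return any(
        s["user_id"] == user_id
        and s["kind"] == kind
        and s["status"] == "sent"
        and start <= s["sent_at"] < end
        for s in sends
    )


now = datetime(2026, 5, 25, 6, 30, tzinfo=timezone.utc)
sends = [
    {"user_id": 1, "kind": "daily", "status": "sent",
     "sent_at": datetime(2026, 5, 25, 0, 5, tzinfo=timezone.utc)},
]
print(already_sent_today(sends, 1, "daily", now))   # True
print(already_sent_today(sends, 1, "weekly", now))  # False
```

The composite index on `(user_id, kind, sent_at)` matches this lookup shape: equality on the first two columns, then a range on the timestamp.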
- [ ] **Step 3: Create the Alembic migration**
Create `alembic/versions/0017_email_digest.py`:
```python
"""email digests: User.email_digest_opt_in, User.digest_tone, email_sends table.

Revision ID: 0017
Revises: 0016
Create Date: 2026-05-25
"""
from typing import Sequence, Union

import sqlalchemy as sa
from alembic import op

revision: str = "0017"
down_revision: Union[str, None] = "0016"
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None


def upgrade() -> None:
    op.add_column(
        "users",
        sa.Column(
            "email_digest_opt_in", sa.Boolean(), nullable=False,
            server_default=sa.text("1"),
        ),
    )
    op.add_column(
        "users",
        sa.Column("digest_tone", sa.String(length=16), nullable=True),
    )
    op.create_table(
        "email_sends",
        sa.Column("id", sa.BigInteger(), autoincrement=True, primary_key=True),
        sa.Column(
            "user_id", sa.Integer(), nullable=False,
        ),
        sa.Column("kind", sa.String(length=16), nullable=False),
        sa.Column("sent_at", sa.DateTime(timezone=True), nullable=False),
        sa.Column("status", sa.String(length=16), nullable=False),
        sa.Column("error", sa.String(length=255), nullable=True),
        sa.ForeignKeyConstraint(
            ["user_id"], ["users.id"], ondelete="CASCADE",
        ),
    )
    op.create_index(
        "ix_email_sends_user_kind_sent",
        "email_sends",
        ["user_id", "kind", "sent_at"],
    )
    op.create_index(
        op.f("ix_email_sends_user_id"), "email_sends", ["user_id"],
    )


def downgrade() -> None:
    op.drop_index(op.f("ix_email_sends_user_id"), table_name="email_sends")
    op.drop_index("ix_email_sends_user_kind_sent", table_name="email_sends")
    op.drop_table("email_sends")
    op.drop_column("users", "digest_tone")
    op.drop_column("users", "email_digest_opt_in")
```
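Why the `server_default` matters for a `nullable=False` column can be seen with a throwaway database. The sketch below uses the standard-library `sqlite3` module as a stand-in (the target database in this plan is MariaDB, so exact DDL and type names differ); the table contents are invented for illustration.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, email TEXT NOT NULL)")
conn.execute("INSERT INTO users (email) VALUES ('existing@example.com')")

# Roughly what the two op.add_column calls amount to: a NOT NULL column
# needs a default so that rows created before the migration get a value.
conn.execute(
    "ALTER TABLE users ADD COLUMN email_digest_opt_in BOOLEAN NOT NULL DEFAULT 1"
)
conn.execute("ALTER TABLE users ADD COLUMN digest_tone VARCHAR(16)")

row = conn.execute(
    "SELECT email, email_digest_opt_in, digest_tone FROM users"
).fetchone()
print(row)  # ('existing@example.com', 1, None)
```

The pre-existing row ends up opted in with a NULL tone, which is the state the model comments describe: NULL tone falls back to INTERMEDIATE at render time.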
- [ ] **Step 4: Apply the migration to the dev DB**
```bash
docker compose exec app alembic upgrade head
```
Expected output ends with: `Running upgrade 0016 -> 0017, email digests...`
- [ ] **Step 5: Verify the columns exist**
```bash
docker compose exec db mysql -ucassandra -p${MARIADB_PASSWORD} -e "DESCRIBE cassandra.users;" | grep -E "email_digest_opt_in|digest_tone"
docker compose exec db mysql -ucassandra -p${MARIADB_PASSWORD} -e "DESCRIBE cassandra.email_sends;"
```
Expected: both new columns on `users` and the full `email_sends` table appear.
- [ ] **Step 6: Run the test suite — confirm no regressions from the model change**
```bash
pytest tests/ -x -q
```
Expected: all existing tests still pass (no test depends on the old User shape).
- [ ] **Step 7: Commit**
```bash
git add app/models.py alembic/versions/0017_email_digest.py
git commit -m "db: add digest opt-in/tone on users, email_sends audit table"
```
---
## Task 4: Daily / Weekly digest prompts
**Files:**
- Modify: `app/services/openrouter.py`
- Create: `tests/test_digest_prompts.py`
- [ ] **Step 1: Inspect the existing prompt scaffolding**
Read `app/services/openrouter.py` to confirm the shape of `build_system_prompt(tone, analysis)` and `build_user_prompt(...)`. The new functions will mirror that contract: return `(system_prompt, user_prompt)` strings ready for `call_llm`.
- [ ] **Step 2: Write the failing test**
Create `tests/test_digest_prompts.py`:
```python
"""Unit tests for the daily / weekly digest prompt builders."""
from __future__ import annotations

from datetime import datetime, timezone

from app.services.openrouter import (
    build_daily_digest_prompt,
    build_weekly_digest_prompt,
)


def _ctx():
    return dict(
        today=datetime(2026, 5, 25, 6, 30, tzinfo=timezone.utc),
        quotes_by_group={"equities": [{"symbol": "SPX", "price": 7500.0,
                                       "label": "S&P 500", "currency": "USD",
                                       "source": "test", "note": "",
                                       "as_of": None, "changes": {}}]},
        headlines_by_bucket={"general": [{"when": "2026-05-25T05:00:00+00:00",
                                          "source": "FT", "title": "Brent slides"}]},
        reference_line="S&P 7,501 (ATH) · VIX 18.0 · US 10y 4.45%",
    )


def test_daily_prompt_tone_intermediate():
    sys_, usr = build_daily_digest_prompt(tone="INTERMEDIATE", **_ctx())
    # Case-insensitive check: the tone name must appear in the system prompt.
    assert "intermediate" in sys_.lower()
    assert "Brent slides" in usr
    assert "daily" in sys_.lower()


def test_daily_prompt_tone_novice_differs():
    sys_int, _ = build_daily_digest_prompt(tone="INTERMEDIATE", **_ctx())
    sys_nov, _ = build_daily_digest_prompt(tone="NOVICE", **_ctx())
    assert sys_int != sys_nov


def test_weekly_prompt_mentions_week():
    sys_, usr = build_weekly_digest_prompt(tone="INTERMEDIATE", **_ctx())
    # "week" also matches "weekly", so one substring check covers both.
    assert "week" in sys_.lower()
    assert "Brent slides" in usr


def test_prompts_return_strings():
    for fn in (build_daily_digest_prompt, build_weekly_digest_prompt):
        sys_, usr = fn(tone="INTERMEDIATE", **_ctx())
        assert isinstance(sys_, str) and isinstance(usr, str)
        assert len(sys_) > 50 and len(usr) > 50
```
- [ ] **Step 3: Run the failing test**
```bash
pytest tests/test_digest_prompts.py -v
```
Expected: FAIL — `ImportError: cannot import name 'build_daily_digest_prompt'`.
- [ ] **Step 4: Implement the prompt builders**
Open `app/services/openrouter.py`. After `build_user_prompt`, add:
```python
def build_daily_digest_prompt(
    *,
    tone: str,
    today,
    quotes_by_group: dict,
    headlines_by_bucket: dict,
    reference_line: str,
) -> tuple[str, str]:
    """System + user prompt for the once-a-day editorial digest.

    Different from the hourly log: the daily digest reflects on the past
    24h and looks forward to the upcoming session. Longer, less
    'live-blogging,' more contextual. Target ~600 words."""
    tone_clause = (
        "Use plain English. Define any jargon on first use."
        if tone.upper() == "NOVICE"
        else "Write for a reader who already speaks markets fluently."
    )
    system = (
        "You write the daily editorial digest for Read the Markets. "
        f"Audience tone: {tone.upper()}. {tone_clause} "
        "Cover: (1) what mattered yesterday, (2) what to watch in today's "
        "EU and US sessions, (3) one cross-asset thread connecting them. "
        "No predictions of price level, no buy/sell language. Target ~600 "
        "words. Output HTML using only <p>, <h3>, <ul>, <li>, <strong>, "
        "<em> — no <html>, <head>, or <body> wrapper, no inline styles."
    )
    user = _digest_user_prompt(today, quotes_by_group, headlines_by_bucket, reference_line)
    return system, user


def build_weekly_digest_prompt(
    *,
    tone: str,
    today,
    quotes_by_group: dict,
    headlines_by_bucket: dict,
    reference_line: str,
) -> tuple[str, str]:
    """System + user prompt for the Sunday weekly recap + look-ahead.

    Sent to ALL opt-in users (free and paid). Target ~900 words."""
    tone_clause = (
        "Use plain English. Define any jargon on first use."
        if tone.upper() == "NOVICE"
        else "Write for a reader who already speaks markets fluently."
    )
    system = (
        "You write the Sunday weekly digest for Read the Markets. "
        f"Audience tone: {tone.upper()}. {tone_clause} "
        "Cover: (1) the week behind — what moved and why, "
        "(2) the week ahead — releases, earnings, central-bank meetings, "
        "(3) the cross-asset story to keep in mind. "
        "No predictions of price level, no buy/sell language. Target ~900 "
        "words. Output HTML using only <p>, <h3>, <ul>, <li>, <strong>, "
        "<em> — no <html>, <head>, or <body> wrapper, no inline styles."
    )
    user = _digest_user_prompt(today, quotes_by_group, headlines_by_bucket, reference_line)
    return system, user


def _digest_user_prompt(today, quotes_by_group, headlines_by_bucket, reference_line):
    """Shared user-message body used by both digest prompts. Same data
    shape as the hourly user prompt; reformatted for the digest context."""
    today_str = today.strftime("%A %d %B %Y") if hasattr(today, "strftime") else str(today)
    lines = [f"TODAY (UTC): {today_str}", "", f"REFERENCE: {reference_line}", ""]
    if headlines_by_bucket:
        lines.append("HEADLINES BY CATEGORY")
        for cat, items in headlines_by_bucket.items():
            lines.append(f" [{cat}]")
            for h in items[:30]:
                when = h.get("when", "")
                src = h.get("source", "")
                title = h.get("title", "")
                lines.append(f" {when} · {src} · {title}")
        lines.append("")
    if quotes_by_group:
        lines.append("LATEST QUOTES BY GROUP")
        for grp, items in quotes_by_group.items():
            lines.append(f" [{grp}]")
            for q in items[:30]:
                sym = q.get("symbol", "")
                price = q.get("price", "")
                lbl = q.get("label", "")
                ccy = q.get("currency", "")
                lines.append(f" {sym} ({lbl}) — {price} {ccy}")
        lines.append("")
    return "\n".join(lines)
```
Bump the `PROMPT_VERSION` constant at the top of the file. Locate it (search for `PROMPT_VERSION =`) and increment it by one, e.g. `8` → `9`. Add a short comment line above it noting that the bump is for the digest prompts.
- [ ] **Step 5: Run the tests, expect PASS**
```bash
pytest tests/test_digest_prompts.py -v
```
Expected: 4 passed.
- [ ] **Step 6: Commit**
```bash
git add app/services/openrouter.py tests/test_digest_prompts.py
git commit -m "digest: daily + weekly prompt builders (NOVICE/INTERMEDIATE)"
```
---
## Task 5: Digest email rendering
**Files:**
- Modify: `app/services/email_service.py`
- Create: `tests/test_email_render.py`
- [ ] **Step 1: Write the failing test**
Create `tests/test_email_render.py`:
```python
"""Unit tests for render_digest_email."""
from __future__ import annotations

from app.services.email_service import render_digest_email


def test_daily_subject_and_bodies():
    subj, text, html = render_digest_email(
        kind="daily",
        date_str="2026-05-25",
        content_html="<p>Markets did stuff today.</p>",
        unsubscribe_url="https://read.markets/email/unsubscribe?token=abc",
        settings_url="https://read.markets/settings",
    )
    assert "Daily" in subj
    assert "2026-05-25" in subj
    assert "Markets did stuff today" in html
    assert "abc" in html  # unsubscribe link landed
    assert "/settings" in html
    # Plain-text fallback strips HTML.
    assert "<p>" not in text
    assert "Markets did stuff today" in text


def test_weekly_subject_says_recap():
    subj, _, _ = render_digest_email(
        kind="weekly",
        date_str="2026-05-25",
        content_html="<p>x</p>",
        unsubscribe_url="https://x/u",
        settings_url="https://x/s",
    )
    assert "Weekly" in subj
    assert "recap" in subj.lower()


def test_invalid_kind_raises():
    import pytest

    with pytest.raises(ValueError):
        render_digest_email(
            kind="bogus", date_str="2026-05-25",
            content_html="<p>x</p>",
            unsubscribe_url="u", settings_url="s",
        )
```
- [ ] **Step 2: Run the failing test**
```bash
pytest tests/test_email_render.py -v
```
Expected: FAIL — `render_digest_email` doesn't exist.
- [ ] **Step 3: Implement `render_digest_email`**
Open `app/services/email_service.py`. Add at the bottom of the file:
```python
# ---------------------------------------------------------------------------
# Digest email rendering
# ---------------------------------------------------------------------------
import html as _html_lib
import re as _re
_DIGEST_HTML_TEMPLATE = """\
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="utf-8">
<meta name="viewport" content="width=device-width, initial-scale=1">
<meta name="color-scheme" content="light dark">
<title>{brand} · {label}</title>
<style>
@media (prefers-color-scheme: dark) {{
body {{ background:{D_bg} !important; }}
.card {{ background:{D_surface} !important; border-color:{D_border} !important; }}
.h1, p, li {{ color:{D_text} !important; }}
.muted {{ color:{D_muted} !important; }}
a {{ color:{D_accent} !important; }}
}}
</style>
</head>
<body style="margin:0; padding:24px 12px; background:{L_bg}; font-family:{FONT_MONO}; color:{L_text};">
<table role="presentation" cellpadding="0" cellspacing="0" border="0" align="center" width="100%" style="max-width:520px; margin:0 auto; border-collapse:separate;">
<tr><td class="card" style="background:{L_surface}; border:1px solid {L_border}; padding:32px 28px;">
<div class="muted" style="font-size:11px; letter-spacing:0.32em; color:{L_muted}; text-transform:uppercase;">
&#9648;&nbsp;{brand_upper} &middot; {label_upper}
</div>
<div style="height:20px; line-height:20px; font-size:0;">&nbsp;</div>
<div class="content" style="font-size:14px; line-height:1.65; color:{L_text};">
{content_html}
</div>
<div style="height:24px; line-height:24px; font-size:0;">&nbsp;</div>
<div style="border-top:1px solid {L_border};"></div>
<div style="height:14px; line-height:14px; font-size:0;">&nbsp;</div>
<div class="muted" style="font-size:11px; color:{L_muted};">
<a href="{unsubscribe_url}" style="color:{L_accent};">Unsubscribe in one click</a>
&middot; <a href="{settings_url}" style="color:{L_accent};">Manage preferences</a>
</div>
</td></tr>
</table>
</body>
</html>
"""
def _strip_html_to_text(html_body: str) -> str:
    """Best-effort HTML → plain text for the multipart fallback. We don't
    need perfection — just readable prose for clients that won't render
    HTML."""
    # Closing block-level tags and <br> each become a single newline.
    text = _re.sub(r"(?i)<(/(p|h[1-6]|li|ul|ol)|br\s*/?)>", "\n", html_body)
    # Drop every remaining tag, then decode entities such as &amp;.
    text = _re.sub(r"<[^>]+>", "", text)
    text = _html_lib.unescape(text)
    # Collapse runs of three or more newlines down to one blank line.
    text = _re.sub(r"\n{3,}", "\n\n", text)
    return text.strip()


def render_digest_email(
    *,
    kind: str,
    date_str: str,
    content_html: str,
    unsubscribe_url: str,
    settings_url: str,
) -> tuple[str, str, str]:
    """Returns (subject, text_body, html_body) for a digest email.

    `kind` is "daily" or "weekly". Anything else raises ValueError."""
    if kind == "daily":
        label = "Daily"
        subject = f"{branding.BRAND_NAME} · Daily — {date_str}"
    elif kind == "weekly":
        label = "Weekly recap"
        subject = f"{branding.BRAND_NAME} · Weekly recap — {date_str}"
    else:
        raise ValueError(f"unknown digest kind: {kind!r}")
    html_body = _DIGEST_HTML_TEMPLATE.format(
        brand=branding.BRAND_NAME,
        brand_upper=branding.BRAND_NAME.upper(),
        label=label,
        label_upper=label.upper(),
        FONT_MONO=branding.FONT_MONO,
        content_html=content_html,
        unsubscribe_url=unsubscribe_url,
        settings_url=settings_url,
        **{f"L_{k.replace('-', '_')}": v for k, v in branding.LIGHT.items()},
        **{f"D_{k.replace('-', '_')}": v for k, v in branding.DARK.items()},
    )
    text_lines = [
        f"{branding.BRAND_NAME} · {label}",
        date_str,
        "",
        _strip_html_to_text(content_html),
        "",
        f"Unsubscribe: {unsubscribe_url}",
        f"Manage preferences: {settings_url}",
    ]
    text_body = "\n".join(text_lines)
    return subject, text_body, html_body
```
- [ ] **Step 4: Run the tests, expect PASS**
```bash
pytest tests/test_email_render.py -v
```
Expected: 3 passed.
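For orientation, the `(subject, text_body, html_body)` triple is the shape a multipart message needs. The sketch below shows one way to combine the two bodies using only the standard library; `build_digest_message` is a hypothetical helper and the addresses are placeholders. How the plan's `send_digest` actually assembles and sends the message is not shown in this section, so treat this as an assumption.

```python
from email.message import EmailMessage


def build_digest_message(
    *, sender: str, recipient: str, subject: str, text_body: str, html_body: str
) -> EmailMessage:
    """Assemble a multipart/alternative message (hypothetical helper)."""
    msg = EmailMessage()
    msg["From"] = sender
    msg["To"] = recipient
    msg["Subject"] = subject
    msg.set_content(text_body)                      # text/plain part, listed first
    msg.add_alternative(html_body, subtype="html")  # text/html part, preferred by clients
    return msg


msg = build_digest_message(
    sender="digest@example.com",
    recipient="reader@example.com",
    subject="Example Daily 2026-05-25",
    text_body="Markets did stuff today.",
    html_body="<p>Markets did stuff today.</p>",
)
print(msg.get_content_type())                            # multipart/alternative
print([p.get_content_type() for p in msg.iter_parts()])  # ['text/plain', 'text/html']
```

Clients that cannot render HTML fall back to the first part, which is why the plain-text body produced by `_strip_html_to_text` only needs to be readable, not pretty.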
- [ ] **Step 5: Commit**
```bash
git add app/services/email_service.py tests/test_email_render.py
git commit -m "email: render_digest_email — multipart digest template"
```
---
## Task 6: Unsubscribe token + route
**Files:**
- Create: `app/routers/email.py`
- Modify: `app/main.py` (router include)
- Create: `tests/test_email_unsubscribe.py`
- [ ] **Step 1: Write the failing test**
Create `tests/test_email_unsubscribe.py`:
```python
"""Unsubscribe token roundtrip + endpoint."""
from __future__ import annotations

import asyncio

import pytest


def _build_app(tmp_path, secret="testsecret"):
    import os
    os.environ["CASSANDRA_SESSION_SECRET"] = secret

    from fastapi import FastAPI
    from fastapi.testclient import TestClient
    from sqlalchemy.ext.asyncio import async_sessionmaker, create_async_engine

    from app import db as db_mod
    from app.db import Base
    from app.models import User
    from app.routers import email as email_router
    from app.config import get_settings

    get_settings.cache_clear()  # pick up the new env var
    engine = create_async_engine(f"sqlite+aiosqlite:///{tmp_path}/u.db")
    factory = async_sessionmaker(engine, expire_on_commit=False)
    db_mod._engine = engine
    db_mod._session_factory = factory

    async def _seed():
        async with engine.begin() as conn:
            await conn.run_sync(Base.metadata.create_all)
        async with factory() as s:
            s.add(User(id=42, email="u@x", tier="paid", email_digest_opt_in=True))
            await s.commit()

    asyncio.run(_seed())
    app = FastAPI()
    app.include_router(email_router.router)
    return TestClient(app)


def test_sign_and_verify_token_roundtrip(tmp_path, monkeypatch):
    monkeypatch.setenv("CASSANDRA_SESSION_SECRET", "rt-secret-32-bytes-or-so-padding-here")
    from app.config import get_settings
    get_settings.cache_clear()
    from app.routers.email import sign_unsubscribe_token, verify_unsubscribe_token

    tok = sign_unsubscribe_token(42)
    assert verify_unsubscribe_token(tok) == 42
    assert verify_unsubscribe_token("garbage") is None


def test_get_unsubscribe_flips_flag(tmp_path):
    client = _build_app(tmp_path)
    from app.routers.email import sign_unsubscribe_token

    tok = sign_unsubscribe_token(42)
    r = client.get(f"/email/unsubscribe?token={tok}")
    assert r.status_code == 200
    assert "unsubscribed" in r.text.lower()

    # DB state changed.
    async def _check():
        from app import db as db_mod
        async with db_mod._session_factory() as s:
            from app.models import User
            u = await s.get(User, 42)
            assert u.email_digest_opt_in is False

    asyncio.run(_check())


def test_get_unsubscribe_invalid_token_returns_generic_page(tmp_path):
    client = _build_app(tmp_path)
    r = client.get("/email/unsubscribe?token=garbage")
    # We don't 4xx — that would leak token validity. Show the generic page.
    assert r.status_code == 200
    assert "unsubscribed" in r.text.lower() or "preferences" in r.text.lower()


def test_replay_is_idempotent(tmp_path):
    client = _build_app(tmp_path)
    from app.routers.email import sign_unsubscribe_token

    tok = sign_unsubscribe_token(42)
    r1 = client.get(f"/email/unsubscribe?token={tok}")
    r2 = client.get(f"/email/unsubscribe?token={tok}")
    assert r1.status_code == 200
    assert r2.status_code == 200
```
- [ ] **Step 2: Run the failing test**
```bash
pytest tests/test_email_unsubscribe.py -v
```
Expected: FAIL — `app.routers.email` doesn't exist.
- [ ] **Step 3: Create the router**
Create `app/routers/email.py`:
```python
"""Email-related public routes.

Currently:
- GET /email/unsubscribe?token=...

The token is `itsdangerous.URLSafeSerializer` over a small payload,
signed with CASSANDRA_SESSION_SECRET. No auth dependency: the whole
point of one-click unsubscribe is that the user does not have to
sign in.
"""
from __future__ import annotations

from fastapi import APIRouter, Depends, Request, Query
from fastapi.responses import HTMLResponse
from itsdangerous import BadSignature, URLSafeSerializer
from sqlalchemy.ext.asyncio import AsyncSession

from app.config import get_settings
from app.db import get_session
from app.logging import get_logger
from app.models import User
from app.templates_env import templates

router = APIRouter()
log = get_logger("email_router")
_SALT = "digest-unsubscribe-v1"


def _serializer() -> URLSafeSerializer:
    s = get_settings()
    if not s.CASSANDRA_SESSION_SECRET:
        # In tests with no secret configured, fall back to a constant.
        # This must NEVER reach production; settings validation should
        # catch an empty secret first.
        return URLSafeSerializer("dev-only-empty-secret", salt=_SALT)
    return URLSafeSerializer(s.CASSANDRA_SESSION_SECRET, salt=_SALT)


def sign_unsubscribe_token(user_id: int) -> str:
    return _serializer().dumps({"uid": int(user_id), "purpose": "digest_optout"})


def verify_unsubscribe_token(token: str) -> int | None:
    try:
        data = _serializer().loads(token)
    except BadSignature:
        return None
    if not isinstance(data, dict):
        return None
    if data.get("purpose") != "digest_optout":
        return None
    try:
        return int(data["uid"])
    except (KeyError, TypeError, ValueError):
        return None
_CONFIRM_PAGE = """\
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="utf-8">
<title>Unsubscribed — {brand}</title>
<link rel="stylesheet" href="/static/css/cassandra.css">
</head>
<body class="auth-shell">
<div class="auth-card" style="max-width:480px;">
<div class="auth-card__brand">{brand}</div>
<div class="auth-card__hint">email preferences</div>
<p class="auth-card__lede">You're unsubscribed from email digests.</p>
<p style="font-size:13px; color:var(--muted); line-height:1.6;">
You can re-enable digests any time from
<a href="/settings" style="color:var(--accent);">Settings</a>.
</p>
</div>
</body>
</html>
"""
@router.get("/email/unsubscribe", response_class=HTMLResponse)
async def unsubscribe(
request: Request,
token: str = Query(...),
session: AsyncSession = Depends(get_session),
):
from app import branding
uid = verify_unsubscribe_token(token)
if uid is not None:
user = await session.get(User, uid)
if user is not None and user.email_digest_opt_in:
user.email_digest_opt_in = False
await session.commit()
log.info("email.unsubscribe.ok", user_id=uid)
else:
log.info("email.unsubscribe.noop_or_unknown", user_id=uid)
else:
log.info("email.unsubscribe.bad_token")
# Same confirmation page regardless — don't leak token validity.
return HTMLResponse(_CONFIRM_PAGE.format(brand=branding.BRAND_NAME))
```
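
To make the token properties concrete, here is a minimal stdlib-only sketch of the same sign/verify shape. It is an analogue, not itsdangerous's actual wire format: `SECRET`, `sign`, and `verify` are hypothetical names used only for illustration, and the real router keeps using `URLSafeSerializer`.

```python
from __future__ import annotations

import base64
import hashlib
import hmac
import json

SECRET = b"example-secret"        # hypothetical; the router reads CASSANDRA_SESSION_SECRET
SALT = b"digest-unsubscribe-v1"   # same salt string as the router


def _mac(body: bytes) -> bytes:
    # Mixing the salt into the key means a token signed for another
    # feature with the same secret does not validate here.
    return hmac.new(SALT + SECRET, body, hashlib.sha256).digest()


def sign(user_id: int) -> str:
    payload = {"uid": user_id, "purpose": "digest_optout"}
    body = base64.urlsafe_b64encode(json.dumps(payload).encode())
    sig = base64.urlsafe_b64encode(_mac(body))
    return (body + b"." + sig).decode()


def verify(token: str) -> int | None:
    body, sep, sig = token.encode().rpartition(b".")
    if not sep:
        return None  # no separator at all, e.g. "garbage"
    expected = base64.urlsafe_b64encode(_mac(body))
    if not hmac.compare_digest(sig, expected):
        return None  # tampered body or wrong key
    data = json.loads(base64.urlsafe_b64decode(body))
    if data.get("purpose") != "digest_optout":
        return None
    return int(data["uid"])


token = sign(42)
print(verify(token))        # round trip recovers the user id
print(verify("garbage"))    # malformed token is rejected
print(verify("x" + token))  # any change to the body breaks the signature
```

The three calls mirror the test cases: a valid token, a malformed one, and a tampered one. In every case the endpoint renders the same page, so only the server-side log distinguishes them.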
- [ ] **Step 4: Wire the router into the FastAPI app**
Open `app/main.py`. Find the existing router includes (line ~85). Add:
```python
from app.routers import email as email_router
# ...
app.include_router(email_router.router, tags=["email"])
```
- [ ] **Step 5: Run the tests, expect PASS**
```bash
pytest tests/test_email_unsubscribe.py -v
```
Expected: 4 passed.
- [ ] **Step 6: Commit**
```bash
git add app/routers/email.py app/main.py tests/test_email_unsubscribe.py
git commit -m "email: one-click unsubscribe endpoint w/ signed token"
```
---
## Task 7: Email digest job + idempotency
**Files:**
- Create: `app/jobs/email_digest_job.py`
- Create: `tests/test_email_digest_job.py`
- [ ] **Step 1: Write the failing test**
Create `tests/test_email_digest_job.py`:
```python
"""Recipient selection + idempotency for the digest job."""
from __future__ import annotations

import asyncio
from datetime import datetime, timezone, timedelta
from unittest.mock import AsyncMock, patch

import pytest

# The SQLite test engine needs the aiosqlite driver; skip the module without it.
pytest.importorskip("aiosqlite")


def _bootstrap(tmp_path, today_weekday: int):
    """Spin up a throwaway SQLite DB under tmp_path with three users: a
    paid opt-in, a paid opt-out, a free opt-in. Returns the session factory."""
    from sqlalchemy.ext.asyncio import async_sessionmaker, create_async_engine
    from app import db as db_mod
    from app.db import Base
    from app.models import User

    engine = create_async_engine(f"sqlite+aiosqlite:///{tmp_path}/dj.db")
    factory = async_sessionmaker(engine, expire_on_commit=False)
    db_mod._engine = engine
    db_mod._session_factory = factory

    async def _seed():
        async with engine.begin() as conn:
            await conn.run_sync(Base.metadata.create_all)
        async with factory() as s:
            s.add(User(id=1, email="paid_in@x", tier="paid", email_digest_opt_in=True))
            s.add(User(id=2, email="paid_out@x", tier="paid", email_digest_opt_in=False))
            s.add(User(id=3, email="free_in@x", tier="free", email_digest_opt_in=True))
            await s.commit()

    asyncio.run(_seed())
    return factory


def _patch_today(weekday: int):
    """Return a datetime whose weekday() == `weekday` (0=Mon, 6=Sun)."""
    base = datetime(2026, 5, 25, 6, 30, tzinfo=timezone.utc)  # Monday
    return base + timedelta(days=(weekday - base.weekday()) % 7)


def _stub_generate(content="<p>x</p>"):
    """Stub out the LLM call so the test never hits the network. Use a
    SimpleNamespace so we don't have to know the real result class name."""
    from types import SimpleNamespace

    async def _fake(_client, messages, **kwargs):
        return SimpleNamespace(
            content=content, model="stub",
            prompt_tokens=10, completion_tokens=10, cost_usd=0.0,
        )
    return _fake


def test_daily_run_only_paid_opt_in(tmp_path):
    _bootstrap(tmp_path, today_weekday=0)
    from app.jobs import email_digest_job

    with patch("app.jobs.email_digest_job._now",
               return_value=_patch_today(0)), \
         patch("app.jobs.email_digest_job.send_email",
               new=AsyncMock()) as send_mock, \
         patch("app.jobs.email_digest_job.call_llm",
               new=AsyncMock(side_effect=_stub_generate())):
        asyncio.run(email_digest_job.run())

    # Only user_id=1 (paid + opt-in) received an email. The LLM is called
    # twice (once per tone), but there is only one recipient.
    addresses_sent = {call.kwargs.get("to") for call in send_mock.await_args_list}
    assert addresses_sent == {"paid_in@x"}


def test_weekly_run_includes_free_and_paid_opt_in(tmp_path):
    _bootstrap(tmp_path, today_weekday=6)
    from app.jobs import email_digest_job

    with patch("app.jobs.email_digest_job._now",
               return_value=_patch_today(6)), \
         patch("app.jobs.email_digest_job.send_email",
               new=AsyncMock()) as send_mock, \
         patch("app.jobs.email_digest_job.call_llm",
               new=AsyncMock(side_effect=_stub_generate())):
        asyncio.run(email_digest_job.run())

    addresses_sent = {call.kwargs.get("to") for call in send_mock.await_args_list}
    assert addresses_sent == {"paid_in@x", "free_in@x"}


def test_second_run_same_day_is_idempotent(tmp_path):
    _bootstrap(tmp_path, today_weekday=0)
    from app.jobs import email_digest_job

    with patch("app.jobs.email_digest_job._now",
               return_value=_patch_today(0)), \
         patch("app.jobs.email_digest_job.send_email",
               new=AsyncMock()) as send_mock, \
         patch("app.jobs.email_digest_job.call_llm",
               new=AsyncMock(side_effect=_stub_generate())):
        asyncio.run(email_digest_job.run())
        first_count = len(send_mock.await_args_list)
        asyncio.run(email_digest_job.run())
        second_count = len(send_mock.await_args_list)

    assert first_count > 0
    assert second_count == first_count, "second run should not re-send"
```
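
The `_patch_today` helper relies on modular arithmetic to walk forward from a known Monday. A small standalone sketch of the same calculation (the function name `day_with_weekday` is hypothetical) shows which dates the tests end up using:

```python
from datetime import datetime, timedelta, timezone


def day_with_weekday(weekday: int) -> datetime:
    """Same arithmetic as _patch_today: 0=Mon ... 6=Sun."""
    base = datetime(2026, 5, 25, 6, 30, tzinfo=timezone.utc)  # a Monday
    # (weekday - base.weekday()) % 7 is the number of days to step
    # forward, always in the range 0..6.
    return base + timedelta(days=(weekday - base.weekday()) % 7)


monday = day_with_weekday(0)
sunday = day_with_weekday(6)
print(monday.date().isoformat(), monday.weekday())
print(sunday.date().isoformat(), sunday.weekday())
```

Because the offset never exceeds six days, every patched "today" stays within the same week as the base date, at the 06:30 UTC slot the scheduler is meant to use.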
- [ ] **Step 2: Run the failing test**
```bash
pytest tests/test_email_digest_job.py -v
```
Expected: FAIL — `app.jobs.email_digest_job` doesn't exist yet.
- [ ] **Step 3: Implement the job**
Create `app/jobs/email_digest_job.py`:
```python
"""Daily/weekly editorial email digest.

Runs once a day at 06:30 UTC via the scheduler. On Sundays sends the
weekly recap to every opt-in user (free + paid). On other days sends
the daily digest to opt-in paid users only.

Generates LLM content once per tone (NOVICE + INTERMEDIATE), then fans
out by SMTP. EmailSend audit rows guard against double-delivery if the
job is re-run within the same UTC day.
"""
from __future__ import annotations

import asyncio
from datetime import datetime, timedelta

import httpx
from sqlalchemy import select

from app import branding
from app.config import get_settings
from app.db import utcnow
from app.jobs._helpers import job_lifecycle, log
from app.jobs.ai_log_job import (
    REFERENCE_LINE,
    _latest_quotes_by_group,
    _recent_headlines_by_bucket,
    _month_spend,
)
from app.models import EmailSend, User
from app.routers.email import sign_unsubscribe_token
from app.services.email_service import render_digest_email, send_email
from app.services.openrouter import (
    PROMPT_VERSION,
    build_daily_digest_prompt,
    build_weekly_digest_prompt,
    call_llm,
    llm_configured,
)
from app.services.access import paid_status


# Indirection so tests can monkeypatch the "current time" without
# touching the system clock.
def _now() -> datetime:
    return utcnow()


async def _opt_in_recipients(session, *, paid_only: bool) -> list[User]:
    stmt = select(User).where(User.email_digest_opt_in.is_(True))
    rows = (await session.execute(stmt)).scalars().all()
    if paid_only:
        rows = [u for u in rows if paid_status(u).active]
    return rows


async def _already_sent_today(session, user_id: int, kind: str, today: datetime) -> bool:
    """True if an EmailSend row exists for this user+kind on the same UTC
    day, with status in ('sent','error'). 'error' counts because we don't
    want to keep retrying a bad address inside the same daily slot."""
    day_start = today.replace(hour=0, minute=0, second=0, microsecond=0)
    day_end = day_start + timedelta(days=1)
    stmt = select(EmailSend.id).where(
        EmailSend.user_id == user_id,
        EmailSend.kind == kind,
        EmailSend.sent_at >= day_start,
        EmailSend.sent_at < day_end,
        EmailSend.status.in_(("sent", "error")),
    )
    return (await session.execute(stmt)).first() is not None


async def _generate_variants(client, kind: str, ctx: dict) -> dict[str, str]:
    """Returns {tone: html_content}. A missing tone means generation failed
    for that variant; recipients on that tone fall back to INTERMEDIATE
    or are skipped."""
    builder = build_weekly_digest_prompt if kind == "weekly" else build_daily_digest_prompt
    out: dict[str, str] = {}
    for tone in ("NOVICE", "INTERMEDIATE"):
        sys_, usr = builder(tone=tone, **ctx)
        try:
            result = await call_llm(
                client,
                [{"role": "system", "content": sys_},
                 {"role": "user", "content": usr}],
            )
            out[tone] = result.content
            log.info("digest.variant_ok", kind=kind, tone=tone,
                     prompt_tokens=result.prompt_tokens,
                     completion_tokens=result.completion_tokens)
        except Exception as e:
            log.error("digest.variant_failed", kind=kind, tone=tone,
                      error=str(e)[:200])
    return out


def _kind_for_today(today: datetime) -> str:
    """Sunday → weekly. Mon–Sat → daily. The job runs every day."""
    return "weekly" if today.weekday() == 6 else "daily"


async def _send_one(user: User, kind: str, content_html: str, date_str: str,
                    session) -> None:
    settings_url = f"{branding.SITE_URL}/settings"
    unsubscribe_url = (
        f"{branding.SITE_URL}/email/unsubscribe"
        f"?token={sign_unsubscribe_token(user.id)}"
    )
    subject, text_body, html_body = render_digest_email(
        kind=kind, date_str=date_str,
        content_html=content_html,
        unsubscribe_url=unsubscribe_url,
        settings_url=settings_url,
    )
    try:
        await send_email(to=user.email, subject=subject,
                         text_body=text_body, html_body=html_body)
        status_ = "sent"
        err = None
    except Exception as e:
        status_ = "error"
        err = str(e)[:255]
        log.error("digest.send_failed", user_id=user.id, error=err)
    # Record the attempt either way; the row is what makes re-runs idempotent.
    session.add(EmailSend(
        user_id=user.id, kind=kind, sent_at=_now(),
        status=status_, error=err,
    ))
    await session.commit()


async def run() -> None:
    async with job_lifecycle("email_digest_job") as (session, jr):
        if jr.status == "skipped":
            return
        s = get_settings()
        if not llm_configured():
            log.warning("digest.skipped_no_key", provider=s.LLM_PROVIDER)
            jr.status = "skipped"
            return

        today = _now()
        kind = _kind_for_today(today)
        date_str = today.strftime("%Y-%m-%d")

        # Build the recipient list before LLM work; if it's empty,
        # skip the LLM spend.
        recipients = await _opt_in_recipients(
            session, paid_only=(kind == "daily"),
        )
        # Filter out anyone already sent today (idempotency).
        fresh: list[User] = []
        for u in recipients:
            if not await _already_sent_today(session, u.id, kind, today):
                fresh.append(u)
        if not fresh:
            log.info("digest.no_fresh_recipients", kind=kind,
                     total=len(recipients))
            jr.status = "skipped"
            return

        spent = await _month_spend(session)
        if spent >= s.OPENROUTER_MONTHLY_CAP_USD:
            log.warning("digest.cap_reached", spent=spent,
                        cap=s.OPENROUTER_MONTHLY_CAP_USD)
            jr.status = "skipped"
            jr.error = f"monthly cost cap reached (${spent:.2f})"
            return

        quotes = await _latest_quotes_by_group(session)
        news = await _recent_headlines_by_bucket(
            session, hours=(168 if kind == "weekly" else 24),
        )
        ctx = dict(
            today=today,
            quotes_by_group=quotes,
            headlines_by_bucket=news,
            reference_line=REFERENCE_LINE,
        )

        async with httpx.AsyncClient(follow_redirects=True) as client:
            variants = await _generate_variants(client, kind, ctx)

        if not variants:
            log.warning("digest.all_variants_failed", kind=kind)
            jr.status = "failed"
            jr.error = "all variants failed"
            return

        written = 0
        for u in fresh:
            tone = (u.digest_tone or "INTERMEDIATE").upper()
            content = variants.get(tone) or variants.get("INTERMEDIATE")
            if content is None:
                # Neither the user's tone nor the INTERMEDIATE fallback
                # was generated; skip this recipient.
                continue
            await _send_one(u, kind, content, date_str, session)
            await asyncio.sleep(0.1)  # gentle SMTP pacing
            written += 1

        jr.items_written = written
        log.info("digest.done", kind=kind, written=written,
                 prompt_version=PROMPT_VERSION)


if __name__ == "__main__":
    asyncio.run(run())
```
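
The idempotency guard in `_already_sent_today` comes down to a half-open UTC day window, `[00:00, 24:00)`, checked per user and per kind. The following is a minimal in-memory sketch of that rule, assuming audit rows are plain dicts rather than `EmailSend` ORM objects; `utc_day_bounds` and `already_sent` are hypothetical names.

```python
from datetime import datetime, timedelta, timezone


def utc_day_bounds(moment: datetime) -> tuple[datetime, datetime]:
    """Start (inclusive) and end (exclusive) of the day containing `moment`."""
    start = moment.replace(hour=0, minute=0, second=0, microsecond=0)
    return start, start + timedelta(days=1)


def already_sent(rows: list[dict], user_id: int, kind: str, moment: datetime) -> bool:
    start, end = utc_day_bounds(moment)
    return any(
        r["user_id"] == user_id
        and r["kind"] == kind
        and start <= r["sent_at"] < end
        # "error" counts too: a bad address is not retried within the slot.
        and r["status"] in ("sent", "error")
        for r in rows
    )


run_time = datetime(2026, 5, 25, 6, 30, tzinfo=timezone.utc)
rows = [
    {"user_id": 1, "kind": "daily", "sent_at": run_time, "status": "sent"},
    {"user_id": 2, "kind": "daily", "sent_at": run_time, "status": "error"},
]

print(already_sent(rows, 1, "daily", run_time + timedelta(hours=3)))   # same day
print(already_sent(rows, 1, "daily", run_time + timedelta(days=1)))    # next day
print(already_sent(rows, 1, "weekly", run_time))                       # other kind
```

Keeping the window half-open means a row stamped exactly at midnight belongs to one day only, so two consecutive days never both claim it.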
- [ ] **Step 4: Run the tests, expect PASS**
```bash
pytest tests/test_email_digest_job.py -v
```
Expected: 3 passed.
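
The selection rules the three tests exercise can also be read as pure functions. This is a sketch under stated assumptions: users are plain dicts with a boolean `paid` key standing in for `paid_status(u).active`, and `pick_kind`, `select_recipients`, and `pick_content` are hypothetical names rather than functions in the job module.

```python
from __future__ import annotations


def pick_kind(weekday: int) -> str:
    """weekday follows datetime.weekday(): 0=Mon ... 6=Sun."""
    return "weekly" if weekday == 6 else "daily"


def select_recipients(users: list[dict], kind: str) -> list[str]:
    opted_in = [u for u in users if u["opt_in"]]
    if kind == "daily":
        # The daily digest is a paid feature; the weekly goes to everyone.
        opted_in = [u for u in opted_in if u["paid"]]
    return [u["email"] for u in opted_in]


def pick_content(variants: dict[str, str], tone: str | None) -> str | None:
    wanted = (tone or "INTERMEDIATE").upper()
    # Fall back to INTERMEDIATE when the user's own tone failed to generate.
    return variants.get(wanted) or variants.get("INTERMEDIATE")


users = [
    {"email": "paid_in@x", "paid": True, "opt_in": True},
    {"email": "paid_out@x", "paid": True, "opt_in": False},
    {"email": "free_in@x", "paid": False, "opt_in": True},
]

print(select_recipients(users, pick_kind(0)))
print(select_recipients(users, pick_kind(6)))
print(pick_content({"INTERMEDIATE": "<p>i</p>"}, "novice"))
print(pick_content({"NOVICE": "<p>n</p>"}, "INTERMEDIATE"))
```

Note the asymmetry in the fallback: a NOVICE reader can receive the INTERMEDIATE variant, but an INTERMEDIATE reader is skipped if only the NOVICE variant was generated.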
- [ ] **Step 5: Commit**
```bash
git add app/jobs/email_digest_job.py tests/test_email_digest_job.py
git commit -m "digest: daily/weekly job w/ EmailSend idempotency"
```
---
## Task 8: Scheduler registration
**Files:**
- Modify: `app/scheduler_main.py`
- [ ] **Step 1: Add the cron entry**
Open `app/scheduler_main.py`. Locate the block where the other jobs are registered (line ~42 onward — `sched.add_job(ai_log_job.run, ...)` etc.). Add an import at the top and a new `sched.add_job` line:
```python
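from app.jobs import email_digest_job
# ...
sched.add_job(
    email_digest_job.run,
    # Pin the trigger to UTC. Without an explicit timezone the trigger
    # uses the scheduler's timezone, which may not be UTC on this host.
    CronTrigger(hour=6, minute=30, timezone="UTC"),
    name="email_digest_job",
    id="email_digest_job",
)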
```
- [ ] **Step 2: Verify the scheduler boots clean**
```bash
docker compose restart scheduler
docker compose logs --tail=40 scheduler
```
Expected: `scheduler.started jobs=[..., 'email_digest_job']` in the log.
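
One thing worth checking while the scheduler is up is which timezone the 06:30 slot is evaluated in. The sketch below uses only the standard library and a hypothetical host offset of +02:00 to show how far a local-time 06:30 drifts from the intended 06:30 UTC; it does not call APScheduler itself.

```python
from datetime import datetime, timedelta, timezone

# Hypothetical host offset, chosen only for illustration.
host_tz = timezone(timedelta(hours=2))

# 06:30 as the host would read it if the trigger were not pinned to UTC.
fire_local = datetime(2026, 5, 25, 6, 30, tzinfo=host_tz)
fire_utc = fire_local.astimezone(timezone.utc)

intended = datetime(2026, 5, 25, 6, 30, tzinfo=timezone.utc)
drift = intended - fire_utc

print(fire_utc.isoformat())
print(drift)
```

With that offset the job would fire two hours before the intended slot. Since the idempotency window is the UTC day, an early firing is harmless for double-sends but does shift when readers receive the email.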
- [ ] **Step 3: Commit**
```bash
git add app/scheduler_main.py
git commit -m "scheduler: register email_digest_job at 06:30 UTC"
```
---
## Task 9: Settings page section + PATCH endpoint
**Files:**
- Modify: `app/routers/api.py` (add PATCH /settings/digest)
- Modify: `app/templates/settings.html`
- Modify: `app/routers/pages.py` (settings_page context: pass last EmailSend)
- Create: `tests/test_settings_digest_api.py`
- [ ] **Step 1: Write the failing test**
Create `tests/test_settings_digest_api.py`:
```python
"""PATCH /api/settings/digest persists opt-in + tone."""
from __future__ import annotations

import asyncio

import pytest


def _build(tmp_path):
    from fastapi import FastAPI
    from fastapi.testclient import TestClient
    from sqlalchemy.ext.asyncio import async_sessionmaker, create_async_engine
    from app import db as db_mod
    from app.auth import sign_session
    from app.db import Base
    from app.models import User
    from app.routers import api as api_router

    engine = create_async_engine(f"sqlite+aiosqlite:///{tmp_path}/s.db")
    factory = async_sessionmaker(engine, expire_on_commit=False)
    db_mod._engine = engine
    db_mod._session_factory = factory

    async def _seed():
        async with engine.begin() as conn:
            await conn.run_sync(Base.metadata.create_all)
        async with factory() as s:
            s.add(User(id=1, email="u@x", tier="paid",
                       email_digest_opt_in=True))
            await s.commit()

    asyncio.run(_seed())

    app = FastAPI()
    app.include_router(api_router.router, prefix="/api")
    return TestClient(app), sign_session(1)


def test_patch_round_trip(tmp_path):
    client, sess = _build(tmp_path)
    r = client.patch(
        "/api/settings/digest",
        json={"opt_in": False, "tone": "NOVICE"},
        cookies={"cassandra_session": sess},
    )
    assert r.status_code == 200, r.text
    assert r.json() == {"opt_in": False, "tone": "NOVICE"}

    # Read the row back from the DB to confirm persistence.
    async def _check():
        from app import db as db_mod
        from app.models import User
        async with db_mod._session_factory() as s:
            u = await s.get(User, 1)
            assert u.email_digest_opt_in is False
            assert u.digest_tone == "NOVICE"

    asyncio.run(_check())


def test_patch_rejects_invalid_tone(tmp_path):
    client, sess = _build(tmp_path)
    r = client.patch(
        "/api/settings/digest",
        json={"opt_in": True, "tone": "PRO"},  # not in NOVICE|INTERMEDIATE
        cookies={"cassandra_session": sess},
    )
    assert r.status_code == 422


def test_patch_requires_auth(tmp_path):
    client, _ = _build(tmp_path)
    r = client.patch("/api/settings/digest",
                     json={"opt_in": True, "tone": "NOVICE"})
    assert r.status_code in (401, 303)
```
- [ ] **Step 2: Run the failing test**
```bash
pytest tests/test_settings_digest_api.py -v
```
Expected: FAIL — endpoint not defined.
- [ ] **Step 3: Add the endpoint**
Open `app/routers/api.py`. Near the other settings-related code (or at the bottom of the file), add:
```python
from typing import Literal

from pydantic import BaseModel


class DigestPrefsIn(BaseModel):
    opt_in: bool
    tone: Literal["NOVICE", "INTERMEDIATE"]


class DigestPrefsOut(BaseModel):
    opt_in: bool
    tone: str


@router.patch("/settings/digest", response_model=DigestPrefsOut)
async def patch_digest_prefs(
    payload: DigestPrefsIn,
    principal: CurrentUser = Depends(require_auth),
    session: AsyncSession = Depends(get_session),
) -> DigestPrefsOut:
    if principal.user is None:
        # Admin bearer-token path: nothing to persist.
        raise HTTPException(status_code=400, detail="no_user_context")
    principal.user.email_digest_opt_in = payload.opt_in
    principal.user.digest_tone = payload.tone
    await session.commit()
    return DigestPrefsOut(opt_in=payload.opt_in, tone=payload.tone)
```
If `require_auth`, `CurrentUser`, or `HTTPException` aren't already imported, extend the import block at the top of the file.
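
The `Literal["NOVICE", "INTERMEDIATE"]` annotation is what makes the invalid-tone test return 422: the value is rejected during request validation, before the handler body runs. A stdlib-only sketch of the equivalent membership check follows; `validate_tone` is a hypothetical helper, and in the real endpoint Pydantic performs this step.

```python
from typing import Literal, get_args

Tone = Literal["NOVICE", "INTERMEDIATE"]


def validate_tone(value: str) -> str:
    """Accept only the values listed in the Tone literal."""
    allowed = get_args(Tone)  # ("NOVICE", "INTERMEDIATE")
    if value not in allowed:
        raise ValueError(f"tone must be one of {allowed}, got {value!r}")
    return value


print(validate_tone("NOVICE"))
try:
    validate_tone("PRO")
except ValueError as exc:
    print("rejected:", exc)
```

The check is case-sensitive, so a client sending `"novice"` would also be rejected; the settings page avoids this by sending the radio button values verbatim.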
- [ ] **Step 4: Run the tests, expect PASS**
```bash
pytest tests/test_settings_digest_api.py -v
```
Expected: 3 passed.
- [ ] **Step 5: Add the Settings template section**
Open `app/templates/settings.html`. Find a sensible insertion point (after the existing tier/credit section, before the cloud-sync section). Add:
```html
<section class="settings-row">
<div class="settings-row__label">Email digests</div>
<div class="settings-row__value">
<label style="display:block; margin-bottom:8px;">
<input type="checkbox" id="digest-opt-in"
{% if user.email_digest_opt_in %}checked{% endif %}>
Send me digests
</label>
<div class="settings-row__hint" style="margin-bottom:8px;">
Free tier: Sunday weekly. Paid: daily Mon&ndash;Sat plus the Sunday recap.
</div>
<div style="display:flex; gap:14px; margin-bottom:8px;">
<label><input type="radio" name="digest-tone" value="NOVICE"
{% if (user.digest_tone or 'INTERMEDIATE') == 'NOVICE' %}checked{% endif %}> Novice</label>
<label><input type="radio" name="digest-tone" value="INTERMEDIATE"
{% if (user.digest_tone or 'INTERMEDIATE') == 'INTERMEDIATE' %}checked{% endif %}> Intermediate</label>
</div>
<div class="settings-row__hint">
Last delivery:
<span id="digest-last">{% if last_email_send %}{{ last_email_send.sent_at.strftime("%Y-%m-%d %H:%M") }} UTC — {{ last_email_send.status }}{% else %}—{% endif %}</span>
</div>
<div id="digest-feedback" class="settings-row__hint" style="margin-top:6px;"></div>
</div>
</section>
<script>
(function () {
const opt = document.getElementById('digest-opt-in');
const tones = document.querySelectorAll('input[name="digest-tone"]');
const fb = document.getElementById('digest-feedback');
function patch() {
fb.textContent = 'Saving…';
const tone = Array.from(tones).find(t => t.checked)?.value || 'INTERMEDIATE';
fetch('/api/settings/digest', {
method: 'PATCH',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({ opt_in: opt.checked, tone: tone }),
}).then(r => {
fb.textContent = r.ok ? 'Saved.' : 'Could not save — try again.';
}).catch(() => { fb.textContent = 'Network error.'; });
}
opt.addEventListener('change', patch);
tones.forEach(t => t.addEventListener('change', patch));
})();
</script>
```
- [ ] **Step 6: Pass the last EmailSend into the template context**
Open `app/routers/pages.py`. Find `settings_page` (line ~114). Inside the handler, before the existing `TemplateResponse(... "settings.html", ...)`, query the most recent EmailSend for the current user:
```python
from sqlalchemy import desc, select
from app.models import EmailSend
# ...
last_email_send = (await session.execute(
    select(EmailSend)
    .where(EmailSend.user_id == user.id)
    .order_by(desc(EmailSend.sent_at))
    .limit(1)
)).scalar_one_or_none()
```
Add `"last_email_send": last_email_send` to the template context dict that's passed to `templates.TemplateResponse(..., "settings.html", {...})`. There may be two TemplateResponse calls in this handler (admin path + user path) — only the user path needs this.
- [ ] **Step 7: Manual UI check**
```bash
docker compose restart app
```
Open `/settings` in a browser, flip the checkbox, change the tone. Confirm "Saved." appears. Reload — selections persist.
- [ ] **Step 8: Commit**
```bash
git add app/routers/api.py app/routers/pages.py app/templates/settings.html tests/test_settings_digest_api.py
git commit -m "settings: digest opt-in + tone (PATCH /api/settings/digest + UI)"
```
---
## Task 10: Sign-up checkbox
**Files:**
- Modify: `app/templates/verify.html`
- Modify: `app/routers/auth.py`
- Create: `tests/test_verify_subscribe.py`
- [ ] **Step 1: Write the failing test**
Create `tests/test_verify_subscribe.py`:
```python
"""Verify-POST persists the subscribe_to_digests form field."""
from __future__ import annotations

import asyncio

import pytest


def _build(tmp_path):
    from fastapi import FastAPI
    from fastapi.testclient import TestClient
    from sqlalchemy.ext.asyncio import async_sessionmaker, create_async_engine
    from app import db as db_mod
    from app.db import Base
    from app.models import User
    from app.routers import auth as auth_router
    from app.auth import _pending_serializer

    engine = create_async_engine(f"sqlite+aiosqlite:///{tmp_path}/v.db")
    factory = async_sessionmaker(engine, expire_on_commit=False)
    db_mod._engine = engine
    db_mod._session_factory = factory

    async def _seed():
        async with engine.begin() as conn:
            await conn.run_sync(Base.metadata.create_all)
        async with factory() as s:
            s.add(User(id=10, email="newbie@x", tier="free",
                       email_digest_opt_in=True))
            await s.commit()
        # Marking the OTP as "verified-able" is handled in each test:
        # the easiest route is to monkeypatch otp_service.verify, see below.

    asyncio.run(_seed())

    app = FastAPI()
    app.include_router(auth_router.router)
    pending = _pending_serializer().dumps({"email": "newbie@x", "uid": 10, "ref": None})
    return TestClient(app), pending


def test_verify_with_unchecked_subscribe_disables_opt_in(tmp_path, monkeypatch):
    from app.services import otp_service

    async def _ok(*args, **kwargs):
        return None

    monkeypatch.setattr(otp_service, "verify", _ok)
    client, pending = _build(tmp_path)
    r = client.post(
        "/verify",
        data={"code": "000000"},  # form field "subscribe_to_digests" omitted
        cookies={"cassandra_pending": pending},
        follow_redirects=False,
    )
    assert r.status_code == 303, r.text

    async def _check():
        from app import db as db_mod
        from app.models import User
        async with db_mod._session_factory() as s:
            u = await s.get(User, 10)
            assert u.email_digest_opt_in is False

    asyncio.run(_check())


def test_verify_with_checked_subscribe_keeps_opt_in(tmp_path, monkeypatch):
    from app.services import otp_service

    async def _ok(*args, **kwargs):
        return None

    monkeypatch.setattr(otp_service, "verify", _ok)
    client, pending = _build(tmp_path)
    r = client.post(
        "/verify",
        data={"code": "000000", "subscribe_to_digests": "on"},
        cookies={"cassandra_pending": pending},
        follow_redirects=False,
    )
    assert r.status_code == 303

    async def _check():
        from app import db as db_mod
        from app.models import User
        async with db_mod._session_factory() as s:
            u = await s.get(User, 10)
            assert u.email_digest_opt_in is True

    asyncio.run(_check())
```
- [ ] **Step 2: Run the failing test**
```bash
pytest tests/test_verify_subscribe.py -v
```
Expected: FAIL — the verify handler doesn't read `subscribe_to_digests` yet, so the unchecked case still leaves opt-in `True` from the seed.
- [ ] **Step 3: Update the verify template**
Open `app/templates/verify.html`. Find the `<form method="post" action="/verify"…>` block. Inside the form, after the code input and before the submit button, add:
```html
<label style="display:block; margin:14px 0 0; font-size:12.5px; color:var(--muted); line-height:1.55;">
<input type="checkbox" name="subscribe_to_digests" value="on" checked
style="vertical-align:middle; margin-right:6px;">
Email me the digest — daily for paid, Sunday for everyone.
One-click unsubscribe in every email.
</label>
```
- [ ] **Step 4: Update the verify POST handler**
Open `app/routers/auth.py`. Find `verify_submit` (line ~215). Add `subscribe_to_digests` to the form fields and persist it after a successful OTP verify:
```python
@router.post("/verify")
async def verify_submit(
    request: Request,
    code: str = Form(...),
    subscribe_to_digests: str | None = Form(default=None),
    session: AsyncSession = Depends(get_session),
):
    # ... existing OTP verification + pending lookup unchanged ...
    user = await get_user(session, pending["uid"])
    if user is None:
        return RedirectResponse(url="/login", status_code=303)
    user.last_login_at = utcnow()
    # An unchecked HTML checkbox sends NO field; that means "opt out".
    user.email_digest_opt_in = subscribe_to_digests is not None
    await session.commit()
    log.info("user.login", user_id=user.id, email=email)
    # ... existing redirect + cookie setup unchanged ...
```
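
The handler depends on a detail of HTML forms: an unchecked checkbox contributes no field at all to the submitted body, rather than sending an "off" value. A small sketch using the standard library's URL-encoded parser illustrates the two request bodies the tests simulate; `opt_in_from_form` is a hypothetical helper that mirrors the `is not None` check.

```python
from urllib.parse import parse_qs

# Body a browser would send with the box ticked vs. unticked.
checked_body = "code=000000&subscribe_to_digests=on"
unchecked_body = "code=000000"


def opt_in_from_form(body: str) -> bool:
    form = parse_qs(body)
    # Presence of the key is the signal; its value ("on") is irrelevant.
    return form.get("subscribe_to_digests") is not None


print(opt_in_from_form(checked_body))
print(opt_in_from_form(unchecked_body))
```

This is why the form parameter defaults to `None` instead of being required: a required field would turn every unticked submission into a validation error.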
- [ ] **Step 5: Run the tests, expect PASS**
```bash
pytest tests/test_verify_subscribe.py -v
```
Expected: 2 passed.
- [ ] **Step 6: Commit**
```bash
git add app/templates/verify.html app/routers/auth.py tests/test_verify_subscribe.py
git commit -m "auth: subscribe-to-digests checkbox on verify (default on)"
```
---
## Task 11: Pricing copy updates
**Files:**
- Modify: `app/templates/pricing.html`
- [ ] **Step 1: Rewrite the intro paragraph**
Open `app/templates/pricing.html`. Replace lines 8-13 (the intro paragraph under the `Pricing` heading) with:
```html
<p>
Two tiers. The news aggregator and hourly AI interpretation are
available to everyone &mdash; paid extends the news window from
6 hours to 24 hours and adds daily editorial by email, plus the
portfolio-import features.
</p>
```
- [ ] **Step 2: Update the Free tier bullets**
Find the Free `<ul>` block. Replace it with:
```html
<ul>
<li>News aggregator &mdash; last 6 hours, auto-tagged by theme</li>
<li>Cross-asset macro signals across every asset class</li>
<li>Hourly AI interpretation of the news + the tape</li>
<li>Per-group cross-asset summaries</li>
<li>Novice / Intermediate reading levels</li>
<li><strong>Sunday weekly digest by email</strong></li>
<li class="tier-card__excluded">Portfolio import &amp; analysis</li>
<li class="tier-card__excluded">Encrypted cloud sync</li>
</ul>
```
- [ ] **Step 3: Update the Paid tier bullets**
Find the Paid `<ul>` block. Replace it with:
```html
<ul>
<li>Everything in Free</li>
<li><strong>News aggregator &mdash; full 24 hours</strong></li>
<li>Portfolio import (Trading 212 CSV)</li>
<li>AI commentary on diversification, sector and currency concentration, and macro-regime context for the holdings you upload</li>
<li>Optional encrypted cloud sync across devices</li>
<li><strong>Daily email digest</strong> (Mon&ndash;Sat) plus the Sunday recap</li>
</ul>
```
- [ ] **Step 4: Manual visual check**
```bash
docker compose restart app
```
Load `/pricing` in a browser. Confirm both tier cards render the new bullets and the intro paragraph reads cleanly.
- [ ] **Step 5: Commit**
```bash
git add app/templates/pricing.html
git commit -m "pricing: free=6h news + Sunday digest; paid=24h + daily digest"
```
---
## Task 12: `send-test-digest` admin CLI
**Files:**
- Modify: `app/cli.py`
- [ ] **Step 1: Add the subcommand handler**
Open `app/cli.py`. After `show_status`, add:
```python
async def send_test_digest(email: str, kind: str) -> int:
    """Generate a digest and send it to the named user immediately, ignoring
    opt-in state and idempotency. Useful for previewing copy in your own
    inbox before a real run lands.

    Note: `_send_one` still records an EmailSend row, so a test send of the
    same kind earlier in the UTC day makes the scheduled run skip this user."""
    import httpx
    from app.db import get_session_factory
    from app.jobs.ai_log_job import (
        REFERENCE_LINE, _latest_quotes_by_group, _recent_headlines_by_bucket,
    )
    from app.jobs.email_digest_job import _send_one, _generate_variants
    from app.services.openrouter import llm_configured

    if kind not in ("daily", "weekly"):
        print(f"error: kind must be 'daily' or 'weekly' (got {kind!r})",
              file=sys.stderr)
        return 2
    if not llm_configured():
        print("error: LLM provider not configured (set OPENROUTER_API_KEY)",
              file=sys.stderr)
        return 1

    factory = get_session_factory()
    async with factory() as session:
        user = await _get_user_by_email(session, email)
        if user is None:
            print(f"error: no user with email {email!r}", file=sys.stderr)
            return 1

        today = _utcnow()
        quotes = await _latest_quotes_by_group(session)
        news = await _recent_headlines_by_bucket(
            session, hours=(168 if kind == "weekly" else 24),
        )
        ctx = dict(today=today, quotes_by_group=quotes,
                   headlines_by_bucket=news, reference_line=REFERENCE_LINE)

        async with httpx.AsyncClient(follow_redirects=True) as client:
            variants = await _generate_variants(client, kind, ctx)

        tone = (user.digest_tone or "INTERMEDIATE").upper()
        content = variants.get(tone) or variants.get("INTERMEDIATE")
        if content is None:
            print("error: all LLM variants failed", file=sys.stderr)
            return 1

        date_str = today.strftime("%Y-%m-%d")
        await _send_one(user, kind, content, date_str, session)
        print(f"sent {kind} digest to {email} (tone={tone})")
        return 0
```
- [ ] **Step 2: Wire it into the argparse dispatcher**
Find `build_parser()` (line ~97). Add a new subparser:
```python
t = sub.add_parser("send-test-digest",
help="Send one digest immediately (bypasses opt-in/idempotency)")
t.add_argument("email")
t.add_argument("kind", choices=("daily", "weekly"))
```
Find `_dispatch(args)` (line ~114). Add a branch:
```python
if args.cmd == "send-test-digest":
    return await send_test_digest(args.email, args.kind)
```
(The exact form depends on the existing dispatcher's pattern — match it.)
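
For orientation, here is a self-contained sketch of how such a subparser behaves. It assumes the existing parser uses `add_subparsers(dest="cmd")`, which is inferred from the `args.cmd` check above rather than confirmed; the `build_parser` below is a stand-in, not the one in `app/cli.py`.

```python
import argparse


def build_parser() -> argparse.ArgumentParser:
    parser = argparse.ArgumentParser(prog="app.cli")
    # dest="cmd" is what lets the dispatcher branch on args.cmd.
    sub = parser.add_subparsers(dest="cmd", required=True)
    t = sub.add_parser(
        "send-test-digest",
        help="Send one digest immediately (bypasses opt-in/idempotency)",
    )
    t.add_argument("email")
    t.add_argument("kind", choices=("daily", "weekly"))
    return parser


args = build_parser().parse_args(["send-test-digest", "you@example.com", "daily"])
print(args.cmd, args.email, args.kind)
```

Because `choices` is set on the `kind` argument, argparse rejects anything other than `daily` or `weekly` and exits with status 2 before the handler runs, so the handler's own `kind` check acts as a second line of defence for non-CLI callers.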
- [ ] **Step 3: Smoke-test the CLI**
```bash
docker compose exec app python -m app.cli send-test-digest you@example.com daily
```
Expected output: `sent daily digest to you@example.com (tone=INTERMEDIATE)`. Check your inbox — the email should render with the new template + working unsubscribe link.
- [ ] **Step 4: Commit**
```bash
git add app/cli.py
git commit -m "cli: send-test-digest for previewing digest emails"
```
---
## Final Verification
- [ ] **Step 1: Run the full suite**
```bash
pytest tests/ -q
```
Expected: all tests pass.
- [ ] **Step 2: Manual smoke**
1. Visit `/news` while signed out — only ~6h of headlines, footer says "Free tier, upgrade for 24h".
2. Sign in as a free user — same 6h cap, footer present.
3. Sign in as a paid user (or admin) — 24h, no footer.
4. Visit `/settings` — Digest section present; toggle persists across reloads.
5. Visit `/pricing` — Free has Sunday-weekly bullet; Paid has 24h + daily bullets.
6. Sign up a new account — verify the "Email me the digest" checkbox is visible and on by default.
7. Send a test digest with the CLI — check inbox, click unsubscribe, confirm DB flag flipped, confirm Settings now shows opt-in=off.
- [ ] **Step 3: Push**
After the operator reviews and tags the work, push:
```bash
git push
```
(Up to the operator — the spec says nothing about auto-push.)