read.markets/docs/superpowers/plans/2026-05-25-beta-mode-and-paid-gap.md
Giorgio Gilestro 8bc546220d docs: implementation plan for beta + paid-gap rollout
Twelve-task plan covering the BETA chip, free-tier 6h news cap, daily
+ Sunday digest job, one-click unsubscribe, settings UI, sign-up
checkbox, pricing copy, and an admin send-test-digest CLI. Each task
is TDD where feasible.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-05-25 18:28:39 +02:00

68 KiB
Raw Blame History

Beta Mode + Paid/Free Gap Implementation Plan

For agentic workers: REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (- [ ]) syntax for tracking.

Goal: Ship the closed-beta launch package: a visible BETA chip in the app chrome, a 6-hour free-tier news cap, and email digests (daily for paid MonSat, Sunday weekly for everyone), all gated by an opt-in flag.

Architecture: Three coordinated changes layered onto existing FastAPI + APScheduler + MariaDB + OpenRouter + SMTP infrastructure. No new external services. New persisted state: two columns on users (email_digest_opt_in, digest_tone) and one new table (email_sends). One new job (email_digest_job), one new router (email), two new prompt builders.

Tech Stack: FastAPI · SQLAlchemy 2.0 (async) · Alembic · APScheduler · aiosmtplib · itsdangerous · pytest + aiosqlite (tests) · Jinja2 templates · vanilla JS + HTMX.

Spec: docs/superpowers/specs/2026-05-25-beta-mode-and-paid-gap-design.md


File Map

New files

  • app/jobs/email_digest_job.py — orchestrator (daily MonSat for paid, weekly Sunday for all opt-in)
  • app/routers/email.pyGET /email/unsubscribe?token=…
  • alembic/versions/0017_email_digest.py — migration
  • tests/test_news_window.py — news-cap regression
  • tests/test_email_digest_job.py — job recipient selection, idempotency
  • tests/test_email_unsubscribe.py — token roundtrip + endpoint
  • tests/test_email_render.py — digest email renderer
  • tests/test_digest_prompts.py — prompt builder unit tests
  • tests/test_settings_digest_api.py — PATCH endpoint
  • tests/test_verify_subscribe.py — sign-up checkbox

Modified files

  • app/config.py — add BETA_MODE flag
  • app/templates_env.py — expose BETA_MODE to templates
  • app/templates/base.html — render the chip
  • app/static/css/cassandra.css.beta-chip styling
  • app/services/access.pyFREE_NEWS_WINDOW_HOURS constant
  • app/routers/api.pynews_list clamps since_hours for non-paid; PATCH endpoint for digest prefs
  • app/templates/partials/news.html — capped-footer marker
  • app/models.pyUser.email_digest_opt_in, User.digest_tone, EmailSend model
  • app/services/openrouter.pybuild_daily_digest_prompt, build_weekly_digest_prompt, bump PROMPT_VERSION
  • app/services/email_service.pyrender_digest_email, send_digest
  • app/scheduler_main.py — register the digest job at 06:30 UTC
  • app/templates/settings.html — Email digests section
  • app/templates/verify.html — subscribe checkbox
  • app/routers/auth.py — read subscribe_to_digests on verify POST
  • app/templates/pricing.html — updated tier copy
  • app/cli.pysend-test-digest command
  • app/main.py — include new email router

Task 1: BETA chip

Files:

  • Modify: app/config.py

  • Modify: app/templates_env.py

  • Modify: app/templates/base.html (line ~140)

  • Modify: app/static/css/cassandra.css

  • Step 1: Add BETA_MODE to config

Open app/config.py. Find the Settings class. Add:

    BETA_MODE: bool = True  # Shows a "BETA" pill in the app header. Flip to False at GA.

Place it adjacent to other display/flag-style settings (alongside CASSANDRA_TONE or the LLM caps — whichever cluster fits).

  • Step 2: Expose the flag to templates

Open app/templates_env.py. After the existing templates.env.globals[...] block (around line 75), add:

from app.config import get_settings as _get_settings  # if not already imported
templates.env.globals["BETA_MODE"] = _get_settings().BETA_MODE

If get_settings is already imported in the file under another alias, reuse it.

  • Step 3: Render the chip in the app header

Open app/templates/base.html. Find the brand link (line ~140):

    <a href="/" class="brand" aria-label="Dashboard">{{ BRAND_NAME }}</a>

Insert immediately after it:

    {% if BETA_MODE %}<span class="beta-chip" title="Beta — feedback welcome at hello@read.markets">BETA</span>{% endif %}
  • Step 4: Style the chip

Open app/static/css/cassandra.css. Append at the bottom of the file:

/* BETA indicator pill in the app header — see app/templates/base.html. */
.beta-chip {
  display: inline-block;
  margin-left: 8px;
  padding: 2px 7px;
  font-size: 10px;
  font-weight: 700;
  letter-spacing: 0.14em;
  font-family: var(--font-mono);
  color: var(--bg);
  background: var(--accent);
  border-radius: 2px;
  vertical-align: middle;
  user-select: none;
}
  • Step 5: Manual visual check

Run the app locally (docker compose restart app — the new volume mount picks up changes). Load any logged-in page and confirm the BETA pill renders next to the brand. Confirm the chip does NOT appear on /pricing (which uses public_base.html, not base.html).

  • Step 6: Commit
git add app/config.py app/templates_env.py app/templates/base.html app/static/css/cassandra.css
git commit -m "beta: header chip flagged by BETA_MODE config (default on)"

Task 2: Free-tier news window cap

Files:

  • Modify: app/services/access.py

  • Modify: app/routers/api.py (lines 222-280, news_list)

  • Modify: app/templates/partials/news.html

  • Create: tests/test_news_window.py

  • Step 1: Write the failing test

Create tests/test_news_window.py:

"""Free vs paid window clamp on /api/news."""
from __future__ import annotations

import asyncio
from datetime import datetime, timedelta, timezone

import pytest


def _build_app(tmp_path):
    from fastapi import FastAPI
    from fastapi.testclient import TestClient
    from sqlalchemy.ext.asyncio import async_sessionmaker, create_async_engine

    from app import db as db_mod
    from app.auth import sign_session
    from app.db import Base
    from app.models import Headline, User
    from app.routers import api as api_router

    engine = create_async_engine(f"sqlite+aiosqlite:///{tmp_path}/news.db")
    factory = async_sessionmaker(engine, expire_on_commit=False)
    db_mod._engine = engine
    db_mod._session_factory = factory

    now = datetime.now(timezone.utc)

    async def _seed():
        async with engine.begin() as conn:
            await conn.run_sync(Base.metadata.create_all)
        async with factory() as s:
            s.add(User(id=1, email="free@x", tier="free"))
            s.add(User(id=2, email="paid@x", tier="paid"))
            # Headlines: one 1h old, one 12h old, one 20h old.
            for hours_old, title in ((1, "fresh"), (12, "mid"), (20, "old")):
                s.add(Headline(
                    source="test", title=title, url=f"https://e/{title}",
                    category="general",
                    published_at=now - timedelta(hours=hours_old),
                    fetched_at=now,
                    tags=[],
                ))
            await s.commit()

    asyncio.run(_seed())

    app = FastAPI()
    app.include_router(api_router.router, prefix="/api")
    client = TestClient(app)
    return client, sign_session(1), sign_session(2)


@pytest.mark.skipif(False, reason="requires aiosqlite + httpx")
def test_free_user_clamped_to_6h(tmp_path):
    client, free_sess, _ = _build_app(tmp_path)
    r = client.get("/api/news?since_hours=24",
                   cookies={"cassandra_session": free_sess})
    assert r.status_code == 200
    titles = [h["title"] for h in r.json()]
    assert "fresh" in titles
    assert "mid" not in titles  # 12h ago, beyond 6h
    assert "old" not in titles


def test_paid_user_full_24h(tmp_path):
    client, _, paid_sess = _build_app(tmp_path)
    r = client.get("/api/news?since_hours=24",
                   cookies={"cassandra_session": paid_sess})
    assert r.status_code == 200
    titles = [h["title"] for h in r.json()]
    assert {"fresh", "mid", "old"} <= set(titles)


def test_anonymous_clamped_to_6h(tmp_path):
    client, _, _ = _build_app(tmp_path)
    r = client.get("/api/news?since_hours=24")
    assert r.status_code == 200
    titles = [h["title"] for h in r.json()]
    assert "fresh" in titles
    assert "mid" not in titles
  • Step 2: Run the failing test
pytest tests/test_news_window.py -v

Expected: FAIL — the clamp doesn't exist yet, so the free/anonymous calls return 12h and 20h headlines too.

  • Step 3: Add the constant

Open app/services/access.py. Below the imports / above _utcnow, add:

# How many hours of news the free tier sees. Paid sees whatever the
# endpoint's `since_hours` param requests (up to its own max).
FREE_NEWS_WINDOW_HOURS = 6.0
  • Step 4: Clamp the endpoint

Open app/routers/api.py. Find news_list (line 222). Update:

@router.get("/news")
async def news_list(
    request: Request,
    session: AsyncSession = Depends(get_session),
    principal: CurrentUser | None = Depends(maybe_current_user),
    category: str | None = Query(None),
    since_hours: float = Query(24.0, ge=0.1, le=720.0),
    limit: int = Query(50, ge=1, le=500),
    tags: str | None = Query(None, description="comma-separated include list"),
    exclude_tags: str | None = Query(None, description="comma-separated exclude list"),
    as_: str | None = Query(default=None, alias="as"),
):
    from app.services.news_tagging import TAG_LABELS, TAG_VOCABULARY
    from app.services.access import FREE_NEWS_WINDOW_HOURS, is_paid_active

    effective_hours = since_hours
    capped = not is_paid_active(principal)
    if capped:
        effective_hours = min(since_hours, FREE_NEWS_WINDOW_HOURS)

    cutoff = utcnow() - timedelta(hours=effective_hours)
    # ... rest of the function unchanged through `filtered = ...` ...

Also add maybe_current_user to the imports at the top of app/routers/api.py:

from app.auth import maybe_current_user

(The existing from app.auth import … line may already import other names — extend it.)

In the as_ == "html" branch, extend the template context with capped and window_hours:

        return templates.TemplateResponse(
            request, "partials/news.html",
            {"headlines": items,
             "tag_vocabulary": TAG_VOCABULARY,
             "tag_labels": TAG_LABELS,
             "active_include": sorted(include),
             "active_exclude": sorted(exclude),
             "capped": capped,
             "window_hours": effective_hours},
        )

The JSON branch is unchanged — the test asserts via the JSON shape.

  • Step 5: Run the test, expect PASS
pytest tests/test_news_window.py -v

Expected: all three tests pass.

  • Step 6: Add the partial-template footer

Open app/templates/partials/news.html. At the bottom of the file (after the last existing item rendering and any close tags), add:

{% if capped %}
<div class="news-capped-note" style="margin-top:14px; padding:10px 12px; border:1px dashed var(--border); color:var(--muted); font-size:12px; line-height:1.55;">
  Free tier — showing the last {{ window_hours|int }} hours of news.
  <a href="/pricing" style="color:var(--accent);">Upgrade</a>
  for the full 24-hour feed plus daily and weekly email digests.
</div>
{% endif %}
  • Step 7: Run the full suite to catch regressions
pytest tests/ -x -q

Expected: no regressions (the existing test_news_* tests don't depend on the clamp).

  • Step 8: Commit
git add app/services/access.py app/routers/api.py app/templates/partials/news.html tests/test_news_window.py
git commit -m "news: clamp free + anonymous to last 6h; paid keeps 24h"

Task 3: DB model + Alembic migration

Files:

  • Modify: app/models.py

  • Create: alembic/versions/0017_email_digest.py

  • Step 1: Add columns to User

Open app/models.py. Find the User class (search for class User(Base)). Add two columns alongside the existing tier/credit fields:

    email_digest_opt_in: Mapped[bool] = mapped_column(
        Boolean, nullable=False, default=True, server_default=text("1"),
    )
    # NULL = use INTERMEDIATE at render time. Server-side mirror of the
    # dashboard tone, decoupled because the dashboard pref is localStorage.
    digest_tone: Mapped[str | None] = mapped_column(String(16))

If Boolean or text aren't already imported, extend the SQLAlchemy import line at the top of the file:

from sqlalchemy import (..., Boolean, text, ...)
  • Step 2: Add the EmailSend model

In the same file, after the existing model definitions, add:

class EmailSend(Base):
    """Audit row per digest email send. Used for idempotency (don't send
    twice on the same UTC day) and for surfacing 'last delivery' on the
    Settings page."""
    __tablename__ = "email_sends"

    id: Mapped[int] = mapped_column(BigInteger, primary_key=True, autoincrement=True)
    user_id: Mapped[int] = mapped_column(
        ForeignKey("users.id", ondelete="CASCADE"), nullable=False, index=True,
    )
    kind: Mapped[str] = mapped_column(String(16), nullable=False)   # "daily" | "weekly"
    sent_at: Mapped[datetime] = mapped_column(
        DateTime(timezone=True), default=utcnow, nullable=False,
    )
    status: Mapped[str] = mapped_column(String(16), nullable=False) # "sent" | "skipped" | "error"
    error: Mapped[str | None] = mapped_column(String(255))

    __table_args__ = (
        Index("ix_email_sends_user_kind_sent", "user_id", "kind", "sent_at"),
    )

Add any missing imports (BigInteger, ForeignKey, Index) at the top.

  • Step 3: Create the Alembic migration

Create alembic/versions/0017_email_digest.py:

"""email digests: User.email_digest_opt_in, User.digest_tone, email_sends table.

Revision ID: 0017
Revises: 0016
Create Date: 2026-05-25
"""
from typing import Sequence, Union

import sqlalchemy as sa
from alembic import op


revision: str = "0017"
down_revision: Union[str, None] = "0016"
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None


def upgrade() -> None:
    op.add_column(
        "users",
        sa.Column(
            "email_digest_opt_in", sa.Boolean(), nullable=False,
            server_default=sa.text("1"),
        ),
    )
    op.add_column(
        "users",
        sa.Column("digest_tone", sa.String(length=16), nullable=True),
    )

    op.create_table(
        "email_sends",
        sa.Column("id", sa.BigInteger(), autoincrement=True, primary_key=True),
        sa.Column(
            "user_id", sa.Integer(), nullable=False,
        ),
        sa.Column("kind", sa.String(length=16), nullable=False),
        sa.Column("sent_at", sa.DateTime(timezone=True), nullable=False),
        sa.Column("status", sa.String(length=16), nullable=False),
        sa.Column("error", sa.String(length=255), nullable=True),
        sa.ForeignKeyConstraint(
            ["user_id"], ["users.id"], ondelete="CASCADE",
        ),
    )
    op.create_index(
        "ix_email_sends_user_kind_sent",
        "email_sends",
        ["user_id", "kind", "sent_at"],
    )
    op.create_index(
        op.f("ix_email_sends_user_id"), "email_sends", ["user_id"],
    )


def downgrade() -> None:
    op.drop_index(op.f("ix_email_sends_user_id"), table_name="email_sends")
    op.drop_index("ix_email_sends_user_kind_sent", table_name="email_sends")
    op.drop_table("email_sends")
    op.drop_column("users", "digest_tone")
    op.drop_column("users", "email_digest_opt_in")
  • Step 4: Apply the migration to the dev DB
docker compose exec app alembic upgrade head

Expected output ends with: Running upgrade 0016 -> 0017, email digests...

  • Step 5: Verify the columns exist
docker compose exec db mysql -ucassandra -p${MARIADB_PASSWORD} -e "DESCRIBE cassandra.users;" | grep -E "email_digest_opt_in|digest_tone"
docker compose exec db mysql -ucassandra -p${MARIADB_PASSWORD} -e "DESCRIBE cassandra.email_sends;"

Expected: both new columns on users and the full email_sends table appear.

  • Step 6: Run the test suite — confirm no regressions from the model change
pytest tests/ -x -q

Expected: all existing tests still pass (no test depends on the old User shape).

  • Step 7: Commit
git add app/models.py alembic/versions/0017_email_digest.py
git commit -m "db: add digest opt-in/tone on users, email_sends audit table"

Task 4: Daily / Weekly digest prompts

Files:

  • Modify: app/services/openrouter.py

  • Create: tests/test_digest_prompts.py

  • Step 1: Inspect the existing prompt scaffolding

Read app/services/openrouter.py to confirm the shape of build_system_prompt(tone, analysis) and build_user_prompt(...). The new functions will mirror that contract: return (system_prompt, user_prompt) strings ready for call_llm.

  • Step 2: Write the failing test

Create tests/test_digest_prompts.py:

"""Unit tests for the daily / weekly digest prompt builders."""
from __future__ import annotations

from datetime import datetime, timezone

from app.services.openrouter import (
    build_daily_digest_prompt,
    build_weekly_digest_prompt,
)


def _ctx():
    return dict(
        today=datetime(2026, 5, 25, 6, 30, tzinfo=timezone.utc),
        quotes_by_group={"equities": [{"symbol": "SPX", "price": 7500.0,
                                       "label": "S&P 500", "currency": "USD",
                                       "source": "test", "note": "",
                                       "as_of": None, "changes": {}}]},
        headlines_by_bucket={"general": [{"when": "2026-05-25T05:00:00+00:00",
                                          "source": "FT", "title": "Brent slides"}]},
        reference_line="S&P 7,501 (ATH) · VIX 18.0 · US 10y 4.45%",
    )


def test_daily_prompt_tone_intermediate():
    sys_, usr = build_daily_digest_prompt(tone="INTERMEDIATE", **_ctx())
    assert "INTERMEDIATE" in sys_.upper() or "intermediate" in sys_.lower()
    assert "Brent slides" in usr
    assert "daily" in sys_.lower()


def test_daily_prompt_tone_novice_differs():
    sys_int, _ = build_daily_digest_prompt(tone="INTERMEDIATE", **_ctx())
    sys_nov, _ = build_daily_digest_prompt(tone="NOVICE", **_ctx())
    assert sys_int != sys_nov


def test_weekly_prompt_mentions_week():
    sys_, usr = build_weekly_digest_prompt(tone="INTERMEDIATE", **_ctx())
    assert "week" in sys_.lower() or "weekly" in sys_.lower()
    assert "Brent slides" in usr


def test_prompts_return_strings():
    for fn in (build_daily_digest_prompt, build_weekly_digest_prompt):
        sys_, usr = fn(tone="INTERMEDIATE", **_ctx())
        assert isinstance(sys_, str) and isinstance(usr, str)
        assert len(sys_) > 50 and len(usr) > 50
  • Step 3: Run the failing test
pytest tests/test_digest_prompts.py -v

Expected: FAIL — ImportError: cannot import name 'build_daily_digest_prompt'.

  • Step 4: Implement the prompt builders

Open app/services/openrouter.py. After build_user_prompt, add:

def build_daily_digest_prompt(
    *,
    tone: str,
    today,
    quotes_by_group: dict,
    headlines_by_bucket: dict,
    reference_line: str,
) -> tuple[str, str]:
    """System + user prompt for the once-a-day editorial digest.

    Different from the hourly log: the daily digest reflects on the past
    24h and looks forward to the upcoming session. Longer, less
    'live-blogging,' more contextual. Target ~600 words."""
    tone_clause = (
        "Use plain English. Define any jargon on first use."
        if tone.upper() == "NOVICE"
        else "Write for a reader who already speaks markets fluently."
    )
    system = (
        "You write the daily editorial digest for Read the Markets. "
        f"Audience tone: {tone.upper()}. {tone_clause} "
        "Cover: (1) what mattered yesterday, (2) what to watch in today's "
        "EU and US sessions, (3) one cross-asset thread connecting them. "
        "No predictions of price level, no buy/sell language. Target ~600 "
        "words. Output HTML using only <p>, <h3>, <ul>, <li>, <strong>, "
        "<em> — no <html>, <head>, or <body> wrapper, no inline styles."
    )
    user = _digest_user_prompt(today, quotes_by_group, headlines_by_bucket, reference_line)
    return system, user


def build_weekly_digest_prompt(
    *,
    tone: str,
    today,
    quotes_by_group: dict,
    headlines_by_bucket: dict,
    reference_line: str,
) -> tuple[str, str]:
    """System + user prompt for the Sunday weekly recap + look-ahead.

    Sent to ALL opt-in users (free and paid). Target ~900 words."""
    tone_clause = (
        "Use plain English. Define any jargon on first use."
        if tone.upper() == "NOVICE"
        else "Write for a reader who already speaks markets fluently."
    )
    system = (
        "You write the Sunday weekly digest for Read the Markets. "
        f"Audience tone: {tone.upper()}. {tone_clause} "
        "Cover: (1) the week behind — what moved and why, "
        "(2) the week ahead — releases, earnings, central-bank meetings, "
        "(3) the cross-asset story to keep in mind. "
        "No predictions of price level, no buy/sell language. Target ~900 "
        "words. Output HTML using only <p>, <h3>, <ul>, <li>, <strong>, "
        "<em> — no <html>, <head>, or <body> wrapper, no inline styles."
    )
    user = _digest_user_prompt(today, quotes_by_group, headlines_by_bucket, reference_line)
    return system, user


def _digest_user_prompt(today, quotes_by_group, headlines_by_bucket, reference_line):
    """Shared user-message body used by both digest prompts. Same data
    shape as the hourly user prompt; reformatted for the digest context."""
    today_str = today.strftime("%A %d %B %Y") if hasattr(today, "strftime") else str(today)
    lines = [f"TODAY (UTC): {today_str}", "", f"REFERENCE: {reference_line}", ""]

    if headlines_by_bucket:
        lines.append("HEADLINES BY CATEGORY")
        for cat, items in headlines_by_bucket.items():
            lines.append(f"  [{cat}]")
            for h in items[:30]:
                when = h.get("when", "")
                src = h.get("source", "")
                title = h.get("title", "")
                lines.append(f"    {when} · {src} · {title}")
        lines.append("")

    if quotes_by_group:
        lines.append("LATEST QUOTES BY GROUP")
        for grp, items in quotes_by_group.items():
            lines.append(f"  [{grp}]")
            for q in items[:30]:
                sym = q.get("symbol", "")
                price = q.get("price", "")
                lbl = q.get("label", "")
                ccy = q.get("currency", "")
                lines.append(f"    {sym} ({lbl}) — {price} {ccy}")
        lines.append("")

    return "\n".join(lines)

Bump the PROMPT_VERSION constant at the top of the file. Locate it (search for PROMPT_VERSION =) and increment by one — e.g., 89. Add a short comment line above noting the bump is for digest prompts.

  • Step 5: Run the tests, expect PASS
pytest tests/test_digest_prompts.py -v

Expected: 4 passed.

  • Step 6: Commit
git add app/services/openrouter.py tests/test_digest_prompts.py
git commit -m "digest: daily + weekly prompt builders (NOVICE/INTERMEDIATE)"

Task 5: Digest email rendering

Files:

  • Modify: app/services/email_service.py

  • Create: tests/test_email_render.py

  • Step 1: Write the failing test

Create tests/test_email_render.py:

"""Unit tests for render_digest_email."""
from __future__ import annotations

from app.services.email_service import render_digest_email


def test_daily_subject_and_bodies():
    subj, text, html = render_digest_email(
        kind="daily",
        date_str="2026-05-25",
        content_html="<p>Markets did stuff today.</p>",
        unsubscribe_url="https://read.markets/email/unsubscribe?token=abc",
        settings_url="https://read.markets/settings",
    )
    assert "Daily" in subj
    assert "2026-05-25" in subj
    assert "Markets did stuff today" in html
    assert "abc" in html  # unsubscribe link landed
    assert "/settings" in html
    # Plain-text fallback strips HTML.
    assert "<p>" not in text
    assert "Markets did stuff today" in text


def test_weekly_subject_says_recap():
    subj, _, _ = render_digest_email(
        kind="weekly",
        date_str="2026-05-25",
        content_html="<p>x</p>",
        unsubscribe_url="https://x/u",
        settings_url="https://x/s",
    )
    assert "Weekly" in subj
    assert "recap" in subj.lower()


def test_invalid_kind_raises():
    import pytest
    with pytest.raises(ValueError):
        render_digest_email(
            kind="bogus", date_str="2026-05-25",
            content_html="<p>x</p>",
            unsubscribe_url="u", settings_url="s",
        )
  • Step 2: Run the failing test
pytest tests/test_email_render.py -v

Expected: FAIL — render_digest_email doesn't exist.

  • Step 3: Implement render_digest_email

Open app/services/email_service.py. Add at the bottom of the file:

# ---------------------------------------------------------------------------
# Digest email rendering
# ---------------------------------------------------------------------------

import html as _html_lib
import re as _re


_DIGEST_HTML_TEMPLATE = """\
<!DOCTYPE html>
<html lang="en">
<head>
  <meta charset="utf-8">
  <meta name="viewport" content="width=device-width, initial-scale=1">
  <meta name="color-scheme" content="light dark">
  <title>{brand}{label}</title>
  <style>
    @media (prefers-color-scheme: dark) {{
      body {{ background:{D_bg} !important; }}
      .card {{ background:{D_surface} !important; border-color:{D_border} !important; }}
      .h1, p, li {{ color:{D_text} !important; }}
      .muted {{ color:{D_muted} !important; }}
      a {{ color:{D_accent} !important; }}
    }}
  </style>
</head>
<body style="margin:0; padding:24px 12px; background:{L_bg}; font-family:{FONT_MONO}; color:{L_text};">
  <table role="presentation" cellpadding="0" cellspacing="0" border="0" align="center" width="100%" style="max-width:520px; margin:0 auto; border-collapse:separate;">
    <tr><td class="card" style="background:{L_surface}; border:1px solid {L_border}; padding:32px 28px;">
      <div class="muted" style="font-size:11px; letter-spacing:0.32em; color:{L_muted}; text-transform:uppercase;">
        &#9648;&nbsp;{brand_upper} &middot; {label_upper}
      </div>
      <div style="height:20px; line-height:20px; font-size:0;">&nbsp;</div>
      <div class="content" style="font-size:14px; line-height:1.65; color:{L_text};">
        {content_html}
      </div>
      <div style="height:24px; line-height:24px; font-size:0;">&nbsp;</div>
      <div style="border-top:1px solid {L_border};"></div>
      <div style="height:14px; line-height:14px; font-size:0;">&nbsp;</div>
      <div class="muted" style="font-size:11px; color:{L_muted};">
        <a href="{unsubscribe_url}" style="color:{L_accent};">Unsubscribe in one click</a>
        &middot; <a href="{settings_url}" style="color:{L_accent};">Manage preferences</a>
      </div>
    </td></tr>
  </table>
</body>
</html>
"""


def _strip_html_to_text(html_body: str) -> str:
    """Best-effort HTML → plain text for the multipart fallback. We don't
    need perfection — just readable prose for clients that won't render
    HTML."""
    # Block-level tags become double newlines, <br> becomes single newline.
    text = _re.sub(r"(?i)<(/(p|h[1-6]|li|ul|ol)|br\s*/?)>", "\n", html_body)
    text = _re.sub(r"<[^>]+>", "", text)
    text = _html_lib.unescape(text)
    text = _re.sub(r"\n{3,}", "\n\n", text)
    return text.strip()


def render_digest_email(
    *,
    kind: str,
    date_str: str,
    content_html: str,
    unsubscribe_url: str,
    settings_url: str,
) -> tuple[str, str, str]:
    """Returns (subject, text_body, html_body) for a digest email.

    `kind` is "daily" or "weekly". Anything else raises ValueError."""
    if kind == "daily":
        label = "Daily"
        subject = f"{branding.BRAND_NAME} · Daily — {date_str}"
    elif kind == "weekly":
        label = "Weekly recap"
        subject = f"{branding.BRAND_NAME} · Weekly recap — {date_str}"
    else:
        raise ValueError(f"unknown digest kind: {kind!r}")

    html_body = _DIGEST_HTML_TEMPLATE.format(
        brand=branding.BRAND_NAME,
        brand_upper=branding.BRAND_NAME.upper(),
        label=label,
        label_upper=label.upper(),
        FONT_MONO=branding.FONT_MONO,
        content_html=content_html,
        unsubscribe_url=unsubscribe_url,
        settings_url=settings_url,
        **{f"L_{k.replace('-', '_')}": v for k, v in branding.LIGHT.items()},
        **{f"D_{k.replace('-', '_')}": v for k, v in branding.DARK.items()},
    )

    text_lines = [
        f"{branding.BRAND_NAME}{label}",
        date_str,
        "",
        _strip_html_to_text(content_html),
        "",
        f"Unsubscribe: {unsubscribe_url}",
        f"Manage preferences: {settings_url}",
    ]
    text_body = "\n".join(text_lines)
    return subject, text_body, html_body
  • Step 4: Run the tests, expect PASS
pytest tests/test_email_render.py -v

Expected: 3 passed.

  • Step 5: Commit
git add app/services/email_service.py tests/test_email_render.py
git commit -m "email: render_digest_email — multipart digest template"

Task 6: Unsubscribe token + route

Files:

  • Create: app/routers/email.py

  • Modify: app/main.py (router include)

  • Create: tests/test_email_unsubscribe.py

  • Step 1: Write the failing test

Create tests/test_email_unsubscribe.py:

"""Unsubscribe token roundtrip + endpoint."""
from __future__ import annotations

import asyncio

import pytest


def _build_app(tmp_path, secret="testsecret"):
    import os
    os.environ["CASSANDRA_SESSION_SECRET"] = secret

    from fastapi import FastAPI
    from fastapi.testclient import TestClient
    from sqlalchemy.ext.asyncio import async_sessionmaker, create_async_engine

    from app import db as db_mod
    from app.db import Base
    from app.models import User
    from app.routers import email as email_router
    from app.config import get_settings
    get_settings.cache_clear()  # pick up the new env var

    engine = create_async_engine(f"sqlite+aiosqlite:///{tmp_path}/u.db")
    factory = async_sessionmaker(engine, expire_on_commit=False)
    db_mod._engine = engine
    db_mod._session_factory = factory

    async def _seed():
        async with engine.begin() as conn:
            await conn.run_sync(Base.metadata.create_all)
        async with factory() as s:
            s.add(User(id=42, email="u@x", tier="paid", email_digest_opt_in=True))
            await s.commit()

    asyncio.run(_seed())

    app = FastAPI()
    app.include_router(email_router.router)
    return TestClient(app)


def test_sign_and_verify_token_roundtrip(tmp_path, monkeypatch):
    monkeypatch.setenv("CASSANDRA_SESSION_SECRET", "rt-secret-32-bytes-or-so-padding-here")
    from app.config import get_settings
    get_settings.cache_clear()
    from app.routers.email import sign_unsubscribe_token, verify_unsubscribe_token
    tok = sign_unsubscribe_token(42)
    assert verify_unsubscribe_token(tok) == 42
    assert verify_unsubscribe_token("garbage") is None


def test_get_unsubscribe_flips_flag(tmp_path):
    client = _build_app(tmp_path)
    from app.routers.email import sign_unsubscribe_token
    tok = sign_unsubscribe_token(42)
    r = client.get(f"/email/unsubscribe?token={tok}")
    assert r.status_code == 200
    assert "unsubscribed" in r.text.lower()

    # DB state changed.
    async def _check():
        from app import db as db_mod
        async with db_mod._session_factory() as s:
            from app.models import User
            u = await s.get(User, 42)
            assert u.email_digest_opt_in is False
    asyncio.run(_check())


def test_get_unsubscribe_invalid_token_returns_generic_page(tmp_path):
    client = _build_app(tmp_path)
    r = client.get("/email/unsubscribe?token=garbage")
    # We don't 4xx — that would leak token validity. Show the generic page.
    assert r.status_code == 200
    assert "unsubscribed" in r.text.lower() or "preferences" in r.text.lower()


def test_replay_is_idempotent(tmp_path):
    client = _build_app(tmp_path)
    from app.routers.email import sign_unsubscribe_token
    tok = sign_unsubscribe_token(42)
    r1 = client.get(f"/email/unsubscribe?token={tok}")
    r2 = client.get(f"/email/unsubscribe?token={tok}")
    assert r1.status_code == 200
    assert r2.status_code == 200
  • Step 2: Run the failing test
pytest tests/test_email_unsubscribe.py -v

Expected: FAIL — app.routers.email doesn't exist.

  • Step 3: Create the router

Create app/routers/email.py:

"""Email-related public routes.

Currently:
- GET /email/unsubscribe?token=...

The token is `itsdangerous.URLSafeSerializer` over a small payload,
signed with CASSANDRA_SESSION_SECRET. No auth dependency: the whole
point of one-click unsubscribe is that the user does not have to
sign in.
"""
from __future__ import annotations

from fastapi import APIRouter, Depends, Request, Query
from fastapi.responses import HTMLResponse
from itsdangerous import BadSignature, URLSafeSerializer
from sqlalchemy.ext.asyncio import AsyncSession

from app.config import get_settings
from app.db import get_session
from app.logging import get_logger
from app.models import User
from app.templates_env import templates


router = APIRouter()
log = get_logger("email_router")

_SALT = "digest-unsubscribe-v1"


def _serializer() -> URLSafeSerializer:
    s = get_settings()
    if not s.CASSANDRA_SESSION_SECRET:
        # In tests with no secret configured, fall back to a constant —
        # NEVER reach production; settings validation should catch this.
        return URLSafeSerializer("dev-only-empty-secret", salt=_SALT)
    return URLSafeSerializer(s.CASSANDRA_SESSION_SECRET, salt=_SALT)


def sign_unsubscribe_token(user_id: int) -> str:
    return _serializer().dumps({"uid": int(user_id), "purpose": "digest_optout"})


def verify_unsubscribe_token(token: str) -> int | None:
    try:
        data = _serializer().loads(token)
    except BadSignature:
        return None
    if not isinstance(data, dict):
        return None
    if data.get("purpose") != "digest_optout":
        return None
    try:
        return int(data["uid"])
    except (KeyError, TypeError, ValueError):
        return None


_CONFIRM_PAGE = """\
<!DOCTYPE html>
<html lang="en">
<head>
  <meta charset="utf-8">
  <title>Unsubscribed — {brand}</title>
  <link rel="stylesheet" href="/static/css/cassandra.css">
</head>
<body class="auth-shell">
  <div class="auth-card" style="max-width:480px;">
    <div class="auth-card__brand">{brand}</div>
    <div class="auth-card__hint">email preferences</div>
    <p class="auth-card__lede">You're unsubscribed from email digests.</p>
    <p style="font-size:13px; color:var(--muted); line-height:1.6;">
      You can re-enable digests any time from
      <a href="/settings" style="color:var(--accent);">Settings</a>.
    </p>
  </div>
</body>
</html>
"""


@router.get("/email/unsubscribe", response_class=HTMLResponse)
async def unsubscribe(
    request: Request,
    token: str = Query(...),
    session: AsyncSession = Depends(get_session),
):
    from app import branding
    uid = verify_unsubscribe_token(token)
    if uid is not None:
        user = await session.get(User, uid)
        if user is not None and user.email_digest_opt_in:
            user.email_digest_opt_in = False
            await session.commit()
            log.info("email.unsubscribe.ok", user_id=uid)
        else:
            log.info("email.unsubscribe.noop_or_unknown", user_id=uid)
    else:
        log.info("email.unsubscribe.bad_token")

    # Same confirmation page regardless — don't leak token validity.
    return HTMLResponse(_CONFIRM_PAGE.format(brand=branding.BRAND_NAME))
  • Step 4: Wire the router into the FastAPI app

Open app/main.py. Find the existing router includes (line ~85). Add:

from app.routers import email as email_router
# ...
app.include_router(email_router.router, tags=["email"])
  • Step 5: Run the tests, expect PASS
pytest tests/test_email_unsubscribe.py -v

Expected: 4 passed.

  • Step 6: Commit
git add app/routers/email.py app/main.py tests/test_email_unsubscribe.py
git commit -m "email: one-click unsubscribe endpoint w/ signed token"

Task 7: Email digest job + idempotency

Files:

  • Create: app/jobs/email_digest_job.py

  • Create: tests/test_email_digest_job.py

  • Step 1: Write the failing test

Create tests/test_email_digest_job.py:

"""Recipient selection + idempotency for the digest job."""
from __future__ import annotations

import asyncio
from datetime import datetime, timezone, timedelta
from unittest.mock import AsyncMock, patch

import pytest


def _bootstrap(tmp_path, today_weekday: int):
    """Spin up an in-memory DB with three users: a paid opt-in, a paid
    opt-out, a free opt-in. Returns the session factory + the test users."""
    from sqlalchemy.ext.asyncio import async_sessionmaker, create_async_engine

    from app import db as db_mod
    from app.db import Base
    from app.models import User

    engine = create_async_engine(f"sqlite+aiosqlite:///{tmp_path}/dj.db")
    factory = async_sessionmaker(engine, expire_on_commit=False)
    db_mod._engine = engine
    db_mod._session_factory = factory

    async def _seed():
        async with engine.begin() as conn:
            await conn.run_sync(Base.metadata.create_all)
        async with factory() as s:
            s.add(User(id=1, email="paid_in@x", tier="paid", email_digest_opt_in=True))
            s.add(User(id=2, email="paid_out@x", tier="paid", email_digest_opt_in=False))
            s.add(User(id=3, email="free_in@x", tier="free", email_digest_opt_in=True))
            await s.commit()

    asyncio.run(_seed())
    return factory


def _patch_today(weekday: int):
    """Return a datetime whose weekday() == `weekday` (0=Mon, 6=Sun)."""
    base = datetime(2026, 5, 25, 6, 30, tzinfo=timezone.utc)  # Monday
    return base + timedelta(days=(weekday - base.weekday()) % 7)


def _stub_generate(content="<p>x</p>"):
    """Stub out the LLM call so the test never hits the network. Use a
    SimpleNamespace so we don't have to know the real result class name."""
    from types import SimpleNamespace
    async def _fake(_client, messages, **kwargs):
        return SimpleNamespace(
            content=content, model="stub",
            prompt_tokens=10, completion_tokens=10, cost_usd=0.0,
        )
    return _fake


@pytest.mark.skipif(False, reason="requires aiosqlite")
def test_daily_run_only_paid_opt_in(tmp_path):
    _bootstrap(tmp_path, today_weekday=0)
    from app.jobs import email_digest_job
    with patch("app.jobs.email_digest_job._now",
               return_value=_patch_today(0)), \
         patch("app.jobs.email_digest_job.send_email",
               new=AsyncMock()) as send_mock, \
         patch("app.jobs.email_digest_job.call_llm",
               new=AsyncMock(side_effect=_stub_generate())):
        asyncio.run(email_digest_job.run())
    # Only user_id=1 (paid + opt-in) received an email. Two calls
    # (one per tone), but only one recipient.
    sent_to = {kwargs["to"] for _, _, kwargs in send_mock.mock_calls
               if "to" in kwargs}
    addresses_sent = {call.kwargs.get("to") for call in send_mock.await_args_list}
    assert addresses_sent == {"paid_in@x"}


def test_weekly_run_includes_free_and_paid_opt_in(tmp_path):
    _bootstrap(tmp_path, today_weekday=6)
    from app.jobs import email_digest_job
    with patch("app.jobs.email_digest_job._now",
               return_value=_patch_today(6)), \
         patch("app.jobs.email_digest_job.send_email",
               new=AsyncMock()) as send_mock, \
         patch("app.jobs.email_digest_job.call_llm",
               new=AsyncMock(side_effect=_stub_generate())):
        asyncio.run(email_digest_job.run())
    addresses_sent = {call.kwargs.get("to") for call in send_mock.await_args_list}
    assert addresses_sent == {"paid_in@x", "free_in@x"}


def test_second_run_same_day_is_idempotent(tmp_path):
    _bootstrap(tmp_path, today_weekday=0)
    from app.jobs import email_digest_job
    with patch("app.jobs.email_digest_job._now",
               return_value=_patch_today(0)), \
         patch("app.jobs.email_digest_job.send_email",
               new=AsyncMock()) as send_mock, \
         patch("app.jobs.email_digest_job.call_llm",
               new=AsyncMock(side_effect=_stub_generate())):
        asyncio.run(email_digest_job.run())
        first_count = len(send_mock.await_args_list)
        asyncio.run(email_digest_job.run())
        second_count = len(send_mock.await_args_list)
    assert first_count > 0
    assert second_count == first_count, "second run should not re-send"
  • Step 2: Run the failing test
pytest tests/test_email_digest_job.py -v

Expected: FAIL — app.jobs.email_digest_job doesn't exist yet.

  • Step 3: Implement the job

Create app/jobs/email_digest_job.py:

"""Daily/weekly editorial email digest.

Runs once a day at 06:30 UTC via the scheduler. On Sundays sends the
weekly recap to every opt-in user (free + paid). On other days sends
the daily digest to opt-in paid users only.

Generates LLM content once per tone (NOVICE + INTERMEDIATE), then fans
out by SMTP. EmailSend audit rows guard against double-delivery if the
job is re-run within the same UTC day.
"""
from __future__ import annotations

import asyncio
from collections import defaultdict
from datetime import datetime, timedelta, timezone

import httpx
from sqlalchemy import desc, func, select

from app import branding
from app.config import get_settings
from app.db import utcnow
from app.jobs._helpers import job_lifecycle, log
from app.jobs.ai_log_job import (
    REFERENCE_LINE,
    _latest_quotes_by_group,
    _recent_headlines_by_bucket,
    _month_spend,
)
from app.models import EmailSend, User
from app.routers.email import sign_unsubscribe_token
from app.services.email_service import render_digest_email, send_email
from app.services.openrouter import (
    PROMPT_VERSION,
    active_model,
    build_daily_digest_prompt,
    build_weekly_digest_prompt,
    call_llm,
    llm_configured,
)
from app.services.access import paid_status


# Indirection so tests can monkeypatch the "current time" without
# touching the system clock.
def _now() -> datetime:
    return utcnow()


async def _opt_in_recipients(session, *, paid_only: bool) -> list[User]:
    stmt = select(User).where(User.email_digest_opt_in.is_(True))
    rows = (await session.execute(stmt)).scalars().all()
    if paid_only:
        rows = [u for u in rows if paid_status(u).active]
    return rows


async def _already_sent_today(session, user_id: int, kind: str, today: datetime) -> bool:
    """True if an EmailSend row exists for this user+kind on the same UTC
    day, with status in ('sent','error'). 'error' counts because we don't
    want to keep retrying a bad address inside the same daily slot."""
    day_start = today.replace(hour=0, minute=0, second=0, microsecond=0)
    day_end = day_start + timedelta(days=1)
    stmt = select(EmailSend.id).where(
        EmailSend.user_id == user_id,
        EmailSend.kind == kind,
        EmailSend.sent_at >= day_start,
        EmailSend.sent_at < day_end,
        EmailSend.status.in_(("sent", "error")),
    )
    return (await session.execute(stmt)).first() is not None


async def _generate_variants(client, kind: str, ctx: dict) -> dict[str, str]:
    """Returns {tone: html_content}. Missing tone means generation failed
    for that variant — skip recipients on that tone."""
    builder = build_weekly_digest_prompt if kind == "weekly" else build_daily_digest_prompt
    out: dict[str, str] = {}
    for tone in ("NOVICE", "INTERMEDIATE"):
        sys_, usr = builder(tone=tone, **ctx)
        try:
            result = await call_llm(
                client,
                [{"role": "system", "content": sys_},
                 {"role": "user", "content": usr}],
            )
            out[tone] = result.content
            log.info("digest.variant_ok", kind=kind, tone=tone,
                     prompt_tokens=result.prompt_tokens,
                     completion_tokens=result.completion_tokens)
        except Exception as e:
            log.error("digest.variant_failed", kind=kind, tone=tone,
                      error=str(e)[:200])
    return out


def _kind_for_today(today: datetime) -> str | None:
    """Sunday → weekly. MonSat → daily. None means 'no run today'."""
    return "weekly" if today.weekday() == 6 else "daily"


async def _send_one(user: User, kind: str, content_html: str, date_str: str,
                    session) -> None:
    settings_url = f"{branding.SITE_URL}/settings"
    unsubscribe_url = (
        f"{branding.SITE_URL}/email/unsubscribe"
        f"?token={sign_unsubscribe_token(user.id)}"
    )
    subject, text_body, html_body = render_digest_email(
        kind=kind, date_str=date_str,
        content_html=content_html,
        unsubscribe_url=unsubscribe_url,
        settings_url=settings_url,
    )
    try:
        await send_email(to=user.email, subject=subject,
                         text_body=text_body, html_body=html_body)
        status_ = "sent"
        err = None
    except Exception as e:
        status_ = "error"
        err = str(e)[:255]
        log.error("digest.send_failed", user_id=user.id, error=err)
    session.add(EmailSend(
        user_id=user.id, kind=kind, sent_at=_now(),
        status=status_, error=err,
    ))
    await session.commit()


async def run() -> None:
    async with job_lifecycle("email_digest_job") as (session, jr):
        if jr.status == "skipped":
            return
        s = get_settings()
        if not llm_configured():
            log.warning("digest.skipped_no_key", provider=s.LLM_PROVIDER)
            jr.status = "skipped"
            return

        today = _now()
        kind = _kind_for_today(today)
        date_str = today.strftime("%Y-%m-%d")

        # Build the recipient list before LLM work — if it's empty,
        # skip the LLM spend.
        recipients = await _opt_in_recipients(
            session, paid_only=(kind == "daily"),
        )
        # Filter out anyone already sent today (idempotency).
        fresh: list[User] = []
        for u in recipients:
            if not await _already_sent_today(session, u.id, kind, today):
                fresh.append(u)
        if not fresh:
            log.info("digest.no_fresh_recipients", kind=kind,
                     total=len(recipients))
            jr.status = "skipped"
            return

        spent = await _month_spend(session)
        if spent >= s.OPENROUTER_MONTHLY_CAP_USD:
            log.warning("digest.cap_reached", spent=spent,
                        cap=s.OPENROUTER_MONTHLY_CAP_USD)
            jr.status = "skipped"
            jr.error = f"monthly cost cap reached (${spent:.2f})"
            return

        quotes = await _latest_quotes_by_group(session)
        news = await _recent_headlines_by_bucket(
            session, hours=(168 if kind == "weekly" else 24),
        )
        ctx = dict(
            today=today,
            quotes_by_group=quotes,
            headlines_by_bucket=news,
            reference_line=REFERENCE_LINE,
        )

        async with httpx.AsyncClient(follow_redirects=True) as client:
            variants = await _generate_variants(client, kind, ctx)

        if not variants:
            log.warning("digest.all_variants_failed", kind=kind)
            jr.status = "failed"
            jr.error = "all variants failed"
            return

        written = 0
        for u in fresh:
            tone = (u.digest_tone or "INTERMEDIATE").upper()
            content = variants.get(tone) or variants.get("INTERMEDIATE")
            if content is None:
                # Both variants failed for this user's tone — skip.
                continue
            await _send_one(u, kind, content, date_str, session)
            await asyncio.sleep(0.1)  # gentle SMTP pacing
            written += 1

        jr.items_written = written
        log.info("digest.done", kind=kind, written=written,
                 prompt_version=PROMPT_VERSION)


if __name__ == "__main__":
    asyncio.run(run())
  • Step 4: Run the tests, expect PASS
pytest tests/test_email_digest_job.py -v

Expected: 3 passed.

  • Step 5: Commit
git add app/jobs/email_digest_job.py tests/test_email_digest_job.py
git commit -m "digest: daily/weekly job w/ EmailSend idempotency"

Task 8: Scheduler registration

Files:

  • Modify: app/scheduler_main.py

  • Step 1: Add the cron entry

Open app/scheduler_main.py. Locate the block where the other jobs are registered (line ~42 onward — sched.add_job(ai_log_job.run, ...) etc.). Add an import at the top and a new sched.add_job line:

from app.jobs import email_digest_job
# ...
sched.add_job(
    email_digest_job.run,
    CronTrigger(hour=6, minute=30),
    name="email_digest_job",
    id="email_digest_job",
)
  • Step 2: Verify the scheduler boots clean
docker compose restart scheduler
docker compose logs --tail=40 scheduler

Expected: scheduler.started jobs=[..., 'email_digest_job'] in the log.

  • Step 3: Commit
git add app/scheduler_main.py
git commit -m "scheduler: register email_digest_job at 06:30 UTC"

Task 9: Settings page section + PATCH endpoint

Files:

  • Modify: app/routers/api.py (add PATCH /settings/digest)

  • Modify: app/templates/settings.html

  • Modify: app/routers/pages.py (settings_page context: pass last EmailSend)

  • Create: tests/test_settings_digest_api.py

  • Step 1: Write the failing test

Create tests/test_settings_digest_api.py:

"""PATCH /api/settings/digest persists opt-in + tone."""
from __future__ import annotations

import asyncio

import pytest


def _build(tmp_path):
    from fastapi import FastAPI
    from fastapi.testclient import TestClient
    from sqlalchemy.ext.asyncio import async_sessionmaker, create_async_engine

    from app import db as db_mod
    from app.auth import sign_session
    from app.db import Base
    from app.models import User
    from app.routers import api as api_router

    engine = create_async_engine(f"sqlite+aiosqlite:///{tmp_path}/s.db")
    factory = async_sessionmaker(engine, expire_on_commit=False)
    db_mod._engine = engine
    db_mod._session_factory = factory

    async def _seed():
        async with engine.begin() as conn:
            await conn.run_sync(Base.metadata.create_all)
        async with factory() as s:
            s.add(User(id=1, email="u@x", tier="paid",
                       email_digest_opt_in=True))
            await s.commit()

    asyncio.run(_seed())
    app = FastAPI()
    app.include_router(api_router.router, prefix="/api")
    return TestClient(app), sign_session(1)


def test_patch_round_trip(tmp_path):
    client, sess = _build(tmp_path)
    r = client.patch(
        "/api/settings/digest",
        json={"opt_in": False, "tone": "NOVICE"},
        cookies={"cassandra_session": sess},
    )
    assert r.status_code == 200, r.text
    assert r.json() == {"opt_in": False, "tone": "NOVICE"}

    # Round-trip GET to confirm persistence.
    async def _check():
        from app import db as db_mod
        from app.models import User
        async with db_mod._session_factory() as s:
            u = await s.get(User, 1)
            assert u.email_digest_opt_in is False
            assert u.digest_tone == "NOVICE"
    asyncio.run(_check())


def test_patch_rejects_invalid_tone(tmp_path):
    client, sess = _build(tmp_path)
    r = client.patch(
        "/api/settings/digest",
        json={"opt_in": True, "tone": "PRO"},  # not in NOVICE|INTERMEDIATE
        cookies={"cassandra_session": sess},
    )
    assert r.status_code == 422


def test_patch_requires_auth(tmp_path):
    client, _ = _build(tmp_path)
    r = client.patch("/api/settings/digest",
                     json={"opt_in": True, "tone": "NOVICE"})
    assert r.status_code in (401, 303)
  • Step 2: Run the failing test
pytest tests/test_settings_digest_api.py -v

Expected: FAIL — endpoint not defined.

  • Step 3: Add the endpoint

Open app/routers/api.py. Near the other settings-related code (or at the bottom of the file), add:

from pydantic import BaseModel, Field
from typing import Literal


class DigestPrefsIn(BaseModel):
    opt_in: bool
    tone: Literal["NOVICE", "INTERMEDIATE"]


class DigestPrefsOut(BaseModel):
    opt_in: bool
    tone: str


@router.patch("/settings/digest", response_model=DigestPrefsOut)
async def patch_digest_prefs(
    payload: DigestPrefsIn,
    principal: CurrentUser = Depends(require_auth),
    session: AsyncSession = Depends(get_session),
) -> DigestPrefsOut:
    if principal.user is None:
        # Admin bearer-token path: nothing to persist.
        raise HTTPException(status_code=400, detail="no_user_context")
    principal.user.email_digest_opt_in = payload.opt_in
    principal.user.digest_tone = payload.tone
    await session.commit()
    return DigestPrefsOut(opt_in=payload.opt_in, tone=payload.tone)

If require_auth, CurrentUser, or HTTPException aren't already imported, extend the import block at the top of the file.

  • Step 4: Run the tests, expect PASS
pytest tests/test_settings_digest_api.py -v

Expected: 3 passed.

  • Step 5: Add the Settings template section

Open app/templates/settings.html. Find a sensible insertion point (after the existing tier/credit section, before the cloud-sync section). Add:

<section class="settings-row">
  <div class="settings-row__label">Email digests</div>
  <div class="settings-row__value">
    <label style="display:block; margin-bottom:8px;">
      <input type="checkbox" id="digest-opt-in"
             {% if user.email_digest_opt_in %}checked{% endif %}>
      Send me digests
    </label>
    <div class="settings-row__hint" style="margin-bottom:8px;">
      Free tier: Sunday weekly. Paid: daily MonSat plus the Sunday recap.
    </div>
    <div style="display:flex; gap:14px; margin-bottom:8px;">
      <label><input type="radio" name="digest-tone" value="NOVICE"
        {% if (user.digest_tone or 'INTERMEDIATE') == 'NOVICE' %}checked{% endif %}> Novice</label>
      <label><input type="radio" name="digest-tone" value="INTERMEDIATE"
        {% if (user.digest_tone or 'INTERMEDIATE') == 'INTERMEDIATE' %}checked{% endif %}> Intermediate</label>
    </div>
    <div class="settings-row__hint">
      Last delivery:
      <span id="digest-last">{% if last_email_send %}{{ last_email_send.sent_at.strftime("%Y-%m-%d %H:%M") }} UTC — {{ last_email_send.status }}{% else %}—{% endif %}</span>
    </div>
    <div id="digest-feedback" class="settings-row__hint" style="margin-top:6px;"></div>
  </div>
</section>

<script>
(function () {
  const opt = document.getElementById('digest-opt-in');
  const tones = document.querySelectorAll('input[name="digest-tone"]');
  const fb = document.getElementById('digest-feedback');
  function patch() {
    fb.textContent = 'Saving…';
    const tone = Array.from(tones).find(t => t.checked)?.value || 'INTERMEDIATE';
    fetch('/api/settings/digest', {
      method: 'PATCH',
      headers: { 'Content-Type': 'application/json' },
      body: JSON.stringify({ opt_in: opt.checked, tone: tone }),
    }).then(r => {
      fb.textContent = r.ok ? 'Saved.' : 'Could not save — try again.';
    }).catch(() => { fb.textContent = 'Network error.'; });
  }
  opt.addEventListener('change', patch);
  tones.forEach(t => t.addEventListener('change', patch));
})();
</script>
  • Step 6: Pass the last EmailSend into the template context

Open app/routers/pages.py. Find settings_page (line ~114). Inside the handler, before the existing TemplateResponse(... "settings.html", ...), query the most recent EmailSend for the current user:

from sqlalchemy import desc, select
from app.models import EmailSend

# ...
last_email_send = (await session.execute(
    select(EmailSend)
    .where(EmailSend.user_id == user.id)
    .order_by(desc(EmailSend.sent_at))
    .limit(1)
)).scalar_one_or_none()

Add "last_email_send": last_email_send to the template context dict that's passed to templates.TemplateResponse(..., "settings.html", {...}). There may be two TemplateResponse calls in this handler (admin path + user path) — only the user path needs this.

  • Step 7: Manual UI check
docker compose restart app

Open /settings in a browser, flip the checkbox, change the tone. Confirm "Saved." appears. Reload — selections persist.

  • Step 8: Commit
git add app/routers/api.py app/routers/pages.py app/templates/settings.html tests/test_settings_digest_api.py
git commit -m "settings: digest opt-in + tone (PATCH /api/settings/digest + UI)"

Task 10: Sign-up checkbox

Files:

  • Modify: app/templates/verify.html

  • Modify: app/routers/auth.py

  • Create: tests/test_verify_subscribe.py

  • Step 1: Write the failing test

Create tests/test_verify_subscribe.py:

"""Verify-POST persists the subscribe_to_digests form field."""
from __future__ import annotations

import asyncio

import pytest


def _build(tmp_path):
    from fastapi import FastAPI
    from fastapi.testclient import TestClient
    from sqlalchemy.ext.asyncio import async_sessionmaker, create_async_engine

    from app import db as db_mod
    from app.db import Base
    from app.models import User
    from app.routers import auth as auth_router
    from app.auth import _pending_serializer

    engine = create_async_engine(f"sqlite+aiosqlite:///{tmp_path}/v.db")
    factory = async_sessionmaker(engine, expire_on_commit=False)
    db_mod._engine = engine
    db_mod._session_factory = factory

    async def _seed():
        async with engine.begin() as conn:
            await conn.run_sync(Base.metadata.create_all)
        async with factory() as s:
            s.add(User(id=10, email="newbie@x", tier="free",
                       email_digest_opt_in=True))
            await s.commit()
            # Mark the OTP as "verified-able" — easiest is to monkeypatch
            # otp_service.verify in the test, see below.

    asyncio.run(_seed())
    app = FastAPI()
    app.include_router(auth_router.router)
    pending = _pending_serializer().dumps({"email": "newbie@x", "uid": 10, "ref": None})
    return TestClient(app), pending


def test_verify_with_unchecked_subscribe_disables_opt_in(tmp_path, monkeypatch):
    from app.services import otp_service

    async def _ok(*args, **kwargs):
        return None
    monkeypatch.setattr(otp_service, "verify", _ok)

    client, pending = _build(tmp_path)
    r = client.post(
        "/verify",
        data={"code": "000000"},  # form field "subscribe_to_digests" omitted
        cookies={"cassandra_pending": pending},
        follow_redirects=False,
    )
    assert r.status_code == 303, r.text

    async def _check():
        from app import db as db_mod
        from app.models import User
        async with db_mod._session_factory() as s:
            u = await s.get(User, 10)
            assert u.email_digest_opt_in is False
    asyncio.run(_check())


def test_verify_with_checked_subscribe_keeps_opt_in(tmp_path, monkeypatch):
    from app.services import otp_service

    async def _ok(*args, **kwargs):
        return None
    monkeypatch.setattr(otp_service, "verify", _ok)

    client, pending = _build(tmp_path)
    r = client.post(
        "/verify",
        data={"code": "000000", "subscribe_to_digests": "on"},
        cookies={"cassandra_pending": pending},
        follow_redirects=False,
    )
    assert r.status_code == 303

    async def _check():
        from app import db as db_mod
        from app.models import User
        async with db_mod._session_factory() as s:
            u = await s.get(User, 10)
            assert u.email_digest_opt_in is True
    asyncio.run(_check())
  • Step 2: Run the failing test
pytest tests/test_verify_subscribe.py -v

Expected: FAIL — the verify handler doesn't read subscribe_to_digests yet, so the unchecked case still leaves opt-in True from the seed.

  • Step 3: Update the verify template

Open app/templates/verify.html. Find the <form method="post" action="/verify"…> block. Inside the form, after the code input and before the submit button, add:

      <label style="display:block; margin:14px 0 0; font-size:12.5px; color:var(--muted); line-height:1.55;">
        <input type="checkbox" name="subscribe_to_digests" value="on" checked
               style="vertical-align:middle; margin-right:6px;">
        Email me the digest — daily for paid, Sunday for everyone.
        One-click unsubscribe in every email.
      </label>
  • Step 4: Update the verify POST handler

Open app/routers/auth.py. Find verify_submit (line ~215). Add subscribe_to_digests to the form fields and persist it after a successful OTP verify:

@router.post("/verify")
async def verify_submit(
    request: Request,
    code: str = Form(...),
    subscribe_to_digests: str | None = Form(default=None),
    session: AsyncSession = Depends(get_session),
):
    # ... existing OTP verification + pending lookup unchanged ...

    user = await get_user(session, pending["uid"])
    if user is None:
        return RedirectResponse(url="/login", status_code=303)
    user.last_login_at = utcnow()
    # An unchecked HTML checkbox sends NO field; that means "opt out".
    user.email_digest_opt_in = subscribe_to_digests is not None
    await session.commit()
    log.info("user.login", user_id=user.id, email=email)
    # ... existing redirect + cookie setup unchanged ...
  • Step 5: Run the tests, expect PASS
pytest tests/test_verify_subscribe.py -v

Expected: 2 passed.

  • Step 6: Commit
git add app/templates/verify.html app/routers/auth.py tests/test_verify_subscribe.py
git commit -m "auth: subscribe-to-digests checkbox on verify (default on)"

Task 11: Pricing copy updates

Files:

  • Modify: app/templates/pricing.html

  • Step 1: Rewrite the intro paragraph

Open app/templates/pricing.html. Replace lines 8-13 (the intro paragraph under the Pricing heading) with:

  <p>
    Two tiers. The news aggregator and hourly AI interpretation are
    available to everyone &mdash; paid extends the news window from
    6 hours to 24 hours and adds daily editorial by email, plus the
    portfolio-import features.
  </p>
  • Step 2: Update the Free tier bullets

Find the Free <ul> block. Replace it with:

    <ul>
      <li>News aggregator &mdash; last 6 hours, auto-tagged by theme</li>
      <li>Cross-asset macro signals across every asset class</li>
      <li>Hourly AI interpretation of the news + the tape</li>
      <li>Per-group cross-asset summaries</li>
      <li>Novice / Intermediate reading levels</li>
      <li><strong>Sunday weekly digest by email</strong></li>
      <li class="tier-card__excluded">Portfolio import &amp; analysis</li>
      <li class="tier-card__excluded">Encrypted cloud sync</li>
    </ul>
  • Step 3: Update the Paid tier bullets

Find the Paid <ul> block. Replace it with:

    <ul>
      <li>Everything in Free</li>
      <li><strong>News aggregator &mdash; full 24 hours</strong></li>
      <li>Portfolio import (Trading 212 CSV)</li>
      <li>AI commentary on diversification, sector and currency concentration, and macro-regime context for the holdings you upload</li>
      <li>Optional encrypted cloud sync across devices</li>
      <li><strong>Daily email digest</strong> (Mon&ndash;Sat) plus the Sunday recap</li>
    </ul>
  • Step 4: Manual visual check
docker compose restart app

Load /pricing in a browser. Confirm both tier cards render the new bullets and the intro paragraph reads cleanly.

  • Step 5: Commit
git add app/templates/pricing.html
git commit -m "pricing: free=6h news + Sunday digest; paid=24h + daily digest"

Task 12: send-test-digest admin CLI

Files:

  • Modify: app/cli.py

  • Step 1: Add the subcommand handler

Open app/cli.py. After show_status, add:

async def send_test_digest(email: str, kind: str) -> int:
    """Generate a digest and send it to the named user immediately, ignoring
    opt-in state and idempotency. Useful for previewing copy in your own
    inbox before a real run lands."""
    import httpx
    from app import branding
    from app.db import get_session_factory
    from app.jobs.ai_log_job import (
        REFERENCE_LINE, _latest_quotes_by_group, _recent_headlines_by_bucket,
    )
    from app.jobs.email_digest_job import _send_one, _generate_variants
    from app.services.openrouter import llm_configured

    if kind not in ("daily", "weekly"):
        print(f"error: kind must be 'daily' or 'weekly' (got {kind!r})",
              file=sys.stderr)
        return 2
    if not llm_configured():
        print("error: LLM provider not configured (set OPENROUTER_API_KEY)",
              file=sys.stderr)
        return 1

    factory = get_session_factory()
    async with factory() as session:
        user = await _get_user_by_email(session, email)
        if user is None:
            print(f"error: no user with email {email!r}", file=sys.stderr)
            return 1
        today = _utcnow()
        quotes = await _latest_quotes_by_group(session)
        news = await _recent_headlines_by_bucket(
            session, hours=(168 if kind == "weekly" else 24),
        )
        ctx = dict(today=today, quotes_by_group=quotes,
                   headlines_by_bucket=news, reference_line=REFERENCE_LINE)
        async with httpx.AsyncClient(follow_redirects=True) as client:
            variants = await _generate_variants(client, kind, ctx)
        tone = (user.digest_tone or "INTERMEDIATE").upper()
        content = variants.get(tone) or variants.get("INTERMEDIATE")
        if content is None:
            print("error: all LLM variants failed", file=sys.stderr)
            return 1
        date_str = today.strftime("%Y-%m-%d")
        await _send_one(user, kind, content, date_str, session)
        print(f"sent {kind} digest to {email} (tone={tone})")
        return 0
  • Step 2: Wire it into the argparse dispatcher

Find build_parser() (line ~97). Add a new subparser:

    t = sub.add_parser("send-test-digest",
                       help="Send one digest immediately (bypasses opt-in/idempotency)")
    t.add_argument("email")
    t.add_argument("kind", choices=("daily", "weekly"))

Find _dispatch(args) (line ~114). Add a branch:

    if args.cmd == "send-test-digest":
        return await send_test_digest(args.email, args.kind)

(The exact form depends on the existing dispatcher's pattern — match it.)

  • Step 3: Smoke-test the CLI
docker compose exec app python -m app.cli send-test-digest you@example.com daily

Expected output: sent daily digest to you@example.com (tone=INTERMEDIATE). Check your inbox — the email should render with the new template + working unsubscribe link.

  • Step 4: Commit
git add app/cli.py
git commit -m "cli: send-test-digest for previewing digest emails"

Final Verification

  • Step 1: Run the full suite
pytest tests/ -q

Expected: all tests pass.

  • Step 2: Manual smoke
  1. Visit /news while signed out — only ~6h of headlines, footer says "Free tier, upgrade for 24h".
  2. Sign in as a free user — same 6h cap, footer present.
  3. Sign in as a paid user (or admin) — 24h, no footer.
  4. Visit /settings — Digest section present; toggle persists across reloads.
  5. Visit /pricing — Free has Sunday-weekly bullet; Paid has 24h + daily bullets.
  6. Sign up a new account — verify the "Email me the digest" checkbox is visible and on by default.
  7. Send a test digest with the CLI — check inbox, click unsubscribe, confirm DB flag flipped, confirm Settings now shows opt-in=off.
  • Step 3: Push

After the operator reviews and tags the work, push:

git push

(Up to the operator — the spec says nothing about auto-push.)