From 8bc546220dd17145ce1b3fc72a17ac5e082120ec Mon Sep 17 00:00:00 2001 From: Giorgio Gilestro Date: Mon, 25 May 2026 18:28:39 +0200 Subject: [PATCH] docs: implementation plan for beta + paid-gap rollout Twelve-task plan covering the BETA chip, free-tier 6h news cap, daily + Sunday digest job, one-click unsubscribe, settings UI, sign-up checkbox, pricing copy, and an admin send-test-digest CLI. Each task is TDD where feasible. Co-Authored-By: Claude Opus 4.7 --- .../2026-05-25-beta-mode-and-paid-gap.md | 2096 +++++++++++++++++ 1 file changed, 2096 insertions(+) create mode 100644 docs/superpowers/plans/2026-05-25-beta-mode-and-paid-gap.md diff --git a/docs/superpowers/plans/2026-05-25-beta-mode-and-paid-gap.md b/docs/superpowers/plans/2026-05-25-beta-mode-and-paid-gap.md new file mode 100644 index 0000000..328fe47 --- /dev/null +++ b/docs/superpowers/plans/2026-05-25-beta-mode-and-paid-gap.md @@ -0,0 +1,2096 @@ +# Beta Mode + Paid/Free Gap Implementation Plan + +> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking. + +**Goal:** Ship the closed-beta launch package: a visible BETA chip in the app chrome, a 6-hour free-tier news cap, and email digests (daily for paid Mon–Sat, Sunday weekly for everyone), all gated by an opt-in flag. + +**Architecture:** Three coordinated changes layered onto existing FastAPI + APScheduler + MariaDB + OpenRouter + SMTP infrastructure. No new external services. New persisted state: two columns on `users` (`email_digest_opt_in`, `digest_tone`) and one new table (`email_sends`). One new job (`email_digest_job`), one new router (`email`), two new prompt builders. + +**Tech Stack:** FastAPI · SQLAlchemy 2.0 (async) · Alembic · APScheduler · aiosmtplib · itsdangerous · pytest + aiosqlite (tests) · Jinja2 templates · vanilla JS + HTMX. + +**Spec:** `docs/superpowers/specs/2026-05-25-beta-mode-and-paid-gap-design.md` + +--- + +## File Map + +**New files** +- `app/jobs/email_digest_job.py` — orchestrator (daily Mon–Sat for paid, weekly Sunday for all opt-in) +- `app/routers/email.py` — `GET /email/unsubscribe?token=…` +- `alembic/versions/0017_email_digest.py` — migration +- `tests/test_news_window.py` — news-cap regression +- `tests/test_email_digest_job.py` — job recipient selection, idempotency +- `tests/test_email_unsubscribe.py` — token roundtrip + endpoint +- `tests/test_email_render.py` — digest email renderer +- `tests/test_digest_prompts.py` — prompt builder unit tests +- `tests/test_settings_digest_api.py` — PATCH endpoint +- `tests/test_verify_subscribe.py` — sign-up checkbox + +**Modified files** +- `app/config.py` — add `BETA_MODE` flag +- `app/templates_env.py` — expose `BETA_MODE` to templates +- `app/templates/base.html` — render the chip +- `app/static/css/cassandra.css` — `.beta-chip` styling +- `app/services/access.py` — `FREE_NEWS_WINDOW_HOURS` constant +- `app/routers/api.py` — `news_list` clamps `since_hours` for non-paid; PATCH endpoint for digest prefs +- `app/templates/partials/news.html` — capped-footer marker +- `app/models.py` — `User.email_digest_opt_in`, `User.digest_tone`, `EmailSend` model +- `app/services/openrouter.py` — `build_daily_digest_prompt`, `build_weekly_digest_prompt`, bump `PROMPT_VERSION` +- `app/services/email_service.py` — `render_digest_email`, `send_digest` +- `app/scheduler_main.py` — register the digest job at 06:30 UTC +- `app/templates/settings.html` — Email digests section +- `app/templates/verify.html` — subscribe checkbox +- `app/routers/auth.py` — read `subscribe_to_digests` on verify POST +- `app/templates/pricing.html` — updated tier copy +- `app/cli.py` — `send-test-digest` command +- `app/main.py` — include new email router + +--- + +## Task 1: BETA chip + +**Files:** +- Modify: `app/config.py` +- Modify: `app/templates_env.py` +- Modify: `app/templates/base.html` (line ~140) +- Modify: `app/static/css/cassandra.css` + +- [ ] **Step 1: Add `BETA_MODE` to config** + +Open `app/config.py`. Find the `Settings` class. Add: + +```python + BETA_MODE: bool = True # Shows a "BETA" pill in the app header. Flip to False at GA. +``` + +Place it adjacent to other display/flag-style settings (alongside `CASSANDRA_TONE` or the LLM caps — whichever cluster fits). + +- [ ] **Step 2: Expose the flag to templates** + +Open `app/templates_env.py`. After the existing `templates.env.globals[...]` block (around line 75), add: + +```python +from app.config import get_settings as _get_settings # if not already imported +templates.env.globals["BETA_MODE"] = _get_settings().BETA_MODE +``` + +If `get_settings` is already imported in the file under another alias, reuse it. + +- [ ] **Step 3: Render the chip in the app header** + +Open `app/templates/base.html`. Find the brand link (line ~140): + +```html + {{ BRAND_NAME }} +``` + +Insert immediately after it: + +```html + {% if BETA_MODE %}BETA{% endif %} +``` + +- [ ] **Step 4: Style the chip** + +Open `app/static/css/cassandra.css`. Append at the bottom of the file: + +```css +/* BETA indicator pill in the app header — see app/templates/base.html. */ +.beta-chip { + display: inline-block; + margin-left: 8px; + padding: 2px 7px; + font-size: 10px; + font-weight: 700; + letter-spacing: 0.14em; + font-family: var(--font-mono); + color: var(--bg); + background: var(--accent); + border-radius: 2px; + vertical-align: middle; + user-select: none; +} +``` + +- [ ] **Step 5: Manual visual check** + +Run the app locally (`docker compose restart app` — the new volume mount picks up changes). Load any logged-in page and confirm the `BETA` pill renders next to the brand. Confirm the chip does NOT appear on `/pricing` (which uses `public_base.html`, not `base.html`). + +- [ ] **Step 6: Commit** + +```bash +git add app/config.py app/templates_env.py app/templates/base.html app/static/css/cassandra.css +git commit -m "beta: header chip flagged by BETA_MODE config (default on)" +``` + +--- + +## Task 2: Free-tier news window cap + +**Files:** +- Modify: `app/services/access.py` +- Modify: `app/routers/api.py` (lines 222-280, `news_list`) +- Modify: `app/templates/partials/news.html` +- Create: `tests/test_news_window.py` + +- [ ] **Step 1: Write the failing test** + +Create `tests/test_news_window.py`: + +```python +"""Free vs paid window clamp on /api/news.""" +from __future__ import annotations + +import asyncio +from datetime import datetime, timedelta, timezone + +import pytest + + +def _build_app(tmp_path): + from fastapi import FastAPI + from fastapi.testclient import TestClient + from sqlalchemy.ext.asyncio import async_sessionmaker, create_async_engine + + from app import db as db_mod + from app.auth import sign_session + from app.db import Base + from app.models import Headline, User + from app.routers import api as api_router + + engine = create_async_engine(f"sqlite+aiosqlite:///{tmp_path}/news.db") + factory = async_sessionmaker(engine, expire_on_commit=False) + db_mod._engine = engine + db_mod._session_factory = factory + + now = datetime.now(timezone.utc) + + async def _seed(): + async with engine.begin() as conn: + await conn.run_sync(Base.metadata.create_all) + async with factory() as s: + s.add(User(id=1, email="free@x", tier="free")) + s.add(User(id=2, email="paid@x", tier="paid")) + # Headlines: one 1h old, one 12h old, one 20h old. + for hours_old, title in ((1, "fresh"), (12, "mid"), (20, "old")): + s.add(Headline( + source="test", title=title, url=f"https://e/{title}", + category="general", + published_at=now - timedelta(hours=hours_old), + fetched_at=now, + tags=[], + )) + await s.commit() + + asyncio.run(_seed()) + + app = FastAPI() + app.include_router(api_router.router, prefix="/api") + client = TestClient(app) + return client, sign_session(1), sign_session(2) + + +@pytest.mark.skipif(False, reason="requires aiosqlite + httpx") +def test_free_user_clamped_to_6h(tmp_path): + client, free_sess, _ = _build_app(tmp_path) + r = client.get("/api/news?since_hours=24", + cookies={"cassandra_session": free_sess}) + assert r.status_code == 200 + titles = [h["title"] for h in r.json()] + assert "fresh" in titles + assert "mid" not in titles # 12h ago, beyond 6h + assert "old" not in titles + + +def test_paid_user_full_24h(tmp_path): + client, _, paid_sess = _build_app(tmp_path) + r = client.get("/api/news?since_hours=24", + cookies={"cassandra_session": paid_sess}) + assert r.status_code == 200 + titles = [h["title"] for h in r.json()] + assert {"fresh", "mid", "old"} <= set(titles) + + +def test_anonymous_clamped_to_6h(tmp_path): + client, _, _ = _build_app(tmp_path) + r = client.get("/api/news?since_hours=24") + assert r.status_code == 200 + titles = [h["title"] for h in r.json()] + assert "fresh" in titles + assert "mid" not in titles +``` + +- [ ] **Step 2: Run the failing test** + +```bash +pytest tests/test_news_window.py -v +``` + +Expected: FAIL — the clamp doesn't exist yet, so the free/anonymous calls return 12h and 20h headlines too. + +- [ ] **Step 3: Add the constant** + +Open `app/services/access.py`. Below the imports / above `_utcnow`, add: + +```python +# How many hours of news the free tier sees. Paid sees whatever the +# endpoint's `since_hours` param requests (up to its own max). +FREE_NEWS_WINDOW_HOURS = 6.0 +``` + +- [ ] **Step 4: Clamp the endpoint** + +Open `app/routers/api.py`. Find `news_list` (line 222). Update: + +```python +@router.get("/news") +async def news_list( + request: Request, + session: AsyncSession = Depends(get_session), + principal: CurrentUser | None = Depends(maybe_current_user), + category: str | None = Query(None), + since_hours: float = Query(24.0, ge=0.1, le=720.0), + limit: int = Query(50, ge=1, le=500), + tags: str | None = Query(None, description="comma-separated include list"), + exclude_tags: str | None = Query(None, description="comma-separated exclude list"), + as_: str | None = Query(default=None, alias="as"), +): + from app.services.news_tagging import TAG_LABELS, TAG_VOCABULARY + from app.services.access import FREE_NEWS_WINDOW_HOURS, is_paid_active + + effective_hours = since_hours + capped = not is_paid_active(principal) + if capped: + effective_hours = min(since_hours, FREE_NEWS_WINDOW_HOURS) + + cutoff = utcnow() - timedelta(hours=effective_hours) + # ... rest of the function unchanged through `filtered = ...` ... +``` + +Also add `maybe_current_user` to the imports at the top of `app/routers/api.py`: + +```python +from app.auth import maybe_current_user +``` + +(The existing `from app.auth import …` line may already import other names — extend it.) + +In the `as_ == "html"` branch, extend the template context with `capped` and `window_hours`: + +```python + return templates.TemplateResponse( + request, "partials/news.html", + {"headlines": items, + "tag_vocabulary": TAG_VOCABULARY, + "tag_labels": TAG_LABELS, + "active_include": sorted(include), + "active_exclude": sorted(exclude), + "capped": capped, + "window_hours": effective_hours}, + ) +``` + +The JSON branch is unchanged — the test asserts via the JSON shape. + +- [ ] **Step 5: Run the test, expect PASS** + +```bash +pytest tests/test_news_window.py -v +``` + +Expected: all three tests pass. + +- [ ] **Step 6: Add the partial-template footer** + +Open `app/templates/partials/news.html`. At the bottom of the file (after the last existing item rendering and any close tags), add: + +```html +{% if capped %} +
+ Free tier — showing the last {{ window_hours|int }} hours of news. + Upgrade + for the full 24-hour feed plus daily and weekly email digests. +
+{% endif %} +``` + +- [ ] **Step 7: Run the full suite to catch regressions** + +```bash +pytest tests/ -x -q +``` + +Expected: no regressions (the existing `test_news_*` tests don't depend on the clamp). + +- [ ] **Step 8: Commit** + +```bash +git add app/services/access.py app/routers/api.py app/templates/partials/news.html tests/test_news_window.py +git commit -m "news: clamp free + anonymous to last 6h; paid keeps 24h" +``` + +--- + +## Task 3: DB model + Alembic migration + +**Files:** +- Modify: `app/models.py` +- Create: `alembic/versions/0017_email_digest.py` + +- [ ] **Step 1: Add columns to `User`** + +Open `app/models.py`. Find the `User` class (search for `class User(Base)`). Add two columns alongside the existing tier/credit fields: + +```python + email_digest_opt_in: Mapped[bool] = mapped_column( + Boolean, nullable=False, default=True, server_default=text("1"), + ) + # NULL = use INTERMEDIATE at render time. Server-side mirror of the + # dashboard tone, decoupled because the dashboard pref is localStorage. + digest_tone: Mapped[str | None] = mapped_column(String(16)) +``` + +If `Boolean` or `text` aren't already imported, extend the SQLAlchemy import line at the top of the file: + +```python +from sqlalchemy import (..., Boolean, text, ...) +``` + +- [ ] **Step 2: Add the `EmailSend` model** + +In the same file, after the existing model definitions, add: + +```python +class EmailSend(Base): + """Audit row per digest email send. Used for idempotency (don't send + twice on the same UTC day) and for surfacing 'last delivery' on the + Settings page.""" + __tablename__ = "email_sends" + + id: Mapped[int] = mapped_column(BigInteger, primary_key=True, autoincrement=True) + user_id: Mapped[int] = mapped_column( + ForeignKey("users.id", ondelete="CASCADE"), nullable=False, index=True, + ) + kind: Mapped[str] = mapped_column(String(16), nullable=False) # "daily" | "weekly" + sent_at: Mapped[datetime] = mapped_column( + DateTime(timezone=True), default=utcnow, nullable=False, + ) + status: Mapped[str] = mapped_column(String(16), nullable=False) # "sent" | "skipped" | "error" + error: Mapped[str | None] = mapped_column(String(255)) + + __table_args__ = ( + Index("ix_email_sends_user_kind_sent", "user_id", "kind", "sent_at"), + ) +``` + +Add any missing imports (`BigInteger`, `ForeignKey`, `Index`) at the top. + +- [ ] **Step 3: Create the Alembic migration** + +Create `alembic/versions/0017_email_digest.py`: + +```python +"""email digests: User.email_digest_opt_in, User.digest_tone, email_sends table. + +Revision ID: 0017 +Revises: 0016 +Create Date: 2026-05-25 +""" +from typing import Sequence, Union + +import sqlalchemy as sa +from alembic import op + + +revision: str = "0017" +down_revision: Union[str, None] = "0016" +branch_labels: Union[str, Sequence[str], None] = None +depends_on: Union[str, Sequence[str], None] = None + + +def upgrade() -> None: + op.add_column( + "users", + sa.Column( + "email_digest_opt_in", sa.Boolean(), nullable=False, + server_default=sa.text("1"), + ), + ) + op.add_column( + "users", + sa.Column("digest_tone", sa.String(length=16), nullable=True), + ) + + op.create_table( + "email_sends", + sa.Column("id", sa.BigInteger(), autoincrement=True, primary_key=True), + sa.Column( + "user_id", sa.Integer(), nullable=False, + ), + sa.Column("kind", sa.String(length=16), nullable=False), + sa.Column("sent_at", sa.DateTime(timezone=True), nullable=False), + sa.Column("status", sa.String(length=16), nullable=False), + sa.Column("error", sa.String(length=255), nullable=True), + sa.ForeignKeyConstraint( + ["user_id"], ["users.id"], ondelete="CASCADE", + ), + ) + op.create_index( + "ix_email_sends_user_kind_sent", + "email_sends", + ["user_id", "kind", "sent_at"], + ) + op.create_index( + op.f("ix_email_sends_user_id"), "email_sends", ["user_id"], + ) + + +def downgrade() -> None: + op.drop_index(op.f("ix_email_sends_user_id"), table_name="email_sends") + op.drop_index("ix_email_sends_user_kind_sent", table_name="email_sends") + op.drop_table("email_sends") + op.drop_column("users", "digest_tone") + op.drop_column("users", "email_digest_opt_in") +``` + +- [ ] **Step 4: Apply the migration to the dev DB** + +```bash +docker compose exec app alembic upgrade head +``` + +Expected output ends with: `Running upgrade 0016 -> 0017, email digests...` + +- [ ] **Step 5: Verify the columns exist** + +```bash +docker compose exec db mysql -ucassandra -p${MARIADB_PASSWORD} -e "DESCRIBE cassandra.users;" | grep -E "email_digest_opt_in|digest_tone" +docker compose exec db mysql -ucassandra -p${MARIADB_PASSWORD} -e "DESCRIBE cassandra.email_sends;" +``` + +Expected: both new columns on `users` and the full `email_sends` table appear. + +- [ ] **Step 6: Run the test suite — confirm no regressions from the model change** + +```bash +pytest tests/ -x -q +``` + +Expected: all existing tests still pass (no test depends on the old User shape). + +- [ ] **Step 7: Commit** + +```bash +git add app/models.py alembic/versions/0017_email_digest.py +git commit -m "db: add digest opt-in/tone on users, email_sends audit table" +``` + +--- + +## Task 4: Daily / Weekly digest prompts + +**Files:** +- Modify: `app/services/openrouter.py` +- Create: `tests/test_digest_prompts.py` + +- [ ] **Step 1: Inspect the existing prompt scaffolding** + +Read `app/services/openrouter.py` to confirm the shape of `build_system_prompt(tone, analysis)` and `build_user_prompt(...)`. The new functions will mirror that contract: return `(system_prompt, user_prompt)` strings ready for `call_llm`. + +- [ ] **Step 2: Write the failing test** + +Create `tests/test_digest_prompts.py`: + +```python +"""Unit tests for the daily / weekly digest prompt builders.""" +from __future__ import annotations + +from datetime import datetime, timezone + +from app.services.openrouter import ( + build_daily_digest_prompt, + build_weekly_digest_prompt, +) + + +def _ctx(): + return dict( + today=datetime(2026, 5, 25, 6, 30, tzinfo=timezone.utc), + quotes_by_group={"equities": [{"symbol": "SPX", "price": 7500.0, + "label": "S&P 500", "currency": "USD", + "source": "test", "note": "", + "as_of": None, "changes": {}}]}, + headlines_by_bucket={"general": [{"when": "2026-05-25T05:00:00+00:00", + "source": "FT", "title": "Brent slides"}]}, + reference_line="S&P 7,501 (ATH) · VIX 18.0 · US 10y 4.45%", + ) + + +def test_daily_prompt_tone_intermediate(): + sys_, usr = build_daily_digest_prompt(tone="INTERMEDIATE", **_ctx()) + assert "INTERMEDIATE" in sys_.upper() or "intermediate" in sys_.lower() + assert "Brent slides" in usr + assert "daily" in sys_.lower() + + +def test_daily_prompt_tone_novice_differs(): + sys_int, _ = build_daily_digest_prompt(tone="INTERMEDIATE", **_ctx()) + sys_nov, _ = build_daily_digest_prompt(tone="NOVICE", **_ctx()) + assert sys_int != sys_nov + + +def test_weekly_prompt_mentions_week(): + sys_, usr = build_weekly_digest_prompt(tone="INTERMEDIATE", **_ctx()) + assert "week" in sys_.lower() or "weekly" in sys_.lower() + assert "Brent slides" in usr + + +def test_prompts_return_strings(): + for fn in (build_daily_digest_prompt, build_weekly_digest_prompt): + sys_, usr = fn(tone="INTERMEDIATE", **_ctx()) + assert isinstance(sys_, str) and isinstance(usr, str) + assert len(sys_) > 50 and len(usr) > 50 +``` + +- [ ] **Step 3: Run the failing test** + +```bash +pytest tests/test_digest_prompts.py -v +``` + +Expected: FAIL — `ImportError: cannot import name 'build_daily_digest_prompt'`. + +- [ ] **Step 4: Implement the prompt builders** + +Open `app/services/openrouter.py`. After `build_user_prompt`, add: + +```python +def build_daily_digest_prompt( + *, + tone: str, + today, + quotes_by_group: dict, + headlines_by_bucket: dict, + reference_line: str, +) -> tuple[str, str]: + """System + user prompt for the once-a-day editorial digest. + + Different from the hourly log: the daily digest reflects on the past + 24h and looks forward to the upcoming session. Longer, less + 'live-blogging,' more contextual. Target ~600 words.""" + tone_clause = ( + "Use plain English. Define any jargon on first use." + if tone.upper() == "NOVICE" + else "Write for a reader who already speaks markets fluently." + ) + system = ( + "You write the daily editorial digest for Read the Markets. " + f"Audience tone: {tone.upper()}. {tone_clause} " + "Cover: (1) what mattered yesterday, (2) what to watch in today's " + "EU and US sessions, (3) one cross-asset thread connecting them. " + "No predictions of price level, no buy/sell language. Target ~600 " + "words. Output HTML using only

,

,
    ,
  • , , " + " — no , , or wrapper, no inline styles." + ) + user = _digest_user_prompt(today, quotes_by_group, headlines_by_bucket, reference_line) + return system, user + + +def build_weekly_digest_prompt( + *, + tone: str, + today, + quotes_by_group: dict, + headlines_by_bucket: dict, + reference_line: str, +) -> tuple[str, str]: + """System + user prompt for the Sunday weekly recap + look-ahead. + + Sent to ALL opt-in users (free and paid). Target ~900 words.""" + tone_clause = ( + "Use plain English. Define any jargon on first use." + if tone.upper() == "NOVICE" + else "Write for a reader who already speaks markets fluently." + ) + system = ( + "You write the Sunday weekly digest for Read the Markets. " + f"Audience tone: {tone.upper()}. {tone_clause} " + "Cover: (1) the week behind — what moved and why, " + "(2) the week ahead — releases, earnings, central-bank meetings, " + "(3) the cross-asset story to keep in mind. " + "No predictions of price level, no buy/sell language. Target ~900 " + "words. Output HTML using only

    ,

    ,
      ,
    • , , " + " — no , , or wrapper, no inline styles." + ) + user = _digest_user_prompt(today, quotes_by_group, headlines_by_bucket, reference_line) + return system, user + + +def _digest_user_prompt(today, quotes_by_group, headlines_by_bucket, reference_line): + """Shared user-message body used by both digest prompts. Same data + shape as the hourly user prompt; reformatted for the digest context.""" + today_str = today.strftime("%A %d %B %Y") if hasattr(today, "strftime") else str(today) + lines = [f"TODAY (UTC): {today_str}", "", f"REFERENCE: {reference_line}", ""] + + if headlines_by_bucket: + lines.append("HEADLINES BY CATEGORY") + for cat, items in headlines_by_bucket.items(): + lines.append(f" [{cat}]") + for h in items[:30]: + when = h.get("when", "") + src = h.get("source", "") + title = h.get("title", "") + lines.append(f" {when} · {src} · {title}") + lines.append("") + + if quotes_by_group: + lines.append("LATEST QUOTES BY GROUP") + for grp, items in quotes_by_group.items(): + lines.append(f" [{grp}]") + for q in items[:30]: + sym = q.get("symbol", "") + price = q.get("price", "") + lbl = q.get("label", "") + ccy = q.get("currency", "") + lines.append(f" {sym} ({lbl}) — {price} {ccy}") + lines.append("") + + return "\n".join(lines) +``` + +Bump the PROMPT_VERSION constant at the top of the file. Locate it (search for `PROMPT_VERSION =`) and increment by one — e.g., `8` → `9`. Add a short comment line above noting the bump is for digest prompts. + +- [ ] **Step 5: Run the tests, expect PASS** + +```bash +pytest tests/test_digest_prompts.py -v +``` + +Expected: 4 passed. + +- [ ] **Step 6: Commit** + +```bash +git add app/services/openrouter.py tests/test_digest_prompts.py +git commit -m "digest: daily + weekly prompt builders (NOVICE/INTERMEDIATE)" +``` + +--- + +## Task 5: Digest email rendering + +**Files:** +- Modify: `app/services/email_service.py` +- Create: `tests/test_email_render.py` + +- [ ] **Step 1: Write the failing test** + +Create `tests/test_email_render.py`: + +```python +"""Unit tests for render_digest_email.""" +from __future__ import annotations + +from app.services.email_service import render_digest_email + + +def test_daily_subject_and_bodies(): + subj, text, html = render_digest_email( + kind="daily", + date_str="2026-05-25", + content_html="

      Markets did stuff today.

      ", + unsubscribe_url="https://read.markets/email/unsubscribe?token=abc", + settings_url="https://read.markets/settings", + ) + assert "Daily" in subj + assert "2026-05-25" in subj + assert "Markets did stuff today" in html + assert "abc" in html # unsubscribe link landed + assert "/settings" in html + # Plain-text fallback strips HTML. + assert "

      " not in text + assert "Markets did stuff today" in text + + +def test_weekly_subject_says_recap(): + subj, _, _ = render_digest_email( + kind="weekly", + date_str="2026-05-25", + content_html="

      x

      ", + unsubscribe_url="https://x/u", + settings_url="https://x/s", + ) + assert "Weekly" in subj + assert "recap" in subj.lower() + + +def test_invalid_kind_raises(): + import pytest + with pytest.raises(ValueError): + render_digest_email( + kind="bogus", date_str="2026-05-25", + content_html="

      x

      ", + unsubscribe_url="u", settings_url="s", + ) +``` + +- [ ] **Step 2: Run the failing test** + +```bash +pytest tests/test_email_render.py -v +``` + +Expected: FAIL — `render_digest_email` doesn't exist. + +- [ ] **Step 3: Implement `render_digest_email`** + +Open `app/services/email_service.py`. Add at the bottom of the file: + +```python +# --------------------------------------------------------------------------- +# Digest email rendering +# --------------------------------------------------------------------------- + +import html as _html_lib +import re as _re + + +_DIGEST_HTML_TEMPLATE = """\ + + + + + + + {brand} — {label} + + + + + +
      +
      + ▰ {brand_upper} · {label_upper} +
      +
       
      +
      + {content_html} +
      +
       
      +
      +
       
      + +
      + + +""" + + +def _strip_html_to_text(html_body: str) -> str: + """Best-effort HTML → plain text for the multipart fallback. We don't + need perfection — just readable prose for clients that won't render + HTML.""" + # Block-level tags become double newlines,
      becomes single newline. + text = _re.sub(r"(?i)<(/(p|h[1-6]|li|ul|ol)|br\s*/?)>", "\n", html_body) + text = _re.sub(r"<[^>]+>", "", text) + text = _html_lib.unescape(text) + text = _re.sub(r"\n{3,}", "\n\n", text) + return text.strip() + + +def render_digest_email( + *, + kind: str, + date_str: str, + content_html: str, + unsubscribe_url: str, + settings_url: str, +) -> tuple[str, str, str]: + """Returns (subject, text_body, html_body) for a digest email. + + `kind` is "daily" or "weekly". Anything else raises ValueError.""" + if kind == "daily": + label = "Daily" + subject = f"{branding.BRAND_NAME} · Daily — {date_str}" + elif kind == "weekly": + label = "Weekly recap" + subject = f"{branding.BRAND_NAME} · Weekly recap — {date_str}" + else: + raise ValueError(f"unknown digest kind: {kind!r}") + + html_body = _DIGEST_HTML_TEMPLATE.format( + brand=branding.BRAND_NAME, + brand_upper=branding.BRAND_NAME.upper(), + label=label, + label_upper=label.upper(), + FONT_MONO=branding.FONT_MONO, + content_html=content_html, + unsubscribe_url=unsubscribe_url, + settings_url=settings_url, + **{f"L_{k.replace('-', '_')}": v for k, v in branding.LIGHT.items()}, + **{f"D_{k.replace('-', '_')}": v for k, v in branding.DARK.items()}, + ) + + text_lines = [ + f"{branding.BRAND_NAME} — {label}", + date_str, + "", + _strip_html_to_text(content_html), + "", + f"Unsubscribe: {unsubscribe_url}", + f"Manage preferences: {settings_url}", + ] + text_body = "\n".join(text_lines) + return subject, text_body, html_body +``` + +- [ ] **Step 4: Run the tests, expect PASS** + +```bash +pytest tests/test_email_render.py -v +``` + +Expected: 3 passed. + +- [ ] **Step 5: Commit** + +```bash +git add app/services/email_service.py tests/test_email_render.py +git commit -m "email: render_digest_email — multipart digest template" +``` + +--- + +## Task 6: Unsubscribe token + route + +**Files:** +- Create: `app/routers/email.py` +- Modify: `app/main.py` (router include) +- Create: `tests/test_email_unsubscribe.py` + +- [ ] **Step 1: Write the failing test** + +Create `tests/test_email_unsubscribe.py`: + +```python +"""Unsubscribe token roundtrip + endpoint.""" +from __future__ import annotations + +import asyncio + +import pytest + + +def _build_app(tmp_path, secret="testsecret"): + import os + os.environ["CASSANDRA_SESSION_SECRET"] = secret + + from fastapi import FastAPI + from fastapi.testclient import TestClient + from sqlalchemy.ext.asyncio import async_sessionmaker, create_async_engine + + from app import db as db_mod + from app.db import Base + from app.models import User + from app.routers import email as email_router + from app.config import get_settings + get_settings.cache_clear() # pick up the new env var + + engine = create_async_engine(f"sqlite+aiosqlite:///{tmp_path}/u.db") + factory = async_sessionmaker(engine, expire_on_commit=False) + db_mod._engine = engine + db_mod._session_factory = factory + + async def _seed(): + async with engine.begin() as conn: + await conn.run_sync(Base.metadata.create_all) + async with factory() as s: + s.add(User(id=42, email="u@x", tier="paid", email_digest_opt_in=True)) + await s.commit() + + asyncio.run(_seed()) + + app = FastAPI() + app.include_router(email_router.router) + return TestClient(app) + + +def test_sign_and_verify_token_roundtrip(tmp_path, monkeypatch): + monkeypatch.setenv("CASSANDRA_SESSION_SECRET", "rt-secret-32-bytes-or-so-padding-here") + from app.config import get_settings + get_settings.cache_clear() + from app.routers.email import sign_unsubscribe_token, verify_unsubscribe_token + tok = sign_unsubscribe_token(42) + assert verify_unsubscribe_token(tok) == 42 + assert verify_unsubscribe_token("garbage") is None + + +def test_get_unsubscribe_flips_flag(tmp_path): + client = _build_app(tmp_path) + from app.routers.email import sign_unsubscribe_token + tok = sign_unsubscribe_token(42) + r = client.get(f"/email/unsubscribe?token={tok}") + assert r.status_code == 200 + assert "unsubscribed" in r.text.lower() + + # DB state changed. + async def _check(): + from app import db as db_mod + async with db_mod._session_factory() as s: + from app.models import User + u = await s.get(User, 42) + assert u.email_digest_opt_in is False + asyncio.run(_check()) + + +def test_get_unsubscribe_invalid_token_returns_generic_page(tmp_path): + client = _build_app(tmp_path) + r = client.get("/email/unsubscribe?token=garbage") + # We don't 4xx — that would leak token validity. Show the generic page. + assert r.status_code == 200 + assert "unsubscribed" in r.text.lower() or "preferences" in r.text.lower() + + +def test_replay_is_idempotent(tmp_path): + client = _build_app(tmp_path) + from app.routers.email import sign_unsubscribe_token + tok = sign_unsubscribe_token(42) + r1 = client.get(f"/email/unsubscribe?token={tok}") + r2 = client.get(f"/email/unsubscribe?token={tok}") + assert r1.status_code == 200 + assert r2.status_code == 200 +``` + +- [ ] **Step 2: Run the failing test** + +```bash +pytest tests/test_email_unsubscribe.py -v +``` + +Expected: FAIL — `app.routers.email` doesn't exist. + +- [ ] **Step 3: Create the router** + +Create `app/routers/email.py`: + +```python +"""Email-related public routes. + +Currently: +- GET /email/unsubscribe?token=... + +The token is `itsdangerous.URLSafeSerializer` over a small payload, +signed with CASSANDRA_SESSION_SECRET. No auth dependency: the whole +point of one-click unsubscribe is that the user does not have to +sign in. +""" +from __future__ import annotations + +from fastapi import APIRouter, Depends, Request, Query +from fastapi.responses import HTMLResponse +from itsdangerous import BadSignature, URLSafeSerializer +from sqlalchemy.ext.asyncio import AsyncSession + +from app.config import get_settings +from app.db import get_session +from app.logging import get_logger +from app.models import User +from app.templates_env import templates + + +router = APIRouter() +log = get_logger("email_router") + +_SALT = "digest-unsubscribe-v1" + + +def _serializer() -> URLSafeSerializer: + s = get_settings() + if not s.CASSANDRA_SESSION_SECRET: + # In tests with no secret configured, fall back to a constant — + # NEVER reach production; settings validation should catch this. + return URLSafeSerializer("dev-only-empty-secret", salt=_SALT) + return URLSafeSerializer(s.CASSANDRA_SESSION_SECRET, salt=_SALT) + + +def sign_unsubscribe_token(user_id: int) -> str: + return _serializer().dumps({"uid": int(user_id), "purpose": "digest_optout"}) + + +def verify_unsubscribe_token(token: str) -> int | None: + try: + data = _serializer().loads(token) + except BadSignature: + return None + if not isinstance(data, dict): + return None + if data.get("purpose") != "digest_optout": + return None + try: + return int(data["uid"]) + except (KeyError, TypeError, ValueError): + return None + + +_CONFIRM_PAGE = """\ + + + + + Unsubscribed — {brand} + + + +
      +
      {brand}
      +
      email preferences
      +

      You're unsubscribed from email digests.

      +

      + You can re-enable digests any time from + Settings. +

      +
      + + +""" + + +@router.get("/email/unsubscribe", response_class=HTMLResponse) +async def unsubscribe( + request: Request, + token: str = Query(...), + session: AsyncSession = Depends(get_session), +): + from app import branding + uid = verify_unsubscribe_token(token) + if uid is not None: + user = await session.get(User, uid) + if user is not None and user.email_digest_opt_in: + user.email_digest_opt_in = False + await session.commit() + log.info("email.unsubscribe.ok", user_id=uid) + else: + log.info("email.unsubscribe.noop_or_unknown", user_id=uid) + else: + log.info("email.unsubscribe.bad_token") + + # Same confirmation page regardless — don't leak token validity. + return HTMLResponse(_CONFIRM_PAGE.format(brand=branding.BRAND_NAME)) +``` + +- [ ] **Step 4: Wire the router into the FastAPI app** + +Open `app/main.py`. Find the existing router includes (line ~85). Add: + +```python +from app.routers import email as email_router +# ... +app.include_router(email_router.router, tags=["email"]) +``` + +- [ ] **Step 5: Run the tests, expect PASS** + +```bash +pytest tests/test_email_unsubscribe.py -v +``` + +Expected: 4 passed. + +- [ ] **Step 6: Commit** + +```bash +git add app/routers/email.py app/main.py tests/test_email_unsubscribe.py +git commit -m "email: one-click unsubscribe endpoint w/ signed token" +``` + +--- + +## Task 7: Email digest job + idempotency + +**Files:** +- Create: `app/jobs/email_digest_job.py` +- Create: `tests/test_email_digest_job.py` + +- [ ] **Step 1: Write the failing test** + +Create `tests/test_email_digest_job.py`: + +```python +"""Recipient selection + idempotency for the digest job.""" +from __future__ import annotations + +import asyncio +from datetime import datetime, timezone, timedelta +from unittest.mock import AsyncMock, patch + +import pytest + + +def _bootstrap(tmp_path, today_weekday: int): + """Spin up an in-memory DB with three users: a paid opt-in, a paid + opt-out, a free opt-in. Returns the session factory + the test users.""" + from sqlalchemy.ext.asyncio import async_sessionmaker, create_async_engine + + from app import db as db_mod + from app.db import Base + from app.models import User + + engine = create_async_engine(f"sqlite+aiosqlite:///{tmp_path}/dj.db") + factory = async_sessionmaker(engine, expire_on_commit=False) + db_mod._engine = engine + db_mod._session_factory = factory + + async def _seed(): + async with engine.begin() as conn: + await conn.run_sync(Base.metadata.create_all) + async with factory() as s: + s.add(User(id=1, email="paid_in@x", tier="paid", email_digest_opt_in=True)) + s.add(User(id=2, email="paid_out@x", tier="paid", email_digest_opt_in=False)) + s.add(User(id=3, email="free_in@x", tier="free", email_digest_opt_in=True)) + await s.commit() + + asyncio.run(_seed()) + return factory + + +def _patch_today(weekday: int): + """Return a datetime whose weekday() == `weekday` (0=Mon, 6=Sun).""" + base = datetime(2026, 5, 25, 6, 30, tzinfo=timezone.utc) # Monday + return base + timedelta(days=(weekday - base.weekday()) % 7) + + +def _stub_generate(content="

      x

      "): + """Stub out the LLM call so the test never hits the network. Use a + SimpleNamespace so we don't have to know the real result class name.""" + from types import SimpleNamespace + async def _fake(_client, messages, **kwargs): + return SimpleNamespace( + content=content, model="stub", + prompt_tokens=10, completion_tokens=10, cost_usd=0.0, + ) + return _fake + + +@pytest.mark.skipif(False, reason="requires aiosqlite") +def test_daily_run_only_paid_opt_in(tmp_path): + _bootstrap(tmp_path, today_weekday=0) + from app.jobs import email_digest_job + with patch("app.jobs.email_digest_job._now", + return_value=_patch_today(0)), \ + patch("app.jobs.email_digest_job.send_email", + new=AsyncMock()) as send_mock, \ + patch("app.jobs.email_digest_job.call_llm", + new=AsyncMock(side_effect=_stub_generate())): + asyncio.run(email_digest_job.run()) + # Only user_id=1 (paid + opt-in) received an email. Two calls + # (one per tone), but only one recipient. + sent_to = {kwargs["to"] for _, _, kwargs in send_mock.mock_calls + if "to" in kwargs} + addresses_sent = {call.kwargs.get("to") for call in send_mock.await_args_list} + assert addresses_sent == {"paid_in@x"} + + +def test_weekly_run_includes_free_and_paid_opt_in(tmp_path): + _bootstrap(tmp_path, today_weekday=6) + from app.jobs import email_digest_job + with patch("app.jobs.email_digest_job._now", + return_value=_patch_today(6)), \ + patch("app.jobs.email_digest_job.send_email", + new=AsyncMock()) as send_mock, \ + patch("app.jobs.email_digest_job.call_llm", + new=AsyncMock(side_effect=_stub_generate())): + asyncio.run(email_digest_job.run()) + addresses_sent = {call.kwargs.get("to") for call in send_mock.await_args_list} + assert addresses_sent == {"paid_in@x", "free_in@x"} + + +def test_second_run_same_day_is_idempotent(tmp_path): + _bootstrap(tmp_path, today_weekday=0) + from app.jobs import email_digest_job + with patch("app.jobs.email_digest_job._now", + return_value=_patch_today(0)), \ + patch("app.jobs.email_digest_job.send_email", + new=AsyncMock()) as send_mock, \ + patch("app.jobs.email_digest_job.call_llm", + new=AsyncMock(side_effect=_stub_generate())): + asyncio.run(email_digest_job.run()) + first_count = len(send_mock.await_args_list) + asyncio.run(email_digest_job.run()) + second_count = len(send_mock.await_args_list) + assert first_count > 0 + assert second_count == first_count, "second run should not re-send" +``` + +- [ ] **Step 2: Run the failing test** + +```bash +pytest tests/test_email_digest_job.py -v +``` + +Expected: FAIL — `app.jobs.email_digest_job` doesn't exist yet. + +- [ ] **Step 3: Implement the job** + +Create `app/jobs/email_digest_job.py`: + +```python +"""Daily/weekly editorial email digest. + +Runs once a day at 06:30 UTC via the scheduler. On Sundays sends the +weekly recap to every opt-in user (free + paid). On other days sends +the daily digest to opt-in paid users only. + +Generates LLM content once per tone (NOVICE + INTERMEDIATE), then fans +out by SMTP. EmailSend audit rows guard against double-delivery if the +job is re-run within the same UTC day. +""" +from __future__ import annotations + +import asyncio +from collections import defaultdict +from datetime import datetime, timedelta, timezone + +import httpx +from sqlalchemy import desc, func, select + +from app import branding +from app.config import get_settings +from app.db import utcnow +from app.jobs._helpers import job_lifecycle, log +from app.jobs.ai_log_job import ( + REFERENCE_LINE, + _latest_quotes_by_group, + _recent_headlines_by_bucket, + _month_spend, +) +from app.models import EmailSend, User +from app.routers.email import sign_unsubscribe_token +from app.services.email_service import render_digest_email, send_email +from app.services.openrouter import ( + PROMPT_VERSION, + active_model, + build_daily_digest_prompt, + build_weekly_digest_prompt, + call_llm, + llm_configured, +) +from app.services.access import paid_status + + +# Indirection so tests can monkeypatch the "current time" without +# touching the system clock. +def _now() -> datetime: + return utcnow() + + +async def _opt_in_recipients(session, *, paid_only: bool) -> list[User]: + stmt = select(User).where(User.email_digest_opt_in.is_(True)) + rows = (await session.execute(stmt)).scalars().all() + if paid_only: + rows = [u for u in rows if paid_status(u).active] + return rows + + +async def _already_sent_today(session, user_id: int, kind: str, today: datetime) -> bool: + """True if an EmailSend row exists for this user+kind on the same UTC + day, with status in ('sent','error'). 'error' counts because we don't + want to keep retrying a bad address inside the same daily slot.""" + day_start = today.replace(hour=0, minute=0, second=0, microsecond=0) + day_end = day_start + timedelta(days=1) + stmt = select(EmailSend.id).where( + EmailSend.user_id == user_id, + EmailSend.kind == kind, + EmailSend.sent_at >= day_start, + EmailSend.sent_at < day_end, + EmailSend.status.in_(("sent", "error")), + ) + return (await session.execute(stmt)).first() is not None + + +async def _generate_variants(client, kind: str, ctx: dict) -> dict[str, str]: + """Returns {tone: html_content}. Missing tone means generation failed + for that variant — skip recipients on that tone.""" + builder = build_weekly_digest_prompt if kind == "weekly" else build_daily_digest_prompt + out: dict[str, str] = {} + for tone in ("NOVICE", "INTERMEDIATE"): + sys_, usr = builder(tone=tone, **ctx) + try: + result = await call_llm( + client, + [{"role": "system", "content": sys_}, + {"role": "user", "content": usr}], + ) + out[tone] = result.content + log.info("digest.variant_ok", kind=kind, tone=tone, + prompt_tokens=result.prompt_tokens, + completion_tokens=result.completion_tokens) + except Exception as e: + log.error("digest.variant_failed", kind=kind, tone=tone, + error=str(e)[:200]) + return out + + +def _kind_for_today(today: datetime) -> str | None: + """Sunday → weekly. Mon–Sat → daily. None means 'no run today'.""" + return "weekly" if today.weekday() == 6 else "daily" + + +async def _send_one(user: User, kind: str, content_html: str, date_str: str, + session) -> None: + settings_url = f"{branding.SITE_URL}/settings" + unsubscribe_url = ( + f"{branding.SITE_URL}/email/unsubscribe" + f"?token={sign_unsubscribe_token(user.id)}" + ) + subject, text_body, html_body = render_digest_email( + kind=kind, date_str=date_str, + content_html=content_html, + unsubscribe_url=unsubscribe_url, + settings_url=settings_url, + ) + try: + await send_email(to=user.email, subject=subject, + text_body=text_body, html_body=html_body) + status_ = "sent" + err = None + except Exception as e: + status_ = "error" + err = str(e)[:255] + log.error("digest.send_failed", user_id=user.id, error=err) + session.add(EmailSend( + user_id=user.id, kind=kind, sent_at=_now(), + status=status_, error=err, + )) + await session.commit() + + +async def run() -> None: + async with job_lifecycle("email_digest_job") as (session, jr): + if jr.status == "skipped": + return + s = get_settings() + if not llm_configured(): + log.warning("digest.skipped_no_key", provider=s.LLM_PROVIDER) + jr.status = "skipped" + return + + today = _now() + kind = _kind_for_today(today) + date_str = today.strftime("%Y-%m-%d") + + # Build the recipient list before LLM work — if it's empty, + # skip the LLM spend. + recipients = await _opt_in_recipients( + session, paid_only=(kind == "daily"), + ) + # Filter out anyone already sent today (idempotency). + fresh: list[User] = [] + for u in recipients: + if not await _already_sent_today(session, u.id, kind, today): + fresh.append(u) + if not fresh: + log.info("digest.no_fresh_recipients", kind=kind, + total=len(recipients)) + jr.status = "skipped" + return + + spent = await _month_spend(session) + if spent >= s.OPENROUTER_MONTHLY_CAP_USD: + log.warning("digest.cap_reached", spent=spent, + cap=s.OPENROUTER_MONTHLY_CAP_USD) + jr.status = "skipped" + jr.error = f"monthly cost cap reached (${spent:.2f})" + return + + quotes = await _latest_quotes_by_group(session) + news = await _recent_headlines_by_bucket( + session, hours=(168 if kind == "weekly" else 24), + ) + ctx = dict( + today=today, + quotes_by_group=quotes, + headlines_by_bucket=news, + reference_line=REFERENCE_LINE, + ) + + async with httpx.AsyncClient(follow_redirects=True) as client: + variants = await _generate_variants(client, kind, ctx) + + if not variants: + log.warning("digest.all_variants_failed", kind=kind) + jr.status = "failed" + jr.error = "all variants failed" + return + + written = 0 + for u in fresh: + tone = (u.digest_tone or "INTERMEDIATE").upper() + content = variants.get(tone) or variants.get("INTERMEDIATE") + if content is None: + # Both variants failed for this user's tone — skip. + continue + await _send_one(u, kind, content, date_str, session) + await asyncio.sleep(0.1) # gentle SMTP pacing + written += 1 + + jr.items_written = written + log.info("digest.done", kind=kind, written=written, + prompt_version=PROMPT_VERSION) + + +if __name__ == "__main__": + asyncio.run(run()) +``` + +- [ ] **Step 4: Run the tests, expect PASS** + +```bash +pytest tests/test_email_digest_job.py -v +``` + +Expected: 3 passed. + +- [ ] **Step 5: Commit** + +```bash +git add app/jobs/email_digest_job.py tests/test_email_digest_job.py +git commit -m "digest: daily/weekly job w/ EmailSend idempotency" +``` + +--- + +## Task 8: Scheduler registration + +**Files:** +- Modify: `app/scheduler_main.py` + +- [ ] **Step 1: Add the cron entry** + +Open `app/scheduler_main.py`. Locate the block where the other jobs are registered (line ~42 onward — `sched.add_job(ai_log_job.run, ...)` etc.). Add an import at the top and a new `sched.add_job` line: + +```python +from app.jobs import email_digest_job +# ... +sched.add_job( + email_digest_job.run, + CronTrigger(hour=6, minute=30), + name="email_digest_job", + id="email_digest_job", +) +``` + +- [ ] **Step 2: Verify the scheduler boots clean** + +```bash +docker compose restart scheduler +docker compose logs --tail=40 scheduler +``` + +Expected: `scheduler.started jobs=[..., 'email_digest_job']` in the log. + +- [ ] **Step 3: Commit** + +```bash +git add app/scheduler_main.py +git commit -m "scheduler: register email_digest_job at 06:30 UTC" +``` + +--- + +## Task 9: Settings page section + PATCH endpoint + +**Files:** +- Modify: `app/routers/api.py` (add PATCH /settings/digest) +- Modify: `app/templates/settings.html` +- Modify: `app/routers/pages.py` (settings_page context: pass last EmailSend) +- Create: `tests/test_settings_digest_api.py` + +- [ ] **Step 1: Write the failing test** + +Create `tests/test_settings_digest_api.py`: + +```python +"""PATCH /api/settings/digest persists opt-in + tone.""" +from __future__ import annotations + +import asyncio + +import pytest + + +def _build(tmp_path): + from fastapi import FastAPI + from fastapi.testclient import TestClient + from sqlalchemy.ext.asyncio import async_sessionmaker, create_async_engine + + from app import db as db_mod + from app.auth import sign_session + from app.db import Base + from app.models import User + from app.routers import api as api_router + + engine = create_async_engine(f"sqlite+aiosqlite:///{tmp_path}/s.db") + factory = async_sessionmaker(engine, expire_on_commit=False) + db_mod._engine = engine + db_mod._session_factory = factory + + async def _seed(): + async with engine.begin() as conn: + await conn.run_sync(Base.metadata.create_all) + async with factory() as s: + s.add(User(id=1, email="u@x", tier="paid", + email_digest_opt_in=True)) + await s.commit() + + asyncio.run(_seed()) + app = FastAPI() + app.include_router(api_router.router, prefix="/api") + return TestClient(app), sign_session(1) + + +def test_patch_round_trip(tmp_path): + client, sess = _build(tmp_path) + r = client.patch( + "/api/settings/digest", + json={"opt_in": False, "tone": "NOVICE"}, + cookies={"cassandra_session": sess}, + ) + assert r.status_code == 200, r.text + assert r.json() == {"opt_in": False, "tone": "NOVICE"} + + # Round-trip GET to confirm persistence. + async def _check(): + from app import db as db_mod + from app.models import User + async with db_mod._session_factory() as s: + u = await s.get(User, 1) + assert u.email_digest_opt_in is False + assert u.digest_tone == "NOVICE" + asyncio.run(_check()) + + +def test_patch_rejects_invalid_tone(tmp_path): + client, sess = _build(tmp_path) + r = client.patch( + "/api/settings/digest", + json={"opt_in": True, "tone": "PRO"}, # not in NOVICE|INTERMEDIATE + cookies={"cassandra_session": sess}, + ) + assert r.status_code == 422 + + +def test_patch_requires_auth(tmp_path): + client, _ = _build(tmp_path) + r = client.patch("/api/settings/digest", + json={"opt_in": True, "tone": "NOVICE"}) + assert r.status_code in (401, 303) +``` + +- [ ] **Step 2: Run the failing test** + +```bash +pytest tests/test_settings_digest_api.py -v +``` + +Expected: FAIL — endpoint not defined. + +- [ ] **Step 3: Add the endpoint** + +Open `app/routers/api.py`. Near the other settings-related code (or at the bottom of the file), add: + +```python +from pydantic import BaseModel, Field +from typing import Literal + + +class DigestPrefsIn(BaseModel): + opt_in: bool + tone: Literal["NOVICE", "INTERMEDIATE"] + + +class DigestPrefsOut(BaseModel): + opt_in: bool + tone: str + + +@router.patch("/settings/digest", response_model=DigestPrefsOut) +async def patch_digest_prefs( + payload: DigestPrefsIn, + principal: CurrentUser = Depends(require_auth), + session: AsyncSession = Depends(get_session), +) -> DigestPrefsOut: + if principal.user is None: + # Admin bearer-token path: nothing to persist. + raise HTTPException(status_code=400, detail="no_user_context") + principal.user.email_digest_opt_in = payload.opt_in + principal.user.digest_tone = payload.tone + await session.commit() + return DigestPrefsOut(opt_in=payload.opt_in, tone=payload.tone) +``` + +If `require_auth`, `CurrentUser`, or `HTTPException` aren't already imported, extend the import block at the top of the file. + +- [ ] **Step 4: Run the tests, expect PASS** + +```bash +pytest tests/test_settings_digest_api.py -v +``` + +Expected: 3 passed. + +- [ ] **Step 5: Add the Settings template section** + +Open `app/templates/settings.html`. Find a sensible insertion point (after the existing tier/credit section, before the cloud-sync section). Add: + +```html +
      +
      Email digests
      +
      + +
      + Free tier: Sunday weekly. Paid: daily Mon–Sat plus the Sunday recap. +
      +
      + + +
      +
      + Last delivery: + {% if last_email_send %}{{ last_email_send.sent_at.strftime("%Y-%m-%d %H:%M") }} UTC — {{ last_email_send.status }}{% else %}—{% endif %} +
      +
      +
      +
      + + +``` + +- [ ] **Step 6: Pass the last EmailSend into the template context** + +Open `app/routers/pages.py`. Find `settings_page` (line ~114). Inside the handler, before the existing `TemplateResponse(... "settings.html", ...)`, query the most recent EmailSend for the current user: + +```python +from sqlalchemy import desc, select +from app.models import EmailSend + +# ... +last_email_send = (await session.execute( + select(EmailSend) + .where(EmailSend.user_id == user.id) + .order_by(desc(EmailSend.sent_at)) + .limit(1) +)).scalar_one_or_none() +``` + +Add `"last_email_send": last_email_send` to the template context dict that's passed to `templates.TemplateResponse(..., "settings.html", {...})`. There may be two TemplateResponse calls in this handler (admin path + user path) — only the user path needs this. + +- [ ] **Step 7: Manual UI check** + +```bash +docker compose restart app +``` + +Open `/settings` in a browser, flip the checkbox, change the tone. Confirm "Saved." appears. Reload — selections persist. + +- [ ] **Step 8: Commit** + +```bash +git add app/routers/api.py app/routers/pages.py app/templates/settings.html tests/test_settings_digest_api.py +git commit -m "settings: digest opt-in + tone (PATCH /api/settings/digest + UI)" +``` + +--- + +## Task 10: Sign-up checkbox + +**Files:** +- Modify: `app/templates/verify.html` +- Modify: `app/routers/auth.py` +- Create: `tests/test_verify_subscribe.py` + +- [ ] **Step 1: Write the failing test** + +Create `tests/test_verify_subscribe.py`: + +```python +"""Verify-POST persists the subscribe_to_digests form field.""" +from __future__ import annotations + +import asyncio + +import pytest + + +def _build(tmp_path): + from fastapi import FastAPI + from fastapi.testclient import TestClient + from sqlalchemy.ext.asyncio import async_sessionmaker, create_async_engine + + from app import db as db_mod + from app.db import Base + from app.models import User + from app.routers import auth as auth_router + from app.auth import _pending_serializer + + engine = create_async_engine(f"sqlite+aiosqlite:///{tmp_path}/v.db") + factory = async_sessionmaker(engine, expire_on_commit=False) + db_mod._engine = engine + db_mod._session_factory = factory + + async def _seed(): + async with engine.begin() as conn: + await conn.run_sync(Base.metadata.create_all) + async with factory() as s: + s.add(User(id=10, email="newbie@x", tier="free", + email_digest_opt_in=True)) + await s.commit() + # Mark the OTP as "verified-able" — easiest is to monkeypatch + # otp_service.verify in the test, see below. + + asyncio.run(_seed()) + app = FastAPI() + app.include_router(auth_router.router) + pending = _pending_serializer().dumps({"email": "newbie@x", "uid": 10, "ref": None}) + return TestClient(app), pending + + +def test_verify_with_unchecked_subscribe_disables_opt_in(tmp_path, monkeypatch): + from app.services import otp_service + + async def _ok(*args, **kwargs): + return None + monkeypatch.setattr(otp_service, "verify", _ok) + + client, pending = _build(tmp_path) + r = client.post( + "/verify", + data={"code": "000000"}, # form field "subscribe_to_digests" omitted + cookies={"cassandra_pending": pending}, + follow_redirects=False, + ) + assert r.status_code == 303, r.text + + async def _check(): + from app import db as db_mod + from app.models import User + async with db_mod._session_factory() as s: + u = await s.get(User, 10) + assert u.email_digest_opt_in is False + asyncio.run(_check()) + + +def test_verify_with_checked_subscribe_keeps_opt_in(tmp_path, monkeypatch): + from app.services import otp_service + + async def _ok(*args, **kwargs): + return None + monkeypatch.setattr(otp_service, "verify", _ok) + + client, pending = _build(tmp_path) + r = client.post( + "/verify", + data={"code": "000000", "subscribe_to_digests": "on"}, + cookies={"cassandra_pending": pending}, + follow_redirects=False, + ) + assert r.status_code == 303 + + async def _check(): + from app import db as db_mod + from app.models import User + async with db_mod._session_factory() as s: + u = await s.get(User, 10) + assert u.email_digest_opt_in is True + asyncio.run(_check()) +``` + +- [ ] **Step 2: Run the failing test** + +```bash +pytest tests/test_verify_subscribe.py -v +``` + +Expected: FAIL — the verify handler doesn't read `subscribe_to_digests` yet, so the unchecked case still leaves opt-in `True` from the seed. + +- [ ] **Step 3: Update the verify template** + +Open `app/templates/verify.html`. Find the `
      ` block. Inside the form, after the code input and before the submit button, add: + +```html + +``` + +- [ ] **Step 4: Update the verify POST handler** + +Open `app/routers/auth.py`. Find `verify_submit` (line ~215). Add `subscribe_to_digests` to the form fields and persist it after a successful OTP verify: + +```python +@router.post("/verify") +async def verify_submit( + request: Request, + code: str = Form(...), + subscribe_to_digests: str | None = Form(default=None), + session: AsyncSession = Depends(get_session), +): + # ... existing OTP verification + pending lookup unchanged ... + + user = await get_user(session, pending["uid"]) + if user is None: + return RedirectResponse(url="/login", status_code=303) + user.last_login_at = utcnow() + # An unchecked HTML checkbox sends NO field; that means "opt out". + user.email_digest_opt_in = subscribe_to_digests is not None + await session.commit() + log.info("user.login", user_id=user.id, email=email) + # ... existing redirect + cookie setup unchanged ... +``` + +- [ ] **Step 5: Run the tests, expect PASS** + +```bash +pytest tests/test_verify_subscribe.py -v +``` + +Expected: 2 passed. + +- [ ] **Step 6: Commit** + +```bash +git add app/templates/verify.html app/routers/auth.py tests/test_verify_subscribe.py +git commit -m "auth: subscribe-to-digests checkbox on verify (default on)" +``` + +--- + +## Task 11: Pricing copy updates + +**Files:** +- Modify: `app/templates/pricing.html` + +- [ ] **Step 1: Rewrite the intro paragraph** + +Open `app/templates/pricing.html`. Replace lines 8-13 (the intro paragraph under the `Pricing` heading) with: + +```html +

      + Two tiers. The news aggregator and hourly AI interpretation are + available to everyone — paid extends the news window from + 6 hours to 24 hours and adds daily editorial by email, plus the + portfolio-import features. +

      +``` + +- [ ] **Step 2: Update the Free tier bullets** + +Find the Free `
        ` block. Replace it with: + +```html +
          +
        • News aggregator — last 6 hours, auto-tagged by theme
        • +
        • Cross-asset macro signals across every asset class
        • +
        • Hourly AI interpretation of the news + the tape
        • +
        • Per-group cross-asset summaries
        • +
        • Novice / Intermediate reading levels
        • +
        • Sunday weekly digest by email
        • +
        • Portfolio import & analysis
        • +
        • Encrypted cloud sync
        • +
        +``` + +- [ ] **Step 3: Update the Paid tier bullets** + +Find the Paid `
          ` block. Replace it with: + +```html +
            +
          • Everything in Free
          • +
          • News aggregator — full 24 hours
          • +
          • Portfolio import (Trading 212 CSV)
          • +
          • AI commentary on diversification, sector and currency concentration, and macro-regime context for the holdings you upload
          • +
          • Optional encrypted cloud sync across devices
          • +
          • Daily email digest (Mon–Sat) plus the Sunday recap
          • +
          +``` + +- [ ] **Step 4: Manual visual check** + +```bash +docker compose restart app +``` + +Load `/pricing` in a browser. Confirm both tier cards render the new bullets and the intro paragraph reads cleanly. + +- [ ] **Step 5: Commit** + +```bash +git add app/templates/pricing.html +git commit -m "pricing: free=6h news + Sunday digest; paid=24h + daily digest" +``` + +--- + +## Task 12: `send-test-digest` admin CLI + +**Files:** +- Modify: `app/cli.py` + +- [ ] **Step 1: Add the subcommand handler** + +Open `app/cli.py`. After `show_status`, add: + +```python +async def send_test_digest(email: str, kind: str) -> int: + """Generate a digest and send it to the named user immediately, ignoring + opt-in state and idempotency. Useful for previewing copy in your own + inbox before a real run lands.""" + import httpx + from app import branding + from app.db import get_session_factory + from app.jobs.ai_log_job import ( + REFERENCE_LINE, _latest_quotes_by_group, _recent_headlines_by_bucket, + ) + from app.jobs.email_digest_job import _send_one, _generate_variants + from app.services.openrouter import llm_configured + + if kind not in ("daily", "weekly"): + print(f"error: kind must be 'daily' or 'weekly' (got {kind!r})", + file=sys.stderr) + return 2 + if not llm_configured(): + print("error: LLM provider not configured (set OPENROUTER_API_KEY)", + file=sys.stderr) + return 1 + + factory = get_session_factory() + async with factory() as session: + user = await _get_user_by_email(session, email) + if user is None: + print(f"error: no user with email {email!r}", file=sys.stderr) + return 1 + today = _utcnow() + quotes = await _latest_quotes_by_group(session) + news = await _recent_headlines_by_bucket( + session, hours=(168 if kind == "weekly" else 24), + ) + ctx = dict(today=today, quotes_by_group=quotes, + headlines_by_bucket=news, reference_line=REFERENCE_LINE) + async with httpx.AsyncClient(follow_redirects=True) as client: + variants = await _generate_variants(client, kind, ctx) + tone = (user.digest_tone or "INTERMEDIATE").upper() + content = variants.get(tone) or variants.get("INTERMEDIATE") + if content is None: + print("error: all LLM variants failed", file=sys.stderr) + return 1 + date_str = today.strftime("%Y-%m-%d") + await _send_one(user, kind, content, date_str, session) + print(f"sent {kind} digest to {email} (tone={tone})") + return 0 +``` + +- [ ] **Step 2: Wire it into the argparse dispatcher** + +Find `build_parser()` (line ~97). Add a new subparser: + +```python + t = sub.add_parser("send-test-digest", + help="Send one digest immediately (bypasses opt-in/idempotency)") + t.add_argument("email") + t.add_argument("kind", choices=("daily", "weekly")) +``` + +Find `_dispatch(args)` (line ~114). Add a branch: + +```python + if args.cmd == "send-test-digest": + return await send_test_digest(args.email, args.kind) +``` + +(The exact form depends on the existing dispatcher's pattern — match it.) + +- [ ] **Step 3: Smoke-test the CLI** + +```bash +docker compose exec app python -m app.cli send-test-digest you@example.com daily +``` + +Expected output: `sent daily digest to you@example.com (tone=INTERMEDIATE)`. Check your inbox — the email should render with the new template + working unsubscribe link. + +- [ ] **Step 4: Commit** + +```bash +git add app/cli.py +git commit -m "cli: send-test-digest for previewing digest emails" +``` + +--- + +## Final Verification + +- [ ] **Step 1: Run the full suite** + +```bash +pytest tests/ -q +``` + +Expected: all tests pass. + +- [ ] **Step 2: Manual smoke** + +1. Visit `/news` while signed out — only ~6h of headlines, footer says "Free tier, upgrade for 24h". +2. Sign in as a free user — same 6h cap, footer present. +3. Sign in as a paid user (or admin) — 24h, no footer. +4. Visit `/settings` — Digest section present; toggle persists across reloads. +5. Visit `/pricing` — Free has Sunday-weekly bullet; Paid has 24h + daily bullets. +6. Sign up a new account — verify the "Email me the digest" checkbox is visible and on by default. +7. Send a test digest with the CLI — check inbox, click unsubscribe, confirm DB flag flipped, confirm Settings now shows opt-in=off. + +- [ ] **Step 3: Push** + +After the operator reviews and tags the work, push: + +```bash +git push +``` + +(Up to the operator — the spec says nothing about auto-push.)