diff --git a/app/jobs/email_digest_job.py b/app/jobs/email_digest_job.py new file mode 100644 index 0000000..6274231 --- /dev/null +++ b/app/jobs/email_digest_job.py @@ -0,0 +1,200 @@ +"""Daily/weekly editorial email digest. + +Runs once a day at 06:30 UTC via the scheduler. On Sundays sends the +weekly recap to every opt-in user (free + paid). On other days sends +the daily digest to opt-in paid users only. + +Generates LLM content once per tone (NOVICE + INTERMEDIATE), then fans +out by SMTP. EmailSend audit rows guard against double-delivery if the +job is re-run within the same UTC day. +""" +from __future__ import annotations + +import asyncio +from datetime import datetime, timedelta + +import httpx +from sqlalchemy import select + +from app import branding +from app.config import get_settings +from app.db import utcnow +from app.jobs._helpers import job_lifecycle, log +from app.jobs.ai_log_job import ( + REFERENCE_LINE, + _latest_quotes_by_group, + _recent_headlines_by_bucket, + _month_spend, +) +from app.models import EmailSend, User +from app.routers.email import sign_unsubscribe_token +from app.services.access import paid_status +from app.services.email_service import render_digest_email, send_email +from app.services.openrouter import ( + PROMPT_VERSION, + build_daily_digest_prompt, + build_weekly_digest_prompt, + call_llm, + llm_configured, +) + + +def _now() -> datetime: + """Indirection so tests can monkeypatch the "current time" without + touching the system clock.""" + return utcnow() + + +async def _opt_in_recipients(session, *, paid_only: bool) -> list[User]: + stmt = select(User).where(User.email_digest_opt_in.is_(True)) + rows = (await session.execute(stmt)).scalars().all() + if paid_only: + rows = [u for u in rows if paid_status(u).active] + return rows + + +async def _already_sent_today(session, user_id: int, kind: str, today: datetime) -> bool: + """True if an EmailSend row exists for this user+kind on the same UTC + day, with status in ('sent','error'). 'error' counts because we don't + want to keep retrying a bad address inside the same daily slot.""" + day_start = today.replace(hour=0, minute=0, second=0, microsecond=0) + day_end = day_start + timedelta(days=1) + stmt = select(EmailSend.id).where( + EmailSend.user_id == user_id, + EmailSend.kind == kind, + EmailSend.sent_at >= day_start, + EmailSend.sent_at < day_end, + EmailSend.status.in_(("sent", "error")), + ) + return (await session.execute(stmt)).first() is not None + + +async def _generate_variants(client, kind: str, ctx: dict) -> dict[str, str]: + """Returns {tone: html_content}. Missing tone means generation failed + for that variant — skip recipients on that tone.""" + builder = build_weekly_digest_prompt if kind == "weekly" else build_daily_digest_prompt + out: dict[str, str] = {} + for tone in ("NOVICE", "INTERMEDIATE"): + sys_, usr = builder(tone=tone, **ctx) + try: + result = await call_llm( + client, + [{"role": "system", "content": sys_}, + {"role": "user", "content": usr}], + ) + out[tone] = result.content + log.info("digest.variant_ok", kind=kind, tone=tone, + prompt_tokens=result.prompt_tokens, + completion_tokens=result.completion_tokens) + except Exception as e: + log.error("digest.variant_failed", kind=kind, tone=tone, + error=str(e)[:200]) + return out + + +def _kind_for_today(today: datetime) -> str: + """Sunday → weekly. Mon–Sat → daily.""" + return "weekly" if today.weekday() == 6 else "daily" + + +async def _send_one(user: User, kind: str, content_html: str, date_str: str, + session) -> None: + settings_url = f"{branding.SITE_URL}/settings" + unsubscribe_url = ( + f"{branding.SITE_URL}/email/unsubscribe" + f"?token={sign_unsubscribe_token(user.id)}" + ) + subject, text_body, html_body = render_digest_email( + kind=kind, date_str=date_str, + content_html=content_html, + unsubscribe_url=unsubscribe_url, + settings_url=settings_url, + ) + try: + await send_email(to=user.email, subject=subject, + text_body=text_body, html_body=html_body) + status_ = "sent" + err = None + except Exception as e: + status_ = "error" + err = str(e)[:255] + log.error("digest.send_failed", user_id=user.id, error=err) + session.add(EmailSend( + user_id=user.id, kind=kind, sent_at=_now(), + status=status_, error=err, + )) + await session.commit() + + +async def run() -> None: + async with job_lifecycle("email_digest_job") as (session, jr): + if jr.status == "skipped": + return + s = get_settings() + if not llm_configured(): + log.warning("digest.skipped_no_key", provider=s.LLM_PROVIDER) + jr.status = "skipped" + return + + today = _now() + kind = _kind_for_today(today) + date_str = today.strftime("%Y-%m-%d") + + recipients = await _opt_in_recipients( + session, paid_only=(kind == "daily"), + ) + fresh: list[User] = [] + for u in recipients: + if not await _already_sent_today(session, u.id, kind, today): + fresh.append(u) + if not fresh: + log.info("digest.no_fresh_recipients", kind=kind, + total=len(recipients)) + jr.status = "skipped" + return + + spent = await _month_spend(session) + if spent >= s.OPENROUTER_MONTHLY_CAP_USD: + log.warning("digest.cap_reached", spent=spent, + cap=s.OPENROUTER_MONTHLY_CAP_USD) + jr.status = "skipped" + jr.error = f"monthly cost cap reached (${spent:.2f})" + return + + quotes = await _latest_quotes_by_group(session) + news = await _recent_headlines_by_bucket( + session, hours=(168 if kind == "weekly" else 24), + ) + ctx = dict( + today=today, + quotes_by_group=quotes, + headlines_by_bucket=news, + reference_line=REFERENCE_LINE, + ) + + async with httpx.AsyncClient(follow_redirects=True) as client: + variants = await _generate_variants(client, kind, ctx) + + if not variants: + log.warning("digest.all_variants_failed", kind=kind) + jr.status = "failed" + jr.error = "all variants failed" + return + + written = 0 + for u in fresh: + tone = (u.digest_tone or "INTERMEDIATE").upper() + content = variants.get(tone) or variants.get("INTERMEDIATE") + if content is None: + continue + await _send_one(u, kind, content, date_str, session) + await asyncio.sleep(0.1) + written += 1 + + jr.items_written = written + log.info("digest.done", kind=kind, written=written, + prompt_version=PROMPT_VERSION) + + +if __name__ == "__main__": + asyncio.run(run()) diff --git a/tests/test_email_digest_job.py b/tests/test_email_digest_job.py new file mode 100644 index 0000000..c8611a1 --- /dev/null +++ b/tests/test_email_digest_job.py @@ -0,0 +1,96 @@ +"""Recipient selection + idempotency for the digest job.""" +from __future__ import annotations + +import asyncio +from datetime import datetime, timezone, timedelta +from unittest.mock import AsyncMock, patch + + +def _bootstrap(tmp_path): + """Spin up an in-memory DB with three users: a paid opt-in, a paid + opt-out, a free opt-in.""" + from sqlalchemy.ext.asyncio import async_sessionmaker, create_async_engine + + from app import db as db_mod + from app.db import Base + from app.models import User + + engine = create_async_engine(f"sqlite+aiosqlite:///{tmp_path}/dj.db") + factory = async_sessionmaker(engine, expire_on_commit=False) + db_mod._engine = engine + db_mod._session_factory = factory + + async def _seed(): + async with engine.begin() as conn: + await conn.run_sync(Base.metadata.create_all) + async with factory() as s: + s.add(User(id=1, email="paid_in@x", tier="paid", email_digest_opt_in=True)) + s.add(User(id=2, email="paid_out@x", tier="paid", email_digest_opt_in=False)) + s.add(User(id=3, email="free_in@x", tier="free", email_digest_opt_in=True)) + await s.commit() + + asyncio.run(_seed()) + return factory + + +def _patch_today(weekday: int): + """Return a datetime whose weekday() == `weekday` (0=Mon, 6=Sun).""" + base = datetime(2026, 5, 25, 6, 30, tzinfo=timezone.utc) # Monday + return base + timedelta(days=(weekday - base.weekday()) % 7) + + +def _stub_generate(content="

x

"): + """Stub out the LLM call so the test never hits the network. Use a + SimpleNamespace so we don't have to know the real result class name.""" + from types import SimpleNamespace + async def _fake(_client, messages, **kwargs): + return SimpleNamespace( + content=content, model="stub", + prompt_tokens=10, completion_tokens=10, cost_usd=0.0, + ) + return _fake + + +def test_daily_run_only_paid_opt_in(tmp_path): + _bootstrap(tmp_path) + from app.jobs import email_digest_job + with patch("app.jobs.email_digest_job._now", + return_value=_patch_today(0)), \ + patch("app.jobs.email_digest_job.send_email", + new=AsyncMock()) as send_mock, \ + patch("app.jobs.email_digest_job.call_llm", + new=AsyncMock(side_effect=_stub_generate())): + asyncio.run(email_digest_job.run()) + addresses_sent = {call.kwargs.get("to") for call in send_mock.await_args_list} + assert addresses_sent == {"paid_in@x"} + + +def test_weekly_run_includes_free_and_paid_opt_in(tmp_path): + _bootstrap(tmp_path) + from app.jobs import email_digest_job + with patch("app.jobs.email_digest_job._now", + return_value=_patch_today(6)), \ + patch("app.jobs.email_digest_job.send_email", + new=AsyncMock()) as send_mock, \ + patch("app.jobs.email_digest_job.call_llm", + new=AsyncMock(side_effect=_stub_generate())): + asyncio.run(email_digest_job.run()) + addresses_sent = {call.kwargs.get("to") for call in send_mock.await_args_list} + assert addresses_sent == {"paid_in@x", "free_in@x"} + + +def test_second_run_same_day_is_idempotent(tmp_path): + _bootstrap(tmp_path) + from app.jobs import email_digest_job + with patch("app.jobs.email_digest_job._now", + return_value=_patch_today(0)), \ + patch("app.jobs.email_digest_job.send_email", + new=AsyncMock()) as send_mock, \ + patch("app.jobs.email_digest_job.call_llm", + new=AsyncMock(side_effect=_stub_generate())): + asyncio.run(email_digest_job.run()) + first_count = len(send_mock.await_args_list) + asyncio.run(email_digest_job.run()) + second_count = len(send_mock.await_args_list) + assert first_count > 0 + assert second_count == first_count, "second run should not re-send"