digest: daily/weekly job w/ EmailSend idempotency

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
Giorgio Gilestro 2026-05-25 23:13:24 +02:00
parent 0a476bed22
commit 2462882006
2 changed files with 296 additions and 0 deletions

View file

@ -0,0 +1,200 @@
"""Daily/weekly editorial email digest.
Runs once a day at 06:30 UTC via the scheduler. On Sundays sends the
weekly recap to every opt-in user (free + paid). On other days sends
the daily digest to opt-in paid users only.
Generates LLM content once per tone (NOVICE + INTERMEDIATE), then fans
out by SMTP. EmailSend audit rows guard against double-delivery if the
job is re-run within the same UTC day.
"""
from __future__ import annotations
import asyncio
from datetime import datetime, timedelta
import httpx
from sqlalchemy import select
from app import branding
from app.config import get_settings
from app.db import utcnow
from app.jobs._helpers import job_lifecycle, log
from app.jobs.ai_log_job import (
REFERENCE_LINE,
_latest_quotes_by_group,
_recent_headlines_by_bucket,
_month_spend,
)
from app.models import EmailSend, User
from app.routers.email import sign_unsubscribe_token
from app.services.access import paid_status
from app.services.email_service import render_digest_email, send_email
from app.services.openrouter import (
PROMPT_VERSION,
build_daily_digest_prompt,
build_weekly_digest_prompt,
call_llm,
llm_configured,
)
def _now() -> datetime:
"""Indirection so tests can monkeypatch the "current time" without
touching the system clock."""
return utcnow()
async def _opt_in_recipients(session, *, paid_only: bool) -> list[User]:
stmt = select(User).where(User.email_digest_opt_in.is_(True))
rows = (await session.execute(stmt)).scalars().all()
if paid_only:
rows = [u for u in rows if paid_status(u).active]
return rows
async def _already_sent_today(session, user_id: int, kind: str, today: datetime) -> bool:
"""True if an EmailSend row exists for this user+kind on the same UTC
day, with status in ('sent','error'). 'error' counts because we don't
want to keep retrying a bad address inside the same daily slot."""
day_start = today.replace(hour=0, minute=0, second=0, microsecond=0)
day_end = day_start + timedelta(days=1)
stmt = select(EmailSend.id).where(
EmailSend.user_id == user_id,
EmailSend.kind == kind,
EmailSend.sent_at >= day_start,
EmailSend.sent_at < day_end,
EmailSend.status.in_(("sent", "error")),
)
return (await session.execute(stmt)).first() is not None
async def _generate_variants(client, kind: str, ctx: dict) -> dict[str, str]:
"""Returns {tone: html_content}. Missing tone means generation failed
for that variant skip recipients on that tone."""
builder = build_weekly_digest_prompt if kind == "weekly" else build_daily_digest_prompt
out: dict[str, str] = {}
for tone in ("NOVICE", "INTERMEDIATE"):
sys_, usr = builder(tone=tone, **ctx)
try:
result = await call_llm(
client,
[{"role": "system", "content": sys_},
{"role": "user", "content": usr}],
)
out[tone] = result.content
log.info("digest.variant_ok", kind=kind, tone=tone,
prompt_tokens=result.prompt_tokens,
completion_tokens=result.completion_tokens)
except Exception as e:
log.error("digest.variant_failed", kind=kind, tone=tone,
error=str(e)[:200])
return out
def _kind_for_today(today: datetime) -> str:
"""Sunday → weekly. MonSat → daily."""
return "weekly" if today.weekday() == 6 else "daily"
async def _send_one(user: User, kind: str, content_html: str, date_str: str,
session) -> None:
settings_url = f"{branding.SITE_URL}/settings"
unsubscribe_url = (
f"{branding.SITE_URL}/email/unsubscribe"
f"?token={sign_unsubscribe_token(user.id)}"
)
subject, text_body, html_body = render_digest_email(
kind=kind, date_str=date_str,
content_html=content_html,
unsubscribe_url=unsubscribe_url,
settings_url=settings_url,
)
try:
await send_email(to=user.email, subject=subject,
text_body=text_body, html_body=html_body)
status_ = "sent"
err = None
except Exception as e:
status_ = "error"
err = str(e)[:255]
log.error("digest.send_failed", user_id=user.id, error=err)
session.add(EmailSend(
user_id=user.id, kind=kind, sent_at=_now(),
status=status_, error=err,
))
await session.commit()
async def run() -> None:
async with job_lifecycle("email_digest_job") as (session, jr):
if jr.status == "skipped":
return
s = get_settings()
if not llm_configured():
log.warning("digest.skipped_no_key", provider=s.LLM_PROVIDER)
jr.status = "skipped"
return
today = _now()
kind = _kind_for_today(today)
date_str = today.strftime("%Y-%m-%d")
recipients = await _opt_in_recipients(
session, paid_only=(kind == "daily"),
)
fresh: list[User] = []
for u in recipients:
if not await _already_sent_today(session, u.id, kind, today):
fresh.append(u)
if not fresh:
log.info("digest.no_fresh_recipients", kind=kind,
total=len(recipients))
jr.status = "skipped"
return
spent = await _month_spend(session)
if spent >= s.OPENROUTER_MONTHLY_CAP_USD:
log.warning("digest.cap_reached", spent=spent,
cap=s.OPENROUTER_MONTHLY_CAP_USD)
jr.status = "skipped"
jr.error = f"monthly cost cap reached (${spent:.2f})"
return
quotes = await _latest_quotes_by_group(session)
news = await _recent_headlines_by_bucket(
session, hours=(168 if kind == "weekly" else 24),
)
ctx = dict(
today=today,
quotes_by_group=quotes,
headlines_by_bucket=news,
reference_line=REFERENCE_LINE,
)
async with httpx.AsyncClient(follow_redirects=True) as client:
variants = await _generate_variants(client, kind, ctx)
if not variants:
log.warning("digest.all_variants_failed", kind=kind)
jr.status = "failed"
jr.error = "all variants failed"
return
written = 0
for u in fresh:
tone = (u.digest_tone or "INTERMEDIATE").upper()
content = variants.get(tone) or variants.get("INTERMEDIATE")
if content is None:
continue
await _send_one(u, kind, content, date_str, session)
await asyncio.sleep(0.1)
written += 1
jr.items_written = written
log.info("digest.done", kind=kind, written=written,
prompt_version=PROMPT_VERSION)
if __name__ == "__main__":
asyncio.run(run())