"""Daily/weekly editorial email digest. Runs once a day at 06:30 UTC via the scheduler. On Sundays sends the weekly recap to every opt-in user (free + paid). On other days sends the daily digest to opt-in paid users only. Generates LLM content once per tone (NOVICE + INTERMEDIATE), then fans out by SMTP. EmailSend audit rows guard against double-delivery if the job is re-run within the same UTC day. """ from __future__ import annotations import asyncio from datetime import datetime, timedelta import httpx from sqlalchemy import select from app import branding from app.config import get_settings from app.db import utcnow from app.jobs._helpers import job_lifecycle, log from app.jobs.ai_log_job import ( REFERENCE_LINE, _latest_quotes_by_group, _recent_headlines_by_bucket, _month_spend, ) from app.models import EmailSend, User from app.routers.email import sign_unsubscribe_token from app.services.access import paid_status from app.services.email_service import render_digest_email, send_email from app.services.openrouter import ( PROMPT_VERSION, build_daily_digest_prompt, build_weekly_digest_prompt, call_llm, llm_configured, ) def _now() -> datetime: """Indirection so tests can monkeypatch the "current time" without touching the system clock.""" return utcnow() async def _opt_in_recipients(session, *, paid_only: bool) -> list[User]: stmt = select(User).where(User.email_digest_opt_in.is_(True)) rows = (await session.execute(stmt)).scalars().all() if paid_only: rows = [u for u in rows if paid_status(u).active] return rows async def _already_sent_today(session, user_id: int, kind: str, today: datetime) -> bool: """True if an EmailSend row exists for this user+kind on the same UTC day, with status in ('sent','error'). 'error' counts because we don't want to keep retrying a bad address inside the same daily slot.""" day_start = today.replace(hour=0, minute=0, second=0, microsecond=0) day_end = day_start + timedelta(days=1) stmt = select(EmailSend.id).where( EmailSend.user_id == user_id, EmailSend.kind == kind, EmailSend.sent_at >= day_start, EmailSend.sent_at < day_end, EmailSend.status.in_(("sent", "error")), ) return (await session.execute(stmt)).first() is not None async def _generate_variants(client, kind: str, ctx: dict) -> dict[str, str]: """Returns {tone: html_content}. Missing tone means generation failed for that variant — skip recipients on that tone.""" builder = build_weekly_digest_prompt if kind == "weekly" else build_daily_digest_prompt out: dict[str, str] = {} for tone in ("NOVICE", "INTERMEDIATE"): sys_, usr = builder(tone=tone, **ctx) try: result = await call_llm( client, [{"role": "system", "content": sys_}, {"role": "user", "content": usr}], ) out[tone] = result.content log.info("digest.variant_ok", kind=kind, tone=tone, prompt_tokens=result.prompt_tokens, completion_tokens=result.completion_tokens) except Exception as e: log.error("digest.variant_failed", kind=kind, tone=tone, error=str(e)[:200]) return out def _kind_for_today(today: datetime) -> str: """Sunday → weekly. Mon–Sat → daily.""" return "weekly" if today.weekday() == 6 else "daily" async def _send_one(user: User, kind: str, content_html: str, date_str: str, session) -> None: settings_url = f"{branding.SITE_URL}/settings" unsubscribe_url = ( f"{branding.SITE_URL}/email/unsubscribe" f"?token={sign_unsubscribe_token(user.id)}" ) subject, text_body, html_body = render_digest_email( kind=kind, date_str=date_str, content_html=content_html, unsubscribe_url=unsubscribe_url, settings_url=settings_url, ) try: await send_email(to=user.email, subject=subject, text_body=text_body, html_body=html_body) status_ = "sent" err = None except Exception as e: status_ = "error" err = str(e)[:255] log.error("digest.send_failed", user_id=user.id, error=err) session.add(EmailSend( user_id=user.id, kind=kind, sent_at=_now(), status=status_, error=err, )) await session.commit() async def run() -> None: async with job_lifecycle("email_digest_job") as (session, jr): if jr.status == "skipped": return s = get_settings() if not llm_configured(): log.warning("digest.skipped_no_key", provider=s.LLM_PROVIDER) jr.status = "skipped" return today = _now() kind = _kind_for_today(today) date_str = today.strftime("%Y-%m-%d") recipients = await _opt_in_recipients( session, paid_only=(kind == "daily"), ) fresh: list[User] = [] for u in recipients: if not await _already_sent_today(session, u.id, kind, today): fresh.append(u) if not fresh: log.info("digest.no_fresh_recipients", kind=kind, total=len(recipients)) jr.status = "skipped" return spent = await _month_spend(session) if spent >= s.OPENROUTER_MONTHLY_CAP_USD: log.warning("digest.cap_reached", spent=spent, cap=s.OPENROUTER_MONTHLY_CAP_USD) jr.status = "skipped" jr.error = f"monthly cost cap reached (${spent:.2f})" return quotes = await _latest_quotes_by_group(session) news = await _recent_headlines_by_bucket( session, hours=(168 if kind == "weekly" else 24), ) ctx = dict( today=today, quotes_by_group=quotes, headlines_by_bucket=news, reference_line=REFERENCE_LINE, ) async with httpx.AsyncClient(follow_redirects=True) as client: variants = await _generate_variants(client, kind, ctx) if not variants: log.warning("digest.all_variants_failed", kind=kind) jr.status = "failed" jr.error = "all variants failed" return written = 0 for u in fresh: tone = (u.digest_tone or "INTERMEDIATE").upper() # Fall back to INTERMEDIATE first (the more common tone) and then # to whatever variant succeeded, so an asymmetric LLM failure # doesn't silently skip the user. content = (variants.get(tone) or variants.get("INTERMEDIATE") or next(iter(variants.values()), None)) if content is None: continue await _send_one(u, kind, content, date_str, session) await asyncio.sleep(0.1) written += 1 jr.items_written = written log.info("digest.done", kind=kind, written=written, prompt_version=PROMPT_VERSION) if __name__ == "__main__": asyncio.run(run())