"""Daily/weekly editorial email digest. Runs once a day at 06:30 UTC via the scheduler. On Sundays sends the weekly recap to every opt-in user (free + paid). On other days sends the daily digest to opt-in paid users only. Generates LLM content once per tone (NOVICE + INTERMEDIATE), then fans out by SMTP. EmailSend audit rows guard against double-delivery if the job is re-run within the same UTC day. """ from __future__ import annotations import asyncio from datetime import datetime, timedelta import httpx from sqlalchemy import select from app import branding from app.config import get_settings from app.db import utcnow from app.jobs._helpers import job_lifecycle, log from app.jobs._market_context import ( REFERENCE_LINE, latest_quotes_by_group, month_spend, recent_headlines_by_bucket, ) from app.models import EmailSend, User from app.routers.email import sign_unsubscribe_token from app.services.access import paid_status from app.services.digest_email import render_digest_email from app.services.email_service import send_email from app.services.i18n import ACTIVE_LANGUAGES from app.services.llm_prompts import ( PROMPT_VERSION, build_daily_digest_prompt, build_weekly_digest_prompt, ) from app.services.openrouter import ( call_llm, llm_configured, ) from app.services.output_review import review_read from app.services.translation import translate def _now() -> datetime: """Indirection so tests can monkeypatch the "current time" without touching the system clock.""" return utcnow() async def _opt_in_recipients(session, *, paid_only: bool) -> list[User]: stmt = select(User).where(User.email_digest_opt_in.is_(True)) rows = (await session.execute(stmt)).scalars().all() if paid_only: rows = [u for u in rows if paid_status(u).active] return rows async def _already_sent_today(session, user_id: int, kind: str, today: datetime) -> bool: """True if an EmailSend row exists for this user+kind on the same UTC day, with status in ('sent','error'). 'error' counts because we don't want to keep retrying a bad address inside the same daily slot.""" day_start = today.replace(hour=0, minute=0, second=0, microsecond=0) day_end = day_start + timedelta(days=1) stmt = select(EmailSend.id).where( EmailSend.user_id == user_id, EmailSend.kind == kind, EmailSend.sent_at >= day_start, EmailSend.sent_at < day_end, EmailSend.status.in_(("sent", "error")), ) return (await session.execute(stmt)).first() is not None async def _generate_variants(session, client, kind: str, ctx: dict) -> dict[str, str]: """Returns {tone: html_content}. Missing tone means generation failed for that variant — skip recipients on that tone. Persists an AICall row per attempt so digest LLM spend counts toward the monthly cost cap on subsequent runs.""" from app.models import AICall from app.services.openrouter import active_model builder = build_weekly_digest_prompt if kind == "weekly" else build_daily_digest_prompt out: dict[str, str] = {} for tone in ("NOVICE", "INTERMEDIATE"): sys_, usr = builder(tone=tone, **ctx) try: result = await call_llm( client, [{"role": "system", "content": sys_}, {"role": "user", "content": usr}], ) # Reviewer gate. Digest emails land in inboxes — once # delivered they're unrecallable, so a financial-advice slip # has more reach than the dashboard. Drop rejected variants; # users on that tone get no digest this cycle (better than # delivering bad copy). verdict = await review_read(client, result.content) full_cost = (result.cost_usd or 0.0) + (verdict.cost_usd or 0.0) if not verdict.clean: session.add(AICall( model=result.model, prompt_tokens=result.prompt_tokens, completion_tokens=result.completion_tokens, cost_usd=full_cost, status="leaked", error=f"reviewer: {verdict.reason}", )) await session.commit() log.warning("digest.reviewer_rejected", kind=kind, tone=tone, reason=verdict.reason, preview=result.content[:120]) continue out[tone] = result.content session.add(AICall( model=result.model, prompt_tokens=result.prompt_tokens, completion_tokens=result.completion_tokens, cost_usd=full_cost, status="ok", )) await session.commit() log.info("digest.variant_ok", kind=kind, tone=tone, prompt_tokens=result.prompt_tokens, completion_tokens=result.completion_tokens) except Exception as e: session.add(AICall( model=active_model(), status="error", error=f"{kind}/{tone}: {str(e)[:480]}", )) await session.commit() log.error("digest.variant_failed", kind=kind, tone=tone, error=str(e)[:200]) return out def _kind_for_today(today: datetime) -> str: """Sunday → weekly. Mon–Sat → daily.""" return "weekly" if today.weekday() == 6 else "daily" async def _translate_variants_for_active_langs( client, english_variants: dict[str, str], target_langs: list[str], ) -> dict[tuple[str, str], str]: """Build a {(tone, lang): content_md} table. Starts with the English variants as the canonical cells. For each (tone, target_lang) pair where target_lang != 'en', calls translate() in parallel; on failure the cell falls back to the English variant of the same tone so the digest still goes out, just untranslated. """ table: dict[tuple[str, str], str] = { (tone, "en"): content for tone, content in english_variants.items() } pairs = [ (tone, lang) for tone in english_variants for lang in target_langs if lang != "en" ] if not pairs: return table results = await asyncio.gather(*[ translate(client, english_variants[tone], lang) for tone, lang in pairs ], return_exceptions=True) for (tone, lang), result in zip(pairs, results): if isinstance(result, Exception): log.warning("digest.translate.failed", tone=tone, lang=lang, error=str(result)[:200]) table[(tone, lang)] = english_variants[tone] continue translated_md, _llm_log = result table[(tone, lang)] = translated_md return table def _pick_variant( table: dict[tuple[str, str], str], tone: str, lang: str, ) -> str: """Return the digest content for a recipient. Lookup order: exact (tone, lang) → (tone, 'en') → ('INTERMEDIATE', 'en') → first table value. The last falls are defensive; the table always contains at least one English entry when the job is sending. """ if (tone, lang) in table: return table[(tone, lang)] if (tone, "en") in table: return table[(tone, "en")] if ("INTERMEDIATE", "en") in table: return table[("INTERMEDIATE", "en")] return next(iter(table.values())) async def _send_one(user: User, kind: str, content_html: str, date_str: str, session) -> None: settings_url = f"{branding.SITE_URL}/settings" unsubscribe_url = ( f"{branding.SITE_URL}/email/unsubscribe" f"?token={sign_unsubscribe_token(user.id)}" ) subject, text_body, html_body = render_digest_email( kind=kind, date_str=date_str, content_html=content_html, unsubscribe_url=unsubscribe_url, settings_url=settings_url, ) try: await send_email(to=user.email, subject=subject, text_body=text_body, html_body=html_body) status_ = "sent" err = None except Exception as e: status_ = "error" err = str(e)[:255] log.error("digest.send_failed", user_id=user.id, error=err) session.add(EmailSend( user_id=user.id, kind=kind, sent_at=_now(), status=status_, error=err, )) await session.commit() async def run() -> None: async with job_lifecycle("email_digest_job") as (session, jr): if jr.status == "skipped": return s = get_settings() if not llm_configured(): log.warning("digest.skipped_no_key", provider=s.LLM_PROVIDER) jr.status = "skipped" return today = _now() kind = _kind_for_today(today) date_str = today.strftime("%Y-%m-%d") recipients = await _opt_in_recipients( session, paid_only=(kind == "daily"), ) fresh: list[User] = [] for u in recipients: if not await _already_sent_today(session, u.id, kind, today): fresh.append(u) if not fresh: log.info("digest.no_fresh_recipients", kind=kind, total=len(recipients)) jr.status = "skipped" return spent = await month_spend(session) if spent >= s.OPENROUTER_MONTHLY_CAP_USD: log.warning("digest.cap_reached", spent=spent, cap=s.OPENROUTER_MONTHLY_CAP_USD) jr.status = "skipped" jr.error = f"monthly cost cap reached (${spent:.2f})" return quotes = await latest_quotes_by_group(session) news = await recent_headlines_by_bucket( session, hours=(168 if kind == "weekly" else 24), ) ctx = dict( today=today, quotes_by_group=quotes, headlines_by_bucket=news, reference_line=REFERENCE_LINE, ) async with httpx.AsyncClient(follow_redirects=True) as client: variants = await _generate_variants(session, client, kind, ctx) if not variants: log.warning("digest.all_variants_failed", kind=kind) jr.status = "failed" jr.error = "all variants failed" return # Build the per-language translation table once per job run. active_non_en = sorted({l for l in ACTIVE_LANGUAGES if l != "en"}) async with httpx.AsyncClient(follow_redirects=True) as client: variant_table = await _translate_variants_for_active_langs( client, variants, active_non_en, ) written = 0 for u in fresh: tone = (u.digest_tone or "INTERMEDIATE").upper() content = _pick_variant( variant_table, tone=tone, lang=(u.lang or "en"), ) await _send_one(u, kind, content, date_str, session) await asyncio.sleep(0.1) written += 1 jr.items_written = written log.info("digest.done", kind=kind, written=written, prompt_version=PROMPT_VERSION) if __name__ == "__main__": asyncio.run(run())