read.markets/app/jobs/email_digest_job.py
Giorgio Gilestro b055eea1c2 email: split digest renderer to digest_email.py
email_service.py was 428 lines covering three different concerns:
SMTP transport, OTP/welcome rendering (tightly coupled — same brand
template + theme), and digest rendering (a totally different shape
of email, different layout, different copy cadence). The two halves
changed at different cadences and made the file noisy to navigate.

Extracted render_digest_email + _DIGEST_HTML_TEMPLATE +
_strip_html_to_text to app/services/digest_email.py. SMTP transport
and the OTP/welcome surface stay in email_service.py.

Import sites updated: email_digest_job and test_email_render now
import render_digest_email from digest_email. The OTP/welcome
import sites (auth router, branding tests, test_email_service) are
untouched.

No behaviour change — pure relocation. Templates byte-identical.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-05-27 21:33:06 +02:00

289 lines
9.9 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""Daily/weekly editorial email digest.
Runs once a day at 06:30 UTC via the scheduler. On Sundays sends the
weekly recap to every opt-in user (free + paid). On other days sends
the daily digest to opt-in paid users only.
Generates LLM content once per tone (NOVICE + INTERMEDIATE), then fans
out by SMTP. EmailSend audit rows guard against double-delivery if the
job is re-run within the same UTC day.
"""
from __future__ import annotations
import asyncio
from datetime import datetime, timedelta
import httpx
from sqlalchemy import select
from app import branding
from app.config import get_settings
from app.db import utcnow
from app.jobs._helpers import job_lifecycle, log
from app.jobs._market_context import (
REFERENCE_LINE,
latest_quotes_by_group,
month_spend,
recent_headlines_by_bucket,
)
from app.models import EmailSend, User
from app.routers.email import sign_unsubscribe_token
from app.services.access import paid_status
from app.services.digest_email import render_digest_email
from app.services.email_service import send_email
from app.services.i18n import ACTIVE_LANGUAGES
from app.services.llm_prompts import (
PROMPT_VERSION,
build_daily_digest_prompt,
build_weekly_digest_prompt,
)
from app.services.openrouter import (
call_llm,
llm_configured,
)
from app.services.translation import translate
def _now() -> datetime:
    """Return the current time as reported by ``utcnow()``.

    This thin wrapper exists so tests can monkeypatch the job's notion
    of "now" without touching the system clock.
    """
    current = utcnow()
    return current
async def _opt_in_recipients(session, *, paid_only: bool) -> list[User]:
    """Load every user whose ``email_digest_opt_in`` flag is true.

    When ``paid_only`` is set, the result is narrowed to users whose
    ``paid_status(user).active`` is truthy. That filter runs in Python
    on the loaded rows, not in the SQL query.
    """
    query = select(User).where(User.email_digest_opt_in.is_(True))
    result = await session.execute(query)
    users = result.scalars().all()
    if not paid_only:
        return users
    return [user for user in users if paid_status(user).active]
async def _already_sent_today(session, user_id: int, kind: str, today: datetime) -> bool:
    """Report whether a digest attempt is already logged for this user today.

    Looks for an ``EmailSend`` row matching ``user_id`` and ``kind`` whose
    ``sent_at`` falls on the calendar day of ``today`` (midnight inclusive,
    next midnight exclusive) and whose status is ``'sent'`` or ``'error'``.
    ``'error'`` counts on purpose: a bad address should not be retried
    again inside the same daily slot.
    """
    window_start = today.replace(hour=0, minute=0, second=0, microsecond=0)
    window_end = window_start + timedelta(days=1)
    conditions = (
        EmailSend.user_id == user_id,
        EmailSend.kind == kind,
        EmailSend.sent_at >= window_start,
        EmailSend.sent_at < window_end,
        EmailSend.status.in_(("sent", "error")),
    )
    query = select(EmailSend.id).where(*conditions)
    result = await session.execute(query)
    return result.first() is not None
async def _generate_variants(session, client, kind: str, ctx: dict) -> dict[str, str]:
    """Generate the digest body once per tone and return ``{tone: content}``.

    A tone missing from the result means its generation failed; callers
    should skip or fall back for recipients on that tone.

    An ``AICall`` row is added and committed for every attempt (status
    ``"ok"`` or ``"error"``), with the stated intent that digest LLM spend
    counts toward the monthly cost cap on subsequent runs.
    """
    from app.models import AICall
    from app.services.openrouter import active_model

    # The weekly recap and the daily digest use different prompt builders.
    if kind == "weekly":
        build_prompt = build_weekly_digest_prompt
    else:
        build_prompt = build_daily_digest_prompt

    variants: dict[str, str] = {}
    for tone in ("NOVICE", "INTERMEDIATE"):
        system_prompt, user_prompt = build_prompt(tone=tone, **ctx)
        messages = [
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": user_prompt},
        ]
        try:
            result = await call_llm(client, messages)
            variants[tone] = result.content
            audit_row = AICall(
                model=result.model,
                prompt_tokens=result.prompt_tokens,
                completion_tokens=result.completion_tokens,
                cost_usd=result.cost_usd,
                status="ok",
            )
            session.add(audit_row)
            await session.commit()
            log.info(
                "digest.variant_ok",
                kind=kind,
                tone=tone,
                prompt_tokens=result.prompt_tokens,
                completion_tokens=result.completion_tokens,
            )
        except Exception as exc:
            # Record the failure and move on; the other tone may still succeed.
            session.add(AICall(
                model=active_model(),
                status="error",
                error=f"{kind}/{tone}: {str(exc)[:480]}",
            ))
            await session.commit()
            log.error(
                "digest.variant_failed",
                kind=kind,
                tone=tone,
                error=str(exc)[:200],
            )
    return variants
def _kind_for_today(today: datetime) -> str:
    """Pick the digest kind for ``today``.

    Sunday yields ``"weekly"``; every other day (Mon-Sat) yields ``"daily"``.
    """
    # datetime.weekday() numbers Monday as 0, so Sunday is 6.
    if today.weekday() == 6:
        return "weekly"
    return "daily"
async def _translate_variants_for_active_langs(
    client,
    english_variants: dict[str, str],
    target_langs: list[str],
) -> dict[tuple[str, str], str]:
    """Build a ``{(tone, lang): content}`` lookup table for the digest.

    The English variants seed the table as the canonical ``(tone, "en")``
    cells. For every ``(tone, target_lang)`` pair with
    ``target_lang != "en"``, ``translate()`` is called and all calls run
    concurrently. If a translation fails, its cell falls back to the
    English variant of the same tone, so the digest still goes out, just
    untranslated.

    Args:
        client: HTTP client handed through to ``translate()``.
        english_variants: ``{tone: english_content}``.
        target_langs: language codes to translate into; ``"en"`` entries
            are ignored.

    Returns:
        The table, always containing one ``(tone, "en")`` entry per input
        tone plus one entry per ``(tone, non-English lang)`` pair.
    """
    table: dict[tuple[str, str], str] = {
        (tone, "en"): content for tone, content in english_variants.items()
    }
    pairs = [
        (tone, lang)
        for tone in english_variants
        for lang in target_langs
        if lang != "en"
    ]
    if not pairs:
        return table

    results = await asyncio.gather(
        *[translate(client, english_variants[tone], lang) for tone, lang in pairs],
        return_exceptions=True,
    )
    for (tone, lang), result in zip(pairs, results):
        # With return_exceptions=True, gather can hand back BaseException
        # instances that are not Exception subclasses (e.g. CancelledError
        # from a cancelled child call). Checking only Exception would let
        # those reach the tuple unpack below and crash the whole job.
        if isinstance(result, BaseException):
            log.warning("digest.translate.failed",
                        tone=tone, lang=lang, error=str(result)[:200])
            table[(tone, lang)] = english_variants[tone]
            continue
        # translate() returns a pair; only the translated text is used here.
        translated_md, _llm_log = result
        table[(tone, lang)] = translated_md
    return table
def _pick_variant(
    table: dict[tuple[str, str], str], tone: str, lang: str,
) -> str:
    """Choose the digest content to send to one recipient.

    Candidates are tried in order: the exact ``(tone, lang)`` cell, then
    ``(tone, 'en')``, then ``('INTERMEDIATE', 'en')``. If none is present
    the first value in the table is returned. The later fallbacks are
    defensive; the table is expected to hold at least one English entry
    whenever the job is sending.
    """
    fallback_chain = (
        (tone, lang),
        (tone, "en"),
        ("INTERMEDIATE", "en"),
    )
    for key in fallback_chain:
        if key in table:
            return table[key]
    return next(iter(table.values()))
async def _send_one(user: User, kind: str, content_html: str, date_str: str,
                    session) -> None:
    """Render and send one digest email, then record the outcome.

    Builds the settings and unsubscribe links from ``branding.SITE_URL``,
    renders the email with ``render_digest_email`` and hands it to
    ``send_email``. A send failure is logged and stored rather than
    raised, so one bad address does not stop the rest of the fan-out.
    An ``EmailSend`` row with status ``"sent"`` or ``"error"`` is
    committed either way.
    """
    token = sign_unsubscribe_token(user.id)
    unsubscribe_url = f"{branding.SITE_URL}/email/unsubscribe?token={token}"
    settings_url = f"{branding.SITE_URL}/settings"

    subject, text_body, html_body = render_digest_email(
        kind=kind,
        date_str=date_str,
        content_html=content_html,
        unsubscribe_url=unsubscribe_url,
        settings_url=settings_url,
    )

    # Assume success; overwritten below if the send raises.
    status_, err = "sent", None
    try:
        await send_email(
            to=user.email,
            subject=subject,
            text_body=text_body,
            html_body=html_body,
        )
    except Exception as exc:
        status_, err = "error", str(exc)[:255]
        log.error("digest.send_failed", user_id=user.id, error=err)

    audit_row = EmailSend(
        user_id=user.id,
        kind=kind,
        sent_at=_now(),
        status=status_,
        error=err,
    )
    session.add(audit_row)
    await session.commit()
async def run() -> None:
    """Entry point for the email digest job.

    Steps, in order:

    1. Enter ``job_lifecycle``; stop if the run is already marked skipped.
    2. Skip when no LLM is configured.
    3. Decide daily vs. weekly from the current time and load opt-in
       recipients (paid-only for the daily digest).
    4. Drop recipients that already have a send logged today; skip when
       nobody is left.
    5. Skip when the month's spend has reached the configured cap.
    6. Gather quotes and headlines, generate one variant per tone, and
       mark the run failed if every variant failed.
    7. Translate the variants for the active non-English languages.
    8. Send one email per remaining recipient, pausing briefly between
       sends, and record the count on the job run.
    """
    async with job_lifecycle("email_digest_job") as (session, jr):
        if jr.status == "skipped":
            return

        s = get_settings()
        if not llm_configured():
            log.warning("digest.skipped_no_key", provider=s.LLM_PROVIDER)
            jr.status = "skipped"
            return

        today = _now()
        kind = _kind_for_today(today)
        date_str = today.strftime("%Y-%m-%d")

        recipients = await _opt_in_recipients(
            session, paid_only=(kind == "daily"),
        )
        # Keep only the users with no send logged for this kind today.
        fresh: list[User] = [
            user
            for user in recipients
            if not await _already_sent_today(session, user.id, kind, today)
        ]
        if not fresh:
            log.info("digest.no_fresh_recipients", kind=kind,
                     total=len(recipients))
            jr.status = "skipped"
            return

        spent = await month_spend(session)
        if spent >= s.OPENROUTER_MONTHLY_CAP_USD:
            log.warning("digest.cap_reached", spent=spent,
                        cap=s.OPENROUTER_MONTHLY_CAP_USD)
            jr.status = "skipped"
            jr.error = f"monthly cost cap reached (${spent:.2f})"
            return

        # The weekly recap looks back a full week of headlines, the daily one day.
        if kind == "weekly":
            lookback_hours = 168
        else:
            lookback_hours = 24
        quotes = await latest_quotes_by_group(session)
        news = await recent_headlines_by_bucket(session, hours=lookback_hours)
        ctx = dict(
            today=today,
            quotes_by_group=quotes,
            headlines_by_bucket=news,
            reference_line=REFERENCE_LINE,
        )

        async with httpx.AsyncClient(follow_redirects=True) as client:
            variants = await _generate_variants(session, client, kind, ctx)
        if not variants:
            log.warning("digest.all_variants_failed", kind=kind)
            jr.status = "failed"
            jr.error = "all variants failed"
            return

        # Build the per-language translation table once per job run.
        active_non_en = sorted(
            {code for code in ACTIVE_LANGUAGES if code != "en"}
        )
        async with httpx.AsyncClient(follow_redirects=True) as client:
            variant_table = await _translate_variants_for_active_langs(
                client, variants, active_non_en,
            )

        for user in fresh:
            tone = (user.digest_tone or "INTERMEDIATE").upper()
            lang = user.lang or "en"
            content = _pick_variant(variant_table, tone=tone, lang=lang)
            await _send_one(user, kind, content, date_str, session)
            # Short pause between sends.
            await asyncio.sleep(0.1)

        # Every recipient in ``fresh`` was attempted; the loop has no early exit.
        written = len(fresh)
        jr.items_written = written
        log.info("digest.done", kind=kind, written=written,
                 prompt_version=PROMPT_VERSION)
# Allow running the job directly as a script (one-off run of the digest).
if __name__ == "__main__":
    asyncio.run(run())