read.markets/app/jobs/email_digest_job.py
2026-05-25 23:17:14 +02:00

224 lines
7.8 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""Daily/weekly editorial email digest.
Runs once a day at 06:30 UTC via the scheduler. On Sundays sends the
weekly recap to every opt-in user (free + paid). On other days sends
the daily digest to opt-in paid users only.
Generates LLM content once per tone (NOVICE + INTERMEDIATE), then fans
out by SMTP. EmailSend audit rows guard against double-delivery if the
job is re-run within the same UTC day.
"""
from __future__ import annotations
import asyncio
from datetime import datetime, timedelta
import httpx
from sqlalchemy import select
from app import branding
from app.config import get_settings
from app.db import utcnow
from app.jobs._helpers import job_lifecycle, log
from app.jobs.ai_log_job import (
REFERENCE_LINE,
_latest_quotes_by_group,
_recent_headlines_by_bucket,
_month_spend,
)
from app.models import EmailSend, User
from app.routers.email import sign_unsubscribe_token
from app.services.access import paid_status
from app.services.email_service import render_digest_email, send_email
from app.services.openrouter import (
PROMPT_VERSION,
build_daily_digest_prompt,
build_weekly_digest_prompt,
call_llm,
llm_configured,
)
def _now() -> datetime:
    """Return the current time as given by ``utcnow()``.

    Kept as its own module-level function so tests can monkeypatch it to
    freeze "now" without touching the system clock.
    """
    current = utcnow()
    return current
async def _opt_in_recipients(session, *, paid_only: bool) -> list[User]:
    """Load every user who opted into the email digest.

    When ``paid_only`` is true, only users whose ``paid_status(...)`` is
    active are kept. That check runs in Python after the query, so every
    opt-in user is fetched first.
    """
    query = select(User).where(User.email_digest_opt_in.is_(True))
    result = await session.execute(query)
    opted_in = result.scalars().all()
    if not paid_only:
        return opted_in
    return [user for user in opted_in if paid_status(user).active]
async def _already_sent_today(session, user_id: int, kind: str, today: datetime) -> bool:
    """Report whether ``user_id`` already has a ``kind`` digest logged today.

    "Today" is the window [midnight of ``today``, next midnight). The caller
    passes ``_now()``, which comes from ``utcnow()``.

    Rows with status 'sent' or 'error' both count. A failed delivery is
    deliberately not retried within the same daily slot, so a bad address
    is not hammered on re-runs.
    """
    midnight = today.replace(hour=0, minute=0, second=0, microsecond=0)
    next_midnight = midnight + timedelta(days=1)
    query = select(EmailSend.id).where(
        EmailSend.user_id == user_id,
        EmailSend.kind == kind,
        EmailSend.sent_at >= midnight,
        EmailSend.sent_at < next_midnight,
        EmailSend.status.in_(("sent", "error")),
    )
    match = (await session.execute(query)).first()
    return match is not None
async def _generate_variants(session, client, kind: str, ctx: dict) -> dict[str, str]:
    """Generate the digest body once per tone and return ``{tone: html_content}``.

    Tones are tried in the order NOVICE, then INTERMEDIATE. A tone is
    missing from the result when its LLM call raised or returned blank
    content. The caller decides how to serve recipients of a missing tone.

    One AICall row is committed per attempt, so digest LLM spend counts
    toward the monthly cost cap on later runs:
    - status 'ok' (model, token counts, cost) when the call returned;
    - status 'error' (active model, truncated message) when it raised.

    Only the LLM call itself is guarded. A database error while recording
    a row propagates instead of being misreported as an LLM failure.

    Args:
        session: async DB session used to persist AICall rows.
        client: HTTP client forwarded to ``call_llm``.
        kind: "weekly" selects the weekly prompt builder; any other value
            selects the daily one.
        ctx: keyword arguments forwarded to the prompt builder.
    """
    from app.models import AICall
    from app.services.openrouter import active_model

    builder = build_weekly_digest_prompt if kind == "weekly" else build_daily_digest_prompt
    out: dict[str, str] = {}
    for tone in ("NOVICE", "INTERMEDIATE"):
        sys_, usr = builder(tone=tone, **ctx)
        messages = [
            {"role": "system", "content": sys_},
            {"role": "user", "content": usr},
        ]
        try:
            result = await call_llm(client, messages)
        except Exception as e:
            session.add(AICall(
                model=active_model(), status="error",
                error=f"{kind}/{tone}: {str(e)[:480]}",
            ))
            await session.commit()
            log.error("digest.variant_failed", kind=kind, tone=tone,
                      error=str(e)[:200])
            continue

        # Record the spend even if the content turns out to be unusable:
        # the call completed and reported its token usage and cost.
        session.add(AICall(
            model=result.model,
            prompt_tokens=result.prompt_tokens,
            completion_tokens=result.completion_tokens,
            cost_usd=result.cost_usd,
            status="ok",
        ))
        await session.commit()

        content = result.content
        if not content or not content.strip():
            # A blank body must not be returned as a usable variant, or it
            # would be mailed out as an empty digest.
            log.warning("digest.variant_empty", kind=kind, tone=tone)
            continue
        out[tone] = content
        log.info("digest.variant_ok", kind=kind, tone=tone,
                 prompt_tokens=result.prompt_tokens,
                 completion_tokens=result.completion_tokens)
    return out
def _kind_for_today(today: datetime) -> str:
"""Sunday → weekly. MonSat → daily."""
return "weekly" if today.weekday() == 6 else "daily"
async def _send_one(user: User, kind: str, content_html: str, date_str: str,
                    session) -> None:
    """Render and deliver one digest email, then commit an EmailSend audit row.

    Delivery errors are not raised. They are logged, and the audit row gets
    status 'error' with the message truncated to 255 characters. On
    success the row gets status 'sent' and no error.

    The row is committed either way, with ``sent_at`` taken from ``_now()``.
    """
    settings_link = f"{branding.SITE_URL}/settings"
    token = sign_unsubscribe_token(user.id)
    unsubscribe_link = f"{branding.SITE_URL}/email/unsubscribe?token={token}"
    subject, text_body, html_body = render_digest_email(
        kind=kind,
        date_str=date_str,
        content_html=content_html,
        unsubscribe_url=unsubscribe_link,
        settings_url=settings_link,
    )

    outcome, failure = "sent", None
    try:
        await send_email(to=user.email, subject=subject,
                         text_body=text_body, html_body=html_body)
    except Exception as exc:
        outcome, failure = "error", str(exc)[:255]
        log.error("digest.send_failed", user_id=user.id, error=failure)

    session.add(EmailSend(
        user_id=user.id,
        kind=kind,
        sent_at=_now(),
        status=outcome,
        error=failure,
    ))
    await session.commit()
async def run() -> None:
    """Send today's digest (weekly on Sunday, daily otherwise) to opt-in users.

    Steps:
    1. Skip when no LLM is configured.
    2. Collect opt-in recipients (paid users only for the daily digest) and
       drop anyone with an EmailSend row of this kind today.
    3. Stop if the monthly LLM cost cap is reached.
    4. Generate one variant per tone.
    5. Send each recipient the variant for their tone, falling back to
       INTERMEDIATE and then to any successful variant.

    On early exits the job-run record ``jr`` is marked 'skipped' or 'failed'
    (with a reason where one is known). ``items_written`` is set on
    completion.
    """
    async with job_lifecycle("email_digest_job") as (session, jr):
        if jr.status == "skipped":
            return
        s = get_settings()
        if not llm_configured():
            log.warning("digest.skipped_no_key", provider=s.LLM_PROVIDER)
            jr.status = "skipped"
            return

        today = _now()
        kind = _kind_for_today(today)
        date_str = today.strftime("%Y-%m-%d")

        recipients = await _opt_in_recipients(
            session, paid_only=(kind == "daily"),
        )
        # Dedupe against today's EmailSend rows so a same-day re-run does
        # not double-deliver.
        fresh: list[User] = []
        for u in recipients:
            if not await _already_sent_today(session, u.id, kind, today):
                fresh.append(u)
        if not fresh:
            log.info("digest.no_fresh_recipients", kind=kind,
                     total=len(recipients))
            jr.status = "skipped"
            return

        spent = await _month_spend(session)
        if spent >= s.OPENROUTER_MONTHLY_CAP_USD:
            log.warning("digest.cap_reached", spent=spent,
                        cap=s.OPENROUTER_MONTHLY_CAP_USD)
            jr.status = "skipped"
            jr.error = f"monthly cost cap reached (${spent:.2f})"
            return

        quotes = await _latest_quotes_by_group(session)
        news = await _recent_headlines_by_bucket(
            session, hours=(168 if kind == "weekly" else 24),
        )
        ctx = dict(
            today=today,
            quotes_by_group=quotes,
            headlines_by_bucket=news,
            reference_line=REFERENCE_LINE,
        )
        async with httpx.AsyncClient(follow_redirects=True) as client:
            variants = await _generate_variants(session, client, kind, ctx)

        # Drop blank variants. An empty string would otherwise survive the
        # "all failed" check and the fallback chain below, and be mailed
        # as an empty digest.
        variants = {t: c for t, c in variants.items() if c and c.strip()}
        if not variants:
            log.warning("digest.all_variants_failed", kind=kind)
            jr.status = "failed"
            jr.error = "all variants failed"
            return

        written = 0
        for u in fresh:
            tone = (u.digest_tone or "INTERMEDIATE").upper()
            # Fall back to INTERMEDIATE first (the more common tone) and then
            # to whatever variant succeeded, so an asymmetric LLM failure
            # doesn't silently skip the user.
            content = (variants.get(tone)
                       or variants.get("INTERMEDIATE")
                       or next(iter(variants.values()), None))
            if not content:
                continue
            await _send_one(u, kind, content, date_str, session)
            await asyncio.sleep(0.1)  # brief pause between sends
            # Counts EmailSend rows written by _send_one, which records
            # failed deliveries as well as successful ones.
            written += 1
        jr.items_written = written
        log.info("digest.done", kind=kind, written=written,
                 prompt_version=PROMPT_VERSION)


if __name__ == "__main__":
    asyncio.run(run())