read.markets/app/cli.py

194 lines
6.9 KiB
Python

"""Admin CLI — runs inside the `app` container.
Usage from the host::
docker compose exec app python -m app.cli grant-credit <email> <months>
docker compose exec app python -m app.cli revoke-credit <email>
docker compose exec app python -m app.cli show-status <email>
docker compose exec app python -m app.cli send-test-digest <email> <daily|weekly>
`grant-credit` is additive, not idempotent: it extends
`users.credit_until` from ``max(now, current_credit_until)``, so granting
"1 month" twice gives two months, not one (re-running the command never
erodes an existing grant). A "month" is a fixed 30 days.
This is the manual lever for Phase D.2. In D.3 the Paddle webhook will
call the same helper for both sides of a referral conversion.
"""
from __future__ import annotations
import argparse
import asyncio
import sys
from datetime import datetime, timedelta, timezone
from sqlalchemy import select
from app.db import get_engine, get_session_factory
from app.models import User
from app.services.access import _aware, paid_status
def _utcnow() -> datetime:
    """Return the current moment as a timezone-aware UTC ``datetime``."""
    return datetime.now(tz=timezone.utc)
async def _get_user_by_email(session, email: str) -> User | None:
    """Fetch the ``User`` whose ``email`` column equals *email*.

    Returns the matching user, or ``None`` when the query yields no row.
    """
    query = select(User).where(User.email == email)
    result = await session.execute(query)
    return result.scalar_one_or_none()
async def grant_credit(email: str, months: float) -> int:
    """Extend the paid-credit window of the user with *email*.

    The extension is ``months`` 30-day months, measured from whichever is
    later: the current time or the user's existing ``credit_until``. Repeated
    grants therefore accumulate rather than overwrite each other.

    All validation of *months* happens before the database is touched.

    Returns a process exit code:
        0 -- credit granted and committed.
        1 -- no user with that email.
        2 -- *months* is unusable: not finite, not positive, so small that it
             rounds to zero days, or so large that the resulting date cannot
             be represented.
    """
    import math  # function-local: only this command needs it

    # ``nan``/``inf`` are valid floats for argparse but slip past ``<= 0``
    # and would blow up inside ``int(round(...))`` further down.
    if not math.isfinite(months):
        print(f"error: months must be a finite number (got {months})",
              file=sys.stderr)
        return 2
    if months <= 0:
        print(f"error: months must be positive (got {months})", file=sys.stderr)
        return 2

    # 30-day months — simple, predictable, no calendar arithmetic.
    try:
        extension = timedelta(days=int(round(months * 30)))
    except OverflowError:
        print(f"error: months is too large (got {months})", file=sys.stderr)
        return 2
    if extension < timedelta(days=1):
        # A positive value that rounds to zero days would report a grant
        # while extending nothing.
        print(f"error: {months} month(s) rounds to 0 days; nothing to grant",
              file=sys.stderr)
        return 2

    factory = get_session_factory()
    async with factory() as session:
        user = await _get_user_by_email(session, email)
        if user is None:
            print(f"error: no user with email {email!r}", file=sys.stderr)
            return 1

        # Start from the later of "now" and the existing expiry so that an
        # unexpired grant is extended, never shortened.
        now = _utcnow()
        anchor = max(now, _aware(user.credit_until) or now)
        try:
            new_expiry = anchor + extension
        except OverflowError:
            print(f"error: months is too large (got {months})", file=sys.stderr)
            return 2

        user.credit_until = new_expiry
        await session.commit()

        # Refresh status snapshot from the just-committed value.
        st = paid_status(user)
        print(
            f"granted {months} month(s) to {email}: "
            f"credit_until={new_expiry.isoformat()} "
            f"(~{st.days_remaining} days remaining)"
        )
        return 0
async def revoke_credit(email: str) -> int:
    """Clear ``credit_until`` for the user with *email*.

    Returns 0 after the change is committed, or 1 (with a message on stderr)
    when no user has that email.
    """
    session_factory = get_session_factory()
    async with session_factory() as db:
        account = await _get_user_by_email(db, email)
        if account is not None:
            account.credit_until = None
            await db.commit()
            print(f"revoked: credit_until cleared for {email}")
            return 0

        print(f"error: no user with email {email!r}", file=sys.stderr)
        return 1
async def show_status(email: str) -> int:
    """Print the paid-tier status of the user with *email* to stdout.

    Returns 0 once the report is printed, or 1 (with a message on stderr)
    when no user has that email.
    """
    session_factory = get_session_factory()
    async with session_factory() as db:
        account = await _get_user_by_email(db, email)
        if account is None:
            print(f"error: no user with email {email!r}", file=sys.stderr)
            return 1

        status = paid_status(account)
        report = [
            f"email: {account.email}",
            f"tier: {account.tier}",
            f"credit_until: {account.credit_until or ''}",
            f"paid active: {status.active} (source={status.source or ''})",
        ]
        # The countdown line only appears when there is an expiry to count to.
        if status.expires_at:
            report.append(f"expires in: {status.days_remaining} days")
        print("\n".join(report))
        return 0
async def send_test_digest(email: str, kind: str) -> int:
    """Build one digest right now and deliver it to the user with *email*.

    Opt-in state and idempotency are ignored, which makes this handy for
    previewing copy in your own inbox ahead of a real run.

    Returns 0 once the digest has been handed to the sender, 2 when *kind* is
    neither ``"daily"`` nor ``"weekly"``, and 1 when the LLM provider is not
    configured, the user does not exist, or no variant content was produced.
    """
    # Function-local imports, resolved only when this sub-command runs.
    import httpx
    from app.jobs._market_context import (
        REFERENCE_LINE,
        latest_quotes_by_group,
        recent_headlines_by_bucket,
    )
    from app.jobs.email_digest_job import _generate_variants, _send_one
    from app.services.openrouter import llm_configured

    if kind != "daily" and kind != "weekly":
        print(
            f"error: kind must be 'daily' or 'weekly' (got {kind!r})",
            file=sys.stderr,
        )
        return 2
    if not llm_configured():
        print(
            "error: LLM provider not configured (set OPENROUTER_API_KEY)",
            file=sys.stderr,
        )
        return 1

    session_factory = get_session_factory()
    async with session_factory() as db:
        recipient = await _get_user_by_email(db, email)
        if recipient is None:
            print(f"error: no user with email {email!r}", file=sys.stderr)
            return 1

        now = _utcnow()
        # Weekly digests look back seven days (168 h); daily ones 24 h.
        lookback_hours = 168 if kind == "weekly" else 24
        quotes = await latest_quotes_by_group(db)
        headlines = await recent_headlines_by_bucket(db, hours=lookback_hours)
        context = {
            "today": now,
            "quotes_by_group": quotes,
            "headlines_by_bucket": headlines,
            "reference_line": REFERENCE_LINE,
        }

        async with httpx.AsyncClient(follow_redirects=True) as http:
            variants = await _generate_variants(db, http, kind, context)

        # Prefer the user's tone, then INTERMEDIATE, then whatever variant
        # exists at all.
        tone = (recipient.digest_tone or "INTERMEDIATE").upper()
        content = (
            variants.get(tone)
            or variants.get("INTERMEDIATE")
            or next(iter(variants.values()), None)
        )
        if content is None:
            print("error: all LLM variants failed", file=sys.stderr)
            return 1

        stamp = now.strftime("%Y-%m-%d")
        await _send_one(recipient, kind, content, stamp, db)
        print(f"sent {kind} digest to {email} (tone={tone})")
        return 0
def build_parser() -> argparse.ArgumentParser:
    """Construct the argument parser for the admin CLI.

    The chosen sub-command name is stored on the parsed namespace as ``cmd``;
    exactly one sub-command is required.
    """
    parser = argparse.ArgumentParser(
        prog="app.cli",
        description="Cassandra admin CLI",
    )
    commands = parser.add_subparsers(dest="cmd", required=True)

    grant = commands.add_parser(
        "grant-credit",
        help="Extend a user's paid-credit window",
    )
    grant.add_argument("email")
    grant.add_argument("months", type=float)

    revoke = commands.add_parser(
        "revoke-credit",
        help="Clear a user's credit_until",
    )
    revoke.add_argument("email")

    status = commands.add_parser(
        "show-status",
        help="Print paid-tier status for a user",
    )
    status.add_argument("email")

    digest = commands.add_parser(
        "send-test-digest",
        help="Send one digest immediately (bypasses opt-in/idempotency)",
    )
    digest.add_argument("email")
    digest.add_argument("kind", choices=("daily", "weekly"))

    return parser
async def _dispatch(args) -> int:
    """Execute the sub-command named by ``args.cmd`` and return its exit code.

    An unrecognised command yields 2. Whatever happens, the async engine is
    disposed before returning so aiomysql's ``__del__`` doesn't complain about
    a closed event loop at interpreter shutdown.
    """
    # Each entry defers both attribute access and coroutine creation, so only
    # the selected command reads its own arguments.
    handlers = {
        "grant-credit": lambda: grant_credit(args.email, args.months),
        "revoke-credit": lambda: revoke_credit(args.email),
        "show-status": lambda: show_status(args.email),
        "send-test-digest": lambda: send_test_digest(args.email, args.kind),
    }
    try:
        handler = handlers.get(args.cmd)
        if handler is None:
            return 2
        return await handler()
    finally:
        await get_engine().dispose()
def main(argv: list[str] | None = None) -> int:
    """Parse *argv*, run the selected sub-command, and return its exit code.

    *argv* is handed to ``parse_args`` unchanged, including the default
    ``None``.
    """
    namespace = build_parser().parse_args(argv)
    exit_code = asyncio.run(_dispatch(namespace))
    return exit_code
if __name__ == "__main__":
    # Propagate the sub-command's return value as the process exit status.
    raise SystemExit(main())