diff --git a/scripts/purge_unclean_summaries.py b/scripts/purge_unclean_summaries.py new file mode 100644 index 0000000..44c8ef8 --- /dev/null +++ b/scripts/purge_unclean_summaries.py @@ -0,0 +1,76 @@ +"""One-off purge: ask the reviewer agent to judge every IndicatorSummary +row already in the DB, delete the ones it flags as unclean. + +Same reviewer the live pipeline uses (services/output_review.review_read), +so post-purge rows are exactly what would survive a fresh generation. +Per-row cost ~$0.0001; total run on ~3000 rows ~$0.30. + +Usage inside the app container: + docker compose exec app python /tmp/purge.py --dry-run + docker compose exec app python /tmp/purge.py # actually delete + +The script processes rows concurrently up to a small fan-out (default 8) +to keep wall-clock down without hammering the provider. +""" +from __future__ import annotations + +import argparse +import asyncio + +import httpx +from sqlalchemy import delete, select + +from app.db import get_session_factory +from app.models import IndicatorSummary, IndicatorSummaryTranslation +from app.services.output_review import review_read + + +async def _judge(client, sem, row): + async with sem: + v = await review_read(client, row.content or "") + return row, v + + +async def main(args): + session_factory = get_session_factory() + async with session_factory() as session: + rows = (await session.execute( + select(IndicatorSummary).order_by(IndicatorSummary.id) + )).scalars().all() + print(f"Reviewing {len(rows)} IndicatorSummary rows…") + + sem = asyncio.Semaphore(args.concurrency) + async with httpx.AsyncClient(follow_redirects=True) as client: + results = await asyncio.gather(*(_judge(client, sem, r) for r in rows)) + + unclean = [(r, v) for r, v in results if not v.clean] + print(f"\nFlagged {len(unclean)} of {len(rows)} as unclean.") + for r, v in unclean: + head = (r.content or "")[:100].replace("\n", " ") + print(f" id={r.id} group={r.group_name} tone={r.tone} " + f"at {r.generated_at} reason={v.reason!r}") + print(f" preview: {head!r}") + + if args.dry_run or not unclean: + return + + ids = [r.id for r, _ in unclean] + await session.execute( + delete(IndicatorSummaryTranslation) + .where(IndicatorSummaryTranslation.summary_id.in_(ids)) + ) + await session.execute( + delete(IndicatorSummary).where(IndicatorSummary.id.in_(ids)) + ) + await session.commit() + print(f"\nDeleted {len(ids)} unclean row(s). The dashboard's /api/indicators/" + " endpoint will now fall back to the previous clean row " + "for each (group, tone).") + + +if __name__ == "__main__": + p = argparse.ArgumentParser() + p.add_argument("--dry-run", action="store_true") + p.add_argument("--concurrency", type=int, default=8, + help="Parallel reviewer calls (default 8)") + asyncio.run(main(p.parse_args()))