From cd485fe6464a9c3d30aef0068177c143bf77a584 Mon Sep 17 00:00:00 2001 From: Giorgio Gilestro Date: Fri, 29 May 2026 13:56:47 +0200 Subject: [PATCH] scripts: one-off purge of unclean IndicatorSummary rows MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Iterates every IndicatorSummary in the DB and asks the reviewer agent (services/output_review.review_read) whether each row's content is publishable. Rows the reviewer flags as unclean are deleted along with their translation rows. The API's existing fallback path — serve the latest IndicatorSummary by (group, tone) — picks up the previous clean row automatically. Concurrency defaults to 8 reviewer calls in flight; on the 3245-row prod archive that completes in ~10 minutes for ~$1 of Haiku cost. Idempotent: a second run only re-evaluates whatever's still in the table. --dry-run skips the deletion stage. After the live pipeline fix landed (JSON-mode + reviewer at write time) this script should not find anything on subsequent invocations. --- scripts/purge_unclean_summaries.py | 76 ++++++++++++++++++++++++++++++ 1 file changed, 76 insertions(+) create mode 100644 scripts/purge_unclean_summaries.py diff --git a/scripts/purge_unclean_summaries.py b/scripts/purge_unclean_summaries.py new file mode 100644 index 0000000..44c8ef8 --- /dev/null +++ b/scripts/purge_unclean_summaries.py @@ -0,0 +1,76 @@ +"""One-off purge: ask the reviewer agent to judge every IndicatorSummary +row already in the DB, delete the ones it flags as unclean. + +Same reviewer the live pipeline uses (services/output_review.review_read), +so post-purge rows are exactly what would survive a fresh generation. +Per-row cost ~$0.0001; total run on ~3000 rows ~$0.30. + +Usage inside the app container: + docker compose exec app python /tmp/purge.py --dry-run + docker compose exec app python /tmp/purge.py # actually delete + +The script processes rows concurrently up to a small fan-out (default 8) +to keep wall-clock down without hammering the provider. +""" +from __future__ import annotations + +import argparse +import asyncio + +import httpx +from sqlalchemy import delete, select + +from app.db import get_session_factory +from app.models import IndicatorSummary, IndicatorSummaryTranslation +from app.services.output_review import review_read + + +async def _judge(client, sem, row): + async with sem: + v = await review_read(client, row.content or "") + return row, v + + +async def main(args): + session_factory = get_session_factory() + async with session_factory() as session: + rows = (await session.execute( + select(IndicatorSummary).order_by(IndicatorSummary.id) + )).scalars().all() + print(f"Reviewing {len(rows)} IndicatorSummary rows…") + + sem = asyncio.Semaphore(args.concurrency) + async with httpx.AsyncClient(follow_redirects=True) as client: + results = await asyncio.gather(*(_judge(client, sem, r) for r in rows)) + + unclean = [(r, v) for r, v in results if not v.clean] + print(f"\nFlagged {len(unclean)} of {len(rows)} as unclean.") + for r, v in unclean: + head = (r.content or "")[:100].replace("\n", " ") + print(f" id={r.id} group={r.group_name} tone={r.tone} " + f"at {r.generated_at} reason={v.reason!r}") + print(f" preview: {head!r}") + + if args.dry_run or not unclean: + return + + ids = [r.id for r, _ in unclean] + await session.execute( + delete(IndicatorSummaryTranslation) + .where(IndicatorSummaryTranslation.summary_id.in_(ids)) + ) + await session.execute( + delete(IndicatorSummary).where(IndicatorSummary.id.in_(ids)) + ) + await session.commit() + print(f"\nDeleted {len(ids)} unclean row(s). The dashboard's /api/indicators/" + " endpoint will now fall back to the previous clean row " + "for each (group, tone).") + + +if __name__ == "__main__": + p = argparse.ArgumentParser() + p.add_argument("--dry-run", action="store_true") + p.add_argument("--concurrency", type=int, default=8, + help="Parallel reviewer calls (default 8)") + asyncio.run(main(p.parse_args()))