scripts: one-off purge of unclean IndicatorSummary rows

Iterates every IndicatorSummary in the DB and asks the reviewer agent
(services/output_review.review_read) whether each row's content is
publishable. Rows the reviewer flags as unclean are deleted along
with their translation rows. The API's existing fallback path —
serve the latest IndicatorSummary by (group, tone) — picks up the
previous clean row automatically.

Concurrency defaults to 8 reviewer calls in flight; on the 3245-row
prod archive that completes in ~10 minutes for ~$1 of Haiku cost.

Idempotent: a second run only re-evaluates whatever's still in the
table. --dry-run skips the deletion stage. After the live pipeline
fix landed (JSON-mode + reviewer at write time) this script should
not find anything on subsequent invocations.
This commit is contained in:
Giorgio Gilestro 2026-05-29 13:56:47 +02:00
parent 385c5fdc60
commit cd485fe646

View file

@ -0,0 +1,76 @@
"""One-off purge: ask the reviewer agent to judge every IndicatorSummary
row already in the DB, delete the ones it flags as unclean.
Same reviewer the live pipeline uses (services/output_review.review_read),
so post-purge rows are exactly what would survive a fresh generation.
Per-row cost ~$0.0001; total run on ~3000 rows ~$0.30.
Usage inside the app container:
docker compose exec app python /tmp/purge.py --dry-run
docker compose exec app python /tmp/purge.py # actually delete
The script processes rows concurrently up to a small fan-out (default 8)
to keep wall-clock down without hammering the provider.
"""
from __future__ import annotations
import argparse
import asyncio
import httpx
from sqlalchemy import delete, select
from app.db import get_session_factory
from app.models import IndicatorSummary, IndicatorSummaryTranslation
from app.services.output_review import review_read
async def _judge(client, sem, row):
async with sem:
v = await review_read(client, row.content or "")
return row, v
async def main(args):
session_factory = get_session_factory()
async with session_factory() as session:
rows = (await session.execute(
select(IndicatorSummary).order_by(IndicatorSummary.id)
)).scalars().all()
print(f"Reviewing {len(rows)} IndicatorSummary rows…")
sem = asyncio.Semaphore(args.concurrency)
async with httpx.AsyncClient(follow_redirects=True) as client:
results = await asyncio.gather(*(_judge(client, sem, r) for r in rows))
unclean = [(r, v) for r, v in results if not v.clean]
print(f"\nFlagged {len(unclean)} of {len(rows)} as unclean.")
for r, v in unclean:
head = (r.content or "")[:100].replace("\n", " ")
print(f" id={r.id} group={r.group_name} tone={r.tone} "
f"at {r.generated_at} reason={v.reason!r}")
print(f" preview: {head!r}")
if args.dry_run or not unclean:
return
ids = [r.id for r, _ in unclean]
await session.execute(
delete(IndicatorSummaryTranslation)
.where(IndicatorSummaryTranslation.summary_id.in_(ids))
)
await session.execute(
delete(IndicatorSummary).where(IndicatorSummary.id.in_(ids))
)
await session.commit()
print(f"\nDeleted {len(ids)} unclean row(s). The dashboard's /api/indicators/"
"<group> endpoint will now fall back to the previous clean row "
"for each (group, tone).")
if __name__ == "__main__":
p = argparse.ArgumentParser()
p.add_argument("--dry-run", action="store_true")
p.add_argument("--concurrency", type=int, default=8,
help="Parallel reviewer calls (default 8)")
asyncio.run(main(p.parse_args()))