cupido/scripts/monitor_tracking.py
Giorgio Gilestro f60a9d0530 Unify analysis pipeline around the TSV; move tracked DBs out of cloud sync
- Tracked DBs now live at /mnt/data/projects/cupido/tracked/ (out of
  ownCloud to avoid sync conflicts and bandwidth churn). config.py
  TRACKING_OUTPUT_DIR points there; the docker-compose for ethoscope-lab
  mounts it world-readable for JupyterHub users.
- New scripts/export_video_db_index.py joins all_video_info_merged.xlsx
  with the video inventory and the on-disk DBs, producing a TSV that has
  one row per fly/ROI plus training/testing video and DB paths. Handles
  approximate xlsx times, cross-day training/testing, the 12 AM/PM
  ambiguity, and date typos.
- scripts/load_roi_data.py rewritten as a TSV-driven loader returning a
  single DataFrame with session and metadata columns. calculate_distances
  and the two flies_analysis notebooks migrated to use it; downstream
  trained/naive splits remain available via simple equality filters.
- Metadata vocabulary canonicalized: {naïve, niave, untrained, test} all
  resolve to {trained, naive}. Normalization happens at the TSV-export
  boundary (idempotent); the xlsx and the 2025-07-15 legacy CSV were
  edited in place to remove the worst variants.
- scripts/monitor_tracking.py rate calculation fixed: with N parallel
  workers, completions arrive in bursts; the old formula divided by burst
  width and reported nonsense rates. Now divides by a 6 h window (or by the
  time since the first DB, when the batch is younger than 6 h).
- scripts/track_videos.py: BGRMovieCamera retries cv2.read on transient
  NFS hiccups and a post-tracking completeness gate (≥ 90 % of expected
  duration via MAX(t) across all 6 ROIs) deletes silent partial DBs.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-04-30 15:20:14 +01:00

176 lines
6.2 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""Live progress + ETA for the offline tracker batch.

Counts ground truth (DBs on disk) rather than parsing log lines, so it works
whether the batch is running fresh or was resumed after a crash. Error lines
are parsed from the most recently modified *.log file in data/logs/.

Usage:
    python monitor_tracking.py              # one snapshot, exit
    python monitor_tracking.py --watch      # refresh every 10 s
    python monitor_tracking.py --watch 30   # refresh every 30 s
"""
from __future__ import annotations

import argparse
import json
import re
import time
from collections import deque
from datetime import datetime, timedelta
from pathlib import Path

from config import LOGS_DIR, TARGETS_DIR, TRACKING_OUTPUT_DIR
def count_target_jsons(
    targets_dir: Path | None = None,
) -> tuple[int, int, list[str]]:
    """Classify the target JSON files and count each kind.

    Each ``*.json`` file in the targets directory is classified as:

    * unusable -- its top-level ``"unusable"`` value is truthy (checked first);
    * pickable -- otherwise, its ``"reference_points"`` value is truthy;
    * ignored  -- neither value set, the file is unreadable, is not valid JSON,
      or its top level is not a JSON object.

    Args:
        targets_dir: Directory to scan; defaults to ``TARGETS_DIR`` from config.

    Returns:
        ``(n_pickable, n_unusable, unusable_video_stems)``.
    """
    directory = TARGETS_DIR if targets_dir is None else targets_dir
    pickable = 0
    unusable_stems: list[str] = []
    for path in directory.glob("*.json"):
        try:
            # Passing bytes lets json detect UTF-8/16/32 (and a BOM) itself
            # instead of depending on the locale's default text encoding.
            data = json.loads(path.read_bytes())
        except (OSError, ValueError):
            # File vanished or is unreadable, or it is mid-write / malformed
            # (JSONDecodeError and UnicodeDecodeError are both ValueErrors).
            continue
        if not isinstance(data, dict):
            # A top-level list/scalar has no .get(); skip it like a bad file
            # rather than crashing the whole report.
            continue
        if data.get("unusable"):
            unusable_stems.append(path.stem)
        elif data.get("reference_points"):
            pickable += 1
    return pickable, len(unusable_stems), unusable_stems
def count_tracked_dbs(
    output_dir: Path | None = None,
) -> tuple[int, datetime | None, str | None]:
    """Count ``*_tracking.db`` files and report the most recently modified one.

    Files that disappear or become unreadable between the directory listing
    and ``stat()`` (e.g. a partial DB deleted mid-scan) are skipped instead of
    raising, so a long ``--watch`` session is not killed by such a race.

    Args:
        output_dir: Directory holding the DBs; defaults to
            ``TRACKING_OUTPUT_DIR`` from config.

    Returns:
        ``(n_dbs, mtime_of_newest, stem_of_newest)``; the last two are
        ``None`` when no DB is found.
    """
    directory = TRACKING_OUTPUT_DIR if output_dir is None else output_dir
    stamped: list[tuple[float, str]] = []
    for db in directory.glob("*_tracking.db"):
        try:
            # Stat each file exactly once; the mtime is reused below.
            stamped.append((db.stat().st_mtime, db.stem))
        except OSError:
            continue
    if not stamped:
        return 0, None, None
    newest_mtime, newest_stem = max(stamped, key=lambda entry: entry[0])
    return len(stamped), datetime.fromtimestamp(newest_mtime), newest_stem
def parse_recent_errors(log_dir: Path, tail_lines: int = 5000) -> list[str]:
    """Return error lines from the tail of the newest ``*.log`` in *log_dir*.

    Only the most recently modified log file is scanned, and only its last
    *tail_lines* lines. A line counts as an error when it matches
    ``:\\s*error\\b`` (case-sensitive) or, ignoring case, contains the
    substring ``" error: "``.

    Args:
        log_dir: Directory containing ``*.log`` files.
        tail_lines: Number of trailing lines to scan; ``<= 0`` scans nothing.

    Returns:
        Matching lines (trailing whitespace stripped) in file order. Empty if
        the directory is missing, holds no logs, or the log cannot be opened.
    """
    # Guard explicitly: a slice like lines[-0:] would return the whole file.
    if tail_lines <= 0 or not log_dir.exists():
        return []
    newest: tuple[float, Path] | None = None
    for log in log_dir.glob("*.log"):
        try:
            mtime = log.stat().st_mtime
        except OSError:
            continue  # rotated/removed between the listing and stat()
        # ">=" keeps the last of equal mtimes, like sorted(...)[-1] would.
        if newest is None or mtime >= newest[0]:
            newest = (mtime, log)
    if newest is None:
        return []
    try:
        # errors="replace": a stray undecodable byte must not hide all errors.
        # The bounded deque holds only the last tail_lines lines in memory.
        with newest[1].open(encoding="utf-8", errors="replace") as f:
            tail = deque(f, maxlen=tail_lines)
    except OSError:
        return []
    pattern = re.compile(r":\s*error\b")
    return [
        line.rstrip()
        for line in tail
        if pattern.search(line) or " error: " in line.lower()
    ]
def db_completion_history(output_dir: Path | None = None) -> list[float]:
    """Return the mtimes of all ``*_tracking.db`` files, sorted ascending.

    These timestamps are what the rate/ETA estimate treats as completion
    times. Files that vanish or become unreadable between the directory
    listing and ``stat()`` are skipped instead of raising.

    Args:
        output_dir: Directory to scan; defaults to ``TRACKING_OUTPUT_DIR``
            from config.
    """
    directory = TRACKING_OUTPUT_DIR if output_dir is None else output_dir
    mtimes: list[float] = []
    for db in directory.glob("*_tracking.db"):
        try:
            mtimes.append(db.stat().st_mtime)
        except OSError:
            continue
    mtimes.sort()
    return mtimes
def fmt_duration(seconds: float) -> str:
    """Render *seconds* as a coarse, human-readable duration.

    Values of an hour or more become ``"<h> h <m> min"``. Values from one
    minute up to an hour become ``"<m> min"``. Anything shorter becomes
    ``"<s> s"``. All parts are truncated, never rounded.
    """
    if seconds >= 3600:
        hours, leftover = divmod(seconds, 3600)
        return f"{int(hours)} h {int(leftover // 60)} min"
    if seconds >= 60:
        return f"{int(seconds // 60)} min"
    return f"{int(seconds)} s"
def _rate_lines(
    history: list[float], now_ts: float, remaining: int, window_secs: float
) -> list[str]:
    """Format the rate and ETA report lines from ascending DB mtimes.

    The rate is the number of completions inside the trailing window divided
    by the window length. With N parallel workers started together on
    multi-hour videos, completions arrive in tight bursts roughly one
    video-length apart. Dividing by the span of one burst (or by a short
    window that catches one burst) overestimates the rate by ~10x. So the
    whole window is the denominator, unless every DB on disk falls inside it.
    In that case the batch is younger than the window, and the time since the
    first DB is used instead.
    """
    recent = [t for t in history if t >= now_ts - window_secs]
    if len(recent) < 2:
        if len(history) > len(recent):
            # Older DBs exist, so this is not a fresh start: say so instead of
            # claiming the batch is still warming up.
            return [
                f" rate: (fewer than 2 DBs finished in the last "
                f"{window_secs / 3600:g} h — stalled or finished?)"
            ]
        return [" rate: (warming up — check again in a few min)"]
    if len(recent) == len(history):
        # NOTE: work done before the first completion is not counted, so this
        # estimate runs high right after the first burst of a fresh batch.
        elapsed = max(1.0, now_ts - history[0])
    else:
        elapsed = float(window_secs)
    # len(recent) >= 2 and elapsed >= 1 s, so the rate is always positive.
    rate_per_hour = len(recent) / elapsed * 3600
    out = [
        f" rate (last {len(recent)} in {fmt_duration(elapsed)}):"
        f" {rate_per_hour:.1f} videos/hour"
    ]
    if remaining > 0:
        eta_sec = remaining * 3600 / rate_per_hour
        # fromtimestamp() yields the correct local wall-clock time even if a
        # DST change falls before the finish (naive + timedelta would not).
        eta_at = datetime.fromtimestamp(now_ts + eta_sec)
        out.append(
            f" ETA remaining: {fmt_duration(eta_sec)} "
            f"(done by {eta_at:%H:%M %a})"
        )
    return out


def snapshot() -> str:
    """Return a multi-line progress report for the tracker batch.

    The report covers:

    * target-JSON counts;
    * tracking DBs on disk versus pickable targets;
    * the completion rate and ETA, using DB mtimes as completion times;
    * the newest DB;
    * the last few error lines from the newest log in ``LOGS_DIR``.
    """
    pickable, unusable, _ = count_target_jsons()
    tracked, last_mtime, last_name = count_tracked_dbs()
    history = db_completion_history()
    errors = parse_recent_errors(LOGS_DIR)
    # One clock reading for the whole report keeps the header, the rate
    # window and the "ago" figure mutually consistent.
    now_ts = time.time()
    now = datetime.fromtimestamp(now_ts)

    pct = (tracked / pickable * 100) if pickable else 0
    lines = [
        f"tracking progress @ {now:%Y-%m-%d %H:%M:%S}",
        f" pickable JSONs: {pickable}",
        f" unusable JSONs: {unusable} (skipped by tracker)",
        f" DBs on disk: {tracked} / {pickable} ({pct:.0f}%)",
        f" errors in log: {len(errors)}",
    ]
    lines += _rate_lines(
        history,
        now_ts,
        remaining=max(0, pickable - tracked),
        window_secs=6 * 3600,
    )
    if last_mtime is not None and last_name is not None:
        # Clamp: an mtime slightly in the future (clock skew on a network
        # mount) would otherwise print a negative age.
        ago = max(0.0, (now - last_mtime).total_seconds())
        # Only add an ellipsis when the name was actually truncated.
        shown = last_name if len(last_name) <= 60 else f"{last_name[:60]}..."
        lines.append(f" most recent DB: {shown} ({fmt_duration(ago)} ago)")
    if errors:
        lines.append("")
        lines.append(f" recent errors ({min(5, len(errors))} of {len(errors)}):")
        lines.extend(f" {e[:120]}" for e in errors[-5:])
    return "\n".join(lines)
def main() -> None:
    """Command-line entry point: print one report, or keep refreshing it.

    Without ``--watch`` a single snapshot is printed. With ``--watch [N]`` the
    screen is redrawn every N seconds (10 when N is omitted) until Ctrl-C.
    """
    parser = argparse.ArgumentParser(
        description=__doc__,
        # Keep the module docstring's Usage lines as written, not reflowed.
        formatter_class=argparse.RawDescriptionHelpFormatter,
    )
    parser.add_argument(
        "--watch", nargs="?", type=int, const=10, default=None,
        help="refresh every N seconds (default 10 if flag given without value)",
    )
    args = parser.parse_args()
    if args.watch is None:
        print(snapshot())
        return
    if args.watch < 1:
        # 0 would redraw in a tight loop; a negative value makes time.sleep()
        # raise ValueError after the first frame.
        parser.error("--watch interval must be a positive number of seconds")
    try:
        while True:
            try:
                report = snapshot()
            except OSError as exc:
                # Keep watching through transient filesystem errors instead
                # of ending the session.
                report = f"snapshot failed: {exc}"
            # Build the report first, then clear (ANSI: erase screen + cursor
            # home) and redraw, so the screen is not blank while scanning.
            print("\033[2J\033[H", end="")
            print(report)
            print(f"\n(refreshing every {args.watch}s — Ctrl-C to exit)")
            time.sleep(args.watch)
    except KeyboardInterrupt:
        print()
if __name__ == "__main__":
    # Run the CLI only when executed as a script, not when imported.
    main()