- Tracked DBs now live at /mnt/data/projects/cupido/tracked/ (moved out
of ownCloud to avoid sync conflicts and bandwidth churn). config.py's
TRACKING_OUTPUT_DIR points there; the docker-compose for ethoscope-lab
mounts it world-readable for JupyterHub users.
- New scripts/export_video_db_index.py joins all_video_info_merged.xlsx
with the video inventory and the on-disk DBs, producing a TSV that has
one row per fly/ROI plus training/testing video and DB paths. Handles
approximate xlsx times, cross-day training/testing, the 12 AM/PM
ambiguity, and date typos.
- scripts/load_roi_data.py rewritten as a TSV-driven loader returning a
single DataFrame with session and metadata columns. calculate_distances
and the two flies_analysis notebooks migrated to use it; downstream
trained/naive splits remain available via simple equality filters.
- Metadata vocabulary canonicalized: {naïve, niave, untrained, test} all
resolve to {trained, naive}. Normalization happens at the TSV-export
boundary (idempotent); the xlsx and the 2025-07-15 legacy CSV were
edited in place to remove the worst variants.
- scripts/monitor_tracking.py rate calculation fixed: with N parallel
workers, completions arrive in bursts, and the old formula divided by the
burst width, reporting nonsense rates. It now divides by a 6 h window (or
by the time since the first DB while the batch is younger than 6 h).
- scripts/track_videos.py: BGRMovieCamera retries cv2.read on transient
NFS hiccups and a post-tracking completeness gate (≥ 90 % of expected
duration via MAX(t) across all 6 ROIs) deletes silent partial DBs.
Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
176 lines
6.2 KiB
Python
"""Live progress + ETA for the offline tracker batch.

Counts ground-truth (DBs on disk) rather than parsing log lines, so it works
whether the batch is running fresh or was resumed after a crash. Errors are
parsed out of the most recently modified *.log file in data/logs/.

Usage:
    python monitor_tracking.py              # one snapshot, exit
    python monitor_tracking.py --watch      # refresh every 10 s
    python monitor_tracking.py --watch 30   # refresh every 30 s
"""
|
||
|
||
from __future__ import annotations
|
||
|
||
import argparse
|
||
import json
|
||
import re
|
||
import time
|
||
from datetime import datetime, timedelta
|
||
from pathlib import Path
|
||
|
||
from config import LOGS_DIR, TARGETS_DIR, TRACKING_OUTPUT_DIR
|
||
|
||
|
||
def count_target_jsons(targets_dir: Path | None = None) -> tuple[int, int, list[str]]:
    """Classify the per-video target JSONs.

    Args:
        targets_dir: directory holding the ``*.json`` target files; defaults
            to ``TARGETS_DIR``.

    Returns:
        ``(n_pickable, n_unusable, unusable_video_stems)``. A JSON counts as
        unusable when its ``"unusable"`` value is truthy (this check wins),
        otherwise as pickable when its ``"reference_points"`` value is truthy.
        Files that cannot be read or parsed, or whose top level is not a JSON
        object, count as neither. Stems are listed in sorted filename order.
    """
    root = TARGETS_DIR if targets_dir is None else targets_dir
    pickable = 0
    unusable_stems: list[str] = []
    # sorted(): glob order is filesystem-dependent; keep the stem list stable.
    for j in sorted(root.glob("*.json")):
        try:
            d = json.loads(j.read_text())
        except (OSError, ValueError):
            # Best-effort: an unreadable or corrupt JSON (JSONDecodeError and
            # UnicodeDecodeError are ValueErrors) is skipped so the monitor
            # keeps running.
            continue
        if not isinstance(d, dict):
            # e.g. a bare list; d.get() would raise AttributeError.
            continue
        if d.get("unusable"):
            unusable_stems.append(j.stem)
        elif d.get("reference_points"):
            pickable += 1
    return pickable, len(unusable_stems), unusable_stems
|
||
|
||
|
||
def count_tracked_dbs(out_dir: Path | None = None) -> tuple[int, datetime | None, str | None]:
    """Summarise the ``*_tracking.db`` files on disk.

    Args:
        out_dir: directory to scan; defaults to ``TRACKING_OUTPUT_DIR``.

    Returns:
        ``(n_dbs, mtime_of_newest, name_of_newest)``. The mtime is a naive
        local-time ``datetime`` and the name is the file stem. The last two
        are ``None`` when no DB exists.

    Each file is stat()ed exactly once. A file that disappears between
    ``glob()`` and ``stat()`` (e.g. a partial DB removed by the tracker) is
    skipped instead of crashing the monitor.
    """
    root = TRACKING_OUTPUT_DIR if out_dir is None else out_dir
    stamped: list[tuple[float, Path]] = []
    for p in root.glob("*_tracking.db"):
        try:
            stamped.append((p.stat().st_mtime, p))
        except FileNotFoundError:
            continue
    if not stamped:
        return 0, None, None
    newest_mtime, newest = max(stamped, key=lambda mp: mp[0])
    return len(stamped), datetime.fromtimestamp(newest_mtime), newest.stem
|
||
|
||
|
||
def parse_recent_errors(log_dir: Path, tail_lines: int = 5000) -> list[str]:
    """Return error lines from the tail of the most recently modified *.log file.

    Only the newest log in ``log_dir`` (by mtime) is scanned, and only its last
    ``tail_lines`` lines. A line counts as an error when it matches
    ``:<optional whitespace>error`` (case-sensitive, whole word) or contains
    `` error: `` in any letter case. Lines are returned with trailing
    whitespace stripped, in file order.

    Returns an empty list when the directory is missing, holds no logs, the
    newest log cannot be opened, or ``tail_lines`` is not positive.
    """
    from collections import deque

    if not log_dir.exists() or tail_lines <= 0:
        return []

    stamped: list[tuple[float, Path]] = []
    for p in log_dir.glob("*.log"):
        try:
            stamped.append((p.stat().st_mtime, p))
        except OSError:
            continue  # rotated/removed between glob() and stat()
    if not stamped:
        return []
    latest = max(stamped, key=lambda mp: mp[0])[1]

    try:
        # errors="replace": one stray non-UTF-8 byte must not hide every error.
        with latest.open(encoding="utf-8", errors="replace") as f:
            # deque(maxlen=...) keeps just the tail instead of loading the whole file.
            tail = deque(f, maxlen=tail_lines)
    except OSError:
        return []

    pattern = re.compile(r":\s*error\b")
    return [
        line.rstrip()
        for line in tail
        if pattern.search(line) or " error: " in line.lower()
    ]
|
||
|
||
|
||
def db_completion_history(out_dir: Path | None = None) -> list[float]:
    """Return the mtimes (epoch seconds) of all ``*_tracking.db`` files, ascending.

    Used to estimate the completion rate.

    Args:
        out_dir: directory to scan; defaults to ``TRACKING_OUTPUT_DIR``.

    A file that disappears between ``glob()`` and ``stat()`` (e.g. a partial
    DB removed by the tracker) is skipped instead of crashing the monitor.
    """
    root = TRACKING_OUTPUT_DIR if out_dir is None else out_dir
    mtimes: list[float] = []
    for p in root.glob("*_tracking.db"):
        try:
            mtimes.append(p.stat().st_mtime)
        except FileNotFoundError:
            continue
    mtimes.sort()
    return mtimes
|
||
|
||
|
||
def fmt_duration(seconds: float) -> str:
    """Render a duration compactly as ``"<s> s"``, ``"<m> min"`` or ``"<h> h <m> min"``.

    Under a minute: whole seconds (truncated). Under an hour: whole minutes.
    Otherwise: hours plus leftover minutes. Fractions are dropped, not rounded.
    """
    if seconds >= 3600:
        hours, rest = divmod(seconds, 3600)
        return f"{int(hours)} h {int(rest // 60)} min"
    if seconds >= 60:
        return f"{int(seconds // 60)} min"
    return f"{int(seconds)} s"
|
||
|
||
|
||
def _rate_eta_lines(history: list[float], remaining: int, now_ts: float) -> list[str]:
    """Build the throughput and ETA lines of the snapshot report.

    Args:
        history: mtimes (epoch seconds) of every tracking DB, ascending.
        remaining: pickable videos that do not have a DB yet (>= 0).
        now_ts: current epoch time, shared with the rest of the report.
    """
    # Rate from completions in the last 6 h — robust to gaps from killed /
    # restarted runs, while wide enough to span multiple parallel-worker
    # completion bursts. Reason: with 8 workers all started together on
    # multi-hour videos, completions arrive in tight bursts every ~video-
    # length apart; a 30-min window catches one burst and overestimates by
    # ~10×. 6 h spans at least one full burst cycle for typical videos.
    window_secs = 6 * 3600
    recent = [t for t in history if t >= now_ts - window_secs]
    if len(recent) < 2:
        if len(history) > len(recent):
            # Older DBs exist, so this is not a fresh batch: fewer than two
            # DBs finished inside the window (stall, or very long videos).
            return [f" rate: (fewer than 2 DBs finished in the last {window_secs // 3600} h)"]
        return [" rate: (warming up — check again in a few min)"]

    # Reason: with N parallel workers, completions arrive in clumps, so
    # dividing N by the *burst* span gives nonsense rates. Use the full window
    # as the denominator once the batch has been running long enough to fill
    # it; otherwise use elapsed-since-first-DB. Detection: if every DB on disk
    # also falls inside the window, the batch is younger than the window.
    # NOTE(review): right after the first burst, elapsed-since-first-DB is
    # still short, so early rates can read high until more time has passed.
    if len(recent) == len(history):
        elapsed = max(1.0, now_ts - history[0])
    else:
        elapsed = float(window_secs)
    rate_per_hour = len(recent) / elapsed * 3600

    # Label with the span actually used as denominator; it is shorter than
    # the window while the batch is young.
    lines = [
        f" rate (last {len(recent)} in {fmt_duration(elapsed)}):"
        f" {rate_per_hour:.1f} videos/hour"
    ]
    if rate_per_hour > 0 and remaining > 0:
        eta_sec = remaining * 3600 / rate_per_hour
        eta_at = datetime.fromtimestamp(now_ts) + timedelta(seconds=eta_sec)
        lines.append(
            f" ETA remaining: {fmt_duration(eta_sec)} "
            f"(done by {eta_at:%H:%M %a})"
        )
    return lines


def snapshot() -> str:
    """Return a multi-line progress report for the tracker batch.

    The report covers:
    - pickable vs. unusable target JSONs;
    - DBs on disk vs. pickable;
    - the number of error lines found in the newest log;
    - throughput and ETA;
    - the most recently written DB;
    - up to the last 5 error lines.
    """
    pickable, unusable, _ = count_target_jsons()
    tracked, last_mtime, last_name = count_tracked_dbs()
    history = db_completion_history()
    errors = parse_recent_errors(LOGS_DIR)
    # One clock reading for the whole report, so header, rate, ETA and
    # "ago" all agree.
    now_ts = time.time()
    now = datetime.fromtimestamp(now_ts)

    lines = [f"tracking progress @ {now:%Y-%m-%d %H:%M:%S}"]
    lines.append(f" pickable JSONs: {pickable}")
    lines.append(f" unusable JSONs: {unusable} (skipped by tracker)")
    pct = (tracked / pickable * 100) if pickable else 0
    lines.append(f" DBs on disk: {tracked} / {pickable} ({pct:.0f}%)")
    lines.append(f" errors in log: {len(errors)}")
    lines.extend(_rate_eta_lines(history, max(0, pickable - tracked), now_ts))

    if last_mtime is not None and last_name is not None:
        # Clamp: an mtime slightly in the future (e.g. clock skew between
        # hosts) would otherwise print a negative duration.
        ago = max(0.0, (now - last_mtime).total_seconds())
        # Only mark truncation when the name was actually cut.
        shown = last_name if len(last_name) <= 60 else f"{last_name[:60]}..."
        lines.append(f" most recent DB: {shown} ({fmt_duration(ago)} ago)")

    if errors:
        lines.append("")
        lines.append(f" recent errors ({min(5, len(errors))} of {len(errors)}):")
        lines.extend(f" {e[:120]}" for e in errors[-5:])

    return "\n".join(lines)
|
||
|
||
|
||
def main() -> None:
    """CLI entry point: print one snapshot, or refresh it in a loop with ``--watch``."""
    parser = argparse.ArgumentParser(
        description=__doc__,
        # Keep the module docstring's line breaks so the Usage examples stay
        # readable in --help (the default formatter reflows them).
        formatter_class=argparse.RawDescriptionHelpFormatter,
    )
    parser.add_argument(
        "--watch", nargs="?", type=int, const=10, default=None,
        help="refresh every N seconds (default 10 if flag given without value)",
    )
    args = parser.parse_args()

    if args.watch is not None and args.watch < 1:
        # 0 would re-scan the output and log dirs in a tight loop; a negative
        # value would make time.sleep() raise ValueError.
        parser.error("--watch interval must be a positive number of seconds")

    if args.watch is None:
        print(snapshot())
        return

    try:
        while True:
            # ANSI escape: clear screen and move the cursor home, then reprint.
            print("\033[2J\033[H", end="")
            print(snapshot())
            print(f"\n(refreshing every {args.watch}s — Ctrl-C to exit)")
            time.sleep(args.watch)
    except KeyboardInterrupt:
        print()


if __name__ == "__main__":
    main()
|