"""Interactive picker for barrier-opening time. Two-stage flow per video: 1. Coarse: 10×6 thumbnail grid spanning 10 min (~10 s spacing) lets you click the rough moment where the barrier opens. 2. Fine: launches mpv at the coarse pick, paused with on-screen fractional time. You scrub to the exact frame; on close, type the time you saw on the OSD into the terminal prompt. Default is the coarse pick. After the time is set, the picker also prompts for non-opening ROIs (comma-separated list, e.g. "1,2,3"). Useful for the partial-opening videos where only some sub-arenas open. Saved to the `bad_rois` column. Output: data/metadata/barrier_opening.csv with columns machine_name, session_date, session_time, opening_s, trim_first_s, bad_rois, notes Coarse-grid keys: click pick that timestamp n skip this video for THIS run u mark unusable (opening_s = NaN) q / ESC save+quit Usage: python pick_barrier.py python pick_barrier.py --redo python pick_barrier.py --limit 10 python pick_barrier.py --db /path/to/specific_tracking.db python pick_barrier.py --no-player # skip the video-player refinement step """ from __future__ import annotations import argparse import re import shutil import sqlite3 import subprocess import sys from pathlib import Path import cv2 import matplotlib.pyplot as plt import numpy as np import pandas as pd from config import DATA_METADATA, INVENTORY_CSV, VIDEO_INFO_TSV from detect_barrier_opening import ( STEP_S, WINDOW_S, per_frame_distance, sliding_mean, ) OUT_CSV = DATA_METADATA / "barrier_opening.csv" OUT_COLS = ["machine_name", "session_date", "session_time", "opening_s", "trim_first_s", "bad_rois", "notes"] DB_NAME_RE = re.compile( r"^(\d{4}-\d{2}-\d{2})_(\d{2}-\d{2}-\d{2})_([0-9a-f]{32})__" ) GRID_ROWS, GRID_COLS = 6, 10 N_THUMBS = GRID_ROWS * GRID_COLS # 60 DEFAULT_COARSE_SPAN_S = 600.0 # 0..10 min, ~10 s spacing — covers late-opening videos FINE_SPAN_S = 12.0 # ±6 s around coarse pick → ~0.2 s spacing AUTO_SEARCH_END_S = 600.0 # how far the auto-detector scans for its suggestion def auto_suggest(db_path: Path) -> float | None: """Median of per-ROI biggest-drop times. None if too noisy.""" try: conn = sqlite3.connect(f"file:{db_path}?mode=ro", uri=True) except sqlite3.Error: return None candidates = [] for roi in range(1, 7): try: df = pd.read_sql_query(f"SELECT t, x, y, id FROM ROI_{roi}", conn) except Exception: continue dist = per_frame_distance(df) smean = sliding_mean(dist, WINDOW_S, STEP_S, AUTO_SEARCH_END_S) pad = max(1, int(WINDOW_S / STEP_S)) if len(smean) < 2 * pad + 1: continue best_drop = -np.inf best_t = None for i in range(pad, len(smean) - pad): pre = smean["mean_dist"].iloc[:i].median() post = smean["mean_dist"].iloc[i:].median() drop = pre - post if drop > best_drop: best_drop = drop best_t = float(smean["mid_t"].iloc[i]) if best_drop > 30 and best_t is not None: candidates.append(best_t) conn.close() if not candidates: return None return float(np.median(candidates)) def grab_thumbnails(video_path: Path, target_times_s: np.ndarray, thumb_w: int = 320) -> list[np.ndarray | None]: """Read thumbnails at the requested timestamps via a single sequential pass. Linear-decode is much faster than seeking per-frame on H.264. We read frames sequentially from the earliest target onward, keeping only the ones at requested target frames. """ cap = cv2.VideoCapture(str(video_path)) fps = cap.get(cv2.CAP_PROP_FPS) or 25.0 total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT)) src_w = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)) src_h = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) if total_frames <= 0: cap.release() return [None] * len(target_times_s) target_frames = np.clip( (target_times_s * fps).round().astype(int), 0, total_frames - 1 ) sort_idx = np.argsort(target_frames) sorted_targets = target_frames[sort_idx] out: list[np.ndarray | None] = [None] * len(target_times_s) if sorted_targets.size == 0: cap.release() return out cap.set(cv2.CAP_PROP_POS_FRAMES, int(sorted_targets[0])) cur_frame = int(sorted_targets[0]) last_frame_data: np.ndarray | None = None scale = thumb_w / src_w if src_w > 0 else 1.0 thumb_h = max(1, int(round(src_h * scale))) for ord_i, target in zip(sort_idx, sorted_targets): while cur_frame <= target: ret, frame = cap.read() if not ret: last_frame_data = None break last_frame_data = frame cur_frame += 1 if last_frame_data is not None: small = cv2.resize(last_frame_data, (thumb_w, thumb_h), interpolation=cv2.INTER_AREA) out[ord_i] = cv2.cvtColor(small, cv2.COLOR_BGR2RGB) cap.release() return out def show_thumbnail_grid( video_path: Path, center_t: float, span_s: float, title: str, ) -> tuple[float | None, str]: """Show a 10×6 thumbnail grid; return (clicked_time, action). `action` is one of: 'pick', 'skip', 'unusable', 'back', 'quit'. `clicked_time` is None unless action == 'pick'. """ half = span_s / 2.0 times = np.linspace(max(0.0, center_t - half), center_t + half, N_THUMBS) print(f" loading {N_THUMBS} thumbnails ({times[0]:.1f}–{times[-1]:.1f}s)...", flush=True) thumbs = grab_thumbnails(video_path, times) fig, axes = plt.subplots(GRID_ROWS, GRID_COLS, figsize=(20, 11)) fig.suptitle( f"{title}\nclick a thumbnail · n=skip · u=unusable · b=back · q=quit", fontsize=11, ) state = {"time": None, "action": None} for ax, t, thumb in zip(axes.flat, times, thumbs): if thumb is not None: ax.imshow(thumb) else: ax.set_facecolor("black") ax.text(0.5, 0.5, "no frame", transform=ax.transAxes, ha="center", va="center", color="white") # Format time as M:SS.s for readability m, s = divmod(t, 60) ax.set_title(f"{int(m):d}:{s:05.2f}", fontsize=8, pad=1) ax.set_xticks([]); ax.set_yticks([]) fig.subplots_adjust(left=0.01, right=0.99, top=0.93, bottom=0.01, wspace=0.03, hspace=0.18) def on_click(event): if event.inaxes is None: return for i, ax in enumerate(axes.flat): if ax is event.inaxes: state["time"] = float(times[i]) state["action"] = "pick" plt.close(fig) return def on_key(event): k = event.key if k == "n": state["action"] = "skip"; plt.close(fig) elif k == "u": state["action"] = "unusable"; plt.close(fig) elif k == "b": state["action"] = "back"; plt.close(fig) elif k in ("q", "escape"): state["action"] = "quit"; plt.close(fig) fig.canvas.mpl_connect("button_press_event", on_click) fig.canvas.mpl_connect("key_press_event", on_key) plt.show() return state["time"], state["action"] or "skip" def parse_time_input(s: str) -> float | None: """Accept seconds ('290'), m:ss ('4:50'), or m:ss.ss ('4:50.40').""" s = s.strip() if not s: return None try: if ":" in s: parts = s.split(":") if len(parts) == 2: return float(parts[0]) * 60 + float(parts[1]) if len(parts) == 3: return float(parts[0]) * 3600 + float(parts[1]) * 60 + float(parts[2]) return float(s) except ValueError: return None def play_video(video_path: Path, start_t: float) -> str | None: """Launch a video player at start_t-30s. Returns player name, or None. Tries mpv (best UX for this), then vlc, then ffplay. The user scrubs to the exact frame, reads the timestamp off the player's OSD/seekbar, closes the player, and types the time at the terminal prompt. """ seek = max(0.0, start_t - 30.0) if shutil.which("mpv"): cmd = ["mpv", "--no-resume-playback", "--osd-level=3", "--osd-fractions", "--pause", f"--start={seek:.1f}", str(video_path)] name = "mpv" elif shutil.which("vlc"): cmd = ["vlc", "--no-video-title-show", f"--start-time={seek:.1f}", "--play-and-pause", str(video_path)] name = "vlc" elif shutil.which("ffplay"): cmd = ["ffplay", "-hide_banner", "-loglevel", "error", "-ss", f"{seek:.1f}", str(video_path)] name = "ffplay" else: print(" ! no video player found (tried mpv, vlc, ffplay)") return None print(f" launching {name} at {seek:.1f}s — pause on the opening frame, " "read the time off the player, then close it.") try: subprocess.run(cmd, check=False, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL) except OSError as e: print(f" {name} launch failed: {e}") return None return name def prompt_opening_time(default_s: float) -> float | str: """Prompt for the exact opening time. Returns float, 'skip', 'unusable', 'quit'.""" while True: s = input( f" opening time [enter={default_s:.1f}s · n=skip · u=unusable · q=quit]: " ).strip() if not s: return default_s if s.lower() in ("n", "skip"): return "skip" if s.lower() in ("u", "unusable"): return "unusable" if s.lower() in ("q", "quit"): return "quit" t = parse_time_input(s) if t is None: print(" ? enter seconds, m:ss, or m:ss.ss") continue return t def prompt_bad_rois() -> str: """Prompt for non-opening ROIs. Returns comma-separated string (may be empty).""" while True: s = input(" non-opening ROIs (e.g. '1,2,3') [enter=none]: ").strip() if not s: return "" # Sanity-check: comma-separated integers in 1..6 try: rois = sorted({int(x.strip()) for x in s.split(",") if x.strip()}) except ValueError: print(" ? enter integers separated by commas") continue if not all(1 <= r <= 6 for r in rois): print(" ? ROI numbers must be 1..6") continue return ",".join(str(r) for r in rois) def pick_for_video( video_path: Path, db_path: Path | None, machine_name: str, session_date: str, session_time: str, coarse_span_s: float = DEFAULT_COARSE_SPAN_S, use_player: bool = True, ) -> dict | str | None: """Run the picker. Return result dict, 'skip', or 'quit'.""" auto_t = auto_suggest(db_path) if db_path else None print(f" auto-suggest: {f'{auto_t:.1f}s' if auto_t else '(none)'}") # Stage 1: coarse thumbnail grid for rough localisation. coarse_center = auto_t if auto_t is not None else coarse_span_s / 2 title_coarse = (f"COARSE {machine_name} {session_date} {session_time} " f"· spanning {coarse_span_s/60:.0f} min " f"· click ≈ where the barrier opens") coarse_t, action = show_thumbnail_grid( video_path, coarse_center, coarse_span_s, title_coarse ) if action == "skip": return "skip" if action == "unusable": return { "machine_name": machine_name, "session_date": session_date, "session_time": session_time, "opening_s": np.nan, "trim_first_s": 0, "bad_rois": "", "notes": "unusable", } if action == "quit": return "quit" if action != "pick" or coarse_t is None: return "skip" # Stage 2: hand off to a video player for frame-accurate refinement. if use_player: played = play_video(video_path, coarse_t) if played is None: print(" ! using coarse pick as the answer.") fine_t = prompt_opening_time(default_s=coarse_t) if fine_t == "skip": return "skip" if fine_t == "unusable": return { "machine_name": machine_name, "session_date": session_date, "session_time": session_time, "opening_s": np.nan, "trim_first_s": 0, "bad_rois": "", "notes": "unusable", } if fine_t == "quit": return "quit" bad_rois = prompt_bad_rois() return { "machine_name": machine_name, "session_date": session_date, "session_time": session_time, "opening_s": round(float(fine_t), 1), "trim_first_s": 0, "bad_rois": bad_rois, "notes": "", } def lookup_video_path(machine_name: str, session_date: str, session_time: str, inv: pd.DataFrame) -> Path | None: """Find the mp4 path for (machine, date, time) in the inventory.""" match = inv[ (inv["machine_name"] == machine_name) & (inv["session_date"] == session_date) & (inv["session_time"] == session_time) ] if match.empty: return None return Path(match.iloc[0]["mp4_path"]) def main() -> None: parser = argparse.ArgumentParser(description=__doc__) parser.add_argument("--redo", action="store_true", help="re-pick videos that already have a row in the output CSV") parser.add_argument("--limit", type=int, default=None, help="only process the first N videos") parser.add_argument("--db", type=Path, default=None, help="annotate this specific tracking DB only") parser.add_argument("--coarse-span", type=float, default=DEFAULT_COARSE_SPAN_S, help=f"coarse-grid time span in seconds (default {DEFAULT_COARSE_SPAN_S:.0f})") parser.add_argument("--no-player", action="store_true", help="skip the video-player refinement step (use the coarse pick directly)") args = parser.parse_args() OUT_CSV.parent.mkdir(parents=True, exist_ok=True) if OUT_CSV.exists(): out = pd.read_csv(OUT_CSV) else: out = pd.DataFrame(columns=OUT_COLS) # Reason: backfill bad_rois column for older CSVs without it. for col in OUT_COLS: if col not in out.columns: out[col] = "" done = set(zip(out["machine_name"], out["session_date"], out["session_time"])) if not INVENTORY_CSV.exists(): sys.exit(f"Inventory not found at {INVENTORY_CSV}. Run build_video_inventory.py first.") inv = pd.read_csv(INVENTORY_CSV) # Build the queue: every (machine, date, time) referenced by the merged # TSV that has a tracking DB on disk and isn't yet annotated. tsv = pd.read_csv(VIDEO_INFO_TSV, sep="\t") queue: list[tuple[Path, Path, str, str, str]] = [] seen: set[tuple[str, str, str]] = set() for col in ("training_db_path", "testing_db_path"): for _, row in tsv.iterrows(): db = row[col] if not isinstance(db, str) or not db: continue db_path = Path(db) if not db_path.exists(): continue m = DB_NAME_RE.match(db_path.name) if not m: continue session_date, session_time = m.group(1), m.group(2) key = (row["machine_name"], session_date, session_time) if key in seen: continue seen.add(key) if key in done and not args.redo: continue video = lookup_video_path(*key, inv) if video is None or not video.exists(): print(f" ! no video for {key}; skipping") continue queue.append((db_path, video, *key)) if args.db: target = Path(args.db).resolve() queue = [q for q in queue if q[0].resolve() == target] if not queue: sys.exit(f"DB not found in queue: {args.db}") if args.limit: queue = queue[: args.limit] if not queue: print("Nothing to pick. All eligible videos already have a barrier_opening row.") return print(f"Picking barrier-opening for {len(queue)} videos.") print("Window keys: click=pick · n=skip · u=unusable · b=back · q=quit") saved = skipped = unusable = 0 for i, (db, video, machine_name, session_date, session_time) in enumerate(queue, 1): prefix = f"[{i}/{len(queue)}] {machine_name} {session_date} {session_time}" print(f"\n{prefix}") result = pick_for_video(video, db, machine_name, session_date, session_time, coarse_span_s=args.coarse_span, use_player=not args.no_player) if result is None or result == "skip": skipped += 1 continue if result == "quit": print(" quit requested — saving and exiting") break new_row = pd.DataFrame([result]) out = pd.concat([ out[~((out.machine_name == result["machine_name"]) & (out.session_date == result["session_date"]) & (out.session_time == result["session_time"]))], new_row, ], ignore_index=True) out[OUT_COLS].to_csv(OUT_CSV, index=False) if pd.isna(result["opening_s"]): unusable += 1 print(" saved as unusable") else: saved += 1 print(f" saved opening_s = {result['opening_s']} s") print(f"\nDone: {saved} saved, {unusable} unusable, {skipped} skipped.") print(f" → {OUT_CSV}") if __name__ == "__main__": main()