"""Interactive picker for barrier-opening time, frame-by-frame thumbnail style. For each video that doesn't yet have a barrier-opening annotation, show a 10x6 grid of timestamped thumbnails extracted directly from the .mp4. The analyst clicks the thumbnail at (or just after) the moment the barrier opens; the picker then refines with a second tighter grid for sub-second precision. Two-stage flow per video: 1. Coarse grid: 60 thumbs spanning the 5-min search window (5 s spacing). Click → pick that 5 s slot. 2. Fine grid: 60 thumbs spanning ±6 s of the coarse pick (0.2 s spacing). Click → final answer with 0.2 s precision. Output: data/metadata/barrier_opening.csv with columns machine_name, session_date, session_time, opening_s, trim_first_s, notes Window keys: click select thumbnail at that timestamp n skip this video for THIS run u mark unusable (opening_s = NaN) b back to coarse grid (after seeing fine grid) q / ESC save+quit Usage: python pick_barrier.py python pick_barrier.py --redo python pick_barrier.py --limit 10 python pick_barrier.py --db /path/to/specific_tracking.db """ from __future__ import annotations import argparse import re import sqlite3 import sys from pathlib import Path import cv2 import matplotlib.pyplot as plt import numpy as np import pandas as pd from config import DATA_METADATA, INVENTORY_CSV, VIDEO_INFO_TSV from detect_barrier_opening import ( STEP_S, WINDOW_S, per_frame_distance, sliding_mean, ) OUT_CSV = DATA_METADATA / "barrier_opening.csv" OUT_COLS = ["machine_name", "session_date", "session_time", "opening_s", "trim_first_s", "notes"] DB_NAME_RE = re.compile( r"^(\d{4}-\d{2}-\d{2})_(\d{2}-\d{2}-\d{2})_([0-9a-f]{32})__" ) GRID_ROWS, GRID_COLS = 6, 10 N_THUMBS = GRID_ROWS * GRID_COLS # 60 DEFAULT_COARSE_SPAN_S = 600.0 # 0..10 min, ~10 s spacing — covers late-opening videos FINE_SPAN_S = 12.0 # ±6 s around coarse pick → ~0.2 s spacing AUTO_SEARCH_END_S = 600.0 # how far the auto-detector scans for its suggestion def auto_suggest(db_path: Path) -> float | None: """Median of per-ROI biggest-drop times. None if too noisy.""" try: conn = sqlite3.connect(f"file:{db_path}?mode=ro", uri=True) except sqlite3.Error: return None candidates = [] for roi in range(1, 7): try: df = pd.read_sql_query(f"SELECT t, x, y, id FROM ROI_{roi}", conn) except Exception: continue dist = per_frame_distance(df) smean = sliding_mean(dist, WINDOW_S, STEP_S, AUTO_SEARCH_END_S) pad = max(1, int(WINDOW_S / STEP_S)) if len(smean) < 2 * pad + 1: continue best_drop = -np.inf best_t = None for i in range(pad, len(smean) - pad): pre = smean["mean_dist"].iloc[:i].median() post = smean["mean_dist"].iloc[i:].median() drop = pre - post if drop > best_drop: best_drop = drop best_t = float(smean["mid_t"].iloc[i]) if best_drop > 30 and best_t is not None: candidates.append(best_t) conn.close() if not candidates: return None return float(np.median(candidates)) def grab_thumbnails(video_path: Path, target_times_s: np.ndarray, thumb_w: int = 320) -> list[np.ndarray | None]: """Read thumbnails at the requested timestamps via a single sequential pass. Linear-decode is much faster than seeking per-frame on H.264. We read frames sequentially from the earliest target onward, keeping only the ones at requested target frames. """ cap = cv2.VideoCapture(str(video_path)) fps = cap.get(cv2.CAP_PROP_FPS) or 25.0 total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT)) src_w = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)) src_h = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) if total_frames <= 0: cap.release() return [None] * len(target_times_s) target_frames = np.clip( (target_times_s * fps).round().astype(int), 0, total_frames - 1 ) sort_idx = np.argsort(target_frames) sorted_targets = target_frames[sort_idx] out: list[np.ndarray | None] = [None] * len(target_times_s) if sorted_targets.size == 0: cap.release() return out cap.set(cv2.CAP_PROP_POS_FRAMES, int(sorted_targets[0])) cur_frame = int(sorted_targets[0]) last_frame_data: np.ndarray | None = None scale = thumb_w / src_w if src_w > 0 else 1.0 thumb_h = max(1, int(round(src_h * scale))) for ord_i, target in zip(sort_idx, sorted_targets): while cur_frame <= target: ret, frame = cap.read() if not ret: last_frame_data = None break last_frame_data = frame cur_frame += 1 if last_frame_data is not None: small = cv2.resize(last_frame_data, (thumb_w, thumb_h), interpolation=cv2.INTER_AREA) out[ord_i] = cv2.cvtColor(small, cv2.COLOR_BGR2RGB) cap.release() return out def show_thumbnail_grid( video_path: Path, center_t: float, span_s: float, title: str, ) -> tuple[float | None, str]: """Show a 10×6 thumbnail grid; return (clicked_time, action). `action` is one of: 'pick', 'skip', 'unusable', 'back', 'quit'. `clicked_time` is None unless action == 'pick'. """ half = span_s / 2.0 times = np.linspace(max(0.0, center_t - half), center_t + half, N_THUMBS) print(f" loading {N_THUMBS} thumbnails ({times[0]:.1f}–{times[-1]:.1f}s)...", flush=True) thumbs = grab_thumbnails(video_path, times) fig, axes = plt.subplots(GRID_ROWS, GRID_COLS, figsize=(20, 11)) fig.suptitle( f"{title}\nclick a thumbnail · n=skip · u=unusable · b=back · q=quit", fontsize=11, ) state = {"time": None, "action": None} for ax, t, thumb in zip(axes.flat, times, thumbs): if thumb is not None: ax.imshow(thumb) else: ax.set_facecolor("black") ax.text(0.5, 0.5, "no frame", transform=ax.transAxes, ha="center", va="center", color="white") # Format time as M:SS.s for readability m, s = divmod(t, 60) ax.set_title(f"{int(m):d}:{s:05.2f}", fontsize=8, pad=1) ax.set_xticks([]); ax.set_yticks([]) fig.subplots_adjust(left=0.01, right=0.99, top=0.93, bottom=0.01, wspace=0.03, hspace=0.18) def on_click(event): if event.inaxes is None: return for i, ax in enumerate(axes.flat): if ax is event.inaxes: state["time"] = float(times[i]) state["action"] = "pick" plt.close(fig) return def on_key(event): k = event.key if k == "n": state["action"] = "skip"; plt.close(fig) elif k == "u": state["action"] = "unusable"; plt.close(fig) elif k == "b": state["action"] = "back"; plt.close(fig) elif k in ("q", "escape"): state["action"] = "quit"; plt.close(fig) fig.canvas.mpl_connect("button_press_event", on_click) fig.canvas.mpl_connect("key_press_event", on_key) plt.show() return state["time"], state["action"] or "skip" def pick_for_video( video_path: Path, db_path: Path | None, machine_name: str, session_date: str, session_time: str, coarse_span_s: float = DEFAULT_COARSE_SPAN_S, ) -> dict | str | None: """Run the two-stage thumbnail picker. Return dict, 'skip', or 'quit'.""" auto_t = auto_suggest(db_path) if db_path else None print(f" auto-suggest: {f'{auto_t:.1f}s' if auto_t else '(none)'}") # Stage 1: coarse grid centred on auto-suggest, or middle of span. coarse_center = auto_t if auto_t is not None else coarse_span_s / 2 title_coarse = (f"COARSE {machine_name} {session_date} {session_time} " f"· spanning {coarse_span_s/60:.0f} min") while True: coarse_t, action = show_thumbnail_grid( video_path, coarse_center, coarse_span_s, title_coarse ) if action == "skip": return "skip" if action == "unusable": return { "machine_name": machine_name, "session_date": session_date, "session_time": session_time, "opening_s": np.nan, "trim_first_s": 0, "notes": "unusable", } if action == "quit": return "quit" if action == "back": continue # already at the top stage; redraw if action == "pick" and coarse_t is not None: break # Stage 2: fine grid around the coarse pick. title_fine = (f"FINE {machine_name} {session_date} {session_time} " f"· ±{FINE_SPAN_S/2:.0f} s around {coarse_t:.1f} s") while True: fine_t, action = show_thumbnail_grid( video_path, coarse_t, FINE_SPAN_S, title_fine ) if action == "back": return pick_for_video(video_path, db_path, machine_name, session_date, session_time) if action == "skip": return "skip" if action == "unusable": return { "machine_name": machine_name, "session_date": session_date, "session_time": session_time, "opening_s": np.nan, "trim_first_s": 0, "notes": "unusable", } if action == "quit": return "quit" if action == "pick" and fine_t is not None: return { "machine_name": machine_name, "session_date": session_date, "session_time": session_time, "opening_s": round(fine_t, 1), "trim_first_s": 0, "notes": "", } def lookup_video_path(machine_name: str, session_date: str, session_time: str, inv: pd.DataFrame) -> Path | None: """Find the mp4 path for (machine, date, time) in the inventory.""" match = inv[ (inv["machine_name"] == machine_name) & (inv["session_date"] == session_date) & (inv["session_time"] == session_time) ] if match.empty: return None return Path(match.iloc[0]["mp4_path"]) def main() -> None: parser = argparse.ArgumentParser(description=__doc__) parser.add_argument("--redo", action="store_true", help="re-pick videos that already have a row in the output CSV") parser.add_argument("--limit", type=int, default=None, help="only process the first N videos") parser.add_argument("--db", type=Path, default=None, help="annotate this specific tracking DB only") parser.add_argument("--coarse-span", type=float, default=DEFAULT_COARSE_SPAN_S, help=f"coarse-grid time span in seconds (default {DEFAULT_COARSE_SPAN_S:.0f})") args = parser.parse_args() OUT_CSV.parent.mkdir(parents=True, exist_ok=True) if OUT_CSV.exists(): out = pd.read_csv(OUT_CSV) else: out = pd.DataFrame(columns=OUT_COLS) done = set(zip(out["machine_name"], out["session_date"], out["session_time"])) if not INVENTORY_CSV.exists(): sys.exit(f"Inventory not found at {INVENTORY_CSV}. Run build_video_inventory.py first.") inv = pd.read_csv(INVENTORY_CSV) # Build the queue: every (machine, date, time) referenced by the merged # TSV that has a tracking DB on disk and isn't yet annotated. tsv = pd.read_csv(VIDEO_INFO_TSV, sep="\t") queue: list[tuple[Path, Path, str, str, str]] = [] seen: set[tuple[str, str, str]] = set() for col in ("training_db_path", "testing_db_path"): for _, row in tsv.iterrows(): db = row[col] if not isinstance(db, str) or not db: continue db_path = Path(db) if not db_path.exists(): continue m = DB_NAME_RE.match(db_path.name) if not m: continue session_date, session_time = m.group(1), m.group(2) key = (row["machine_name"], session_date, session_time) if key in seen: continue seen.add(key) if key in done and not args.redo: continue video = lookup_video_path(*key, inv) if video is None or not video.exists(): print(f" ! no video for {key}; skipping") continue queue.append((db_path, video, *key)) if args.db: target = Path(args.db).resolve() queue = [q for q in queue if q[0].resolve() == target] if not queue: sys.exit(f"DB not found in queue: {args.db}") if args.limit: queue = queue[: args.limit] if not queue: print("Nothing to pick. All eligible videos already have a barrier_opening row.") return print(f"Picking barrier-opening for {len(queue)} videos.") print("Window keys: click=pick · n=skip · u=unusable · b=back · q=quit") saved = skipped = unusable = 0 for i, (db, video, machine_name, session_date, session_time) in enumerate(queue, 1): prefix = f"[{i}/{len(queue)}] {machine_name} {session_date} {session_time}" print(f"\n{prefix}") result = pick_for_video(video, db, machine_name, session_date, session_time, coarse_span_s=args.coarse_span) if result is None or result == "skip": skipped += 1 continue if result == "quit": print(" quit requested — saving and exiting") break new_row = pd.DataFrame([result]) out = pd.concat([ out[~((out.machine_name == result["machine_name"]) & (out.session_date == result["session_date"]) & (out.session_time == result["session_time"]))], new_row, ], ignore_index=True) out[OUT_COLS].to_csv(OUT_CSV, index=False) if pd.isna(result["opening_s"]): unusable += 1 print(" saved as unusable") else: saved += 1 print(f" saved opening_s = {result['opening_s']} s") print(f"\nDone: {saved} saved, {unusable} unusable, {skipped} skipped.") print(f" → {OUT_CSV}") if __name__ == "__main__": main()