"""Interactive target picker for offline tracking (matplotlib/Tk GUI).

Loops through videos that need tracking and lets the user click 3 reference
points per video in L-shape order:

    1) TOP target     (above the corner)
    2) CORNER target  (the right-angle vertex)
    3) LEFT target    (to the left of the corner)

These three points are the same reference layout used by ethoscope's
`TargetGridROIBuilder`:

    dst_points = [(0, -1), (0, 0), (-1, 0)]   in unit coordinates.

Saving them as a JSON sidecar lets the offline tracker build the 6-ROI HD
mating arena grid without needing auto-target detection.

Output JSON sidecar: TARGETS_DIR/<video file stem>.json

    {
      "video_path": "/mnt/.../*.mp4",
      "frame_index": <int, index of the frame that was displayed>,
      "reference_points": [[x0, y0], [x1, y1], [x2, y2]],
      "order": ["top", "corner", "left"],
      "picked_at": "<ISO-8601 local timestamp, seconds precision>"
    }

A video marked unusable gets a sidecar with a different shape instead:

    {
      "video_path": "/mnt/.../*.mp4",
      "unusable": true,
      "reason": "<free text typed at the prompt, may be empty>",
      "marked_at": "<ISO-8601 local timestamp, seconds precision>"
    }

Keys (in the picker window):

    LEFT-CLICK  add a point (top → corner → left); once 3 points exist, a
                further click replaces the nearest one
    r           reset clicks for current video
    d           skip this video for THIS run only (no JSON written)
    u           mark this video unusable (FOV wrong etc.); skipped on later
                runs unless --redo is given
    . / ,       advance / rewind by 25 frames (≈ 1 s @ 25 fps)
    ] / [       advance / rewind by 5% of the video (~3 min in a 1 h video)
    #           jump to the middle of the video
    enter       save the 3 points and move on
    q / ESC     quit picker

After the 3rd click, the 6 ROI rectangles are drawn over the frame so you can
sanity-check the geometry before pressing ENTER.

With --redo, if a JSON sidecar exists, its points are pre-loaded so you can
nudge them rather than restart from scratch.

Why matplotlib instead of cv2.imshow: OpenCV's bundled GUI uses Qt, which
needs XKeyboard + a fonts directory and is fragile over SSH X11-forwarding.
matplotlib's TkAgg backend uses pure Tk/X11 and works out of the box on any
DISPLAY (and gives free pan/zoom via the toolbar — useful for clicking small
targets precisely).
"""

from __future__ import annotations

import argparse
import datetime as dt
import json
import os
import sys
from pathlib import Path

# Force TkAgg BEFORE importing matplotlib. We override even if MPLBACKEND is
# already set, because the script is unusable with a non-interactive backend.
# This assignment must stay above every matplotlib import below.
os.environ["MPLBACKEND"] = "TkAgg"

import cv2  # noqa: E402
import matplotlib  # noqa: E402
import matplotlib.pyplot as plt  # noqa: E402
import numpy as np  # noqa: E402
import pandas as pd  # noqa: E402

# matplotlib.backend_bases exposes the cursor identifiers under different
# names depending on version: `Cursors` enum on 3.5+, lowercase `cursors`
# instance on older releases. Both have the same integer attributes.
# `_Cursors` ends up as None when neither name can be imported; the picker
# then falls back to setting the Tk widget cursor directly.
try:
    from matplotlib.backend_bases import Cursors as _Cursors  # 3.5+
except ImportError:
    try:
        from matplotlib.backend_bases import cursors as _Cursors  # older
    except ImportError:
        _Cursors = None

# Verify we ended up on an interactive backend; bail loud (with a concrete
# explanation) if not. matplotlib silently falls back to 'agg' when its
# requested backend can't load, which is hard to debug without help.
_backend = matplotlib.get_backend()
if _backend.lower() in ("agg", "headless", "template", "pdf", "svg", "ps"):
    # Collect human-readable hints about why Tk might be unavailable.
    diag = []
    try:
        import tkinter as _tk

        try:
            # Creating (and immediately destroying) a root window proves that
            # Tk can actually reach the display, not merely be imported.
            _tk.Tk().destroy()
            diag.append("tkinter import + Tk() instantiation: OK")
        except Exception as e:
            diag.append(f"tkinter imported but Tk() failed: {e!r}")
    except Exception as e:
        diag.append(f"tkinter import FAILED: {e!r}")
        diag.append(" → on Manjaro/Arch, run: sudo pacman -S tk")
    print(
        f"ERROR: matplotlib loaded the non-interactive backend {_backend!r}.\n"
        f" Expected 'TkAgg'. Diagnostic info:\n"
        f" DISPLAY = {os.environ.get('DISPLAY')!r}\n"
        f" MPLBACKEND = {os.environ.get('MPLBACKEND')!r}\n"
        f" matplotlib ver = {matplotlib.__version__}\n"
        + "\n".join(f" {d}" for d in diag),
        file=sys.stderr,
    )
    # Exit status 2: the environment cannot run the picker at all.
    sys.exit(2)

# Project-local imports come last so the backend check above runs first.
from config import INVENTORY_CSV, TARGETS_DIR  # noqa: E402
from tracking_geometry import compute_roi_polygons  # noqa: E402

# Strip default matplotlib keybindings that would conflict with ours.
for k in (
    "keymap.home",
    "keymap.save",
    "keymap.quit",
    "keymap.fullscreen",
    "keymap.pan",
    "keymap.zoom",
    "keymap.back",
    "keymap.forward",
):
    try:
        plt.rcParams[k] = []
    except KeyError:
        # This matplotlib build does not define that keymap entry; nothing to
        # strip.
        pass

# Label / colour of the 1st, 2nd and 3rd reference point, in click order.
CLICK_LABELS = ("TOP", "CORNER", "LEFT")
CLICK_COLORS = ("red", "lime", "deepskyblue")


def grab_frame(
    video_path: Path, frame_idx: int
) -> tuple[np.ndarray, int, int] | None:
    """Return (RGB frame, actual_frame_idx, n_frames) from the video, or None.

    ``frame_idx`` is clamped to ``[0, n_frames - 1]`` so callers can step
    blindly. When the container reports no frame count (``n_frames <= 0``)
    only the lower bound can be enforced.

    Returns None when the video cannot be opened or the requested frame cannot
    be decoded. The capture handle is always released, including on the
    failure paths.
    """
    cap = cv2.VideoCapture(str(video_path))
    try:
        if not cap.isOpened():
            return None
        n = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
        # Never seek before the first frame, even if the upper bound is unknown.
        frame_idx = max(0, frame_idx)
        if n > 0:
            frame_idx = min(frame_idx, n - 1)
        cap.set(cv2.CAP_PROP_POS_FRAMES, frame_idx)
        ok, frame = cap.read()
    finally:
        cap.release()
    if not ok or frame is None:
        return None
    # OpenCV decodes to BGR; matplotlib expects RGB.
    return cv2.cvtColor(frame, cv2.COLOR_BGR2RGB), frame_idx, n


def _sanitize_points(points) -> list[tuple[float, float]]:
    """Return at most ``len(CLICK_LABELS)`` ``(x, y)`` float tuples from *points*.

    Anything that cannot be read as a sequence of numeric pairs yields an
    empty list, so a damaged sidecar degrades to "start from scratch" instead
    of raising inside the GUI.
    """
    if not points:
        return []
    try:
        return [
            (float(x), float(y))
            for x, y in list(points)[: len(CLICK_LABELS)]
        ]
    except (TypeError, ValueError):
        return []


def pick_one(
    video_path: Path,
    frame_idx: int,
    status_prefix: str,
    initial_points: list[tuple[float, float]] | None = None,
) -> dict | None:
    """Show the picker UI for a single video; return the result dict or None.

    Args:
        video_path: video to open.
        frame_idx: frame to display first (clamped by ``grab_frame``).
        status_prefix: text shown at the start of the window title line.
        initial_points: optional previously saved points to pre-load; at most
            three are used and malformed input is ignored.

    Returns:
        None if the first frame cannot be read, otherwise a dict whose
        ``"action"`` key is one of:

        * ``"save"``: also carries ``"frame_idx"`` and ``"points"`` (three
          ``(x, y)`` float tuples in image coordinates);
        * ``"unusable"``: also carries ``"frame_idx"``;
        * ``"skip"`` or ``"quit"``: no further keys. Closing the window with
          the window-manager button is reported as ``"quit"``.
    """
    grabbed = grab_frame(video_path, frame_idx)
    if grabbed is None:
        print(f" ! cannot read {video_path}")
        return None
    frame, frame_idx, n_frames = grabbed

    # Big-step size for ] / [ : 5% of total length, ~3 min in a 1h video.
    big_step = max(1, int(round(0.05 * n_frames))) if n_frames > 0 else 250

    fig, ax = plt.subplots(figsize=(14, 8))
    try:
        fig.canvas.manager.set_window_title("pick targets")
    except Exception:
        # Purely cosmetic; some canvases have no window manager.
        pass

    # Use a crosshair cursor over the axes so it's obvious where the click
    # will land. matplotlib's toolbar resets the cursor to POINTER (arrow) on
    # every mouse-move when no tool is active, so we intercept set_cursor:
    # whenever it asks for POINTER, we substitute SELECT_REGION (crosshair).
    # Tool modes (zoom/pan) keep their native cursors.
    if _Cursors is not None:
        _orig_set_cursor = fig.canvas.set_cursor

        def _set_cursor_with_crosshair(cursor):
            if cursor == _Cursors.POINTER:
                cursor = _Cursors.SELECT_REGION
            return _orig_set_cursor(cursor)

        fig.canvas.set_cursor = _set_cursor_with_crosshair
        try:
            fig.canvas.set_cursor(_Cursors.SELECT_REGION)
        except Exception:
            # Best effort: the picker still works with the default cursor.
            pass
    else:
        # Last-ditch: just set the Tk widget's cursor once and hope the
        # toolbar doesn't immediately overwrite it.
        try:
            fig.canvas.get_tk_widget().config(cursor="tcross")
        except Exception:
            pass

    img_artist = ax.imshow(frame)
    ax.set_axis_off()
    fig.tight_layout()

    state = {
        # Capped at three well-formed points so the drawing code can index
        # CLICK_LABELS / CLICK_COLORS safely.
        "points": _sanitize_points(initial_points),
        "action": None,  # 'save' | 'skip' | 'quit' | 'unusable'
        "frame": frame,
        "frame_idx": frame_idx,
        "drawn": [],  # artists drawn on top of the image
    }

    def update_title():
        """Show the current frame index and what the user should do next."""
        nb = len(state["points"])
        nxt = (
            f"click {CLICK_LABELS[nb]}"
            if nb < 3
            else "ENTER=save | r=reset d=skip u=unusable q=quit | . , [ ] # = step frame"
        )
        ax.set_title(
            f'{status_prefix} frame {state["frame_idx"]} | {nxt}',
            fontsize=10,
        )

    def redraw_points():
        """Rebuild every overlay artist (markers, L-shape lines, ROI preview)."""
        for artist in state["drawn"]:
            try:
                artist.remove()
            except Exception:
                # Artist already detached from the axes; nothing to undo.
                pass
        state["drawn"].clear()

        points = state["points"]
        for i, (x, y) in enumerate(points):
            color = CLICK_COLORS[i]
            label = CLICK_LABELS[i]
            (cross,) = ax.plot(x, y, marker="+", color=color, markersize=22, mew=2)
            (ring,) = ax.plot(
                x, y, marker="o", color=color, markersize=22,
                fillstyle="none", mew=2,
            )
            txt = ax.text(
                x + 14, y - 14, label, color=color, fontsize=10, weight="bold",
            )
            state["drawn"].extend([cross, ring, txt])

        if len(points) >= 2:
            # TOP -> CORNER leg of the L.
            (line1,) = ax.plot(
                [points[0][0], points[1][0]],
                [points[0][1], points[1][1]],
                color="white", linewidth=0.7, alpha=0.6,
            )
            state["drawn"].append(line1)

        if len(points) == 3:
            # CORNER -> LEFT leg of the L.
            (line2,) = ax.plot(
                [points[1][0], points[2][0]],
                [points[1][1], points[2][1]],
                color="white", linewidth=0.7, alpha=0.6,
            )
            state["drawn"].append(line2)

            # ROI overlay — draw the 6 computed rectangles on top of the frame
            try:
                polys = compute_roi_polygons(points)
            except Exception as e:
                polys = []
                print(f" (ROI preview failed: {e})")
            for j, poly in enumerate(polys):
                # Close the polygon by repeating the first point
                xs = list(poly[:, 0]) + [poly[0, 0]]
                ys = list(poly[:, 1]) + [poly[0, 1]]
                (outline,) = ax.plot(
                    xs, ys, color="yellow", linewidth=1.5, alpha=0.9,
                )
                state["drawn"].append(outline)
                cx = float(np.mean(poly[:, 0]))
                cy = float(np.mean(poly[:, 1]))
                lbl = ax.text(
                    cx, cy, str(j + 1), color="yellow", fontsize=14,
                    weight="bold", ha="center", va="center",
                )
                state["drawn"].append(lbl)

        update_title()
        fig.canvas.draw_idle()

    def reload_frame(new_idx: int):
        """Display another frame; silently keep the current one on failure."""
        regrabbed = grab_frame(video_path, new_idx)
        if regrabbed is None:
            return
        new_frame, new_idx, _ = regrabbed
        state["frame"] = new_frame
        state["frame_idx"] = new_idx
        img_artist.set_data(new_frame)
        # Keep clicked targets + ROI overlay in place across frame-stepping —
        # press 'r' to clear them explicitly.
        redraw_points()

    def on_click(event):
        """Add a point, or move the nearest one when three already exist."""
        if event.inaxes is not ax:
            return
        if event.button != 1:  # left click only
            return
        if event.xdata is None or event.ydata is None:
            return
        # Skip clicks fired while the toolbar's pan/zoom is active.
        toolbar = getattr(fig.canvas, "toolbar", None)
        if toolbar is not None and getattr(toolbar, "mode", ""):
            return
        x, y = float(event.xdata), float(event.ydata)
        points = state["points"]
        if len(points) < 3:
            points.append((x, y))
        else:
            # 3 points already there — replace the nearest one. Lets the user
            # nudge pre-loaded targets in --redo mode, or correct a bad click.
            dists = [(x - px) ** 2 + (y - py) ** 2 for px, py in points]
            i_nearest = min(range(len(points)), key=dists.__getitem__)
            points[i_nearest] = (x, y)
        redraw_points()

    def on_key(event):
        """Dispatch the picker's keyboard shortcuts."""
        k = event.key or ""
        if k in ("escape", "q"):
            state["action"] = "quit"
            plt.close(fig)
        elif k == "r":
            state["points"].clear()
            redraw_points()
        elif k == "d":
            state["action"] = "skip"
            plt.close(fig)
        elif k == "u":
            state["action"] = "unusable"
            plt.close(fig)
        elif k == "enter":
            # Saving is only possible once all three targets are placed.
            if len(state["points"]) == 3:
                state["action"] = "save"
                plt.close(fig)
        elif k == ".":
            reload_frame(state["frame_idx"] + 25)
        elif k == ",":
            reload_frame(state["frame_idx"] - 25)
        elif k == "]":
            reload_frame(state["frame_idx"] + big_step)
        elif k == "[":
            reload_frame(state["frame_idx"] - big_step)
        elif k == "#":
            if n_frames > 0:
                reload_frame(n_frames // 2)

    fig.canvas.mpl_connect("button_press_event", on_click)
    fig.canvas.mpl_connect("key_press_event", on_key)
    # Full redraw (not just the title) so pre-loaded --redo points and their
    # ROI overlay are visible before the user is offered ENTER=save.
    redraw_points()
    plt.show()  # blocks until the figure is closed

    action = state["action"]
    if action == "save":
        return {
            "action": "save",
            "frame_idx": state["frame_idx"],
            "points": state["points"],
        }
    if action == "unusable":
        return {"action": "unusable", "frame_idx": state["frame_idx"]}
    if action in ("skip", "quit"):
        return {"action": action}
    # Window closed via the WM "X" button — treat as quit so the loop stops
    return {"action": "quit"}


def main() -> None:
    """Run the picker over every inventory video that still needs targets.

    Reads ``INVENTORY_CSV``, keeps rows with ``in_xlsx`` set and
    ``already_tracked`` unset (both assumed to be boolean columns — TODO
    confirm against build_video_inventory.py), orders them by session date,
    machine name and session time, and, unless ``--redo`` is given, drops
    videos that already have a sidecar in ``TARGETS_DIR``. Each remaining
    video is shown in ``pick_one`` and the outcome is written as a JSON
    sidecar (saved points or an "unusable" marker). Skipped and unreadable
    videos write nothing and are offered again on the next run.

    Exits via ``sys.exit`` with a message when the inventory file is missing.
    """
    parser = argparse.ArgumentParser(description=__doc__)
    parser.add_argument(
        "--redo", action="store_true",
        help="re-pick videos that already have JSON sidecars",
    )
    parser.add_argument(
        "--frame", type=int, default=125,
        help="default frame index to display (default 125 ≈ 5 s @ 25 fps)",
    )
    parser.add_argument(
        "--limit", type=int, default=None,
        help="only process the first N videos",
    )
    args = parser.parse_args()

    if not INVENTORY_CSV.exists():
        sys.exit(
            f"Inventory not found at {INVENTORY_CSV}. "
            "Run build_video_inventory.py first."
        )

    inv = pd.read_csv(INVENTORY_CSV)
    todo = inv[inv["in_xlsx"] & ~inv["already_tracked"]].copy()
    todo = todo.sort_values(
        ["session_date", "machine_name", "session_time"]
    ).reset_index(drop=True)

    TARGETS_DIR.mkdir(parents=True, exist_ok=True)

    def sidecar_for(mp4_path: str) -> Path:
        """Return the sidecar path for a video: TARGETS_DIR/<stem>.json."""
        return TARGETS_DIR / (Path(mp4_path).stem + ".json")

    if not args.redo:
        todo = todo[
            ~todo["mp4_path"].apply(lambda p: sidecar_for(p).exists())
        ].reset_index(drop=True)

    if args.limit:
        todo = todo.head(args.limit)

    n = len(todo)
    if n == 0:
        print("Nothing to pick. All eligible videos already have target JSONs.")
        return

    print(
        f"Picking targets for {n} videos. "
        "Window keys: ENTER=save r=reset d=skip u=unusable q=quit "
        ".,[]=step frame | pan/zoom via toolbar"
    )

    saved = skipped = unusable = 0
    # Number by position, not by DataFrame label, so the "[i/n]" progress is
    # right regardless of how the frame was filtered.
    for pos, (_, row) in enumerate(todo.iterrows(), start=1):
        mp4 = Path(row["mp4_path"])
        prefix = f"[{pos}/{n}] {row['machine_name']} {row['session_datetime']}"
        print(f"\n{prefix}")

        # If --redo and a JSON sidecar exists, pre-load its points (only for
        # regular saves — unusable sidecars are left as-is and shown empty).
        initial_points = None
        existing = sidecar_for(row["mp4_path"])
        if args.redo and existing.exists():
            try:
                prev = json.loads(existing.read_text(encoding="utf-8"))
                if not prev.get("unusable") and prev.get("reference_points"):
                    initial_points = [tuple(p) for p in prev["reference_points"]]
                    print(f" pre-loaded {len(initial_points)} previous point(s)")
            except Exception as e:
                # A damaged sidecar must not stop the session; start from
                # scratch for this video instead.
                initial_points = None
                print(f" ! could not read previous sidecar: {e}")

        result = pick_one(mp4, args.frame, prefix, initial_points=initial_points)

        if result is None:
            # Unreadable video: move on rather than aborting, otherwise every
            # run would stop at the same file and never reach the videos
            # sorted after it.
            skipped += 1
            print(" unreadable video; skipped (no JSON written, will be re-asked next run).")
            continue

        action = result.get("action")
        if action == "quit":
            print(" quitting picker.")
            break

        if action == "skip":
            skipped += 1
            print(" skipped (no JSON written, will be re-asked next run).")
            continue

        if action == "unusable":
            try:
                reason = input(" reason for marking unusable (Enter to skip): ").strip()
            except EOFError:
                # No interactive stdin; record the marker without a reason.
                reason = ""
            payload = {
                "video_path": str(mp4),
                "unusable": True,
                "reason": reason,
                "marked_at": dt.datetime.now().isoformat(timespec="seconds"),
            }
            out_path = sidecar_for(row["mp4_path"])
            out_path.write_text(json.dumps(payload, indent=2), encoding="utf-8")
            unusable += 1
            print(f" marked unusable → {out_path.name}")
            continue

        if action == "save":
            payload = {
                "video_path": str(mp4),
                "frame_index": int(result["frame_idx"]),
                # imshow centres pixel i on coordinate i, so the pixel under
                # the click is the nearest integer; plain int() truncation
                # would bias every point by up to one pixel towards the origin.
                "reference_points": [
                    [int(round(x)), int(round(y))] for x, y in result["points"]
                ],
                "order": ["top", "corner", "left"],
                "picked_at": dt.datetime.now().isoformat(timespec="seconds"),
            }
            out_path = sidecar_for(row["mp4_path"])
            out_path.write_text(json.dumps(payload, indent=2), encoding="utf-8")
            saved += 1
            print(f" saved → {out_path.name}")

    remaining = n - saved - skipped - unusable
    print(
        f"\nDone. saved={saved} unusable={unusable} "
        f"skipped(this run)={skipped} remaining={remaining}"
    )


if __name__ == "__main__":
    main()