cupido/scripts/pick_barrier.py
Giorgio Gilestro 24403e0474 Force interactive matplotlib backend in pick_barrier
Some environments default matplotlib to Agg (non-interactive), which
silently no-ops plt.show() — the picker would print "FigureCanvasAgg
is non-interactive" and never display the thumbnail grid. Probe TkAgg
> QtAgg > Qt5Agg > GTK3Agg before pyplot import.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-05-01 12:23:15 +01:00

504 lines
18 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""Interactive picker for barrier-opening time.
Two-stage flow per video:
1. Coarse: 10×6 thumbnail grid spanning 10 min (~10 s spacing) lets
you click the rough moment where the barrier opens.
2. Fine: launches mpv at the coarse pick, paused with on-screen
fractional time. You scrub to the exact frame; on close, type the
time you saw on the OSD into the terminal prompt. Default is the
coarse pick.
After the time is set, the picker also prompts for non-opening ROIs
(comma-separated list, e.g. "1,2,3"). Useful for the partial-opening
videos where only some sub-arenas open. Saved to the `bad_rois` column.
Output: data/metadata/barrier_opening.csv with columns
machine_name, session_date, session_time, opening_s, trim_first_s,
bad_rois, notes
Coarse-grid keys:
    click     pick that timestamp
    n         skip this video for THIS run
    u         mark unusable (opening_s = NaN)
    b         back (currently handled the same as skip)
    q / ESC   save+quit
Usage:
python pick_barrier.py
python pick_barrier.py --redo
python pick_barrier.py --limit 10
python pick_barrier.py --db /path/to/specific_tracking.db
python pick_barrier.py --no-player # skip the video-player refinement step
"""
from __future__ import annotations
import argparse
import importlib
import re
import shutil
import sqlite3
import subprocess
import sys
from pathlib import Path

import cv2
import matplotlib
# Reason: force an interactive backend so plt.show() actually opens a window.
# Some environments (depending on matplotlibrc / DISPLAY / installed Qt-vs-Tk
# bindings) default to Agg, which silently no-ops the picker. Try the common
# GUI backends in order; fall through to whatever is set if none are available.
#
# While pyplot has not been imported yet, matplotlib.use() only records the
# backend name and does not try to load it, so on its own it cannot detect a
# missing GUI toolkit. Import the backend module first so that an unavailable
# toolkit raises ImportError here and the next candidate gets a chance.
for _backend in ("TkAgg", "QtAgg", "Qt5Agg", "GTK3Agg"):
    try:
        importlib.import_module(f"matplotlib.backends.backend_{_backend.lower()}")
        matplotlib.use(_backend, force=True)
    except (ImportError, ValueError):
        continue
    break
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from config import DATA_METADATA, INVENTORY_CSV, VIDEO_INFO_TSV
from detect_barrier_opening import (
STEP_S, WINDOW_S,
per_frame_distance, sliding_mean,
)
# Destination CSV for the picked opening times; OUT_COLS is the column set
# (and order) written to it.
OUT_CSV = DATA_METADATA / "barrier_opening.csv"
OUT_COLS = ["machine_name", "session_date", "session_time",
            "opening_s", "trim_first_s", "bad_rois", "notes"]
# Tracking-DB file names are expected to start with
# "<YYYY-MM-DD>_<HH-MM-SS>_<32 lowercase hex chars>__"; groups 1 and 2 are
# read as the session date and session time.
DB_NAME_RE = re.compile(
    r"^(\d{4}-\d{2}-\d{2})_(\d{2}-\d{2}-\d{2})_([0-9a-f]{32})__"
)
# Layout of the thumbnail grid shown by the coarse picker.
GRID_ROWS, GRID_COLS = 6, 10
N_THUMBS = GRID_ROWS * GRID_COLS  # 60
DEFAULT_COARSE_SPAN_S = 600.0  # 0..10 min, ~10 s spacing — covers late-opening videos
FINE_SPAN_S = 12.0  # ±6 s around coarse pick → ~0.2 s spacing
AUTO_SEARCH_END_S = 600.0  # how far the auto-detector scans for its suggestion
def auto_suggest(db_path: Path) -> float | None:
    """Suggest an opening time as the median of per-ROI biggest-drop times.

    For each table ROI_1..ROI_6 that can be read, the sliding mean of the
    per-frame distance is split at every interior window; the split with the
    largest (median before - median after) drop is that ROI's candidate,
    kept only if the drop exceeds 30.

    Args:
        db_path: tracking database, opened read-only.

    Returns:
        Median of the per-ROI candidate times, or None when the database
        cannot be opened or no ROI yields a large enough drop.
    """
    try:
        conn = sqlite3.connect(f"file:{db_path}?mode=ro", uri=True)
    except sqlite3.Error:
        return None
    candidates = []
    # try/finally so the connection is closed even if a helper raises.
    try:
        for roi in range(1, 7):
            try:
                df = pd.read_sql_query(f"SELECT t, x, y, id FROM ROI_{roi}", conn)
            except Exception:
                # Best effort: a missing or unreadable ROI table is skipped.
                continue
            dist = per_frame_distance(df)
            smean = sliding_mean(dist, WINDOW_S, STEP_S, AUTO_SEARCH_END_S)
            pad = max(1, int(WINDOW_S / STEP_S))
            if len(smean) < 2 * pad + 1:
                continue
            mean_dist = smean["mean_dist"]
            best_drop = -np.inf
            best_t = None
            # Keep `pad` windows on either side so both medians have support.
            for i in range(pad, len(smean) - pad):
                pre = mean_dist.iloc[:i].median()
                post = mean_dist.iloc[i:].median()
                drop = pre - post
                if drop > best_drop:
                    best_drop = drop
                    best_t = float(smean["mid_t"].iloc[i])
            if best_drop > 30 and best_t is not None:
                candidates.append(best_t)
    finally:
        conn.close()
    if not candidates:
        return None
    return float(np.median(candidates))
def grab_thumbnails(video_path: Path, target_times_s: np.ndarray,
                    thumb_w: int = 320) -> list[np.ndarray | None]:
    """Read thumbnails at the requested timestamps via a single sequential pass.

    Linear-decode is much faster than seeking per-frame on H.264. We read
    frames sequentially from the earliest target onward, keeping only the
    ones at requested target frames.

    Args:
        video_path: video file to read.
        target_times_s: timestamps in seconds (any order; an array or a
            plain sequence of numbers).
        thumb_w: thumbnail width in pixels; height keeps the aspect ratio.

    Returns:
        One entry per requested timestamp, in the input order: an RGB
        thumbnail, or None where no frame could be read.
    """
    target_times_s = np.asarray(target_times_s, dtype=float)
    out: list[np.ndarray | None] = [None] * len(target_times_s)
    cap = cv2.VideoCapture(str(video_path))
    # try/finally so the capture handle is released on every exit path.
    try:
        fps = cap.get(cv2.CAP_PROP_FPS) or 25.0
        total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
        src_w = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        src_h = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
        if total_frames <= 0 or target_times_s.size == 0:
            return out
        target_frames = np.clip(
            (target_times_s * fps).round().astype(int), 0, total_frames - 1
        )
        # Visit targets in frame order, remembering each one's output slot.
        sort_idx = np.argsort(target_frames)
        sorted_targets = target_frames[sort_idx]
        cap.set(cv2.CAP_PROP_POS_FRAMES, int(sorted_targets[0]))
        cur_frame = int(sorted_targets[0])
        last_frame_data: np.ndarray | None = None
        scale = thumb_w / src_w if src_w > 0 else 1.0
        thumb_h = max(1, int(round(src_h * scale)))
        for ord_i, target in zip(sort_idx, sorted_targets):
            # Decode forward until the target frame has been read. When two
            # targets map to the same frame the loop does not run and the
            # previously decoded frame is reused.
            while cur_frame <= target:
                ret, frame = cap.read()
                if not ret:
                    last_frame_data = None
                    break
                last_frame_data = frame
                cur_frame += 1
            if last_frame_data is not None:
                small = cv2.resize(last_frame_data, (thumb_w, thumb_h),
                                   interpolation=cv2.INTER_AREA)
                out[ord_i] = cv2.cvtColor(small, cv2.COLOR_BGR2RGB)
    finally:
        cap.release()
    return out
def show_thumbnail_grid(
    video_path: Path,
    center_t: float,
    span_s: float,
    title: str,
) -> tuple[float | None, str]:
    """Show a GRID_ROWS × GRID_COLS thumbnail grid; return (clicked_time, action).

    The grid covers `span_s` seconds centred on `center_t`, with the start
    clamped at 0 s. The call blocks until the window is closed.

    `action` is one of: 'pick', 'skip', 'unusable', 'back', 'quit'.
    Closing the window without choosing anything yields 'skip'.
    `clicked_time` is None unless action == 'pick'.
    """
    half = span_s / 2.0
    times = np.linspace(max(0.0, center_t - half), center_t + half, N_THUMBS)
    print(f" loading {N_THUMBS} thumbnails ({times[0]:.1f}–{times[-1]:.1f}s)...", flush=True)
    thumbs = grab_thumbnails(video_path, times)
    fig, axes = plt.subplots(GRID_ROWS, GRID_COLS, figsize=(20, 11))
    fig.suptitle(
        f"{title}\nclick a thumbnail · n=skip · u=unusable · b=back · q=quit",
        fontsize=11,
    )
    # Filled in by the event handlers below; read after plt.show() returns.
    state = {"time": None, "action": None}
    for ax, t, thumb in zip(axes.flat, times, thumbs):
        if thumb is not None:
            ax.imshow(thumb)
        else:
            ax.set_facecolor("black")
            ax.text(0.5, 0.5, "no frame",
                    transform=ax.transAxes, ha="center", va="center", color="white")
        # Format time as M:SS.ss for readability
        m, s = divmod(t, 60)
        ax.set_title(f"{int(m):d}:{s:05.2f}", fontsize=8, pad=1)
        ax.set_xticks([])
        ax.set_yticks([])
    fig.subplots_adjust(left=0.01, right=0.99, top=0.93, bottom=0.01,
                        wspace=0.03, hspace=0.18)

    key_actions = {
        "n": "skip",
        "u": "unusable",
        "b": "back",
        "q": "quit",
        "escape": "quit",
    }

    def on_click(event):
        # Map the clicked axes back to its timestamp and close the window.
        if event.inaxes is None:
            return
        for i, ax in enumerate(axes.flat):
            if ax is event.inaxes:
                state["time"] = float(times[i])
                state["action"] = "pick"
                plt.close(fig)
                return

    def on_key(event):
        # Unrecognised keys are ignored and leave the window open.
        action = key_actions.get(event.key)
        if action is not None:
            state["action"] = action
            plt.close(fig)

    fig.canvas.mpl_connect("button_press_event", on_click)
    fig.canvas.mpl_connect("key_press_event", on_key)
    plt.show()
    return state["time"], state["action"] or "skip"
def parse_time_input(s: str) -> float | None:
    """Parse a typed timestamp into seconds.

    Accepts plain seconds ('290'), m:ss ('4:50'), m:ss.ss ('4:50.40') and
    h:mm:ss ('1:02:03').

    Returns:
        The time in seconds, or None when the text is empty, malformed,
        negative or not finite ('nan', 'inf'). NaN in particular must be
        rejected because it is the value stored for unusable videos.
    """
    s = s.strip()
    if not s:
        return None
    parts = s.split(":")
    if len(parts) > 3:
        return None
    try:
        values = [float(p) for p in parts]
    except ValueError:
        return None
    # Right-align the fields against (hours, minutes, seconds).
    weights = (3600.0, 60.0, 1.0)[-len(values):]
    total = sum(w * v for w, v in zip(weights, values))
    # A single chained comparison rejects NaN, ±inf and negative values.
    if not (0.0 <= total < float("inf")):
        return None
    return total
def play_video(video_path: Path, start_t: float) -> str | None:
    """Open the video in an external player, 30 s before `start_t`.

    The first of mpv, vlc, ffplay found on PATH is used. The call blocks
    until the player exits; the user is expected to scrub to the exact
    frame, read the timestamp off the player, close it, and then type the
    time at the terminal prompt.

    Returns:
        The name of the player that was run, or None when no player is
        installed or launching it failed.
    """
    seek = max(0.0, start_t - 30.0)
    target = str(video_path)
    # Candidates in order of preference, each with its full command line.
    players = (
        ("mpv", ["mpv", "--no-resume-playback", "--osd-level=3", "--osd-fractions",
                 "--pause", f"--start={seek:.1f}", target]),
        ("vlc", ["vlc", "--no-video-title-show", f"--start-time={seek:.1f}",
                 "--play-and-pause", target]),
        ("ffplay", ["ffplay", "-hide_banner", "-loglevel", "error",
                    "-ss", f"{seek:.1f}", target]),
    )
    chosen = next(
        ((exe, argv) for exe, argv in players if shutil.which(exe)),
        None,
    )
    if chosen is None:
        print(" ! no video player found (tried mpv, vlc, ffplay)")
        return None
    name, cmd = chosen
    print(f" launching {name} at {seek:.1f}s — pause on the opening frame, "
          "read the time off the player, then close it.")
    try:
        subprocess.run(cmd, check=False,
                       stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
    except OSError as e:
        print(f" {name} launch failed: {e}")
        return None
    return name
def prompt_opening_time(default_s: float) -> float | str:
    """Ask on the terminal for the exact opening time.

    An empty reply accepts `default_s`. The words/letters n|skip,
    u|unusable and q|quit (case-insensitive) return 'skip', 'unusable' and
    'quit' respectively. Anything else is parsed as a time and re-prompted
    until it parses.
    """
    commands = {
        "n": "skip", "skip": "skip",
        "u": "unusable", "unusable": "unusable",
        "q": "quit", "quit": "quit",
    }
    while True:
        reply = input(
            f" opening time [enter={default_s:.1f}s · n=skip · u=unusable · q=quit]: "
        ).strip()
        if reply == "":
            return default_s
        command = commands.get(reply.lower())
        if command is not None:
            return command
        parsed = parse_time_input(reply)
        if parsed is not None:
            return parsed
        print(" ? enter seconds, m:ss, or m:ss.ss")
def prompt_bad_rois() -> str:
    """Ask on the terminal which ROIs did not open.

    Re-prompts until the reply is empty or a comma-separated list of
    integers in 1..6. Returns the de-duplicated numbers, sorted and joined
    by commas (an empty string when none were given).
    """
    while True:
        reply = input(" non-opening ROIs (e.g. '1,2,3') [enter=none]: ").strip()
        if not reply:
            return ""
        # Blank fields (e.g. a trailing comma) are ignored.
        tokens = [tok.strip() for tok in reply.split(",")]
        try:
            picked = {int(tok) for tok in tokens if tok}
        except ValueError:
            print(" ? enter integers separated by commas")
            continue
        if any(r < 1 or r > 6 for r in picked):
            print(" ? ROI numbers must be 1..6")
            continue
        return ",".join(map(str, sorted(picked)))
def _barrier_row(machine_name: str, session_date: str, session_time: str,
                 opening_s: float, bad_rois: str = "", notes: str = "") -> dict:
    """Build one output row for the barrier-opening CSV."""
    return {
        "machine_name": machine_name, "session_date": session_date,
        "session_time": session_time, "opening_s": opening_s,
        "trim_first_s": 0, "bad_rois": bad_rois, "notes": notes,
    }


def pick_for_video(
    video_path: Path,
    db_path: Path | None,
    machine_name: str,
    session_date: str,
    session_time: str,
    coarse_span_s: float = DEFAULT_COARSE_SPAN_S,
    use_player: bool = True,
) -> dict | str | None:
    """Run the two-stage picker for one video.

    Stage 1 shows the coarse thumbnail grid, centred on the auto-suggested
    time when there is one. Stage 2 optionally opens a video player, then
    prompts for the exact time (defaulting to the coarse pick) and for the
    non-opening ROIs.

    Returns:
        A result dict keyed like the output CSV (opening_s is NaN and notes
        is 'unusable' for unusable videos), or the string 'skip' or 'quit'.
    """
    auto_t = auto_suggest(db_path) if db_path else None
    # `is not None` so that a valid suggestion of 0.0 s is still displayed.
    print(f" auto-suggest: {f'{auto_t:.1f}s' if auto_t is not None else '(none)'}")
    # Stage 1: coarse thumbnail grid for rough localisation.
    coarse_center = auto_t if auto_t is not None else coarse_span_s / 2
    title_coarse = (f"COARSE {machine_name} {session_date} {session_time} "
                    f"· spanning {coarse_span_s/60:.0f} min "
                    f"· click ≈ where the barrier opens")
    coarse_t, action = show_thumbnail_grid(
        video_path, coarse_center, coarse_span_s, title_coarse
    )
    if action == "unusable":
        return _barrier_row(machine_name, session_date, session_time,
                            np.nan, notes="unusable")
    if action == "quit":
        return "quit"
    # 'skip', 'back' and any pick without a timestamp all skip the video.
    if action != "pick" or coarse_t is None:
        return "skip"
    # Stage 2: hand off to a video player for frame-accurate refinement.
    if use_player:
        played = play_video(video_path, coarse_t)
        if played is None:
            print(" ! using coarse pick as the answer.")
    fine_t = prompt_opening_time(default_s=coarse_t)
    if fine_t == "skip":
        return "skip"
    if fine_t == "unusable":
        return _barrier_row(machine_name, session_date, session_time,
                            np.nan, notes="unusable")
    if fine_t == "quit":
        return "quit"
    bad_rois = prompt_bad_rois()
    return _barrier_row(machine_name, session_date, session_time,
                        round(float(fine_t), 1), bad_rois=bad_rois)
def lookup_video_path(machine_name: str, session_date: str,
                      session_time: str, inv: pd.DataFrame) -> Path | None:
    """Return the inventory's mp4 path for one (machine, date, time) session.

    The first matching inventory row wins; None is returned when no row
    matches all three keys.
    """
    same_machine = inv["machine_name"] == machine_name
    same_date = inv["session_date"] == session_date
    same_time = inv["session_time"] == session_time
    hits = inv[same_machine & same_date & same_time]
    if len(hits) == 0:
        return None
    first_hit = hits.iloc[0]
    return Path(first_hit["mp4_path"])
def main() -> None:
    """Command-line entry point: queue the un-annotated videos and pick each.

    Loads (or creates) the output CSV, builds the queue from the tracking
    DBs referenced by the merged TSV, runs the picker per video, and
    rewrites the CSV after every saved result.
    """
    parser = argparse.ArgumentParser(description=__doc__)
    parser.add_argument("--redo", action="store_true",
                        help="re-pick videos that already have a row in the output CSV")
    parser.add_argument("--limit", type=int, default=None,
                        help="only process the first N videos")
    parser.add_argument("--db", type=Path, default=None,
                        help="annotate this specific tracking DB only")
    parser.add_argument("--coarse-span", type=float, default=DEFAULT_COARSE_SPAN_S,
                        help=f"coarse-grid time span in seconds (default {DEFAULT_COARSE_SPAN_S:.0f})")
    parser.add_argument("--no-player", action="store_true",
                        help="skip the video-player refinement step (use the coarse pick directly)")
    args = parser.parse_args()

    OUT_CSV.parent.mkdir(parents=True, exist_ok=True)
    out = pd.read_csv(OUT_CSV) if OUT_CSV.exists() else pd.DataFrame(columns=OUT_COLS)
    # Reason: backfill bad_rois column for older CSVs without it.
    for missing in [c for c in OUT_COLS if c not in out.columns]:
        out[missing] = ""
    done = set(zip(out["machine_name"], out["session_date"], out["session_time"]))

    if not INVENTORY_CSV.exists():
        sys.exit(f"Inventory not found at {INVENTORY_CSV}. Run build_video_inventory.py first.")
    inv = pd.read_csv(INVENTORY_CSV)

    # Build the queue: every (machine, date, time) referenced by the merged
    # TSV that has a tracking DB on disk and isn't yet annotated.
    tsv = pd.read_csv(VIDEO_INFO_TSV, sep="\t")
    queue: list[tuple[Path, Path, str, str, str]] = []
    seen: set[tuple[str, str, str]] = set()
    for db_column in ("training_db_path", "testing_db_path"):
        for _, row in tsv.iterrows():
            raw_db = row[db_column]
            # Missing cells are not non-empty strings and are dropped here.
            if not (isinstance(raw_db, str) and raw_db):
                continue
            db_path = Path(raw_db)
            if not db_path.exists():
                continue
            name_match = DB_NAME_RE.match(db_path.name)
            if name_match is None:
                continue
            key = (row["machine_name"], name_match.group(1), name_match.group(2))
            if key in seen:
                continue
            seen.add(key)
            if not args.redo and key in done:
                continue
            video = lookup_video_path(*key, inv)
            if video is None or not video.exists():
                print(f" ! no video for {key}; skipping")
                continue
            queue.append((db_path, video, *key))

    if args.db:
        wanted = Path(args.db).resolve()
        queue = [entry for entry in queue if entry[0].resolve() == wanted]
        if not queue:
            sys.exit(f"DB not found in queue: {args.db}")
    if args.limit:
        queue = queue[: args.limit]
    if not queue:
        print("Nothing to pick. All eligible videos already have a barrier_opening row.")
        return

    print(f"Picking barrier-opening for {len(queue)} videos.")
    print("Window keys: click=pick · n=skip · u=unusable · b=back · q=quit")
    saved = skipped = unusable = 0
    for i, (db, video, machine_name, session_date, session_time) in enumerate(queue, 1):
        prefix = f"[{i}/{len(queue)}] {machine_name} {session_date} {session_time}"
        print(f"\n{prefix}")
        result = pick_for_video(video, db, machine_name, session_date, session_time,
                                coarse_span_s=args.coarse_span,
                                use_player=not args.no_player)
        if result is None or result == "skip":
            skipped += 1
            continue
        if result == "quit":
            print(" quit requested — saving and exiting")
            break
        # Replace any existing row for this session, then persist right away
        # so an interrupted run keeps everything picked so far.
        same_session = (
            (out["machine_name"] == result["machine_name"])
            & (out["session_date"] == result["session_date"])
            & (out["session_time"] == result["session_time"])
        )
        out = pd.concat([out[~same_session], pd.DataFrame([result])],
                        ignore_index=True)
        out[OUT_COLS].to_csv(OUT_CSV, index=False)
        if pd.isna(result["opening_s"]):
            unusable += 1
            print(" saved as unusable")
        else:
            saved += 1
            print(f" saved opening_s = {result['opening_s']} s")

    print(f"\nDone: {saved} saved, {unusable} unusable, {skipped} skipped.")
    print(f"{OUT_CSV}")


if __name__ == "__main__":
    main()