"""Try auto-detection of L-shape targets on each video and save JSON sidecars. Useful for: - videos that DO have visible black-circle targets (saves manual clicks); - as a smoke test of the whole pipeline before running the picker. Failure is silent — videos that fail auto-detection are simply not written to disk, leaving them for the manual `pick_targets.py` tool. Output JSON has the same shape as the manual picker's so `track_videos.py` can consume either. """ from __future__ import annotations import argparse import datetime as dt import json import logging import sys from pathlib import Path import cv2 import numpy as np import pandas as pd from config import ETHOSCOPE_SRC, INVENTORY_CSV, TARGETS_DIR # ethoscope source tree sys.path.insert(0, str(ETHOSCOPE_SRC)) from ethoscope.roi_builders.target_roi_builder import TargetGridROIBuilder # noqa: E402 def detect_one(video_path: Path, frame_idx: int) -> tuple[list[list[int]], int] | None: """Run ethoscope target detection on one frame; return (points, frame_idx) or None.""" cap = cv2.VideoCapture(str(video_path)) if not cap.isOpened(): return None n = int(cap.get(cv2.CAP_PROP_FRAME_COUNT)) if n > 0 and frame_idx >= n: frame_idx = max(0, n - 1) cap.set(cv2.CAP_PROP_POS_FRAMES, frame_idx) ok, frame = cap.read() cap.release() if not ok or frame is None: return None # The detector expects a single-channel image (grey) like ethoscope cameras produce. if frame.ndim == 3: gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY) else: gray = frame # We don't actually need a fully-configured grid here — _find_target_coordinates # alone gives us the 3 reference points. builder = TargetGridROIBuilder(n_rows=2, n_cols=3) try: ref = builder._find_target_coordinates(gray) except Exception as e: logging.debug(f"detection failed for {video_path.name}: {e}") return None if ref is None: return None return [[int(p[0]), int(p[1])] for p in ref], frame_idx def main() -> None: parser = argparse.ArgumentParser(description=__doc__) parser.add_argument("--frame", type=int, default=125) parser.add_argument("--limit", type=int, default=None) parser.add_argument("--video", type=str, default=None, help="run on a single video path (skips inventory)") parser.add_argument("--overwrite", action="store_true", help="overwrite existing JSON sidecars") args = parser.parse_args() TARGETS_DIR.mkdir(parents=True, exist_ok=True) if args.video: videos = [Path(args.video)] else: if not INVENTORY_CSV.exists(): sys.exit("Inventory missing — run build_video_inventory.py first.") inv = pd.read_csv(INVENTORY_CSV) todo = inv[inv["in_xlsx"] & ~inv["already_tracked"]] videos = [Path(p) for p in todo["mp4_path"].tolist()] if args.limit: videos = videos[: args.limit] n_ok = n_fail = n_skip = 0 for v in videos: out = TARGETS_DIR / f"{v.stem}.json" if out.exists() and not args.overwrite: n_skip += 1 continue result = detect_one(v, args.frame) if result is None: n_fail += 1 print(f" fail: {v.name}") continue points, used_frame = result out.write_text(json.dumps({ "video_path": str(v), "frame_index": int(used_frame), "reference_points": points, "order": ["top", "corner", "left"], "picked_at": dt.datetime.now().isoformat(timespec="seconds"), "method": "auto", }, indent=2)) n_ok += 1 print(f" ok: {v.name} → {points}") print(f"\nDone. ok={n_ok} fail={n_fail} skipped(existing)={n_skip}") if __name__ == "__main__": logging.basicConfig(level=logging.WARNING, format="%(levelname)s %(message)s") main()