"""Try auto-detection of L-shape targets on each video and save JSON sidecars. Useful for: - videos that DO have visible black-circle targets (saves manual clicks); - as a smoke test of the whole pipeline before running the picker. Failure is silent — videos that fail auto-detection are simply not written to disk, leaving them for the manual `pick_targets.py` tool. Output JSON has the same shape as the manual picker's so `track_videos.py` can consume either. """ from __future__ import annotations import argparse import datetime as dt import json import logging import sys from pathlib import Path import cv2 import numpy as np import pandas as pd # ethoscope source tree sys.path.insert(0, "/home/gg/Code/ethoscope_project/ethoscope/src/ethoscope") from config import INVENTORY_CSV, TARGETS_DIR # noqa: E402 from ethoscope.roi_builders.target_roi_builder import TargetGridROIBuilder # noqa: E402 def detect_one(video_path: Path, frame_idx: int) -> tuple[list[list[int]], int] | None: """Run ethoscope target detection on one frame; return (points, frame_idx) or None.""" cap = cv2.VideoCapture(str(video_path)) if not cap.isOpened(): return None n = int(cap.get(cv2.CAP_PROP_FRAME_COUNT)) if n > 0 and frame_idx >= n: frame_idx = max(0, n - 1) cap.set(cv2.CAP_PROP_POS_FRAMES, frame_idx) ok, frame = cap.read() cap.release() if not ok or frame is None: return None # The detector expects a single-channel image (grey) like ethoscope cameras produce. if frame.ndim == 3: gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY) else: gray = frame # We don't actually need a fully-configured grid here — _find_target_coordinates # alone gives us the 3 reference points. builder = TargetGridROIBuilder(n_rows=2, n_cols=3) try: ref = builder._find_target_coordinates(gray) except Exception as e: logging.debug(f"detection failed for {video_path.name}: {e}") return None if ref is None: return None return [[int(p[0]), int(p[1])] for p in ref], frame_idx def main() -> None: parser = argparse.ArgumentParser(description=__doc__) parser.add_argument("--frame", type=int, default=125) parser.add_argument("--limit", type=int, default=None) parser.add_argument("--video", type=str, default=None, help="run on a single video path (skips inventory)") parser.add_argument("--overwrite", action="store_true", help="overwrite existing JSON sidecars") args = parser.parse_args() TARGETS_DIR.mkdir(parents=True, exist_ok=True) if args.video: videos = [Path(args.video)] else: if not INVENTORY_CSV.exists(): sys.exit("Inventory missing — run build_video_inventory.py first.") inv = pd.read_csv(INVENTORY_CSV) todo = inv[inv["in_xlsx"] & ~inv["already_tracked"]] videos = [Path(p) for p in todo["mp4_path"].tolist()] if args.limit: videos = videos[: args.limit] n_ok = n_fail = n_skip = 0 for v in videos: out = TARGETS_DIR / f"{v.stem}.json" if out.exists() and not args.overwrite: n_skip += 1 continue result = detect_one(v, args.frame) if result is None: n_fail += 1 print(f" fail: {v.name}") continue points, used_frame = result out.write_text(json.dumps({ "video_path": str(v), "frame_index": int(used_frame), "reference_points": points, "order": ["top", "corner", "left"], "picked_at": dt.datetime.now().isoformat(timespec="seconds"), "method": "auto", }, indent=2)) n_ok += 1 print(f" ok: {v.name} → {points}") print(f"\nDone. ok={n_ok} fail={n_fail} skipped(existing)={n_skip}") if __name__ == "__main__": logging.basicConfig(level=logging.WARNING, format="%(levelname)s %(message)s") main()