Notebooks now use Path.home() / "cupido" for the repo root (works for any user inside the JupyterLab container), and the offline-tracking scripts read the ethoscope source-tree location from the new ETHOSCOPE_SRC config constant — defaulting to ~/Code/ethoscope_project/... and overridable via the ETHOSCOPE_SRC environment variable. Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
119 lines
3.9 KiB
Python
119 lines
3.9 KiB
Python
"""Try auto-detection of L-shape targets on each video and save JSON sidecars.
|
|
|
|
Useful for:
|
|
- videos that DO have visible black-circle targets (saves manual clicks);
|
|
- as a smoke test of the whole pipeline before running the picker.
|
|
|
|
Failure is silent — videos that fail auto-detection are simply not written
|
|
to disk, leaving them for the manual `pick_targets.py` tool.
|
|
|
|
Output JSON has the same shape as the manual picker's so `track_videos.py`
|
|
can consume either.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import argparse
|
|
import datetime as dt
|
|
import json
|
|
import logging
|
|
import sys
|
|
from pathlib import Path
|
|
|
|
import cv2
|
|
import numpy as np
|
|
import pandas as pd
|
|
|
|
from config import ETHOSCOPE_SRC, INVENTORY_CSV, TARGETS_DIR
|
|
|
|
# ethoscope source tree
|
|
sys.path.insert(0, str(ETHOSCOPE_SRC))
|
|
|
|
from ethoscope.roi_builders.target_roi_builder import TargetGridROIBuilder # noqa: E402
|
|
|
|
|
|
def detect_one(video_path: Path, frame_idx: int) -> tuple[list[list[int]], int] | None:
|
|
"""Run ethoscope target detection on one frame; return (points, frame_idx) or None."""
|
|
cap = cv2.VideoCapture(str(video_path))
|
|
if not cap.isOpened():
|
|
return None
|
|
n = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
|
|
if n > 0 and frame_idx >= n:
|
|
frame_idx = max(0, n - 1)
|
|
cap.set(cv2.CAP_PROP_POS_FRAMES, frame_idx)
|
|
ok, frame = cap.read()
|
|
cap.release()
|
|
if not ok or frame is None:
|
|
return None
|
|
|
|
# The detector expects a single-channel image (grey) like ethoscope cameras produce.
|
|
if frame.ndim == 3:
|
|
gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
|
|
else:
|
|
gray = frame
|
|
|
|
# We don't actually need a fully-configured grid here — _find_target_coordinates
|
|
# alone gives us the 3 reference points.
|
|
builder = TargetGridROIBuilder(n_rows=2, n_cols=3)
|
|
try:
|
|
ref = builder._find_target_coordinates(gray)
|
|
except Exception as e:
|
|
logging.debug(f"detection failed for {video_path.name}: {e}")
|
|
return None
|
|
if ref is None:
|
|
return None
|
|
return [[int(p[0]), int(p[1])] for p in ref], frame_idx
|
|
|
|
|
|
def main() -> None:
|
|
parser = argparse.ArgumentParser(description=__doc__)
|
|
parser.add_argument("--frame", type=int, default=125)
|
|
parser.add_argument("--limit", type=int, default=None)
|
|
parser.add_argument("--video", type=str, default=None,
|
|
help="run on a single video path (skips inventory)")
|
|
parser.add_argument("--overwrite", action="store_true",
|
|
help="overwrite existing JSON sidecars")
|
|
args = parser.parse_args()
|
|
|
|
TARGETS_DIR.mkdir(parents=True, exist_ok=True)
|
|
|
|
if args.video:
|
|
videos = [Path(args.video)]
|
|
else:
|
|
if not INVENTORY_CSV.exists():
|
|
sys.exit("Inventory missing — run build_video_inventory.py first.")
|
|
inv = pd.read_csv(INVENTORY_CSV)
|
|
todo = inv[inv["in_xlsx"] & ~inv["already_tracked"]]
|
|
videos = [Path(p) for p in todo["mp4_path"].tolist()]
|
|
if args.limit:
|
|
videos = videos[: args.limit]
|
|
|
|
n_ok = n_fail = n_skip = 0
|
|
for v in videos:
|
|
out = TARGETS_DIR / f"{v.stem}.json"
|
|
if out.exists() and not args.overwrite:
|
|
n_skip += 1
|
|
continue
|
|
result = detect_one(v, args.frame)
|
|
if result is None:
|
|
n_fail += 1
|
|
print(f" fail: {v.name}")
|
|
continue
|
|
points, used_frame = result
|
|
out.write_text(json.dumps({
|
|
"video_path": str(v),
|
|
"frame_index": int(used_frame),
|
|
"reference_points": points,
|
|
"order": ["top", "corner", "left"],
|
|
"picked_at": dt.datetime.now().isoformat(timespec="seconds"),
|
|
"method": "auto",
|
|
}, indent=2))
|
|
n_ok += 1
|
|
print(f" ok: {v.name} → {points}")
|
|
|
|
print(f"\nDone. ok={n_ok} fail={n_fail} skipped(existing)={n_skip}")
|
|
|
|
|
|
if __name__ == "__main__":
|
|
logging.basicConfig(level=logging.WARNING, format="%(levelname)s %(message)s")
|
|
main()
|