cupido/scripts/auto_detect_targets.py
Giorgio e4da7691d5 Add offline tracking pipeline for video backlog
The 2024 video set in all_video_info_merged.xlsx covers 63 (date, machine)
sessions — 129 video instances — that have no auto-detectable targets, so
ROI placement requires manual reference-point selection. This commit adds
the three-stage pipeline that lets a user click for an hour, then walk
away while the tracker grinds overnight:

  1. build_video_inventory.py — scan /mnt/ethoscope_data/videos/ and join
     against the xlsx, producing data/metadata/video_inventory.csv

  2. pick_targets.py — interactive matplotlib/Tk picker. User clicks
     TOP/CORNER/LEFT (the L-shape ethoscope expects); after the third
     click the 6 ROI rectangles are drawn on top of the frame so geometry
     can be verified before saving. Also supports marking a video
     'unusable' (FOV wrong) so it's permanently skipped, frame stepping
     by ±1s/±5%/midpoint, point editing in --redo mode, and a crosshair
     cursor that survives matplotlib's per-motion cursor reset.

  3. track_videos.py — headless batch tracker. Reads the JSON sidecars,
     builds 6 ROIs from the HD-mating-arena geometry, runs MultiFlyTracker
     against the merged.mp4 via MovieVirtualCamera, writes SQLite DBs to
     data/tracked/. Idempotent (skips done DBs), parallel via --jobs,
     subclasses MovieVirtualCamera so frames stay BGR (MultiFlyTracker
     calls cvtColor(BGR2GRAY) without checking channel count).

Plus auto_detect_targets.py (fallback that runs ethoscope's auto-detector
in case any videos do have visible target dots), monitor_tracking.py
(progress + ETA from data/tracked/ ground truth, --watch for live view),
and tracking_geometry.py (single source of truth for the affine math
shared by picker and tracker).

requirements-tracking.txt pins the extra deps (opencv-python, openpyxl,
gitpython, netifaces, mysql-connector-python) — these are only needed
for the tracking pipeline, not the existing analysis notebooks.

Verified end-to-end on one of the user-picked videos: ~4000 rows/ROI in
a 120s slice, fly bounding boxes in the expected 800-2000 px² band.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-04-27 17:25:26 +01:00

119 lines
4 KiB
Python

"""Try auto-detection of L-shape targets on each video and save JSON sidecars.
Useful for:
- videos that DO have visible black-circle targets (saves manual clicks);
- as a smoke test of the whole pipeline before running the picker.
Failure is non-fatal — a video that fails auto-detection is reported as
`fail` and gets no sidecar written, leaving it for the manual
`pick_targets.py` tool.
Output JSON has the same shape as the manual picker's so `track_videos.py`
can consume either.
"""
from __future__ import annotations
import argparse
import datetime as dt
import json
import logging
import sys
from pathlib import Path
import cv2
import numpy as np
import pandas as pd
# ethoscope source tree
sys.path.insert(0, "/home/gg/Code/ethoscope_project/ethoscope/src/ethoscope")
from config import INVENTORY_CSV, TARGETS_DIR # noqa: E402
from ethoscope.roi_builders.target_roi_builder import TargetGridROIBuilder # noqa: E402
def detect_one(video_path: Path, frame_idx: int) -> tuple[list[list[int]], int] | None:
    """Run ethoscope target detection on a single frame of one video.

    Args:
        video_path: Video file to open with OpenCV.
        frame_idx: Zero-based index of the frame to analyse. Negative values
            are clamped to 0; when the container reports a positive frame
            count, values past the end are clamped to the last frame.

    Returns:
        ``(points, used_frame_idx)`` on success, where ``points`` holds the
        first two components of every detected reference point as ``int``
        pairs (presumably x, y pixel coordinates — confirm against the
        detector) and ``used_frame_idx`` is the frame index actually
        requested after clamping. ``None`` if the video cannot be opened,
        the frame cannot be read, or detection raises or yields ``None``.
    """
    cap = cv2.VideoCapture(str(video_path))
    try:
        if not cap.isOpened():
            return None
        n_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
        # Clamp into the valid range so the index stored in the sidecar is
        # always one that could actually have been seeked to.
        frame_idx = max(0, frame_idx)
        if n_frames > 0 and frame_idx >= n_frames:
            frame_idx = n_frames - 1
        cap.set(cv2.CAP_PROP_POS_FRAMES, frame_idx)
        ok, frame = cap.read()
    finally:
        # Always give the capture handle back, even if seeking/reading raised.
        cap.release()

    if not ok or frame is None:
        return None

    # Hand the detector a single-channel (grey) image.
    # NOTE(review): the original comment states this is what the detector
    # expects, matching ethoscope camera output — confirm against
    # TargetGridROIBuilder.
    gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY) if frame.ndim == 3 else frame

    # A fully-configured grid is not needed here: _find_target_coordinates
    # alone provides the reference points.
    builder = TargetGridROIBuilder(n_rows=2, n_cols=3)
    try:
        ref = builder._find_target_coordinates(gray)
    except Exception as e:
        # Deliberately broad: any detector failure just means "no automatic
        # targets for this video"; the caller falls back to manual picking.
        logging.debug("detection failed for %s: %s", video_path.name, e)
        return None
    if ref is None:
        return None
    return [[int(p[0]), int(p[1])] for p in ref], frame_idx
def main() -> None:
    """Command-line entry point: auto-detect targets and write JSON sidecars.

    Builds the list of videos either from ``--video`` (a single path) or from
    the inventory CSV (rows with ``in_xlsx`` set and ``already_tracked``
    unset), optionally truncated by ``--limit``. For each video without an
    existing sidecar (or for every video when ``--overwrite`` is given) it
    calls ``detect_one`` and, on success, writes ``<stem>.json`` into
    ``TARGETS_DIR``. A per-video status line and a final summary are printed.
    Exits via ``sys.exit`` when the inventory CSV is required but missing.
    """
    cli = argparse.ArgumentParser(description=__doc__)
    cli.add_argument("--frame", type=int, default=125)
    cli.add_argument("--limit", type=int, default=None)
    cli.add_argument(
        "--video",
        type=str,
        default=None,
        help="run on a single video path (skips inventory)",
    )
    cli.add_argument(
        "--overwrite",
        action="store_true",
        help="overwrite existing JSON sidecars",
    )
    opts = cli.parse_args()

    TARGETS_DIR.mkdir(parents=True, exist_ok=True)

    if opts.video:
        videos = [Path(opts.video)]
    else:
        if not INVENTORY_CSV.exists():
            sys.exit("Inventory missing — run build_video_inventory.py first.")
        inventory = pd.read_csv(INVENTORY_CSV)
        # Only videos listed in the xlsx that have not been tracked yet.
        pending = inventory[inventory["in_xlsx"] & ~inventory["already_tracked"]]
        videos = [Path(raw) for raw in pending["mp4_path"].tolist()]
    if opts.limit:
        videos = videos[: opts.limit]

    tally = {"ok": 0, "fail": 0, "skip": 0}
    for video in videos:
        sidecar = TARGETS_DIR / f"{video.stem}.json"
        if sidecar.exists() and not opts.overwrite:
            # Keep whatever sidecar is already there.
            tally["skip"] += 1
            continue

        detection = detect_one(video, opts.frame)
        if detection is None:
            tally["fail"] += 1
            print(f" fail: {video.name}")
            continue

        points, used_frame = detection
        payload = {
            "video_path": str(video),
            "frame_index": int(used_frame),
            "reference_points": points,
            "order": ["top", "corner", "left"],
            "picked_at": dt.datetime.now().isoformat(timespec="seconds"),
            "method": "auto",
        }
        sidecar.write_text(json.dumps(payload, indent=2))
        tally["ok"] += 1
        print(f" ok: {video.name}{points}")

    print(f"\nDone. ok={tally['ok']} fail={tally['fail']} skipped(existing)={tally['skip']}")
if __name__ == "__main__":
    # Script entry point. Logging is configured at WARNING level, so the
    # debug-level "detection failed" messages logged by detect_one are not
    # displayed; failures still show up as "fail:" lines printed by main().
    logging.basicConfig(level=logging.WARNING, format="%(levelname)s %(message)s")
    main()