"""Augment all_video_info_merged.xlsx with the input video + tracking DB paths. Each xlsx row represents one fly (date, machine_name, ROI), observed across a training session and a testing session. We resolve those two sessions to the on-disk video files (via the inventory CSV) and to their tracking DBs (under TRACKING_OUTPUT_DIR), then write the result as TSV. Output columns added: training_video_path, training_db_path, testing_video_path, testing_db_path Empty values mean either no video matched (rare — implies missing inventory entry) or no DB exists yet (e.g. the one video the completeness gate rejected). Usage: python export_video_db_index.py python export_video_db_index.py --out path/to/output.tsv """ from __future__ import annotations import argparse import re from pathlib import Path import pandas as pd from config import INVENTORY_CSV, TRACKING_OUTPUT_DIR, VIDEO_INFO_TSV, VIDEO_INFO_XLSX _TIME_RE = re.compile(r"^(\d{8})_(\d{1,2})(\d{2})?(AM|PM)$", re.IGNORECASE) def parse_xlsx_time(value: str) -> tuple[str, int] | None: """Convert '20241021_11AM' / '20240918_1030AM' to (YYYY-MM-DD, minutes24). Resolution is hour-only when no minutes are given (e.g. '11AM' → 11:00). Returns minutes-from-midnight so we can do nearest-neighbor matching. """ if not isinstance(value, str): return None m = _TIME_RE.match(value.strip()) if not m: return None ymd, hh, mm, ampm = m.groups() date = f"{ymd[:4]}-{ymd[4:6]}-{ymd[6:8]}" hour = int(hh) minute = int(mm) if mm else 0 if ampm.upper() == "PM" and hour != 12: hour += 12 if ampm.upper() == "AM" and hour == 12: hour = 0 return date, hour * 60 + minute def build_session_index(inventory: pd.DataFrame) -> dict[tuple[str, str], list[dict]]: """Index inventory rows by (date, machine_name) → list of session dicts.""" idx: dict[tuple[str, str], list[dict]] = {} for row in inventory.itertuples(index=False): h, m, _s = (int(p) for p in str(row.session_time).split("-")) key = (row.session_date, row.machine_name) idx.setdefault(key, []).append({ "mp4_path": row.mp4_path, "session_datetime": row.session_datetime, "minutes": h * 60 + m, }) return idx def db_path_for_video(mp4_path: str) -> Path | None: """Tracker writes _tracking.db under TRACKING_OUTPUT_DIR.""" stem = Path(mp4_path).stem db = TRACKING_OUTPUT_DIR / f"{stem}_tracking.db" return db if db.exists() else None _TIME_TOLERANCE_MIN = 90 # xlsx labels are approximate ("11AM" → 10:51 is fine) def resolve_session( machine_name: str, when: str, fallback_date: str | None, index: dict[tuple[str, str], list[dict]], ) -> tuple[str, str]: """Look up the video + db whose start time is closest to `when`. Match strategy: 1. Use the date embedded in `when` (training/testing can fall on a different calendar day from the row's ``date`` column). 2. If no candidates exist for that date, fall back to ``fallback_date`` (the xlsx row's ``date`` column). Reason: the xlsx contains date typos like '20240110_11AM' for an Oct 1 experiment. Among candidates, pick the video whose start minute is closest to the xlsx-claimed time, within ±_TIME_TOLERANCE_MIN. """ parsed = parse_xlsx_time(when) if parsed is None: return "", "" date, target_min = parsed candidates = index.get((date, machine_name), []) if not candidates and fallback_date: candidates = index.get((fallback_date, machine_name), []) if not candidates: return "", "" def _gap(target: int, c: dict) -> int: # Reason: xlsx times like '1230AM' are ambiguous (12 AM vs 12 PM). # We try both the literal time AND a +12-hour shift, picking the # interpretation that brings us closest to a real session. return min(abs(c["minutes"] - target), abs(c["minutes"] - (target + 720) % 1440)) best = min(candidates, key=lambda c: _gap(target_min, c)) if _gap(target_min, best) > _TIME_TOLERANCE_MIN: return "", "" db = db_path_for_video(best["mp4_path"]) return best["mp4_path"], (str(db) if db else "") # Variants of "naive" the xlsx has accumulated: 'naïve', 'niave', plus # trailing whitespace. All collapse to a single canonical 'naive'. _MALE_NAIVE_VARIANTS = {"naïve", "niave", "naive"} def _normalize_metadata(df: pd.DataFrame) -> None: """Strip whitespace and canonicalize the ``male`` column in place.""" for col in df.select_dtypes(include=("object", "string")).columns: df[col] = df[col].astype(str).str.strip() df["male"] = df["male"].apply( lambda v: "naive" if v.lower() in _MALE_NAIVE_VARIANTS else v ) def main() -> None: parser = argparse.ArgumentParser(description=__doc__) parser.add_argument( "--out", type=Path, default=VIDEO_INFO_TSV, help="output TSV path (default: alongside the xlsx)", ) args = parser.parse_args() inv = pd.read_csv(INVENTORY_CSV) inv = inv[inv["in_xlsx"]].copy() index = build_session_index(inv) df = pd.read_excel(VIDEO_INFO_XLSX) _normalize_metadata(df) date_iso = pd.to_datetime(df["date"]).dt.strftime("%Y-%m-%d") train_videos, train_dbs, test_videos, test_dbs = [], [], [], [] for fallback, row in zip(date_iso, df.itertuples(index=False)): tv, td = resolve_session(row.machine_name, row.training_date_time, fallback, index) sv, sd = resolve_session(row.machine_name, row.testing_date_time, fallback, index) train_videos.append(tv) train_dbs.append(td) test_videos.append(sv) test_dbs.append(sd) df["training_video_path"] = train_videos df["training_db_path"] = train_dbs df["testing_video_path"] = test_videos df["testing_db_path"] = test_dbs # Reason: an analyst flag for excluding individual fly/session rows that # turn out to be too noisy or otherwise unusable. Default True; flip to # False directly in the TSV to drop a row from analysis without having # to delete it. load_roi_data() respects this flag. df["include"] = True df.to_csv(args.out, sep="\t", index=False) n_rows = len(df) n_train_video = sum(bool(v) for v in train_videos) n_train_db = sum(bool(v) for v in train_dbs) n_test_video = sum(bool(v) for v in test_videos) n_test_db = sum(bool(v) for v in test_dbs) print(f"wrote {args.out} ({n_rows} rows)") print(f" training: {n_train_video} with video, {n_train_db} with DB") print(f" testing: {n_test_video} with video, {n_test_db} with DB") if __name__ == "__main__": main()