cupido/scripts/export_video_db_index.py
Giorgio Gilestro 53b45e373b Dedupe + canonicalise the merged xlsx, then guard the export
108 of 508 rows in all_video_info_merged.xlsx were duplicates left over
from merging multiple source spreadsheets — same (date, machine, ROI)
appearing under two source_date values, identical data otherwise. The
`male` column was also using a mix of variants ('naïve', 'niave',
'naive', 'trained') with the canonical 'naive' a minority of 12/200.

scripts/cleanup_xlsx.py
    Idempotent one-off: backs up the xlsx, dedupes preferring the row
    whose source_date matches the experiment date, normalises `male`
    spellings, strips whitespace from string columns. Re-running on a
    clean file is a no-op.

scripts/export_video_db_index.py
    New _validate_xlsx() runs first thing in main() and aborts the
    export with an actionable error if duplicates or non-canonical
    male values are present. Prevents silent regressions when the
    xlsx is edited or re-merged in the future.

Result: TSV is now 400 rows (was 508), exactly 200 trained / 200
naive, no duplicates.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-05-01 13:39:57 +01:00

221 lines
8.3 KiB
Python

"""Augment all_video_info_merged.xlsx with the input video + tracking DB paths.
Each xlsx row represents one fly (date, machine_name, ROI), observed across a
training session and a testing session. We resolve those two sessions to the
on-disk video files (via the inventory CSV) and to their tracking DBs (under
TRACKING_OUTPUT_DIR), then write the result as TSV.
Output columns added:
training_video_path, training_db_path,
testing_video_path, testing_db_path
Empty values mean either no video matched (rare — implies missing inventory
entry) or no DB exists yet (e.g. the one video the completeness gate
rejected).
Usage:
python export_video_db_index.py
python export_video_db_index.py --out path/to/output.tsv
"""
from __future__ import annotations
import argparse
import re
from pathlib import Path
import pandas as pd
from config import INVENTORY_CSV, TRACKING_OUTPUT_DIR, VIDEO_INFO_TSV, VIDEO_INFO_XLSX
# Shape: '<YYYYMMDD>_<hour>[<minute>]<AM|PM>'. Capture groups, in order: the
# 8-digit date, a 1-2 digit hour, an optional 2-digit minute, the meridiem
# (matched case-insensitively).
_TIME_RE = re.compile(r"^(\d{8})_(\d{1,2})(\d{2})?(AM|PM)$", re.IGNORECASE)


def parse_xlsx_time(value: str) -> tuple[str, int] | None:
    """Parse an xlsx time label into an ISO date and minutes since midnight.

    Accepts labels such as '20241021_11AM' or '20240918_1030AM'. When the
    label carries no minutes ('11AM') the time is taken as the full hour.
    Surrounding whitespace is ignored. Hour and minute digits are not
    range-checked.

    Returns:
        ``(YYYY-MM-DD, minutes_from_midnight)``, or ``None`` when ``value``
        is not a string or does not have the expected shape.
    """
    if not isinstance(value, str):
        return None

    match = _TIME_RE.match(value.strip())
    if match is None:
        return None

    ymd, hour_text, minute_text, meridiem = match.groups()
    meridiem = meridiem.upper()

    # 12-hour → 24-hour: 12AM is midnight, 12PM stays noon, and every other
    # PM hour moves to the afternoon.
    hour = int(hour_text)
    if hour == 12:
        hour = 0 if meridiem == "AM" else 12
    elif meridiem == "PM":
        hour += 12

    minute = int(minute_text) if minute_text else 0
    iso_date = "-".join((ymd[:4], ymd[4:6], ymd[6:8]))
    return iso_date, hour * 60 + minute
def build_session_index(inventory: pd.DataFrame) -> dict[tuple[str, str], list[dict]]:
    """Group inventory rows into per-(session_date, machine_name) session lists.

    Each session dict carries ``mp4_path``, ``session_datetime``, ``minutes``
    (start time as minutes from midnight, taken from the first two fields of
    the '-'-separated ``session_time``) and ``duration_s`` (a float, or
    ``None`` when the inventory has no such column or the cell is missing).
    Sessions keep the order in which they appear in ``inventory``.
    """
    with_duration = "duration_s" in inventory.columns
    sessions: dict[tuple[str, str], list[dict]] = {}

    for rec in inventory.itertuples(index=False):
        # session_time must split into exactly three integer fields; the
        # third one (seconds) is not used for matching.
        hours, minutes, _seconds = map(int, str(rec.session_time).split("-"))

        duration = None
        if with_duration and pd.notna(rec.duration_s):
            duration = float(rec.duration_s)

        entry = {
            "mp4_path": rec.mp4_path,
            "session_datetime": rec.session_datetime,
            "minutes": hours * 60 + minutes,
            "duration_s": duration,
        }

        slot = (rec.session_date, rec.machine_name)
        bucket = sessions.get(slot)
        if bucket is None:
            sessions[slot] = [entry]
        else:
            bucket.append(entry)

    return sessions
def db_path_for_video(mp4_path: str) -> Path | None:
    """Return the tracking DB path for a video, or ``None`` if it is absent.

    The DB is looked up as ``<video_stem>_tracking.db`` directly under
    TRACKING_OUTPUT_DIR; only the video's file stem is used, not its folder.
    """
    candidate = TRACKING_OUTPUT_DIR / (Path(mp4_path).stem + "_tracking.db")
    if candidate.exists():
        return candidate
    return None
# Largest accepted distance, in minutes, between the time written in the xlsx
# and a video's real start time. xlsx labels are approximate ("11AM" → 10:51
# is fine).
_TIME_TOLERANCE_MIN = 90


def resolve_session(
    machine_name: str,
    when: str,
    fallback_date: str | None,
    index: dict[tuple[str, str], list[dict]],
) -> tuple[str, str, float | None]:
    """Find the video (and its tracking DB) that starts closest to ``when``.

    Candidate selection:
      1. Sessions recorded on the date embedded in ``when`` for this machine
         (training/testing may fall on a different calendar day from the
         row's ``date`` column).
      2. Only if that date has no sessions at all: sessions on
         ``fallback_date`` (the xlsx row's ``date`` column). Reason: the xlsx
         contains date typos like '20240110_11AM' for an Oct 1 experiment.

    The winner is the candidate whose start minute is nearest to the claimed
    time, and it is accepted only when it lies within ±_TIME_TOLERANCE_MIN.

    Returns:
        ``(mp4_path, db_path, duration_s)``. ``db_path`` is '' when the video
        has no tracking DB on disk. ``('', '', None)`` when ``when`` cannot
        be parsed or no session qualifies.
    """
    no_match = ("", "", None)

    parsed = parse_xlsx_time(when)
    if parsed is None:
        return no_match
    session_date, wanted = parsed

    pool = index.get((session_date, machine_name), [])
    if not pool and fallback_date:
        pool = index.get((fallback_date, machine_name), [])
    if not pool:
        return no_match

    # Reason: xlsx times like '1230AM' are ambiguous (12 AM vs 12 PM), so
    # each session is measured against both the literal time and the same
    # time shifted by 12 hours; the smaller distance counts.
    shifted = (wanted + 720) % 1440

    def distance(session: dict) -> int:
        start = session["minutes"]
        return min(abs(start - wanted), abs(start - shifted))

    nearest = min(pool, key=distance)
    if distance(nearest) > _TIME_TOLERANCE_MIN:
        return no_match

    video = nearest["mp4_path"]
    db = db_path_for_video(video)
    db_text = str(db) if db else ""
    return video, db_text, nearest.get("duration_s")
# Spellings of "naive" the xlsx `male` column has accumulated ('naïve',
# 'niave'), together with the canonical 'naive' itself. _normalize_metadata()
# tests the whitespace-stripped, lower-cased cell value for membership in this
# set and rewrites every match to the single canonical 'naive'.
_MALE_NAIVE_VARIANTS = {"naïve", "niave", "naive"}
def _validate_xlsx(df: pd.DataFrame) -> None:
    """Refuse to export if the xlsx has duplicates or non-canonical values.

    The export pipeline assumes one row per (date, machine_name, roi). If
    that ever stops being true (e.g. a future merge re-introduces dupes),
    every downstream count silently doubles. Catch it at the source.

    Key values are compared with surrounding whitespace removed, because the
    export strips string columns right after validation; two rows whose keys
    differ only by padding would otherwise pass here and collide afterwards.

    Args:
        df: the sheet as loaded from the xlsx. Must contain the columns
            ``date``, ``machine_name``, ``roi`` and ``male``. ``source_date``
            is optional and only used to illustrate the error message.

    Raises:
        SystemExit: with an actionable message when duplicate keys exist, or
            when ``male`` holds anything other than 'trained' / 'naive'
            (missing cells are ignored, surrounding whitespace is tolerated).
    """
    key = ["date", "machine_name", "roi"]

    def _strip_if_str(value):
        return value.strip() if isinstance(value, str) else value

    # Work on a stripped copy of the key columns; `df` itself is not modified.
    keys = df[key].copy()
    for col in key:
        keys[col] = keys[col].map(_strip_if_str)

    dup_mask = keys.duplicated(keep=False)
    if dup_mask.any():
        dupes = df[dup_mask]
        n_unique = keys.drop_duplicates().shape[0]
        # source_date is a diagnostic hint only: show it when the sheet has
        # it, but never let its absence mask the real error with a KeyError.
        sample_cols = key + [c for c in ("source_date",) if c in df.columns]
        sample = dupes.head(4)[sample_cols]
        raise SystemExit(
            f"\n ERROR: xlsx has {len(dupes)} duplicate rows "
            f"({len(df)} total, {n_unique} unique on {key}).\n"
            f" Sample:\n{sample.to_string(index=False)}\n"
            f" Run scripts/cleanup_xlsx.py to fix.\n"
        )

    seen_male = set(df["male"].dropna().astype(str).str.strip().unique())
    bad_male = sorted(seen_male - {"naive", "trained"})
    if bad_male:
        raise SystemExit(
            f"\n ERROR: xlsx `male` column has non-canonical values: {bad_male}\n"
            f" Expected only 'trained' and 'naive'.\n"
            f" Run scripts/cleanup_xlsx.py to fix.\n"
        )
def _normalize_metadata(df: pd.DataFrame) -> None:
    """Strip whitespace and canonicalize the ``male`` column in place.

    Only genuine string cells are touched. Missing cells stay missing and
    non-string values inside object columns keep their type; casting whole
    columns with ``astype(str)`` would instead turn every empty cell into the
    literal text 'nan' and leak it into the exported TSV.

    Args:
        df: the loaded sheet; modified in place. Must contain a ``male``
            column.
    """

    def _strip_if_str(value):
        return value.strip() if isinstance(value, str) else value

    for col in df.select_dtypes(include=("object", "string")).columns:
        df[col] = df[col].map(_strip_if_str)

    def _canonical_male(value):
        # Case-insensitive match against the known spellings of "naive";
        # anything else (including missing cells) passes through unchanged.
        if isinstance(value, str) and value.lower() in _MALE_NAIVE_VARIANTS:
            return "naive"
        return value

    df["male"] = df["male"].map(_canonical_male)
def main() -> None:
    """Command-line entry point: build the augmented TSV and print a summary.

    Steps: load the inventory CSV (rows flagged ``in_xlsx`` only) and index
    it by session; load, validate and normalize the xlsx; resolve each row's
    training and testing session to a video / tracking DB / duration; append
    those columns plus an ``include`` flag; write the TSV to ``--out``.
    """
    parser = argparse.ArgumentParser(description=__doc__)
    parser.add_argument(
        "--out",
        type=Path,
        default=VIDEO_INFO_TSV,
        help="output TSV path (default: alongside the xlsx)",
    )
    args = parser.parse_args()

    inventory = pd.read_csv(INVENTORY_CSV)
    index = build_session_index(inventory[inventory["in_xlsx"]].copy())

    df = pd.read_excel(VIDEO_INFO_XLSX)
    _validate_xlsx(df)
    _normalize_metadata(df)

    # ISO form of each row's `date`, used as the fallback day when the date
    # embedded in a training/testing label has no sessions.
    fallback_dates = pd.to_datetime(df["date"]).dt.strftime("%Y-%m-%d")

    # One (video, db, duration) triple per row and per session kind.
    training: list[tuple] = []
    testing: list[tuple] = []
    for fallback, row in zip(fallback_dates, df.itertuples(index=False)):
        training.append(
            resolve_session(row.machine_name, row.training_date_time, fallback, index)
        )
        testing.append(
            resolve_session(row.machine_name, row.testing_date_time, fallback, index)
        )

    df["training_video_path"] = [video for video, _db, _dur in training]
    df["training_db_path"] = [db for _video, db, _dur in training]
    df["training_video_duration_s"] = [dur for _video, _db, dur in training]
    df["testing_video_path"] = [video for video, _db, _dur in testing]
    df["testing_db_path"] = [db for _video, db, _dur in testing]
    df["testing_video_duration_s"] = [dur for _video, _db, dur in testing]

    # Reason: an analyst flag for excluding individual fly/session rows that
    # turn out to be too noisy or otherwise unusable. Default True; flip to
    # False directly in the TSV to drop a row from analysis without having
    # to delete it. load_roi_data() respects this flag.
    # NOTE(review): every export writes True for all rows and overwrites the
    # file at --out, so flags edited in an earlier TSV at the same path are
    # not carried over — confirm this is intended.
    df["include"] = True

    df.to_csv(args.out, sep="\t", index=False)

    train_with_video = sum(1 for video, _db, _dur in training if video)
    train_with_db = sum(1 for _video, db, _dur in training if db)
    test_with_video = sum(1 for video, _db, _dur in testing if video)
    test_with_db = sum(1 for _video, db, _dur in testing if db)
    print(f"wrote {args.out} ({len(df)} rows)")
    print(f" training: {train_with_video} with video, {train_with_db} with DB")
    print(f" testing: {test_with_video} with video, {test_with_db} with DB")


if __name__ == "__main__":
    main()