Unify analysis pipeline around the TSV; move tracked DBs out of cloud sync
- Tracked DBs now live at /mnt/data/projects/cupido/tracked/ (out of
ownCloud to avoid sync conflicts and bandwidth churn). config.py
TRACKING_OUTPUT_DIR points there; the docker-compose for ethoscope-lab
mounts it world-readable for JupyterHub users.
- New scripts/export_video_db_index.py joins all_video_info_merged.xlsx
with the video inventory and the on-disk DBs, producing a TSV that has
one row per fly/ROI plus training/testing video and DB paths. Handles
approximate xlsx times, cross-day training/testing, the 12 AM/PM
ambiguity, and date typos.
- scripts/load_roi_data.py rewritten as a TSV-driven loader returning a
single DataFrame with session and metadata columns. calculate_distances
and the two flies_analysis notebooks migrated to use it; downstream
trained/naive splits remain available via simple equality filters.
- Metadata vocabulary canonicalized: {naïve, niave, untrained, test} all
resolve to {trained, naive}. Normalization happens at the TSV-export
boundary (idempotent); the xlsx and the 2025-07-15 legacy CSV were
edited in place to remove the worst variants.
- scripts/monitor_tracking.py rate calculation fixed: with N parallel
workers, completions arrive in bursts; the old formula divided by burst
width and reported nonsense rates. Now uses a 6 h window denominator.
- scripts/track_videos.py: BGRMovieCamera retries cv2.read on transient
NFS hiccups and a post-tracking completeness gate (≥ 90 % of expected
duration via MAX(t) across all 6 ROIs) deletes silent partial DBs.
Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
parent
e4da7691d5
commit
f60a9d0530
13 changed files with 569 additions and 237 deletions
|
|
@ -1,117 +1,99 @@
|
|||
import pandas as pd
|
||||
"""Compute per-frame inter-fly distances for every (date, machine, ROI, session).
|
||||
|
||||
Reads tracking data via :func:`load_roi_data.load_roi_data` (which is driven
|
||||
by ``all_video_info_merged.tsv``) and produces one distances DataFrame
|
||||
spanning every fly/session in the batch. Group membership (``trained`` /
|
||||
``untrained``) is preserved from the ``male`` column.
|
||||
"""
|
||||
|
||||
import numpy as np
|
||||
import pandas as pd
|
||||
from scipy.spatial.distance import euclidean
|
||||
|
||||
from config import DATA_PROCESSED
|
||||
from load_roi_data import load_roi_data
|
||||
|
||||
|
||||
def calculate_fly_distances(trained_file=None, untrained_file=None):
|
||||
"""Calculate distances between flies at each time point.
|
||||
def calculate_fly_distances(data: pd.DataFrame | None = None) -> pd.DataFrame:
|
||||
"""Compute inter-fly distances over time for every fly/session.
|
||||
|
||||
For each time point:
|
||||
- If two flies are detected: calculate Cartesian distance between them
|
||||
- If one fly is detected: set distance to 0 if area > average area, otherwise NaN
|
||||
For each time point inside one (date, machine, ROI, session) trajectory:
|
||||
- 2+ flies detected: Euclidean distance between the first two by id
|
||||
- 1 fly detected: distance = 0 if its bbox area exceeds the global
|
||||
mean (likely a single blob containing both flies), else NaN
|
||||
|
||||
Args:
|
||||
trained_file (Path): Path to trained ROI data CSV.
|
||||
untrained_file (Path): Path to untrained ROI data CSV.
|
||||
data: optional pre-loaded DataFrame from :func:`load_roi_data`. If
|
||||
None, the full batch is loaded.
|
||||
|
||||
Returns:
|
||||
tuple: (trained_distances, untrained_distances) DataFrames.
|
||||
DataFrame with one row per (track, time) pair, including ``distance``,
|
||||
``n_flies``, ``area_fly1``, ``area_fly2``, plus the metadata columns
|
||||
propagated from the source row (``date``, ``machine_name``, ``ROI``,
|
||||
``session``, ``male``, ``species``, ``memory``, ``age``).
|
||||
"""
|
||||
if trained_file is None:
|
||||
trained_file = DATA_PROCESSED / 'trained_roi_data.csv'
|
||||
if untrained_file is None:
|
||||
untrained_file = DATA_PROCESSED / 'untrained_roi_data.csv'
|
||||
if data is None:
|
||||
data = load_roi_data()
|
||||
if data.empty:
|
||||
return pd.DataFrame()
|
||||
|
||||
trained_df = pd.read_csv(trained_file)
|
||||
untrained_df = pd.read_csv(untrained_file)
|
||||
|
||||
trained_df['area'] = trained_df['w'] * trained_df['h']
|
||||
untrained_df['area'] = untrained_df['w'] * untrained_df['h']
|
||||
|
||||
avg_area = np.mean([trained_df['area'].mean(), untrained_df['area'].mean()])
|
||||
data = data.copy()
|
||||
data["area"] = data["w"] * data["h"]
|
||||
avg_area = data["area"].mean()
|
||||
print(f"Average area across all data: {avg_area:.2f}")
|
||||
|
||||
trained_distances = process_distance_data(trained_df, avg_area)
|
||||
untrained_distances = process_distance_data(untrained_df, avg_area)
|
||||
# Carry these onto every output row (constant within a track).
|
||||
keep_meta = ["date", "machine_name", "ROI", "session", "male",
|
||||
"species", "memory", "age"]
|
||||
|
||||
return trained_distances, untrained_distances
|
||||
|
||||
|
||||
def process_distance_data(df, avg_area):
|
||||
"""Process a DataFrame to calculate distances between flies at each time point.
|
||||
|
||||
Args:
|
||||
df (pd.DataFrame): Input tracking data.
|
||||
avg_area (float): Average area threshold for single-fly detection.
|
||||
|
||||
Returns:
|
||||
pd.DataFrame: Distance data with columns for machine, ROI, time, distance.
|
||||
"""
|
||||
results = []
|
||||
|
||||
for (machine_name, roi), group in df.groupby(['machine_name', 'ROI']):
|
||||
for t, time_group in group.groupby('t'):
|
||||
time_group = time_group.sort_values('id').reset_index(drop=True)
|
||||
rows: list[dict] = []
|
||||
track_keys = ["date", "machine_name", "ROI", "session"]
|
||||
for track, track_df in data.groupby(track_keys, sort=False):
|
||||
meta_row = {k: v for k, v in zip(track_keys, track)}
|
||||
# Carry the rest of the metadata from any sample (constant per track).
|
||||
sample = track_df.iloc[0]
|
||||
for col in keep_meta:
|
||||
if col not in meta_row:
|
||||
meta_row[col] = sample[col]
|
||||
|
||||
for t, time_group in track_df.groupby("t", sort=False):
|
||||
time_group = time_group.sort_values("id").reset_index(drop=True)
|
||||
row = dict(meta_row)
|
||||
row["t"] = t
|
||||
if len(time_group) >= 2:
|
||||
fly1 = time_group.iloc[0]
|
||||
fly2 = time_group.iloc[1]
|
||||
distance = euclidean([fly1['x'], fly1['y']], [fly2['x'], fly2['y']])
|
||||
f1, f2 = time_group.iloc[0], time_group.iloc[1]
|
||||
row["distance"] = euclidean([f1["x"], f1["y"]], [f2["x"], f2["y"]])
|
||||
row["n_flies"] = len(time_group)
|
||||
row["area_fly1"] = f1["area"]
|
||||
row["area_fly2"] = f2["area"]
|
||||
else:
|
||||
f = time_group.iloc[0]
|
||||
row["distance"] = 0.0 if f["area"] > avg_area else np.nan
|
||||
row["n_flies"] = 1
|
||||
row["area_fly1"] = f["area"]
|
||||
row["area_fly2"] = np.nan
|
||||
rows.append(row)
|
||||
|
||||
results.append({
|
||||
'machine_name': machine_name,
|
||||
'ROI': roi,
|
||||
't': t,
|
||||
'distance': distance,
|
||||
'n_flies': len(time_group),
|
||||
'area_fly1': fly1['area'],
|
||||
'area_fly2': fly2['area']
|
||||
})
|
||||
elif len(time_group) == 1:
|
||||
fly = time_group.iloc[0]
|
||||
area = fly['area']
|
||||
|
||||
if area > avg_area:
|
||||
distance = 0.0
|
||||
else:
|
||||
distance = np.nan
|
||||
|
||||
results.append({
|
||||
'machine_name': machine_name,
|
||||
'ROI': roi,
|
||||
't': t,
|
||||
'distance': distance,
|
||||
'n_flies': 1,
|
||||
'area_fly1': area,
|
||||
'area_fly2': np.nan
|
||||
})
|
||||
|
||||
return pd.DataFrame(results)
|
||||
return pd.DataFrame(rows)
|
||||
|
||||
|
||||
def main():
|
||||
"""Run distance calculations and save results."""
|
||||
trained_distances, untrained_distances = calculate_fly_distances()
|
||||
def main() -> None:
|
||||
distances = calculate_fly_distances()
|
||||
|
||||
print(f"Trained data distance summary:")
|
||||
print(f" Shape: {trained_distances.shape}")
|
||||
print(f" Distance stats:")
|
||||
print(f" Count: {trained_distances['distance'].count()}")
|
||||
print(f" Mean: {trained_distances['distance'].mean():.2f}")
|
||||
print(f" Std: {trained_distances['distance'].std():.2f}")
|
||||
print("\nDistance summary:")
|
||||
print(f" Shape: {distances.shape}")
|
||||
if not distances.empty:
|
||||
print(f" Distance count: {distances['distance'].count()}")
|
||||
print(f" Distance mean: {distances['distance'].mean():.2f}")
|
||||
print(f" Distance std: {distances['distance'].std():.2f}")
|
||||
male = distances["male"]
|
||||
print(f" Trained tracks: {(male == 'trained').sum()}")
|
||||
print(f" Naive tracks: {(male == 'naive').sum()}")
|
||||
|
||||
print(f"\nUntrained data distance summary:")
|
||||
print(f" Shape: {untrained_distances.shape}")
|
||||
print(f" Distance stats:")
|
||||
print(f" Count: {untrained_distances['distance'].count()}")
|
||||
print(f" Mean: {untrained_distances['distance'].mean():.2f}")
|
||||
print(f" Std: {untrained_distances['distance'].std():.2f}")
|
||||
|
||||
trained_distances.to_csv(DATA_PROCESSED / 'trained_distances.csv', index=False)
|
||||
untrained_distances.to_csv(DATA_PROCESSED / 'untrained_distances.csv', index=False)
|
||||
print("\nDistance data saved")
|
||||
DATA_PROCESSED.mkdir(parents=True, exist_ok=True)
|
||||
out = DATA_PROCESSED / "distances.csv"
|
||||
distances.to_csv(out, index=False)
|
||||
print(f"\nSaved {out}")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
|
|
|
|||
|
|
@ -13,5 +13,8 @@ VIDEOS_ROOT = Path("/mnt/ethoscope_data/videos")
|
|||
VIDEO_INFO_XLSX = PROJECT_ROOT.parent / "all_video_info_merged.xlsx"
|
||||
INVENTORY_CSV = DATA_METADATA / "video_inventory.csv"
|
||||
TARGETS_DIR = PROJECT_ROOT / "data" / "targets"
|
||||
TRACKING_OUTPUT_DIR = PROJECT_ROOT / "data" / "tracked"
|
||||
# Reason: tracking DBs are large binary files that don't belong in
|
||||
# ownCloud-synced storage (sync conflicts + bandwidth). They live on the
|
||||
# local data volume instead. Regenerable from videos + target JSONs.
|
||||
TRACKING_OUTPUT_DIR = Path("/mnt/data/projects/cupido/tracked")
|
||||
LOGS_DIR = PROJECT_ROOT / "data" / "logs"
|
||||
|
|
|
|||
181
scripts/export_video_db_index.py
Normal file
181
scripts/export_video_db_index.py
Normal file
|
|
@ -0,0 +1,181 @@
|
|||
"""Augment all_video_info_merged.xlsx with the input video + tracking DB paths.
|
||||
|
||||
Each xlsx row represents one fly (date, machine_name, ROI), observed across a
|
||||
training session and a testing session. We resolve those two sessions to the
|
||||
on-disk video files (via the inventory CSV) and to their tracking DBs (under
|
||||
TRACKING_OUTPUT_DIR), then write the result as TSV.
|
||||
|
||||
Output columns added:
|
||||
training_video_path, training_db_path,
|
||||
testing_video_path, testing_db_path
|
||||
|
||||
Empty values mean either no video matched (rare — implies missing inventory
|
||||
entry) or no DB exists yet (e.g. the one video the completeness gate
|
||||
rejected).
|
||||
|
||||
Usage:
|
||||
python export_video_db_index.py
|
||||
python export_video_db_index.py --out path/to/output.tsv
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import argparse
|
||||
import re
|
||||
from pathlib import Path
|
||||
|
||||
import pandas as pd
|
||||
|
||||
from config import INVENTORY_CSV, TRACKING_OUTPUT_DIR, VIDEO_INFO_XLSX
|
||||
|
||||
|
||||
_TIME_RE = re.compile(r"^(\d{8})_(\d{1,2})(\d{2})?(AM|PM)$", re.IGNORECASE)
|
||||
|
||||
|
||||
def parse_xlsx_time(value: str) -> tuple[str, int] | None:
|
||||
"""Convert '20241021_11AM' / '20240918_1030AM' to (YYYY-MM-DD, minutes24).
|
||||
|
||||
Resolution is hour-only when no minutes are given (e.g. '11AM' → 11:00).
|
||||
Returns minutes-from-midnight so we can do nearest-neighbor matching.
|
||||
"""
|
||||
if not isinstance(value, str):
|
||||
return None
|
||||
m = _TIME_RE.match(value.strip())
|
||||
if not m:
|
||||
return None
|
||||
ymd, hh, mm, ampm = m.groups()
|
||||
date = f"{ymd[:4]}-{ymd[4:6]}-{ymd[6:8]}"
|
||||
hour = int(hh)
|
||||
minute = int(mm) if mm else 0
|
||||
if ampm.upper() == "PM" and hour != 12:
|
||||
hour += 12
|
||||
if ampm.upper() == "AM" and hour == 12:
|
||||
hour = 0
|
||||
return date, hour * 60 + minute
|
||||
|
||||
|
||||
def build_session_index(inventory: pd.DataFrame) -> dict[tuple[str, str], list[dict]]:
|
||||
"""Index inventory rows by (date, machine_name) → list of session dicts."""
|
||||
idx: dict[tuple[str, str], list[dict]] = {}
|
||||
for row in inventory.itertuples(index=False):
|
||||
h, m, _s = (int(p) for p in str(row.session_time).split("-"))
|
||||
key = (row.session_date, row.machine_name)
|
||||
idx.setdefault(key, []).append({
|
||||
"mp4_path": row.mp4_path,
|
||||
"session_datetime": row.session_datetime,
|
||||
"minutes": h * 60 + m,
|
||||
})
|
||||
return idx
|
||||
|
||||
|
||||
def db_path_for_video(mp4_path: str) -> Path | None:
|
||||
"""Tracker writes <video_stem>_tracking.db under TRACKING_OUTPUT_DIR."""
|
||||
stem = Path(mp4_path).stem
|
||||
db = TRACKING_OUTPUT_DIR / f"{stem}_tracking.db"
|
||||
return db if db.exists() else None
|
||||
|
||||
|
||||
_TIME_TOLERANCE_MIN = 90 # xlsx labels are approximate ("11AM" → 10:51 is fine)
|
||||
|
||||
|
||||
def resolve_session(
|
||||
machine_name: str,
|
||||
when: str,
|
||||
fallback_date: str | None,
|
||||
index: dict[tuple[str, str], list[dict]],
|
||||
) -> tuple[str, str]:
|
||||
"""Look up the video + db whose start time is closest to `when`.
|
||||
|
||||
Match strategy:
|
||||
1. Use the date embedded in `when` (training/testing can fall on a
|
||||
different calendar day from the row's ``date`` column).
|
||||
2. If no candidates exist for that date, fall back to ``fallback_date``
|
||||
(the xlsx row's ``date`` column). Reason: the xlsx contains
|
||||
date typos like '20240110_11AM' for an Oct 1 experiment.
|
||||
|
||||
Among candidates, pick the video whose start minute is closest to the
|
||||
xlsx-claimed time, within ±_TIME_TOLERANCE_MIN.
|
||||
"""
|
||||
parsed = parse_xlsx_time(when)
|
||||
if parsed is None:
|
||||
return "", ""
|
||||
date, target_min = parsed
|
||||
candidates = index.get((date, machine_name), [])
|
||||
if not candidates and fallback_date:
|
||||
candidates = index.get((fallback_date, machine_name), [])
|
||||
if not candidates:
|
||||
return "", ""
|
||||
|
||||
def _gap(target: int, c: dict) -> int:
|
||||
# Reason: xlsx times like '1230AM' are ambiguous (12 AM vs 12 PM).
|
||||
# We try both the literal time AND a +12-hour shift, picking the
|
||||
# interpretation that brings us closest to a real session.
|
||||
return min(abs(c["minutes"] - target), abs(c["minutes"] - (target + 720) % 1440))
|
||||
|
||||
best = min(candidates, key=lambda c: _gap(target_min, c))
|
||||
if _gap(target_min, best) > _TIME_TOLERANCE_MIN:
|
||||
return "", ""
|
||||
db = db_path_for_video(best["mp4_path"])
|
||||
return best["mp4_path"], (str(db) if db else "")
|
||||
|
||||
|
||||
# Variants of "naive" the xlsx has accumulated: 'naïve', 'niave', plus
|
||||
# trailing whitespace. All collapse to a single canonical 'naive'.
|
||||
_MALE_NAIVE_VARIANTS = {"naïve", "niave", "naive"}
|
||||
|
||||
|
||||
def _normalize_metadata(df: pd.DataFrame) -> None:
|
||||
"""Strip whitespace and canonicalize the ``male`` column in place."""
|
||||
for col in df.select_dtypes(include=("object", "string")).columns:
|
||||
df[col] = df[col].astype(str).str.strip()
|
||||
df["male"] = df["male"].apply(
|
||||
lambda v: "naive" if v.lower() in _MALE_NAIVE_VARIANTS else v
|
||||
)
|
||||
|
||||
|
||||
def main() -> None:
|
||||
parser = argparse.ArgumentParser(description=__doc__)
|
||||
parser.add_argument(
|
||||
"--out",
|
||||
type=Path,
|
||||
default=VIDEO_INFO_XLSX.with_suffix(".tsv"),
|
||||
help="output TSV path (default: alongside the xlsx)",
|
||||
)
|
||||
args = parser.parse_args()
|
||||
|
||||
inv = pd.read_csv(INVENTORY_CSV)
|
||||
inv = inv[inv["in_xlsx"]].copy()
|
||||
index = build_session_index(inv)
|
||||
|
||||
df = pd.read_excel(VIDEO_INFO_XLSX)
|
||||
_normalize_metadata(df)
|
||||
date_iso = pd.to_datetime(df["date"]).dt.strftime("%Y-%m-%d")
|
||||
|
||||
train_videos, train_dbs, test_videos, test_dbs = [], [], [], []
|
||||
for fallback, row in zip(date_iso, df.itertuples(index=False)):
|
||||
tv, td = resolve_session(row.machine_name, row.training_date_time, fallback, index)
|
||||
sv, sd = resolve_session(row.machine_name, row.testing_date_time, fallback, index)
|
||||
train_videos.append(tv)
|
||||
train_dbs.append(td)
|
||||
test_videos.append(sv)
|
||||
test_dbs.append(sd)
|
||||
|
||||
df["training_video_path"] = train_videos
|
||||
df["training_db_path"] = train_dbs
|
||||
df["testing_video_path"] = test_videos
|
||||
df["testing_db_path"] = test_dbs
|
||||
|
||||
df.to_csv(args.out, sep="\t", index=False)
|
||||
|
||||
n_rows = len(df)
|
||||
n_train_video = sum(bool(v) for v in train_videos)
|
||||
n_train_db = sum(bool(v) for v in train_dbs)
|
||||
n_test_video = sum(bool(v) for v in test_videos)
|
||||
n_test_db = sum(bool(v) for v in test_dbs)
|
||||
print(f"wrote {args.out} ({n_rows} rows)")
|
||||
print(f" training: {n_train_video} with video, {n_train_db} with DB")
|
||||
print(f" testing: {n_test_video} with video, {n_test_db} with DB")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
|
|
@ -1,90 +1,113 @@
|
|||
import pandas as pd
|
||||
"""Load ROI tracking data from all sessions into one DataFrame.
|
||||
|
||||
Drives off the merged TSV (one row per ROI/fly across training + testing
|
||||
phases). For each TSV row, opens the corresponding tracking DB and pulls
|
||||
the matching ROI table, then attaches the experimental metadata.
|
||||
|
||||
The TSV is the single source of truth for what data exists and how it
|
||||
maps to flies and conditions.
|
||||
"""
|
||||
|
||||
import sqlite3
|
||||
import re
|
||||
from pathlib import Path
|
||||
|
||||
from config import DATA_RAW, DATA_METADATA, DATA_PROCESSED
|
||||
import pandas as pd
|
||||
|
||||
from config import VIDEO_INFO_XLSX
|
||||
|
||||
|
||||
def load_roi_data():
|
||||
"""Load ROI data from SQLite databases and group by trained/untrained.
|
||||
# Metadata columns to copy onto every tracking sample. These are the xlsx
|
||||
# fields that describe the experimental condition behind each fly/ROI.
|
||||
# Reason: the ROI column is uppercase ("ROI") for backwards compatibility
|
||||
# with the existing analysis pipeline (calculate_distances.py, notebooks).
|
||||
_META_COLS = (
|
||||
"date",
|
||||
"machine_name",
|
||||
"species",
|
||||
"male",
|
||||
"training_date_time",
|
||||
"testing_date_time",
|
||||
"training_length_hr",
|
||||
"consolidation_length_hr",
|
||||
"memory",
|
||||
"age",
|
||||
)
|
||||
|
||||
|
||||
def _open_ro(db_path: str, cache: dict) -> sqlite3.Connection | None:
|
||||
"""Cached read-only sqlite connection. Returns None on failure."""
|
||||
if not isinstance(db_path, str) or not db_path:
|
||||
return None
|
||||
if db_path not in cache:
|
||||
try:
|
||||
cache[db_path] = sqlite3.connect(f"file:{db_path}?mode=ro", uri=True)
|
||||
except sqlite3.Error as e:
|
||||
print(f"failed to open {Path(db_path).name}: {e}")
|
||||
cache[db_path] = None
|
||||
return cache[db_path]
|
||||
|
||||
|
||||
def load_roi_data(meta: pd.DataFrame | None = None) -> pd.DataFrame:
|
||||
"""Load ROI tracking data joined with experimental metadata.
|
||||
|
||||
For each row in ``meta``, reads the matching ROI table from both the
|
||||
training DB and the testing DB (whichever exist), and stamps every
|
||||
sample with the row's metadata plus a ``session`` column
|
||||
(``"training"`` or ``"testing"``). Rows with empty DB paths (unusable
|
||||
videos, or videos that didn't pass the completeness gate) are skipped.
|
||||
|
||||
Args:
|
||||
meta: optional DataFrame with the same schema as
|
||||
``all_video_info_merged.tsv``. Pass a filtered slice to load a
|
||||
subset (e.g. ``meta[meta.species == 'Melanogaster/CS']``).
|
||||
Defaults to the full TSV.
|
||||
|
||||
Returns:
|
||||
tuple: (trained_df, untrained_df) DataFrames with tracking data.
|
||||
DataFrame with columns ``id, t, x, y, w, h, phi, is_inferred,
|
||||
has_interacted, session, <metadata>`` — one row per tracking
|
||||
sample. Empty if nothing could be loaded.
|
||||
"""
|
||||
metadata = pd.read_csv(DATA_METADATA / '2025_07_15_metadata_fixed.csv')
|
||||
metadata['machine_name'] = metadata['machine_name'].astype(str)
|
||||
if meta is None:
|
||||
meta = pd.read_csv(VIDEO_INFO_XLSX.with_suffix(".tsv"), sep="\t")
|
||||
|
||||
trained_rois = metadata[metadata['group'] == 'trained']
|
||||
untrained_rois = metadata[metadata['group'] == 'untrained']
|
||||
db_cache: dict = {}
|
||||
chunks: list[pd.DataFrame] = []
|
||||
|
||||
db_files = list(DATA_RAW.glob('*_tracking.db'))
|
||||
|
||||
trained_df = pd.DataFrame()
|
||||
untrained_df = pd.DataFrame()
|
||||
|
||||
for db_file in db_files:
|
||||
print(f"Processing {db_file.name}")
|
||||
|
||||
pattern = r'_([0-9a-f]{32})__'
|
||||
match = re.search(pattern, db_file.name)
|
||||
|
||||
if not match:
|
||||
print(f"Could not extract UUID from {db_file.name}")
|
||||
continue
|
||||
|
||||
uuid = match.group(1)
|
||||
metadata_matches = metadata[metadata['path'].str.contains(uuid, na=False)]
|
||||
|
||||
if metadata_matches.empty:
|
||||
print(f"No metadata matches found for UUID {uuid} from {db_file.name}")
|
||||
continue
|
||||
|
||||
machine_id = metadata_matches.iloc[0]['machine_name']
|
||||
print(f"Matched to machine ID: {machine_id}")
|
||||
|
||||
conn = sqlite3.connect(str(db_file))
|
||||
|
||||
machine_trained = trained_rois[trained_rois['machine_name'] == machine_id]
|
||||
machine_untrained = untrained_rois[untrained_rois['machine_name'] == machine_id]
|
||||
|
||||
for _, row in machine_trained.iterrows():
|
||||
roi = row['ROI']
|
||||
for row in meta.itertuples(index=False):
|
||||
for session in ("training", "testing"):
|
||||
conn = _open_ro(getattr(row, f"{session}_db_path"), db_cache)
|
||||
if conn is None:
|
||||
continue
|
||||
try:
|
||||
query = f"SELECT * FROM ROI_{roi}"
|
||||
roi_data = pd.read_sql_query(query, conn)
|
||||
roi_data['machine_name'] = machine_id
|
||||
roi_data['ROI'] = roi
|
||||
roi_data['group'] = 'trained'
|
||||
trained_df = pd.concat([trained_df, roi_data], ignore_index=True)
|
||||
df = pd.read_sql_query(
|
||||
f"SELECT * FROM ROI_{int(row.roi)}", conn
|
||||
)
|
||||
except Exception as e:
|
||||
print(f"Error loading ROI_{roi} from {db_file.name}: {e}")
|
||||
# Reason: a DB may be missing a ROI table if tracking was
|
||||
# partial — skip rather than abort the whole batch.
|
||||
print(f" ROI_{row.roi} from {session} DB: {e}")
|
||||
continue
|
||||
df["session"] = session
|
||||
df["ROI"] = int(row.roi)
|
||||
for col in _META_COLS:
|
||||
df[col] = getattr(row, col)
|
||||
chunks.append(df)
|
||||
|
||||
for _, row in machine_untrained.iterrows():
|
||||
roi = row['ROI']
|
||||
try:
|
||||
query = f"SELECT * FROM ROI_{roi}"
|
||||
roi_data = pd.read_sql_query(query, conn)
|
||||
roi_data['machine_name'] = machine_id
|
||||
roi_data['ROI'] = roi
|
||||
roi_data['group'] = 'untrained'
|
||||
untrained_df = pd.concat([untrained_df, roi_data], ignore_index=True)
|
||||
except Exception as e:
|
||||
print(f"Error loading ROI_{roi} from {db_file.name}: {e}")
|
||||
for conn in db_cache.values():
|
||||
if conn is not None:
|
||||
conn.close()
|
||||
|
||||
conn.close()
|
||||
|
||||
return trained_df, untrained_df
|
||||
return pd.concat(chunks, ignore_index=True) if chunks else pd.DataFrame()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
trained_data, untrained_data = load_roi_data()
|
||||
print(f"Trained data shape: {trained_data.shape}")
|
||||
print(f"Untrained data shape: {untrained_data.shape}")
|
||||
if not trained_data.empty:
|
||||
print("Trained data columns:", trained_data.columns.tolist())
|
||||
if not untrained_data.empty:
|
||||
print("Untrained data columns:", untrained_data.columns.tolist())
|
||||
|
||||
trained_data.to_csv(DATA_PROCESSED / 'trained_roi_data.csv', index=False)
|
||||
untrained_data.to_csv(DATA_PROCESSED / 'untrained_roi_data.csv', index=False)
|
||||
print("Data saved to trained_roi_data.csv and untrained_roi_data.csv")
|
||||
data = load_roi_data()
|
||||
print(f"shape: {data.shape}")
|
||||
if not data.empty:
|
||||
print(f"columns: {list(data.columns)}")
|
||||
print(f"sessions: {data['session'].value_counts().to_dict()}")
|
||||
print(f"unique machines: {data['machine_name'].nunique()}")
|
||||
print(
|
||||
f"unique flies (date,machine,roi): "
|
||||
f"{data.groupby(['date','machine_name','roi']).ngroups}"
|
||||
)
|
||||
|
|
|
|||
|
|
@ -97,13 +97,32 @@ def snapshot() -> str:
|
|||
)
|
||||
lines.append(f" errors in log: {len(errors)}")
|
||||
|
||||
# Rate from the last 10 completions, when available.
|
||||
if len(history) >= 2:
|
||||
window = history[-min(10, len(history)) :]
|
||||
span = window[-1] - window[0]
|
||||
if span > 0:
|
||||
rate_per_hour = (len(window) - 1) / span * 3600
|
||||
lines.append(f" rate (last {len(window) - 1}): {rate_per_hour:.1f} videos/hour")
|
||||
# Rate from completions in the last 6 h — robust to gaps from killed /
|
||||
# restarted runs, while wide enough to span multiple parallel-worker
|
||||
# completion bursts. Reason: with 8 workers all started together on
|
||||
# multi-hour videos, completions arrive in tight bursts every ~video-
|
||||
# length apart; a 30-min window catches one burst and overestimates by
|
||||
# ~10×. 6 h spans at least one full burst cycle for typical videos.
|
||||
now_ts = time.time()
|
||||
window_secs = 6 * 3600
|
||||
recent = [t for t in history if t >= now_ts - window_secs]
|
||||
if len(recent) >= 2:
|
||||
# Reason: with N parallel workers, completions arrive in clumps
|
||||
# (all workers finish near-simultaneously). Dividing N by the *burst*
|
||||
# span gives nonsense rates. Use the full window as the denominator
|
||||
# once the batch has been running long enough to fill it; otherwise
|
||||
# use elapsed-since-first-DB. Detection: if every DB on disk also
|
||||
# falls inside the window, the batch is younger than the window.
|
||||
if len(recent) == len(history):
|
||||
elapsed = max(1.0, now_ts - history[0])
|
||||
else:
|
||||
elapsed = float(window_secs)
|
||||
if elapsed > 0:
|
||||
rate_per_hour = len(recent) / elapsed * 3600
|
||||
lines.append(
|
||||
f" rate (last {len(recent)} in {int(window_secs/3600)} h):"
|
||||
f" {rate_per_hour:.1f} videos/hour"
|
||||
)
|
||||
remaining = max(0, pickable - tracked)
|
||||
if rate_per_hour > 0 and remaining > 0:
|
||||
eta_sec = remaining * 3600 / rate_per_hour
|
||||
|
|
@ -112,6 +131,8 @@ def snapshot() -> str:
|
|||
f" ETA remaining: {fmt_duration(eta_sec)} "
|
||||
f"(done by {eta_at:%H:%M %a})"
|
||||
)
|
||||
else:
|
||||
lines.append(" rate: (warming up — check again in a few min)")
|
||||
|
||||
if last_mtime is not None and last_name is not None:
|
||||
ago = (datetime.now() - last_mtime).total_seconds()
|
||||
|
|
|
|||
|
|
@ -3,7 +3,7 @@
|
|||
Reads target JSONs produced by `pick_targets.py`, builds the 6 ROIs of the
|
||||
HD mating arena from the L-shape reference points, runs ethoscope's
|
||||
`MultiFlyTracker` against the merged.mp4 file via `MovieVirtualCamera`, and
|
||||
writes a SQLite DB to `data/tracked/<video_basename>_tracking.db`.
|
||||
writes a SQLite DB to `TRACKING_OUTPUT_DIR/<video_basename>_tracking.db`.
|
||||
|
||||
Idempotent: skips videos whose tracking DB already exists (unless --redo).
|
||||
|
||||
|
|
@ -58,17 +58,46 @@ def track_one(json_path: Path, output_dir: Path, max_duration: float | None,
|
|||
from ethoscope.io.sqlite import SQLiteResultWriter
|
||||
from ethoscope.trackers.multi_fly_tracker import MultiFlyTracker
|
||||
|
||||
class BGRMovieCamera(MovieVirtualCamera):
|
||||
"""MovieVirtualCamera variant that keeps BGR frames.
|
||||
import time as _time
|
||||
|
||||
MultiFlyTracker calls cv2.cvtColor(img, COLOR_BGR2GRAY) without checking
|
||||
whether img is already grayscale, so we must feed it 3-channel input.
|
||||
class BGRMovieCamera(MovieVirtualCamera):
|
||||
"""MovieVirtualCamera that keeps BGR frames AND retries on transient
|
||||
read failures.
|
||||
|
||||
Two reasons for the override:
|
||||
|
||||
1. MultiFlyTracker calls cv2.cvtColor(img, COLOR_BGR2GRAY) without
|
||||
checking whether img is already grayscale, so we must feed it
|
||||
3-channel input.
|
||||
|
||||
2. cv2.VideoCapture.read() can return False on transient I/O hiccups
|
||||
(NFS contention when 8 workers pull big mp4s in parallel) without
|
||||
the file actually being at EOF. A naive "False -> StopIteration"
|
||||
handling makes the tracker silently exit mid-video and write a
|
||||
short, lying DB. We retry a few times and only treat persistent
|
||||
failures within the *interior* of the video as real EOF.
|
||||
"""
|
||||
|
||||
_retry_count = 5
|
||||
_retry_backoff_s = 0.25
|
||||
_eof_safety_frames = 50 # near end-of-file, treat False as legitimate
|
||||
|
||||
def _next_image(self):
|
||||
ret, frame = self.capture.read()
|
||||
if not ret or frame is None:
|
||||
return None
|
||||
return frame # BGR, untouched
|
||||
for attempt in range(self._retry_count):
|
||||
ret, frame = self.capture.read()
|
||||
if ret and frame is not None:
|
||||
return frame # BGR, untouched
|
||||
# If we're near the genuine end of the file, accept it.
|
||||
if (
|
||||
self._has_end_of_file
|
||||
and self._frame_idx >= self._total_n_frames - self._eof_safety_frames
|
||||
):
|
||||
return None
|
||||
# Otherwise, this is a suspected transient hiccup — back off
|
||||
# and try again. The capture is still open; cv2 will pick up
|
||||
# the next decoded frame.
|
||||
_time.sleep(self._retry_backoff_s)
|
||||
return None # truly persistent failure
|
||||
|
||||
payload = json.loads(json_path.read_text())
|
||||
if payload.get("unusable"):
|
||||
|
|
@ -146,6 +175,42 @@ def track_one(json_path: Path, output_dir: Path, max_duration: float | None,
|
|||
|
||||
if not out_db.exists():
|
||||
return "error", "tracking finished but DB was not created"
|
||||
|
||||
# Post-tracking sanity check: did we cover most of the source video?
|
||||
# If not (cv2 retry exhausted, codec corruption, etc.), reject the DB so
|
||||
# it doesn't get cached as "done" — better an explicit failure than a
|
||||
# silent partial write.
|
||||
expected_ms = (cam._total_n_frames / 25.0) * 1000.0
|
||||
if max_duration is not None:
|
||||
expected_ms = min(expected_ms, max_duration * 1000.0)
|
||||
completeness_threshold = 0.90 # require ≥ 90 % of expected duration
|
||||
|
||||
# Use MAX(t) across all ROIs — a single ROI can run dry early if its fly
|
||||
# stops moving, so the latest detection anywhere in the arena is the
|
||||
# better signal of how far the iterator actually got.
|
||||
import sqlite3 as _sqlite3
|
||||
try:
|
||||
_con = _sqlite3.connect(f"file:{out_db}?mode=ro", uri=True)
|
||||
t_max = 0
|
||||
for _i in range(1, 7):
|
||||
_v = _con.execute(f"SELECT MAX(t) FROM ROI_{_i}").fetchone()[0]
|
||||
if _v and _v > t_max:
|
||||
t_max = _v
|
||||
_con.close()
|
||||
except Exception:
|
||||
t_max = 0
|
||||
|
||||
if expected_ms > 0 and t_max < expected_ms * completeness_threshold:
|
||||
out_db.unlink()
|
||||
for sidecar in (str(out_db) + "-wal", str(out_db) + "-shm"):
|
||||
Path(sidecar).unlink(missing_ok=True)
|
||||
ratio = t_max / expected_ms if expected_ms else 0
|
||||
return (
|
||||
"error",
|
||||
f"short output: t_max={t_max} ms vs expected {int(expected_ms)} ms "
|
||||
f"({ratio*100:.0f}%); DB removed",
|
||||
)
|
||||
|
||||
return "ok", str(out_db)
|
||||
|
||||
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue