"""Compute per-time-point distances between tracked flies and save them as CSV."""

import numpy as np
import pandas as pd
from scipy.spatial.distance import euclidean

from config import DATA_PROCESSED

# Column layout of every frame returned by ``process_distance_data``. Passing
# it explicitly keeps the schema stable even when there are no rows at all.
_DISTANCE_COLUMNS = [
    'machine_name',
    'ROI',
    't',
    'distance',
    'n_flies',
    'area_fly1',
    'area_fly2',
]

# Dtypes applied only to an empty result, so that downstream numeric
# reductions (mean/std) behave the same as on a populated frame.
_EMPTY_NUMERIC_DTYPES = {
    'distance': 'float64',
    'n_flies': 'int64',
    'area_fly1': 'float64',
    'area_fly2': 'float64',
}


def calculate_fly_distances(trained_file=None, untrained_file=None):
    """Calculate distances between flies at each time point for both datasets.

    Both CSV files are loaded, a bounding-box area column (``w * h``) is added
    to each, and a shared area threshold is derived before every dataset is
    handed to ``process_distance_data``.

    Args:
        trained_file (Path): Path to trained ROI data CSV. Defaults to
            ``DATA_PROCESSED / 'trained_roi_data.csv'``.
        untrained_file (Path): Path to untrained ROI data CSV. Defaults to
            ``DATA_PROCESSED / 'untrained_roi_data.csv'``.

    Returns:
        tuple: (trained_distances, untrained_distances) DataFrames.
    """
    if trained_file is None:
        trained_file = DATA_PROCESSED / 'trained_roi_data.csv'
    if untrained_file is None:
        untrained_file = DATA_PROCESSED / 'untrained_roi_data.csv'

    trained_df = pd.read_csv(trained_file)
    untrained_df = pd.read_csv(untrained_file)

    for frame in (trained_df, untrained_df):
        frame['area'] = frame['w'] * frame['h']

    # NOTE(review): this is the unweighted mean of the two per-file means, so
    # both files count equally regardless of their row counts. It equals the
    # pooled mean only when the files have the same number of rows - confirm
    # this weighting is intended.
    avg_area = np.mean([trained_df['area'].mean(), untrained_df['area'].mean()])
    print(f"Average area across all data: {avg_area:.2f}")

    trained_distances = process_distance_data(trained_df, avg_area)
    untrained_distances = process_distance_data(untrained_df, avg_area)
    return trained_distances, untrained_distances


def process_distance_data(df: pd.DataFrame, avg_area: float) -> pd.DataFrame:
    """Calculate the distance between flies for every (machine, ROI, time) group.

    Rows are grouped by ``machine_name``, ``ROI`` and ``t`` and ordered by
    ``id`` within each group. Then, per group:

    - two or more detections: the Euclidean distance between the (x, y)
      positions of the two detections with the lowest ``id``;
    - exactly one detection: ``0.0`` if its area is strictly greater than
      ``avg_area``, otherwise NaN (presumably a large single blob is read as
      two flies in contact - confirm with the tracking setup).

    Args:
        df (pd.DataFrame): Input tracking data. The columns read here are
            ``machine_name``, ``ROI``, ``t``, ``id``, ``x``, ``y`` and ``area``.
        avg_area (float): Area threshold for the single-detection case.

    Returns:
        pd.DataFrame: One row per group with columns ``machine_name``, ``ROI``,
        ``t``, ``distance``, ``n_flies``, ``area_fly1`` and ``area_fly2``.
        The columns are present even when ``df`` yields no groups.
    """
    records = []
    # One three-key groupby visits the same groups, in the same sorted order,
    # as grouping by (machine_name, ROI) and then by t.
    for (machine_name, roi, t), detections in df.groupby(['machine_name', 'ROI', 't']):
        detections = detections.sort_values('id')
        n_flies = len(detections)  # a groupby group always has at least one row
        first = detections.iloc[0]

        if n_flies >= 2:
            second = detections.iloc[1]
            distance = euclidean(
                [first['x'], first['y']],
                [second['x'], second['y']],
            )
            area_second = second['area']
        else:
            distance = 0.0 if first['area'] > avg_area else np.nan
            area_second = np.nan

        records.append({
            'machine_name': machine_name,
            'ROI': roi,
            't': t,
            'distance': distance,
            'n_flies': n_flies,
            'area_fly1': first['area'],
            'area_fly2': area_second,
        })

    result = pd.DataFrame(records, columns=_DISTANCE_COLUMNS)
    if result.empty:
        # Without rows every column would be object-typed; give the numeric
        # columns real dtypes so summary statistics still work.
        result = result.astype(_EMPTY_NUMERIC_DTYPES)
    return result


def _print_distance_summary(title, distances):
    """Print the shape of ``distances`` and count/mean/std of its distance column."""
    column = distances['distance']
    print(title)
    print(f" Shape: {distances.shape}")
    print(" Distance stats:")
    print(f" Count: {column.count()}")
    print(f" Mean: {column.mean():.2f}")
    print(f" Std: {column.std():.2f}")


def main() -> None:
    """Run distance calculations, print summaries and save results as CSV."""
    trained_distances, untrained_distances = calculate_fly_distances()

    _print_distance_summary("Trained data distance summary:", trained_distances)
    _print_distance_summary("\nUntrained data distance summary:", untrained_distances)

    trained_distances.to_csv(DATA_PROCESSED / 'trained_distances.csv', index=False)
    untrained_distances.to_csv(DATA_PROCESSED / 'untrained_distances.csv', index=False)
    print("\nDistance data saved")


if __name__ == "__main__":
    main()