"""Distance analysis pipeline.

Loads trained/untrained fly distance data, prints basic statistics,
plots distance distributions, runs K-means clustering, and evaluates a
simple threshold-based classifier.
"""

import warnings

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import seaborn as sns
from scipy import stats
from sklearn.cluster import KMeans
from sklearn.metrics import silhouette_score
from sklearn.preprocessing import StandardScaler

from config import DATA_PROCESSED, FIGURES

# NOTE(review): blanket suppression hides genuine problems; kept to preserve
# the original pipeline's console output, but consider narrowing the filter.
warnings.filterwarnings('ignore')


def load_and_combine_data():
    """Load and combine trained and untrained distance data.

    Returns:
        pd.DataFrame: Combined distance data with a 'group' label column;
        rows without a 'distance' value are dropped.
    """
    trained_distances = pd.read_csv(DATA_PROCESSED / 'trained_distances.csv')
    untrained_distances = pd.read_csv(DATA_PROCESSED / 'untrained_distances.csv')

    trained_distances['group'] = 'trained'
    untrained_distances['group'] = 'untrained'

    combined_data = pd.concat([trained_distances, untrained_distances],
                              ignore_index=True)
    combined_data = combined_data.dropna(subset=['distance'])

    print(f"Combined data shape: {combined_data.shape}")
    print(f"Trained samples: {len(combined_data[combined_data['group'] == 'trained'])}")
    print(f"Untrained samples: {len(combined_data[combined_data['group'] == 'untrained'])}")
    return combined_data


def basic_statistics(combined_data):
    """Perform basic statistical analysis.

    Prints per-group summary statistics, an independent-samples t-test
    between the groups, and Cohen's d effect size.

    Args:
        combined_data (pd.DataFrame): Combined distance data.
    """
    print("\n=== BASIC STATISTICS ===")
    for group in ['trained', 'untrained']:
        group_data = combined_data[combined_data['group'] == group]['distance']
        print(f"\n{group.capitalize()} flies:")
        print(f" Count: {len(group_data)}")
        print(f" Mean distance: {group_data.mean():.2f}")
        print(f" Std distance: {group_data.std():.2f}")
        print(f" Median distance: {group_data.median():.2f}")
        print(f" Min distance: {group_data.min():.2f}")
        print(f" Max distance: {group_data.max():.2f}")

    trained_dist = combined_data[combined_data['group'] == 'trained']['distance']
    untrained_dist = combined_data[combined_data['group'] == 'untrained']['distance']

    # NOTE(review): ttest_ind assumes equal variances by default; consider
    # equal_var=False (Welch's t-test) if group variances differ markedly.
    t_stat, p_value = stats.ttest_ind(trained_dist, untrained_dist)
    print(f"\nT-test between groups:")
    print(f" T-statistic: {t_stat:.4f}")
    print(f" P-value: {p_value:.2e}")

    # Cohen's d using the pooled standard deviation of the two groups.
    pooled_std = np.sqrt(
        ((len(trained_dist) - 1) * trained_dist.std() ** 2
         + (len(untrained_dist) - 1) * untrained_dist.std() ** 2)
        / (len(trained_dist) + len(untrained_dist) - 2))
    cohens_d = (trained_dist.mean() - untrained_dist.mean()) / pooled_std
    print(f" Cohen's d (effect size): {cohens_d:.4f}")


def distance_distribution_analysis(combined_data):
    """Analyze distance distributions and create plots.

    Saves a 2x2 figure (histograms, box plot, empirical CDFs, violin plot)
    to FIGURES / 'distance_analysis.png' and shows it.

    Args:
        combined_data (pd.DataFrame): Combined distance data.
    """
    print("\n=== DISTANCE DISTRIBUTION ANALYSIS ===")
    fig, axes = plt.subplots(2, 2, figsize=(15, 10))
    fig.suptitle('Distance Distribution Analysis', fontsize=16)

    # Overlaid density histograms per group.
    axes[0, 0].hist(combined_data[combined_data['group'] == 'trained']['distance'],
                    alpha=0.7, label='Trained', bins=50, density=True)
    axes[0, 0].hist(combined_data[combined_data['group'] == 'untrained']['distance'],
                    alpha=0.7, label='Untrained', bins=50, density=True)
    axes[0, 0].set_xlabel('Distance')
    axes[0, 0].set_ylabel('Density')
    axes[0, 0].set_title('Distance Distribution by Group')
    axes[0, 0].legend()

    combined_data.boxplot(column='distance', by='group', ax=axes[0, 1])
    axes[0, 1].set_title('Distance Box Plot by Group')
    axes[0, 1].set_xlabel('Group')
    axes[0, 1].set_ylabel('Distance')

    # Empirical cumulative distribution functions.
    trained_dist = combined_data[combined_data['group'] == 'trained']['distance']
    untrained_dist = combined_data[combined_data['group'] == 'untrained']['distance']
    trained_sorted = np.sort(trained_dist)
    untrained_sorted = np.sort(untrained_dist)
    trained_cumulative = np.arange(1, len(trained_sorted) + 1) / len(trained_sorted)
    untrained_cumulative = np.arange(1, len(untrained_sorted) + 1) / len(untrained_sorted)
    axes[1, 0].plot(trained_sorted, trained_cumulative, label='Trained', alpha=0.7)
    axes[1, 0].plot(untrained_sorted, untrained_cumulative, label='Untrained', alpha=0.7)
    axes[1, 0].set_xlabel('Distance')
    axes[1, 0].set_ylabel('Cumulative Probability')
    axes[1, 0].set_title('Cumulative Distribution of Distances')
    axes[1, 0].legend()

    sns.violinplot(data=combined_data, x='group', y='distance', ax=axes[1, 1])
    axes[1, 1].set_title('Distance Violin Plot by Group')
    axes[1, 1].set_xlabel('Group')
    axes[1, 1].set_ylabel('Distance')

    plt.tight_layout()
    plt.savefig(FIGURES / 'distance_analysis.png', dpi=300, bbox_inches='tight')
    plt.show()
    print("Distance distribution plots saved")


def clustering_analysis(combined_data):
    """Perform clustering analysis on distance data.

    Runs K-means for k in 2..5 (elbow + silhouette plots, saved to
    FIGURES / 'clustering_analysis.png'), then clusters with k=2 and
    compares cluster assignments against the actual group labels.

    Args:
        combined_data (pd.DataFrame): Combined distance data.

    Returns:
        tuple: (clustered_data, kmeans_model, scaler).
    """
    print("\n=== CLUSTERING ANALYSIS ===")
    features = ['distance', 'n_flies', 'area_fly1', 'area_fly2']
    X = combined_data[features].dropna()

    scaler = StandardScaler()
    X_scaled = scaler.fit_transform(X)

    # Model selection diagnostics over a small range of k.
    k_range = range(2, 6)
    inertias = []
    sil_scores = []
    for k in k_range:
        kmeans = KMeans(n_clusters=k, random_state=42, n_init=10)
        kmeans.fit(X_scaled)
        inertias.append(kmeans.inertia_)
        sil_scores.append(silhouette_score(X_scaled, kmeans.labels_))

    fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(12, 5))
    ax1.plot(k_range, inertias, 'bo-')
    ax1.set_xlabel('Number of Clusters (k)')
    ax1.set_ylabel('Inertia')
    ax1.set_title('Elbow Method for Optimal k')
    ax2.plot(k_range, sil_scores, 'ro-')
    ax2.set_xlabel('Number of Clusters (k)')
    ax2.set_ylabel('Silhouette Score')
    ax2.set_title('Silhouette Score for Different k')
    plt.tight_layout()
    plt.savefig(FIGURES / 'clustering_analysis.png', dpi=300, bbox_inches='tight')
    plt.show()

    # Final clustering with k=2 (matches the two known groups).
    optimal_k = 2
    kmeans = KMeans(n_clusters=optimal_k, random_state=42, n_init=10)
    cluster_labels = kmeans.fit_predict(X_scaled)

    X_clustered = X.copy()
    X_clustered['cluster'] = cluster_labels
    # X keeps combined_data's index through dropna, so .loc aligns rows.
    X_clustered['actual_group'] = combined_data.loc[X_clustered.index, 'group'].values

    confusion = pd.crosstab(X_clustered['cluster'], X_clustered['actual_group'])
    print(f"Clustering results (k={optimal_k}):")
    print(confusion)

    # Accuracy under the best of the two possible cluster->group mappings,
    # derived from the crosstab; reindex fills absent cells with 0 so an
    # empty cluster/group combination cannot raise a KeyError.
    counts = confusion.reindex(index=[0, 1],
                               columns=['trained', 'untrained'],
                               fill_value=0)
    accuracy = max(counts.loc[0, 'trained'] + counts.loc[1, 'untrained'],
                   counts.loc[0, 'untrained'] + counts.loc[1, 'trained']) / len(X_clustered)
    print(f"\nClustering accuracy: {accuracy:.4f}")

    print("\nCluster characteristics:")
    for i in range(optimal_k):
        cluster_data = X_clustered[X_clustered['cluster'] == i]
        print(f"\nCluster {i}:")
        print(f" Size: {len(cluster_data)}")
        print(f" Distance - Mean: {cluster_data['distance'].mean():.2f}, Std: {cluster_data['distance'].std():.2f}")
        print(f" N_flies - Mean: {cluster_data['n_flies'].mean():.2f}")
        print(f" Area_fly1 - Mean: {cluster_data['area_fly1'].mean():.2f}")

    return X_clustered, kmeans, scaler


def simple_classification_rule(combined_data):
    """Create a simple rule-based classifier.

    Evaluates "predict 'trained' when distance > threshold" at the 25th,
    50th, and 75th distance percentiles, printing accuracy, sensitivity,
    and specificity for each.

    Args:
        combined_data (pd.DataFrame): Combined distance data.
    """
    print("\n=== SIMPLE RULE-BASED CLASSIFICATION ===")
    clean_data = combined_data.dropna(subset=['distance'])
    thresholds = np.percentile(clean_data['distance'], [25, 50, 75])
    print(f"Distance percentiles: 25%={thresholds[0]:.2f}, 50%={thresholds[1]:.2f}, 75%={thresholds[2]:.2f}")

    # Vectorized evaluation: boolean arrays replace the original per-element
    # list comprehensions (identical counts, no Python-level loops).
    distances = clean_data['distance'].to_numpy()
    is_trained = (clean_data['group'] == 'trained').to_numpy()

    for threshold in thresholds:
        predicted_trained = distances > threshold
        accuracy = np.mean(predicted_trained == is_trained)
        tp = int(np.sum(predicted_trained & is_trained))
        tn = int(np.sum(~predicted_trained & ~is_trained))
        fp = int(np.sum(predicted_trained & ~is_trained))
        fn = int(np.sum(~predicted_trained & is_trained))
        sensitivity = tp / (tp + fn) if (tp + fn) > 0 else 0
        specificity = tn / (tn + fp) if (tn + fp) > 0 else 0
        print(f"\nThreshold = {threshold:.2f}:")
        print(f" Accuracy: {accuracy:.4f}")
        print(f" Sensitivity: {sensitivity:.4f}, Specificity: {specificity:.4f}")


def main():
    """Run the full distance analysis pipeline."""
    combined_data = load_and_combine_data()
    basic_statistics(combined_data)
    distance_distribution_analysis(combined_data)
    clustered_data, kmeans_model, scaler = clustering_analysis(combined_data)
    simple_classification_rule(combined_data)
    clustered_data.to_csv(DATA_PROCESSED / 'clustered_distance_data.csv', index=False)
    print("\n=== ANALYSIS COMPLETE ===")


if __name__ == "__main__":
    main()