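"""Distance analysis for trained vs. untrained flies.

Loads the processed per-group distance tables, reports descriptive and
inferential statistics (t-test, Cohen's d), plots the distributions,
clusters the observations with k-means, and evaluates simple
threshold-based classification rules. Expects `trained_distances.csv` and
`untrained_distances.csv` under DATA_PROCESSED with at least the columns
used below (`distance`, `n_flies`, `area_fly1`, `area_fly2`).
"""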
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from scipy import stats
from sklearn.cluster import KMeans
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import silhouette_score
import warnings

# Silence library warnings (e.g., deprecation notices) to keep the
# console report readable.
warnings.filterwarnings('ignore')

from config import DATA_PROCESSED, FIGURES
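# NOTE: DATA_PROCESSED and FIGURES are assumed to be pathlib.Path objects
# provided by the shared config.py; they are joined with the `/` operator
# below, so any Path-like object works.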


def load_and_combine_data():
    """Load and combine trained and untrained distance data.

    Returns:
        pd.DataFrame: Combined distance data with group labels.
    """
    trained_distances = pd.read_csv(DATA_PROCESSED / 'trained_distances.csv')
    untrained_distances = pd.read_csv(DATA_PROCESSED / 'untrained_distances.csv')

    trained_distances['group'] = 'trained'
    untrained_distances['group'] = 'untrained'

    combined_data = pd.concat([trained_distances, untrained_distances], ignore_index=True)
    combined_data = combined_data.dropna(subset=['distance'])

    print(f"Combined data shape: {combined_data.shape}")
    print(f"Trained samples: {len(combined_data[combined_data['group'] == 'trained'])}")
    print(f"Untrained samples: {len(combined_data[combined_data['group'] == 'untrained'])}")

    return combined_data


def basic_statistics(combined_data):
    """Perform basic statistical analysis.

    Args:
        combined_data (pd.DataFrame): Combined distance data.
    """
    print("\n=== BASIC STATISTICS ===")

    for group in ['trained', 'untrained']:
        group_data = combined_data[combined_data['group'] == group]['distance']
        print(f"\n{group.capitalize()} flies:")
        print(f"  Count: {len(group_data)}")
        print(f"  Mean distance: {group_data.mean():.2f}")
        print(f"  Std distance: {group_data.std():.2f}")
        print(f"  Median distance: {group_data.median():.2f}")
        print(f"  Min distance: {group_data.min():.2f}")
        print(f"  Max distance: {group_data.max():.2f}")

    trained_dist = combined_data[combined_data['group'] == 'trained']['distance']
    untrained_dist = combined_data[combined_data['group'] == 'untrained']['distance']

    # Student's t-test (assumes equal variances between the two groups).
    t_stat, p_value = stats.ttest_ind(trained_dist, untrained_dist)
    print("\nT-test between groups:")
    print(f"  T-statistic: {t_stat:.4f}")
    print(f"  P-value: {p_value:.2e}")

    # Cohen's d using the pooled standard deviation:
    #   s_p = sqrt(((n1 - 1)*s1^2 + (n2 - 1)*s2^2) / (n1 + n2 - 2))
    pooled_std = np.sqrt(((len(trained_dist) - 1) * trained_dist.std()**2 +
                          (len(untrained_dist) - 1) * untrained_dist.std()**2) /
                         (len(trained_dist) + len(untrained_dist) - 2))
    cohens_d = (trained_dist.mean() - untrained_dist.mean()) / pooled_std
    print(f"  Cohen's d (effect size): {cohens_d:.4f}")


def distance_distribution_analysis(combined_data):
    """Analyze distance distributions and create plots.

    Args:
        combined_data (pd.DataFrame): Combined distance data.
    """
    print("\n=== DISTANCE DISTRIBUTION ANALYSIS ===")

    fig, axes = plt.subplots(2, 2, figsize=(15, 10))
    fig.suptitle('Distance Distribution Analysis', fontsize=16)

    # Overlaid density histograms of the two groups.
    axes[0, 0].hist(combined_data[combined_data['group'] == 'trained']['distance'],
                    alpha=0.7, label='Trained', bins=50, density=True)
    axes[0, 0].hist(combined_data[combined_data['group'] == 'untrained']['distance'],
                    alpha=0.7, label='Untrained', bins=50, density=True)
    axes[0, 0].set_xlabel('Distance')
    axes[0, 0].set_ylabel('Density')
    axes[0, 0].set_title('Distance Distribution by Group')
    axes[0, 0].legend()

    combined_data.boxplot(column='distance', by='group', ax=axes[0, 1])
    axes[0, 1].set_title('Distance Box Plot by Group')
    axes[0, 1].set_xlabel('Group')
    axes[0, 1].set_ylabel('Distance')
    # pandas' boxplot(by=...) overwrites the figure suptitle; restore it.
    fig.suptitle('Distance Distribution Analysis', fontsize=16)

    trained_dist = combined_data[combined_data['group'] == 'trained']['distance']
    untrained_dist = combined_data[combined_data['group'] == 'untrained']['distance']

    # Empirical cumulative distribution functions (ECDFs).
    trained_sorted = np.sort(trained_dist)
    untrained_sorted = np.sort(untrained_dist)
    trained_cumulative = np.arange(1, len(trained_sorted) + 1) / len(trained_sorted)
    untrained_cumulative = np.arange(1, len(untrained_sorted) + 1) / len(untrained_sorted)

    axes[1, 0].plot(trained_sorted, trained_cumulative, label='Trained', alpha=0.7)
    axes[1, 0].plot(untrained_sorted, untrained_cumulative, label='Untrained', alpha=0.7)
    axes[1, 0].set_xlabel('Distance')
    axes[1, 0].set_ylabel('Cumulative Probability')
    axes[1, 0].set_title('Cumulative Distribution of Distances')
    axes[1, 0].legend()

    sns.violinplot(data=combined_data, x='group', y='distance', ax=axes[1, 1])
    axes[1, 1].set_title('Distance Violin Plot by Group')
    axes[1, 1].set_xlabel('Group')
    axes[1, 1].set_ylabel('Distance')

    plt.tight_layout()
    plt.savefig(FIGURES / 'distance_analysis.png', dpi=300, bbox_inches='tight')
    plt.show()

    print("Distance distribution plots saved")


def clustering_analysis(combined_data):
    """Perform clustering analysis on distance data.

    Args:
        combined_data (pd.DataFrame): Combined distance data.

    Returns:
        tuple: (clustered_data, kmeans_model, scaler).
    """
    print("\n=== CLUSTERING ANALYSIS ===")

    features = ['distance', 'n_flies', 'area_fly1', 'area_fly2']
    X = combined_data[features].dropna()

    # Standardize features so no single feature dominates the distance metric.
    scaler = StandardScaler()
    X_scaled = scaler.fit_transform(X)

    # Compare k = 2..5 using the elbow method (inertia) and silhouette scores.
    k_range = range(2, 6)
    inertias = []
    sil_scores = []

    for k in k_range:
        kmeans = KMeans(n_clusters=k, random_state=42, n_init=10)
        kmeans.fit(X_scaled)
        inertias.append(kmeans.inertia_)
        sil_scores.append(silhouette_score(X_scaled, kmeans.labels_))

    fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(12, 5))
    ax1.plot(k_range, inertias, 'bo-')
    ax1.set_xlabel('Number of Clusters (k)')
    ax1.set_ylabel('Inertia')
    ax1.set_title('Elbow Method for Optimal k')

    ax2.plot(k_range, sil_scores, 'ro-')
    ax2.set_xlabel('Number of Clusters (k)')
    ax2.set_ylabel('Silhouette Score')
    ax2.set_title('Silhouette Score for Different k')

    plt.tight_layout()
    plt.savefig(FIGURES / 'clustering_analysis.png', dpi=300, bbox_inches='tight')
    plt.show()

    # k = 2 matches the two experimental groups (trained vs. untrained).
    optimal_k = 2
    kmeans = KMeans(n_clusters=optimal_k, random_state=42, n_init=10)
    cluster_labels = kmeans.fit_predict(X_scaled)

    X_clustered = X.copy()
    X_clustered['cluster'] = cluster_labels
    X_clustered['actual_group'] = combined_data.loc[X_clustered.index, 'group'].values

    confusion = pd.crosstab(X_clustered['cluster'], X_clustered['actual_group'])
    print(f"Clustering results (k={optimal_k}):")
    print(confusion)

    c0t = len(X_clustered[(X_clustered['cluster'] == 0) & (X_clustered['actual_group'] == 'trained')])
    c0u = len(X_clustered[(X_clustered['cluster'] == 0) & (X_clustered['actual_group'] == 'untrained')])
    c1t = len(X_clustered[(X_clustered['cluster'] == 1) & (X_clustered['actual_group'] == 'trained')])
    c1u = len(X_clustered[(X_clustered['cluster'] == 1) & (X_clustered['actual_group'] == 'untrained')])

    # Cluster labels are arbitrary, so score both label-to-group assignments
    # and keep the better one.
    accuracy = max((c0t + c1u) / len(X_clustered), (c0u + c1t) / len(X_clustered))
    print(f"\nClustering accuracy: {accuracy:.4f}")

    print("\nCluster characteristics:")
    for i in range(optimal_k):
        cluster_data = X_clustered[X_clustered['cluster'] == i]
        print(f"\nCluster {i}:")
        print(f"  Size: {len(cluster_data)}")
        print(f"  Distance - Mean: {cluster_data['distance'].mean():.2f}, Std: {cluster_data['distance'].std():.2f}")
        print(f"  N_flies - Mean: {cluster_data['n_flies'].mean():.2f}")
        print(f"  Area_fly1 - Mean: {cluster_data['area_fly1'].mean():.2f}")

    return X_clustered, kmeans, scaler


def simple_classification_rule(combined_data):
    """Create a simple rule-based classifier.

    Args:
        combined_data (pd.DataFrame): Combined distance data.
    """
    print("\n=== SIMPLE RULE-BASED CLASSIFICATION ===")

    clean_data = combined_data.dropna(subset=['distance'])
    thresholds = np.percentile(clean_data['distance'], [25, 50, 75])
    print(f"Distance percentiles: 25%={thresholds[0]:.2f}, 50%={thresholds[1]:.2f}, 75%={thresholds[2]:.2f}")

    for threshold in thresholds:
        # Rule: classify a sample as 'trained' when its distance exceeds
        # the threshold, 'untrained' otherwise.
        predictions = ['trained' if d > threshold else 'untrained'
                       for d in clean_data['distance']]
        actual = clean_data['group']
        accuracy = np.mean([p == a for p, a in zip(predictions, actual)])

        # Confusion-matrix counts, with 'trained' as the positive class.
        tp = sum(p == 'trained' and a == 'trained' for p, a in zip(predictions, actual))
        tn = sum(p == 'untrained' and a == 'untrained' for p, a in zip(predictions, actual))
        fp = sum(p == 'trained' and a == 'untrained' for p, a in zip(predictions, actual))
        fn = sum(p == 'untrained' and a == 'trained' for p, a in zip(predictions, actual))

        sensitivity = tp / (tp + fn) if (tp + fn) > 0 else 0
        specificity = tn / (tn + fp) if (tn + fp) > 0 else 0

        print(f"\nThreshold = {threshold:.2f}:")
        print(f"  Accuracy: {accuracy:.4f}")
        print(f"  Sensitivity: {sensitivity:.4f}, Specificity: {specificity:.4f}")


def main():
    """Run the full distance analysis pipeline."""
    combined_data = load_and_combine_data()
    basic_statistics(combined_data)
    distance_distribution_analysis(combined_data)
    clustered_data, kmeans_model, scaler = clustering_analysis(combined_data)
    simple_classification_rule(combined_data)

    clustered_data.to_csv(DATA_PROCESSED / 'clustered_distance_data.csv', index=False)
    print("\n=== ANALYSIS COMPLETE ===")


if __name__ == "__main__":
    main()
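
# Example interactive use (e.g., from a notebook), assuming this module is
# importable as `distance_analysis` with its directory on sys.path:
#
#   from distance_analysis import load_and_combine_data, basic_statistics
#   df = load_and_combine_data()
#   basic_statistics(df)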