#!/usr/bin/env python3
"""Analyze extracted emotion vectors: similarity, PCA, clustering, cross-layer, cross-model.

Run:
    python -m full_replication.analyze_vectors --model e4b
    python -m full_replication.analyze_vectors --model 31b
    python -m full_replication.analyze_vectors --compare
"""
import argparse
import json
import os

import numpy as np
from scipy.cluster.hierarchy import linkage, fcluster
from scipy.spatial.distance import pdist

from full_replication.config import MODELS, get_extraction_layers, get_results_dir


def load_vectors(results_dir, layer):
    """Load per-emotion vectors for one layer from the saved .npz, or None if missing."""
    path = os.path.join(results_dir, f"emotion_vectors_layer{layer}.npz")
    if not os.path.exists(path):
        return None
    data = np.load(path)
    return {name: data[name] for name in data.files}


def load_results(results_dir, layer):
    """Load the per-layer experiment results JSON, or None if missing."""
    path = os.path.join(results_dir, f"experiment_results_layer{layer}.json")
    if not os.path.exists(path):
        return None
    with open(path, "r", encoding="utf-8") as f:
        return json.load(f)


def cosine_sim(a, b):
    return np.dot(a, b) / (np.linalg.norm(a) * np.linalg.norm(b) + 1e-8)


def cosine_similarity_matrix(vectors):
    emotions = sorted(vectors.keys())
    n = len(emotions)
    matrix = np.zeros((n, n))
    for i, e1 in enumerate(emotions):
        for j, e2 in enumerate(emotions):
            matrix[i, j] = cosine_sim(vectors[e1], vectors[e2])
    return emotions, matrix


def find_clusters_hierarchical(vectors, n_clusters=10):
    """Hierarchical clustering of emotion vectors."""
    emotions = sorted(vectors.keys())
    matrix = np.stack([vectors[e] for e in emotions])
    distances = pdist(matrix, metric='cosine')
    # Ward linkage over cosine distances (treats them as if they were Euclidean)
    Z = linkage(distances, method='ward')
    labels = fcluster(Z, t=n_clusters, criterion='maxclust')
    clusters = {}
    for emotion, label in zip(emotions, labels):
        clusters.setdefault(int(label), []).append(emotion)
    return clusters


def pc_interpretation(pca_results):
    """Data-driven PC interpretation with top/bottom emotions."""
    positive = {
        "happy", "proud", "inspired", "loving", "hopeful", "calm", "playful", "cheerful",
        "content", "delighted", "ecstatic", "elated", "euphoric", "grateful", "joyful",
        "jubilant", "pleased", "satisfied", "serene", "thrilled", "blissful", "amused",
        "enthusiastic", "excited", "exuberant", "fulfilled", "refreshed", "rejuvenated",
        "relieved", "triumphant", "vibrant", "invigorated", "energized", "optimistic",
        "peaceful", "relaxed", "safe", "self-confident", "stimulated", "thankful",
        "valiant", "eager", "kind", "compassionate", "empathetic", "sympathetic",
        "sentimental", "nostalgic", "patient", "at ease",
    }
    negative = {
        "sad", "angry", "afraid", "desperate", "guilty", "disgusted", "lonely", "spiteful",
        "anxious", "depressed", "furious", "hateful", "hostile", "jealous", "miserable",
        "resentful", "terrified", "worried", "ashamed", "bitter", "contemptuous", "envious",
        "frustrated", "grief-stricken", "heartbroken", "horrified", "humiliated", "hurt",
        "irate", "irritated", "mad", "mortified", "offended", "outraged", "panicked",
        "paranoid", "remorseful", "scared", "tormented", "troubled", "uneasy", "unhappy",
        "upset", "vengeful", "vindictive", "vulnerable", "weary", "worn out", "worthless",
        "alarmed", "annoyed", "distressed", "enraged", "exasperated", "frightened",
        "grumpy", "indignant", "insulted", "overwhelmed", "regretful", "scornful",
        "stressed", "sullen", "tense", "unnerved", "unsettled", "dispirited", "gloomy",
        "melancholy",
    }
    high_arousal = {
        "angry", "afraid", "surprised", "desperate", "nervous", "anxious", "disgusted",
        "confused", "spiteful", "alarmed", "astonished", "enraged", "excited", "exuberant",
        "frightened", "furious", "horrified", "hysterical", "irate", "outraged", "panicked",
        "terrified", "thrilled", "ecstatic", "euphoric", "shocked", "startled", "stimulated",
        "rattled", "overwhelmed", "agitated",
    }
    low_arousal = {
        "calm", "sad", "brooding", "lonely", "guilty", "loving", "hopeful", "bored",
        "content", "depressed", "docile", "droopy", "indifferent", "lazy", "listless",
        "melancholy", "nostalgic", "peaceful", "patient", "relaxed", "resigned", "safe",
        "serene", "sleepy", "sluggish", "tired", "weary", "worn out", "at ease",
        "sentimental",
    }

    interpretations = []
    for pc_key in sorted(pca_results["projections"].keys()):
        pc_vals = pca_results["projections"][pc_key]
        emotions = pca_results["emotions"]
        pos_vals = [pc_vals[i] for i, e in enumerate(emotions) if e in positive]
        neg_vals = [pc_vals[i] for i, e in enumerate(emotions) if e in negative]
        hi_vals = [pc_vals[i] for i, e in enumerate(emotions) if e in high_arousal]
        lo_vals = [pc_vals[i] for i, e in enumerate(emotions) if e in low_arousal]
        pos_mean = np.mean(pos_vals) if pos_vals else 0
        neg_mean = np.mean(neg_vals) if neg_vals else 0
        hi_mean = np.mean(hi_vals) if hi_vals else 0
        lo_mean = np.mean(lo_vals) if lo_vals else 0
        valence_sep = abs(pos_mean - neg_mean)
        arousal_sep = abs(hi_mean - lo_mean)

        indexed = sorted(zip(emotions, pc_vals), key=lambda x: x[1])
        bottom_5 = indexed[:5]
        top_5 = indexed[-5:][::-1]

        if valence_sep > 2.0 and valence_sep > 2 * arousal_sep:
            label = "VALENCE"
        elif arousal_sep > 2.0 and arousal_sep > 2 * valence_sep:
            label = "AROUSAL"
        else:
            label = "MIXED"

        interpretations.append({
            "pc": pc_key,
            "label": label,
            "valence_separation": float(valence_sep),
            "arousal_separation": float(arousal_sep),
            "top_5": [(e, float(v)) for e, v in top_5],
            "bottom_5": [(e, float(v)) for e, v in bottom_5],
            "explained_variance": pca_results["explained_variance"].get(pc_key, 0),
        })
    return interpretations


def analyze_single_model(model_key):
    """Full analysis for one model across all extracted layers."""
    results_dir = get_results_dir(model_key)
    layers = get_extraction_layers(model_key)
    analysis_dir = os.path.join(results_dir, "analysis")
    os.makedirs(analysis_dir, exist_ok=True)

    print(f"\n=== Analysis: {MODELS[model_key]['model_id']} ===\n")

    all_layer_results = {}
    for layer in layers:
        vectors = load_vectors(results_dir, layer)
        if vectors is None:
            continue
        results = load_results(results_dir, layer)
        if results is None:
            continue

        print(f"--- Layer {layer} ({len(vectors)} emotions, dim={next(iter(vectors.values())).shape[0]}) ---")

        # Cosine similarity
        emotions, sim_matrix = cosine_similarity_matrix(vectors)

        # High similarity pairs
        pairs_high = []
        pairs_low = []
        for i in range(len(emotions)):
            for j in range(i + 1, len(emotions)):
                s = sim_matrix[i, j]
                if s > 0.4:
                    pairs_high.append((emotions[i], emotions[j], float(s)))
                if s < -0.3:
                    pairs_low.append((emotions[i], emotions[j], float(s)))
        pairs_high.sort(key=lambda x: -x[2])
        pairs_low.sort(key=lambda x: x[2])

        print(f"  High similarity pairs (>0.4): {len(pairs_high)}")
        for e1, e2, s in pairs_high[:10]:
            print(f"    {e1} <-> {e2}: {s:.3f}")
        print(f"  Opposite pairs (<-0.3): {len(pairs_low)}")
        for e1, e2, s in pairs_low[:10]:
            print(f"    {e1} <-> {e2}: {s:.3f}")

        # Hierarchical clustering
        n_clusters = min(15, len(vectors) // 5)
        if n_clusters >= 2:
            clusters = find_clusters_hierarchical(vectors, n_clusters)
            print(f"  Clusters ({n_clusters}):")
            for cid, members in sorted(clusters.items()):
                print(f"    {cid}: {', '.join(members)}")

        # PC interpretation
        pca = results.get("pca", {})
        if pca:
            interps = pc_interpretation(pca)
            print("  PC interpretation:")
            for ip in interps[:3]:
                var = ip['explained_variance'] * 100
                print(f"    {ip['pc'].upper()} ({var:.1f}%): {ip['label']}")
                print(f"      Top: {', '.join(f'{e}({v:+.1f})' for e, v in ip['top_5'][:3])}")
                print(f"      Bottom: {', '.join(f'{e}({v:+.1f})' for e, v in ip['bottom_5'][:3])}")

        all_layer_results[layer] = {
            "num_emotions": len(vectors),
            "avg_pairwise_similarity": float(sim_matrix[np.triu_indices_from(sim_matrix, k=1)].mean()),
            "high_similarity_pairs": pairs_high[:20],
            "opposite_pairs": pairs_low[:20],
            "clusters": clusters if n_clusters >= 2 else {},
            "pc_interpretation": interps if pca else [],
            "pca": pca,
        }

    # Save analysis
    out_file = os.path.join(analysis_dir, "analysis_results.json")
    with open(out_file, "w", encoding="utf-8") as f:
        json.dump(all_layer_results, f, indent=2, ensure_ascii=False, default=str)
    print(f"\nAnalysis saved: {out_file}")
    return all_layer_results


def compare_models():
    """Compare emotion vector structure between E4B and 31B."""
    print("\n=== Cross-Model Comparison ===\n")

    # Load primary layer (2/3 depth) from each model
    for model_key in ["e4b", "31b"]:
        results_dir = get_results_dir(model_key)
        cfg = MODELS[model_key]
        target = int(cfg["num_layers"] * 2 / 3)
        vectors = load_vectors(results_dir, target)
        if vectors is None:
            print(f"  {model_key}: no vectors at layer {target}")
            continue
        results = load_results(results_dir, target)
        emotions, sim_matrix = cosine_similarity_matrix(vectors)
        avg_sim = sim_matrix[np.triu_indices_from(sim_matrix, k=1)].mean()
        # Guard against a missing results JSON (load_results returns None)
        pca = results.get("pca", {}) if results else {}
        total_var = sum(pca.get("explained_variance", {}).get(f"pc{i}", 0) for i in range(1, 3))
        print(f"  {model_key} (layer {target}):")
        print(f"    Emotions: {len(vectors)}")
        print(f"    Avg pairwise similarity: {avg_sim:.3f}")
        print(f"    PC1+PC2 variance: {total_var*100:.1f}%")

    # Find common emotions
    e4b_vecs = load_vectors(get_results_dir("e4b"), int(MODELS["e4b"]["num_layers"] * 2 / 3))
    b31_vecs = load_vectors(get_results_dir("31b"), int(MODELS["31b"]["num_layers"] * 2 / 3))
    if e4b_vecs and b31_vecs:
        common = sorted(set(e4b_vecs.keys()) & set(b31_vecs.keys()))
        print(f"\n  Common emotions: {len(common)}")

        # Compare similarity structures
        if len(common) >= 5:
            e4b_emo, e4b_sim = cosine_similarity_matrix({e: e4b_vecs[e] for e in common})
            b31_emo, b31_sim = cosine_similarity_matrix({e: b31_vecs[e] for e in common})
            # Correlation of pairwise similarities
            triu = np.triu_indices_from(e4b_sim, k=1)
            corr = np.corrcoef(e4b_sim[triu], b31_sim[triu])[0, 1]
            print(f"  Similarity structure correlation: r={corr:.3f}")


def main():
    parser = argparse.ArgumentParser()
    parser.add_argument("--model", choices=["e4b", "31b"], help="Analyze single model")
    parser.add_argument("--compare", action="store_true", help="Compare E4B vs 31B")
    args = parser.parse_args()

    if args.model:
        analyze_single_model(args.model)
    elif args.compare:
        compare_models()
    else:
        # Default: analyze both and compare
        for m in ["e4b", "31b"]:
            try:
                analyze_single_model(m)
            except Exception as e:
                print(f"  Skipping {m}: {e}")
        compare_models()


if __name__ == "__main__":
    main()
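

# ---------------------------------------------------------------------------
# Illustrative sketch (an assumption, not part of the replication pipeline):
# a minimal smoke test of the helpers above on synthetic vectors. The emotion
# names, dimensionality, and the hand-built `fake_pca` dict are hypothetical;
# they only mirror the fields this script reads ("emotions", "projections",
# "explained_variance") and do not reflect any real extraction run.
def _synthetic_smoke_test(dim=64, seed=0):
    rng = np.random.default_rng(seed)
    emotions = ["happy", "calm", "excited", "sad", "angry", "afraid"]
    vectors = {e: rng.normal(size=dim) for e in emotions}

    # Pairwise cosine similarity and mean off-diagonal similarity
    names, sim = cosine_similarity_matrix(vectors)
    print("avg pairwise similarity:", sim[np.triu_indices_from(sim, k=1)].mean())

    # Hierarchical clustering into two groups
    print("clusters:", find_clusters_hierarchical(vectors, n_clusters=2))

    # PC interpretation on hypothetical projection values: positive emotions
    # high, negative emotions low, so PC1 should be labeled VALENCE
    fake_pca = {
        "emotions": emotions,
        "projections": {"pc1": [3.0, 2.5, 2.0, -2.0, -2.5, -3.0]},
        "explained_variance": {"pc1": 0.5},
    }
    for ip in pc_interpretation(fake_pca):
        print(ip["pc"], ip["label"], "top:", ip["top_5"][:2])

# Possible invocation of the sketch (assuming the package is importable):
#   python -c "import full_replication.analyze_vectors as m; m._synthetic_smoke_test()"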