- Add complete experiments directory with pilot study infrastructure - 5 experimental conditions (direct, expert-only, attribute-only, full-pipeline, random-perspective) - Human assessment tool with React frontend and FastAPI backend - AUT flexibility analysis with jump signal detection - Result visualization and metrics computation - Add novelty-driven agent loop module (experiments/novelty_loop/) - NoveltyDrivenTaskAgent with expert perspective perturbation - Three termination strategies: breakthrough, exhaust, coverage - Interactive CLI demo with colored output - Embedding-based novelty scoring - Add DDC knowledge domain classification data (en/zh) - Add CLAUDE.md project documentation - Update research report with experiment findings Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
1343 lines
50 KiB
Python
Executable File
1343 lines
50 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
"""
|
|
AUT Flexibility Analysis for Creative Ideas
|
|
|
|
Implements creativity evaluation metrics based on the Alternative Uses Task (AUT) framework:
|
|
|
|
1. Lexical Diversity - Type-token ratio, vocabulary richness
|
|
2. Concept Extraction - Key concepts and domain coverage
|
|
3. Embedding Visualization - t-SNE/PCA scatter plots by condition
|
|
4. Novelty Scores - Distance from global centroid (semantic novelty)
|
|
5. Cross-condition Cohesion - Nearest neighbor overlap analysis
|
|
6. AUT Flexibility Analysis - Category-based divergent thinking metrics
|
|
- LLM-based flexibility: Two-phase category generation (Hadas & Hershkovitz 2024)
|
|
- Embedding-based flexibility: Hierarchical clustering (arXiv:2405.00899)
|
|
- Jump signal: Category switch ratio in sequential generation
|
|
|
|
References:
|
|
- Hadas & Hershkovitz (2024). "Using LLMs to Evaluate AUT Flexibility Score"
|
|
- arXiv:2405.00899 - "Characterising Creative Process in Humans and LLMs"
|
|
- Torrance (1974). Torrance Tests of Creative Thinking
|
|
|
|
Usage:
|
|
python aut_flexibility_analysis.py # Analyze latest experiment
|
|
python aut_flexibility_analysis.py experiment_xxx_deduped.json # Specific file
|
|
python aut_flexibility_analysis.py --skip-viz # Skip visualization (faster)
|
|
"""
|
|
|
|
import argparse
|
|
import asyncio
|
|
import json
|
|
import re
|
|
import math
|
|
from collections import Counter, defaultdict
|
|
from datetime import datetime, timezone
|
|
from pathlib import Path
|
|
from typing import Any
|
|
|
|
import numpy as np
|
|
|
|
# Optional imports with fallbacks
|
|
try:
|
|
from sklearn.manifold import TSNE
|
|
from sklearn.decomposition import PCA
|
|
HAS_SKLEARN = True
|
|
except ImportError:
|
|
HAS_SKLEARN = False
|
|
print("Warning: sklearn not available, visualization will be limited")
|
|
|
|
try:
|
|
from scipy.cluster.hierarchy import linkage, fcluster
|
|
from scipy.spatial.distance import pdist, squareform
|
|
HAS_SCIPY = True
|
|
except ImportError:
|
|
HAS_SCIPY = False
|
|
print("Warning: scipy not available, hierarchical clustering will be limited")
|
|
|
|
try:
|
|
import matplotlib.pyplot as plt
|
|
import matplotlib
|
|
matplotlib.use('Agg') # Non-interactive backend
|
|
HAS_MATPLOTLIB = True
|
|
except ImportError:
|
|
HAS_MATPLOTLIB = False
|
|
print("Warning: matplotlib not available, no plots will be generated")
|
|
|
|
try:
|
|
import httpx
|
|
HAS_HTTPX = True
|
|
except ImportError:
|
|
HAS_HTTPX = False
|
|
print("Warning: httpx not available, will use cached embeddings only")
|
|
|
|
|
|
# ============================================================================
|
|
# Configuration
|
|
# ============================================================================
|
|
|
|
# Directory (next to this script) in which experiment result files are looked up.
RESULTS_DIR = Path(__file__).parent / 'results'
# Base URL of the Ollama HTTP server used for the embedding and generation calls.
# NOTE(review): Ollama's stock port is 11434 - confirm 11435 is intentional.
OLLAMA_BASE_URL = "http://localhost:11435"
# Model name sent to the Ollama embedding endpoint.
EMBEDDING_MODEL = "qwen3-embedding:4b"
LLM_MODEL = "qwen3:8b"  # Model for flexibility category generation
|
|
|
|
|
|
# ============================================================================
|
|
# 1. Lexical Diversity Analysis
|
|
# ============================================================================
|
|
|
|
def tokenize(text: str) -> list[str]:
    """Split *text* into lowercase alphabetic word tokens.

    Only runs of ASCII letters delimited by word boundaries are returned, so
    digits, punctuation, and tokens that mix letters with digits or
    underscores are dropped.
    """
    lowered = text.lower()
    return [hit.group(0) for hit in re.finditer(r'\b[a-zA-Z]+\b', lowered)]
|
|
|
|
|
|
def calculate_lexical_diversity(text: str) -> dict[str, Any]:
    """
    Compute vocabulary statistics for a single text.

    The text is tokenized with ``tokenize``; every metric is 0 when no tokens
    are found.

    Returns a dict with:
    - type_token_ratio: distinct words / total words
    - vocabulary_size: number of distinct words
    - total_words: number of tokens
    - avg_word_length: mean token length in characters
    - hapax_ratio: words that occur exactly once / distinct words
    """
    tokens = tokenize(text)
    token_total = len(tokens)

    if token_total == 0:
        return dict.fromkeys(
            ('type_token_ratio', 'vocabulary_size', 'total_words',
             'avg_word_length', 'hapax_ratio'),
            0
        )

    frequencies = Counter(tokens)
    vocabulary_size = len(frequencies)
    singletons = [word for word, count in frequencies.items() if count == 1]

    return {
        'type_token_ratio': vocabulary_size / token_total,
        'vocabulary_size': vocabulary_size,
        'total_words': token_total,
        'avg_word_length': sum(map(len, tokens)) / token_total,
        'hapax_ratio': len(singletons) / vocabulary_size
    }
|
|
|
|
|
|
def analyze_lexical_diversity_by_condition(ideas_by_condition: dict[str, list[str]]) -> dict[str, Any]:
    """
    Analyze lexical diversity for each condition.

    Args:
        ideas_by_condition: idea texts grouped by condition name.

    Returns:
        For every condition a dict with:
        - overall: metrics of all ideas joined into one text
        - per_idea_mean: mean of the per-idea metrics (plain Python floats)
        - idea_count: number of ideas in the condition
    """
    mean_keys = ('type_token_ratio', 'vocabulary_size', 'total_words', 'avg_word_length')
    results = {}

    for condition, ideas in ideas_by_condition.items():
        # Joined text gives condition-level vocabulary statistics.
        overall = calculate_lexical_diversity(' '.join(ideas))
        per_idea_metrics = [calculate_lexical_diversity(idea) for idea in ideas]

        # np.mean([]) yields NaN plus a RuntimeWarning, and NaN is written out
        # as invalid JSON; report 0.0 for an empty condition instead, matching
        # the all-zero result calculate_lexical_diversity gives for empty text.
        per_idea_mean = {
            key: float(np.mean([m[key] for m in per_idea_metrics])) if per_idea_metrics else 0.0
            for key in mean_keys
        }

        results[condition] = {
            'overall': overall,
            'per_idea_mean': per_idea_mean,
            'idea_count': len(ideas)
        }

    return results
|
|
|
|
|
|
# ============================================================================
|
|
# 2. Concept Extraction
|
|
# ============================================================================
|
|
|
|
# Common English stopwords
|
|
# Lowercase function words (articles, prepositions, auxiliaries, pronouns, ...)
# plus a few generic product-description verbs ('using', 'based', 'allows',
# 'features', 'includes', 'provides') that are excluded from keyword and
# bigram extraction.
STOPWORDS = {
    'a', 'an', 'the', 'and', 'or', 'but', 'in', 'on', 'at', 'to', 'for', 'of',
    'with', 'by', 'from', 'as', 'is', 'was', 'are', 'were', 'been', 'be', 'have',
    'has', 'had', 'do', 'does', 'did', 'will', 'would', 'could', 'should', 'may',
    'might', 'must', 'shall', 'can', 'need', 'dare', 'ought', 'used', 'that',
    'which', 'who', 'whom', 'this', 'these', 'those', 'it', 'its', 'they', 'them',
    'their', 'we', 'us', 'our', 'you', 'your', 'i', 'me', 'my', 'he', 'him', 'his',
    'she', 'her', 'not', 'no', 'nor', 'so', 'than', 'too', 'very', 'just', 'also',
    'only', 'own', 'same', 'into', 'over', 'such', 'through', 'during', 'before',
    'after', 'above', 'below', 'between', 'under', 'again', 'further', 'then',
    'once', 'here', 'there', 'when', 'where', 'why', 'how', 'all', 'each', 'few',
    'more', 'most', 'other', 'some', 'any', 'both', 'being', 'about', 'against',
    'while', 'using', 'based', 'allows', 'features', 'includes', 'provides'
}
|
|
|
|
# Domain keywords for classification
|
|
# Maps a domain label to the set of lowercase keywords that signal it; read by
# extract_concepts when tagging an idea with domains.
# 'carbon' is listed under both 'sustainability' and 'materials', so a single
# keyword can flag more than one domain.
DOMAIN_KEYWORDS = {
    'technology': {'smart', 'digital', 'ai', 'sensor', 'app', 'software', 'algorithm',
                   'wireless', 'bluetooth', 'iot', 'data', 'automated', 'electronic'},
    'sustainability': {'eco', 'green', 'sustainable', 'renewable', 'solar', 'recycled',
                       'biodegradable', 'energy', 'environmental', 'carbon', 'organic'},
    'health': {'health', 'medical', 'therapy', 'wellness', 'ergonomic', 'posture',
               'fitness', 'therapeutic', 'rehabilitation', 'mental', 'physical'},
    'social': {'community', 'social', 'sharing', 'collaborative', 'inclusive',
               'accessible', 'elderly', 'children', 'family', 'public'},
    'design': {'modular', 'customizable', 'aesthetic', 'minimalist', 'portable',
               'foldable', 'compact', 'lightweight', 'adjustable', 'convertible'},
    'materials': {'material', 'fabric', 'wood', 'metal', 'plastic', 'carbon',
                  'fiber', 'composite', 'bamboo', 'leather', 'textile'}
}
|
|
|
|
|
|
def extract_concepts(text: str) -> dict[str, Any]:
    """
    Extract key concepts from text.

    Args:
        text: a single idea description.

    Returns:
        - keywords: tokens that are not stopwords and are longer than 2 letters
        - bigrams: adjacent token pairs in which neither token is a stopword
        - domains: DOMAIN_KEYWORDS labels with at least one keyword present
          as a whole word (a trailing plural "s" is tolerated)
    """
    words = tokenize(text)

    # Filter stopwords and short words
    keywords = [w for w in words if w not in STOPWORDS and len(w) > 2]

    # Adjacent pairs where both members carry content
    bigrams = [
        f"{first} {second}"
        for first, second in zip(words, words[1:])
        if first not in STOPWORDS and second not in STOPWORDS
    ]

    # Detect domains on whole tokens. Raw substring search tagged unrelated
    # words: 'ai' matched "chair"/"maintain", 'app' matched "happy", 'eco'
    # matched "decor". The singular form of "...s" tokens is added so simple
    # plurals such as "sensors" or "materials" still count.
    vocabulary = set(words)
    vocabulary.update(w[:-1] for w in words if w.endswith('s'))
    detected_domains = [
        domain
        for domain, domain_words in DOMAIN_KEYWORDS.items()
        if not vocabulary.isdisjoint(domain_words)
    ]

    return {
        'keywords': keywords,
        'bigrams': bigrams,
        'domains': detected_domains
    }
|
|
|
|
|
|
def analyze_concepts_by_condition(ideas_by_condition: dict[str, list[str]]) -> dict[str, Any]:
    """
    Aggregate extracted concepts per condition.

    Each idea is passed through ``extract_concepts``; the per-condition summary
    reports keyword totals, the 20 most frequent keywords, the 10 most frequent
    bigrams, how many ideas touched each domain, the number of distinct domains
    seen, and the idea count.
    """
    summary = {}

    for condition, ideas in ideas_by_condition.items():
        extracted = [extract_concepts(idea) for idea in ideas]

        # Counters keep first-seen order, which also decides ties in most_common.
        keyword_freq = Counter(kw for item in extracted for kw in item['keywords'])
        bigram_freq = Counter(bg for item in extracted for bg in item['bigrams'])
        domain_freq = Counter(dom for item in extracted for dom in item['domains'])

        summary[condition] = {
            'unique_keywords': len(keyword_freq),
            'total_keywords': sum(keyword_freq.values()),
            'top_keywords': keyword_freq.most_common(20),
            'top_bigrams': bigram_freq.most_common(10),
            'domain_distribution': dict(domain_freq),
            'domain_coverage': len(domain_freq),
            'idea_count': len(ideas)
        }

    return summary
|
|
|
|
|
|
# ============================================================================
|
|
# 3. Embedding-based Analysis (Visualization, Novelty, Overlap)
|
|
# ============================================================================
|
|
|
|
async def get_embeddings_from_ollama(texts: list[str], batch_size: int = 50) -> list[list[float]] | None:
    """
    Request embeddings for *texts* from the Ollama ``/api/embed`` endpoint.

    Texts are sent in consecutive slices of ``batch_size``. Returns the
    embeddings in input order, or None when httpx is unavailable or any batch
    fails (the error is printed, and earlier batches are discarded).
    """
    if not HAS_HTTPX:
        return None

    collected = []
    endpoint = f"{OLLAMA_BASE_URL}/api/embed"

    async with httpx.AsyncClient(timeout=120.0) as client:
        for start in range(0, len(texts), batch_size):
            payload = {"model": EMBEDDING_MODEL, "input": texts[start:start + batch_size]}
            try:
                response = await client.post(endpoint, json=payload)
                response.raise_for_status()
                collected.extend(response.json()["embeddings"])
                print(f" Embedded {len(collected)}/{len(texts)} ideas...")
            except Exception as e:
                # Best effort: a single failed batch aborts the whole request.
                print(f" Embedding error: {e}")
                return None

    return collected
|
|
|
|
|
|
def load_cached_embeddings(experiment_id: str) -> dict[str, list[float]] | None:
    """
    Look for cached embeddings of an experiment.

    Currently always returns None: the only candidate file,
    ``experiment_<id>_metrics.json`` in RESULTS_DIR, does not contain raw
    embedding vectors.
    """
    candidate = RESULTS_DIR / f"experiment_{experiment_id}_metrics.json"
    if candidate.exists():
        # The metrics file doesn't store raw embeddings, so there is nothing to load.
        return None
    return None
|
|
|
|
|
|
def compute_centroid(embeddings: np.ndarray) -> np.ndarray:
    """Return the mean vector of *embeddings*, averaging over the first axis."""
    stacked = np.asanyarray(embeddings)
    return stacked.mean(axis=0)
|
|
|
|
|
|
def cosine_distance(a: np.ndarray, b: np.ndarray) -> float:
    """
    Return ``1 - cosine_similarity(a, b)``.

    If either vector has zero length the similarity is undefined and the
    distance is reported as 1.0.
    """
    inner = np.dot(a, b)
    magnitude_a = np.linalg.norm(a)
    magnitude_b = np.linalg.norm(b)
    if not magnitude_a or not magnitude_b:
        return 1.0
    similarity = inner / (magnitude_a * magnitude_b)
    return 1 - similarity
|
|
|
|
|
|
def analyze_embeddings(
    ideas_by_condition: dict[str, list[str]],
    embeddings_by_condition: dict[str, np.ndarray],
    output_dir: Path,
    skip_viz: bool = False
) -> dict[str, Any]:
    """
    Analyze embeddings for novelty, cross-condition overlap, and visualization.

    Args:
        ideas_by_condition: idea texts per condition (not read here; kept for
            interface compatibility).
        embeddings_by_condition: per condition, one embedding row per idea.
        output_dir: directory handed to generate_visualizations.
        skip_viz: when True, no plots are produced.

    Returns:
        - novelty_scores: per condition mean/std/min/max cosine distance of
          its ideas from the centroid of all ideas
        - centroid_distances: cosine distance between each pair of condition
          centroids, keyed "<c1>_vs_<c2>"
        - cross_condition_overlap: per condition, how many ideas have their
          nearest neighbor in the same vs. a different condition, and the
          resulting cohesion ratio
    """
    results = {
        'novelty_scores': {},
        'cross_condition_overlap': {},
        'centroid_distances': {}
    }
    conditions = list(embeddings_by_condition.keys())

    # Compute centroids for each condition
    centroids = {
        condition: compute_centroid(embeddings)
        for condition, embeddings in embeddings_by_condition.items()
    }

    # Global centroid (all ideas)
    all_embeddings = np.vstack(list(embeddings_by_condition.values()))
    global_centroid = compute_centroid(all_embeddings)

    # 4. Semantic novelty: cosine distance of each idea from the global centroid
    print("Computing novelty scores...")
    for condition, embeddings in embeddings_by_condition.items():
        distances = [cosine_distance(emb, global_centroid) for emb in embeddings]
        results['novelty_scores'][condition] = {
            'mean': float(np.mean(distances)),
            'std': float(np.std(distances)),
            'min': float(np.min(distances)),
            'max': float(np.max(distances))
        }

    # 5. Cross-condition Overlap
    print("Computing cross-condition overlap...")

    # Centroid distances between conditions
    for i, c1 in enumerate(conditions):
        for c2 in conditions[i + 1:]:
            dist = cosine_distance(centroids[c1], centroids[c2])
            results['centroid_distances'][f"{c1}_vs_{c2}"] = float(dist)

    # Overlap analysis: for each idea, is its nearest neighbor in the same condition?
    print("Computing nearest neighbor overlap...")

    # owner[k] = index (into `conditions`) of the condition that row k belongs to.
    owner = np.repeat(
        np.arange(len(conditions)),
        [len(embeddings_by_condition[c]) for c in conditions]
    )
    n_points = len(all_embeddings)
    # A lone point has no neighbor; without this guard argmin over an all-inf
    # row would select the point itself and report perfect cohesion.
    has_neighbors = n_points > 1

    if has_neighbors:
        # One matrix product replaces n^2 Python-level cosine_distance calls.
        points = np.asarray(all_embeddings, dtype=float)
        norms = np.linalg.norm(points, axis=1, keepdims=True)
        # Zero vectors stay all-zero, so their distance to everything is 1.0,
        # the same convention cosine_distance uses.
        unit = points / np.where(norms == 0, 1.0, norms)
        pairwise = 1.0 - unit @ unit.T
        np.fill_diagonal(pairwise, np.inf)  # Exclude self
        same_as_neighbor = owner[pairwise.argmin(axis=1)] == owner
    else:
        same_as_neighbor = np.zeros(n_points, dtype=bool)

    for idx, condition in enumerate(conditions):
        members = owner == idx
        total = int(np.count_nonzero(members)) if has_neighbors else 0
        same = int(np.count_nonzero(members & same_as_neighbor))
        results['cross_condition_overlap'][condition] = {
            'same_condition_nn': same,
            'diff_condition_nn': total - same,
            'cohesion_ratio': same / total if total > 0 else 0
        }

    # 3. Embedding Visualization
    if not skip_viz and HAS_SKLEARN and HAS_MATPLOTLIB:
        print("Generating visualizations...")
        generate_visualizations(embeddings_by_condition, output_dir)

    return results
|
|
|
|
|
|
def _scatter_by_condition(
    points: np.ndarray,
    labels: list[str],
    conditions: list[str],
    color_map: dict[str, Any],
    xlabel: str,
    ylabel: str,
    title: str,
    output_file: Path
) -> None:
    """Save one 2-D scatter plot with a separately colored series per condition."""
    plt.figure(figsize=(12, 8))
    for condition in conditions:
        mask = np.array([label == condition for label in labels], dtype=bool)
        plt.scatter(
            points[mask, 0],
            points[mask, 1],
            c=[color_map[condition]],
            label=condition,
            alpha=0.6,
            s=30
        )
    plt.xlabel(xlabel)
    plt.ylabel(ylabel)
    plt.title(title)
    plt.legend(bbox_to_anchor=(1.05, 1), loc='upper left')
    plt.tight_layout()
    plt.savefig(output_file, dpi=150)
    plt.close()


def generate_visualizations(
    embeddings_by_condition: dict[str, np.ndarray],
    output_dir: Path
):
    """
    Generate t-SNE and PCA visualizations.

    Projects all embeddings to 2-D with PCA and with t-SNE and writes
    ``embedding_pca.png`` and ``embedding_tsne.png`` into *output_dir*, one
    color per condition. Nothing is written when fewer than two embeddings
    are available.

    Args:
        embeddings_by_condition: per condition, one embedding row per idea.
        output_dir: existing directory that receives the PNG files.
    """
    # Prepare data
    all_labels = []
    rows = []
    for condition, embeddings in embeddings_by_condition.items():
        rows.extend(embeddings)
        all_labels.extend([condition] * len(embeddings))

    # A 2-component PCA needs at least 2 samples, and t-SNE needs
    # perplexity >= 1 and < n_samples; neither holds for 0 or 1 points.
    if len(rows) < 2:
        print(" Skipping visualizations (need at least 2 embeddings)")
        return

    all_embeddings = np.array(rows)

    # Color map for conditions
    conditions = list(embeddings_by_condition.keys())
    colors = plt.cm.tab10(np.linspace(0, 1, len(conditions)))
    color_map = {c: colors[i] for i, c in enumerate(conditions)}

    # PCA visualization
    print(" Running PCA...")
    pca = PCA(n_components=2, random_state=42)
    pca_result = pca.fit_transform(all_embeddings)
    _scatter_by_condition(
        pca_result,
        all_labels,
        conditions,
        color_map,
        xlabel=f'PC1 ({pca.explained_variance_ratio_[0]:.1%} variance)',
        ylabel=f'PC2 ({pca.explained_variance_ratio_[1]:.1%} variance)',
        title='Ideas by Condition (PCA)',
        output_file=output_dir / 'embedding_pca.png'
    )

    # t-SNE visualization (perplexity must stay below the sample count)
    print(" Running t-SNE...")
    tsne = TSNE(n_components=2, random_state=42, perplexity=min(30, len(all_embeddings) - 1))
    tsne_result = tsne.fit_transform(all_embeddings)
    _scatter_by_condition(
        tsne_result,
        all_labels,
        conditions,
        color_map,
        xlabel='t-SNE 1',
        ylabel='t-SNE 2',
        title='Ideas by Condition (t-SNE)',
        output_file=output_dir / 'embedding_tsne.png'
    )

    print(f" Saved visualizations to {output_dir}")
|
|
|
|
|
|
# ============================================================================
|
|
# 6. AUT Flexibility Analysis (Category-based Divergent Thinking)
|
|
# ============================================================================
|
|
|
|
async def call_llm(prompt: str, model: str = LLM_MODEL) -> str | None:
    """
    Send a single non-streaming generation request to Ollama.

    Returns the "response" field of the reply ("" if the field is missing),
    or None when httpx is unavailable or the request fails (the error is
    printed).
    """
    if not HAS_HTTPX:
        return None

    payload = {
        "model": model,
        "prompt": prompt,
        "stream": False,
        "options": {"temperature": 0.3}  # Lower temperature for consistency
    }

    async with httpx.AsyncClient(timeout=120.0) as client:
        try:
            response = await client.post(f"{OLLAMA_BASE_URL}/api/generate", json=payload)
            response.raise_for_status()
            return response.json().get("response", "")
        except Exception as e:
            print(f" LLM call error: {e}")
            return None
|
|
|
|
|
|
def _parse_category_list(raw: str) -> list[str]:
    """Extract category names from the LLM reply to the category-generation prompt."""
    match = re.search(r'\[.*?\]', raw, re.DOTALL)
    if match:
        try:
            parsed = json.loads(match.group())
        except json.JSONDecodeError:
            # Malformed JSON: salvage the comma-separated items between the
            # brackets only, so surrounding chatter and the brackets themselves
            # do not leak into category names.
            parsed = [part.strip().strip('"\'') for part in match.group()[1:-1].split(',')]
    else:
        # Fallback: one category per line
        parsed = [line.strip().strip('"\'') for line in raw.split('\n') if line.strip()]
        parsed = [c for c in parsed if c and not c.startswith('[')]

    # Keep only non-blank strings; the model may emit numbers or nested arrays.
    return [c for c in parsed if isinstance(c, str) and c.strip()]


def _parse_assignment_map(raw: str) -> dict[str, str]:
    """Extract the idea-number -> category mapping from the classification reply."""
    match = re.search(r'\{.*?\}', raw, re.DOTALL)
    if not match:
        return {}
    try:
        parsed = json.loads(match.group())
    except json.JSONDecodeError:
        return {}
    if not isinstance(parsed, dict):
        return {}
    # Non-string values (e.g. a list of categories) are unhashable and would
    # crash the set/Counter aggregation, so they are dropped.
    return {key: value for key, value in parsed.items() if isinstance(value, str)}


async def compute_flexibility_llm(
    ideas: list[str],
    query: str = "bicycle"
) -> dict[str, Any]:
    """
    Compute flexibility score using LLM-based category generation.

    Two-phase approach (Hadas & Hershkovitz 2024):
    1. Generate semantic categories from all ideas
    2. Classify each idea into a category
    3. Flexibility = number of unique categories used

    Args:
        ideas: idea texts, in generation order.
        query: the object whose alternative uses were generated.

    Returns:
        On success:
        - categories: list of generated categories
        - assignments: mapping of idea number (string, 1-based) to category
        - flexibility_score: count of unique categories
        - category_distribution: number of ideas per category
        - total_ideas_classified: number of ideas that received a category
        On failure a dict with an "error" key (plus "categories" or
        "raw_response" when available).
        An empty *ideas* list yields an all-empty result with score 0 and
        does not contact the LLM.
    """
    if not ideas:
        # With nothing to classify the model tends to echo the prompt example,
        # which would produce a non-zero score for zero ideas.
        return {
            "categories": [],
            "assignments": {},
            "flexibility_score": 0,
            "category_distribution": {},
            "total_ideas_classified": 0
        }

    # Phase 1: Generate categories
    ideas_text = "\n".join(f"{i+1}. {idea}" for i, idea in enumerate(ideas))

    prompt1 = f"""/no_think
You are analyzing creative ideas for alternative uses of a {query}.

Examine these ideas and determine the distinct SEMANTIC CATEGORIES they fall into.
Categories should represent fundamentally different ways of thinking about using the object.

Ideas:
{ideas_text}

Output ONLY a JSON array of category names (5-15 categories typically).
Example: ["Transportation", "Art/Decoration", "Tool/Equipment", "Recreation", "Storage"]

JSON array:"""

    response1 = await call_llm(prompt1)
    if not response1:
        return {"error": "LLM call failed for category generation"}

    categories = _parse_category_list(response1)
    if not categories:
        return {"error": "Failed to parse categories", "raw_response": response1}

    # Phase 2: Classify each idea
    categories_text = ", ".join(f'"{c}"' for c in categories)

    prompt2 = f"""/no_think
Classify each idea into exactly ONE of these categories: [{categories_text}]

Ideas:
{ideas_text}

Output a JSON object mapping idea number (as string) to category name.
Example: {{"1": "Transportation", "2": "Art/Decoration", "3": "Tool/Equipment"}}

JSON object:"""

    response2 = await call_llm(prompt2)
    if not response2:
        return {"error": "LLM call failed for classification", "categories": categories}

    assignments = _parse_assignment_map(response2)

    # Flexibility = number of distinct categories actually assigned
    category_counts = Counter(assignments.values())

    return {
        "categories": categories,
        "assignments": assignments,
        "flexibility_score": len(category_counts),
        "category_distribution": dict(category_counts),
        "total_ideas_classified": len(assignments)
    }
|
|
|
|
|
|
def compute_flexibility_embedding(
    embeddings: np.ndarray,
    ideas: list[str],
    distance_threshold: float = 0.5
) -> dict[str, Any]:
    """
    Compute flexibility score using embedding-based hierarchical clustering.

    Method from arXiv:2405.00899:
    1. Encode ideas as embeddings
    2. Hierarchical clustering with average linkage
    3. Cut tree at distance threshold (higher threshold = fewer clusters)

    Args:
        embeddings: numpy array of shape (n_ideas, embedding_dim)
        ideas: list of idea texts for reference (not read by the computation)
        distance_threshold: cosine distance threshold for cutting dendrogram
            (0.5 = cut when similarity drops below 0.5)

    Returns:
        - cluster_assignments: list of cluster IDs
        - flexibility_score: number of clusters (0 when there are no ideas)
        - cluster_sizes: distribution of cluster sizes
        - mean_pairwise_similarity: average similarity within condition
        - mean_intra_cluster_similarity: average similarity over all pairs
          that share a cluster, or None when no cluster has two members
        A dict with only an "error" key is returned when scipy is missing.
    """
    if not HAS_SCIPY:
        return {"error": "scipy not available for hierarchical clustering"}

    n_ideas = len(embeddings)
    if n_ideas < 2:
        # Nothing to cluster: zero ideas form zero clusters, one idea forms one.
        return {
            "cluster_assignments": [0] * n_ideas,
            "flexibility_score": n_ideas,
            "cluster_sizes": {0: n_ideas} if n_ideas else {},
            "mean_pairwise_similarity": 1.0,
            "mean_intra_cluster_similarity": None
        }

    # Normalize embeddings for cosine similarity
    vectors = np.asarray(embeddings, dtype=float)
    norms = np.linalg.norm(vectors, axis=1, keepdims=True)
    norms[norms == 0] = 1  # Avoid division by zero
    normalized = vectors / norms

    # Compute pairwise cosine distances. A zero vector can yield NaN here,
    # which linkage rejects; treat such pairs as maximally unrelated (1.0).
    distances = np.nan_to_num(pdist(normalized, metric='cosine'), nan=1.0)

    # Calculate mean pairwise similarity for reporting
    mean_pairwise_sim = 1 - np.mean(distances)

    # Hierarchical clustering with average linkage (better for varying density)
    Z = linkage(distances, method='average')

    # Cut at distance threshold
    clusters = fcluster(Z, distance_threshold, criterion='distance')

    # Convert numpy keys to Python ints for JSON serialization
    cluster_sizes_dict = {int(k): int(v) for k, v in Counter(clusters.tolist()).items()}

    # Mean intra-cluster similarity: sum the strict upper triangle of each
    # cluster's Gram matrix, i.e. every unordered pair exactly once.
    total_sim = 0.0
    total_pairs = 0
    for c in cluster_sizes_dict:
        cluster_points = normalized[clusters == c]
        size = len(cluster_points)
        if size > 1:
            total_sim += float(np.triu(cluster_points @ cluster_points.T, k=1).sum())
            total_pairs += size * (size - 1) // 2

    mean_intra_sim = total_sim / total_pairs if total_pairs > 0 else None

    return {
        "cluster_assignments": [int(c) for c in clusters],
        "flexibility_score": len(cluster_sizes_dict),
        "cluster_sizes": cluster_sizes_dict,
        "mean_pairwise_similarity": float(mean_pairwise_sim),
        # `is not None` rather than truthiness: a similarity of exactly 0.0 is
        # a valid value and must not be reported as missing.
        "mean_intra_cluster_similarity": float(mean_intra_sim) if mean_intra_sim is not None else None
    }
|
|
|
|
|
|
def compute_jump_signal(
|
|
cluster_assignments: list[int],
|
|
embeddings: np.ndarray | None = None,
|
|
similarity_threshold: float = 0.7
|
|
) -> dict[str, Any]:
|
|
"""
|
|
Compute jump signal - measures category switches in sequential idea generation.
|
|
|
|
Enhanced method from arXiv:2405.00899:
|
|
- Combined jump signal: jump = jumpcat ∧ jumpSS (logical AND)
|
|
- A "true" jump requires BOTH category change AND semantic dissimilarity
|
|
|
|
This reduces false positives where switching words within same concept space
|
|
would incorrectly count as a jump.
|
|
|
|
Args:
|
|
cluster_assignments: list of cluster IDs for each idea (in generation order)
|
|
embeddings: optional, for computing semantic-similarity-based jumps
|
|
similarity_threshold: threshold for semantic similarity jump detection (default 0.7)
|
|
|
|
Returns:
|
|
- category_jump_count: number of category switches (jumpcat)
|
|
- semantic_jump_count: number of semantic dissimilarity jumps (jumpSS)
|
|
- combined_jump_count: jumps where BOTH conditions are true
|
|
- combined_jump_ratio: proportion of combined jumps (paper metric)
|
|
- jump_positions: indices where combined jumps occur
|
|
"""
|
|
if len(cluster_assignments) < 2:
|
|
return {
|
|
"category_jump_count": 0,
|
|
"semantic_jump_count": 0,
|
|
"combined_jump_count": 0,
|
|
"combined_jump_ratio": 0.0,
|
|
"category_jump_positions": [],
|
|
"semantic_jump_positions": [],
|
|
"combined_jump_positions": [],
|
|
"total_transitions": 0,
|
|
# Legacy fields for backward compatibility
|
|
"jump_count": 0,
|
|
"jump_ratio": 0.0,
|
|
"jump_positions": []
|
|
}
|
|
|
|
category_jumps = []
|
|
semantic_jumps = []
|
|
combined_jumps = []
|
|
|
|
for i in range(1, len(cluster_assignments)):
|
|
# Category-based jump (jumpcat)
|
|
is_category_jump = cluster_assignments[i] != cluster_assignments[i-1]
|
|
if is_category_jump:
|
|
category_jumps.append(i)
|
|
|
|
# Semantic similarity-based jump (jumpSS)
|
|
is_semantic_jump = False
|
|
if embeddings is not None:
|
|
sim = np.dot(embeddings[i], embeddings[i-1]) / (
|
|
np.linalg.norm(embeddings[i]) * np.linalg.norm(embeddings[i-1]) + 1e-10
|
|
)
|
|
is_semantic_jump = sim < similarity_threshold
|
|
if is_semantic_jump:
|
|
semantic_jumps.append(i)
|
|
|
|
# Combined jump: both must be true (paper method)
|
|
if is_category_jump and (is_semantic_jump if embeddings is not None else True):
|
|
combined_jumps.append(i)
|
|
|
|
total_transitions = len(cluster_assignments) - 1
|
|
|
|
result = {
|
|
"category_jump_count": len(category_jumps),
|
|
"semantic_jump_count": len(semantic_jumps) if embeddings is not None else 0,
|
|
"combined_jump_count": len(combined_jumps),
|
|
"combined_jump_ratio": len(combined_jumps) / total_transitions if total_transitions > 0 else 0.0,
|
|
"category_jump_ratio": len(category_jumps) / total_transitions if total_transitions > 0 else 0.0,
|
|
"semantic_jump_ratio": len(semantic_jumps) / total_transitions if total_transitions > 0 and embeddings is not None else 0.0,
|
|
"category_jump_positions": category_jumps,
|
|
"semantic_jump_positions": semantic_jumps if embeddings is not None else [],
|
|
"combined_jump_positions": combined_jumps,
|
|
"total_transitions": total_transitions,
|
|
# Legacy fields for backward compatibility
|
|
"jump_count": len(combined_jumps), # Now uses combined
|
|
"jump_ratio": len(combined_jumps) / total_transitions if total_transitions > 0 else 0.0,
|
|
"jump_positions": combined_jumps
|
|
}
|
|
|
|
return result
|
|
|
|
|
|
def classify_flexibility_profile(jump_count: int, idea_count: int) -> str:
    """
    Label a generation sequence as Persistent, Flexible or Mixed.

    Follows the profiles described in arXiv:2405.00899, with the thresholds
    expressed as a jump ratio (jumps per transition between ideas):
    - "Persistent": ratio below 0.30 (deep exploration within categories)
    - "Flexible": ratio above 0.45 (broad exploration across categories)
    - "Mixed": anything in between, bounds included

    Args:
        jump_count: Number of category jumps
        idea_count: Total number of ideas

    Returns:
        The profile name, or "Undefined" when there are fewer than two ideas
        and therefore no transitions.
    """
    if idea_count <= 1:
        return "Undefined"

    transitions = idea_count - 1
    ratio = jump_count / transitions

    if ratio > 0.45:
        return "Flexible"
    return "Persistent" if ratio < 0.30 else "Mixed"
|
|
|
|
|
|
def compute_cumulative_jump_profile(
    jump_positions: list[int],
    total_ideas: int
) -> list[int]:
    """
    Build the running total of jumps across the response sequence.

    The profile shows whether exploration proceeds steadily or in bursts.

    Args:
        jump_positions: Positions at which jumps occurred, interpreted as
            1-indexed response numbers
        total_ideas: Total number of ideas generated

    Returns:
        A list of length total_ideas whose element i is the number of jump
        positions <= i + 1; empty when total_ideas is not positive.

    NOTE(review): compute_jump_signal in this file records the 0-based index
    of the idea that starts a jump - confirm that callers pass positions that
    match the 1-indexed convention used here.
    """
    if total_ideas <= 0:
        return []

    jump_set = set(jump_positions)
    profile = []
    running_total = 0

    for response_number in range(1, total_ideas + 1):  # Positions are 1-indexed
        if response_number in jump_set:
            running_total += 1
        profile.append(running_total)

    return profile
|
|
|
|
|
|
def analyze_originality_flexibility_correlation(
    novelty_scores: dict[str, float],
    flexibility_scores: dict[str, int]
) -> dict[str, Any]:
    """
    Analyze correlation between novelty (originality) and flexibility across conditions.

    Paper finding from arXiv:2405.00899:
    - Humans: No correlation between flexibility and originality (r ~ 0)
    - LLMs: Positive correlation - flexible LLMs score higher on originality

    Research question: Does our attribute+expert pipeline break this LLM pattern?
    - If C4 (Full Pipeline) shows high novelty but moderate flexibility -> breaks pattern
    - If correlation is near zero -> human-like creative behavior

    Args:
        novelty_scores: Mean novelty score per condition
        flexibility_scores: Combined jump count (or flexibility score) per condition

    Returns:
        - pearson_r: Correlation coefficient over the conditions present in
          both inputs; None with fewer than 3 such conditions, 0.0 when one
          variable has zero variance
        - interpretation: What the correlation means
        - conditions, novelty_values, flexibility_values: the paired data, in
          novelty_scores order
        - per_condition: Novelty and flexibility values per condition
          (included in every return path)
    """
    # Only conditions that have both measurements can be paired.
    valid_conditions = [c for c in novelty_scores if c in flexibility_scores]
    novelties = [novelty_scores[c] for c in valid_conditions]
    flexibilities = [flexibility_scores[c] for c in valid_conditions]

    paired_data = {
        "conditions": valid_conditions,
        "novelty_values": novelties,
        "flexibility_values": flexibilities,
        "per_condition": {
            c: {"novelty": nov, "flexibility": flex}
            for c, nov, flex in zip(valid_conditions, novelties, flexibilities)
        }
    }

    if len(novelties) < 3:
        return {
            "pearson_r": None,
            "interpretation": "Insufficient data (need at least 3 conditions)",
            **paired_data
        }

    # Check for zero variance (Pearson's r would divide by zero)
    if np.std(novelties) == 0 or np.std(flexibilities) == 0:
        return {
            "pearson_r": 0.0,
            "interpretation": "Zero variance in one variable",
            **paired_data
        }

    correlation = np.corrcoef(novelties, flexibilities)[0, 1]

    # Interpret the correlation
    if correlation > 0.3:
        interpretation = "Positive correlation (typical LLM pattern)"
    elif correlation < -0.3:
        interpretation = "Negative correlation (atypical - high novelty with low flexibility)"
    else:
        interpretation = "No significant correlation (human-like pattern)"

    return {
        "pearson_r": float(correlation),
        "interpretation": interpretation,
        **paired_data
    }
|
|
|
|
|
|
def plot_cumulative_jump_profiles(
    profiles_by_condition: dict[str, list[int]],
    output_path: Path
):
    """
    Plot cumulative jump profiles for each condition.

    Shows exploration patterns over generation sequence - steep slopes indicate
    rapid category switching, flat regions indicate persistent exploration.

    The figure is saved as ``cumulative_jump_profiles.png`` inside
    ``output_path``. Nothing is drawn or written when matplotlib is not
    available (``HAS_MATPLOTLIB`` is false).

    Args:
        profiles_by_condition: Cumulative jump counts per condition. Conditions
            with an empty profile are skipped but still consume a colour slot,
            so colours stay stable across runs.
        output_path: Directory to save the plot (created if it does not exist)
    """
    if not HAS_MATPLOTLIB:
        print(" Skipping cumulative jump plot (matplotlib not available)")
        return

    target = output_path / 'cumulative_jump_profiles.png'
    # Make sure the destination exists so savefig does not fail on a fresh run.
    output_path.mkdir(parents=True, exist_ok=True)

    # Use an explicit figure/axes pair instead of the implicit pyplot state so
    # that exactly this figure is closed, even if drawing or saving fails.
    fig, ax = plt.subplots(figsize=(12, 6))
    try:
        # Color scheme for conditions
        colors = plt.cm.tab10(np.linspace(0, 1, len(profiles_by_condition)))

        plotted_any = False
        for (condition, profile), color in zip(profiles_by_condition.items(), colors):
            if not profile:  # Only plot if there's data
                continue
            positions = range(1, len(profile) + 1)
            ax.plot(positions, profile, label=condition, linewidth=2, color=color,
                    marker='o', markersize=3, alpha=0.8)
            plotted_any = True

        ax.set_xlabel('Response Position', fontsize=12)
        ax.set_ylabel('Cumulative Jumps', fontsize=12)
        ax.set_title('Exploration Patterns by Condition\n(Cumulative Category Jumps)', fontsize=14)
        # A legend without any labelled line only triggers a matplotlib warning.
        if plotted_any:
            ax.legend(bbox_to_anchor=(1.05, 1), loc='upper left')
        ax.grid(True, alpha=0.3)
        fig.tight_layout()
        fig.savefig(target, dpi=150, bbox_inches='tight')
    finally:
        plt.close(fig)

    print(f" Saved cumulative jump profiles to {target}")
|
|
|
|
|
|
async def analyze_flexibility_by_condition(
|
|
ideas_by_condition: dict[str, list[str]],
|
|
embeddings_by_condition: dict[str, np.ndarray] | None,
|
|
query: str = "bicycle",
|
|
output_dir: Path | None = None
|
|
) -> dict[str, Any]:
|
|
"""
|
|
Analyze AUT flexibility for each condition using both LLM and embedding methods.
|
|
|
|
Enhanced with arXiv:2405.00899 metrics:
|
|
- Combined jump signal (jumpcat ∧ jumpSS)
|
|
- Flexibility profile classification (Persistent/Flexible/Mixed)
|
|
- Cumulative jump profiles for visualization
|
|
|
|
Returns flexibility scores, category distributions, jump signals, and profiles.
|
|
"""
|
|
results = {
|
|
"llm_flexibility": {},
|
|
"embedding_flexibility": {},
|
|
"jump_analysis": {},
|
|
"flexibility_profiles": {},
|
|
"cumulative_jump_profiles": {},
|
|
"method_correlation": {}
|
|
}
|
|
|
|
# LLM-based flexibility analysis
|
|
print("\nComputing LLM-based flexibility scores...")
|
|
for condition, ideas in ideas_by_condition.items():
|
|
print(f" {condition}...")
|
|
llm_result = await compute_flexibility_llm(ideas, query)
|
|
results["llm_flexibility"][condition] = llm_result
|
|
|
|
# Embedding-based flexibility analysis
|
|
if embeddings_by_condition is not None:
|
|
print("\nComputing embedding-based flexibility scores...")
|
|
for condition, embeddings in embeddings_by_condition.items():
|
|
ideas = ideas_by_condition[condition]
|
|
emb_result = compute_flexibility_embedding(embeddings, ideas)
|
|
results["embedding_flexibility"][condition] = emb_result
|
|
|
|
# Jump signal analysis (enhanced with combined jump)
|
|
if "cluster_assignments" in emb_result:
|
|
jump_result = compute_jump_signal(
|
|
emb_result["cluster_assignments"],
|
|
embeddings
|
|
)
|
|
results["jump_analysis"][condition] = jump_result
|
|
|
|
# Classify flexibility profile
|
|
profile = classify_flexibility_profile(
|
|
jump_result["combined_jump_count"],
|
|
len(ideas)
|
|
)
|
|
results["flexibility_profiles"][condition] = profile
|
|
|
|
# Compute cumulative jump profile
|
|
cumulative = compute_cumulative_jump_profile(
|
|
jump_result["combined_jump_positions"],
|
|
len(ideas)
|
|
)
|
|
results["cumulative_jump_profiles"][condition] = cumulative
|
|
|
|
# Generate cumulative jump profile visualization
|
|
if output_dir is not None and results["cumulative_jump_profiles"]:
|
|
print("\nGenerating cumulative jump profile visualization...")
|
|
plot_cumulative_jump_profiles(results["cumulative_jump_profiles"], output_dir)
|
|
|
|
# Calculate correlation between methods (if both available)
|
|
llm_scores = []
|
|
emb_scores = []
|
|
conditions_order = []
|
|
|
|
for condition in ideas_by_condition.keys():
|
|
if condition in results["llm_flexibility"] and condition in results["embedding_flexibility"]:
|
|
llm_flex = results["llm_flexibility"][condition].get("flexibility_score")
|
|
emb_flex = results["embedding_flexibility"][condition].get("flexibility_score")
|
|
if llm_flex is not None and emb_flex is not None:
|
|
llm_scores.append(llm_flex)
|
|
emb_scores.append(emb_flex)
|
|
conditions_order.append(condition)
|
|
|
|
if len(llm_scores) >= 3:
|
|
# Pearson correlation
|
|
if np.std(llm_scores) > 0 and np.std(emb_scores) > 0:
|
|
correlation = np.corrcoef(llm_scores, emb_scores)[0, 1]
|
|
results["method_correlation"] = {
|
|
"pearson_r": float(correlation),
|
|
"llm_scores": dict(zip(conditions_order, llm_scores)),
|
|
"embedding_scores": dict(zip(conditions_order, emb_scores))
|
|
}
|
|
|
|
return results
|
|
|
|
|
|
# ============================================================================
|
|
# Main Analysis
|
|
# ============================================================================
|
|
|
|
async def run_analysis(
    experiment_file: Path,
    output_dir: Path,
    skip_viz: bool = False,
    skip_embeddings: bool = False
):
    """
    Run all analyses on an experiment file.

    Loads the experiment JSON, groups the deduplicated ideas by condition,
    runs the lexical and concept analyses and - unless skipped or unavailable -
    the embedding-based, flexibility and originality/flexibility correlation
    analyses. Progress is printed to stdout and the collected results are
    written to ``output_dir / "aut_flexibility_<experiment_id>.json"``.

    Args:
        experiment_file: Path of the experiment JSON file to analyze.
        output_dir: Directory for the result JSON and any plots (created if
            it does not exist).
        skip_viz: Forwarded to the embedding analysis; also disables the
            cumulative jump plot.
        skip_embeddings: Skip sections 3-7 entirely.

    Returns:
        The results dictionary that was written to disk.

    Raises:
        TypeError: If the results contain an object that is neither natively
            JSON serializable nor a NumPy array/scalar.
    """

    def to_json_compatible(obj):
        """Fallback for json: convert NumPy containers/scalars to builtins."""
        if isinstance(obj, np.ndarray):
            return obj.tolist()
        if isinstance(obj, np.bool_):
            return bool(obj)
        # np.integer / np.floating cover every width, not just 32/64 bit.
        if isinstance(obj, np.integer):
            return int(obj)
        if isinstance(obj, np.floating):
            return float(obj)
        # Returning obj unchanged would make json report a misleading
        # "Circular reference detected"; raise the documented TypeError instead.
        raise TypeError(f"Object of type {type(obj).__name__} is not JSON serializable")

    print("=" * 60)
    print("ADVANCED AUTOMATIC ANALYSIS")
    print("=" * 60)

    # Load experiment data
    print(f"\nLoading: {experiment_file.name}")
    with open(experiment_file, 'r', encoding='utf-8') as f:
        data = json.load(f)

    experiment_id = data.get('experiment_id', 'unknown')
    print(f"Experiment ID: {experiment_id}")

    # Extract ideas by condition. idea_texts / idea_conditions are parallel
    # lists so embeddings can be mapped back to their condition later.
    ideas_by_condition: dict[str, list[str]] = defaultdict(list)
    idea_texts: list[str] = []
    idea_conditions: list[str] = []

    for result in data.get('results', []):
        for condition_name, condition_data in result.get('conditions', {}).items():
            dedup = condition_data.get('dedup', {})
            unique_ideas = dedup.get('unique_ideas', [])
            for idea in unique_ideas:
                ideas_by_condition[condition_name].append(idea)
                idea_texts.append(idea)
                idea_conditions.append(condition_name)

    total_ideas = len(idea_texts)
    print(f"Total ideas: {total_ideas}")
    print(f"Conditions: {list(ideas_by_condition.keys())}")

    results = {
        'experiment_id': experiment_id,
        'analysis_timestamp': datetime.now(timezone.utc).isoformat(),
        'total_ideas': total_ideas,
        'conditions': list(ideas_by_condition.keys())
    }

    # 1. Lexical Diversity
    print("\n" + "-" * 40)
    print("1. LEXICAL DIVERSITY ANALYSIS")
    print("-" * 40)

    lexical_results = analyze_lexical_diversity_by_condition(ideas_by_condition)
    results['lexical_diversity'] = lexical_results

    for condition, metrics in lexical_results.items():
        print(f"\n{condition}:")
        print(f" Overall TTR: {metrics['overall']['type_token_ratio']:.3f}")
        print(f" Vocabulary size: {metrics['overall']['vocabulary_size']}")
        print(f" Avg words/idea: {metrics['per_idea_mean']['total_words']:.1f}")

    # 2. Concept Extraction
    print("\n" + "-" * 40)
    print("2. CONCEPT EXTRACTION")
    print("-" * 40)

    concept_results = analyze_concepts_by_condition(ideas_by_condition)
    results['concept_extraction'] = concept_results

    for condition, metrics in concept_results.items():
        print(f"\n{condition}:")
        print(f" Unique keywords: {metrics['unique_keywords']}")
        print(f" Domain coverage: {metrics['domain_coverage']} domains")
        print(f" Top keywords: {[k for k, _ in metrics['top_keywords'][:5]]}")

    # 3-5. Embedding-based Analysis
    if not skip_embeddings:
        print("\n" + "-" * 40)
        print("3-5. EMBEDDING-BASED ANALYSIS")
        print("-" * 40)

        # Try to get embeddings
        print("Getting embeddings from Ollama...")
        embeddings = await get_embeddings_from_ollama(idea_texts)

        # NOTE(review): sections 6-7 below only run when embeddings are
        # available, although the LLM-based flexibility scoring accepts
        # embeddings_by_condition=None - confirm this gating is intended.
        if embeddings is not None:
            # Organize embeddings by condition: collect rows per condition
            # first, then replace each list with a stacked array.
            embeddings_by_condition: dict[str, Any] = defaultdict(list)
            for emb, condition in zip(embeddings, idea_conditions):
                embeddings_by_condition[condition].append(emb)

            for condition in embeddings_by_condition:
                embeddings_by_condition[condition] = np.array(embeddings_by_condition[condition])

            embedding_results = analyze_embeddings(
                ideas_by_condition,
                embeddings_by_condition,
                output_dir,
                skip_viz=skip_viz
            )

            results['novelty_scores'] = embedding_results['novelty_scores']
            results['cross_condition_overlap'] = embedding_results['cross_condition_overlap']
            results['centroid_distances'] = embedding_results['centroid_distances']

            # Print novelty scores
            print("\nNovelty Scores (distance from global centroid):")
            for condition, scores in embedding_results['novelty_scores'].items():
                print(f" {condition}: mean={scores['mean']:.4f}, std={scores['std']:.4f}")

            # Print overlap
            print("\nCross-condition Cohesion (% nearest neighbors from same condition):")
            for condition, overlap in embedding_results['cross_condition_overlap'].items():
                print(f" {condition}: {overlap['cohesion_ratio']:.1%}")

            # Print centroid distances
            print("\nCentroid Distances (lower = more similar):")
            for pair, dist in sorted(embedding_results['centroid_distances'].items()):
                print(f" {pair}: {dist:.4f}")

            # 6. AUT Flexibility Analysis (Enhanced with arXiv:2405.00899 metrics)
            print("\n" + "-" * 40)
            print("6. AUT FLEXIBILITY ANALYSIS (arXiv:2405.00899)")
            print("-" * 40)

            # Extract query from experiment data (first result wins)
            query = "bicycle"  # Default
            experiment_results = data.get('results')
            if experiment_results:
                first_result = experiment_results[0]
                if 'query' in first_result:
                    query = first_result['query']

            print(f"Query object: {query}")

            flexibility_results = await analyze_flexibility_by_condition(
                ideas_by_condition,
                embeddings_by_condition,
                query,
                output_dir=output_dir if not skip_viz else None
            )

            results['flexibility_analysis'] = flexibility_results

            # Print flexibility scores
            print("\nLLM-based Flexibility Scores (semantic categories):")
            for condition, flex_data in flexibility_results['llm_flexibility'].items():
                if 'flexibility_score' in flex_data:
                    print(f" {condition}: {flex_data['flexibility_score']} categories")
                    if 'category_distribution' in flex_data:
                        top_cats = sorted(flex_data['category_distribution'].items(),
                                          key=lambda x: x[1], reverse=True)[:3]
                        print(f" Top categories: {[c[0] for c in top_cats]}")

            print("\nEmbedding-based Flexibility Scores (hierarchical clustering):")
            for condition, flex_data in flexibility_results['embedding_flexibility'].items():
                if 'flexibility_score' in flex_data:
                    print(f" {condition}: {flex_data['flexibility_score']} clusters")

            # Enhanced Jump Signal Analysis (Combined Jump from paper)
            print("\nCombined Jump Signal Analysis (jumpcat ∧ jumpSS):")
            print(" Condition | Cat-Only | Sem-Only | Combined | Profile")
            print(" " + "-" * 60)
            for condition, jump_data in flexibility_results['jump_analysis'].items():
                profile = flexibility_results.get('flexibility_profiles', {}).get(condition, "N/A")
                cat_jumps = jump_data.get('category_jump_count', 0)
                sem_jumps = jump_data.get('semantic_jump_count', 0)
                combined = jump_data.get('combined_jump_count', 0)
                print(f" {condition:16} | {cat_jumps:8} | {sem_jumps:8} | {combined:8} | {profile}")

            # Print flexibility profiles summary
            print("\nFlexibility Profiles (based on combined jump ratio):")
            for condition, profile in flexibility_results.get('flexibility_profiles', {}).items():
                jump_data = flexibility_results['jump_analysis'].get(condition, {})
                ratio = jump_data.get('combined_jump_ratio', 0)
                print(f" {condition}: {profile} (ratio={ratio:.2%})")

            # 7. Originality-Flexibility Correlation Analysis
            print("\n" + "-" * 40)
            print("7. ORIGINALITY-FLEXIBILITY CORRELATION")
            print("-" * 40)

            # Extract novelty means and flexibility scores for correlation
            novelty_means = {c: scores['mean'] for c, scores in embedding_results['novelty_scores'].items()}
            flexibility_jumps = {c: jump_data.get('combined_jump_count', 0)
                                 for c, jump_data in flexibility_results['jump_analysis'].items()}

            correlation_result = analyze_originality_flexibility_correlation(
                novelty_means,
                flexibility_jumps
            )
            results['originality_flexibility_correlation'] = correlation_result

            print(f"\nPearson r: {correlation_result.get('pearson_r', 'N/A')}")
            print(f"Interpretation: {correlation_result.get('interpretation', 'N/A')}")

            if correlation_result.get('per_condition'):
                print("\nPer-Condition Values:")
                for condition, vals in correlation_result['per_condition'].items():
                    print(f" {condition}: Novelty={vals['novelty']:.4f}, Flexibility={vals['flexibility']}")

            # Print method correlation
            if flexibility_results.get('method_correlation', {}).get('pearson_r') is not None:
                print(f"\nLLM vs Embedding Flexibility Correlation: r={flexibility_results['method_correlation']['pearson_r']:.3f}")

        else:
            print("Could not get embeddings. Skipping embedding-based analysis.")
            print("Make sure Ollama is running with the embedding model.")

    # Save results. Serialize fully before opening the file so a
    # serialization error cannot leave a truncated JSON file behind.
    output_dir.mkdir(parents=True, exist_ok=True)
    output_file = output_dir / f"aut_flexibility_{experiment_id}.json"
    payload = json.dumps(results, ensure_ascii=False, indent=2, default=to_json_compatible)
    with open(output_file, 'w', encoding='utf-8') as f:
        f.write(payload)

    print("\n" + "=" * 60)
    print(f"Results saved to: {output_file}")
    if not skip_viz and HAS_MATPLOTLIB:
        print(f"Visualizations saved to: {output_dir}")
    print("=" * 60)

    return results
|
|
|
|
|
|
def list_experiment_files() -> list[Path]:
    """Return the ``*_deduped.json`` files in RESULTS_DIR, newest first.

    Files are ordered by modification time, most recently modified first.
    """
    candidates = list(RESULTS_DIR.glob('*_deduped.json'))
    candidates.sort(key=lambda path: path.stat().st_mtime, reverse=True)
    return candidates
|
|
|
|
|
|
def main():
    """Command-line entry point.

    Parses the arguments, optionally lists the available experiment files,
    resolves which experiment file to analyze (the named one, or the most
    recent deduped file when none is given) and runs the analysis with
    RESULTS_DIR as the output directory.
    """
    parser = argparse.ArgumentParser(
        description='Run advanced automatic analysis on experiment results.',
        formatter_class=argparse.RawDescriptionHelpFormatter
    )
    parser.add_argument('experiment_file', nargs='?', default=None,
                        help='Experiment file name')
    parser.add_argument('--list', '-l', action='store_true',
                        help='List available experiment files')
    # The remaining options are plain boolean switches.
    for flag, description in (
        ('--skip-viz', 'Skip visualization generation'),
        ('--skip-embeddings', 'Skip embedding-based analysis (faster)'),
    ):
        parser.add_argument(flag, action='store_true', help=description)
    args = parser.parse_args()

    candidates = list_experiment_files()

    if args.list:
        print("Available experiment files:")
        for entry in candidates:
            print(f" {entry.name}")
        return

    # Determine which file to use
    requested = args.experiment_file
    if requested:
        chosen = RESULTS_DIR / requested
        if not chosen.exists():
            # Allow the name to be given without its .json extension.
            chosen = RESULTS_DIR / f"{requested}.json"
    elif candidates:
        chosen = candidates[0]
        print(f"Using latest: {chosen.name}")
    else:
        print("Error: No deduped experiment files found.")
        return

    if not chosen.exists():
        print(f"Error: File not found: {chosen}")
        return

    # Run analysis
    asyncio.run(
        run_analysis(
            chosen,
            RESULTS_DIR,
            skip_viz=args.skip_viz,
            skip_embeddings=args.skip_embeddings
        )
    )
|
|
|
|
|
|
# Run the command-line interface only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == '__main__':
    main()
|