novelty-seeking/experiments/aut_flexibility_analysis.py
gbanyan 43c025e060 feat: Add experiments framework and novelty-driven agent loop
- Add complete experiments directory with pilot study infrastructure
  - 5 experimental conditions (direct, expert-only, attribute-only, full-pipeline, random-perspective)
  - Human assessment tool with React frontend and FastAPI backend
  - AUT flexibility analysis with jump signal detection
  - Result visualization and metrics computation

- Add novelty-driven agent loop module (experiments/novelty_loop/)
  - NoveltyDrivenTaskAgent with expert perspective perturbation
  - Three termination strategies: breakthrough, exhaust, coverage
  - Interactive CLI demo with colored output
  - Embedding-based novelty scoring

- Add DDC knowledge domain classification data (en/zh)
- Add CLAUDE.md project documentation
- Update research report with experiment findings

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-20 10:16:21 +08:00

#!/usr/bin/env python3
"""
AUT Flexibility Analysis for Creative Ideas

Implements creativity evaluation metrics based on the Alternative Uses Task (AUT) framework:

1. Lexical Diversity - Type-token ratio, vocabulary richness
2. Concept Extraction - Key concepts and domain coverage
3. Embedding Visualization - t-SNE/PCA scatter plots by condition
4. Novelty Scores - Distance from global centroid (semantic novelty)
5. Cross-condition Cohesion - Nearest neighbor overlap analysis
6. AUT Flexibility Analysis - Category-based divergent thinking metrics
   - LLM-based flexibility: Two-phase category generation (Hadas & Hershkovitz 2024)
   - Embedding-based flexibility: Hierarchical clustering (arXiv:2405.00899)
   - Jump signal: Category switch ratio in sequential generation

References:
- Hadas & Hershkovitz (2024). "Using LLMs to Evaluate AUT Flexibility Score"
- arXiv:2405.00899 - "Characterising Creative Process in Humans and LLMs"
- Torrance (1974). Torrance Tests of Creative Thinking

Usage:
    python aut_flexibility_analysis.py                               # Analyze latest experiment
    python aut_flexibility_analysis.py experiment_xxx_deduped.json   # Specific file
    python aut_flexibility_analysis.py --skip-viz                    # Skip visualization (faster)
"""

import argparse
import asyncio
import json
import re
import math
from collections import Counter, defaultdict
from datetime import datetime, timezone
from pathlib import Path
from typing import Any
import numpy as np
# Optional imports with fallbacks
try:
    from sklearn.manifold import TSNE
    from sklearn.decomposition import PCA
    HAS_SKLEARN = True
except ImportError:
    HAS_SKLEARN = False
    print("Warning: sklearn not available, visualization will be limited")
try:
    from scipy.cluster.hierarchy import linkage, fcluster
    from scipy.spatial.distance import pdist, squareform
    HAS_SCIPY = True
except ImportError:
    HAS_SCIPY = False
    print("Warning: scipy not available, hierarchical clustering will be limited")
try:
    import matplotlib
    matplotlib.use('Agg')  # Select the non-interactive backend before pyplot is imported
    import matplotlib.pyplot as plt
    HAS_MATPLOTLIB = True
except ImportError:
    HAS_MATPLOTLIB = False
    print("Warning: matplotlib not available, no plots will be generated")
try:
    import httpx
    HAS_HTTPX = True
except ImportError:
    HAS_HTTPX = False
    print("Warning: httpx not available, will use cached embeddings only")

# ============================================================================
# Configuration
# ============================================================================
RESULTS_DIR = Path(__file__).parent / 'results'
OLLAMA_BASE_URL = "http://localhost:11435"
EMBEDDING_MODEL = "qwen3-embedding:4b"
LLM_MODEL = "qwen3:8b" # Model for flexibility category generation
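
# Note: OLLAMA_BASE_URL above assumes a local Ollama instance on port 11435,
# which is not Ollama's stock default (11434); adjust the URL and model tags
# to match the local setup.
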
# ============================================================================
# 1. Lexical Diversity Analysis
# ============================================================================
def tokenize(text: str) -> list[str]:
    """Simple word tokenization."""
    # Lowercase and extract words
    words = re.findall(r'\b[a-zA-Z]+\b', text.lower())
    return words

def calculate_lexical_diversity(text: str) -> dict[str, Any]:
    """
    Calculate lexical diversity metrics for a text.

    Returns:
        - type_token_ratio: unique words / total words
        - vocabulary_size: number of unique words
        - total_words: total word count
        - avg_word_length: average word length
        - hapax_ratio: words appearing only once / total unique words
    """
    words = tokenize(text)
    if not words:
        return {
            'type_token_ratio': 0,
            'vocabulary_size': 0,
            'total_words': 0,
            'avg_word_length': 0,
            'hapax_ratio': 0
        }
    word_counts = Counter(words)
    unique_words = set(words)
    hapax = sum(1 for w, c in word_counts.items() if c == 1)
    return {
        'type_token_ratio': len(unique_words) / len(words),
        'vocabulary_size': len(unique_words),
        'total_words': len(words),
        'avg_word_length': sum(len(w) for w in words) / len(words),
        'hapax_ratio': hapax / len(unique_words) if unique_words else 0
    }
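
# Worked example (illustrative only, not executed): for the text
# "fold the frame to store the frame", tokenize() yields 7 tokens with 5
# unique types, so type_token_ratio = 5/7 ≈ 0.714; "fold", "to", and "store"
# each occur once, giving hapax_ratio = 3/5 = 0.6.
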
def analyze_lexical_diversity_by_condition(ideas_by_condition: dict[str, list[str]]) -> dict[str, Any]:
    """Analyze lexical diversity for each condition."""
    results = {}
    for condition, ideas in ideas_by_condition.items():
        # Concatenate all ideas for overall metrics
        all_text = ' '.join(ideas)
        overall = calculate_lexical_diversity(all_text)
        # Per-idea metrics
        per_idea_metrics = [calculate_lexical_diversity(idea) for idea in ideas]
        results[condition] = {
            'overall': overall,
            'per_idea_mean': {
                'type_token_ratio': np.mean([m['type_token_ratio'] for m in per_idea_metrics]),
                'vocabulary_size': np.mean([m['vocabulary_size'] for m in per_idea_metrics]),
                'total_words': np.mean([m['total_words'] for m in per_idea_metrics]),
                'avg_word_length': np.mean([m['avg_word_length'] for m in per_idea_metrics]),
            },
            'idea_count': len(ideas)
        }
    return results

# ============================================================================
# 2. Concept Extraction
# ============================================================================
# Common English stopwords
STOPWORDS = {
    'a', 'an', 'the', 'and', 'or', 'but', 'in', 'on', 'at', 'to', 'for', 'of',
    'with', 'by', 'from', 'as', 'is', 'was', 'are', 'were', 'been', 'be', 'have',
    'has', 'had', 'do', 'does', 'did', 'will', 'would', 'could', 'should', 'may',
    'might', 'must', 'shall', 'can', 'need', 'dare', 'ought', 'used', 'that',
    'which', 'who', 'whom', 'this', 'these', 'those', 'it', 'its', 'they', 'them',
    'their', 'we', 'us', 'our', 'you', 'your', 'i', 'me', 'my', 'he', 'him', 'his',
    'she', 'her', 'not', 'no', 'nor', 'so', 'than', 'too', 'very', 'just', 'also',
    'only', 'own', 'same', 'into', 'over', 'such', 'through', 'during', 'before',
    'after', 'above', 'below', 'between', 'under', 'again', 'further', 'then',
    'once', 'here', 'there', 'when', 'where', 'why', 'how', 'all', 'each', 'few',
    'more', 'most', 'other', 'some', 'any', 'both', 'being', 'about', 'against',
    'while', 'using', 'based', 'allows', 'features', 'includes', 'provides'
}

# Domain keywords for classification
DOMAIN_KEYWORDS = {
    'technology': {'smart', 'digital', 'ai', 'sensor', 'app', 'software', 'algorithm',
                   'wireless', 'bluetooth', 'iot', 'data', 'automated', 'electronic'},
    'sustainability': {'eco', 'green', 'sustainable', 'renewable', 'solar', 'recycled',
                       'biodegradable', 'energy', 'environmental', 'carbon', 'organic'},
    'health': {'health', 'medical', 'therapy', 'wellness', 'ergonomic', 'posture',
               'fitness', 'therapeutic', 'rehabilitation', 'mental', 'physical'},
    'social': {'community', 'social', 'sharing', 'collaborative', 'inclusive',
               'accessible', 'elderly', 'children', 'family', 'public'},
    'design': {'modular', 'customizable', 'aesthetic', 'minimalist', 'portable',
               'foldable', 'compact', 'lightweight', 'adjustable', 'convertible'},
    'materials': {'material', 'fabric', 'wood', 'metal', 'plastic', 'carbon',
                  'fiber', 'composite', 'bamboo', 'leather', 'textile'}
}

def extract_concepts(text: str) -> dict[str, Any]:
    """
    Extract key concepts from text.

    Returns:
        - keywords: list of significant words (non-stopwords)
        - bigrams: common two-word phrases
        - domains: detected domain categories
    """
    words = tokenize(text)
    # Filter stopwords and short words
    keywords = [w for w in words if w not in STOPWORDS and len(w) > 2]
    # Extract bigrams
    bigrams = []
    for i in range(len(words) - 1):
        if words[i] not in STOPWORDS and words[i+1] not in STOPWORDS:
            bigrams.append(f"{words[i]} {words[i+1]}")
    # Detect domains
    text_lower = text.lower()
    detected_domains = []
    for domain, domain_words in DOMAIN_KEYWORDS.items():
        if any(kw in text_lower for kw in domain_words):
            detected_domains.append(domain)
    return {
        'keywords': keywords,
        'bigrams': bigrams,
        'domains': detected_domains
    }
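
# Worked example (illustrative only): "smart solar rack for the community"
# yields keywords ['smart', 'solar', 'rack', 'community'] and detects the
# 'technology', 'sustainability', and 'social' domains. Note that the domain
# check is substring-based, so short keywords such as 'ai' can also match
# inside longer words (e.g. "rain"); domain counts are therefore approximate.
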
def analyze_concepts_by_condition(ideas_by_condition: dict[str, list[str]]) -> dict[str, Any]:
    """Analyze concept extraction for each condition."""
    results = {}
    for condition, ideas in ideas_by_condition.items():
        all_keywords = []
        all_bigrams = []
        domain_counts = Counter()
        for idea in ideas:
            concepts = extract_concepts(idea)
            all_keywords.extend(concepts['keywords'])
            all_bigrams.extend(concepts['bigrams'])
            for domain in concepts['domains']:
                domain_counts[domain] += 1
        keyword_counts = Counter(all_keywords)
        bigram_counts = Counter(all_bigrams)
        results[condition] = {
            'unique_keywords': len(set(all_keywords)),
            'total_keywords': len(all_keywords),
            'top_keywords': keyword_counts.most_common(20),
            'top_bigrams': bigram_counts.most_common(10),
            'domain_distribution': dict(domain_counts),
            'domain_coverage': len(domain_counts),
            'idea_count': len(ideas)
        }
    return results

# ============================================================================
# 3. Embedding-based Analysis (Visualization, Novelty, Overlap)
# ============================================================================
async def get_embeddings_from_ollama(texts: list[str], batch_size: int = 50) -> list[list[float]] | None:
    """Get embeddings from Ollama API."""
    if not HAS_HTTPX:
        return None
    embeddings = []
    async with httpx.AsyncClient(timeout=120.0) as client:
        for i in range(0, len(texts), batch_size):
            batch = texts[i:i+batch_size]
            try:
                response = await client.post(
                    f"{OLLAMA_BASE_URL}/api/embed",
                    json={"model": EMBEDDING_MODEL, "input": batch}
                )
                response.raise_for_status()
                result = response.json()
                embeddings.extend(result["embeddings"])
                print(f" Embedded {len(embeddings)}/{len(texts)} ideas...")
            except Exception as e:
                print(f" Embedding error: {e}")
                return None
    return embeddings
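
# Request/response shape (per the Ollama embeddings API; verify against the
# installed version): POST /api/embed with {"model": ..., "input": [texts]}
# returns {"embeddings": [[...], ...]}, one vector per input text.
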
def load_cached_embeddings(experiment_id: str) -> dict[str, list[float]] | None:
    """Try to load embeddings from metrics file."""
    metrics_file = RESULTS_DIR / f"experiment_{experiment_id}_metrics.json"
    if not metrics_file.exists():
        return None
    # The metrics file doesn't store raw embeddings, so we can't load them
    return None

def compute_centroid(embeddings: np.ndarray) -> np.ndarray:
    """Compute centroid of embeddings."""
    return np.mean(embeddings, axis=0)

def cosine_distance(a: np.ndarray, b: np.ndarray) -> float:
    """Compute cosine distance between two vectors."""
    dot = np.dot(a, b)
    norm_a = np.linalg.norm(a)
    norm_b = np.linalg.norm(b)
    if norm_a == 0 or norm_b == 0:
        return 1.0
    return 1 - dot / (norm_a * norm_b)
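
# Sanity check (illustrative only): identical vectors give distance 0.0,
# orthogonal unit vectors give 1.0, opposite vectors give 2.0; a zero vector
# is treated as maximally dissimilar (1.0) by convention here.
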
def analyze_embeddings(
    ideas_by_condition: dict[str, list[str]],
    embeddings_by_condition: dict[str, np.ndarray],
    output_dir: Path,
    skip_viz: bool = False
) -> dict[str, Any]:
    """
    Analyze embeddings for visualization, novelty, and overlap.
    """
    results = {
        'novelty_scores': {},
        'cross_condition_overlap': {},
        'centroid_distances': {}
    }
    # Compute centroids for each condition
    centroids = {}
    for condition, embeddings in embeddings_by_condition.items():
        centroids[condition] = compute_centroid(embeddings)
    # Global centroid (all ideas)
    all_embeddings = np.vstack(list(embeddings_by_condition.values()))
    global_centroid = compute_centroid(all_embeddings)
    # 4. Novelty scores: semantic novelty measured as cosine distance from the global centroid
print("Computing novelty scores...")
for condition, embeddings in embeddings_by_condition.items():
distances = [cosine_distance(emb, global_centroid) for emb in embeddings]
results['novelty_scores'][condition] = {
'mean': float(np.mean(distances)),
'std': float(np.std(distances)),
'min': float(np.min(distances)),
'max': float(np.max(distances))
}
# 5. Cross-condition Overlap
print("Computing cross-condition overlap...")
conditions = list(embeddings_by_condition.keys())
# Centroid distances between conditions
for i, c1 in enumerate(conditions):
for c2 in conditions[i+1:]:
dist = cosine_distance(centroids[c1], centroids[c2])
results['centroid_distances'][f"{c1}_vs_{c2}"] = float(dist)
# Overlap analysis: for each idea, find if nearest neighbor is same or different condition
print("Computing nearest neighbor overlap...")
overlap_stats = defaultdict(lambda: {'same_condition': 0, 'diff_condition': 0})
# Build flat arrays with condition labels
all_emb_list = []
all_labels = []
for condition, embeddings in embeddings_by_condition.items():
for emb in embeddings:
all_emb_list.append(emb)
all_labels.append(condition)
all_emb_array = np.array(all_emb_list)
for i, (emb, label) in enumerate(zip(all_emb_array, all_labels)):
# Find nearest neighbor (excluding self)
distances = np.array([cosine_distance(emb, other) for other in all_emb_array])
distances[i] = float('inf') # Exclude self
nearest_idx = np.argmin(distances)
nearest_label = all_labels[nearest_idx]
if nearest_label == label:
overlap_stats[label]['same_condition'] += 1
else:
overlap_stats[label]['diff_condition'] += 1
for condition in conditions:
total = overlap_stats[condition]['same_condition'] + overlap_stats[condition]['diff_condition']
results['cross_condition_overlap'][condition] = {
'same_condition_nn': overlap_stats[condition]['same_condition'],
'diff_condition_nn': overlap_stats[condition]['diff_condition'],
'cohesion_ratio': overlap_stats[condition]['same_condition'] / total if total > 0 else 0
}
# 3. Embedding Visualization
if not skip_viz and HAS_SKLEARN and HAS_MATPLOTLIB:
print("Generating visualizations...")
generate_visualizations(embeddings_by_condition, output_dir)
return results
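
# Interpretation note: a cohesion_ratio near 1.0 means a condition's ideas are
# mostly each other's nearest neighbors (a tight, self-contained cluster);
# much lower values mean its ideas are interleaved with other conditions in
# embedding space.
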
def generate_visualizations(
    embeddings_by_condition: dict[str, np.ndarray],
    output_dir: Path
):
    """Generate t-SNE and PCA visualizations."""
    # Prepare data
    all_embeddings = []
    all_labels = []
    for condition, embeddings in embeddings_by_condition.items():
        all_embeddings.extend(embeddings)
        all_labels.extend([condition] * len(embeddings))
    all_embeddings = np.array(all_embeddings)
    # Color map for conditions
    conditions = list(embeddings_by_condition.keys())
    colors = plt.cm.tab10(np.linspace(0, 1, len(conditions)))
    color_map = {c: colors[i] for i, c in enumerate(conditions)}
    point_colors = [color_map[label] for label in all_labels]

    # PCA visualization
    print(" Running PCA...")
    pca = PCA(n_components=2, random_state=42)
    pca_result = pca.fit_transform(all_embeddings)
    plt.figure(figsize=(12, 8))
    for condition in conditions:
        mask = [l == condition for l in all_labels]
        plt.scatter(
            pca_result[mask, 0],
            pca_result[mask, 1],
            c=[color_map[condition]],
            label=condition,
            alpha=0.6,
            s=30
        )
    plt.xlabel(f'PC1 ({pca.explained_variance_ratio_[0]:.1%} variance)')
    plt.ylabel(f'PC2 ({pca.explained_variance_ratio_[1]:.1%} variance)')
    plt.title('Ideas by Condition (PCA)')
    plt.legend(bbox_to_anchor=(1.05, 1), loc='upper left')
    plt.tight_layout()
    plt.savefig(output_dir / 'embedding_pca.png', dpi=150)
    plt.close()

    # t-SNE visualization
    print(" Running t-SNE...")
    tsne = TSNE(n_components=2, random_state=42, perplexity=min(30, len(all_embeddings) - 1))
    tsne_result = tsne.fit_transform(all_embeddings)
    plt.figure(figsize=(12, 8))
    for condition in conditions:
        mask = [l == condition for l in all_labels]
        plt.scatter(
            tsne_result[mask, 0],
            tsne_result[mask, 1],
            c=[color_map[condition]],
            label=condition,
            alpha=0.6,
            s=30
        )
    plt.xlabel('t-SNE 1')
    plt.ylabel('t-SNE 2')
    plt.title('Ideas by Condition (t-SNE)')
    plt.legend(bbox_to_anchor=(1.05, 1), loc='upper left')
    plt.tight_layout()
    plt.savefig(output_dir / 'embedding_tsne.png', dpi=150)
    plt.close()
    print(f" Saved visualizations to {output_dir}")

# ============================================================================
# 6. AUT Flexibility Analysis (Category-based Divergent Thinking)
# ============================================================================
async def call_llm(prompt: str, model: str = LLM_MODEL) -> str | None:
    """Call Ollama LLM for text generation."""
    if not HAS_HTTPX:
        return None
    async with httpx.AsyncClient(timeout=120.0) as client:
        try:
            response = await client.post(
                f"{OLLAMA_BASE_URL}/api/generate",
                json={
                    "model": model,
                    "prompt": prompt,
                    "stream": False,
                    "options": {"temperature": 0.3}  # Lower temperature for consistency
                }
            )
            response.raise_for_status()
            result = response.json()
            return result.get("response", "")
        except Exception as e:
            print(f" LLM call error: {e}")
            return None

async def compute_flexibility_llm(
    ideas: list[str],
    query: str = "bicycle"
) -> dict[str, Any]:
    """
    Compute flexibility score using LLM-based category generation.

    Two-phase approach (Hadas & Hershkovitz 2024):
    1. Generate semantic categories from all ideas
    2. Classify each idea into a category
    3. Flexibility = number of unique categories used

    Returns:
        - categories: list of generated categories
        - assignments: mapping of idea index to category
        - flexibility_score: count of unique categories
    """
    # Phase 1: Generate categories
    ideas_text = "\n".join(f"{i+1}. {idea}" for i, idea in enumerate(ideas))
    prompt1 = f"""/no_think
You are analyzing creative ideas for alternative uses of a {query}.
Examine these ideas and determine the distinct SEMANTIC CATEGORIES they fall into.
Categories should represent fundamentally different ways of thinking about using the object.
Ideas:
{ideas_text}
Output ONLY a JSON array of category names (5-15 categories typically).
Example: ["Transportation", "Art/Decoration", "Tool/Equipment", "Recreation", "Storage"]
JSON array:"""
    response1 = await call_llm(prompt1)
    if not response1:
        return {"error": "LLM call failed for category generation"}
    # Parse categories from response
    try:
        # Try to extract JSON array from response
        match = re.search(r'\[.*?\]', response1, re.DOTALL)
        if match:
            categories = json.loads(match.group())
        else:
            # Fallback: split by newlines or commas
            categories = [c.strip().strip('"\'') for c in response1.split('\n') if c.strip()]
            categories = [c for c in categories if c and not c.startswith('[')]
    except json.JSONDecodeError:
        categories = [c.strip().strip('"\'') for c in response1.split(',') if c.strip()]
    if not categories:
        return {"error": "Failed to parse categories", "raw_response": response1}

    # Phase 2: Classify each idea
    categories_text = ", ".join(f'"{c}"' for c in categories)
    prompt2 = f"""/no_think
Classify each idea into exactly ONE of these categories: [{categories_text}]
Ideas:
{ideas_text}
Output a JSON object mapping idea number (as string) to category name.
Example: {{"1": "Transportation", "2": "Art/Decoration", "3": "Tool/Equipment"}}
JSON object:"""
    response2 = await call_llm(prompt2)
    if not response2:
        return {"error": "LLM call failed for classification", "categories": categories}
    # Parse assignments
    try:
        match = re.search(r'\{.*?\}', response2, re.DOTALL)
        if match:
            assignments = json.loads(match.group())
        else:
            assignments = {}
    except json.JSONDecodeError:
        assignments = {}
    # Calculate flexibility
    used_categories = set(assignments.values())
    flexibility_score = len(used_categories)
    # Category distribution
    category_counts = Counter(assignments.values())
    return {
        "categories": categories,
        "assignments": assignments,
        "flexibility_score": flexibility_score,
        "category_distribution": dict(category_counts),
        "total_ideas_classified": len(assignments)
    }
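
# Shape of a successful result (hypothetical values, not real model output):
# {
#     "categories": ["Transportation", "Art/Decoration", "Tool/Equipment"],
#     "assignments": {"1": "Transportation", "2": "Art/Decoration"},
#     "flexibility_score": 2,
#     "category_distribution": {"Transportation": 1, "Art/Decoration": 1},
#     "total_ideas_classified": 2
# }
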
def compute_flexibility_embedding(
    embeddings: np.ndarray,
    ideas: list[str],
    distance_threshold: float = 0.5
) -> dict[str, Any]:
    """
    Compute flexibility score using embedding-based hierarchical clustering.

    Method from arXiv:2405.00899:
    1. Encode ideas as embeddings
    2. Hierarchical clustering with average linkage
    3. Cut tree at distance threshold (higher threshold = fewer clusters)

    Args:
        embeddings: numpy array of shape (n_ideas, embedding_dim)
        ideas: list of idea texts for reference
        distance_threshold: cosine distance threshold for cutting dendrogram
            (0.5 = cut when similarity drops below 0.5)

    Returns:
        - cluster_assignments: list of cluster IDs
        - flexibility_score: number of clusters
        - cluster_sizes: distribution of cluster sizes
        - mean_pairwise_similarity: average similarity within condition
    """
    if not HAS_SCIPY:
        return {"error": "scipy not available for hierarchical clustering"}
    n_ideas = len(embeddings)
    if n_ideas < 2:
        return {
            "cluster_assignments": [0] * n_ideas,
            "flexibility_score": 1,
            "cluster_sizes": {0: n_ideas},
            "mean_pairwise_similarity": 1.0
        }
    # Normalize embeddings for cosine similarity
    norms = np.linalg.norm(embeddings, axis=1, keepdims=True)
    norms[norms == 0] = 1  # Avoid division by zero
    normalized = embeddings / norms
    # Compute pairwise cosine distances
    distances = pdist(normalized, metric='cosine')
    # Calculate mean pairwise similarity for reporting
    mean_pairwise_sim = 1 - np.mean(distances)
    # Hierarchical clustering with average linkage (better for varying density)
    Z = linkage(distances, method='average')
    # Cut at distance threshold
    # This creates clusters where items within cluster have distance < threshold
    clusters = fcluster(Z, distance_threshold, criterion='distance')
    n_clusters = len(set(clusters))
    cluster_sizes = Counter(clusters)
    # Convert numpy keys to Python ints for JSON serialization
    cluster_sizes_dict = {int(k): int(v) for k, v in cluster_sizes.items()}
    # Calculate mean intra-cluster similarity
    total_sim = 0
    total_pairs = 0
    for c in set(clusters):
        mask = clusters == c
        cluster_points = normalized[mask]
        if len(cluster_points) > 1:
            for i in range(len(cluster_points)):
                for j in range(i + 1, len(cluster_points)):
                    sim = np.dot(cluster_points[i], cluster_points[j])
                    total_sim += sim
                    total_pairs += 1
    mean_intra_sim = total_sim / total_pairs if total_pairs > 0 else None
    return {
        "cluster_assignments": [int(c) for c in clusters],
        "flexibility_score": int(n_clusters),
        "cluster_sizes": cluster_sizes_dict,
        "mean_pairwise_similarity": float(mean_pairwise_sim),
"mean_intra_cluster_similarity": float(mean_intra_sim) if mean_intra_sim else None
}
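
# Minimal sketch of the expected behavior (assumes scipy is installed): two
# near-duplicate vectors plus one orthogonal outlier split into two clusters
# at the default threshold, so flexibility_score == 2:
#
#   embs = np.array([[1.0, 0.0], [0.99, 0.01], [0.0, 1.0]])
#   compute_flexibility_embedding(embs, ["a", "b", "c"])["flexibility_score"]  # -> 2
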
def compute_jump_signal(
    cluster_assignments: list[int],
    embeddings: np.ndarray | None = None,
    similarity_threshold: float = 0.7
) -> dict[str, Any]:
    """
    Compute jump signal - measures category switches in sequential idea generation.

    Enhanced method from arXiv:2405.00899:
    - Combined jump signal: jump = jumpcat ∧ jumpSS (logical AND)
    - A "true" jump requires BOTH category change AND semantic dissimilarity

    This reduces false positives where switching words within same concept space
    would incorrectly count as a jump.

    Args:
        cluster_assignments: list of cluster IDs for each idea (in generation order)
        embeddings: optional, for computing semantic-similarity-based jumps
        similarity_threshold: threshold for semantic similarity jump detection (default 0.7)

    Returns:
        - category_jump_count: number of category switches (jumpcat)
        - semantic_jump_count: number of semantic dissimilarity jumps (jumpSS)
        - combined_jump_count: jumps where BOTH conditions are true
        - combined_jump_ratio: proportion of combined jumps (paper metric)
        - jump_positions: indices where combined jumps occur
    """
    if len(cluster_assignments) < 2:
        return {
            "category_jump_count": 0,
            "semantic_jump_count": 0,
            "combined_jump_count": 0,
            "combined_jump_ratio": 0.0,
            "category_jump_positions": [],
            "semantic_jump_positions": [],
            "combined_jump_positions": [],
            "total_transitions": 0,
            # Legacy fields for backward compatibility
            "jump_count": 0,
            "jump_ratio": 0.0,
            "jump_positions": []
        }
    category_jumps = []
    semantic_jumps = []
    combined_jumps = []
    for i in range(1, len(cluster_assignments)):
        # Category-based jump (jumpcat)
        is_category_jump = cluster_assignments[i] != cluster_assignments[i-1]
        if is_category_jump:
            category_jumps.append(i)
        # Semantic similarity-based jump (jumpSS)
        is_semantic_jump = False
        if embeddings is not None:
            sim = np.dot(embeddings[i], embeddings[i-1]) / (
                np.linalg.norm(embeddings[i]) * np.linalg.norm(embeddings[i-1]) + 1e-10
            )
            is_semantic_jump = sim < similarity_threshold
        if is_semantic_jump:
            semantic_jumps.append(i)
        # Combined jump: both must be true (paper method)
        if is_category_jump and (is_semantic_jump if embeddings is not None else True):
            combined_jumps.append(i)
    total_transitions = len(cluster_assignments) - 1
    result = {
        "category_jump_count": len(category_jumps),
        "semantic_jump_count": len(semantic_jumps) if embeddings is not None else 0,
        "combined_jump_count": len(combined_jumps),
        "combined_jump_ratio": len(combined_jumps) / total_transitions if total_transitions > 0 else 0.0,
        "category_jump_ratio": len(category_jumps) / total_transitions if total_transitions > 0 else 0.0,
        "semantic_jump_ratio": len(semantic_jumps) / total_transitions if total_transitions > 0 and embeddings is not None else 0.0,
        "category_jump_positions": category_jumps,
        "semantic_jump_positions": semantic_jumps if embeddings is not None else [],
        "combined_jump_positions": combined_jumps,
        "total_transitions": total_transitions,
        # Legacy fields for backward compatibility
        "jump_count": len(combined_jumps),  # Now uses combined
        "jump_ratio": len(combined_jumps) / total_transitions if total_transitions > 0 else 0.0,
        "jump_positions": combined_jumps
    }
    return result
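
# Worked example (no embeddings supplied, so combined jumps equal category
# jumps): compute_jump_signal([1, 1, 2, 2, 3]) flags transitions at positions
# 2 (1->2) and 4 (2->3), giving combined_jump_count == 2 and
# combined_jump_ratio == 2/4 == 0.5.
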
def classify_flexibility_profile(jump_count: int, idea_count: int) -> str:
    """
    Classify creativity style into Persistent/Flexible/Mixed based on jump count.

    Based on arXiv:2405.00899 findings:
    - Persistent: Deep exploration within categories (low jump ratio)
    - Flexible: Broad exploration across categories (high jump ratio)
    - Mixed: Intermediate pattern

    Paper thresholds normalized to response count:
    - Persistent: jump_ratio < 0.30
    - Flexible: jump_ratio > 0.45
    - Mixed: 0.30 <= jump_ratio <= 0.45

    Args:
        jump_count: Number of category jumps
        idea_count: Total number of ideas

    Returns:
        Profile name: "Persistent", "Flexible", "Mixed", or "Undefined"
    """
    if idea_count <= 1:
        return "Undefined"
    jump_ratio = jump_count / (idea_count - 1)
    if jump_ratio < 0.30:
        return "Persistent"
    elif jump_ratio > 0.45:
        return "Flexible"
    else:
        return "Mixed"
def compute_cumulative_jump_profile(
    jump_positions: list[int],
    total_ideas: int
) -> list[int]:
    """
    Compute cumulative jump count at each response position.

    This visualization shows exploration patterns over the generation sequence,
    revealing whether participants explore steadily or in bursts.

    Args:
        jump_positions: 0-indexed positions of the responses where jumps landed
            (as produced by compute_jump_signal; the first possible value is 1)
        total_ideas: Total number of ideas generated

    Returns:
        List where index i = cumulative jumps after response i
    """
    if total_ideas <= 0:
        return []
    cumulative = [0] * total_ideas
    current_jumps = 0
    for i in range(total_ideas):
        if i in jump_positions:  # A jump landed on this response
            current_jumps += 1
        cumulative[i] = current_jumps
    return cumulative
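
# Worked example: jump_positions [2, 4] over 5 ideas yields the cumulative
# profile [0, 0, 1, 1, 2] - flat early (persistent start), then two switches.
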
def analyze_originality_flexibility_correlation(
    novelty_scores: dict[str, float],
    flexibility_scores: dict[str, int]
) -> dict[str, Any]:
    """
    Analyze correlation between novelty (originality) and flexibility across conditions.

    Paper finding from arXiv:2405.00899:
    - Humans: No correlation between flexibility and originality (r ≈ 0)
    - LLMs: Positive correlation - flexible LLMs score higher on originality

    Research question: Does our attribute+expert pipeline break this LLM pattern?
    - If C4 (Full Pipeline) shows high novelty but moderate flexibility → breaks pattern
    - If correlation is near zero → human-like creative behavior

    Args:
        novelty_scores: Mean novelty score per condition
        flexibility_scores: Combined jump count (or flexibility score) per condition

    Returns:
        - pearson_r: Correlation coefficient
        - interpretation: What the correlation means
        - per_condition: Novelty and flexibility values per condition
    """
    conditions = list(novelty_scores.keys())
    novelties = [novelty_scores[c] for c in conditions if c in flexibility_scores]
    flexibilities = [flexibility_scores[c] for c in conditions if c in flexibility_scores]
    valid_conditions = [c for c in conditions if c in flexibility_scores]
    if len(novelties) < 3:
        return {
            "pearson_r": None,
            "interpretation": "Insufficient data (need at least 3 conditions)",
            "conditions": valid_conditions,
            "novelty_values": novelties,
            "flexibility_values": flexibilities
        }
    # Check for zero variance
    if np.std(novelties) == 0 or np.std(flexibilities) == 0:
        return {
            "pearson_r": 0.0,
            "interpretation": "Zero variance in one variable",
            "conditions": valid_conditions,
            "novelty_values": novelties,
            "flexibility_values": flexibilities
        }
    correlation = np.corrcoef(novelties, flexibilities)[0, 1]
    # Interpret the correlation
    if correlation > 0.3:
        interpretation = "Positive correlation (typical LLM pattern)"
    elif correlation < -0.3:
        interpretation = "Negative correlation (atypical - high novelty with low flexibility)"
    else:
        interpretation = "No significant correlation (human-like pattern)"
    return {
        "pearson_r": float(correlation),
        "interpretation": interpretation,
        "conditions": valid_conditions,
        "novelty_values": novelties,
        "flexibility_values": flexibilities,
        "per_condition": {c: {"novelty": novelties[i], "flexibility": flexibilities[i]}
                          for i, c in enumerate(valid_conditions)}
    }

def plot_cumulative_jump_profiles(
    profiles_by_condition: dict[str, list[int]],
    output_path: Path
):
    """
    Plot cumulative jump profiles for each condition.

    Shows exploration patterns over the generation sequence - steep slopes indicate
    rapid category switching, flat regions indicate persistent exploration.

    Args:
        profiles_by_condition: Cumulative jump counts per condition
        output_path: Directory to save the plot
    """
    if not HAS_MATPLOTLIB:
        print(" Skipping cumulative jump plot (matplotlib not available)")
        return
    plt.figure(figsize=(12, 6))
    # Color scheme for conditions
    colors = plt.cm.tab10(np.linspace(0, 1, len(profiles_by_condition)))
    for (condition, profile), color in zip(profiles_by_condition.items(), colors):
        if profile:  # Only plot if there's data
            x = range(1, len(profile) + 1)
            plt.plot(x, profile, label=condition, linewidth=2, color=color, marker='o',
                     markersize=3, alpha=0.8)
    plt.xlabel('Response Position', fontsize=12)
    plt.ylabel('Cumulative Jumps', fontsize=12)
    plt.title('Exploration Patterns by Condition\n(Cumulative Category Jumps)', fontsize=14)
    plt.legend(bbox_to_anchor=(1.05, 1), loc='upper left')
    plt.grid(True, alpha=0.3)
    plt.tight_layout()
    plt.savefig(output_path / 'cumulative_jump_profiles.png', dpi=150, bbox_inches='tight')
    plt.close()
    print(f" Saved cumulative jump profiles to {output_path / 'cumulative_jump_profiles.png'}")

async def analyze_flexibility_by_condition(
    ideas_by_condition: dict[str, list[str]],
    embeddings_by_condition: dict[str, np.ndarray] | None,
    query: str = "bicycle",
    output_dir: Path | None = None
) -> dict[str, Any]:
    """
    Analyze AUT flexibility for each condition using both LLM and embedding methods.

    Enhanced with arXiv:2405.00899 metrics:
    - Combined jump signal (jumpcat ∧ jumpSS)
    - Flexibility profile classification (Persistent/Flexible/Mixed)
    - Cumulative jump profiles for visualization

    Returns flexibility scores, category distributions, jump signals, and profiles.
    """
    results = {
        "llm_flexibility": {},
        "embedding_flexibility": {},
        "jump_analysis": {},
        "flexibility_profiles": {},
        "cumulative_jump_profiles": {},
        "method_correlation": {}
    }
    # LLM-based flexibility analysis
    print("\nComputing LLM-based flexibility scores...")
    for condition, ideas in ideas_by_condition.items():
        print(f" {condition}...")
        llm_result = await compute_flexibility_llm(ideas, query)
        results["llm_flexibility"][condition] = llm_result
    # Embedding-based flexibility analysis
    if embeddings_by_condition is not None:
        print("\nComputing embedding-based flexibility scores...")
        for condition, embeddings in embeddings_by_condition.items():
            ideas = ideas_by_condition[condition]
            emb_result = compute_flexibility_embedding(embeddings, ideas)
            results["embedding_flexibility"][condition] = emb_result
            # Jump signal analysis (enhanced with combined jump)
            if "cluster_assignments" in emb_result:
                jump_result = compute_jump_signal(
                    emb_result["cluster_assignments"],
                    embeddings
                )
                results["jump_analysis"][condition] = jump_result
                # Classify flexibility profile
                profile = classify_flexibility_profile(
                    jump_result["combined_jump_count"],
                    len(ideas)
                )
                results["flexibility_profiles"][condition] = profile
                # Compute cumulative jump profile
                cumulative = compute_cumulative_jump_profile(
                    jump_result["combined_jump_positions"],
                    len(ideas)
                )
                results["cumulative_jump_profiles"][condition] = cumulative
    # Generate cumulative jump profile visualization
    if output_dir is not None and results["cumulative_jump_profiles"]:
        print("\nGenerating cumulative jump profile visualization...")
        plot_cumulative_jump_profiles(results["cumulative_jump_profiles"], output_dir)
    # Calculate correlation between methods (if both available)
    llm_scores = []
    emb_scores = []
    conditions_order = []
    for condition in ideas_by_condition.keys():
        if condition in results["llm_flexibility"] and condition in results["embedding_flexibility"]:
            llm_flex = results["llm_flexibility"][condition].get("flexibility_score")
            emb_flex = results["embedding_flexibility"][condition].get("flexibility_score")
            if llm_flex is not None and emb_flex is not None:
                llm_scores.append(llm_flex)
                emb_scores.append(emb_flex)
                conditions_order.append(condition)
    if len(llm_scores) >= 3:
        # Pearson correlation
        if np.std(llm_scores) > 0 and np.std(emb_scores) > 0:
            correlation = np.corrcoef(llm_scores, emb_scores)[0, 1]
            results["method_correlation"] = {
                "pearson_r": float(correlation),
                "llm_scores": dict(zip(conditions_order, llm_scores)),
                "embedding_scores": dict(zip(conditions_order, emb_scores))
            }
    return results

# ============================================================================
# Main Analysis
# ============================================================================
async def run_analysis(
    experiment_file: Path,
    output_dir: Path,
    skip_viz: bool = False,
    skip_embeddings: bool = False
):
    """Run all analyses on an experiment file."""
    print("=" * 60)
    print("ADVANCED AUTOMATIC ANALYSIS")
    print("=" * 60)
    # Load experiment data
    print(f"\nLoading: {experiment_file.name}")
    with open(experiment_file, 'r', encoding='utf-8') as f:
        data = json.load(f)
    experiment_id = data.get('experiment_id', 'unknown')
    print(f"Experiment ID: {experiment_id}")
    # Extract ideas by condition
    ideas_by_condition: dict[str, list[str]] = defaultdict(list)
    idea_texts: list[str] = []
    idea_conditions: list[str] = []
    for result in data.get('results', []):
        for condition_name, condition_data in result.get('conditions', {}).items():
            dedup = condition_data.get('dedup', {})
            unique_ideas = dedup.get('unique_ideas', [])
            for idea in unique_ideas:
                ideas_by_condition[condition_name].append(idea)
                idea_texts.append(idea)
                idea_conditions.append(condition_name)
    total_ideas = len(idea_texts)
    print(f"Total ideas: {total_ideas}")
    print(f"Conditions: {list(ideas_by_condition.keys())}")
    results = {
        'experiment_id': experiment_id,
        'analysis_timestamp': datetime.now(timezone.utc).isoformat(),
        'total_ideas': total_ideas,
        'conditions': list(ideas_by_condition.keys())
    }

    # 1. Lexical Diversity
    print("\n" + "-" * 40)
    print("1. LEXICAL DIVERSITY ANALYSIS")
    print("-" * 40)
    lexical_results = analyze_lexical_diversity_by_condition(ideas_by_condition)
    results['lexical_diversity'] = lexical_results
    for condition, metrics in lexical_results.items():
        print(f"\n{condition}:")
        print(f" Overall TTR: {metrics['overall']['type_token_ratio']:.3f}")
        print(f" Vocabulary size: {metrics['overall']['vocabulary_size']}")
        print(f" Avg words/idea: {metrics['per_idea_mean']['total_words']:.1f}")

    # 2. Concept Extraction
    print("\n" + "-" * 40)
    print("2. CONCEPT EXTRACTION")
    print("-" * 40)
    concept_results = analyze_concepts_by_condition(ideas_by_condition)
    results['concept_extraction'] = concept_results
    for condition, metrics in concept_results.items():
        print(f"\n{condition}:")
        print(f" Unique keywords: {metrics['unique_keywords']}")
        print(f" Domain coverage: {metrics['domain_coverage']} domains")
        print(f" Top keywords: {[k for k, _ in metrics['top_keywords'][:5]]}")

    # 3-5. Embedding-based Analysis
    if not skip_embeddings:
        print("\n" + "-" * 40)
        print("3-5. EMBEDDING-BASED ANALYSIS")
        print("-" * 40)
        # Try to get embeddings
        print("Getting embeddings from Ollama...")
        embeddings = await get_embeddings_from_ollama(idea_texts)
        if embeddings is not None:
            # Organize embeddings by condition
            embeddings_by_condition: dict[str, np.ndarray] = defaultdict(list)
            for emb, condition in zip(embeddings, idea_conditions):
                embeddings_by_condition[condition].append(emb)
            for condition in embeddings_by_condition:
                embeddings_by_condition[condition] = np.array(embeddings_by_condition[condition])
            embedding_results = analyze_embeddings(
                ideas_by_condition,
                embeddings_by_condition,
                output_dir,
                skip_viz=skip_viz
            )
            results['novelty_scores'] = embedding_results['novelty_scores']
            results['cross_condition_overlap'] = embedding_results['cross_condition_overlap']
            results['centroid_distances'] = embedding_results['centroid_distances']
            # Print novelty scores
            print("\nNovelty Scores (distance from global centroid):")
            for condition, scores in embedding_results['novelty_scores'].items():
                print(f" {condition}: mean={scores['mean']:.4f}, std={scores['std']:.4f}")
            # Print overlap
            print("\nCross-condition Cohesion (% nearest neighbors from same condition):")
            for condition, overlap in embedding_results['cross_condition_overlap'].items():
                print(f" {condition}: {overlap['cohesion_ratio']:.1%}")
            # Print centroid distances
            print("\nCentroid Distances (lower = more similar):")
            for pair, dist in sorted(embedding_results['centroid_distances'].items()):
                print(f" {pair}: {dist:.4f}")

            # 6. AUT Flexibility Analysis (Enhanced with arXiv:2405.00899 metrics)
            print("\n" + "-" * 40)
            print("6. AUT FLEXIBILITY ANALYSIS (arXiv:2405.00899)")
            print("-" * 40)
            # Extract query from experiment data
            query = "bicycle"  # Default
            if data.get('results') and len(data['results']) > 0:
                first_result = data['results'][0]
                if 'query' in first_result:
                    query = first_result['query']
            print(f"Query object: {query}")
            flexibility_results = await analyze_flexibility_by_condition(
                ideas_by_condition,
                embeddings_by_condition,
                query,
                output_dir=output_dir if not skip_viz else None
            )
            results['flexibility_analysis'] = flexibility_results
            # Print flexibility scores
            print("\nLLM-based Flexibility Scores (semantic categories):")
            for condition, flex_data in flexibility_results['llm_flexibility'].items():
                if 'flexibility_score' in flex_data:
                    print(f" {condition}: {flex_data['flexibility_score']} categories")
                    if 'category_distribution' in flex_data:
                        top_cats = sorted(flex_data['category_distribution'].items(),
                                          key=lambda x: x[1], reverse=True)[:3]
                        print(f" Top categories: {[c[0] for c in top_cats]}")
            print("\nEmbedding-based Flexibility Scores (hierarchical clustering):")
            for condition, flex_data in flexibility_results['embedding_flexibility'].items():
                if 'flexibility_score' in flex_data:
                    print(f" {condition}: {flex_data['flexibility_score']} clusters")
            # Enhanced Jump Signal Analysis (Combined Jump from paper)
            print("\nCombined Jump Signal Analysis (jumpcat ∧ jumpSS):")
            print(" Condition | Cat-Only | Sem-Only | Combined | Profile")
            print(" " + "-" * 60)
            for condition, jump_data in flexibility_results['jump_analysis'].items():
                profile = flexibility_results.get('flexibility_profiles', {}).get(condition, "N/A")
                cat_jumps = jump_data.get('category_jump_count', 0)
                sem_jumps = jump_data.get('semantic_jump_count', 0)
                combined = jump_data.get('combined_jump_count', 0)
                print(f" {condition:16} | {cat_jumps:8} | {sem_jumps:8} | {combined:8} | {profile}")
            # Print flexibility profiles summary
            print("\nFlexibility Profiles (based on combined jump ratio):")
            for condition, profile in flexibility_results.get('flexibility_profiles', {}).items():
                jump_data = flexibility_results['jump_analysis'].get(condition, {})
                ratio = jump_data.get('combined_jump_ratio', 0)
                print(f" {condition}: {profile} (ratio={ratio:.2%})")

            # 7. Originality-Flexibility Correlation Analysis
            print("\n" + "-" * 40)
            print("7. ORIGINALITY-FLEXIBILITY CORRELATION")
            print("-" * 40)
            # Extract novelty means and flexibility scores for correlation
            novelty_means = {c: scores['mean'] for c, scores in embedding_results['novelty_scores'].items()}
            flexibility_jumps = {c: jump_data.get('combined_jump_count', 0)
                                 for c, jump_data in flexibility_results['jump_analysis'].items()}
            correlation_result = analyze_originality_flexibility_correlation(
                novelty_means,
                flexibility_jumps
            )
            results['originality_flexibility_correlation'] = correlation_result
            print(f"\nPearson r: {correlation_result.get('pearson_r', 'N/A')}")
            print(f"Interpretation: {correlation_result.get('interpretation', 'N/A')}")
            if correlation_result.get('per_condition'):
                print("\nPer-Condition Values:")
                for condition, vals in correlation_result['per_condition'].items():
                    print(f" {condition}: Novelty={vals['novelty']:.4f}, Flexibility={vals['flexibility']}")
            # Print method correlation
            if flexibility_results.get('method_correlation', {}).get('pearson_r') is not None:
                print(f"\nLLM vs Embedding Flexibility Correlation: r={flexibility_results['method_correlation']['pearson_r']:.3f}")
        else:
            print("Could not get embeddings. Skipping embedding-based analysis.")
            print("Make sure Ollama is running with the embedding model.")

    # Save results
    output_file = output_dir / f"aut_flexibility_{experiment_id}.json"
    with open(output_file, 'w', encoding='utf-8') as f:
        # Convert numpy types to Python types for JSON serialization
        def convert(obj):
            if isinstance(obj, np.ndarray):
                return obj.tolist()
            if isinstance(obj, (np.int64, np.int32)):
                return int(obj)
            if isinstance(obj, (np.float64, np.float32)):
                return float(obj)
            return obj

        json.dump(results, f, ensure_ascii=False, indent=2, default=convert)
    print("\n" + "=" * 60)
    print(f"Results saved to: {output_file}")
    if not skip_viz and HAS_MATPLOTLIB:
        print(f"Visualizations saved to: {output_dir}")
    print("=" * 60)
    return results

def list_experiment_files() -> list[Path]:
    """List available deduped experiment files."""
    return sorted(RESULTS_DIR.glob('*_deduped.json'), key=lambda p: p.stat().st_mtime, reverse=True)

def main():
    parser = argparse.ArgumentParser(
        description='Run advanced automatic analysis on experiment results.',
        formatter_class=argparse.RawDescriptionHelpFormatter
    )
    parser.add_argument(
        'experiment_file',
        nargs='?',
        default=None,
        help='Experiment file name'
    )
    parser.add_argument(
        '--list', '-l',
        action='store_true',
        help='List available experiment files'
    )
    parser.add_argument(
        '--skip-viz',
        action='store_true',
        help='Skip visualization generation'
    )
    parser.add_argument(
        '--skip-embeddings',
        action='store_true',
        help='Skip embedding-based analysis (faster)'
    )
    args = parser.parse_args()
    available_files = list_experiment_files()
    if args.list:
        print("Available experiment files:")
        for f in available_files:
            print(f" {f.name}")
        return
    # Determine which file to use
    if args.experiment_file:
        experiment_file = RESULTS_DIR / args.experiment_file
        if not experiment_file.exists():
            experiment_file = RESULTS_DIR / f"{args.experiment_file}.json"
    else:
        if not available_files:
            print("Error: No deduped experiment files found.")
            return
        experiment_file = available_files[0]
        print(f"Using latest: {experiment_file.name}")
    if not experiment_file.exists():
        print(f"Error: File not found: {experiment_file}")
        return
    # Run analysis
    asyncio.run(run_analysis(
        experiment_file,
        RESULTS_DIR,
        skip_viz=args.skip_viz,
        skip_embeddings=args.skip_embeddings
    ))

if __name__ == '__main__':
    main()