- Add complete experiments directory with pilot study infrastructure - 5 experimental conditions (direct, expert-only, attribute-only, full-pipeline, random-perspective) - Human assessment tool with React frontend and FastAPI backend - AUT flexibility analysis with jump signal detection - Result visualization and metrics computation - Add novelty-driven agent loop module (experiments/novelty_loop/) - NoveltyDrivenTaskAgent with expert perspective perturbation - Three termination strategies: breakthrough, exhaust, coverage - Interactive CLI demo with colored output - Embedding-based novelty scoring - Add DDC knowledge domain classification data (en/zh) - Add CLAUDE.md project documentation - Update research report with experiment findings Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
376 lines
12 KiB
Python
Executable File
376 lines
12 KiB
Python
Executable File
#!/usr/bin/env python3
|
||
"""
|
||
Prepare assessment data from experiment results.
|
||
|
||
Extracts unique ideas from deduped experiment results, assigns stable IDs,
|
||
and randomizes the order within each query for unbiased human assessment.
|
||
|
||
Usage:
|
||
python prepare_data.py # Use latest, all ideas
|
||
python prepare_data.py --sample 100 # Sample 100 ideas total
|
||
python prepare_data.py --per-query 10 # 10 ideas per query
|
||
python prepare_data.py --per-condition 5 # 5 ideas per condition per query
|
||
python prepare_data.py --list # List available files
|
||
"""
|
||
|
||
import argparse
|
||
import json
|
||
import random
|
||
from pathlib import Path
|
||
from typing import Any
|
||
|
||
|
||
def load_experiment_data(filepath: Path) -> dict[str, Any]:
    """Read an experiment results file and return its parsed JSON payload."""
    raw = filepath.read_text(encoding='utf-8')
    return json.loads(raw)
|
||
|
||
|
||
def sample_ideas_stratified(
|
||
ideas: list[dict[str, Any]],
|
||
per_condition: int | None = None,
|
||
total_limit: int | None = None,
|
||
rng: random.Random | None = None
|
||
) -> list[dict[str, Any]]:
|
||
"""
|
||
Sample ideas with stratification by condition.
|
||
|
||
Args:
|
||
ideas: List of ideas with _hidden.condition metadata
|
||
per_condition: Max ideas per condition (stratified sampling)
|
||
total_limit: Max total ideas (after stratified sampling)
|
||
rng: Random number generator for reproducibility
|
||
|
||
Returns:
|
||
Sampled list of ideas
|
||
"""
|
||
if rng is None:
|
||
rng = random.Random()
|
||
|
||
if per_condition is None and total_limit is None:
|
||
return ideas
|
||
|
||
# Group by condition
|
||
by_condition: dict[str, list[dict[str, Any]]] = {}
|
||
for idea in ideas:
|
||
condition = idea['_hidden']['condition']
|
||
if condition not in by_condition:
|
||
by_condition[condition] = []
|
||
by_condition[condition].append(idea)
|
||
|
||
# Sample per condition
|
||
sampled = []
|
||
for condition, cond_ideas in by_condition.items():
|
||
rng.shuffle(cond_ideas)
|
||
if per_condition is not None:
|
||
cond_ideas = cond_ideas[:per_condition]
|
||
sampled.extend(cond_ideas)
|
||
|
||
# Apply total limit if specified
|
||
if total_limit is not None and len(sampled) > total_limit:
|
||
rng.shuffle(sampled)
|
||
sampled = sampled[:total_limit]
|
||
|
||
return sampled
|
||
|
||
|
||
def extract_ideas_from_condition(
    query_id: str,
    condition_name: str,
    condition_data: dict[str, Any],
    idea_counter: dict[str, int]
) -> list[dict[str, Any]]:
    """Pull deduped ideas out of one condition, tagging each with hidden metadata.

    Each non-empty idea is assigned a stable per-query ID of the form
    ``<query_id>_I<nnn>``; ``idea_counter[query_id]`` is bumped for every
    idea emitted so IDs stay unique across conditions of the same query.
    """
    extracted: list[dict[str, Any]] = []
    source_items = condition_data.get('dedup', {}).get('unique_ideas_with_source', [])

    for entry in source_items:
        text = entry.get('idea', '')
        # Blank or missing idea strings carry nothing to assess -- skip them.
        if not text:
            continue

        # Stable, zero-padded per-query sequence number.
        seq = idea_counter.get(query_id, 0)
        idea_counter[query_id] = seq + 1

        extracted.append({
            'idea_id': f"{query_id}_I{seq:03d}",
            'text': text,
            '_hidden': {
                'condition': condition_name,
                'expert_name': entry.get('expert_name', ''),
                'keyword': entry.get('keyword', '')
            }
        })

    return extracted
|
||
|
||
|
||
def prepare_assessment_data(
    experiment_filepath: Path,
    output_filepath: Path,
    seed: int = 42,
    sample_total: int | None = None,
    per_query: int | None = None,
    per_condition: int | None = None
) -> dict[str, Any]:
    """
    Prepare assessment data from experiment results.

    Loads a deduped experiment JSON, extracts ideas per query/condition,
    applies the optional sampling limits (stratified per-condition first,
    then per-query, then a proportional global cap), shuffles ideas within
    each query with a seeded RNG, and writes the result to
    ``output_filepath``.

    Args:
        experiment_filepath: Path to deduped experiment JSON
        output_filepath: Path to write assessment items JSON
        seed: Random seed for reproducible shuffling
        sample_total: Total number of ideas to sample (across all queries)
        per_query: Maximum ideas per query
        per_condition: Maximum ideas per condition per query (stratified)

    Returns:
        Assessment data structure
    """
    # Single seeded generator drives ALL shuffles below, so the whole run is
    # reproducible for a given seed and input file.
    rng = random.Random(seed)

    # Load experiment data
    data = load_experiment_data(experiment_filepath)
    experiment_id = data.get('experiment_id', 'unknown')
    conditions = data.get('conditions', [])
    results = data.get('results', [])

    print(f"Loading experiment: {experiment_id}")
    print(f"Conditions: {conditions}")
    print(f"Number of queries: {len(results)}")

    # Show sampling config
    if sample_total or per_query or per_condition:
        print(f"Sampling config: total={sample_total}, per_query={per_query}, per_condition={per_condition}")

    assessment_queries = []
    total_ideas = 0
    # Shared across conditions of the same query so idea IDs stay unique
    # within each query.
    idea_counter: dict[str, int] = {}

    for result in results:
        query_id = result.get('query_id', '')
        query_text = result.get('query', '')
        category = result.get('category', '')

        query_ideas = []

        # Extract ideas from all conditions
        conditions_data = result.get('conditions', {})
        for condition_name, condition_data in conditions_data.items():
            ideas = extract_ideas_from_condition(
                query_id, condition_name, condition_data, idea_counter
            )
            query_ideas.extend(ideas)

        # Apply stratified sampling if per_condition is specified
        if per_condition is not None:
            query_ideas = sample_ideas_stratified(
                query_ideas,
                per_condition=per_condition,
                rng=rng
            )

        # Apply per-query limit. The pre-slice shuffle makes the kept subset
        # random rather than biased toward earlier conditions.
        if per_query is not None and len(query_ideas) > per_query:
            rng.shuffle(query_ideas)
            query_ideas = query_ideas[:per_query]

        # Shuffle ideas within this query so assessors never see ideas
        # grouped by condition.
        rng.shuffle(query_ideas)

        assessment_queries.append({
            'query_id': query_id,
            'query_text': query_text,
            'category': category,
            'ideas': query_ideas,
            'idea_count': len(query_ideas)
        })

        total_ideas += len(query_ideas)
        print(f"  Query '{query_text}' ({query_id}): {len(query_ideas)} ideas")

    # Apply total sample limit across all queries (proportionally)
    if sample_total is not None and total_ideas > sample_total:
        print(f"\nApplying total sample limit: {sample_total} (from {total_ideas})")
        # Calculate proportion to keep
        keep_ratio = sample_total / total_ideas
        new_total = 0

        for query in assessment_queries:
            # Every query keeps at least one idea, and int() truncates, so
            # the final total only approximates sample_total (it can land
            # slightly above or below it).
            n_keep = max(1, int(len(query['ideas']) * keep_ratio))
            rng.shuffle(query['ideas'])
            query['ideas'] = query['ideas'][:n_keep]
            query['idea_count'] = len(query['ideas'])
            new_total += len(query['ideas'])

        total_ideas = new_total

    # Build output structure
    assessment_data = {
        'experiment_id': experiment_id,
        'queries': assessment_queries,
        'total_ideas': total_ideas,
        'query_count': len(assessment_queries),
        'conditions': conditions,
        'randomization_seed': seed,
        'sampling': {
            'sample_total': sample_total,
            'per_query': per_query,
            'per_condition': per_condition
        },
        'metadata': {
            'source_file': str(experiment_filepath.name),
            'prepared_for': 'human_assessment'
        }
    }

    # Write output (UTF-8, non-ASCII preserved for human-readable ideas).
    output_filepath.parent.mkdir(parents=True, exist_ok=True)
    with open(output_filepath, 'w', encoding='utf-8') as f:
        json.dump(assessment_data, f, ensure_ascii=False, indent=2)

    print(f"\nTotal ideas for assessment: {total_ideas}")
    print(f"Output written to: {output_filepath}")

    return assessment_data
|
||
|
||
|
||
def list_experiment_files(results_dir: Path) -> list[Path]:
    """Return the deduped experiment files, newest (by mtime) first."""
    candidates = list(results_dir.glob('*_deduped.json'))
    candidates.sort(key=lambda path: path.stat().st_mtime, reverse=True)
    return candidates
|
||
|
||
|
||
def main():
    """CLI entry point: prepare human-assessment data from experiment results.

    Resolves which deduped experiment file to use (explicit argument, or the
    most recent one when omitted), runs prepare_assessment_data with the
    requested sampling limits, then re-reads the written file and prints a
    per-condition idea distribution as a sanity check.
    """
    parser = argparse.ArgumentParser(
        description='Prepare assessment data from experiment results.',
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
    python prepare_data.py                                   # Use latest, all ideas
    python prepare_data.py --sample 100                      # Sample 100 ideas total
    python prepare_data.py --per-query 20                    # Max 20 ideas per query
    python prepare_data.py --per-condition 4                 # 4 ideas per condition per query
    python prepare_data.py --per-condition 4 --per-query 15  # Combined limits
    python prepare_data.py --list                            # List available files

Recommended for human assessment:
    # 5 conditions × 4 ideas × 10 queries = 200 ideas (balanced)
    python prepare_data.py --per-condition 4

    # Or limit total to ~150 ideas
    python prepare_data.py --sample 150
"""
    )
    parser.add_argument(
        'experiment_file',
        nargs='?',
        default=None,
        help='Experiment file name (e.g., experiment_20260119_165650_deduped.json)'
    )
    parser.add_argument(
        '--list', '-l',
        action='store_true',
        help='List available experiment files'
    )
    parser.add_argument(
        '--sample',
        type=int,
        default=None,
        metavar='N',
        help='Total number of ideas to sample (proportionally across queries)'
    )
    parser.add_argument(
        '--per-query',
        type=int,
        default=None,
        metavar='N',
        help='Maximum ideas per query'
    )
    parser.add_argument(
        '--per-condition',
        type=int,
        default=None,
        metavar='N',
        help='Maximum ideas per condition per query (stratified sampling)'
    )
    parser.add_argument(
        '--seed', '-s',
        type=int,
        default=42,
        help='Random seed for shuffling (default: 42)'
    )
    args = parser.parse_args()

    # Paths are resolved relative to this script: results live one level up,
    # the output goes under ./data next to the script.
    base_dir = Path(__file__).parent.parent
    results_dir = base_dir / 'results'
    output_file = Path(__file__).parent / 'data' / 'assessment_items.json'

    # Deduped experiment files, most recent first.
    available_files = list_experiment_files(results_dir)

    if args.list:
        print("Available experiment files (most recent first):")
        for f in available_files:
            size_kb = f.stat().st_size / 1024
            print(f"  {f.name} ({size_kb:.1f} KB)")
        return

    # Determine which file to use
    if args.experiment_file:
        experiment_file = results_dir / args.experiment_file
        if not experiment_file.exists():
            # The name may have been given without the .json suffix; retry
            # with it appended.
            experiment_file = results_dir / f"{args.experiment_file}.json"
    else:
        # Use the latest deduped file
        if not available_files:
            print("Error: No deduped experiment files found in results directory.")
            return
        experiment_file = available_files[0]
        print(f"Using latest experiment file: {experiment_file.name}")

    if not experiment_file.exists():
        print(f"Error: Experiment file not found: {experiment_file}")
        print("\nAvailable files:")
        for f in available_files:
            print(f"  {f.name}")
        return

    prepare_assessment_data(
        experiment_file,
        output_file,
        seed=args.seed,
        sample_total=args.sample,
        per_query=args.per_query,
        per_condition=args.per_condition
    )

    # Verify output.
    # FIX: the file was written as UTF-8 with ensure_ascii=False, so it must
    # be read back as UTF-8 too; relying on the locale default encoding can
    # raise UnicodeDecodeError (e.g. cp1252 on Windows) for non-ASCII ideas.
    with open(output_file, 'r', encoding='utf-8') as f:
        data = json.load(f)

    print("\n--- Verification ---")
    print(f"Queries: {data['query_count']}")
    print(f"Total ideas: {data['total_ideas']}")

    # Show distribution by condition (from hidden metadata)
    condition_counts: dict[str, int] = {}
    for query in data['queries']:
        for idea in query['ideas']:
            condition = idea['_hidden']['condition']
            condition_counts[condition] = condition_counts.get(condition, 0) + 1

    print("\nIdeas per condition:")
    for condition, count in sorted(condition_counts.items()):
        print(f"  {condition}: {count}")


if __name__ == '__main__':
    main()
|