novelty-seeking/experiments/deduplication.py
gbanyan 43c025e060 feat: Add experiments framework and novelty-driven agent loop
- Add complete experiments directory with pilot study infrastructure
  - 5 experimental conditions (direct, expert-only, attribute-only, full-pipeline, random-perspective)
  - Human assessment tool with React frontend and FastAPI backend
  - AUT flexibility analysis with jump signal detection
  - Result visualization and metrics computation

- Add novelty-driven agent loop module (experiments/novelty_loop/)
  - NoveltyDrivenTaskAgent with expert perspective perturbation
  - Three termination strategies: breakthrough, exhaust, coverage
  - Interactive CLI demo with colored output
  - Embedding-based novelty scoring

- Add DDC knowledge domain classification data (en/zh)
- Add CLAUDE.md project documentation
- Update research report with experiment findings

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-20 10:16:21 +08:00


"""
Post-generation deduplication for experiment results.
Applies embedding-based deduplication uniformly to all conditions
to normalize idea counts and measure "dedup survival rate".
Usage:
python -m experiments.deduplication --input results/experiment_xxx.json
"""
import sys
import json
import argparse
import asyncio
import logging
from pathlib import Path
from typing import List, Dict, Any, Optional
from dataclasses import dataclass

# Add backend to path for imports
sys.path.insert(0, str(Path(__file__).parent.parent / "backend"))

from app.services.embedding_service import embedding_service
from app.models.schemas import ExpertTransformationDescription
from experiments.config import DEDUP_THRESHOLD, RESULTS_DIR

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)


@dataclass
class DedupStats:
    """Deduplication statistics for a single condition."""
    condition: str
    pre_dedup_count: int
    post_dedup_count: int
    duplicates_removed: int
    survival_rate: float


def ideas_to_descriptions(
    ideas: List[str],
    ideas_with_source: Optional[List[Dict[str, Any]]] = None
) -> List[ExpertTransformationDescription]:
    """
    Convert experiment ideas to ExpertTransformationDescription format
    for compatibility with the embedding service.
    """
    descriptions = []

    if ideas_with_source:
        # Use source information if available
        for i, item in enumerate(ideas_with_source):
            desc = ExpertTransformationDescription(
                keyword=item.get("keyword", item.get("attribute", item.get("perspective_word", ""))),
                expert_id=f"source-{i}",
                expert_name=item.get("expert_name", item.get("perspective_word", "direct")),
                description=item.get("idea", "")
            )
            descriptions.append(desc)
    else:
        # Simple conversion for ideas without source
        for i, idea in enumerate(ideas):
            desc = ExpertTransformationDescription(
                keyword="",
                expert_id=f"idea-{i}",
                expert_name="direct",
                description=idea
            )
            descriptions.append(desc)

    return descriptions
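
# Illustrative example (hypothetical input, not from any real run):
#   ideas_to_descriptions(["use a brick as a doorstop"])
# yields a single ExpertTransformationDescription with
#   keyword="", expert_id="idea-0", expert_name="direct",
#   description="use a brick as a doorstop".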


async def deduplicate_condition(
    ideas: List[str],
    ideas_with_source: Optional[List[Dict[str, Any]]] = None,
    threshold: float = DEDUP_THRESHOLD
) -> Dict[str, Any]:
    """
    Apply deduplication to ideas from a single condition.

    Returns:
        Dict with deduplicated ideas and statistics
    """
    if not ideas:
        return {
            "unique_ideas": [],
            "unique_ideas_with_source": [],
            "groups": [],
            "stats": {
                "pre_dedup_count": 0,
                "post_dedup_count": 0,
                "duplicates_removed": 0,
                "survival_rate": 1.0
            }
        }

    # Convert to description format
    descriptions = ideas_to_descriptions(ideas, ideas_with_source)

    # Run deduplication
    result = await embedding_service.deduplicate(
        descriptions=descriptions,
        threshold=threshold
    )

    # Extract unique ideas (representatives from each group)
    unique_ideas = []
    unique_ideas_with_source = []
    groups_info = []

    for group in result.groups:
        rep = group.representative
        unique_ideas.append(rep.description)

        # Reconstruct source info
        source_info = {
            "idea": rep.description,
            "keyword": rep.keyword,
            "expert_name": rep.expert_name
        }
        unique_ideas_with_source.append(source_info)

        # Group info for analysis
        group_info = {
            "representative": rep.description,
            "duplicates": [d.description for d in group.duplicates],
            "duplicate_count": len(group.duplicates),
            "similarity_scores": group.similarity_scores
        }
        groups_info.append(group_info)

    pre_count = len(ideas)
    post_count = len(unique_ideas)
    survival_rate = post_count / pre_count if pre_count > 0 else 1.0
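    # Worked example (hypothetical numbers, not from any actual run): if a
    # condition produced 30 ideas and 24 representatives remain after
    # deduplication, survival_rate = 24 / 30 = 0.8, i.e. 80% of the ideas
    # were judged distinct at the given threshold.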
    return {
        "unique_ideas": unique_ideas,
        "unique_ideas_with_source": unique_ideas_with_source,
        "groups": groups_info,
        "stats": {
            "pre_dedup_count": pre_count,
            "post_dedup_count": post_count,
            "duplicates_removed": pre_count - post_count,
            "survival_rate": survival_rate
        }
    }
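
# A minimal usage sketch (hypothetical values; the real inputs come from the
# experiment results JSON processed below):
#
#     result = await deduplicate_condition(
#         ideas=["use a brick as a doorstop", "prop a door open with a brick"],
#         threshold=0.85,
#     )
#     result["stats"]["pre_dedup_count"]   # -> 2
#     result["stats"]["post_dedup_count"]  # -> 1, if the two ideas are merged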


async def process_experiment_results(
    input_file: Path,
    output_file: Optional[Path] = None,
    threshold: float = DEDUP_THRESHOLD
) -> Dict[str, Any]:
    """
    Process an experiment results file and apply deduplication.

    Args:
        input_file: Path to experiment results JSON
        output_file: Path for output (default: input_file with _deduped suffix)
        threshold: Similarity threshold for deduplication

    Returns:
        Processed results with deduplication applied
    """
    # Load experiment results
    with open(input_file, "r", encoding="utf-8") as f:
        experiment = json.load(f)

    logger.info(f"Processing experiment: {experiment.get('experiment_id', 'unknown')}")
    logger.info(f"Deduplication threshold: {threshold}")

    # Process each query's conditions
    dedup_summary = {
        "threshold": threshold,
        "conditions": {}
    }

    for query_result in experiment["results"]:
        query = query_result["query"]
        query_id = query_result["query_id"]
        logger.info(f"\nProcessing query: {query} ({query_id})")

        for condition, cond_result in query_result["conditions"].items():
            if not cond_result.get("success", False):
                logger.warning(f" Skipping failed condition: {condition}")
                continue

            logger.info(f" Deduplicating {condition}...")
            ideas = cond_result.get("ideas", [])
            ideas_with_source = cond_result.get("ideas_with_source", [])

            dedup_result = await deduplicate_condition(
                ideas=ideas,
                ideas_with_source=ideas_with_source,
                threshold=threshold
            )

            # Add dedup results to condition
            cond_result["dedup"] = dedup_result

            # Update summary stats
            if condition not in dedup_summary["conditions"]:
                dedup_summary["conditions"][condition] = {
                    "total_pre_dedup": 0,
                    "total_post_dedup": 0,
                    "total_removed": 0,
                    "query_stats": []
                }

            stats = dedup_result["stats"]
            cond_summary = dedup_summary["conditions"][condition]
            cond_summary["total_pre_dedup"] += stats["pre_dedup_count"]
            cond_summary["total_post_dedup"] += stats["post_dedup_count"]
            cond_summary["total_removed"] += stats["duplicates_removed"]
            cond_summary["query_stats"].append({
                "query_id": query_id,
                "query": query,
                **stats
            })

            logger.info(f" {stats['pre_dedup_count']} -> {stats['post_dedup_count']} "
                        f"(survival: {stats['survival_rate']:.1%})")

    # Calculate overall survival rates
    for condition, cond_stats in dedup_summary["conditions"].items():
        if cond_stats["total_pre_dedup"] > 0:
            cond_stats["overall_survival_rate"] = (
                cond_stats["total_post_dedup"] / cond_stats["total_pre_dedup"]
            )
        else:
            cond_stats["overall_survival_rate"] = 1.0

    # Add dedup summary to experiment
    experiment["dedup_summary"] = dedup_summary

    # Save results
    if output_file is None:
        stem = input_file.stem.replace("_complete", "").replace("_intermediate", "")
        output_file = input_file.parent / f"{stem}_deduped.json"

    with open(output_file, "w", encoding="utf-8") as f:
        json.dump(experiment, f, indent=2, ensure_ascii=False)

    logger.info(f"\nResults saved to: {output_file}")

    return experiment


def print_dedup_summary(experiment: Dict[str, Any]):
    """Print formatted deduplication summary."""
    dedup = experiment.get("dedup_summary", {})

    print("\n" + "=" * 70)
    print("DEDUPLICATION SUMMARY")
    print("=" * 70)
    print(f"Threshold: {dedup.get('threshold', 'N/A')}")

    print("\nResults by condition:")
    print("-" * 70)
    print(f"{'Condition':<30} {'Pre-Dedup':<12} {'Post-Dedup':<12} {'Survival':<10}")
    print("-" * 70)

    for condition, stats in dedup.get("conditions", {}).items():
        pre = stats.get("total_pre_dedup", 0)
        post = stats.get("total_post_dedup", 0)
        survival = stats.get("overall_survival_rate", 1.0)
        print(f"{condition:<30} {pre:<12} {post:<12} {survival:<10.1%}")

    print("-" * 70)
    print("\nInterpretation:")
    print("- Higher survival rate = more diverse/unique ideas")
    print("- Lower survival rate = more redundant ideas removed")


async def main():
    parser = argparse.ArgumentParser(
        description="Apply deduplication to experiment results"
    )
    parser.add_argument(
        "--input",
        type=str,
        required=True,
        help="Input experiment results JSON file"
    )
    parser.add_argument(
        "--output",
        type=str,
        help="Output file path (default: <input>_deduped.json)"
    )
    parser.add_argument(
        "--threshold",
        type=float,
        default=DEDUP_THRESHOLD,
        help=f"Similarity threshold (default: {DEDUP_THRESHOLD})"
    )
    args = parser.parse_args()

    input_path = Path(args.input)
    if not input_path.exists():
        # Try relative to results dir
        input_path = RESULTS_DIR / args.input
        if not input_path.exists():
            print(f"Error: Input file not found: {args.input}")
            sys.exit(1)

    output_path = Path(args.output) if args.output else None

    experiment = await process_experiment_results(
        input_file=input_path,
        output_file=output_path,
        threshold=args.threshold
    )

    print_dedup_summary(experiment)


if __name__ == "__main__":
    asyncio.run(main())