#!/usr/bin/env python3
"""
Analyze assessment ratings for inter-rater reliability and condition comparisons.

This script:
1. Loads ratings from the SQLite database
2. Joins with hidden metadata (condition, expert)
3. Calculates inter-rater reliability metrics
4. Computes mean ratings per dimension per condition
5. Performs statistical comparisons between conditions
"""

import json
import sqlite3
from collections import defaultdict
from datetime import datetime
from pathlib import Path
from typing import Any

import numpy as np
from scipy import stats

# Paths
RESULTS_DIR = Path(__file__).parent / 'results'
DATA_DIR = Path(__file__).parent / 'data'
DB_PATH = RESULTS_DIR / 'ratings.db'
ASSESSMENT_DATA_PATH = DATA_DIR / 'assessment_items.json'


def load_assessment_data() -> dict[str, Any]:
    """Load the assessment items data with hidden metadata."""
    with open(ASSESSMENT_DATA_PATH, 'r', encoding='utf-8') as f:
        return json.load(f)


def load_ratings_from_db() -> list[dict[str, Any]]:
    """Load all ratings from the SQLite database."""
    if not DB_PATH.exists():
        print(f"Database not found at {DB_PATH}")
        return []

    conn = sqlite3.connect(DB_PATH)
    conn.row_factory = sqlite3.Row
    cursor = conn.cursor()

    cursor.execute('''
        SELECT r.*, rat.name as rater_name
        FROM ratings r
        LEFT JOIN raters rat ON r.rater_id = rat.rater_id
        WHERE r.skipped = 0
    ''')

    ratings = [dict(row) for row in cursor.fetchall()]
    conn.close()
    return ratings


def build_idea_lookup(assessment_data: dict[str, Any]) -> dict[str, dict[str, Any]]:
    """Build a lookup table from idea_id to metadata."""
    lookup = {}
    for query in assessment_data['queries']:
        for idea in query['ideas']:
            lookup[idea['idea_id']] = {
                'text': idea['text'],
                'query_id': query['query_id'],
                'query_text': query['query_text'],
                **idea['_hidden']
            }
    return lookup


def calculate_krippendorff_alpha(ratings_matrix: np.ndarray) -> float:
    """
    Calculate Krippendorff's alpha using a squared-difference (interval-level)
    distance metric.

    Args:
        ratings_matrix: 2D array where rows are items and columns are raters.
                        NaN values indicate missing ratings.

    Returns:
        Krippendorff's alpha coefficient
    """
    # Remove items with fewer than 2 ratings (single-rated items cannot
    # contribute to observed disagreement)
    valid_items = np.sum(~np.isnan(ratings_matrix), axis=1) >= 2
    ratings_matrix = ratings_matrix[valid_items]

    if ratings_matrix.shape[0] < 2:
        return np.nan

    n_items, n_raters = ratings_matrix.shape

    # Observed disagreement
    observed_disagreement = 0
    n_pairs = 0
    for i in range(n_items):
        values = ratings_matrix[i, ~np.isnan(ratings_matrix[i])]
        if len(values) < 2:
            continue
        # Squared difference between each pair of ratings on the same item
        for j in range(len(values)):
            for k in range(j + 1, len(values)):
                observed_disagreement += (values[j] - values[k]) ** 2
                n_pairs += 1

    if n_pairs == 0:
        return np.nan
    observed_disagreement /= n_pairs

    # Expected disagreement (based on marginal distribution)
    all_values = ratings_matrix[~np.isnan(ratings_matrix)]
    if len(all_values) < 2:
        return np.nan

    expected_disagreement = 0
    n_total_pairs = 0
    for i in range(len(all_values)):
        for j in range(i + 1, len(all_values)):
            expected_disagreement += (all_values[i] - all_values[j]) ** 2
            n_total_pairs += 1

    if n_total_pairs == 0:
        return np.nan
    expected_disagreement /= n_total_pairs

    if expected_disagreement == 0:
        return 1.0

    alpha = 1 - (observed_disagreement / expected_disagreement)
    return alpha
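
# Illustrative sketch only (not part of the analysis pipeline, and the helper
# name is hypothetical): how calculate_krippendorff_alpha might be exercised
# on a toy matrix. Rows are items, columns are raters, and np.nan marks a
# missing rating. The single-rated third item is dropped, and with the two
# fully rated items in perfect agreement the observed disagreement is 0, so
# the function returns 1.0.
def _demo_krippendorff_alpha() -> float:
    demo = np.array([
        [4.0, 4.0],     # both raters agree
        [2.0, 2.0],     # both raters agree
        [5.0, np.nan],  # only one rating: excluded from the calculation
    ])
    return calculate_krippendorff_alpha(demo)
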

def calculate_icc(ratings_matrix: np.ndarray) -> tuple[float, float, float]:
    """
    Calculate Intraclass Correlation Coefficient (ICC(2,1)).

    Args:
        ratings_matrix: 2D array where rows are items and columns are raters.

    Returns:
        Tuple of (ICC, lower_bound, upper_bound)
    """
    # Remove rows with any NaN
    valid_rows = ~np.any(np.isnan(ratings_matrix), axis=1)
    ratings_matrix = ratings_matrix[valid_rows]

    if ratings_matrix.shape[0] < 2 or ratings_matrix.shape[1] < 2:
        return np.nan, np.nan, np.nan

    n, k = ratings_matrix.shape

    # Grand mean
    grand_mean = np.mean(ratings_matrix)
    # Row means (item means)
    row_means = np.mean(ratings_matrix, axis=1)
    # Column means (rater means)
    col_means = np.mean(ratings_matrix, axis=0)

    # Sum of squares
    ss_total = np.sum((ratings_matrix - grand_mean) ** 2)
    ss_rows = k * np.sum((row_means - grand_mean) ** 2)
    ss_cols = n * np.sum((col_means - grand_mean) ** 2)
    ss_error = ss_total - ss_rows - ss_cols

    # Mean squares
    ms_rows = ss_rows / (n - 1) if n > 1 else 0
    ms_cols = ss_cols / (k - 1) if k > 1 else 0
    ms_error = ss_error / ((n - 1) * (k - 1)) if (n > 1 and k > 1) else 0

    # ICC(2,1) - two-way random effects, absolute agreement, single rater
    denominator = ms_rows + (k - 1) * ms_error + k * (ms_cols - ms_error) / n
    if denominator == 0:
        return np.nan, np.nan, np.nan

    icc = (ms_rows - ms_error) / denominator

    # Confidence interval (approximate), using the F distribution
    df1 = n - 1
    df2 = (n - 1) * (k - 1)
    if ms_error == 0:
        return icc, np.nan, np.nan

    f_value = ms_rows / ms_error
    f_lower = f_value / stats.f.ppf(0.975, df1, df2)
    f_upper = f_value / stats.f.ppf(0.025, df1, df2)
    icc_lower = (f_lower - 1) / (f_lower + k - 1)
    icc_upper = (f_upper - 1) / (f_upper + k - 1)

    return icc, icc_lower, icc_upper
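
# Illustrative sketch only (not part of the analysis pipeline, and the helper
# name is hypothetical): how calculate_icc might be exercised. With every item
# rated identically by both raters, ms_error is 0, so the point estimate is
# 1.0 and the confidence bounds are returned as NaN.
def _demo_icc() -> tuple[float, float, float]:
    demo = np.array([
        [1.0, 1.0],
        [2.0, 2.0],
        [3.0, 3.0],
    ])
    return calculate_icc(demo)
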

def analyze_ratings():
    """Main analysis function."""
    print("=" * 60)
    print("CREATIVE IDEA ASSESSMENT ANALYSIS")
    print("=" * 60)
    print()

    # Load data
    assessment_data = load_assessment_data()
    ratings = load_ratings_from_db()
    idea_lookup = build_idea_lookup(assessment_data)

    if not ratings:
        print("No ratings found in database.")
        return

    print(f"Loaded {len(ratings)} ratings from database")
    print(f"Experiment ID: {assessment_data['experiment_id']}")
    print()

    # Get unique raters
    raters = list(set(r['rater_id'] for r in ratings))
    print(f"Raters: {raters}")
    print()

    # Join ratings with metadata
    enriched_ratings = []
    for r in ratings:
        idea_meta = idea_lookup.get(r['idea_id'], {})
        enriched_ratings.append({
            **r,
            'condition': idea_meta.get('condition', 'unknown'),
            'expert_name': idea_meta.get('expert_name', ''),
            'keyword': idea_meta.get('keyword', ''),
            'query_text': idea_meta.get('query_text', ''),
            'idea_text': idea_meta.get('text', '')
        })

    # Dimensions
    dimensions = ['originality', 'elaboration', 'coherence', 'usefulness']

    # ================================
    # Inter-rater reliability
    # ================================
    print("-" * 60)
    print("INTER-RATER RELIABILITY")
    print("-" * 60)
    print()

    if len(raters) >= 2:
        # Build ratings matrix per dimension
        idea_ids = list(set(r['idea_id'] for r in enriched_ratings))

        for dim in dimensions:
            # Create matrix: rows = ideas, cols = raters
            matrix = np.full((len(idea_ids), len(raters)), np.nan)
            idea_to_idx = {idea: idx for idx, idea in enumerate(idea_ids)}
            rater_to_idx = {rater: idx for idx, rater in enumerate(raters)}

            for r in enriched_ratings:
                if r[dim] is not None:
                    i = idea_to_idx[r['idea_id']]
                    j = rater_to_idx[r['rater_id']]
                    matrix[i, j] = r[dim]

            # Calculate metrics
            alpha = calculate_krippendorff_alpha(matrix)
            icc, icc_low, icc_high = calculate_icc(matrix)

            print(f"{dim.upper()}:")
            print(f"  Krippendorff's alpha: {alpha:.3f}")
            print(f"  ICC(2,1): {icc:.3f} (95% CI: {icc_low:.3f} - {icc_high:.3f})")
            print()
    else:
        print("Need at least 2 raters for inter-rater reliability analysis.")
        print()

    # ================================
    # Condition comparisons
    # ================================
    print("-" * 60)
    print("MEAN RATINGS BY CONDITION")
    print("-" * 60)
    print()

    # Group ratings by condition
    condition_ratings: dict[str, dict[str, list[int]]] = defaultdict(lambda: defaultdict(list))
    for r in enriched_ratings:
        condition = r['condition']
        for dim in dimensions:
            if r[dim] is not None:
                condition_ratings[condition][dim].append(r[dim])

    # Calculate means and print
    condition_stats = {}
    for condition in sorted(condition_ratings.keys()):
        print(f"\n{condition}:")
        condition_stats[condition] = {}
        for dim in dimensions:
            values = condition_ratings[condition][dim]
            if values:
                mean = np.mean(values)
                n = len(values)
                # Sample standard deviation (ddof=1); 0.0 when only one rating exists
                std = np.std(values, ddof=1) if n > 1 else 0.0
                condition_stats[condition][dim] = {'mean': mean, 'std': std, 'n': n}
                print(f"  {dim}: {mean:.2f} (SD={std:.2f}, n={n})")
            else:
                print(f"  {dim}: no data")

    # ================================
    # Statistical comparisons
    # ================================
    print()
    print("-" * 60)
    print("STATISTICAL COMPARISONS (Kruskal-Wallis)")
    print("-" * 60)
    print()

    conditions = sorted(condition_ratings.keys())
    if len(conditions) >= 2:
        for dim in dimensions:
            groups = [condition_ratings[c][dim] for c in conditions if condition_ratings[c][dim]]
            if len(groups) >= 2:
                h_stat, p_value = stats.kruskal(*groups)
                sig = "*" if p_value < 0.05 else ""
                print(f"{dim}: H={h_stat:.2f}, p={p_value:.4f} {sig}")
            else:
                print(f"{dim}: insufficient data for comparison")
    else:
        print("Need at least 2 conditions with data for statistical comparison.")

    # ================================
    # Export results
    # ================================
    output = {
        'analysis_timestamp': datetime.utcnow().isoformat(),
        'experiment_id': assessment_data['experiment_id'],
        'total_ratings': len(ratings),
        'raters': raters,
        'rater_count': len(raters),
        'condition_stats': condition_stats,
        'enriched_ratings': enriched_ratings
    }

    output_path = RESULTS_DIR / 'analysis_results.json'
    with open(output_path, 'w', encoding='utf-8') as f:
        json.dump(output, f, ensure_ascii=False, indent=2, default=str)

    print()
    print("-" * 60)
    print(f"Results exported to: {output_path}")
    print("=" * 60)


if __name__ == '__main__':
    analyze_ratings()