""" SQLite database setup and operations for assessment ratings storage. """ import sqlite3 from contextlib import contextmanager from datetime import datetime from pathlib import Path from typing import Any, Generator # Database path DB_PATH = Path(__file__).parent.parent / 'results' / 'ratings.db' def get_db_path() -> Path: """Get the database path, ensuring directory exists.""" DB_PATH.parent.mkdir(parents=True, exist_ok=True) return DB_PATH @contextmanager def get_connection() -> Generator[sqlite3.Connection, None, None]: """Get a database connection as a context manager.""" conn = sqlite3.connect(get_db_path()) conn.row_factory = sqlite3.Row try: yield conn finally: conn.close() def init_db() -> None: """Initialize the database with required tables.""" with get_connection() as conn: cursor = conn.cursor() # Raters table cursor.execute(''' CREATE TABLE IF NOT EXISTS raters ( rater_id TEXT PRIMARY KEY, name TEXT, created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ) ''') # Ratings table cursor.execute(''' CREATE TABLE IF NOT EXISTS ratings ( id INTEGER PRIMARY KEY AUTOINCREMENT, rater_id TEXT NOT NULL, idea_id TEXT NOT NULL, query_id TEXT NOT NULL, originality INTEGER CHECK(originality BETWEEN 1 AND 5), elaboration INTEGER CHECK(elaboration BETWEEN 1 AND 5), coherence INTEGER CHECK(coherence BETWEEN 1 AND 5), usefulness INTEGER CHECK(usefulness BETWEEN 1 AND 5), skipped INTEGER DEFAULT 0, timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP, FOREIGN KEY (rater_id) REFERENCES raters(rater_id), UNIQUE(rater_id, idea_id) ) ''') # Progress table cursor.execute(''' CREATE TABLE IF NOT EXISTS progress ( rater_id TEXT NOT NULL, query_id TEXT NOT NULL, completed_count INTEGER DEFAULT 0, total_count INTEGER DEFAULT 0, started_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, PRIMARY KEY (rater_id, query_id), FOREIGN KEY (rater_id) REFERENCES raters(rater_id) ) ''') # Create indexes for common queries cursor.execute(''' CREATE INDEX IF NOT EXISTS idx_ratings_rater ON ratings(rater_id) ''') cursor.execute(''' CREATE INDEX IF NOT EXISTS idx_ratings_idea ON ratings(idea_id) ''') conn.commit() # Rater operations def create_rater(rater_id: str, name: str | None = None) -> dict[str, Any]: """Create a new rater.""" with get_connection() as conn: cursor = conn.cursor() try: cursor.execute( 'INSERT INTO raters (rater_id, name) VALUES (?, ?)', (rater_id, name or rater_id) ) conn.commit() return {'rater_id': rater_id, 'name': name or rater_id, 'created': True} except sqlite3.IntegrityError: # Rater already exists return get_rater(rater_id) def get_rater(rater_id: str) -> dict[str, Any] | None: """Get a rater by ID.""" with get_connection() as conn: cursor = conn.cursor() cursor.execute('SELECT * FROM raters WHERE rater_id = ?', (rater_id,)) row = cursor.fetchone() if row: return dict(row) return None def list_raters() -> list[dict[str, Any]]: """List all raters.""" with get_connection() as conn: cursor = conn.cursor() cursor.execute('SELECT * FROM raters ORDER BY created_at') return [dict(row) for row in cursor.fetchall()] # Rating operations def save_rating( rater_id: str, idea_id: str, query_id: str, originality: int | None, elaboration: int | None, coherence: int | None, usefulness: int | None, skipped: bool = False ) -> dict[str, Any]: """Save or update a rating.""" with get_connection() as conn: cursor = conn.cursor() cursor.execute(''' INSERT INTO ratings (rater_id, idea_id, query_id, originality, elaboration, coherence, usefulness, skipped, timestamp) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?) ON CONFLICT(rater_id, idea_id) DO UPDATE SET originality = excluded.originality, elaboration = excluded.elaboration, coherence = excluded.coherence, usefulness = excluded.usefulness, skipped = excluded.skipped, timestamp = excluded.timestamp ''', (rater_id, idea_id, query_id, originality, elaboration, coherence, usefulness, int(skipped), datetime.utcnow())) conn.commit() # Update progress update_progress(rater_id, query_id) return { 'rater_id': rater_id, 'idea_id': idea_id, 'saved': True } def get_rating(rater_id: str, idea_id: str) -> dict[str, Any] | None: """Get a specific rating.""" with get_connection() as conn: cursor = conn.cursor() cursor.execute( 'SELECT * FROM ratings WHERE rater_id = ? AND idea_id = ?', (rater_id, idea_id) ) row = cursor.fetchone() if row: return dict(row) return None def get_ratings_by_rater(rater_id: str) -> list[dict[str, Any]]: """Get all ratings by a rater.""" with get_connection() as conn: cursor = conn.cursor() cursor.execute( 'SELECT * FROM ratings WHERE rater_id = ? ORDER BY timestamp', (rater_id,) ) return [dict(row) for row in cursor.fetchall()] def get_ratings_by_idea(idea_id: str) -> list[dict[str, Any]]: """Get all ratings for an idea.""" with get_connection() as conn: cursor = conn.cursor() cursor.execute( 'SELECT * FROM ratings WHERE idea_id = ? ORDER BY rater_id', (idea_id,) ) return [dict(row) for row in cursor.fetchall()] def get_all_ratings() -> list[dict[str, Any]]: """Get all ratings.""" with get_connection() as conn: cursor = conn.cursor() cursor.execute('SELECT * FROM ratings ORDER BY timestamp') return [dict(row) for row in cursor.fetchall()] # Progress operations def update_progress(rater_id: str, query_id: str) -> None: """Update progress for a rater on a query.""" with get_connection() as conn: cursor = conn.cursor() # Count completed ratings for this query cursor.execute(''' SELECT COUNT(*) as count FROM ratings WHERE rater_id = ? AND query_id = ? ''', (rater_id, query_id)) completed = cursor.fetchone()['count'] # Update or insert progress cursor.execute(''' INSERT INTO progress (rater_id, query_id, completed_count, updated_at) VALUES (?, ?, ?, ?) ON CONFLICT(rater_id, query_id) DO UPDATE SET completed_count = excluded.completed_count, updated_at = excluded.updated_at ''', (rater_id, query_id, completed, datetime.utcnow())) conn.commit() def set_progress_total(rater_id: str, query_id: str, total: int) -> None: """Set the total count for a query's progress.""" with get_connection() as conn: cursor = conn.cursor() cursor.execute(''' INSERT INTO progress (rater_id, query_id, total_count, completed_count) VALUES (?, ?, ?, 0) ON CONFLICT(rater_id, query_id) DO UPDATE SET total_count = excluded.total_count ''', (rater_id, query_id, total)) conn.commit() def get_progress(rater_id: str) -> list[dict[str, Any]]: """Get progress for all queries for a rater.""" with get_connection() as conn: cursor = conn.cursor() cursor.execute( 'SELECT * FROM progress WHERE rater_id = ? ORDER BY query_id', (rater_id,) ) return [dict(row) for row in cursor.fetchall()] def get_progress_for_query(rater_id: str, query_id: str) -> dict[str, Any] | None: """Get progress for a specific query.""" with get_connection() as conn: cursor = conn.cursor() cursor.execute( 'SELECT * FROM progress WHERE rater_id = ? AND query_id = ?', (rater_id, query_id) ) row = cursor.fetchone() if row: return dict(row) return None def get_rated_idea_ids(rater_id: str, query_id: str) -> set[str]: """Get the set of idea IDs already rated by a rater for a query.""" with get_connection() as conn: cursor = conn.cursor() cursor.execute( 'SELECT idea_id FROM ratings WHERE rater_id = ? AND query_id = ?', (rater_id, query_id) ) return {row['idea_id'] for row in cursor.fetchall()} # Statistics def get_statistics() -> dict[str, Any]: """Get overall statistics.""" with get_connection() as conn: cursor = conn.cursor() cursor.execute('SELECT COUNT(*) as count FROM raters') rater_count = cursor.fetchone()['count'] cursor.execute('SELECT COUNT(*) as count FROM ratings WHERE skipped = 0') rating_count = cursor.fetchone()['count'] cursor.execute('SELECT COUNT(*) as count FROM ratings WHERE skipped = 1') skip_count = cursor.fetchone()['count'] cursor.execute('SELECT COUNT(DISTINCT idea_id) as count FROM ratings') rated_ideas = cursor.fetchone()['count'] return { 'rater_count': rater_count, 'rating_count': rating_count, 'skip_count': skip_count, 'rated_ideas': rated_ideas } # Initialize on import init_db()