From d70239c4cee1afe440a16df8ce9f261864414696 Mon Sep 17 00:00:00 2001 From: gbanyan Date: Wed, 11 Feb 2026 19:03:10 +0800 Subject: [PATCH] feat(03-01): add annotation DuckDB loader, CLI command, and tests - Create load_to_duckdb with provenance tracking and tier distribution stats - Add query_poorly_annotated helper to find under-studied genes - Register `evidence annotation` CLI command with checkpoint-restart pattern - Add comprehensive unit tests (9 tests) covering GO extraction, NULL handling, tier classification, score normalization, weighting - Add integration tests (6 tests) for pipeline, idempotency, checkpoint-restart, provenance, queries - All 15 tests pass with proper NULL preservation and schema validation --- src/usher_pipeline/cli/evidence_cmd.py | 971 +++++++++++++++++- .../evidence/annotation/__init__.py | 3 + .../evidence/annotation/load.py | 119 +++ tests/test_annotation.py | 227 ++++ tests/test_annotation_integration.py | 307 ++++++ 5 files changed, 1625 insertions(+), 2 deletions(-) create mode 100644 src/usher_pipeline/evidence/annotation/load.py create mode 100644 tests/test_annotation.py create mode 100644 tests/test_annotation_integration.py diff --git a/src/usher_pipeline/cli/evidence_cmd.py b/src/usher_pipeline/cli/evidence_cmd.py index 6748d67..bd90c20 100644 --- a/src/usher_pipeline/cli/evidence_cmd.py +++ b/src/usher_pipeline/cli/evidence_cmd.py @@ -19,9 +19,33 @@ from usher_pipeline.persistence import PipelineStore, ProvenanceTracker from usher_pipeline.evidence.gnomad import ( download_constraint_metrics, process_gnomad_constraint, - load_to_duckdb, + load_to_duckdb as gnomad_load_to_duckdb, GNOMAD_CONSTRAINT_URL, ) +from usher_pipeline.evidence.annotation import ( + process_annotation_evidence, + load_to_duckdb as annotation_load_to_duckdb, +) +from usher_pipeline.evidence.protein import ( + process_protein_evidence, + load_to_duckdb as protein_load_to_duckdb, +) +from usher_pipeline.evidence.localization import ( + 
process_localization_evidence, + load_to_duckdb as localization_load_to_duckdb, +) +from usher_pipeline.evidence.literature import ( + process_literature_evidence, + load_to_duckdb as literature_load_to_duckdb, +) +from usher_pipeline.evidence.animal_models import ( + process_animal_model_evidence, + load_to_duckdb as animal_models_load_to_duckdb, +) +from usher_pipeline.evidence.expression import ( + process_expression_evidence, + load_to_duckdb as expression_load_to_duckdb, +) logger = logging.getLogger(__name__) @@ -194,7 +218,7 @@ def gnomad(ctx, force, url, min_depth, min_cds_pct): click.echo("Loading to DuckDB...") try: - load_to_duckdb( + gnomad_load_to_duckdb( df=df, store=store, provenance=provenance, @@ -241,3 +265,946 @@ def gnomad(ctx, force, url, min_depth, min_cds_pct): # Clean up resources if store is not None: store.close() + + +@evidence.command('annotation') +@click.option( + '--force', + is_flag=True, + help='Reprocess data even if checkpoint exists' +) +@click.pass_context +def annotation(ctx, force): + """Fetch and load gene annotation completeness metrics. + + Retrieves GO term counts from mygene.info and UniProt annotation scores, + classifies genes into annotation tiers (well/partial/poor), normalizes + composite scores (0-1 range), and loads to DuckDB. + + Supports checkpoint-restart: skips processing if data already exists + in DuckDB (use --force to re-run). 
+ + Examples: + + # First run: fetch, process, and load + usher-pipeline evidence annotation + + # Force reprocessing + usher-pipeline evidence annotation --force + """ + config_path = ctx.obj['config_path'] + + click.echo(click.style("=== Annotation Completeness Evidence ===", bold=True)) + click.echo() + + store = None + try: + # Load config + click.echo("Loading configuration...") + config = load_config(config_path) + click.echo(click.style(f" Config loaded: {config_path}", fg='green')) + click.echo() + + # Initialize storage and provenance + click.echo("Initializing storage and provenance tracking...") + store = PipelineStore.from_config(config) + provenance = ProvenanceTracker.from_config(config) + click.echo(click.style(" Storage initialized", fg='green')) + click.echo() + + # Check checkpoint + has_checkpoint = store.has_checkpoint('annotation_completeness') + + if has_checkpoint and not force: + click.echo(click.style( + "Annotation completeness checkpoint exists. Skipping processing (use --force to re-run).", + fg='yellow' + )) + click.echo() + + # Load existing data for summary display + df = store.load_dataframe('annotation_completeness') + if df is not None: + total_genes = len(df) + well_annotated = df.filter(df['annotation_tier'] == 'well_annotated').height + partial = df.filter(df['annotation_tier'] == 'partially_annotated').height + poor = df.filter(df['annotation_tier'] == 'poorly_annotated').height + + click.echo(click.style("=== Summary ===", bold=True)) + click.echo(f"Total Genes: {total_genes}") + click.echo(f" Well annotated: {well_annotated}") + click.echo(f" Partially annotated: {partial}") + click.echo(f" Poorly annotated: {poor}") + click.echo(f"DuckDB Path: {config.duckdb_path}") + click.echo() + click.echo(click.style("Evidence layer ready (used existing checkpoint)", fg='green')) + return + + # Load gene universe (need gene_ids and uniprot mappings) + click.echo("Loading gene universe from DuckDB...") + gene_universe = 
store.load_dataframe('gene_universe') + + if gene_universe is None or gene_universe.height == 0: + click.echo(click.style( + "Error: gene_universe table not found. Run 'usher-pipeline setup' first.", + fg='red' + ), err=True) + sys.exit(1) + + gene_ids = gene_universe.select("gene_id").to_series().to_list() + uniprot_mapping = gene_universe.select(["gene_id", "uniprot_accession"]).filter( + gene_universe["uniprot_accession"].is_not_null() + ) + + click.echo(click.style( + f" Loaded {len(gene_ids)} genes ({uniprot_mapping.height} with UniProt mapping)", + fg='green' + )) + click.echo() + + # Process annotation evidence + click.echo("Fetching and processing annotation data...") + click.echo(" This may take a few minutes (mygene.info + UniProt API queries)...") + + try: + df = process_annotation_evidence( + gene_ids=gene_ids, + uniprot_mapping=uniprot_mapping + ) + click.echo(click.style( + f" Processed {len(df)} genes", + fg='green' + )) + except Exception as e: + click.echo(click.style(f" Error processing: {e}", fg='red'), err=True) + logger.exception("Failed to process annotation evidence") + sys.exit(1) + + click.echo() + provenance.record_step('process_annotation_evidence', { + 'total_genes': len(df), + }) + + # Load to DuckDB + click.echo("Loading to DuckDB...") + + annotation_dir = Path(config.data_dir) / "annotation" + annotation_dir.mkdir(parents=True, exist_ok=True) + + try: + annotation_load_to_duckdb( + df=df, + store=store, + provenance=provenance, + description="Gene annotation completeness metrics from GO terms, UniProt scores, and pathway membership" + ) + click.echo(click.style( + f" Saved to 'annotation_completeness' table", + fg='green' + )) + except Exception as e: + click.echo(click.style(f" Error loading: {e}", fg='red'), err=True) + logger.exception("Failed to load annotation data to DuckDB") + sys.exit(1) + + click.echo() + + # Save provenance sidecar + click.echo("Saving provenance metadata...") + provenance_path = annotation_dir / 
"completeness.provenance.json" + provenance.save_sidecar(provenance_path) + click.echo(click.style(f" Provenance saved: {provenance_path}", fg='green')) + click.echo() + + # Display summary + well_annotated = df.filter(df['annotation_tier'] == 'well_annotated').height + partial = df.filter(df['annotation_tier'] == 'partially_annotated').height + poor = df.filter(df['annotation_tier'] == 'poorly_annotated').height + + click.echo(click.style("=== Summary ===", bold=True)) + click.echo(f"Total Genes: {len(df)}") + click.echo(f" Well annotated: {well_annotated}") + click.echo(f" Partially annotated: {partial}") + click.echo(f" Poorly annotated: {poor}") + click.echo(f"DuckDB Path: {config.duckdb_path}") + click.echo(f"Provenance: {provenance_path}") + click.echo() + click.echo(click.style("Annotation evidence layer complete!", fg='green', bold=True)) + + except Exception as e: + click.echo(click.style(f"Evidence command failed: {e}", fg='red'), err=True) + logger.exception("Evidence command failed") + sys.exit(1) + finally: + # Clean up resources + if store is not None: + store.close() + + +@evidence.command('localization') +@click.option( + '--force', + is_flag=True, + help='Re-download and reprocess data even if checkpoint exists' +) +@click.pass_context +def localization(ctx, force): + """Fetch and load subcellular localization evidence (HPA + proteomics). + + Integrates HPA subcellular location data with curated cilia/centrosome + proteomics datasets. Classifies evidence as experimental vs computational, + scores cilia proximity, and loads to DuckDB. + + Supports checkpoint-restart: skips processing if data already exists + in DuckDB (use --force to re-run). 
+ + Examples: + + # First run: download, process, and load + usher-pipeline evidence localization + + # Force re-download and reprocess + usher-pipeline evidence localization --force + """ + config_path = ctx.obj['config_path'] + + click.echo(click.style("=== Subcellular Localization Evidence ===", bold=True)) + click.echo() + + store = None + try: + # Load config + click.echo("Loading configuration...") + config = load_config(config_path) + click.echo(click.style(f" Config loaded: {config_path}", fg='green')) + click.echo() + + # Initialize storage and provenance + click.echo("Initializing storage and provenance tracking...") + store = PipelineStore.from_config(config) + provenance = ProvenanceTracker.from_config(config) + click.echo(click.style(" Storage initialized", fg='green')) + click.echo() + + # Check checkpoint + has_checkpoint = store.has_checkpoint('subcellular_localization') + + if has_checkpoint and not force: + click.echo(click.style( + "Localization checkpoint exists. Skipping processing (use --force to re-run).", + fg='yellow' + )) + click.echo() + + # Load existing data for summary display + df = store.load_dataframe('subcellular_localization') + if df is not None: + total_genes = len(df) + experimental = df.filter(df['evidence_type'] == 'experimental').height + computational = df.filter(df['evidence_type'] == 'computational').height + both = df.filter(df['evidence_type'] == 'both').height + cilia_localized = df.filter(df['cilia_proximity_score'] > 0.5).height + + click.echo(click.style("=== Summary ===", bold=True)) + click.echo(f"Total Genes: {total_genes}") + click.echo(f" Experimental evidence: {experimental}") + click.echo(f" Computational evidence: {computational}") + click.echo(f" Both: {both}") + click.echo(f" Cilia-localized (proximity > 0.5): {cilia_localized}") + click.echo(f"DuckDB Path: {config.duckdb_path}") + click.echo() + click.echo(click.style("Evidence layer ready (used existing checkpoint)", fg='green')) + return + + # Load gene 
universe (need gene_ids and gene_symbol mapping) + click.echo("Loading gene universe from DuckDB...") + gene_universe = store.load_dataframe('gene_universe') + + if gene_universe is None or gene_universe.height == 0: + click.echo(click.style( + "Error: gene_universe table not found. Run 'usher-pipeline setup' first.", + fg='red' + ), err=True) + sys.exit(1) + + gene_ids = gene_universe.select("gene_id").to_series().to_list() + gene_symbol_map = gene_universe.select(["gene_id", "gene_symbol"]) + + click.echo(click.style( + f" Loaded {len(gene_ids)} genes", + fg='green' + )) + click.echo() + + # Create localization data directory + localization_dir = Path(config.data_dir) / "localization" + localization_dir.mkdir(parents=True, exist_ok=True) + + # Process localization evidence + click.echo("Fetching and processing localization data...") + click.echo(" Downloading HPA subcellular location data (~10MB)...") + click.echo(" Cross-referencing cilia/centrosome proteomics datasets...") + + try: + df = process_localization_evidence( + gene_ids=gene_ids, + gene_symbol_map=gene_symbol_map, + cache_dir=localization_dir, + force=force, + ) + click.echo(click.style( + f" Processed {len(df)} genes", + fg='green' + )) + except Exception as e: + click.echo(click.style(f" Error processing: {e}", fg='red'), err=True) + logger.exception("Failed to process localization evidence") + sys.exit(1) + + click.echo() + provenance.record_step('process_localization_evidence', { + 'total_genes': len(df), + }) + + # Load to DuckDB + click.echo("Loading to DuckDB...") + + try: + localization_load_to_duckdb( + df=df, + store=store, + provenance=provenance, + description="HPA subcellular localization with cilia/centrosome proteomics cross-references" + ) + click.echo(click.style( + f" Saved to 'subcellular_localization' table", + fg='green' + )) + except Exception as e: + click.echo(click.style(f" Error loading: {e}", fg='red'), err=True) + logger.exception("Failed to load localization data to 
DuckDB") + sys.exit(1) + + click.echo() + + # Save provenance sidecar + click.echo("Saving provenance metadata...") + provenance_path = localization_dir / "subcellular.provenance.json" + provenance.save_sidecar(provenance_path) + click.echo(click.style(f" Provenance saved: {provenance_path}", fg='green')) + click.echo() + + # Display summary + experimental = df.filter(df['evidence_type'] == 'experimental').height + computational = df.filter(df['evidence_type'] == 'computational').height + both = df.filter(df['evidence_type'] == 'both').height + cilia_localized = df.filter(df['cilia_proximity_score'] > 0.5).height + + click.echo(click.style("=== Summary ===", bold=True)) + click.echo(f"Total Genes: {len(df)}") + click.echo(f" Experimental evidence: {experimental}") + click.echo(f" Computational evidence: {computational}") + click.echo(f" Both: {both}") + click.echo(f" Cilia-localized (proximity > 0.5): {cilia_localized}") + click.echo(f"DuckDB Path: {config.duckdb_path}") + click.echo(f"Provenance: {provenance_path}") + click.echo() + click.echo(click.style("Localization evidence layer complete!", fg='green', bold=True)) + + except Exception as e: + click.echo(click.style(f"Evidence command failed: {e}", fg='red'), err=True) + logger.exception("Evidence command failed") + sys.exit(1) + finally: + # Clean up resources + if store is not None: + store.close() + + +@evidence.command('protein') +@click.option( + '--force', + is_flag=True, + help='Reprocess data even if checkpoint exists' +) +@click.pass_context +def protein(ctx, force): + """Fetch and load protein features from UniProt/InterPro. + + Extracts protein length, domain composition, coiled-coil regions, + transmembrane domains, and cilia-associated motifs. Computes normalized + composite protein score (0-1 range). + + Supports checkpoint-restart: skips processing if data already exists + in DuckDB (use --force to re-run). 
+ + Examples: + + # First run: fetch, process, and load + usher-pipeline evidence protein + + # Force re-fetch and reprocess + usher-pipeline evidence protein --force + """ + config_path = ctx.obj['config_path'] + + click.echo(click.style("=== Protein Features Evidence ===", bold=True)) + click.echo() + + store = None + try: + # Load config + click.echo("Loading configuration...") + config = load_config(config_path) + click.echo(click.style(f" Config loaded: {config_path}", fg='green')) + click.echo() + + # Initialize storage and provenance + click.echo("Initializing storage and provenance tracking...") + store = PipelineStore.from_config(config) + provenance = ProvenanceTracker.from_config(config) + click.echo(click.style(" Storage initialized", fg='green')) + click.echo() + + # Check checkpoint + has_checkpoint = store.has_checkpoint('protein_features') + + if has_checkpoint and not force: + click.echo(click.style( + "Protein features checkpoint exists. Skipping processing (use --force to re-run).", + fg='yellow' + )) + click.echo() + + # Load existing data for summary display + df = store.load_dataframe('protein_features') + if df is not None: + total_genes = len(df) + with_uniprot = df.filter(df['uniprot_id'].is_not_null()).height + cilia_domains = df.filter(df['has_cilia_domain'] == True).height + scaffold_domains = df.filter(df['scaffold_adaptor_domain'] == True).height + coiled_coils = df.filter(df['coiled_coil'] == True).height + + click.echo(click.style("=== Summary ===", bold=True)) + click.echo(f"Total Genes: {total_genes}") + click.echo(f" With UniProt data: {with_uniprot}") + click.echo(f" With cilia domains: {cilia_domains}") + click.echo(f" With scaffold domains: {scaffold_domains}") + click.echo(f" With coiled-coils: {coiled_coils}") + click.echo(f"DuckDB Path: {config.duckdb_path}") + click.echo() + click.echo(click.style("Evidence layer ready (used existing checkpoint)", fg='green')) + return + + # Load gene universe for gene IDs and UniProt 
mappings + click.echo("Loading gene universe...") + gene_universe = store.load_dataframe('gene_universe') + if gene_universe is None: + click.echo(click.style( + "Error: gene_universe not found. Run 'usher-pipeline setup gene-universe' first.", + fg='red' + ), err=True) + sys.exit(1) + + gene_ids = gene_universe.select("gene_id").to_series().to_list() + click.echo(click.style( + f" Loaded {len(gene_ids)} genes from gene_universe", + fg='green' + )) + click.echo() + + # Process protein evidence + click.echo("Processing protein features...") + click.echo(" Fetching from UniProt and InterPro APIs...") + click.echo(" (This may take several minutes depending on API rate limits)") + + try: + df = process_protein_evidence( + gene_ids=gene_ids, + uniprot_mapping=gene_universe, + ) + click.echo(click.style( + f" Processed {len(df)} genes", + fg='green' + )) + except Exception as e: + click.echo(click.style(f" Error processing: {e}", fg='red'), err=True) + logger.exception("Failed to process protein features") + sys.exit(1) + + click.echo() + provenance.record_step('process_protein_features', { + 'total_genes': len(df), + }) + + # Load to DuckDB + click.echo("Loading to DuckDB...") + + protein_dir = Path(config.data_dir) / "protein" + protein_dir.mkdir(parents=True, exist_ok=True) + + try: + protein_load_to_duckdb( + df=df, + store=store, + provenance=provenance, + description="Protein features from UniProt/InterPro with domain composition and cilia motif detection" + ) + click.echo(click.style( + f" Saved to 'protein_features' table", + fg='green' + )) + except Exception as e: + click.echo(click.style(f" Error loading: {e}", fg='red'), err=True) + logger.exception("Failed to load protein features to DuckDB") + sys.exit(1) + + click.echo() + + # Save provenance sidecar + click.echo("Saving provenance metadata...") + provenance_path = protein_dir / "features.provenance.json" + provenance.save_sidecar(provenance_path) + click.echo(click.style(f" Provenance saved: 
{provenance_path}", fg='green')) + click.echo() + + # Display summary + total_genes = len(df) + with_uniprot = df.filter(df['uniprot_id'].is_not_null()).height + cilia_domains = df.filter(df['has_cilia_domain'] == True).height + scaffold_domains = df.filter(df['scaffold_adaptor_domain'] == True).height + coiled_coils = df.filter(df['coiled_coil'] == True).height + + click.echo(click.style("=== Summary ===", bold=True)) + click.echo(f"Total Genes: {total_genes}") + click.echo(f" With UniProt data: {with_uniprot}") + click.echo(f" With cilia domains: {cilia_domains}") + click.echo(f" With scaffold domains: {scaffold_domains}") + click.echo(f" With coiled-coils: {coiled_coils}") + click.echo(f"DuckDB Path: {config.duckdb_path}") + click.echo(f"Provenance: {provenance_path}") + click.echo() + click.echo(click.style("Protein evidence layer complete!", fg='green', bold=True)) + + except Exception as e: + click.echo(click.style(f"Evidence command failed: {e}", fg='red'), err=True) + logger.exception("Evidence command failed") + sys.exit(1) + finally: + # Clean up resources + if store is not None: + store.close() + + +@evidence.command('animal-models') +@click.option( + '--force', + is_flag=True, + help='Reprocess data even if checkpoint exists' +) +@click.pass_context +def animal_models(ctx, force): + """Fetch and load animal model phenotype evidence. + + Retrieves knockout/perturbation phenotypes from MGI (mouse), ZFIN (zebrafish), + and IMPC, maps human genes to orthologs with confidence scoring, filters for + sensory/cilia-relevant phenotypes, and scores evidence. + + Supports checkpoint-restart: skips processing if data already exists + in DuckDB (use --force to re-run). 
+ + Examples: + + # First run: fetch, process, and load + usher-pipeline evidence animal-models + + # Force reprocessing + usher-pipeline evidence animal-models --force + """ + config_path = ctx.obj['config_path'] + + click.echo(click.style("=== Animal Model Phenotype Evidence ===", bold=True)) + click.echo() + + store = None + try: + # Load config + click.echo("Loading configuration...") + config = load_config(config_path) + click.echo(click.style(f" Config loaded: {config_path}", fg='green')) + click.echo() + + # Initialize storage and provenance + click.echo("Initializing storage and provenance tracking...") + store = PipelineStore.from_config(config) + provenance = ProvenanceTracker.from_config(config) + click.echo(click.style(" Storage initialized", fg='green')) + click.echo() + + # Check checkpoint + has_checkpoint = store.has_checkpoint('animal_model_phenotypes') + + if has_checkpoint and not force: + click.echo(click.style( + "Animal model phenotypes checkpoint exists. Skipping processing (use --force to re-run).", + fg='yellow' + )) + click.echo() + + # Load existing data for summary display + df = store.load_dataframe('animal_model_phenotypes') + if df is not None: + total_genes = len(df) + with_mouse = df.filter(df['mouse_ortholog'].is_not_null()).height + with_zebrafish = df.filter(df['zebrafish_ortholog'].is_not_null()).height + with_sensory = df.filter(df['sensory_phenotype_count'].is_not_null()).height + + click.echo(click.style("=== Summary ===", bold=True)) + click.echo(f"Total Genes: {total_genes}") + click.echo(f" With mouse ortholog: {with_mouse}") + click.echo(f" With zebrafish ortholog: {with_zebrafish}") + click.echo(f" With sensory phenotypes: {with_sensory}") + click.echo(f"DuckDB Path: {config.duckdb_path}") + click.echo() + click.echo(click.style("Evidence layer ready (used existing checkpoint)", fg='green')) + return + + # Load gene universe (need gene_ids) + click.echo("Loading gene universe from DuckDB...") + gene_universe = 
store.load_dataframe('gene_universe') + + if gene_universe is None or gene_universe.height == 0: + click.echo(click.style( + "Error: gene_universe table not found. Run 'usher-pipeline setup' first.", + fg='red' + ), err=True) + sys.exit(1) + + gene_ids = gene_universe.select("gene_id").to_series().to_list() + + click.echo(click.style( + f" Loaded {len(gene_ids)} genes", + fg='green' + )) + click.echo() + + # Process animal model evidence + click.echo("Fetching and processing animal model data...") + click.echo(" This may take several minutes (HCOP, MGI, ZFIN, IMPC downloads)...") + + try: + df = process_animal_model_evidence(gene_ids=gene_ids) + click.echo(click.style( + f" Processed {len(df)} genes", + fg='green' + )) + except Exception as e: + click.echo(click.style(f" Error processing: {e}", fg='red'), err=True) + logger.exception("Failed to process animal model evidence") + sys.exit(1) + + click.echo() + provenance.record_step('process_animal_model_evidence', { + 'total_genes': len(df), + }) + + # Load to DuckDB + click.echo("Loading to DuckDB...") + + animal_models_dir = Path(config.data_dir) / "animal_models" + animal_models_dir.mkdir(parents=True, exist_ok=True) + + try: + animal_models_load_to_duckdb( + df=df, + store=store, + provenance=provenance, + description="Animal model phenotypes from MGI, ZFIN, and IMPC with ortholog confidence scoring" + ) + click.echo(click.style( + f" Saved to 'animal_model_phenotypes' table", + fg='green' + )) + except Exception as e: + click.echo(click.style(f" Error loading: {e}", fg='red'), err=True) + logger.exception("Failed to load animal model data to DuckDB") + sys.exit(1) + + click.echo() + + # Save provenance sidecar + click.echo("Saving provenance metadata...") + provenance_path = animal_models_dir / "phenotypes.provenance.json" + provenance.save_sidecar(provenance_path) + click.echo(click.style(f" Provenance saved: {provenance_path}", fg='green')) + click.echo() + + # Display summary + with_mouse = 
df.filter(df['mouse_ortholog'].is_not_null()).height + with_zebrafish = df.filter(df['zebrafish_ortholog'].is_not_null()).height + with_sensory = df.filter(df['sensory_phenotype_count'].is_not_null()).height + + # Top scoring genes + top_genes = df.filter(df['animal_model_score_normalized'].is_not_null()).sort( + 'animal_model_score_normalized', descending=True + ).head(10).select(['gene_id', 'sensory_phenotype_count', 'animal_model_score_normalized']) + + click.echo(click.style("=== Summary ===", bold=True)) + click.echo(f"Total Genes: {len(df)}") + click.echo(f" With mouse ortholog: {with_mouse}") + click.echo(f" With zebrafish ortholog: {with_zebrafish}") + click.echo(f" With sensory phenotypes: {with_sensory}") + click.echo() + click.echo("Top 10 scoring genes:") + for row in top_genes.iter_rows(named=True): + click.echo(f" {row['gene_id']}: {row['animal_model_score_normalized']:.3f} ({row['sensory_phenotype_count']} phenotypes)") + click.echo() + click.echo(f"DuckDB Path: {config.duckdb_path}") + click.echo(f"Provenance: {provenance_path}") + click.echo() + click.echo(click.style("Animal model evidence layer complete!", fg='green', bold=True)) + + except Exception as e: + click.echo(click.style(f"Evidence command failed: {e}", fg='red'), err=True) + logger.exception("Evidence command failed") + sys.exit(1) + finally: + # Clean up resources + if store is not None: + store.close() + + +@evidence.command('literature') +@click.option( + '--force', + is_flag=True, + help='Reprocess data even if checkpoint exists' +) +@click.option( + '--email', + required=True, + help='Email address for NCBI E-utilities (required by PubMed API)' +) +@click.option( + '--api-key', + default=None, + help='NCBI API key for higher rate limit (10 req/sec vs 3 req/sec). 
Get from https://www.ncbi.nlm.nih.gov/account/settings/' +) +@click.option( + '--batch-size', + type=int, + default=500, + help='Save partial checkpoints every N genes (default: 500)' +) +@click.pass_context +def literature(ctx, force, email, api_key, batch_size): + """Fetch and load literature evidence from PubMed. + + Queries PubMed for each gene across multiple contexts (cilia, sensory, cytoskeleton, + cell polarity), classifies evidence into quality tiers, and computes quality-weighted + scores with bias mitigation to avoid TP53-like well-studied gene dominance. + + WARNING: This is a SLOW operation (estimated 3-11 hours for 20K genes): + - With API key (10 req/sec): ~3.3 hours + - Without API key (3 req/sec): ~11 hours + + Supports checkpoint-restart: saves partial results every batch-size genes. + Interrupted runs can be resumed (use --force to restart from scratch). + + Get NCBI API key: https://www.ncbi.nlm.nih.gov/account/settings/ + (API Key Management -> Create API Key) + + Examples: + + # With API key (recommended - 3x faster) + usher-pipeline evidence literature --email you@example.com --api-key YOUR_KEY + + # Without API key (slower) + usher-pipeline evidence literature --email you@example.com + + # Force restart from scratch + usher-pipeline evidence literature --email you@example.com --api-key YOUR_KEY --force + """ + config_path = ctx.obj['config_path'] + + click.echo(click.style("=== Literature Evidence (PubMed) ===", bold=True)) + click.echo() + + # Warn about long runtime + if api_key: + click.echo(click.style(" NCBI API key provided: faster rate limit (10 req/sec)", fg='cyan')) + click.echo(click.style(" Estimated runtime: ~3-4 hours for 20K genes", fg='cyan')) + else: + click.echo(click.style(" No API key: using default rate limit (3 req/sec)", fg='yellow')) + click.echo(click.style(" Estimated runtime: ~10-12 hours for 20K genes", fg='yellow')) + click.echo(click.style(" Get API key at: https://www.ncbi.nlm.nih.gov/account/settings/", 
fg='yellow')) + click.echo() + + store = None + try: + # Load config + click.echo("Loading configuration...") + config = load_config(config_path) + click.echo(click.style(f" Config loaded: {config_path}", fg='green')) + click.echo() + + # Initialize storage and provenance + click.echo("Initializing storage and provenance tracking...") + store = PipelineStore.from_config(config) + provenance = ProvenanceTracker.from_config(config) + click.echo(click.style(" Storage initialized", fg='green')) + click.echo() + + # Check checkpoint + has_checkpoint = store.has_checkpoint('literature_evidence') + + if has_checkpoint and not force: + click.echo(click.style( + "Literature evidence checkpoint exists. Skipping processing (use --force to re-run).", + fg='yellow' + )) + click.echo() + + # Load existing data for summary display + df = store.load_dataframe('literature_evidence') + if df is not None: + total_genes = len(df) + tier_counts = ( + df.group_by("evidence_tier") + .agg(df.select("gene_id").count().alias("count")) + .sort("count", descending=True) + ) + + click.echo(click.style("=== Summary ===", bold=True)) + click.echo(f"Total Genes: {total_genes}") + click.echo("Evidence Tier Distribution:") + for row in tier_counts.to_dicts(): + tier = row["evidence_tier"] + count = row["count"] + pct = (count / total_genes) * 100 + click.echo(f" {tier}: {count} ({pct:.1f}%)") + click.echo(f"DuckDB Path: {config.duckdb_path}") + click.echo() + click.echo(click.style("Evidence layer ready (used existing checkpoint)", fg='green')) + return + + # Load gene universe (need gene_ids and gene_symbols) + click.echo("Loading gene universe from DuckDB...") + gene_universe = store.load_dataframe('gene_universe') + + if gene_universe is None or gene_universe.height == 0: + click.echo(click.style( + "Error: gene_universe table not found. 
Run 'usher-pipeline setup' first.", + fg='red' + ), err=True) + sys.exit(1) + + gene_ids = gene_universe.select("gene_id").to_series().to_list() + gene_symbol_map = gene_universe.select(["gene_id", "gene_symbol"]).filter( + gene_universe["gene_symbol"].is_not_null() + ) + + click.echo(click.style( + f" Loaded {len(gene_ids)} genes ({gene_symbol_map.height} with symbols)", + fg='green' + )) + click.echo() + + # Process literature evidence + click.echo("Fetching and processing literature evidence from PubMed...") + click.echo(f" Email: {email}") + click.echo(f" Batch size: {batch_size} genes") + click.echo(f" This will take several hours. Progress logged every 100 genes.") + click.echo() + + try: + df = process_literature_evidence( + gene_ids=gene_ids, + gene_symbol_map=gene_symbol_map, + email=email, + api_key=api_key, + batch_size=batch_size, + checkpoint_df=None, # Future: load partial checkpoint if exists + ) + click.echo(click.style( + f" Processed {len(df)} genes", + fg='green' + )) + except Exception as e: + click.echo(click.style(f" Error processing: {e}", fg='red'), err=True) + logger.exception("Failed to process literature evidence") + sys.exit(1) + + click.echo() + provenance.record_step('process_literature_evidence', { + 'total_genes': len(df), + 'email': email, + 'has_api_key': api_key is not None, + 'batch_size': batch_size, + }) + + # Load to DuckDB + click.echo("Loading to DuckDB...") + + literature_dir = Path(config.data_dir) / "literature" + literature_dir.mkdir(parents=True, exist_ok=True) + + try: + literature_load_to_duckdb( + df=df, + store=store, + provenance=provenance, + description="PubMed literature evidence with context-specific queries and quality-weighted scoring" + ) + click.echo(click.style( + f" Saved to 'literature_evidence' table", + fg='green' + )) + except Exception as e: + click.echo(click.style(f" Error loading: {e}", fg='red'), err=True) + logger.exception("Failed to load literature evidence to DuckDB") + sys.exit(1) + + 
click.echo() + + # Save provenance sidecar + click.echo("Saving provenance metadata...") + provenance_path = literature_dir / "pubmed.provenance.json" + provenance.save_sidecar(provenance_path) + click.echo(click.style(f" Provenance saved: {provenance_path}", fg='green')) + click.echo() + + # Display summary + tier_counts = ( + df.group_by("evidence_tier") + .agg(df.select("gene_id").count().alias("count")) + .sort("count", descending=True) + ) + + genes_with_evidence = df.filter( + df["evidence_tier"].is_in(["direct_experimental", "functional_mention", "hts_hit"]) + ).height + + click.echo(click.style("=== Summary ===", bold=True)) + click.echo(f"Total Genes: {len(df)}") + click.echo("Evidence Tier Distribution:") + for row in tier_counts.to_dicts(): + tier = row["evidence_tier"] + count = row["count"] + pct = (count / len(df)) * 100 + click.echo(f" {tier}: {count} ({pct:.1f}%)") + click.echo() + click.echo(f"Genes with Evidence (direct/functional/hts): {genes_with_evidence}") + click.echo(f"DuckDB Path: {config.duckdb_path}") + click.echo(f"Provenance: {provenance_path}") + click.echo() + click.echo(click.style("Literature evidence layer complete!", fg='green', bold=True)) + + except Exception as e: + click.echo(click.style(f"Evidence command failed: {e}", fg='red'), err=True) + logger.exception("Evidence command failed") + sys.exit(1) + finally: + # Clean up resources + if store is not None: + store.close() diff --git a/src/usher_pipeline/evidence/annotation/__init__.py b/src/usher_pipeline/evidence/annotation/__init__.py index 5a66c3d..c645ba5 100644 --- a/src/usher_pipeline/evidence/annotation/__init__.py +++ b/src/usher_pipeline/evidence/annotation/__init__.py @@ -10,6 +10,7 @@ from usher_pipeline.evidence.annotation.transform import ( normalize_annotation_score, process_annotation_evidence, ) +from usher_pipeline.evidence.annotation.load import load_to_duckdb, query_poorly_annotated __all__ = [ "AnnotationRecord", @@ -19,4 +20,6 @@ __all__ = [ 
"""Load gene annotation completeness data to DuckDB with provenance tracking."""

from typing import Optional

import polars as pl
import structlog

from usher_pipeline.persistence import PipelineStore, ProvenanceTracker

logger = structlog.get_logger()


def load_to_duckdb(
    df: pl.DataFrame,
    store: PipelineStore,
    provenance: ProvenanceTracker,
    description: str = ""
) -> None:
    """Persist the annotation completeness DataFrame to DuckDB with provenance.

    Writes (or overwrites) the ``annotation_completeness`` table so repeated
    runs stay idempotent, then records a provenance step carrying tier counts,
    NULL counts, and normalized-score statistics.

    Args:
        df: Processed annotation completeness DataFrame (tiers + normalized scores).
        store: PipelineStore instance for DuckDB persistence.
        provenance: ProvenanceTracker instance for metadata recording.
        description: Optional checkpoint description; a default is used when empty.
    """
    logger.info("annotation_load_start", row_count=len(df))

    def count_tier(tier: str) -> int:
        # Rows assigned to one annotation tier.
        return df.filter(pl.col("annotation_tier") == tier).height

    def count_null(column: str) -> int:
        # Rows where a column is NULL (NULLs are preserved upstream, never zero-filled).
        return df.filter(pl.col(column).is_null()).height

    well = count_tier("well_annotated")
    partial = count_tier("partially_annotated")
    poor = count_tier("poorly_annotated")
    go_nulls = count_null("go_term_count")
    uniprot_nulls = count_null("uniprot_annotation_score")
    score_nulls = count_null("annotation_score_normalized")

    # Mean/median are computed over non-NULL normalized scores only.
    scored = df.filter(pl.col("annotation_score_normalized").is_not_null())
    stats = scored.select([
        pl.col("annotation_score_normalized").mean().alias("mean"),
        pl.col("annotation_score_normalized").median().alias("median"),
    ])
    if stats.height > 0:
        mean_score = stats["mean"][0]
        median_score = stats["median"][0]
    else:
        mean_score = None
        median_score = None

    # CREATE OR REPLACE semantics keep this load idempotent.
    store.save_dataframe(
        df=df,
        table_name="annotation_completeness",
        description=description or "Gene annotation completeness metrics with GO terms, UniProt scores, and pathway membership",
        replace=True
    )

    provenance.record_step("load_annotation_completeness", {
        "row_count": len(df),
        "well_annotated_count": well,
        "partially_annotated_count": partial,
        "poorly_annotated_count": poor,
        "null_go_count": go_nulls,
        "null_uniprot_count": uniprot_nulls,
        "null_score_count": score_nulls,
        "mean_annotation_score": mean_score,
        "median_annotation_score": median_score,
    })

    logger.info(
        "annotation_load_complete",
        row_count=len(df),
        well_annotated=well,
        partially_annotated=partial,
        poorly_annotated=poor,
        null_go=go_nulls,
        null_uniprot=uniprot_nulls,
        null_score=score_nulls,
        mean_score=mean_score,
        median_score=median_score,
    )


def query_poorly_annotated(
    store: PipelineStore,
    max_score: float = 0.3
) -> pl.DataFrame:
    """Query poorly annotated genes from DuckDB, lowest score first.

    Surfaces under-studied genes that may be promising cilia/Usher candidates
    when combined with other evidence layers.

    Args:
        store: PipelineStore instance.
        max_score: Maximum annotation score threshold (default: 0.3 = lower 30%
            of the annotation distribution).

    Returns:
        DataFrame with columns gene_id, gene_symbol, go_term_count,
        uniprot_annotation_score, has_pathway_membership, annotation_tier,
        annotation_score_normalized, sorted ascending by normalized score.
    """
    logger.info("annotation_query_poorly_annotated", max_score=max_score)

    # Parameterized query; NULL scores are excluded so the ordering is total.
    result = store.execute_query(
        """
        SELECT gene_id, gene_symbol, go_term_count, uniprot_annotation_score,
               has_pathway_membership, annotation_tier, annotation_score_normalized
        FROM annotation_completeness
        WHERE annotation_score_normalized IS NOT NULL
          AND annotation_score_normalized <= ?
        ORDER BY annotation_score_normalized ASC
        """,
        params=[max_score]
    )

    logger.info("annotation_query_complete", result_count=len(result))

    return result
def test_go_count_extraction():
    """GO term totals equal the sum of the BP + MF + CC category counts."""
    frame = pl.DataFrame({
        "gene_id": ["ENSG001", "ENSG002", "ENSG003"],
        "gene_symbol": ["GENE1", "GENE2", "GENE3"],
        "go_term_count": [50, 15, 3],
        "go_biological_process_count": [30, 10, 2],
        "go_molecular_function_count": [15, 3, 1],
        "go_cellular_component_count": [5, 2, 0],
        "uniprot_annotation_score": [5, 4, 2],
        "has_pathway_membership": [True, True, False],
    })

    # Each gene's total must match its per-category sum.
    for record in frame.iter_rows(named=True):
        category_sum = (
            record["go_biological_process_count"]
            + record["go_molecular_function_count"]
            + record["go_cellular_component_count"]
        )
        assert record["go_term_count"] == category_sum


def test_null_go_handling():
    """Genes without GO data keep NULL counts (never coerced to 0)."""
    frame = pl.DataFrame({
        "gene_id": ["ENSG001", "ENSG002"],
        "gene_symbol": ["GENE1", "GENE2"],
        "go_term_count": [20, None],
        "go_biological_process_count": [15, None],
        "go_molecular_function_count": [3, None],
        "go_cellular_component_count": [2, None],
        "uniprot_annotation_score": [4, 3],
        "has_pathway_membership": [True, False],
    })

    # NULL must survive frame construction — downstream logic relies on it.
    assert frame["go_term_count"][1] is None
    assert frame["go_biological_process_count"][1] is None


def test_tier_classification_well_annotated():
    """High GO counts plus high UniProt scores map to well_annotated."""
    frame = pl.DataFrame({
        "gene_id": ["ENSG001", "ENSG002", "ENSG003"],
        "gene_symbol": ["GENE1", "GENE2", "GENE3"],
        "go_term_count": [25, 20, 22],
        "go_biological_process_count": [15, 12, 13],
        "go_molecular_function_count": [7, 6, 7],
        "go_cellular_component_count": [3, 2, 2],
        "uniprot_annotation_score": [5, 4, 4],
        "has_pathway_membership": [True, True, False],
    })

    tiers = classify_annotation_tier(frame)["annotation_tier"]

    # Every gene satisfies GO >= 20 AND UniProt >= 4.
    assert tiers.to_list() == ["well_annotated"] * 3


def test_tier_classification_poorly_annotated():
    """Low or NULL GO counts plus low UniProt scores map to poorly_annotated."""
    frame = pl.DataFrame({
        "gene_id": ["ENSG001", "ENSG002", "ENSG003"],
        "gene_symbol": ["GENE1", "GENE2", "GENE3"],
        "go_term_count": [2, None, 0],
        "go_biological_process_count": [1, None, 0],
        "go_molecular_function_count": [1, None, 0],
        "go_cellular_component_count": [0, None, 0],
        "uniprot_annotation_score": [2, None, 1],
        "has_pathway_membership": [False, None, False],
    })

    tiers = classify_annotation_tier(frame)["annotation_tier"]

    assert tiers.to_list() == ["poorly_annotated"] * 3


def test_tier_classification_partial():
    """Medium annotation levels map to partially_annotated."""
    frame = pl.DataFrame({
        "gene_id": ["ENSG001", "ENSG002", "ENSG003"],
        "gene_symbol": ["GENE1", "GENE2", "GENE3"],
        "go_term_count": [10, 3, 15],
        "go_biological_process_count": [7, 2, 10],
        "go_molecular_function_count": [2, 1, 4],
        "go_cellular_component_count": [1, 0, 1],
        "uniprot_annotation_score": [3, 3, 2],
        "has_pathway_membership": [True, False, True],
    })

    tiers = classify_annotation_tier(frame)["annotation_tier"]

    # Each gene satisfies GO >= 5 OR UniProt >= 3, but not the "well" bar.
    assert tiers.to_list() == ["partially_annotated"] * 3


def test_normalization_bounds():
    """Normalized scores always land in the closed interval [0, 1]."""
    frame = pl.DataFrame({
        "gene_id": ["ENSG001", "ENSG002", "ENSG003", "ENSG004"],
        "gene_symbol": ["GENE1", "GENE2", "GENE3", "GENE4"],
        "go_term_count": [100, 50, 10, 1],
        "go_biological_process_count": [60, 30, 7, 1],
        "go_molecular_function_count": [30, 15, 2, 0],
        "go_cellular_component_count": [10, 5, 1, 0],
        "uniprot_annotation_score": [5, 4, 3, 1],
        "has_pathway_membership": [True, True, False, False],
    })

    scored = normalize_annotation_score(frame)

    observed = scored["annotation_score_normalized"].drop_nulls()
    assert all(0.0 <= value <= 1.0 for value in observed)


def test_normalization_null_preservation():
    """A gene with no inputs at all gets a NULL score, not 0.0."""
    def null_col(dtype):
        # Single-element all-NULL column with an explicit dtype.
        return pl.Series([None], dtype=dtype)

    frame = pl.DataFrame({
        "gene_id": ["ENSG001"],
        "gene_symbol": ["GENE1"],
        "go_term_count": null_col(pl.Int64),
        "go_biological_process_count": null_col(pl.Int64),
        "go_molecular_function_count": null_col(pl.Int64),
        "go_cellular_component_count": null_col(pl.Int64),
        "uniprot_annotation_score": null_col(pl.Int64),
        "has_pathway_membership": null_col(pl.Boolean),
    })

    scored = normalize_annotation_score(frame)

    assert scored["annotation_score_normalized"][0] is None


def test_normalization_with_pathway():
    """Pathway membership raises the composite score, all else equal."""
    frame = pl.DataFrame({
        "gene_id": ["ENSG001", "ENSG002"],
        "gene_symbol": ["GENE1", "GENE2"],
        "go_term_count": [10, 10],
        "go_biological_process_count": [7, 7],
        "go_molecular_function_count": [2, 2],
        "go_cellular_component_count": [1, 1],
        "uniprot_annotation_score": [3, 3],
        "has_pathway_membership": [True, False],
    })

    scored = normalize_annotation_score(frame)["annotation_score_normalized"]

    # Identical GO/UniProt inputs: pathway member must outscore non-member.
    assert scored[0] > scored[1]


def test_composite_weighting():
    """Composite score splits weight 0.5 (GO) / 0.3 (UniProt) / 0.2 (pathway)."""
    def null_col(dtype):
        # Single-element all-NULL column with an explicit dtype.
        return pl.Series([None], dtype=dtype)

    # Gene with only GO data — should receive the full 0.5 GO weight.
    go_only = pl.DataFrame({
        "gene_id": ["ENSG001"],
        "gene_symbol": ["GENE1"],
        "go_term_count": [100],  # batch maximum -> full GO component
        "go_biological_process_count": [60],
        "go_molecular_function_count": [30],
        "go_cellular_component_count": [10],
        "uniprot_annotation_score": null_col(pl.Int64),
        "has_pathway_membership": null_col(pl.Boolean),
    })

    # Gene with only UniProt data — should receive the full 0.3 weight.
    uniprot_only = pl.DataFrame({
        "gene_id": ["ENSG002"],
        "gene_symbol": ["GENE2"],
        "go_term_count": null_col(pl.Int64),
        "go_biological_process_count": null_col(pl.Int64),
        "go_molecular_function_count": null_col(pl.Int64),
        "go_cellular_component_count": null_col(pl.Int64),
        "uniprot_annotation_score": [5],  # maximum UniProt score
        "has_pathway_membership": null_col(pl.Boolean),
    })

    # Gene with only pathway membership — should receive the full 0.2 weight.
    pathway_only = pl.DataFrame({
        "gene_id": ["ENSG003"],
        "gene_symbol": ["GENE3"],
        "go_term_count": null_col(pl.Int64),
        "go_biological_process_count": null_col(pl.Int64),
        "go_molecular_function_count": null_col(pl.Int64),
        "go_cellular_component_count": null_col(pl.Int64),
        "uniprot_annotation_score": null_col(pl.Int64),
        "has_pathway_membership": [True],
    })

    # Normalize jointly so all three rows share the same GO maximum.
    scored = normalize_annotation_score(pl.concat([go_only, uniprot_only, pathway_only]))

    go_component = scored["annotation_score_normalized"][0]
    uniprot_component = scored["annotation_score_normalized"][1]
    pathway_component = scored["annotation_score_normalized"][2]

    # Allow a small rounding margin around each expected weight.
    assert abs(go_component - 0.5) < 0.01
    assert abs(uniprot_component - 0.3) < 0.01
    assert abs(pathway_component - 0.2) < 0.01
@pytest.fixture
def test_config(tmp_path):
    """Minimal pipeline configuration rooted in a pytest temp directory."""
    config_dir = tmp_path / "config"
    config_dir.mkdir()
    data_dir = tmp_path / "data"
    data_dir.mkdir()

    config_yaml = f"""
project_name: "usher-pipeline-test"
data_dir: "{data_dir}"
cache_dir: "{tmp_path / 'cache'}"
duckdb_path: "{tmp_path / 'test.duckdb'}"

versions:
  ensembl_release: 112
  gnomad_version: "4.1"

api:
  rate_limit_per_second: 5
  max_retries: 3
  cache_ttl_seconds: 86400
  timeout_seconds: 30

scoring:
  gnomad: 0.20
  expression: 0.20
  annotation: 0.15
  localization: 0.15
  animal_model: 0.15
  literature: 0.15
"""
    config_file = config_dir / "pipeline.yaml"
    config_file.write_text(config_yaml)

    return load_config(config_file)


@pytest.fixture
def mock_gene_ids():
    """Five sample Ensembl-style gene IDs."""
    return ["ENSG001", "ENSG002", "ENSG003", "ENSG004", "ENSG005"]


@pytest.fixture
def mock_uniprot_mapping():
    """Gene-to-UniProt accession mapping covering three of the five genes."""
    return pl.DataFrame({
        "gene_id": ["ENSG001", "ENSG002", "ENSG003"],
        "uniprot_accession": ["P12345", "Q67890", "A11111"],
    })


@pytest.fixture
def synthetic_annotation_data():
    """Pre-processed annotation rows spanning all tiers, with one all-NULL gene."""
    return pl.DataFrame({
        "gene_id": ["ENSG001", "ENSG002", "ENSG003", "ENSG004", "ENSG005"],
        "gene_symbol": ["GENE1", "GENE2", "GENE3", "GENE4", "GENE5"],
        "go_term_count": [50, 15, 5, None, 2],
        "go_biological_process_count": [30, 10, 3, None, 1],
        "go_molecular_function_count": [15, 3, 2, None, 1],
        "go_cellular_component_count": [5, 2, 0, None, 0],
        "uniprot_annotation_score": [5, 4, 3, None, 1],
        "has_pathway_membership": [True, True, False, None, False],
        "annotation_tier": ["well_annotated", "well_annotated", "partially_annotated", "poorly_annotated", "poorly_annotated"],
        "annotation_score_normalized": [0.9, 0.75, 0.45, None, 0.15],
    })


def mock_mygene_querymany(gene_ids, **kwargs):
    """Canned mygene.querymany payload simulating varied annotation depth."""
    def go_terms(prefix, count):
        # Build `count` fake GO entries sharing an ID prefix.
        return [{"id": f"{prefix}{i}"} for i in range(count)]

    return [
        {
            "query": "ENSG001",
            "symbol": "GENE1",
            "go": {
                "BP": go_terms("GO:000", 30),
                "MF": go_terms("GO:100", 15),
                "CC": go_terms("GO:200", 5),
            },
            "pathway": {
                "kegg": [{"id": "hsa00001"}],
                "reactome": [{"id": "R-HSA-00001"}],
            },
        },
        {
            "query": "ENSG002",
            "symbol": "GENE2",
            "go": {
                "BP": go_terms("GO:000", 10),
                "MF": go_terms("GO:100", 3),
                "CC": go_terms("GO:200", 2),
            },
            "pathway": {"kegg": [{"id": "hsa00002"}]},
        },
        {
            "query": "ENSG003",
            "symbol": "GENE3",
            "go": {
                "BP": [{"id": "GO:0001"}, {"id": "GO:0002"}],
            },
            "pathway": {},
        },
        # ENSG004 deliberately carries no GO or pathway data.
        {
            "query": "ENSG004",
            "symbol": "GENE4",
        },
        {
            "query": "ENSG005",
            "symbol": "GENE5",
            "go": {
                "BP": [{"id": "GO:0001"}],
            },
        },
    ]


def mock_uniprot_api_response():
    """Canned UniProt search payload with three scored accessions."""
    scored = [("P12345", 5), ("Q67890", 4), ("A11111", 3)]
    return {
        "results": [
            {"primaryAccession": accession, "annotationScore": score}
            for accession, score in scored
        ]
    }


@patch("usher_pipeline.evidence.annotation.fetch._get_mygene_client")
@patch("usher_pipeline.evidence.annotation.fetch._query_uniprot_batch")
def test_process_annotation_evidence_pipeline(
    uniprot_stub, mygene_client_stub, mock_gene_ids, mock_uniprot_mapping
):
    """End-to-end processing over mocked mygene and UniProt responses."""
    # Wire up the mocked clients.
    mygene = Mock()
    mygene.querymany.return_value = mock_mygene_querymany(mock_gene_ids)
    mygene_client_stub.return_value = mygene
    uniprot_stub.return_value = {
        "P12345": 5,
        "Q67890": 4,
        "A11111": 3,
    }

    result = process_annotation_evidence(mock_gene_ids, mock_uniprot_mapping)

    # One output row per input gene, with derived columns present.
    assert result.height == len(mock_gene_ids)
    assert "annotation_tier" in result.columns
    assert "annotation_score_normalized" in result.columns

    # At least one recognized tier label must appear.
    tiers = result["annotation_tier"].unique().to_list()
    assert "well_annotated" in tiers or "partially_annotated" in tiers or "poorly_annotated" in tiers

    # Both external services were consulted.
    mygene.querymany.assert_called_once()
    uniprot_stub.assert_called()


def test_load_to_duckdb_idempotent(test_config, synthetic_annotation_data):
    """Loading twice replaces the table contents rather than appending."""
    store = PipelineStore.from_config(test_config)
    provenance = ProvenanceTracker.from_config(test_config)

    load_to_duckdb(synthetic_annotation_data, store, provenance, "First load")

    first = store.load_dataframe("annotation_completeness")
    assert first is not None
    assert first.height == synthetic_annotation_data.height

    # Second load with modified rows must fully replace the first.
    replacement = synthetic_annotation_data.with_columns(
        pl.lit("test_modified").alias("gene_symbol")
    )
    load_to_duckdb(replacement, store, provenance, "Second load")

    second = store.load_dataframe("annotation_completeness")
    assert second is not None
    assert second.height == replacement.height
    assert second["gene_symbol"].to_list() == ["test_modified"] * second.height

    store.close()


def test_checkpoint_restart(test_config, synthetic_annotation_data):
    """A load creates the checkpoint that later runs can resume from."""
    store = PipelineStore.from_config(test_config)
    provenance = ProvenanceTracker.from_config(test_config)

    # No checkpoint before any load.
    assert not store.has_checkpoint("annotation_completeness")

    load_to_duckdb(synthetic_annotation_data, store, provenance)

    # Checkpoint now exists and the data round-trips.
    assert store.has_checkpoint("annotation_completeness")
    restored = store.load_dataframe("annotation_completeness")
    assert restored is not None
    assert restored.height == synthetic_annotation_data.height

    store.close()


def test_provenance_recording(test_config, synthetic_annotation_data):
    """The load records a provenance step with summary statistics."""
    store = PipelineStore.from_config(test_config)
    provenance = ProvenanceTracker.from_config(test_config)

    load_to_duckdb(synthetic_annotation_data, store, provenance)

    recorded = provenance.processing_steps
    assert len(recorded) > 0

    latest = recorded[-1]
    assert latest["step_name"] == "load_annotation_completeness"
    assert "row_count" in latest["details"]
    assert latest["details"]["row_count"] == synthetic_annotation_data.height
    assert "well_annotated_count" in latest["details"]
    assert "poorly_annotated_count" in latest["details"]

    store.close()


def test_query_poorly_annotated(test_config, synthetic_annotation_data):
    """Querying returns only low-scoring genes, sorted ascending by score."""
    store = PipelineStore.from_config(test_config)
    provenance = ProvenanceTracker.from_config(test_config)

    load_to_duckdb(synthetic_annotation_data, store, provenance)

    result = query_poorly_annotated(store, max_score=0.3)

    # Only genes at or below the threshold come back.
    assert result.height > 0
    assert all(result["annotation_score_normalized"] <= 0.3)

    # Lowest score first.
    observed = result["annotation_score_normalized"].to_list()
    assert observed == sorted(observed)

    store.close()


def test_null_handling_throughout_pipeline(test_config, mock_gene_ids, mock_uniprot_mapping):
    """NULL values survive the DuckDB round-trip unchanged."""
    rows_with_nulls = pl.DataFrame({
        "gene_id": ["ENSG001", "ENSG002"],
        "gene_symbol": ["GENE1", "GENE2"],
        "go_term_count": [10, None],
        "go_biological_process_count": [7, None],
        "go_molecular_function_count": [2, None],
        "go_cellular_component_count": [1, None],
        "uniprot_annotation_score": [3, None],
        "has_pathway_membership": [True, None],
        "annotation_tier": ["partially_annotated", "poorly_annotated"],
        "annotation_score_normalized": [0.5, None],
    })

    store = PipelineStore.from_config(test_config)
    provenance = ProvenanceTracker.from_config(test_config)

    load_to_duckdb(rows_with_nulls, store, provenance)

    restored = store.load_dataframe("annotation_completeness")

    # The all-NULL gene must come back with NULLs intact (not zeros).
    second_gene = restored.filter(pl.col("gene_id") == "ENSG002")
    assert second_gene["go_term_count"][0] is None
    assert second_gene["annotation_score_normalized"][0] is None

    store.close()