feat(03-01): add annotation DuckDB loader, CLI command, and tests

- Create load_to_duckdb with provenance tracking and tier distribution stats
- Add query_poorly_annotated helper to find under-studied genes
- Register `evidence annotation` CLI command with checkpoint-restart pattern (see the sketch below)
- Add comprehensive unit tests (9 tests) covering GO extraction, NULL handling, tier classification, score normalization, and weighting
- Add integration tests (6 tests) for pipeline, idempotency, checkpoint-restart, provenance, and queries
- All 15 tests pass with proper NULL preservation and schema validation
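
The CLI wiring itself is not reproduced on this page, so purely as an illustration of the checkpoint-restart idea: the command can probe DuckDB for the target table and only re-run the load when it is missing. A minimal sketch, assuming `df`, `store`, and `provenance` are already built by the command; the table-existence query goes through DuckDB's standard information_schema and is not the actual CLI implementation.

# Hypothetical guard, not the real command code.
existing = store.execute_query(
    "SELECT table_name FROM information_schema.tables WHERE table_name = ?",
    params=["annotation_completeness"],
)
if existing.height == 0:
    # First run (or a wiped checkpoint): load and record provenance.
    load_to_duckdb(df, store=store, provenance=provenance)
# Otherwise the table already exists and the step is skipped on restart.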
2026-02-11 19:03:10 +08:00
parent 0e389c7e41
commit d70239c4ce
5 changed files with 1625 additions and 2 deletions

@@ -10,6 +10,7 @@ from usher_pipeline.evidence.annotation.transform import (
normalize_annotation_score,
process_annotation_evidence,
)
from usher_pipeline.evidence.annotation.load import load_to_duckdb, query_poorly_annotated

__all__ = [
"AnnotationRecord",
@@ -19,4 +20,6 @@ __all__ = [
"classify_annotation_tier",
"normalize_annotation_score",
"process_annotation_evidence",
"load_to_duckdb",
"query_poorly_annotated",
]
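
With the updated exports, both helpers are importable straight from the package root rather than the load submodule. A quick illustrative check:

import usher_pipeline.evidence.annotation as annotation

# Both new helpers are re-exported at the package level.
assert "load_to_duckdb" in annotation.__all__
assert "query_poorly_annotated" in annotation.__all__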

@@ -0,0 +1,119 @@
"""Load gene annotation completeness data to DuckDB with provenance tracking."""
from typing import Optional
import polars as pl
import structlog
from usher_pipeline.persistence import PipelineStore, ProvenanceTracker
logger = structlog.get_logger()
def load_to_duckdb(
df: pl.DataFrame,
store: PipelineStore,
provenance: ProvenanceTracker,
description: str = ""
) -> None:
"""Save annotation completeness DataFrame to DuckDB with provenance.
Creates or replaces the annotation_completeness table (idempotent).
Records provenance step with summary statistics.
Args:
df: Processed annotation completeness DataFrame with tiers and normalized scores
store: PipelineStore instance for DuckDB persistence
provenance: ProvenanceTracker instance for metadata recording
description: Optional description for checkpoint metadata
"""
logger.info("annotation_load_start", row_count=len(df))
# Calculate summary statistics for provenance
well_annotated_count = df.filter(pl.col("annotation_tier") == "well_annotated").height
partial_count = df.filter(pl.col("annotation_tier") == "partially_annotated").height
poor_count = df.filter(pl.col("annotation_tier") == "poorly_annotated").height
null_go_count = df.filter(pl.col("go_term_count").is_null()).height
null_uniprot_count = df.filter(pl.col("uniprot_annotation_score").is_null()).height
null_score_count = df.filter(pl.col("annotation_score_normalized").is_null()).height
# Compute mean/median for non-NULL scores
score_stats = df.filter(pl.col("annotation_score_normalized").is_not_null()).select([
pl.col("annotation_score_normalized").mean().alias("mean"),
pl.col("annotation_score_normalized").median().alias("median"),
])
mean_score = score_stats["mean"][0] if score_stats.height > 0 else None
median_score = score_stats["median"][0] if score_stats.height > 0 else None
# Save to DuckDB with CREATE OR REPLACE (idempotent)
store.save_dataframe(
df=df,
table_name="annotation_completeness",
description=description or "Gene annotation completeness metrics with GO terms, UniProt scores, and pathway membership",
replace=True
)
# Record provenance step with details
provenance.record_step("load_annotation_completeness", {
"row_count": len(df),
"well_annotated_count": well_annotated_count,
"partially_annotated_count": partial_count,
"poorly_annotated_count": poor_count,
"null_go_count": null_go_count,
"null_uniprot_count": null_uniprot_count,
"null_score_count": null_score_count,
"mean_annotation_score": mean_score,
"median_annotation_score": median_score,
})
logger.info(
"annotation_load_complete",
row_count=len(df),
well_annotated=well_annotated_count,
partially_annotated=partial_count,
poorly_annotated=poor_count,
null_go=null_go_count,
null_uniprot=null_uniprot_count,
null_score=null_score_count,
mean_score=mean_score,
median_score=median_score,
)


def query_poorly_annotated(
store: PipelineStore,
max_score: float = 0.3
) -> pl.DataFrame:
"""Query poorly annotated genes from DuckDB.
Identifies under-studied genes that may be promising cilia/Usher candidates
when combined with other evidence layers.
Args:
store: PipelineStore instance
max_score: Maximum annotation score threshold (default: 0.3 = lower 30% of annotation distribution)
Returns:
DataFrame with poorly annotated genes sorted by annotation score (lowest first)
Columns: gene_id, gene_symbol, go_term_count, uniprot_annotation_score,
has_pathway_membership, annotation_tier, annotation_score_normalized
"""
logger.info("annotation_query_poorly_annotated", max_score=max_score)
# Query DuckDB: poorly annotated genes with valid scores
df = store.execute_query(
"""
SELECT gene_id, gene_symbol, go_term_count, uniprot_annotation_score,
has_pathway_membership, annotation_tier, annotation_score_normalized
FROM annotation_completeness
WHERE annotation_score_normalized IS NOT NULL
AND annotation_score_normalized <= ?
ORDER BY annotation_score_normalized ASC
""",
params=[max_score]
)
logger.info("annotation_query_complete", result_count=len(df))
return df
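
End to end, the module supports a load-then-query flow. A minimal sketch; the `...` placeholders stand in for whatever arguments the pipeline actually passes when building the store, tracker, and processed DataFrame (those signatures are not part of this commit):

from usher_pipeline.evidence.annotation import (
    load_to_duckdb,
    process_annotation_evidence,
    query_poorly_annotated,
)
from usher_pipeline.persistence import PipelineStore, ProvenanceTracker

# Placeholders: construct these however the pipeline normally does.
store = PipelineStore(...)
provenance = ProvenanceTracker(...)
processed_df = process_annotation_evidence(...)

# Persist the evidence layer (CREATE OR REPLACE, so re-runs are idempotent)
# and record a provenance step with tier and NULL statistics.
load_to_duckdb(processed_df, store=store, provenance=provenance,
               description="Annotation completeness evidence layer")

# Pull the under-annotated tail (normalized score <= 0.3) for candidate ranking.
poorly_annotated = query_poorly_annotated(store, max_score=0.3)
print(poorly_annotated.head(10))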