feat(02-02): add DuckDB loader and CLI evidence command for gnomAD

- load_to_duckdb: Saves constraint DataFrame to gnomad_constraint table with provenance tracking
- query_constrained_genes: Queries constrained genes by LOEUF threshold (validates GCON-03 interpretation)
- evidence_cmd.py: CLI command group with gnomad subcommand (fetch->transform->load orchestration)
- Checkpoint-restart: Skips processing if gnomad_constraint table exists (--force to override)
- Full CLI: usher-pipeline evidence gnomad [--force] [--url URL] [--min-depth N] [--min-cds-pct N]
This commit is contained in:
2026-02-11 18:19:07 +08:00
parent c6198122ac
commit ee27f3ad2f
4 changed files with 349 additions and 1 deletions

View File

@@ -0,0 +1,243 @@
"""Evidence layer commands: Fetch and process evidence data.
Commands for downloading and processing various evidence sources:
- gnomad: Constraint metrics (pLI, LOEUF)
- clingen: Gene-disease associations (future)
- gtex: Expression data (future)
- etc.
"""
import logging
import sys
from pathlib import Path
import click
import structlog
from usher_pipeline.config.loader import load_config
from usher_pipeline.persistence import PipelineStore, ProvenanceTracker
from usher_pipeline.evidence.gnomad import (
download_constraint_metrics,
process_gnomad_constraint,
load_to_duckdb,
GNOMAD_CONSTRAINT_URL,
)
logger = logging.getLogger(__name__)
# Top-level command group; subcommands (gnomad, clingen, gtex, ...) attach here.
@click.group(name='evidence')
def evidence():
    """Fetch and process evidence layer data.
    Evidence sources include constraint metrics (gnomAD), gene-disease
    associations (ClinGen), expression data (GTEx), and more.
    Each evidence source follows the fetch -> transform -> load pattern
    with checkpoint-restart and provenance tracking.
    """
    # Group body intentionally empty: the docstring doubles as CLI help text.
def _print_summary(df, duckdb_path, provenance_path=None):
    """Echo gene counts broken down by quality_flag, plus storage locations.

    Shared by the checkpoint-hit path and the full-run path so the two
    summaries cannot drift apart.

    Args:
        df: Constraint DataFrame with a 'quality_flag' column.
        duckdb_path: DuckDB database path (display only).
        provenance_path: Optional provenance sidecar path; printed when given.
    """
    measured = df.filter(df['quality_flag'] == 'measured').height
    incomplete = df.filter(df['quality_flag'] == 'incomplete_coverage').height
    no_data = df.filter(df['quality_flag'] == 'no_data').height
    click.echo(click.style("=== Summary ===", bold=True))
    click.echo(f"Total Genes: {len(df)}")
    click.echo(f" Measured (good coverage): {measured}")
    click.echo(f" Incomplete coverage: {incomplete}")
    click.echo(f" No data: {no_data}")
    click.echo(f"DuckDB Path: {duckdb_path}")
    if provenance_path is not None:
        click.echo(f"Provenance: {provenance_path}")
    click.echo()


@evidence.command('gnomad')
@click.option(
    '--force',
    is_flag=True,
    help='Re-download and reprocess data even if checkpoint exists'
)
@click.option(
    '--url',
    default=GNOMAD_CONSTRAINT_URL,
    help='Override gnomAD constraint file URL'
)
@click.option(
    '--min-depth',
    type=float,
    default=30.0,
    help='Minimum mean sequencing depth for quality filtering (default: 30x)'
)
@click.option(
    '--min-cds-pct',
    type=float,
    default=0.9,
    help='Minimum CDS coverage percentage for quality filtering (default: 0.9 = 90%%)'
)
@click.pass_context
def gnomad(ctx, force, url, min_depth, min_cds_pct):
    """Fetch and load gnomAD constraint metrics (pLI, LOEUF).
    Downloads gnomAD v4.1 constraint metrics, filters by coverage quality,
    normalizes LOEUF scores (0-1 range, inverted), and loads to DuckDB.
    Supports checkpoint-restart: skips processing if data already exists
    in DuckDB (use --force to re-run).
    Examples:
    # First run: download, process, and load
    usher-pipeline evidence gnomad
    # Force re-download and reprocess
    usher-pipeline evidence gnomad --force
    # Use custom quality thresholds
    usher-pipeline evidence gnomad --min-depth 20 --min-cds-pct 0.8
    """
    config_path = ctx.obj['config_path']
    click.echo(click.style("=== gnomAD Constraint Evidence ===", bold=True))
    click.echo()
    store = None
    try:
        # Load config
        click.echo("Loading configuration...")
        config = load_config(config_path)
        click.echo(click.style(f" Config loaded: {config_path}", fg='green'))
        click.echo(f" gnomAD Version: {config.versions.gnomad_version}")
        click.echo()

        # Initialize storage and provenance
        click.echo("Initializing storage and provenance tracking...")
        store = PipelineStore.from_config(config)
        provenance = ProvenanceTracker.from_config(config)
        click.echo(click.style(" Storage initialized", fg='green'))
        click.echo()

        # Checkpoint-restart: skip the fetch/transform/load pipeline if the
        # table already exists, unless the user forced a re-run.
        has_checkpoint = store.has_checkpoint('gnomad_constraint')
        if has_checkpoint and not force:
            click.echo(click.style(
                "gnomAD constraint checkpoint exists. Skipping processing (use --force to re-run).",
                fg='yellow'
            ))
            click.echo()
            # Load existing data for summary display. If the table is missing
            # despite the checkpoint flag, fall through to a fresh run below.
            df = store.load_dataframe('gnomad_constraint')
            if df is not None:
                _print_summary(df, config.duckdb_path)
                click.echo(click.style("Evidence layer ready (used existing checkpoint)", fg='green'))
                return

        # Download gnomAD constraint metrics
        click.echo("Downloading gnomAD constraint metrics...")
        click.echo(f" URL: {url}")
        click.echo(f" Version: {config.versions.gnomad_version}")
        gnomad_dir = Path(config.data_dir) / "gnomad"
        gnomad_dir.mkdir(parents=True, exist_ok=True)
        tsv_path = gnomad_dir / "constraint_metrics.tsv"
        try:
            tsv_path = download_constraint_metrics(
                output_path=tsv_path,
                url=url,
                force=force
            )
            click.echo(click.style(
                f" Downloaded to: {tsv_path}",
                fg='green'
            ))
        except Exception as e:
            click.echo(click.style(f" Error downloading: {e}", fg='red'), err=True)
            logger.exception("Failed to download gnomAD constraint metrics")
            sys.exit(1)
        click.echo()
        provenance.record_step('download_gnomad_constraint', {
            'url': url,
            'version': config.versions.gnomad_version,
            'output_path': str(tsv_path),
        })

        # Process constraint data (coverage filtering + LOEUF normalization)
        click.echo("Processing constraint metrics...")
        click.echo(f" Min depth: {min_depth}x")
        click.echo(f" Min CDS coverage: {min_cds_pct:.0%}")
        try:
            df = process_gnomad_constraint(
                tsv_path=tsv_path,
                min_depth=min_depth,
                min_cds_pct=min_cds_pct
            )
            click.echo(click.style(
                f" Processed {len(df)} genes",
                fg='green'
            ))
        except Exception as e:
            click.echo(click.style(f" Error processing: {e}", fg='red'), err=True)
            logger.exception("Failed to process gnomAD constraint metrics")
            sys.exit(1)
        click.echo()
        provenance.record_step('process_gnomad_constraint', {
            'min_depth': min_depth,
            'min_cds_pct': min_cds_pct,
            'total_genes': len(df),
        })

        # Load to DuckDB (idempotent: replaces existing table)
        click.echo("Loading to DuckDB...")
        try:
            load_to_duckdb(
                df=df,
                store=store,
                provenance=provenance,
                description=f"gnomAD {config.versions.gnomad_version} constraint metrics"
            )
            # Fixed F541: no placeholders, so this is a plain string literal.
            click.echo(click.style(
                " Saved to 'gnomad_constraint' table",
                fg='green'
            ))
        except Exception as e:
            click.echo(click.style(f" Error loading: {e}", fg='red'), err=True)
            logger.exception("Failed to load gnomAD constraint data to DuckDB")
            sys.exit(1)
        click.echo()

        # Save provenance sidecar next to the downloaded TSV
        click.echo("Saving provenance metadata...")
        provenance_path = gnomad_dir / "constraint.provenance.json"
        provenance.save_sidecar(provenance_path)
        click.echo(click.style(f" Provenance saved: {provenance_path}", fg='green'))
        click.echo()

        # Display summary (same renderer as the checkpoint path)
        _print_summary(df, config.duckdb_path, provenance_path)
        click.echo(click.style("gnomAD evidence layer complete!", fg='green', bold=True))
    except Exception as e:
        # Top-level boundary: report, log with traceback, and exit non-zero.
        click.echo(click.style(f"Evidence command failed: {e}", fg='red'), err=True)
        logger.exception("Evidence command failed")
        sys.exit(1)
    finally:
        # Clean up resources (DuckDB connection) even on failure paths.
        if store is not None:
            store.close()

View File

@@ -11,6 +11,7 @@ import click
from usher_pipeline import __version__
from usher_pipeline.config.loader import load_config
from usher_pipeline.cli.setup_cmd import setup
from usher_pipeline.cli.evidence_cmd import evidence
# Configure logging
@@ -95,8 +96,9 @@ def info(ctx):
ctx.exit(1)
# Register setup command
# Register commands
cli.add_command(setup)
cli.add_command(evidence)
if __name__ == '__main__':

View File

@@ -7,6 +7,7 @@ from usher_pipeline.evidence.gnomad.transform import (
normalize_scores,
process_gnomad_constraint,
)
from usher_pipeline.evidence.gnomad.load import load_to_duckdb, query_constrained_genes
__all__ = [
"ConstraintRecord",
@@ -16,4 +17,6 @@ __all__ = [
"filter_by_coverage",
"normalize_scores",
"process_gnomad_constraint",
"load_to_duckdb",
"query_constrained_genes",
]

View File

@@ -0,0 +1,100 @@
"""Load gnomAD constraint data to DuckDB with provenance tracking."""
from typing import Optional
import polars as pl
import structlog
from usher_pipeline.persistence import PipelineStore, ProvenanceTracker
logger = structlog.get_logger()
def load_to_duckdb(
    df: pl.DataFrame,
    store: PipelineStore,
    provenance: ProvenanceTracker,
    description: str = ""
) -> None:
    """Persist a processed gnomAD constraint DataFrame to DuckDB.

    Writes the gnomad_constraint table idempotently (replace=True) and
    records a provenance step carrying per-quality-flag row counts.

    Args:
        df: Processed constraint DataFrame with quality_flag and loeuf_normalized
        store: PipelineStore instance for DuckDB persistence
        provenance: ProvenanceTracker instance for metadata recording
        description: Optional description for checkpoint metadata
    """
    row_count = len(df)
    logger.info("gnomad_load_start", row_count=row_count)

    # Tally rows per quality flag plus missing-LOEUF rows for provenance.
    flag_counts = {
        flag: df.filter(pl.col("quality_flag") == flag).height
        for flag in ("measured", "incomplete_coverage", "no_data")
    }
    stats = {
        "row_count": row_count,
        "measured_count": flag_counts["measured"],
        "incomplete_count": flag_counts["incomplete_coverage"],
        "no_data_count": flag_counts["no_data"],
        "null_loeuf_count": df["loeuf"].null_count(),
    }

    # Idempotent write: replaces any existing gnomad_constraint table.
    store.save_dataframe(
        df=df,
        table_name="gnomad_constraint",
        description=description or "gnomAD v4.1 constraint metrics with quality flags and normalized LOEUF scores",
        replace=True
    )

    # Record provenance step with the summary statistics computed above.
    provenance.record_step("load_gnomad_constraint", stats)

    logger.info(
        "gnomad_load_complete",
        row_count=row_count,
        measured=stats["measured_count"],
        incomplete=stats["incomplete_count"],
        no_data=stats["no_data_count"],
        null_loeuf=stats["null_loeuf_count"],
    )
def query_constrained_genes(
    store: PipelineStore,
    loeuf_threshold: float = 0.6
) -> pl.DataFrame:
    """Fetch well-covered genes whose LOEUF falls below a threshold.

    Demonstrates DuckDB query capability and validates the GCON-03
    interpretation: constrained genes are "important but under-studied"
    signals, not direct cilia involvement evidence.

    Args:
        store: PipelineStore instance
        loeuf_threshold: LOEUF cutoff; lower values mean stronger constraint.
            Default: 0.6 (represents genes in lower 40% of LOEUF distribution)

    Returns:
        DataFrame sorted by LOEUF ascending (most constrained first) with
        columns: gene_id, gene_symbol, loeuf, pli, quality_flag, loeuf_normalized
    """
    logger.info("gnomad_query_constrained", loeuf_threshold=loeuf_threshold)

    # Only 'measured' rows are considered so that low-coverage artifacts do
    # not masquerade as genuine constraint signal.
    sql = """
        SELECT gene_id, gene_symbol, loeuf, pli, quality_flag, loeuf_normalized
        FROM gnomad_constraint
        WHERE quality_flag = 'measured'
        AND loeuf < ?
        ORDER BY loeuf ASC
        """
    results = store.execute_query(sql, params=[loeuf_threshold])

    logger.info("gnomad_query_complete", result_count=len(results))
    return results