feat(02-01): add gnomAD transform pipeline and comprehensive tests

- Implement filter_by_coverage with quality_flag categorization (measured/incomplete_coverage/no_data)
- Add normalize_scores with LOEUF inversion (lower LOEUF = higher score)
- NULL preservation throughout pipeline (unknown != zero constraint)
- process_gnomad_constraint end-to-end pipeline function
- 15 comprehensive unit tests covering edge cases:
  - NULL handling and preservation
  - Coverage filtering without dropping genes
  - Normalization bounds and inversion
  - Mixed type handling for robust parsing
- Fix column mapping to handle gnomAD v4.x loeuf/loeuf_upper duplication
- All existing tests continue to pass
This commit is contained in:
2026-02-11 18:14:41 +08:00
parent a88b0eea60
commit 174c4af02d
5 changed files with 522 additions and 5 deletions

View File

@@ -2,10 +2,18 @@
from usher_pipeline.evidence.gnomad.models import ConstraintRecord, GNOMAD_CONSTRAINT_URL
from usher_pipeline.evidence.gnomad.fetch import download_constraint_metrics, parse_constraint_tsv
from usher_pipeline.evidence.gnomad.transform import (
filter_by_coverage,
normalize_scores,
process_gnomad_constraint,
)
__all__ = [
"ConstraintRecord",
"GNOMAD_CONSTRAINT_URL",
"download_constraint_metrics",
"parse_constraint_tsv",
"filter_by_coverage",
"normalize_scores",
"process_gnomad_constraint",
]

View File

@@ -144,11 +144,15 @@ def parse_constraint_tsv(tsv_path: Path) -> pl.LazyFrame:
)
# Map actual columns to our standardized names
# Track which source columns we've already used to avoid duplicates
column_mapping = {}
used_source_cols = set()
for our_name, variants in COLUMN_VARIANTS.items():
for variant in variants:
if variant in actual_columns:
if variant in actual_columns and variant not in used_source_cols:
column_mapping[variant] = our_name
used_source_cols.add(variant)
break
if not column_mapping:
@@ -164,4 +168,13 @@ def parse_constraint_tsv(tsv_path: Path) -> pl.LazyFrame:
# Select and rename mapped columns
lf = lf.select([pl.col(old).alias(new) for old, new in column_mapping.items()])
# Special case: if loeuf is mapped but loeuf_upper is not, duplicate loeuf to loeuf_upper
# (In gnomAD, the "upper" value IS the LOEUF we use)
mapped_names = set(column_mapping.values())
if "loeuf" in mapped_names and "loeuf_upper" not in mapped_names:
lf = lf.with_columns(pl.col("loeuf").alias("loeuf_upper"))
elif "loeuf_upper" in mapped_names and "loeuf" not in mapped_names:
# If we only got loeuf_upper, copy it to loeuf
lf = lf.with_columns(pl.col("loeuf_upper").alias("loeuf"))
return lf

View File

@@ -9,15 +9,18 @@ GNOMAD_CONSTRAINT_URL = (
)
# Column name mapping for different gnomAD versions
# v2.1.1 uses: gene, transcript, pLI, oe_lof_upper (LOEUF), mean_proportion_covered_bases
# v4.x uses: gene, transcript, mane_select, lof.pLI, lof.oe_ci.upper, mean_proportion_covered
# v2.1.1 uses: gene, transcript, pLI, oe_lof_upper (LOEUF upper CI), mean_proportion_covered_bases
# v4.x uses: gene, transcript, mane_select, lof.pLI, lof.oe_ci.upper (LOEUF), mean_proportion_covered
# NOTE: In gnomAD data, what's called "upper" is actually the LOEUF value we want (observed/expected upper bound)
COLUMN_VARIANTS = {
"gene_id": ["gene", "gene_id"],
"gene_symbol": ["gene_symbol", "gene"],
"transcript": ["transcript", "canonical_transcript", "mane_select"],
"pli": ["pLI", "lof.pLI", "pli"],
"loeuf": ["oe_lof_upper", "lof.oe_ci.upper", "oe_lof", "loeuf"],
"loeuf_upper": ["oe_lof_upper_ci", "lof.oe_ci.upper", "oe_lof_upper"],
# LOEUF is the "upper" column in gnomAD (oe_lof_upper = observed/expected upper bound)
"loeuf": ["lof.oe_ci.upper", "oe_lof_upper", "oe_lof", "loeuf"],
# loeuf_upper is typically the same as loeuf in gnomAD data (they report the upper CI)
"loeuf_upper": ["lof.oe_ci.upper", "oe_lof_upper_ci", "oe_lof_upper"],
"mean_depth": ["mean_coverage", "mean_depth", "mean_cov"],
"cds_covered_pct": [
"mean_proportion_covered_bases",

View File

@@ -0,0 +1,150 @@
"""Transform and normalize gnomAD constraint metrics."""
from pathlib import Path
import polars as pl
import structlog
from usher_pipeline.evidence.gnomad.fetch import parse_constraint_tsv
logger = structlog.get_logger()
def filter_by_coverage(
    lf: pl.LazyFrame,
    min_depth: float = 30.0,
    min_cds_pct: float = 0.9,
) -> pl.LazyFrame:
    """Add quality_flag column based on coverage thresholds.

    Does NOT drop any genes - preserves all rows with quality categorization.
    "Unknown" constraint is semantically different from "zero" constraint.

    Args:
        lf: LazyFrame with gnomAD constraint data
        min_depth: Minimum mean sequencing depth (default: 30x)
        min_cds_pct: Minimum CDS coverage fraction (default: 0.9 = 90%)

    Returns:
        LazyFrame with quality_flag column added:
        - "measured": Good coverage AND has LOEUF estimate
        - "no_data": Both LOEUF and pLI are NULL
        - "incomplete_coverage": Everything else (coverage below thresholds,
          or coverage columns themselves missing)
    """
    # Ensure numeric columns are properly cast to float (handles edge cases
    # with mixed types); strict=False turns unparseable values into NULL
    # rather than raising.
    lf = lf.with_columns([
        pl.col("mean_depth").cast(pl.Float64, strict=False),
        pl.col("cds_covered_pct").cast(pl.Float64, strict=False),
        pl.col("loeuf").cast(pl.Float64, strict=False),
        pl.col("pli").cast(pl.Float64, strict=False),
    ])
    # NOTE: a previous version had a third .when() branch whose result was
    # identical to the .otherwise() fallback ("incomplete_coverage"); it was
    # dead code and has been removed. Branch order matters: "measured" must
    # be tested first, then "no_data", with everything else falling through.
    return lf.with_columns(
        pl.when(
            pl.col("mean_depth").is_not_null()
            & pl.col("cds_covered_pct").is_not_null()
            & (pl.col("mean_depth") >= min_depth)
            & (pl.col("cds_covered_pct") >= min_cds_pct)
            & pl.col("loeuf").is_not_null()
        )
        .then(pl.lit("measured"))
        .when(pl.col("loeuf").is_null() & pl.col("pli").is_null())
        .then(pl.lit("no_data"))
        .otherwise(pl.lit("incomplete_coverage"))
        .alias("quality_flag")
    )
def normalize_scores(lf: pl.LazyFrame) -> pl.LazyFrame:
    """Normalize LOEUF scores to 0-1 range with inversion.

    Lower LOEUF = more constrained = HIGHER normalized score.
    Only genes with quality_flag="measured" get normalized scores.
    Others get NULL (not 0.0 - "unknown" != "zero constraint").

    Args:
        lf: LazyFrame with gnomAD constraint data and quality_flag column

    Returns:
        LazyFrame with loeuf_normalized column added
    """
    # One aggregation pass over the measured subset to get the LOEUF range.
    range_df = (
        lf.filter(pl.col("quality_flag") == "measured")
        .select(
            pl.col("loeuf").min().alias("loeuf_min"),
            pl.col("loeuf").max().alias("loeuf_max"),
        )
        .collect()
    )

    # Shared NULL column for every degenerate case below.
    all_null = pl.lit(None).cast(pl.Float64).alias("loeuf_normalized")

    if len(range_df) == 0:
        # No measured genes - every row gets NULL.
        return lf.with_columns(all_null)

    lo = range_df["loeuf_min"][0]
    hi = range_df["loeuf_max"][0]

    # Degenerate range: no measured LOEUF values at all, or all identical.
    if lo is None or hi is None or lo == hi:
        return lf.with_columns(all_null)

    # Inverted min-max scaling: (max - value) / (max - min), so a lower
    # LOEUF maps to a higher score. Non-measured rows stay NULL.
    inverted = (hi - pl.col("loeuf")) / (hi - lo)
    return lf.with_columns(
        pl.when(pl.col("quality_flag") == "measured")
        .then(inverted)
        .otherwise(pl.lit(None))
        .alias("loeuf_normalized")
    )
def process_gnomad_constraint(
    tsv_path: Path,
    min_depth: float = 30.0,
    min_cds_pct: float = 0.9,
) -> pl.DataFrame:
    """Full gnomAD constraint processing pipeline.

    Composes: parse -> filter_by_coverage -> normalize_scores -> collect

    Args:
        tsv_path: Path to gnomAD constraint TSV file
        min_depth: Minimum mean sequencing depth (default: 30x)
        min_cds_pct: Minimum CDS coverage fraction (default: 0.9)

    Returns:
        Materialized DataFrame ready for DuckDB storage
    """
    logger.info("gnomad_process_start", tsv_path=str(tsv_path))
    # Parse with lazy evaluation
    lf = parse_constraint_tsv(tsv_path)
    # Filter and normalize
    lf = filter_by_coverage(lf, min_depth=min_depth, min_cds_pct=min_cds_pct)
    lf = normalize_scores(lf)
    # Materialize
    df = lf.collect()
    # Summary statistics for logging: a single group_by pass instead of
    # three separate full-frame filters (the previous version also computed
    # this group_by into an unused local).
    counts = dict(df.group_by("quality_flag").len().iter_rows())
    logger.info(
        "gnomad_process_complete",
        total_genes=len(df),
        measured=counts.get("measured", 0),
        incomplete_coverage=counts.get("incomplete_coverage", 0),
        no_data=counts.get("no_data", 0),
    )
    return df