feat(02-01): add gnomAD constraint data models and download module

- Create evidence layer package structure
- Define ConstraintRecord Pydantic model with NULL preservation
- Implement streaming download with httpx and tenacity retry
- Add lazy TSV parser with column name variant handling
- Add httpx and structlog dependencies
2026-02-11 18:11:49 +08:00
parent c7753e7b1c
commit a88b0eea60
5 changed files with 239 additions and 0 deletions

View File

@@ -33,6 +33,8 @@ dependencies = [
"polars>=0.19.0",
"pyarrow>=14.0.0",
"pyyaml>=6.0",
"httpx>=0.28",
"structlog>=25.0",
]
[project.optional-dependencies]

View File

@@ -0,0 +1 @@
"""Evidence layer package for external data sources."""

View File

@@ -0,0 +1,11 @@
"""gnomAD constraint metrics evidence layer."""
from usher_pipeline.evidence.gnomad.models import ConstraintRecord, GNOMAD_CONSTRAINT_URL
from usher_pipeline.evidence.gnomad.fetch import download_constraint_metrics, parse_constraint_tsv
__all__ = [
"ConstraintRecord",
"GNOMAD_CONSTRAINT_URL",
"download_constraint_metrics",
"parse_constraint_tsv",
]
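
The package root re-exports the public API, so callers can import everything from usher_pipeline.evidence.gnomad directly; a minimal import sketch (not part of this diff):

from usher_pipeline.evidence.gnomad import (
    GNOMAD_CONSTRAINT_URL,
    ConstraintRecord,
    download_constraint_metrics,
    parse_constraint_tsv,
)

print(GNOMAD_CONSTRAINT_URL)  # gnomAD v4.1 constraint metrics TSV URL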

View File

@@ -0,0 +1,167 @@
"""Download and parse gnomAD constraint metrics."""
import gzip
import shutil
from pathlib import Path
import httpx
import polars as pl
import structlog
from tenacity import (
retry,
stop_after_attempt,
wait_exponential,
retry_if_exception_type,
)
from usher_pipeline.evidence.gnomad.models import (
GNOMAD_CONSTRAINT_URL,
COLUMN_VARIANTS,
)
logger = structlog.get_logger()
@retry(
stop=stop_after_attempt(5),
wait=wait_exponential(multiplier=1, min=4, max=60),
retry=retry_if_exception_type(
(httpx.HTTPStatusError, httpx.ConnectError, httpx.TimeoutException)
),
)
def download_constraint_metrics(
output_path: Path,
url: str = GNOMAD_CONSTRAINT_URL,
force: bool = False,
) -> Path:
"""Download gnomAD constraint metrics file with retry and streaming.
Args:
output_path: Where to save the TSV file
url: gnomAD constraint file URL (default: v4.1)
force: If True, re-download even if file exists
Returns:
Path to the downloaded TSV file
Raises:
httpx.HTTPStatusError: On HTTP errors (after retries)
httpx.ConnectError: On connection errors (after retries)
httpx.TimeoutException: On timeout (after retries)
"""
output_path = Path(output_path)
# Checkpoint pattern: skip if already downloaded
if output_path.exists() and not force:
logger.info(
"gnomad_constraint_exists",
path=str(output_path),
size_mb=round(output_path.stat().st_size / 1024 / 1024, 2),
)
return output_path
# Create parent directory if needed
output_path.parent.mkdir(parents=True, exist_ok=True)
# Determine if we need to decompress
is_compressed = url.endswith(".bgz") or url.endswith(".gz")
temp_path = output_path.with_suffix(output_path.suffix + ".tmp")
logger.info("gnomad_download_start", url=url, compressed=is_compressed)
# Stream download to disk (avoid loading into memory)
with httpx.stream("GET", url, timeout=120.0, follow_redirects=True) as response:
response.raise_for_status()
total_bytes = int(response.headers.get("content-length", 0))
downloaded = 0
with open(temp_path, "wb") as f:
for chunk in response.iter_bytes(chunk_size=8192):
f.write(chunk)
downloaded += len(chunk)
# Log progress every 10MB
if total_bytes > 0 and downloaded % (10 * 1024 * 1024) < 8192:
pct = (downloaded / total_bytes) * 100
logger.info(
"gnomad_download_progress",
downloaded_mb=round(downloaded / 1024 / 1024, 2),
total_mb=round(total_bytes / 1024 / 1024, 2),
percent=round(pct, 1),
)
# Decompress if needed (bgzip is gzip-compatible)
if is_compressed:
logger.info("gnomad_decompress_start", compressed_path=str(temp_path))
with gzip.open(temp_path, "rb") as f_in:
with open(output_path, "wb") as f_out:
shutil.copyfileobj(f_in, f_out)
temp_path.unlink()
else:
temp_path.rename(output_path)
logger.info(
"gnomad_download_complete",
path=str(output_path),
size_mb=round(output_path.stat().st_size / 1024 / 1024, 2),
)
return output_path
def parse_constraint_tsv(tsv_path: Path) -> pl.LazyFrame:
"""Parse gnomAD constraint TSV into a LazyFrame.
Handles column name variants between gnomAD v2.1.1 and v4.x.
Null values ("NA", "", ".") are preserved as polars null.
Args:
tsv_path: Path to gnomAD constraint TSV file
Returns:
LazyFrame with standardized column names matching ConstraintRecord fields
"""
tsv_path = Path(tsv_path)
# Read first line to detect actual column names
with open(tsv_path, "r") as f:
header_line = f.readline().strip()
actual_columns = header_line.split("\t")
logger.info(
"gnomad_parse_start",
path=str(tsv_path),
column_count=len(actual_columns),
)
# Scan with lazy evaluation
lf = pl.scan_csv(
tsv_path,
separator="\t",
null_values=["NA", "", "."],
has_header=True,
)
# Map actual columns to our standardized names
column_mapping = {}
for our_name, variants in COLUMN_VARIANTS.items():
for variant in variants:
if variant in actual_columns:
column_mapping[variant] = our_name
break
if not column_mapping:
logger.warning(
"gnomad_no_column_matches",
actual_columns=actual_columns[:10], # Log first 10 for debugging
)
# Return as-is if we can't map columns
return lf
logger.info("gnomad_column_mapping", mapping=column_mapping)
# Select and rename mapped columns
lf = lf.select([pl.col(old).alias(new) for old, new in column_mapping.items()])
return lf
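
The two functions are meant to compose into a download-then-parse step. A usage sketch (not part of this diff; the data/external/ path and the final .collect() are illustrative):

from pathlib import Path

from usher_pipeline.evidence.gnomad.fetch import (
    download_constraint_metrics,
    parse_constraint_tsv,
)

# Checkpoint pattern: the download is skipped if this file already exists
# and force=False.
tsv_path = download_constraint_metrics(Path("data/external/gnomad_constraint.tsv"))

# Lazy scan; nothing is read from disk until .collect() is called.
constraint_df = parse_constraint_tsv(tsv_path).collect()
print(constraint_df.head())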

View File

@@ -0,0 +1,58 @@
"""Data models for gnomAD constraint metrics."""
from pydantic import BaseModel
# gnomAD v4.1 constraint metrics download URL
GNOMAD_CONSTRAINT_URL = (
"https://storage.googleapis.com/gcp-public-data--gnomad/release/4.1/constraint/"
"gnomad.v4.1.constraint_metrics.tsv"
)
# Column name mapping for different gnomAD versions
# v2.1.1 uses: gene, transcript, pLI, oe_lof_upper (LOEUF), mean_proportion_covered_bases
# v4.x uses: gene, transcript, mane_select, lof.pLI, lof.oe_ci.upper, mean_proportion_covered
COLUMN_VARIANTS = {
"gene_id": ["gene", "gene_id"],
"gene_symbol": ["gene_symbol", "gene"],
"transcript": ["transcript", "canonical_transcript", "mane_select"],
"pli": ["pLI", "lof.pLI", "pli"],
"loeuf": ["oe_lof_upper", "lof.oe_ci.upper", "oe_lof", "loeuf"],
"loeuf_upper": ["oe_lof_upper_ci", "lof.oe_ci.upper", "oe_lof_upper"],
"mean_depth": ["mean_coverage", "mean_depth", "mean_cov"],
"cds_covered_pct": [
"mean_proportion_covered_bases",
"mean_proportion_covered",
"cds_covered_pct",
],
}
class ConstraintRecord(BaseModel):
"""gnomAD constraint metrics for a single gene.
Attributes:
gene_id: Ensembl gene ID (e.g., ENSG00000...)
gene_symbol: HGNC gene symbol
transcript: Canonical transcript ID
pli: Probability of being loss-of-function intolerant (NULL if no estimate)
loeuf: Loss-of-function observed/expected upper bound fraction (NULL if no estimate)
loeuf_upper: Upper bound of LOEUF confidence interval
mean_depth: Mean exome sequencing depth across CDS
cds_covered_pct: Fraction of CDS bases with adequate coverage (0.0-1.0)
quality_flag: Data quality indicator - "measured", "incomplete_coverage", or "no_data"
loeuf_normalized: Normalized LOEUF score (0-1, inverted: higher = more constrained)
CRITICAL: NULL values represent missing data and are preserved as None.
Do NOT convert NULL to 0.0 - "unknown" is semantically different from "zero constraint".
"""
gene_id: str
gene_symbol: str
transcript: str
pli: float | None = None
loeuf: float | None = None
loeuf_upper: float | None = None
mean_depth: float | None = None
cds_covered_pct: float | None = None
quality_flag: str = "no_data"
loeuf_normalized: float | None = None
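
A short sketch of the NULL-preservation contract (illustrative values, not part of this diff): a gene with no constraint estimate keeps pli and loeuf as None rather than 0.0, and quality_flag falls back to its "no_data" default.

from usher_pipeline.evidence.gnomad.models import ConstraintRecord

# Gene with missing constraint estimates (e.g. poorly covered CDS).
unknown = ConstraintRecord(
    gene_id="ENSG00000000000",  # illustrative ID
    gene_symbol="EXAMPLE1",     # illustrative symbol
    transcript="ENST00000000000",
)
assert unknown.pli is None                # missing, NOT 0.0
assert unknown.loeuf is None              # "unknown" != "zero constraint"
assert unknown.quality_flag == "no_data"

# Fully measured gene for comparison.
measured = ConstraintRecord(
    gene_id="ENSG00000000001",
    gene_symbol="EXAMPLE2",
    transcript="ENST00000000001",
    pli=1.0,
    loeuf=0.12,
    cds_covered_pct=0.98,
    quality_flag="measured",
)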