From a88b0eea604cc540483bf78909a27efded132f54 Mon Sep 17 00:00:00 2001 From: gbanyan Date: Wed, 11 Feb 2026 18:11:49 +0800 Subject: [PATCH] feat(02-01): add gnomAD constraint data models and download module - Create evidence layer package structure - Define ConstraintRecord Pydantic model with NULL preservation - Implement streaming download with httpx and tenacity retry - Add lazy TSV parser with column name variant handling - Add httpx and structlog dependencies --- pyproject.toml | 2 + src/usher_pipeline/evidence/__init__.py | 1 + .../evidence/gnomad/__init__.py | 11 ++ src/usher_pipeline/evidence/gnomad/fetch.py | 167 ++++++++++++++++++ src/usher_pipeline/evidence/gnomad/models.py | 58 ++++++ 5 files changed, 239 insertions(+) create mode 100644 src/usher_pipeline/evidence/__init__.py create mode 100644 src/usher_pipeline/evidence/gnomad/__init__.py create mode 100644 src/usher_pipeline/evidence/gnomad/fetch.py create mode 100644 src/usher_pipeline/evidence/gnomad/models.py diff --git a/pyproject.toml b/pyproject.toml index fe5ba4f..cd46beb 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -33,6 +33,8 @@ dependencies = [ "polars>=0.19.0", "pyarrow>=14.0.0", "pyyaml>=6.0", + "httpx>=0.28", + "structlog>=25.0", ] [project.optional-dependencies] diff --git a/src/usher_pipeline/evidence/__init__.py b/src/usher_pipeline/evidence/__init__.py new file mode 100644 index 0000000..841bb68 --- /dev/null +++ b/src/usher_pipeline/evidence/__init__.py @@ -0,0 +1 @@ +"""Evidence layer package for external data sources.""" diff --git a/src/usher_pipeline/evidence/gnomad/__init__.py b/src/usher_pipeline/evidence/gnomad/__init__.py new file mode 100644 index 0000000..911054d --- /dev/null +++ b/src/usher_pipeline/evidence/gnomad/__init__.py @@ -0,0 +1,11 @@ +"""gnomAD constraint metrics evidence layer.""" + +from usher_pipeline.evidence.gnomad.models import ConstraintRecord, GNOMAD_CONSTRAINT_URL +from usher_pipeline.evidence.gnomad.fetch import download_constraint_metrics, parse_constraint_tsv + +__all__ = [ + "ConstraintRecord", + "GNOMAD_CONSTRAINT_URL", + "download_constraint_metrics", + "parse_constraint_tsv", +] diff --git a/src/usher_pipeline/evidence/gnomad/fetch.py b/src/usher_pipeline/evidence/gnomad/fetch.py new file mode 100644 index 0000000..63d81fa --- /dev/null +++ b/src/usher_pipeline/evidence/gnomad/fetch.py @@ -0,0 +1,167 @@ +"""Download and parse gnomAD constraint metrics.""" + +import gzip +import shutil +from pathlib import Path + +import httpx +import polars as pl +import structlog +from tenacity import ( + retry, + stop_after_attempt, + wait_exponential, + retry_if_exception_type, +) + +from usher_pipeline.evidence.gnomad.models import ( + GNOMAD_CONSTRAINT_URL, + COLUMN_VARIANTS, +) + +logger = structlog.get_logger() + + +@retry( + stop=stop_after_attempt(5), + wait=wait_exponential(multiplier=1, min=4, max=60), + retry=retry_if_exception_type( + (httpx.HTTPStatusError, httpx.ConnectError, httpx.TimeoutException) + ), +) +def download_constraint_metrics( + output_path: Path, + url: str = GNOMAD_CONSTRAINT_URL, + force: bool = False, +) -> Path: + """Download gnomAD constraint metrics file with retry and streaming. + + Args: + output_path: Where to save the TSV file + url: gnomAD constraint file URL (default: v4.1) + force: If True, re-download even if file exists + + Returns: + Path to the downloaded TSV file + + Raises: + httpx.HTTPStatusError: On HTTP errors (after retries) + httpx.ConnectError: On connection errors (after retries) + httpx.TimeoutException: On timeout (after retries) + """ + output_path = Path(output_path) + + # Checkpoint pattern: skip if already downloaded + if output_path.exists() and not force: + logger.info( + "gnomad_constraint_exists", + path=str(output_path), + size_mb=round(output_path.stat().st_size / 1024 / 1024, 2), + ) + return output_path + + # Create parent directory if needed + output_path.parent.mkdir(parents=True, exist_ok=True) + + # Determine if we need to decompress + is_compressed = url.endswith(".bgz") or url.endswith(".gz") + temp_path = output_path.with_suffix(output_path.suffix + ".tmp") + + logger.info("gnomad_download_start", url=url, compressed=is_compressed) + + # Stream download to disk (avoid loading into memory) + with httpx.stream("GET", url, timeout=120.0, follow_redirects=True) as response: + response.raise_for_status() + + total_bytes = int(response.headers.get("content-length", 0)) + downloaded = 0 + + with open(temp_path, "wb") as f: + for chunk in response.iter_bytes(chunk_size=8192): + f.write(chunk) + downloaded += len(chunk) + + # Log progress every 10MB + if total_bytes > 0 and downloaded % (10 * 1024 * 1024) < 8192: + pct = (downloaded / total_bytes) * 100 + logger.info( + "gnomad_download_progress", + downloaded_mb=round(downloaded / 1024 / 1024, 2), + total_mb=round(total_bytes / 1024 / 1024, 2), + percent=round(pct, 1), + ) + + # Decompress if needed (bgzip is gzip-compatible) + if is_compressed: + logger.info("gnomad_decompress_start", compressed_path=str(temp_path)) + with gzip.open(temp_path, "rb") as f_in: + with open(output_path, "wb") as f_out: + shutil.copyfileobj(f_in, f_out) + temp_path.unlink() + else: + temp_path.rename(output_path) + + logger.info( + "gnomad_download_complete", + path=str(output_path), + size_mb=round(output_path.stat().st_size / 1024 / 1024, 2), + ) + + return output_path + + +def parse_constraint_tsv(tsv_path: Path) -> pl.LazyFrame: + """Parse gnomAD constraint TSV into a LazyFrame. + + Handles column name variants between gnomAD v2.1.1 and v4.x. + Null values ("NA", "", ".") are preserved as polars null. + + Args: + tsv_path: Path to gnomAD constraint TSV file + + Returns: + LazyFrame with standardized column names matching ConstraintRecord fields + """ + tsv_path = Path(tsv_path) + + # Read first line to detect actual column names + with open(tsv_path, "r") as f: + header_line = f.readline().strip() + actual_columns = header_line.split("\t") + + logger.info( + "gnomad_parse_start", + path=str(tsv_path), + column_count=len(actual_columns), + ) + + # Scan with lazy evaluation + lf = pl.scan_csv( + tsv_path, + separator="\t", + null_values=["NA", "", "."], + has_header=True, + ) + + # Map actual columns to our standardized names + column_mapping = {} + for our_name, variants in COLUMN_VARIANTS.items(): + for variant in variants: + if variant in actual_columns: + column_mapping[variant] = our_name + break + + if not column_mapping: + logger.warning( + "gnomad_no_column_matches", + actual_columns=actual_columns[:10], # Log first 10 for debugging + ) + # Return as-is if we can't map columns + return lf + + logger.info("gnomad_column_mapping", mapping=column_mapping) + + # Select and rename mapped columns + lf = lf.select([pl.col(old).alias(new) for old, new in column_mapping.items()]) + + return lf diff --git a/src/usher_pipeline/evidence/gnomad/models.py b/src/usher_pipeline/evidence/gnomad/models.py new file mode 100644 index 0000000..d4162d9 --- /dev/null +++ b/src/usher_pipeline/evidence/gnomad/models.py @@ -0,0 +1,58 @@ +"""Data models for gnomAD constraint metrics.""" + +from pydantic import BaseModel + +# gnomAD v4.1 constraint metrics download URL +GNOMAD_CONSTRAINT_URL = ( + "https://storage.googleapis.com/gcp-public-data--gnomad/release/4.1/constraint/" + "gnomad.v4.1.constraint_metrics.tsv" +) + +# Column name mapping for different gnomAD versions +# v2.1.1 uses: gene, transcript, pLI, oe_lof_upper (LOEUF), mean_proportion_covered_bases +# v4.x uses: gene, transcript, mane_select, lof.pLI, lof.oe_ci.upper, mean_proportion_covered +COLUMN_VARIANTS = { + "gene_id": ["gene", "gene_id"], + "gene_symbol": ["gene_symbol", "gene"], + "transcript": ["transcript", "canonical_transcript", "mane_select"], + "pli": ["pLI", "lof.pLI", "pli"], + "loeuf": ["oe_lof_upper", "lof.oe_ci.upper", "oe_lof", "loeuf"], + "loeuf_upper": ["oe_lof_upper_ci", "lof.oe_ci.upper", "oe_lof_upper"], + "mean_depth": ["mean_coverage", "mean_depth", "mean_cov"], + "cds_covered_pct": [ + "mean_proportion_covered_bases", + "mean_proportion_covered", + "cds_covered_pct", + ], +} + + +class ConstraintRecord(BaseModel): + """gnomAD constraint metrics for a single gene. + + Attributes: + gene_id: Ensembl gene ID (e.g., ENSG00000...) + gene_symbol: HGNC gene symbol + transcript: Canonical transcript ID + pli: Probability of being loss-of-function intolerant (NULL if no estimate) + loeuf: Loss-of-function observed/expected upper bound fraction (NULL if no estimate) + loeuf_upper: Upper bound of LOEUF confidence interval + mean_depth: Mean exome sequencing depth across CDS + cds_covered_pct: Fraction of CDS bases with adequate coverage (0.0-1.0) + quality_flag: Data quality indicator - "measured", "incomplete_coverage", or "no_data" + loeuf_normalized: Normalized LOEUF score (0-1, inverted: higher = more constrained) + + CRITICAL: NULL values represent missing data and are preserved as None. + Do NOT convert NULL to 0.0 - "unknown" is semantically different from "zero constraint". + """ + + gene_id: str + gene_symbol: str + transcript: str + pli: float | None = None + loeuf: float | None = None + loeuf_upper: float | None = None + mean_depth: float | None = None + cds_covered_pct: float | None = None + quality_flag: str = "no_data" + loeuf_normalized: float | None = None