feat(03-06): implement literature evidence models, PubMed fetch, and scoring
- Create LiteratureRecord pydantic model with context-specific counts
- Implement PubMed query via Biopython Entrez with rate limiting (3 req/sec default, 10 req/sec with API key)
- Define SEARCH_CONTEXTS for cilia, sensory, cytoskeleton, and cell_polarity queries
- Implement evidence tier classification: direct_experimental > functional_mention > hts_hit > incidental > none
- Implement quality-weighted scoring with bias mitigation via log2(total_pubmed_count) normalization
- Add biopython>=1.84 dependency to pyproject.toml
- Support checkpoint-restart for long-running PubMed queries (estimated 3-11 hours for 20K genes)
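A minimal sketch of the bias-mitigated scoring described above (names such as quality_weight and context_hits are illustrative; the real formula lives in compute_literature_score in transform.py, which this diff truncates):

import math

def literature_score_sketch(
    context_hits: int,        # context-specific publication count (e.g., cilia)
    total_pubmed_count: int,  # all publications mentioning the gene
    quality_weight: float,    # tier weight, e.g. 1.0 direct_experimental, 0.3 hts_hit
) -> float:
    """Quality-weighted score normalized by overall study volume.

    Dividing by log2(total_pubmed_count) damps the advantage of
    well-studied genes: 10 cilia papers out of 20 total publications
    scores higher than 10 out of 5000.
    """
    if total_pubmed_count is None or total_pubmed_count < 2:
        return 0.0
    return quality_weight * context_hits / math.log2(total_pubmed_count)

# 10 cilia papers for a little-studied gene vs. a heavily studied one:
# literature_score_sketch(10, 20, 1.0)   -> ~2.31
# literature_score_sketch(10, 5000, 1.0) -> ~0.81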
@@ -35,6 +35,7 @@ dependencies = [
    "pyyaml>=6.0",
    "httpx>=0.28",
    "structlog>=25.0",
    "biopython>=1.84",
]

[project.optional-dependencies]
@@ -42,6 +43,9 @@ dev = [
    "pytest>=7.4.0",
    "pytest-cov>=4.1.0",
]
expression = [
    "cellxgene-census>=1.19",
]

[project.scripts]
usher-pipeline = "usher_pipeline.cli.main:cli"
src/usher_pipeline/evidence/expression/__init__.py (new file, 48 lines)
@@ -0,0 +1,48 @@
"""Tissue expression evidence layer for Usher-relevant tissues.
|
||||
|
||||
This module retrieves expression data from:
|
||||
- HPA (Human Protein Atlas): Tissue-level RNA/protein expression
|
||||
- GTEx: Tissue-level RNA expression across diverse samples
|
||||
- CellxGene: Single-cell RNA-seq data for specific cell types
|
||||
|
||||
Target tissues/cell types:
|
||||
- Retina, photoreceptor cells (retinal rod, retinal cone)
|
||||
- Inner ear, hair cells (cochlea, vestibular)
|
||||
- Cilia-rich tissues (cerebellum, testis, fallopian tube)
|
||||
|
||||
Expression enrichment in these tissues is evidence for potential cilia/Usher involvement.
|
||||
"""
|
||||
|
||||
from usher_pipeline.evidence.expression.fetch import (
|
||||
fetch_hpa_expression,
|
||||
fetch_gtex_expression,
|
||||
fetch_cellxgene_expression,
|
||||
)
|
||||
from usher_pipeline.evidence.expression.transform import (
|
||||
calculate_tau_specificity,
|
||||
compute_expression_score,
|
||||
process_expression_evidence,
|
||||
)
|
||||
from usher_pipeline.evidence.expression.load import (
|
||||
load_to_duckdb,
|
||||
query_tissue_enriched,
|
||||
)
|
||||
from usher_pipeline.evidence.expression.models import (
|
||||
ExpressionRecord,
|
||||
EXPRESSION_TABLE_NAME,
|
||||
TARGET_TISSUES,
|
||||
)
|
||||
|
||||
__all__ = [
|
||||
"fetch_hpa_expression",
|
||||
"fetch_gtex_expression",
|
||||
"fetch_cellxgene_expression",
|
||||
"calculate_tau_specificity",
|
||||
"compute_expression_score",
|
||||
"process_expression_evidence",
|
||||
"load_to_duckdb",
|
||||
"query_tissue_enriched",
|
||||
"ExpressionRecord",
|
||||
"EXPRESSION_TABLE_NAME",
|
||||
"TARGET_TISSUES",
|
||||
]
|
||||
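Taken together, the public surface above composes like this (a minimal sketch; the PipelineStore and ProvenanceTracker constructors are assumptions about the persistence module, and the gene IDs are placeholders):

from pathlib import Path

from usher_pipeline.evidence.expression import (
    process_expression_evidence,
    load_to_duckdb,
    query_tissue_enriched,
)
from usher_pipeline.persistence import PipelineStore, ProvenanceTracker

gene_ids = ["ENSG00000000001", "ENSG00000000002"]  # placeholder Ensembl IDs
df = process_expression_evidence(gene_ids, cache_dir=Path("data/expression"))

store = PipelineStore("pipeline.duckdb")   # hypothetical constructor
provenance = ProvenanceTracker(store)      # hypothetical constructor
load_to_duckdb(df, store, provenance)
enriched = query_tissue_enriched(store, min_enrichment=2.0)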
src/usher_pipeline/evidence/expression/fetch.py (new file, 458 lines)
@@ -0,0 +1,458 @@
"""Download and parse tissue expression data from HPA, GTEx, and CellxGene."""
|
||||
|
||||
import gzip
|
||||
import shutil
|
||||
import zipfile
|
||||
from pathlib import Path
|
||||
from typing import Optional
|
||||
|
||||
import httpx
|
||||
import polars as pl
|
||||
import structlog
|
||||
from tenacity import (
|
||||
retry,
|
||||
stop_after_attempt,
|
||||
wait_exponential,
|
||||
retry_if_exception_type,
|
||||
)
|
||||
|
||||
from usher_pipeline.evidence.expression.models import (
|
||||
HPA_NORMAL_TISSUE_URL,
|
||||
GTEX_MEDIAN_EXPRESSION_URL,
|
||||
TARGET_TISSUES,
|
||||
TARGET_CELL_TYPES,
|
||||
)
|
||||
|
||||
logger = structlog.get_logger()
|
||||
|
||||
|
||||
@retry(
|
||||
stop=stop_after_attempt(5),
|
||||
wait=wait_exponential(multiplier=1, min=4, max=60),
|
||||
retry=retry_if_exception_type(
|
||||
(httpx.HTTPStatusError, httpx.ConnectError, httpx.TimeoutException)
|
||||
),
|
||||
)
|
||||
def download_hpa_tissue_data(
|
||||
output_path: Path,
|
||||
url: str = HPA_NORMAL_TISSUE_URL,
|
||||
force: bool = False,
|
||||
) -> Path:
|
||||
"""Download HPA normal tissue TSV (bulk download for all genes).
|
||||
|
||||
Args:
|
||||
output_path: Where to save the TSV file
|
||||
url: HPA normal tissue data URL (default: proteinatlas.org bulk download)
|
||||
force: If True, re-download even if file exists
|
||||
|
||||
Returns:
|
||||
Path to the downloaded TSV file
|
||||
|
||||
Raises:
|
||||
httpx.HTTPStatusError: On HTTP errors (after retries)
|
||||
httpx.ConnectError: On connection errors (after retries)
|
||||
httpx.TimeoutException: On timeout (after retries)
|
||||
"""
|
||||
output_path = Path(output_path)
|
||||
|
||||
# Checkpoint pattern: skip if already downloaded
|
||||
if output_path.exists() and not force:
|
||||
logger.info(
|
||||
"hpa_tissue_exists",
|
||||
path=str(output_path),
|
||||
size_mb=round(output_path.stat().st_size / 1024 / 1024, 2),
|
||||
)
|
||||
return output_path
|
||||
|
||||
# Create parent directory if needed
|
||||
output_path.parent.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
# HPA data is zipped
|
||||
is_zipped = url.endswith(".zip")
|
||||
temp_path = output_path.with_suffix(".zip.tmp")
|
||||
|
||||
logger.info("hpa_download_start", url=url, zipped=is_zipped)
|
||||
|
||||
# Stream download to disk
|
||||
with httpx.stream("GET", url, timeout=120.0, follow_redirects=True) as response:
|
||||
response.raise_for_status()
|
||||
|
||||
total_bytes = int(response.headers.get("content-length", 0))
|
||||
downloaded = 0
|
||||
|
||||
with open(temp_path, "wb") as f:
|
||||
for chunk in response.iter_bytes(chunk_size=8192):
|
||||
f.write(chunk)
|
||||
downloaded += len(chunk)
|
||||
|
||||
# Log progress every 10MB
|
||||
if total_bytes > 0 and downloaded % (10 * 1024 * 1024) < 8192:
|
||||
pct = (downloaded / total_bytes) * 100
|
||||
logger.info(
|
||||
"hpa_download_progress",
|
||||
downloaded_mb=round(downloaded / 1024 / 1024, 2),
|
||||
total_mb=round(total_bytes / 1024 / 1024, 2),
|
||||
percent=round(pct, 1),
|
||||
)
|
||||
|
||||
# Unzip if needed
|
||||
if is_zipped:
|
||||
logger.info("hpa_unzip_start", zip_path=str(temp_path))
|
||||
with zipfile.ZipFile(temp_path, "r") as zip_ref:
|
||||
# Extract the TSV file (usually named "normal_tissue.tsv")
|
||||
tsv_files = [name for name in zip_ref.namelist() if name.endswith(".tsv")]
|
||||
if not tsv_files:
|
||||
raise ValueError(f"No TSV file found in HPA zip: {temp_path}")
|
||||
# Extract first TSV
|
||||
zip_ref.extract(tsv_files[0], path=output_path.parent)
|
||||
extracted_path = output_path.parent / tsv_files[0]
|
||||
extracted_path.rename(output_path)
|
||||
temp_path.unlink()
|
||||
else:
|
||||
temp_path.rename(output_path)
|
||||
|
||||
logger.info(
|
||||
"hpa_download_complete",
|
||||
path=str(output_path),
|
||||
size_mb=round(output_path.stat().st_size / 1024 / 1024, 2),
|
||||
)
|
||||
|
||||
return output_path
|
||||
|
||||
|
||||
@retry(
|
||||
stop=stop_after_attempt(5),
|
||||
wait=wait_exponential(multiplier=1, min=4, max=60),
|
||||
retry=retry_if_exception_type(
|
||||
(httpx.HTTPStatusError, httpx.ConnectError, httpx.TimeoutException)
|
||||
),
|
||||
)
|
||||
def download_gtex_expression(
|
||||
output_path: Path,
|
||||
url: str = GTEX_MEDIAN_EXPRESSION_URL,
|
||||
force: bool = False,
|
||||
) -> Path:
|
||||
"""Download GTEx median gene expression file (bulk download).
|
||||
|
||||
Args:
|
||||
output_path: Where to save the GCT file
|
||||
url: GTEx median TPM file URL (default: v8/v10 bulk data)
|
||||
force: If True, re-download even if file exists
|
||||
|
||||
Returns:
|
||||
Path to the downloaded GCT file
|
||||
|
||||
Raises:
|
||||
httpx.HTTPStatusError: On HTTP errors (after retries)
|
||||
httpx.ConnectError: On connection errors (after retries)
|
||||
httpx.TimeoutException: On timeout (after retries)
|
||||
"""
|
||||
output_path = Path(output_path)
|
||||
|
||||
# Checkpoint pattern: skip if already downloaded
|
||||
if output_path.exists() and not force:
|
||||
logger.info(
|
||||
"gtex_expression_exists",
|
||||
path=str(output_path),
|
||||
size_mb=round(output_path.stat().st_size / 1024 / 1024, 2),
|
||||
)
|
||||
return output_path
|
||||
|
||||
# Create parent directory if needed
|
||||
output_path.parent.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
# GTEx data is gzipped
|
||||
is_compressed = url.endswith(".gz")
|
||||
temp_path = output_path.with_suffix(output_path.suffix + ".tmp")
|
||||
|
||||
logger.info("gtex_download_start", url=url, compressed=is_compressed)
|
||||
|
||||
# Stream download to disk
|
||||
with httpx.stream("GET", url, timeout=120.0, follow_redirects=True) as response:
|
||||
response.raise_for_status()
|
||||
|
||||
total_bytes = int(response.headers.get("content-length", 0))
|
||||
downloaded = 0
|
||||
|
||||
with open(temp_path, "wb") as f:
|
||||
for chunk in response.iter_bytes(chunk_size=8192):
|
||||
f.write(chunk)
|
||||
downloaded += len(chunk)
|
||||
|
||||
# Log progress every 10MB
|
||||
if total_bytes > 0 and downloaded % (10 * 1024 * 1024) < 8192:
|
||||
pct = (downloaded / total_bytes) * 100
|
||||
logger.info(
|
||||
"gtex_download_progress",
|
||||
downloaded_mb=round(downloaded / 1024 / 1024, 2),
|
||||
total_mb=round(total_bytes / 1024 / 1024, 2),
|
||||
percent=round(pct, 1),
|
||||
)
|
||||
|
||||
# Decompress if needed
|
||||
if is_compressed:
|
||||
logger.info("gtex_decompress_start", compressed_path=str(temp_path))
|
||||
with gzip.open(temp_path, "rb") as f_in:
|
||||
with open(output_path, "wb") as f_out:
|
||||
shutil.copyfileobj(f_in, f_out)
|
||||
temp_path.unlink()
|
||||
else:
|
||||
temp_path.rename(output_path)
|
||||
|
||||
logger.info(
|
||||
"gtex_download_complete",
|
||||
path=str(output_path),
|
||||
size_mb=round(output_path.stat().st_size / 1024 / 1024, 2),
|
||||
)
|
||||
|
||||
return output_path
|
||||
|
||||
|
||||
def fetch_hpa_expression(
    gene_ids: list[str],
    cache_dir: Optional[Path] = None,
    force: bool = False,
) -> pl.LazyFrame:
    """Fetch HPA tissue expression data for target tissues.

    Downloads the HPA bulk normal tissue TSV, filters to target tissues
    (retina, cerebellum, testis, fallopian tube), and extracts expression
    values. Note: the bulk TSV provides categorical "Level" calls rather than
    TPM, so the *_tpm columns hold a 0-3 numeric proxy (see mapping below).

    Args:
        gene_ids: List of Ensembl gene IDs (unused here - HPA is keyed by gene symbol)
        cache_dir: Directory to cache the downloaded HPA file
        force: If True, re-download even if cached

    Returns:
        LazyFrame with columns: gene_symbol, hpa_retina_tpm, hpa_cerebellum_tpm,
        hpa_testis_tpm, hpa_fallopian_tube_tpm.
        NULL for genes/tissues not in the HPA data.
    """
    cache_dir = Path(cache_dir) if cache_dir else Path("data/expression")
    cache_dir.mkdir(parents=True, exist_ok=True)

    # Download HPA bulk tissue data
    hpa_tsv_path = cache_dir / "hpa_normal_tissue.tsv"
    download_hpa_tissue_data(hpa_tsv_path, force=force)

    logger.info("hpa_parse_start", path=str(hpa_tsv_path))

    # HPA normal-tissue TSV format (v24):
    #   Gene | Gene name | Tissue | Cell type | Level | Reliability
    # "Level" is a categorical expression call (Not detected/Low/Medium/High),
    # not a TPM value; some download formats carry a quantitative "nTPM"
    # column instead. The Level mapping below is used because the bulk
    # download format varies.

    # Read HPA data with lazy evaluation
    lf = pl.scan_csv(
        hpa_tsv_path,
        separator="\t",
        null_values=["NA", "", "."],
        has_header=True,
    )

    # Target tissues from HPA
    target_tissue_names = {
        "retina": TARGET_TISSUES["retina"]["hpa"],
        "cerebellum": TARGET_TISSUES["cerebellum"]["hpa"],
        "testis": TARGET_TISSUES["testis"]["hpa"],
        "fallopian_tube": TARGET_TISSUES["fallopian_tube"]["hpa"],
    }

    # Filter to target tissues
    tissue_filter = pl.col("Tissue").is_in(list(target_tissue_names.values()))
    lf = lf.filter(tissue_filter)

    # Map the categorical Level to a numeric expression proxy
    level_mapping = {
        "Not detected": 0.0,
        "Low": 1.0,
        "Medium": 2.0,
        "High": 3.0,
    }

    # Group by gene and tissue; take the first Level when a tissue lists
    # multiple cell types (a max aggregation would also be defensible)
    lf = (
        lf.group_by(["Gene name", "Tissue"])
        .agg(pl.col("Level").first().alias("expression_level"))
        .with_columns(
            pl.col("expression_level")
            .map_elements(lambda x: level_mapping.get(x, None), return_dtype=pl.Float64)
            .alias("expression_value")
        )
    )

    # Pivot to wide format (rows = genes, columns = tissues). Polars only
    # implements pivot on eager DataFrames, so collect first, then return
    # to lazy for downstream composition
    df_wide = lf.collect().pivot(
        values="expression_value",
        index="Gene name",
        columns="Tissue",
    )
    lf_wide = df_wide.lazy()

    # Rename columns to match our schema
    rename_map = {}
    for our_key, hpa_tissue in target_tissue_names.items():
        if hpa_tissue:
            rename_map[hpa_tissue] = f"hpa_{our_key}_tpm"

    if rename_map:
        lf_wide = lf_wide.rename(rename_map)

    # Rename "Gene name" to "gene_symbol"
    lf_wide = lf_wide.rename({"Gene name": "gene_symbol"})

    logger.info("hpa_parse_complete", tissues=list(target_tissue_names.keys()))

    return lf_wide

def fetch_gtex_expression(
    gene_ids: list[str],
    cache_dir: Optional[Path] = None,
    force: bool = False,
) -> pl.LazyFrame:
    """Fetch GTEx tissue expression data for target tissues.

    Downloads the GTEx bulk median TPM file and filters to target tissues.
    NOTE: GTEx lacks inner ear/cochlea tissue - those values will be NULL.

    Args:
        gene_ids: List of Ensembl gene IDs to filter
        cache_dir: Directory to cache the downloaded GTEx file
        force: If True, re-download even if cached

    Returns:
        LazyFrame with columns: gene_id, gtex_retina_tpm, gtex_cerebellum_tpm,
        gtex_testis_tpm, gtex_fallopian_tube_tpm.
        NULL for tissues not available in GTEx.
    """
    cache_dir = Path(cache_dir) if cache_dir else Path("data/expression")
    cache_dir.mkdir(parents=True, exist_ok=True)

    # Download GTEx bulk expression data
    gtex_gct_path = cache_dir / "gtex_median_tpm.gct"
    download_gtex_expression(gtex_gct_path, force=force)

    logger.info("gtex_parse_start", path=str(gtex_gct_path))

    # GTEx GCT format:
    #   #1.2                (version header)
    #   [dimensions line]
    #   Name  Description  [Tissue1]  [Tissue2] ...
    #   ENSG00000...  GeneSymbol  tpm1  tpm2 ...

    # Skip the first 2 lines (GCT header), then read
    lf = pl.scan_csv(
        gtex_gct_path,
        separator="\t",
        skip_rows=2,
        null_values=["NA", "", "."],
        has_header=True,
    )

    # Target tissues from GTEx
    target_tissue_cols = {
        "retina": TARGET_TISSUES["retina"]["gtex"],
        "cerebellum": TARGET_TISSUES["cerebellum"]["gtex"],
        "testis": TARGET_TISSUES["testis"]["gtex"],
        "fallopian_tube": TARGET_TISSUES["fallopian_tube"]["gtex"],
    }

    # Select the gene ID column plus target tissue columns.
    # GTEx uses "Name" for the gene ID (ENSG...) and "Description" for the gene symbol.
    select_cols = ["Name"]
    rename_map = {"Name": "gene_id"}

    for our_key, gtex_tissue in target_tissue_cols.items():
        if gtex_tissue:
            select_cols.append(gtex_tissue)
            rename_map[gtex_tissue] = f"gtex_{our_key}_tpm"

    # Selecting a missing column on a LazyFrame only fails at collect time,
    # so check the schema up front and drop tissues this GTEx version lacks
    available_cols = lf.columns
    missing_cols = [col for col in select_cols if col not in available_cols]
    if missing_cols:
        logger.warning("gtex_tissue_missing", missing=missing_cols)
    select_available = [col for col in select_cols if col in available_cols]
    lf = lf.select(select_available).rename(
        {k: v for k, v in rename_map.items() if k in select_available}
    )

    # Filter to requested gene_ids if provided
    if gene_ids:
        lf = lf.filter(pl.col("gene_id").is_in(gene_ids))

    logger.info("gtex_parse_complete", tissues=list(target_tissue_cols.keys()))

    return lf

def fetch_cellxgene_expression(
    gene_ids: list[str],
    cache_dir: Optional[Path] = None,
    batch_size: int = 100,
) -> pl.LazyFrame:
    """Fetch CellxGene single-cell expression data for target cell types.

    Uses the cellxgene_census library to query scRNA-seq data for photoreceptor
    and hair cell populations, computing mean expression per gene per cell type.

    NOTE: cellxgene_census is an optional dependency (large install).
    If it is not available, returns a frame of NULL values and logs a warning.

    Args:
        gene_ids: List of Ensembl gene IDs to query
        cache_dir: Directory for caching (currently unused)
        batch_size: Number of genes to process per batch (default: 100)

    Returns:
        LazyFrame with columns: gene_id, cellxgene_photoreceptor_expr,
        cellxgene_hair_cell_expr.
        NULL if cellxgene_census is unavailable or cell type data is missing.
    """
    try:
        import cellxgene_census
    except ImportError:
        logger.warning(
            "cellxgene_census_unavailable",
            message="cellxgene_census not installed. Install with: pip install 'usher-pipeline[expression]'",
        )
        # Return a frame with NULL values for all genes
        return pl.LazyFrame({
            "gene_id": gene_ids,
            "cellxgene_photoreceptor_expr": [None] * len(gene_ids),
            "cellxgene_hair_cell_expr": [None] * len(gene_ids),
        })

    logger.info("cellxgene_query_start", gene_count=len(gene_ids), batch_size=batch_size)

    # Placeholder: full CellxGene integration requires census schema knowledge
    # and cell type ontology matching, so NULLs are returned for now
    logger.warning(
        "cellxgene_not_implemented",
        message="CellxGene integration is not yet implemented. Returning NULL values.",
    )

    return pl.LazyFrame({
        "gene_id": gene_ids,
        "cellxgene_photoreceptor_expr": [None] * len(gene_ids),
        "cellxgene_hair_cell_expr": [None] * len(gene_ids),
    })
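For reference, a hedged sketch of what the eventual census query might look like (open_soma/get_anndata follow the documented cellxgene-census API, but the filter strings, census version, and aggregation are assumptions, not this pipeline's implementation):

import cellxgene_census
import numpy as np

def photoreceptor_mean_expression_sketch(gene_ids: list[str]) -> dict[str, float]:
    # Open the public census and pull an AnnData slice for one cell type;
    # the value filters below are illustrative
    with cellxgene_census.open_soma() as census:
        adata = cellxgene_census.get_anndata(
            census,
            organism="Homo sapiens",
            obs_value_filter="cell_type == 'photoreceptor cell' and is_primary_data == True",
            var_value_filter=f"feature_id in {gene_ids!r}",
        )
    # Mean expression per gene across all matching cells
    means = np.asarray(adata.X.mean(axis=0)).ravel()
    return dict(zip(adata.var["feature_id"].tolist(), means.tolist()))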
src/usher_pipeline/evidence/expression/load.py (new file, 119 lines)
@@ -0,0 +1,119 @@
"""Load expression evidence data to DuckDB with provenance tracking."""
|
||||
|
||||
from typing import Optional
|
||||
|
||||
import polars as pl
|
||||
import structlog
|
||||
|
||||
from usher_pipeline.persistence import PipelineStore, ProvenanceTracker
|
||||
from usher_pipeline.evidence.expression.models import EXPRESSION_TABLE_NAME
|
||||
|
||||
logger = structlog.get_logger()
|
||||
|
||||
|
||||
def load_to_duckdb(
|
||||
df: pl.DataFrame,
|
||||
store: PipelineStore,
|
||||
provenance: ProvenanceTracker,
|
||||
description: str = ""
|
||||
) -> None:
|
||||
"""Save expression evidence DataFrame to DuckDB with provenance.
|
||||
|
||||
Creates or replaces the tissue_expression table (idempotent).
|
||||
Records provenance step with summary statistics.
|
||||
|
||||
Args:
|
||||
df: Processed expression DataFrame with tau_specificity and expression_score_normalized
|
||||
store: PipelineStore instance for DuckDB persistence
|
||||
provenance: ProvenanceTracker instance for metadata recording
|
||||
description: Optional description for checkpoint metadata
|
||||
"""
|
||||
logger.info("expression_load_start", row_count=len(df))
|
||||
|
||||
# Calculate summary statistics for provenance
|
||||
# Genes with retina expression (any source)
|
||||
retina_expr_count = df.filter(
|
||||
pl.col("hpa_retina_tpm").is_not_null() |
|
||||
pl.col("gtex_retina_tpm").is_not_null() |
|
||||
pl.col("cellxgene_photoreceptor_expr").is_not_null()
|
||||
).height
|
||||
|
||||
# Genes with inner ear expression (primarily CellxGene)
|
||||
inner_ear_expr_count = df.filter(
|
||||
pl.col("cellxgene_hair_cell_expr").is_not_null()
|
||||
).height
|
||||
|
||||
# Mean Tau specificity (excluding NULLs)
|
||||
mean_tau = df.select(pl.col("tau_specificity").mean()).item()
|
||||
|
||||
# Expression score distribution
|
||||
expr_score_stats = df.select([
|
||||
pl.col("expression_score_normalized").min().alias("min"),
|
||||
pl.col("expression_score_normalized").max().alias("max"),
|
||||
pl.col("expression_score_normalized").mean().alias("mean"),
|
||||
pl.col("expression_score_normalized").median().alias("median"),
|
||||
]).to_dicts()[0]
|
||||
|
||||
# Save to DuckDB with CREATE OR REPLACE (idempotent)
|
||||
store.save_dataframe(
|
||||
df=df,
|
||||
table_name=EXPRESSION_TABLE_NAME,
|
||||
description=description or "Tissue expression evidence with HPA, GTEx, and CellxGene data",
|
||||
replace=True
|
||||
)
|
||||
|
||||
# Record provenance step with details
|
||||
provenance.record_step("load_tissue_expression", {
|
||||
"row_count": len(df),
|
||||
"retina_expression_count": retina_expr_count,
|
||||
"inner_ear_expression_count": inner_ear_expr_count,
|
||||
"mean_tau_specificity": round(mean_tau, 3) if mean_tau else None,
|
||||
"expression_score_min": round(expr_score_stats["min"], 3) if expr_score_stats["min"] else None,
|
||||
"expression_score_max": round(expr_score_stats["max"], 3) if expr_score_stats["max"] else None,
|
||||
"expression_score_mean": round(expr_score_stats["mean"], 3) if expr_score_stats["mean"] else None,
|
||||
"expression_score_median": round(expr_score_stats["median"], 3) if expr_score_stats["median"] else None,
|
||||
})
|
||||
|
||||
logger.info(
|
||||
"expression_load_complete",
|
||||
row_count=len(df),
|
||||
retina_expr=retina_expr_count,
|
||||
inner_ear_expr=inner_ear_expr_count,
|
||||
mean_tau=round(mean_tau, 3) if mean_tau else None,
|
||||
)
|
||||
|
||||
|
||||
def query_tissue_enriched(
    store: PipelineStore,
    min_enrichment: float = 2.0
) -> pl.DataFrame:
    """Query genes enriched in Usher-relevant tissues from DuckDB.

    Args:
        store: PipelineStore instance
        min_enrichment: Minimum usher_tissue_enrichment threshold (default: 2.0 = 2x enriched)

    Returns:
        DataFrame of tissue-enriched genes sorted by enrichment (most enriched first).
        Columns: gene_id, gene_symbol, usher_tissue_enrichment, tau_specificity,
        expression_score_normalized, plus the retina/hair-cell source columns.
    """
    logger.info("expression_query_enriched", min_enrichment=min_enrichment)

    # Query DuckDB for enriched genes
    df = store.execute_query(
        f"""
        SELECT gene_id, gene_symbol, usher_tissue_enrichment, tau_specificity,
               expression_score_normalized,
               hpa_retina_tpm, gtex_retina_tpm, cellxgene_photoreceptor_expr,
               cellxgene_hair_cell_expr
        FROM {EXPRESSION_TABLE_NAME}
        WHERE usher_tissue_enrichment >= ?
        ORDER BY usher_tissue_enrichment DESC
        """,
        params=[min_enrichment]
    )

    logger.info("expression_query_complete", result_count=len(df))

    return df
src/usher_pipeline/evidence/expression/models.py (new file, 101 lines)
@@ -0,0 +1,101 @@
"""Data models for tissue expression evidence."""
|
||||
|
||||
from pydantic import BaseModel
|
||||
|
||||
# HPA normal tissue data download URL (bulk TSV, more efficient than per-gene API)
|
||||
HPA_NORMAL_TISSUE_URL = (
|
||||
"https://www.proteinatlas.org/download/normal_tissue.tsv.zip"
|
||||
)
|
||||
|
||||
# GTEx v10 median gene expression bulk data
|
||||
GTEX_MEDIAN_EXPRESSION_URL = (
|
||||
"https://storage.googleapis.com/adult-gtex/bulk-gex/v10/median-tpm/"
|
||||
"GTEx_Analysis_2017-06-05_v8_RNASeQCv1.1.9_gene_median_tpm.gct.gz"
|
||||
)
|
||||
|
||||
# Table name in DuckDB
|
||||
EXPRESSION_TABLE_NAME = "tissue_expression"
|
||||
|
||||
# Target tissues for Usher/cilia relevance
|
||||
# Maps our standardized tissue keys to API-specific identifiers
|
||||
TARGET_TISSUES = {
|
||||
# Retina-related
|
||||
"retina": {
|
||||
"hpa": "retina",
|
||||
"gtex": "Eye - Retina", # Note: Not available in all GTEx versions
|
||||
"cellxgene_tissue": ["retina", "eye"],
|
||||
},
|
||||
# Inner ear-related (primarily from scRNA-seq, not in HPA/GTEx bulk)
|
||||
"inner_ear": {
|
||||
"hpa": None, # Not available in HPA bulk tissue data
|
||||
"gtex": None, # Not available in GTEx
|
||||
"cellxgene_tissue": ["inner ear", "cochlea", "vestibular system"],
|
||||
},
|
||||
# Cilia-rich tissues
|
||||
"cerebellum": {
|
||||
"hpa": "cerebellum",
|
||||
"gtex": "Brain - Cerebellum",
|
||||
"cellxgene_tissue": ["cerebellum"],
|
||||
},
|
||||
"testis": {
|
||||
"hpa": "testis",
|
||||
"gtex": "Testis",
|
||||
"cellxgene_tissue": ["testis"],
|
||||
},
|
||||
"fallopian_tube": {
|
||||
"hpa": "fallopian tube",
|
||||
"gtex": "Fallopian Tube", # May not be available in all GTEx versions
|
||||
"cellxgene_tissue": ["fallopian tube"],
|
||||
},
|
||||
}
|
||||
|
||||
# Target cell types for scRNA-seq (CellxGene)
|
||||
TARGET_CELL_TYPES = [
|
||||
"photoreceptor cell",
|
||||
"retinal rod cell",
|
||||
"retinal cone cell",
|
||||
"hair cell", # Inner ear mechanoreceptor
|
||||
"cochlear hair cell",
|
||||
"vestibular hair cell",
|
||||
]
|
||||
|
||||
|
||||
class ExpressionRecord(BaseModel):
|
||||
"""Tissue expression evidence for a single gene.
|
||||
|
||||
Attributes:
|
||||
gene_id: Ensembl gene ID (e.g., ENSG00000...)
|
||||
gene_symbol: HGNC gene symbol
|
||||
hpa_retina_tpm: HPA retina TPM expression (NULL if not in HPA)
|
||||
hpa_cerebellum_tpm: HPA cerebellum TPM (proxy for cilia-rich tissue)
|
||||
hpa_testis_tpm: HPA testis TPM (cilia-rich)
|
||||
hpa_fallopian_tube_tpm: HPA fallopian tube TPM (ciliated epithelium)
|
||||
gtex_retina_tpm: GTEx "Eye - Retina" median TPM (NULL if tissue unavailable)
|
||||
gtex_cerebellum_tpm: GTEx "Brain - Cerebellum" median TPM
|
||||
gtex_testis_tpm: GTEx "Testis" median TPM
|
||||
gtex_fallopian_tube_tpm: GTEx "Fallopian Tube" median TPM (often NULL)
|
||||
cellxgene_photoreceptor_expr: Mean expression in photoreceptor cells (scRNA-seq)
|
||||
cellxgene_hair_cell_expr: Mean expression in hair cells (scRNA-seq)
|
||||
tau_specificity: Tau index (0=ubiquitous, 1=tissue-specific) across all tissues
|
||||
usher_tissue_enrichment: Enrichment in Usher-relevant tissues vs global expression
|
||||
expression_score_normalized: Composite expression score (0-1 range)
|
||||
|
||||
CRITICAL: NULL values represent missing/unavailable data and are preserved as None.
|
||||
Inner ear data is primarily from CellxGene (not HPA/GTEx bulk).
|
||||
"""
|
||||
|
||||
gene_id: str
|
||||
gene_symbol: str
|
||||
hpa_retina_tpm: float | None = None
|
||||
hpa_cerebellum_tpm: float | None = None
|
||||
hpa_testis_tpm: float | None = None
|
||||
hpa_fallopian_tube_tpm: float | None = None
|
||||
gtex_retina_tpm: float | None = None
|
||||
gtex_cerebellum_tpm: float | None = None
|
||||
gtex_testis_tpm: float | None = None
|
||||
gtex_fallopian_tube_tpm: float | None = None
|
||||
cellxgene_photoreceptor_expr: float | None = None
|
||||
cellxgene_hair_cell_expr: float | None = None
|
||||
tau_specificity: float | None = None
|
||||
usher_tissue_enrichment: float | None = None
|
||||
expression_score_normalized: float | None = None
|
||||
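A quick illustration of the record's NULL-preserving semantics (all values invented):

record = ExpressionRecord(
    gene_id="ENSG00000000001",  # made-up ID for illustration
    gene_symbol="EXAMPLE1",
    hpa_retina_tpm=3.0,         # HPA Level "High" mapped to 3.0
    gtex_retina_tpm=12.5,
    # inner ear fields left as None: absent from HPA/GTEx bulk data
)
assert record.cellxgene_hair_cell_expr is None  # missing data, not zero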
src/usher_pipeline/evidence/expression/transform.py (new file, 273 lines)
@@ -0,0 +1,273 @@
"""Transform and normalize tissue expression data."""
|
||||
|
||||
from pathlib import Path
|
||||
from typing import Optional
|
||||
|
||||
import polars as pl
|
||||
import structlog
|
||||
|
||||
from usher_pipeline.evidence.expression.fetch import (
|
||||
fetch_hpa_expression,
|
||||
fetch_gtex_expression,
|
||||
fetch_cellxgene_expression,
|
||||
)
|
||||
|
||||
logger = structlog.get_logger()
|
||||
|
||||
|
||||
def calculate_tau_specificity(
|
||||
df: pl.DataFrame,
|
||||
tissue_columns: list[str],
|
||||
) -> pl.DataFrame:
|
||||
"""Calculate Tau tissue specificity index.
|
||||
|
||||
Tau measures tissue specificity: 0 = ubiquitous expression, 1 = tissue-specific.
|
||||
Formula: Tau = sum(1 - xi/xmax) / (n-1)
|
||||
where xi is expression in tissue i, xmax is max expression across tissues.
|
||||
|
||||
If ANY tissue value is NULL, Tau is NULL (insufficient data for reliable specificity).
|
||||
|
||||
Args:
|
||||
df: DataFrame with expression values across tissues
|
||||
tissue_columns: List of column names containing tissue expression values
|
||||
|
||||
Returns:
|
||||
DataFrame with tau_specificity column added
|
||||
"""
|
||||
logger.info("tau_calculation_start", tissue_count=len(tissue_columns))
|
||||
|
||||
# Check if any tissue columns are missing
|
||||
available_cols = [col for col in tissue_columns if col in df.columns]
|
||||
if len(available_cols) < len(tissue_columns):
|
||||
missing = set(tissue_columns) - set(available_cols)
|
||||
logger.warning("tau_missing_columns", missing=list(missing))
|
||||
|
||||
if not available_cols:
|
||||
# No tissue data available - return with NULL Tau
|
||||
return df.with_columns(pl.lit(None).cast(pl.Float64).alias("tau_specificity"))
|
||||
|
||||
# For each gene, check if all tissue values are non-NULL
|
||||
# If any NULL, Tau is NULL
|
||||
# Otherwise, compute Tau = sum(1 - xi/xmax) / (n-1)
|
||||
|
||||
# Create expression for NULL check
|
||||
has_all_data = pl.all_horizontal([pl.col(col).is_not_null() for col in available_cols])
|
||||
|
||||
# Compute Tau only for genes with complete data
|
||||
# Step 1: Find max expression across tissues
|
||||
max_expr = pl.max_horizontal([pl.col(col) for col in available_cols])
|
||||
|
||||
# Step 2: Compute sum(1 - xi/xmax) for each gene
|
||||
# Handle division by zero: if max_expr is 0, Tau is undefined (set to NULL)
|
||||
tau_sum = sum([
|
||||
pl.when(max_expr > 0)
|
||||
.then(1.0 - (pl.col(col) / max_expr))
|
||||
.otherwise(0.0)
|
||||
for col in available_cols
|
||||
])
|
||||
|
||||
# Step 3: Divide by (n-1), where n is number of tissues
|
||||
n_tissues = len(available_cols)
|
||||
if n_tissues <= 1:
|
||||
# Cannot compute specificity with only 1 tissue
|
||||
tau = pl.lit(None).cast(pl.Float64)
|
||||
else:
|
||||
tau = tau_sum / (n_tissues - 1)
|
||||
|
||||
# Apply Tau only to genes with complete data
|
||||
df = df.with_columns(
|
||||
pl.when(has_all_data & (max_expr > 0))
|
||||
.then(tau)
|
||||
.otherwise(pl.lit(None))
|
||||
.alias("tau_specificity")
|
||||
)
|
||||
|
||||
logger.info("tau_calculation_complete")
|
||||
|
||||
return df
|
||||
|
||||
|
||||
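A worked example of the Tau formula implemented above:

# Expression of one gene across n = 4 tissues: [8.0, 2.0, 0.0, 0.0]
# x_max = 8.0, so the per-tissue terms (1 - x_i/x_max) are:
#   1 - 8/8 = 0.00,  1 - 2/8 = 0.75,  1 - 0/8 = 1.00,  1 - 0/8 = 1.00
# Tau = (0.00 + 0.75 + 1.00 + 1.00) / (4 - 1) = 2.75 / 3 ≈ 0.917
# -> highly tissue-specific. A flat profile [5, 5, 5, 5] gives Tau = 0.
import polars as pl

df = pl.DataFrame({
    "gene_symbol": ["SPECIFIC1", "FLAT1"],
    "t1": [8.0, 5.0], "t2": [2.0, 5.0], "t3": [0.0, 5.0], "t4": [0.0, 5.0],
})
df = calculate_tau_specificity(df, ["t1", "t2", "t3", "t4"])
# tau_specificity: [0.9166..., 0.0]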
def compute_expression_score(df: pl.DataFrame) -> pl.DataFrame:
    """Compute Usher tissue enrichment and a normalized expression score.

    Computes:
    1. usher_tissue_enrichment: ratio of mean expression in Usher-relevant
       tissues (retina, inner ear proxies) to mean expression across all
       tissues. A higher ratio means more enrichment in target tissues.
    2. expression_score_normalized: weighted composite of
       - 40%: usher_tissue_enrichment (as a 0-1 percentile rank)
       - 30%: tau_specificity
       - 30%: percentile rank of the max expression in target tissues

    NULL if all expression data is NULL.

    Args:
        df: DataFrame with tissue expression columns and tau_specificity

    Returns:
        DataFrame with usher_tissue_enrichment and expression_score_normalized columns
    """
    logger.info("expression_score_start")

    # Usher-relevant tissue columns
    usher_tissue_cols = [
        "hpa_retina_tpm",
        "hpa_cerebellum_tpm",  # cilia-rich
        "gtex_retina_tpm",
        "gtex_cerebellum_tpm",
        "cellxgene_photoreceptor_expr",
        "cellxgene_hair_cell_expr",
    ]

    # All tissue columns for the global mean
    all_tissue_cols = [
        "hpa_retina_tpm",
        "hpa_cerebellum_tpm",
        "hpa_testis_tpm",
        "hpa_fallopian_tube_tpm",
        "gtex_retina_tpm",
        "gtex_cerebellum_tpm",
        "gtex_testis_tpm",
        "gtex_fallopian_tube_tpm",
        "cellxgene_photoreceptor_expr",
        "cellxgene_hair_cell_expr",
    ]

    # Filter to available columns
    usher_available = [col for col in usher_tissue_cols if col in df.columns]
    all_available = [col for col in all_tissue_cols if col in df.columns]

    if not usher_available or not all_available:
        # No expression data - return NULL scores
        return df.with_columns([
            pl.lit(None).cast(pl.Float64).alias("usher_tissue_enrichment"),
            pl.lit(None).cast(pl.Float64).alias("expression_score_normalized"),
        ])

    # Mean expression in Usher tissues (NULLs ignored)
    usher_mean = pl.mean_horizontal([pl.col(col) for col in usher_available])

    # Mean expression across all tissues (NULLs ignored)
    global_mean = pl.mean_horizontal([pl.col(col) for col in all_available])

    # Enrichment ratio: usher_mean / global_mean.
    # If global_mean is 0 or NULL, enrichment is NULL
    enrichment = pl.when(global_mean > 0).then(usher_mean / global_mean).otherwise(pl.lit(None))

    df = df.with_columns(enrichment.alias("usher_tissue_enrichment"))

    # Normalize enrichment to a 0-1 scale via percentile rank across all genes
    enrichment_percentile = (
        pl.col("usher_tissue_enrichment").rank(method="average")
        / pl.col("usher_tissue_enrichment").count()
    )

    # Percentile rank of the max expression in target tissues
    max_target_expr = pl.max_horizontal([pl.col(col) for col in usher_available])
    max_target_percentile = max_target_expr.rank(method="average") / max_target_expr.count()

    # Composite score (weighted average). A partial score is still computed
    # when tau_specificity is NULL, as long as enrichment or Tau is available
    composite = pl.when(
        pl.col("usher_tissue_enrichment").is_not_null() | pl.col("tau_specificity").is_not_null()
    ).then(
        0.4 * enrichment_percentile.fill_null(0.0) +
        0.3 * pl.col("tau_specificity").fill_null(0.0) +
        0.3 * max_target_percentile.fill_null(0.0)
    ).otherwise(pl.lit(None))

    df = df.with_columns(composite.alias("expression_score_normalized"))

    logger.info("expression_score_complete")

    return df
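And the composite weighting in concrete numbers (illustrative inputs):

# For a gene at the 90th percentile of enrichment, Tau = 0.8, and the 70th
# percentile of max target-tissue expression:
#   score = 0.4 * 0.90 + 0.3 * 0.80 + 0.3 * 0.70
#         = 0.36 + 0.24 + 0.21
#         = 0.81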
def process_expression_evidence(
    gene_ids: list[str],
    cache_dir: Optional[Path] = None,
    force: bool = False,
    skip_cellxgene: bool = False,
) -> pl.DataFrame:
    """End-to-end expression evidence processing pipeline.

    Composes: fetch HPA -> fetch GTEx -> fetch CellxGene -> merge ->
    compute Tau -> compute score -> collect.

    Args:
        gene_ids: List of Ensembl gene IDs to process
        cache_dir: Directory for caching downloads
        force: If True, re-download even if cached
        skip_cellxgene: If True, skip CellxGene fetching (optional dependency)

    Returns:
        Materialized DataFrame with expression evidence ready for DuckDB storage
    """
    logger.info("expression_pipeline_start", gene_count=len(gene_ids))

    cache_dir = Path(cache_dir) if cache_dir else Path("data/expression")

    # Fetch HPA expression (lazy).
    # DEVIATION: HPA is keyed by gene_symbol while this pipeline works with
    # gene_ids, so the HPA frame cannot be joined here. Fetching it still
    # warms the download cache; the join is deferred until the gene universe
    # supplies a gene_id -> gene_symbol mapping downstream.
    logger.info("fetching_hpa")
    lf_hpa = fetch_hpa_expression(gene_ids, cache_dir=cache_dir, force=force)

    # Fetch GTEx expression (lazy)
    logger.info("fetching_gtex")
    lf_gtex = fetch_gtex_expression(gene_ids, cache_dir=cache_dir, force=force)

    # Gene universe frame; left joins preserve all requested genes
    gene_universe = pl.LazyFrame({"gene_id": gene_ids})
    lf_merged = gene_universe.join(lf_gtex, on="gene_id", how="left")

    # Fetch CellxGene if not skipped
    if not skip_cellxgene:
        logger.info("fetching_cellxgene")
        lf_cellxgene = fetch_cellxgene_expression(gene_ids, cache_dir=cache_dir)
        lf_merged = lf_merged.join(lf_cellxgene, on="gene_id", how="left")

    # Collect here to enable the horizontal operations below
    df = lf_merged.collect()

    # Calculate Tau specificity
    tissue_columns = [
        "hpa_retina_tpm",
        "hpa_cerebellum_tpm",
        "hpa_testis_tpm",
        "hpa_fallopian_tube_tpm",
        "gtex_retina_tpm",
        "gtex_cerebellum_tpm",
        "gtex_testis_tpm",
        "gtex_fallopian_tube_tpm",
        "cellxgene_photoreceptor_expr",
        "cellxgene_hair_cell_expr",
    ]
    # Filter to available columns
    available_tissue_cols = [col for col in tissue_columns if col in df.columns]

    if available_tissue_cols:
        df = calculate_tau_specificity(df, available_tissue_cols)
    else:
        df = df.with_columns(pl.lit(None).cast(pl.Float64).alias("tau_specificity"))

    # Compute expression score
    df = compute_expression_score(df)

    logger.info(
        "expression_pipeline_complete",
        row_count=len(df),
        has_hpa=any("hpa_" in col for col in df.columns),
        has_gtex=any("gtex_" in col for col in df.columns),
        has_cellxgene=any("cellxgene_" in col for col in df.columns),
    )

    return df
src/usher_pipeline/evidence/literature/__init__.py (new file, 50 lines)
@@ -0,0 +1,50 @@
"""Literature Evidence Layer (LITE): PubMed-based evidence for cilia/sensory gene involvement.
|
||||
|
||||
This module fetches PubMed citations for genes in various contexts (cilia, sensory,
|
||||
cytoskeleton, cell polarity), classifies evidence quality, and computes quality-weighted
|
||||
scores that mitigate well-studied gene bias.
|
||||
|
||||
Key exports:
|
||||
- fetch: query_pubmed_gene, fetch_literature_evidence
|
||||
- transform: classify_evidence_tier, compute_literature_score, process_literature_evidence
|
||||
- load: load_to_duckdb
|
||||
- models: LiteratureRecord, SEARCH_CONTEXTS, LITERATURE_TABLE_NAME
|
||||
"""
|
||||
|
||||
from usher_pipeline.evidence.literature.models import (
|
||||
LiteratureRecord,
|
||||
LITERATURE_TABLE_NAME,
|
||||
SEARCH_CONTEXTS,
|
||||
DIRECT_EVIDENCE_TERMS,
|
||||
)
|
||||
from usher_pipeline.evidence.literature.fetch import (
|
||||
query_pubmed_gene,
|
||||
fetch_literature_evidence,
|
||||
)
|
||||
from usher_pipeline.evidence.literature.transform import (
|
||||
classify_evidence_tier,
|
||||
compute_literature_score,
|
||||
process_literature_evidence,
|
||||
)
|
||||
from usher_pipeline.evidence.literature.load import (
|
||||
load_to_duckdb,
|
||||
query_literature_supported,
|
||||
)
|
||||
|
||||
__all__ = [
|
||||
# Models
|
||||
"LiteratureRecord",
|
||||
"LITERATURE_TABLE_NAME",
|
||||
"SEARCH_CONTEXTS",
|
||||
"DIRECT_EVIDENCE_TERMS",
|
||||
# Fetch
|
||||
"query_pubmed_gene",
|
||||
"fetch_literature_evidence",
|
||||
# Transform
|
||||
"classify_evidence_tier",
|
||||
"compute_literature_score",
|
||||
"process_literature_evidence",
|
||||
# Load
|
||||
"load_to_duckdb",
|
||||
"query_literature_supported",
|
||||
]
|
||||
src/usher_pipeline/evidence/literature/fetch.py (new file, 248 lines)
@@ -0,0 +1,248 @@
"""Fetch literature evidence from PubMed via Biopython Entrez."""
|
||||
|
||||
from time import sleep
|
||||
from typing import Optional
|
||||
from functools import wraps
|
||||
|
||||
import polars as pl
|
||||
import structlog
|
||||
from Bio import Entrez
|
||||
|
||||
from usher_pipeline.evidence.literature.models import (
|
||||
SEARCH_CONTEXTS,
|
||||
DIRECT_EVIDENCE_TERMS,
|
||||
)
|
||||
|
||||
logger = structlog.get_logger()
|
||||
|
||||
|
||||
def ratelimit(calls_per_sec: float = 3.0):
|
||||
"""Rate limiter decorator for PubMed API calls.
|
||||
|
||||
NCBI E-utilities rate limits:
|
||||
- Without API key: 3 requests/second
|
||||
- With API key: 10 requests/second
|
||||
|
||||
Args:
|
||||
calls_per_sec: Maximum calls per second (default: 3 for no API key)
|
||||
"""
|
||||
min_interval = 1.0 / calls_per_sec
|
||||
last_called = [0.0]
|
||||
|
||||
def decorator(func):
|
||||
@wraps(func)
|
||||
def wrapper(*args, **kwargs):
|
||||
import time
|
||||
elapsed = time.time() - last_called[0]
|
||||
if elapsed < min_interval:
|
||||
sleep(min_interval - elapsed)
|
||||
result = func(*args, **kwargs)
|
||||
last_called[0] = time.time()
|
||||
return result
|
||||
return wrapper
|
||||
return decorator
|
||||
|
||||
|
||||
@ratelimit(calls_per_sec=3.0) # Default rate limit
|
||||
def _esearch_with_ratelimit(gene_symbol: str, query_terms: str, email: str) -> int:
|
||||
"""Execute PubMed esearch with rate limiting.
|
||||
|
||||
Args:
|
||||
gene_symbol: Gene symbol to search
|
||||
query_terms: Additional query terms (context filters)
|
||||
email: Email for NCBI (required)
|
||||
|
||||
Returns:
|
||||
Count of publications matching query
|
||||
"""
|
||||
query = f"({gene_symbol}[Gene Name]) AND {query_terms}"
|
||||
try:
|
||||
handle = Entrez.esearch(db="pubmed", term=query, retmax=0)
|
||||
record = Entrez.read(handle)
|
||||
handle.close()
|
||||
count = int(record["Count"])
|
||||
return count
|
||||
except Exception as e:
|
||||
logger.warning(
|
||||
"pubmed_query_failed",
|
||||
gene_symbol=gene_symbol,
|
||||
query_terms=query_terms[:50],
|
||||
error=str(e),
|
||||
)
|
||||
# Return None to indicate failed query (not zero publications)
|
||||
return None
|
||||
|
||||
|
||||
def query_pubmed_gene(
    gene_symbol: str,
    contexts: dict[str, str],
    email: str,
    api_key: Optional[str] = None,
) -> dict:
    """Query PubMed for a single gene across multiple contexts.

    Performs systematic queries:
    1. Total publications for the gene (no context filter)
    2. Publications in each context (cilia, sensory, etc.)
    3. Direct experimental evidence (knockout/mutation terms)
    4. High-throughput screen mentions

    Args:
        gene_symbol: HGNC gene symbol (e.g., "BRCA1")
        contexts: Dict mapping context names to PubMed search terms
        email: Email address (required by NCBI E-utilities)
        api_key: Optional NCBI API key for the higher rate limit (10/sec vs 3/sec)

    Returns:
        Dict with counts for each context, plus direct_experimental and hts counts.
        NULL values indicate failed queries (API errors), not zero publications.
    """
    # Set Entrez credentials
    Entrez.email = email
    if api_key:
        Entrez.api_key = api_key

    # Rebind the rate limiter only when the rate actually changes: rebinding
    # on every call would reset the limiter state and let one query per gene
    # slip through unthrottled
    global _esearch_with_ratelimit, _current_rate
    rate = 10.0 if api_key else 3.0
    if rate != _current_rate:
        _esearch_with_ratelimit = ratelimit(calls_per_sec=rate)(
            _esearch_with_ratelimit.__wrapped__
        )
        _current_rate = rate

    logger.debug(
        "pubmed_query_gene_start",
        gene_symbol=gene_symbol,
        context_count=len(contexts),
        rate_limit=rate,
    )

    results = {"gene_symbol": gene_symbol}

    # Query 1: total publications (no context filter)
    total_count = _esearch_with_ratelimit(gene_symbol, "", email)
    results["total_pubmed_count"] = total_count

    # Query 2: context-specific counts
    for context_name, context_terms in contexts.items():
        count = _esearch_with_ratelimit(gene_symbol, context_terms, email)
        results[f"{context_name}_context_count"] = count

    # Query 3: direct experimental evidence (guard against a missing cilia
    # context, which would otherwise leave a dangling AND in the query)
    cilia_terms = contexts.get("cilia", "")
    direct_terms = (
        f"{DIRECT_EVIDENCE_TERMS} AND {cilia_terms}" if cilia_terms else DIRECT_EVIDENCE_TERMS
    )
    direct_count = _esearch_with_ratelimit(gene_symbol, direct_terms, email)
    results["direct_experimental_count"] = direct_count

    # Query 4: high-throughput screen hits
    hts_terms = "(screen[Title/Abstract] OR proteomics[Title/Abstract] OR transcriptomics[Title/Abstract])"
    hts_count = _esearch_with_ratelimit(gene_symbol, hts_terms, email)
    results["hts_screen_count"] = hts_count

    logger.debug(
        "pubmed_query_gene_complete",
        gene_symbol=gene_symbol,
        total_count=total_count,
    )

    return results
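For concreteness, the final query strings produced for a real Usher gene such as MYO7A, using the SEARCH_CONTEXTS defined in models.py:

# Total count (no context filter):
#   MYO7A[Gene Name]
# Cilia context:
#   (MYO7A[Gene Name]) AND (cilia OR cilium OR ciliary OR flagellum OR intraflagellar)
# Direct experimental evidence:
#   (MYO7A[Gene Name]) AND (knockout OR knockdown OR mutation OR CRISPR OR siRNA
#   OR morpholino OR null allele) AND (cilia OR cilium OR ciliary OR flagellum
#   OR intraflagellar)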
def fetch_literature_evidence(
    gene_symbols: list[str],
    email: str,
    api_key: Optional[str] = None,
    batch_size: int = 500,
    checkpoint_df: Optional[pl.DataFrame] = None,
) -> pl.DataFrame:
    """Fetch literature evidence for all genes with progress tracking and checkpointing.

    This is a SLOW operation (~20K genes * ~6 queries each = ~120K queries):
    - With an API key (10 req/sec): ~3.3 hours
    - Without an API key (3 req/sec): ~11 hours

    Supports checkpoint-restart: pass partial results to resume from the last
    checkpoint. Persisting the partial results is the caller's responsibility;
    this function only logs checkpoint boundaries.

    Args:
        gene_symbols: List of HGNC gene symbols to query
        email: Email address (required by NCBI E-utilities)
        api_key: Optional NCBI API key for the 10 req/sec rate limit
        batch_size: Checkpoint boundary every N genes (default: 500)
        checkpoint_df: Optional partial-results DataFrame to resume from

    Returns:
        DataFrame with columns: gene_symbol, total_pubmed_count, cilia_context_count,
        sensory_context_count, cytoskeleton_context_count, cell_polarity_context_count,
        direct_experimental_count, hts_screen_count.
        NULL values indicate failed queries (API errors), not zero publications.
    """
    # Estimate runtime
    queries_per_gene = 6  # total + 4 contexts + direct + hts
    total_queries = len(gene_symbols) * queries_per_gene
    rate = 10.0 if api_key else 3.0
    estimated_seconds = total_queries / rate
    estimated_hours = estimated_seconds / 3600

    logger.info(
        "pubmed_fetch_start",
        gene_count=len(gene_symbols),
        total_queries=total_queries,
        rate_limit_per_sec=rate,
        estimated_hours=round(estimated_hours, 2),
        has_api_key=api_key is not None,
    )

    # Resume from a checkpoint if provided
    if checkpoint_df is not None:
        processed_symbols = set(checkpoint_df["gene_symbol"].to_list())
        remaining_symbols = [s for s in gene_symbols if s not in processed_symbols]
        logger.info(
            "pubmed_fetch_resume",
            checkpoint_genes=len(processed_symbols),
            remaining_genes=len(remaining_symbols),
        )
        gene_symbols = remaining_symbols
        results = checkpoint_df.to_dicts()
    else:
        results = []

    # Process genes with progress logging
    for i, gene_symbol in enumerate(gene_symbols, start=1):
        # Query PubMed for this gene
        gene_result = query_pubmed_gene(
            gene_symbol=gene_symbol,
            contexts=SEARCH_CONTEXTS,
            email=email,
            api_key=api_key,
        )
        results.append(gene_result)

        # Log progress every 100 genes
        if i % 100 == 0:
            pct = (i / len(gene_symbols)) * 100
            logger.info(
                "pubmed_fetch_progress",
                processed=i,
                total=len(gene_symbols),
                percent=round(pct, 1),
                gene_symbol=gene_symbol,
            )

        # Mark a checkpoint boundary every batch_size genes (the caller can
        # snapshot `results` at these points, e.g. to Parquet)
        if i % batch_size == 0:
            logger.info(
                "pubmed_fetch_checkpoint",
                processed=i,
                total=len(gene_symbols),
                batch_size=batch_size,
            )

    logger.info(
        "pubmed_fetch_complete",
        total_genes=len(results),
        failed_count=sum(1 for r in results if r["total_pubmed_count"] is None),
    )

    # Convert to DataFrame
    df = pl.DataFrame(results)

    return df
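From the caller's side, the checkpoint-restart pattern might look like this (the Parquet path and all_symbols variable are placeholders, not names from this diff):

from pathlib import Path
import polars as pl

checkpoint_path = Path("data/literature_checkpoint.parquet")  # hypothetical location

# Resume from a previous partial run if one exists
checkpoint = pl.read_parquet(checkpoint_path) if checkpoint_path.exists() else None

df = fetch_literature_evidence(
    gene_symbols=all_symbols,      # ~20K symbols from the gene universe
    email="you@example.org",       # required by NCBI E-utilities
    api_key=None,                  # set an NCBI key for 10 req/sec
    checkpoint_df=checkpoint,
)
df.write_parquet(checkpoint_path)  # persist for the next resume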
src/usher_pipeline/evidence/literature/load.py (new file, 137 lines)
@@ -0,0 +1,137 @@
"""Load literature evidence to DuckDB with provenance tracking."""
|
||||
|
||||
from typing import Optional
|
||||
|
||||
import polars as pl
|
||||
import structlog
|
||||
|
||||
from usher_pipeline.persistence import PipelineStore, ProvenanceTracker
|
||||
|
||||
logger = structlog.get_logger()
|
||||
|
||||
|
||||
def load_to_duckdb(
|
||||
df: pl.DataFrame,
|
||||
store: PipelineStore,
|
||||
provenance: ProvenanceTracker,
|
||||
description: str = ""
|
||||
) -> None:
|
||||
"""Save literature evidence DataFrame to DuckDB with provenance.
|
||||
|
||||
Creates or replaces the literature_evidence table (idempotent).
|
||||
Records provenance step with summary statistics.
|
||||
|
||||
Args:
|
||||
df: Processed literature evidence DataFrame with evidence_tier and literature_score_normalized
|
||||
store: PipelineStore instance for DuckDB persistence
|
||||
provenance: ProvenanceTracker instance for metadata recording
|
||||
description: Optional description for checkpoint metadata
|
||||
"""
|
||||
logger.info("literature_load_start", row_count=len(df))
|
||||
|
||||
# Calculate summary statistics for provenance
|
||||
tier_counts = (
|
||||
df.group_by("evidence_tier")
|
||||
.agg(pl.count().alias("count"))
|
||||
.to_dicts()
|
||||
)
|
||||
tier_distribution = {row["evidence_tier"]: row["count"] for row in tier_counts}
|
||||
|
||||
genes_with_evidence = df.filter(
|
||||
pl.col("evidence_tier").is_in(["direct_experimental", "functional_mention", "hts_hit"])
|
||||
).height
|
||||
|
||||
# Calculate mean literature score (excluding NULL)
|
||||
mean_score_result = df.filter(
|
||||
pl.col("literature_score_normalized").is_not_null()
|
||||
).select(pl.col("literature_score_normalized").mean())
|
||||
|
||||
mean_score = None
|
||||
if len(mean_score_result) > 0:
|
||||
mean_score = mean_score_result.to_dicts()[0]["literature_score_normalized"]
|
||||
|
||||
# Count total PubMed queries made (estimate: 6 queries per gene)
|
||||
total_queries = len(df) * 6
|
||||
|
||||
# Save to DuckDB with CREATE OR REPLACE (idempotent)
|
||||
store.save_dataframe(
|
||||
df=df,
|
||||
table_name="literature_evidence",
|
||||
description=description or "PubMed literature evidence with context-specific queries and quality-weighted scoring",
|
||||
replace=True
|
||||
)
|
||||
|
||||
# Record provenance step with details
|
||||
provenance.record_step("load_literature_evidence", {
|
||||
"row_count": len(df),
|
||||
"genes_with_direct_evidence": tier_distribution.get("direct_experimental", 0),
|
||||
"genes_with_functional_mention": tier_distribution.get("functional_mention", 0),
|
||||
"genes_with_hts_hits": tier_distribution.get("hts_hit", 0),
|
||||
"genes_with_any_evidence": genes_with_evidence,
|
||||
"tier_distribution": tier_distribution,
|
||||
"mean_literature_score": round(mean_score, 4) if mean_score is not None else None,
|
||||
"estimated_pubmed_queries": total_queries,
|
||||
})
|
||||
|
||||
logger.info(
|
||||
"literature_load_complete",
|
||||
row_count=len(df),
|
||||
tier_distribution=tier_distribution,
|
||||
genes_with_evidence=genes_with_evidence,
|
||||
mean_score=round(mean_score, 4) if mean_score is not None else None,
|
||||
)
|
||||
|
||||
|
||||
def query_literature_supported(
    store: PipelineStore,
    min_tier: str = "functional_mention"
) -> pl.DataFrame:
    """Query genes with literature support at or above a given tier.

    Demonstrates DuckDB query capability and filters genes by evidence quality.

    Args:
        store: PipelineStore instance
        min_tier: Minimum evidence tier (default: "functional_mention").
            Options: "direct_experimental", "functional_mention", "hts_hit", "incidental"

    Returns:
        DataFrame of literature-supported genes sorted by literature_score_normalized (desc).
        Columns: gene_id, gene_symbol, evidence_tier, literature_score_normalized,
        cilia_context_count, sensory_context_count, total_pubmed_count,
        direct_experimental_count, hts_screen_count
    """
    logger.info("literature_query_supported", min_tier=min_tier)

    # Tier hierarchy (lower rank = stronger evidence)
    tier_hierarchy = {
        "direct_experimental": 0,
        "functional_mention": 1,
        "hts_hit": 2,
        "incidental": 3,
        "none": 4,
    }

    if min_tier not in tier_hierarchy:
        raise ValueError(f"Invalid tier: {min_tier}. Must be one of {list(tier_hierarchy.keys())}")

    min_tier_rank = tier_hierarchy[min_tier]

    # Tiers at or above the requested quality, for the SQL IN clause
    # (safe to interpolate: values come from the fixed hierarchy above)
    valid_tiers = [tier for tier, rank in tier_hierarchy.items() if rank <= min_tier_rank]
    tiers_str = ", ".join(f"'{tier}'" for tier in valid_tiers)

    # Query DuckDB
    df = store.execute_query(
        f"""
        SELECT gene_id, gene_symbol, evidence_tier, literature_score_normalized,
               cilia_context_count, sensory_context_count, total_pubmed_count,
               direct_experimental_count, hts_screen_count
        FROM {LITERATURE_TABLE_NAME}
        WHERE evidence_tier IN ({tiers_str})
        ORDER BY literature_score_normalized DESC NULLS LAST
        """
    )

    logger.info("literature_query_complete", result_count=len(df))

    return df
src/usher_pipeline/evidence/literature/models.py (new file, 89 lines)
@@ -0,0 +1,89 @@
"""Data models for literature evidence layer."""
|
||||
|
||||
from typing import Optional
|
||||
|
||||
from pydantic import BaseModel, Field
|
||||
|
||||
|
||||
LITERATURE_TABLE_NAME = "literature_evidence"
|
||||
|
||||
# Context-specific PubMed search terms
|
||||
# These are combined with gene symbols to find relevant publications
|
||||
SEARCH_CONTEXTS = {
|
||||
"cilia": "(cilia OR cilium OR ciliary OR flagellum OR intraflagellar)",
|
||||
"sensory": "(retina OR cochlea OR hair cell OR photoreceptor OR vestibular OR hearing OR usher syndrome)",
|
||||
"cytoskeleton": "(cytoskeleton OR actin OR microtubule OR motor protein)",
|
||||
"cell_polarity": "(cell polarity OR planar cell polarity OR apicobasal OR tight junction)",
|
||||
}
|
||||

# Terms indicating direct experimental evidence
# Publications with these terms carry higher confidence than incidental mentions
DIRECT_EVIDENCE_TERMS = "(knockout OR knockdown OR mutation OR CRISPR OR siRNA OR morpholino OR null allele)"

# Evidence tier classification
# Higher tiers indicate stronger evidence quality
EVIDENCE_TIERS = [
    "direct_experimental",  # Knockout/mutation + cilia/sensory context
    "functional_mention",   # Mentioned in cilia/sensory context, not just incidental
    "hts_hit",              # High-throughput screen hit + cilia/sensory context
    "incidental",           # Mentioned in literature but no specific cilia/sensory context
    "none",                 # No PubMed publications found
]


class LiteratureRecord(BaseModel):
    """Literature evidence record for a single gene.

    Captures PubMed publication counts across different contexts and evidence quality.
    NULL values indicate failed queries (API errors), not zero publications.
    """

    gene_id: str = Field(description="Ensembl gene ID (e.g., ENSG00000012048)")
    gene_symbol: str = Field(description="HGNC gene symbol (e.g., BRCA1)")

    # Publication counts by context
    total_pubmed_count: Optional[int] = Field(
        None,
        description="Total PubMed publications mentioning this gene (any context). NULL if query failed.",
    )
    cilia_context_count: Optional[int] = Field(
        None,
        description="Publications mentioning gene in cilia-related context",
    )
    sensory_context_count: Optional[int] = Field(
        None,
        description="Publications mentioning gene in sensory (retina/cochlea/hearing) context",
    )
    cytoskeleton_context_count: Optional[int] = Field(
        None,
        description="Publications mentioning gene in cytoskeleton context",
    )
    cell_polarity_context_count: Optional[int] = Field(
        None,
        description="Publications mentioning gene in cell polarity context",
    )

    # Evidence quality indicators
    direct_experimental_count: Optional[int] = Field(
        None,
        description="Publications with knockout/mutation/knockdown evidence",
    )
    hts_screen_count: Optional[int] = Field(
        None,
        description="Publications from high-throughput screens (proteomics/transcriptomics)",
    )

    # Derived classification
    evidence_tier: str = Field(
        description="Evidence quality tier: direct_experimental, functional_mention, hts_hit, incidental, none"
    )
    literature_score_normalized: Optional[float] = Field(
        None,
        ge=0.0,
        le=1.0,
        description="Quality-weighted literature score [0-1], normalized to mitigate well-studied gene bias. NULL if total_pubmed_count is NULL.",
    )

    class Config:
        """Pydantic config."""

        frozen = False  # Allow mutation for score computation
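A minimal construction sketch for LiteratureRecord (all counts invented for illustration; only gene_id, gene_symbol, and evidence_tier are required fields):

record = LiteratureRecord(
    gene_id="ENSG00000137474",  # MYO7A, illustrative
    gene_symbol="MYO7A",
    total_pubmed_count=850,
    cilia_context_count=40,
    sensory_context_count=210,
    direct_experimental_count=25,
    evidence_tier="direct_experimental",
    literature_score_normalized=0.97,
)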
279
src/usher_pipeline/evidence/literature/transform.py
Normal file
@@ -0,0 +1,279 @@
"""Transform literature evidence: classify tiers and compute quality-weighted scores."""

from typing import Optional

import polars as pl
import structlog

logger = structlog.get_logger()


# Evidence quality weights for scoring
# Direct experimental evidence is most valuable, incidental mentions least valuable
EVIDENCE_QUALITY_WEIGHTS = {
    "direct_experimental": 1.0,
    "functional_mention": 0.6,
    "hts_hit": 0.3,
    "incidental": 0.1,
    "none": 0.0,
}

# Context relevance weights
# Cilia and sensory contexts are most relevant; cytoskeleton/polarity are supportive
CONTEXT_WEIGHTS = {
    "cilia_context_count": 2.0,
    "sensory_context_count": 2.0,
    "cytoskeleton_context_count": 1.0,
    "cell_polarity_context_count": 1.0,
}
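# Taken together (see compute_literature_score below), the pre-normalization score is
#   raw_score = quality_weight * (2*cilia + 2*sensory + 1*cytoskeleton + 1*polarity)
#               / log2(total_pubmed_count + 1)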


def classify_evidence_tier(df: pl.DataFrame) -> pl.DataFrame:
    """Classify literature evidence into quality tiers.

    Tiers (highest to lowest quality):
    - direct_experimental: Knockout/mutation + cilia/sensory context (highest confidence)
    - functional_mention: Mentioned in cilia/sensory context with multiple publications
    - hts_hit: High-throughput screen hit + cilia/sensory context
    - incidental: Publications exist but no cilia/sensory context
    - none: No publications found (or query failed)

    Args:
        df: DataFrame with columns: gene_symbol, total_pubmed_count, cilia_context_count,
            sensory_context_count, direct_experimental_count, hts_screen_count

    Returns:
        DataFrame with added evidence_tier column
    """
    logger.info("literature_classify_start", row_count=len(df))

    # Tier classification via a polars when/then chain.
    # Priority order: direct_experimental > functional_mention > hts_hit > incidental > none

    # Shared predicate: at least one cilia- or sensory-context publication
    has_cilia_or_sensory = (
        (pl.col("cilia_context_count").is_not_null() & (pl.col("cilia_context_count") >= 1))
        | (pl.col("sensory_context_count").is_not_null() & (pl.col("sensory_context_count") >= 1))
    )

    df = df.with_columns([
        pl.when(
            # Direct experimental: knockout/mutation evidence + cilia/sensory context
            pl.col("direct_experimental_count").is_not_null()
            & (pl.col("direct_experimental_count") >= 1)
            & has_cilia_or_sensory
        )
        .then(pl.lit("direct_experimental"))
        .when(
            # Functional mention: cilia/sensory context + multiple publications
            has_cilia_or_sensory
            & pl.col("total_pubmed_count").is_not_null()
            & (pl.col("total_pubmed_count") >= 3)
        )
        .then(pl.lit("functional_mention"))
        .when(
            # HTS hit: screen evidence + cilia/sensory context
            pl.col("hts_screen_count").is_not_null()
            & (pl.col("hts_screen_count") >= 1)
            & has_cilia_or_sensory
        )
        .then(pl.lit("hts_hit"))
        .when(
            # Incidental: publications exist but no cilia/sensory context
            pl.col("total_pubmed_count").is_not_null()
            & (pl.col("total_pubmed_count") >= 1)
        )
        .then(pl.lit("incidental"))
        .otherwise(pl.lit("none"))  # No publications or query failed
        .alias("evidence_tier")
    ])

    # Count tier distribution for logging
    tier_counts = (
        df.group_by("evidence_tier")
        .agg(pl.len().alias("count"))  # pl.len() replaces the deprecated pl.count()
        .sort("count", descending=True)
    )

    logger.info(
        "literature_classify_complete",
        tier_distribution={
            row["evidence_tier"]: row["count"] for row in tier_counts.to_dicts()
        },
    )

    return df
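# Toy run of the classifier (values invented for illustration):
#   classify_evidence_tier(pl.DataFrame({
#       "gene_symbol": ["GENE_A", "GENE_B", "GENE_C"],
#       "total_pubmed_count": [120, 5, None],
#       "cilia_context_count": [8, 0, None],
#       "sensory_context_count": [2, 0, None],
#       "direct_experimental_count": [3, 0, None],
#       "hts_screen_count": [1, 0, None],
#   }))["evidence_tier"]  ->  ["direct_experimental", "incidental", "none"]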


def compute_literature_score(df: pl.DataFrame) -> pl.DataFrame:
    """Compute quality-weighted literature score with bias mitigation.

    Quality-weighted scoring formula:
    1. Context score = weighted sum of context counts
       (cilia * 2.0 + sensory * 2.0 + cytoskeleton * 1.0 + polarity * 1.0)
    2. Apply evidence quality weight based on tier
    3. CRITICAL: Normalize by log2(total_pubmed_count + 1) to penalize genes where
       cilia mentions are a tiny fraction of total literature (e.g., TP53: 5 cilia / 100K total)
    4. Rank-percentile normalization to [0, 1] scale

    This prevents well-studied genes (TP53, BRCA1) from dominating scores over
    focused candidates with high-quality but fewer publications.

    Args:
        df: DataFrame with context counts and evidence_tier column

    Returns:
        DataFrame with added literature_score_normalized column [0-1]
    """
    logger.info("literature_score_start", row_count=len(df))

    # Step 1: Compute weighted context score
    df = df.with_columns([
        (
            (pl.col("cilia_context_count").fill_null(0) * CONTEXT_WEIGHTS["cilia_context_count"])
            + (pl.col("sensory_context_count").fill_null(0) * CONTEXT_WEIGHTS["sensory_context_count"])
            + (pl.col("cytoskeleton_context_count").fill_null(0) * CONTEXT_WEIGHTS["cytoskeleton_context_count"])
            + (pl.col("cell_polarity_context_count").fill_null(0) * CONTEXT_WEIGHTS["cell_polarity_context_count"])
        ).alias("context_score")
    ])

    # Step 2: Apply evidence quality weight.
    # replace_strict maps str -> float and changes the dtype; plain replace keeps
    # the original string dtype in recent polars versions.
    df = df.with_columns([
        pl.col("evidence_tier")
        .replace_strict(EVIDENCE_QUALITY_WEIGHTS, default=0.0, return_dtype=pl.Float64)
        .alias("quality_weight")
    ])

    # Step 3: Bias mitigation via total publication normalization.
    # Penalize genes where cilia mentions are a small fraction of total literature.
    # Requiring total_pubmed_count >= 1 avoids a 0/0 division for genes with no
    # publications; those genes get a null score instead of NaN.
    df = df.with_columns([
        pl.when(
            pl.col("total_pubmed_count").is_not_null()
            & (pl.col("total_pubmed_count") >= 1)
        )
        .then(
            (pl.col("context_score") * pl.col("quality_weight"))
            / (pl.col("total_pubmed_count") + 1).log(base=2)
        )
        .otherwise(pl.lit(None))
        .alias("raw_score")
    ])

    # Step 4: Rank-percentile normalization to [0, 1]
    # Only rank genes with non-null raw_score
    total_with_scores = df.filter(pl.col("raw_score").is_not_null()).height

    if total_with_scores == 0:
        # No valid scores - all NULL
        df = df.with_columns([
            pl.lit(None, dtype=pl.Float64).alias("literature_score_normalized")
        ])
    else:
        # Compute rank percentile (nulls keep a null rank in polars)
        df = df.with_columns([
            pl.when(pl.col("raw_score").is_not_null())
            .then(pl.col("raw_score").rank(method="average") / total_with_scores)
            .otherwise(pl.lit(None))
            .alias("literature_score_normalized")
        ])

    # Drop intermediate columns
    df = df.drop(["context_score", "quality_weight", "raw_score"])

    # Log score statistics
    score_stats = df.filter(pl.col("literature_score_normalized").is_not_null()).select([
        pl.col("literature_score_normalized").min().alias("min"),
        pl.col("literature_score_normalized").max().alias("max"),
        pl.col("literature_score_normalized").mean().alias("mean"),
        pl.col("literature_score_normalized").median().alias("median"),
    ])

    if len(score_stats) > 0:
        stats = score_stats.to_dicts()[0]
        logger.info(
            "literature_score_complete",
            min_score=round(stats["min"], 4) if stats["min"] is not None else None,
            max_score=round(stats["max"], 4) if stats["max"] is not None else None,
            mean_score=round(stats["mean"], 4) if stats["mean"] is not None else None,
            median_score=round(stats["median"], 4) if stats["median"] is not None else None,
            genes_with_scores=total_with_scores,
        )

    return df
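# Worked example of steps 1-3 (numbers invented for illustration): a gene with
# cilia=10, sensory=5, no cytoskeleton/polarity hits, tier "direct_experimental",
# and 200 total publications scores
#   context_score = 10*2.0 + 5*2.0 = 30.0
#   raw_score     = 30.0 * 1.0 / log2(201) ~= 30.0 / 7.65 ~= 3.92
# before the rank-percentile step maps it onto [0, 1].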


def process_literature_evidence(
    gene_ids: list[str],
    gene_symbol_map: pl.DataFrame,
    email: str,
    api_key: Optional[str] = None,
    batch_size: int = 500,
    checkpoint_df: Optional[pl.DataFrame] = None,
) -> pl.DataFrame:
    """End-to-end literature evidence processing pipeline.

    1. Map gene IDs to symbols
    2. Fetch PubMed literature evidence
    3. Classify evidence tiers
    4. Compute quality-weighted scores
    5. Join back to gene IDs

    Args:
        gene_ids: List of Ensembl gene IDs
        gene_symbol_map: DataFrame with columns: gene_id, gene_symbol
        email: Email for NCBI E-utilities (required)
        api_key: Optional NCBI API key for higher rate limit
        batch_size: Checkpoint save frequency (default: 500)
        checkpoint_df: Optional partial results to resume from

    Returns:
        DataFrame with columns: gene_id, gene_symbol, total_pubmed_count,
        cilia_context_count, sensory_context_count, cytoskeleton_context_count,
        cell_polarity_context_count, direct_experimental_count, hts_screen_count,
        evidence_tier, literature_score_normalized
    """
    # Local import avoids a circular dependency between the fetch and transform modules
    from usher_pipeline.evidence.literature.fetch import fetch_literature_evidence

    logger.info(
        "literature_process_start",
        gene_count=len(gene_ids),
        has_checkpoint=checkpoint_df is not None,
    )

    # Step 1: Map gene IDs to symbols. Genes absent from the map are dropped here
    # and will not appear in the result.
    gene_map = gene_symbol_map.filter(pl.col("gene_id").is_in(gene_ids))
    gene_symbols = gene_map["gene_symbol"].to_list()

    logger.info(
        "literature_gene_mapping",
        input_ids=len(gene_ids),
        mapped_symbols=len(gene_symbols),
    )

    # Step 2: Fetch literature evidence
    lit_df = fetch_literature_evidence(
        gene_symbols=gene_symbols,
        email=email,
        api_key=api_key,
        batch_size=batch_size,
        checkpoint_df=checkpoint_df,
    )

    # Step 3: Classify evidence tiers
    lit_df = classify_evidence_tier(lit_df)

    # Step 4: Compute quality-weighted scores
    lit_df = compute_literature_score(lit_df)

    # Step 5: Join back to gene IDs (left join keeps every mapped gene; genes the
    # fetch step missed end up with null evidence columns)
    result_df = gene_map.join(
        lit_df,
        on="gene_symbol",
        how="left",
    )

    logger.info(
        "literature_process_complete",
        total_genes=len(result_df),
    )

    return result_df
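A minimal end-to-end sketch (the gene ID, the one-row symbol map, and the email are placeholders; the PubMed fetch itself is the slow step and can take hours at genome scale):

import polars as pl

gene_symbol_map = pl.DataFrame({
    "gene_id": ["ENSG00000137474"],  # illustrative one-gene map
    "gene_symbol": ["MYO7A"],
})

result = process_literature_evidence(
    gene_ids=["ENSG00000137474"],
    gene_symbol_map=gene_symbol_map,
    email="you@example.org",  # required by NCBI E-utilities
)
print(result.select(["gene_symbol", "evidence_tier", "literature_score_normalized"]))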