feat(03-05): implement animal model evidence fetch and transform

- models.py: AnimalModelRecord with ortholog confidence, phenotype flags, and normalized scoring
- fetch.py: Retrieve orthologs from HCOP, phenotypes from MGI/ZFIN/IMPC with retry
- transform.py: Filter sensory/cilia-relevant phenotypes, score with confidence weighting
- Ortholog confidence: HIGH (8+ sources), MEDIUM (4-7), LOW (1-3)
- Scoring: mouse +0.4, zebrafish +0.3, IMPC +0.3, weighted by confidence
- NULL preservation: no ortholog = NULL score (not zero)
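
The scoring summarized above lives in transform.py, which is part of this commit but not shown in this view. Below is a minimal sketch of that rule; the per-tier confidence weights, the 1.0 cap, and every function and parameter name here are assumptions for illustration, not the committed implementation.

```python
# Illustrative sketch only: tier weights and the cap are assumed, not taken from transform.py.
CONFIDENCE_WEIGHT = {"HIGH": 1.0, "MEDIUM": 0.7, "LOW": 0.4}  # assumed tier weights


def score_animal_evidence(
    mouse_confidence: str | None,
    mouse_has_relevant_phenotype: bool,
    zebrafish_confidence: str | None,
    zebrafish_has_relevant_phenotype: bool,
    impc_has_relevant_phenotype: bool,
) -> float | None:
    """Confidence-weighted animal model score; None when no ortholog exists."""
    if mouse_confidence is None and zebrafish_confidence is None:
        # NULL preservation: no ortholog means "no evidence available", not zero
        return None
    score = 0.0
    if mouse_confidence and mouse_has_relevant_phenotype:
        score += 0.4 * CONFIDENCE_WEIGHT[mouse_confidence]
    if zebrafish_confidence and zebrafish_has_relevant_phenotype:
        score += 0.3 * CONFIDENCE_WEIGHT[zebrafish_confidence]
    if mouse_confidence and impc_has_relevant_phenotype:
        score += 0.3 * CONFIDENCE_WEIGHT[mouse_confidence]
    return min(score, 1.0)  # cap assumed from the "normalized scoring" bullet
```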
2026-02-11 19:00:24 +08:00
parent 8aa66987f8
commit 0e389c7e41
4 changed files with 1019 additions and 0 deletions

fetch.py

@@ -0,0 +1,497 @@
"""Fetch animal model phenotype data and ortholog mappings."""
import gzip
import io
from pathlib import Path
from typing import Optional
import httpx
import polars as pl
import structlog
from tenacity import (
retry,
stop_after_attempt,
wait_exponential,
retry_if_exception_type,
)
logger = structlog.get_logger()
# HCOP ortholog database URLs
HCOP_HUMAN_MOUSE_URL = "https://ftp.ebi.ac.uk/pub/databases/genenames/hcop/human_mouse_hcop_fifteen_column.txt.gz"
HCOP_HUMAN_ZEBRAFISH_URL = "https://ftp.ebi.ac.uk/pub/databases/genenames/hcop/human_zebrafish_hcop_fifteen_column.txt.gz"
# MGI phenotype report URL
MGI_GENE_PHENO_URL = "https://www.informatics.jax.org/downloads/reports/MGI_GenePheno.rpt"
# ZFIN phenotype data URL
ZFIN_PHENO_URL = "https://zfin.org/downloads/phenoGeneCleanData_fish.txt"
# IMPC API base URL
IMPC_API_BASE = "https://www.ebi.ac.uk/mi/impc/solr/genotype-phenotype/select"
@retry(
stop=stop_after_attempt(5),
wait=wait_exponential(multiplier=1, min=4, max=60),
retry=retry_if_exception_type(
(httpx.HTTPStatusError, httpx.ConnectError, httpx.TimeoutException)
),
)
def _download_gzipped(url: str) -> bytes:
"""Download and decompress a gzipped file.
Args:
url: URL to download
Returns:
Decompressed file content as bytes
"""
logger.info("download_start", url=url)
with httpx.stream("GET", url, timeout=120.0, follow_redirects=True) as response:
response.raise_for_status()
        # Read compressed data into a mutable buffer (avoids quadratic bytes concatenation)
        compressed_chunks = bytearray()
        for chunk in response.iter_bytes(chunk_size=8192):
            compressed_chunks.extend(chunk)
        compressed_data = bytes(compressed_chunks)
# Decompress
logger.info("decompress_start", compressed_size_mb=round(len(compressed_data) / 1024 / 1024, 2))
decompressed = gzip.decompress(compressed_data)
logger.info("decompress_complete", decompressed_size_mb=round(len(decompressed) / 1024 / 1024, 2))
return decompressed
@retry(
stop=stop_after_attempt(5),
wait=wait_exponential(multiplier=1, min=4, max=60),
retry=retry_if_exception_type(
(httpx.HTTPStatusError, httpx.ConnectError, httpx.TimeoutException)
),
)
def _download_text(url: str) -> str:
"""Download a text file with retry.
Args:
url: URL to download
Returns:
File content as string
"""
logger.info("download_text_start", url=url)
    with httpx.stream("GET", url, timeout=120.0, follow_redirects=True) as response:
        response.raise_for_status()
        # A streamed response body is not loaded automatically; read it before
        # accessing .text, otherwise httpx raises ResponseNotRead.
        response.read()
        content = response.text
    logger.info("download_text_complete", size_mb=round(len(content) / 1024 / 1024, 2))
    return content
def fetch_ortholog_mapping(gene_ids: list[str]) -> pl.DataFrame:
"""Fetch human-to-mouse and human-to-zebrafish ortholog mappings from HCOP.
Downloads HCOP ortholog data, assigns confidence scores based on number of
supporting databases, and handles one-to-many mappings by selecting the
ortholog with highest confidence.
Confidence scoring:
- HIGH: 8+ supporting databases
- MEDIUM: 4-7 supporting databases
- LOW: 1-3 supporting databases
Args:
gene_ids: List of human gene IDs (ENSG format)
Returns:
DataFrame with columns:
- gene_id: Human gene ID
- mouse_ortholog: Mouse gene symbol
- mouse_ortholog_confidence: HIGH/MEDIUM/LOW
- zebrafish_ortholog: Zebrafish gene symbol
- zebrafish_ortholog_confidence: HIGH/MEDIUM/LOW
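    Example (illustrative; downloads HCOP data over the network):
        mapping = fetch_ortholog_mapping(["ENSG00000198707"])  # example Ensembl gene ID
        # Genes with no HCOP match keep NULL ortholog and confidence columns.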
"""
logger.info("fetch_ortholog_mapping_start", gene_count=len(gene_ids))
# Download human-mouse HCOP data
logger.info("fetch_hcop_mouse")
mouse_data = _download_gzipped(HCOP_HUMAN_MOUSE_URL)
mouse_df = pl.read_csv(
io.BytesIO(mouse_data),
separator="\t",
null_values=["", "NA"],
)
logger.info("hcop_mouse_columns", columns=mouse_df.columns)
# Parse mouse ortholog data
# HCOP columns: human_entrez_gene, human_ensembl_gene, hgnc_id, human_name, human_symbol,
# human_chr, human_assert_ids, mouse_entrez_gene, mouse_ensembl_gene,
# mgi_id, mouse_name, mouse_symbol, mouse_chr, mouse_assert_ids, support
mouse_orthologs = (
mouse_df
.filter(pl.col("human_ensembl_gene").is_in(gene_ids))
.select([
pl.col("human_ensembl_gene").alias("gene_id"),
pl.col("mouse_symbol").alias("mouse_ortholog"),
pl.col("support").str.split(",").list.len().alias("support_count"),
])
.with_columns([
pl.when(pl.col("support_count") >= 8)
.then(pl.lit("HIGH"))
.when(pl.col("support_count") >= 4)
.then(pl.lit("MEDIUM"))
.otherwise(pl.lit("LOW"))
.alias("mouse_ortholog_confidence")
])
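        # Resolve one-to-many mappings: sort puts the best-supported row first
        # per gene, so group_by().first() keeps the highest-confidence ortholog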
.sort(["gene_id", "support_count"], descending=[False, True])
.group_by("gene_id")
.first()
.select(["gene_id", "mouse_ortholog", "mouse_ortholog_confidence"])
)
logger.info("mouse_orthologs_mapped", count=len(mouse_orthologs))
# Download human-zebrafish HCOP data
logger.info("fetch_hcop_zebrafish")
zebrafish_data = _download_gzipped(HCOP_HUMAN_ZEBRAFISH_URL)
zebrafish_df = pl.read_csv(
io.BytesIO(zebrafish_data),
separator="\t",
null_values=["", "NA"],
)
logger.info("hcop_zebrafish_columns", columns=zebrafish_df.columns)
# Parse zebrafish ortholog data
zebrafish_orthologs = (
zebrafish_df
.filter(pl.col("human_ensembl_gene").is_in(gene_ids))
.select([
pl.col("human_ensembl_gene").alias("gene_id"),
pl.col("zebrafish_symbol").alias("zebrafish_ortholog"),
pl.col("support").str.split(",").list.len().alias("support_count"),
])
.with_columns([
pl.when(pl.col("support_count") >= 8)
.then(pl.lit("HIGH"))
.when(pl.col("support_count") >= 4)
.then(pl.lit("MEDIUM"))
.otherwise(pl.lit("LOW"))
.alias("zebrafish_ortholog_confidence")
])
.sort(["gene_id", "support_count"], descending=[False, True])
.group_by("gene_id")
.first()
.select(["gene_id", "zebrafish_ortholog", "zebrafish_ortholog_confidence"])
)
logger.info("zebrafish_orthologs_mapped", count=len(zebrafish_orthologs))
# Create base DataFrame with all gene IDs
base_df = pl.DataFrame({"gene_id": gene_ids})
# Left join ortholog mappings
result = (
base_df
.join(mouse_orthologs, on="gene_id", how="left")
.join(zebrafish_orthologs, on="gene_id", how="left")
)
logger.info(
"fetch_ortholog_mapping_complete",
total_genes=len(result),
mouse_mapped=result.filter(pl.col("mouse_ortholog").is_not_null()).height,
zebrafish_mapped=result.filter(pl.col("zebrafish_ortholog").is_not_null()).height,
)
return result
def fetch_mgi_phenotypes(mouse_gene_symbols: list[str]) -> pl.DataFrame:
"""Fetch mouse phenotype data from MGI (Mouse Genome Informatics).
Downloads the MGI gene-phenotype report and extracts phenotype terms
for the specified mouse genes.
Args:
mouse_gene_symbols: List of mouse gene symbols
Returns:
DataFrame with columns:
- mouse_gene: Mouse gene symbol
- mp_term_id: Mammalian Phenotype term ID
- mp_term_name: Mammalian Phenotype term name
"""
if not mouse_gene_symbols:
logger.info("fetch_mgi_phenotypes_skip", reason="no_mouse_genes")
return pl.DataFrame({
"mouse_gene": [],
"mp_term_id": [],
"mp_term_name": [],
})
logger.info("fetch_mgi_phenotypes_start", gene_count=len(mouse_gene_symbols))
# Download MGI phenotype report
content = _download_text(MGI_GENE_PHENO_URL)
# Parse TSV (skip first line if it's a comment)
lines = content.strip().split("\n")
if lines[0].startswith("#"):
lines = lines[1:]
# Read as DataFrame
df = pl.read_csv(
io.StringIO("\n".join(lines)),
separator="\t",
null_values=["", "NA"],
has_header=True,
)
logger.info("mgi_raw_columns", columns=df.columns)
    # MGI_GenePheno.rpt columns vary, but typically include: Allelic Composition,
    # Allele Symbol(s), Genetic Background, Mammalian Phenotype ID, PubMed ID,
    # MGI Marker Accession ID. Detect the marker symbol and Mammalian Phenotype ID
    # columns by name (e.g. "Marker Symbol", "Mammalian Phenotype ID") rather than
    # by position.
marker_col = None
mp_id_col = None
for col in df.columns:
col_lower = col.lower()
if "marker" in col_lower and "symbol" in col_lower:
marker_col = col
elif "mammalian phenotype id" in col_lower or "mp id" in col_lower:
mp_id_col = col
if marker_col is None or mp_id_col is None:
logger.warning("mgi_column_detection_failed", columns=df.columns)
# Return empty result
return pl.DataFrame({
"mouse_gene": [],
"mp_term_id": [],
"mp_term_name": [],
})
# Filter for genes of interest and extract phenotypes
# Note: MGI report may have one row per allele-phenotype combination
# We'll aggregate unique phenotypes per gene
result = (
df
.filter(pl.col(marker_col).is_in(mouse_gene_symbols))
.select([
pl.col(marker_col).alias("mouse_gene"),
pl.col(mp_id_col).alias("mp_term_id"),
pl.lit(None).alias("mp_term_name"), # Term name not in this report
])
.unique()
)
logger.info("fetch_mgi_phenotypes_complete", phenotype_count=len(result))
return result
def fetch_zfin_phenotypes(zebrafish_gene_symbols: list[str]) -> pl.DataFrame:
"""Fetch zebrafish phenotype data from ZFIN.
Downloads ZFIN phenotype data and extracts phenotype terms for the
specified zebrafish genes.
Args:
zebrafish_gene_symbols: List of zebrafish gene symbols
Returns:
DataFrame with columns:
- zebrafish_gene: Zebrafish gene symbol
- zp_term_id: Zebrafish Phenotype term ID (or descriptor)
- zp_term_name: Zebrafish Phenotype term name
"""
if not zebrafish_gene_symbols:
logger.info("fetch_zfin_phenotypes_skip", reason="no_zebrafish_genes")
return pl.DataFrame({
"zebrafish_gene": [],
"zp_term_id": [],
"zp_term_name": [],
})
logger.info("fetch_zfin_phenotypes_start", gene_count=len(zebrafish_gene_symbols))
# Download ZFIN phenotype data
content = _download_text(ZFIN_PHENO_URL)
# Parse TSV
df = pl.read_csv(
io.StringIO(content),
separator="\t",
null_values=["", "NA"],
has_header=True,
)
logger.info("zfin_raw_columns", columns=df.columns)
# ZFIN phenoGeneCleanData_fish.txt columns (typical):
# Gene Symbol, Gene ID, Affected Structure or Process 1 subterm ID, etc.
# Look for gene symbol and phenotype columns
gene_col = None
pheno_col = None
for col in df.columns:
col_lower = col.lower()
if "gene" in col_lower and ("symbol" in col_lower or "name" in col_lower):
gene_col = col
elif "phenotype" in col_lower or "structure" in col_lower or "process" in col_lower:
if pheno_col is None: # Take first phenotype-related column
pheno_col = col
if gene_col is None or pheno_col is None:
logger.warning("zfin_column_detection_failed", columns=df.columns)
return pl.DataFrame({
"zebrafish_gene": [],
"zp_term_id": [],
"zp_term_name": [],
})
# Filter and extract
result = (
df
.filter(pl.col(gene_col).is_in(zebrafish_gene_symbols))
.select([
pl.col(gene_col).alias("zebrafish_gene"),
pl.lit(None).alias("zp_term_id"),
pl.col(pheno_col).alias("zp_term_name"),
])
.unique()
)
logger.info("fetch_zfin_phenotypes_complete", phenotype_count=len(result))
return result
@retry(
stop=stop_after_attempt(3),
wait=wait_exponential(multiplier=1, min=2, max=30),
retry=retry_if_exception_type(
(httpx.HTTPStatusError, httpx.ConnectError, httpx.TimeoutException)
),
)
def _query_impc_batch(gene_symbols: list[str]) -> pl.DataFrame:
"""Query IMPC API for a batch of genes.
Args:
gene_symbols: List of mouse gene symbols (batch)
Returns:
DataFrame with IMPC phenotype data
"""
# Build query: marker_symbol:(gene1 OR gene2 OR ...)
query = "marker_symbol:(" + " OR ".join(gene_symbols) + ")"
params = {
"q": query,
"rows": 10000,
"wt": "json",
}
logger.info("impc_query_batch", gene_count=len(gene_symbols))
response = httpx.get(IMPC_API_BASE, params=params, timeout=60.0)
response.raise_for_status()
data = response.json()
docs = data.get("response", {}).get("docs", [])
if not docs:
return pl.DataFrame({
"mouse_gene": [],
"mp_term_id": [],
"mp_term_name": [],
"p_value": [],
})
# Extract relevant fields
records = []
for doc in docs:
gene = doc.get("marker_symbol")
mp_id = doc.get("mp_term_id")
mp_name = doc.get("mp_term_name")
p_value = doc.get("p_value")
if gene and mp_id:
records.append({
"mouse_gene": gene,
"mp_term_id": mp_id,
"mp_term_name": mp_name,
"p_value": p_value,
})
    if not records:
        # Every doc was missing marker_symbol or mp_term_id; return an empty frame
        # with the expected columns so downstream concat keeps a consistent schema
        return pl.DataFrame({
            "mouse_gene": [],
            "mp_term_id": [],
            "mp_term_name": [],
            "p_value": [],
        })
    df = pl.DataFrame(records)
logger.info("impc_batch_complete", phenotype_count=len(df))
return df
def fetch_impc_phenotypes(mouse_gene_symbols: list[str]) -> pl.DataFrame:
"""Fetch mouse phenotype data from IMPC (International Mouse Phenotyping Consortium).
Queries the IMPC SOLR API in batches to get phenotype data for mouse genes.
Includes statistical significance (p-value) for each phenotype.
Args:
mouse_gene_symbols: List of mouse gene symbols
Returns:
DataFrame with columns:
- mouse_gene: Mouse gene symbol
- mp_term_id: Mammalian Phenotype term ID
- mp_term_name: Mammalian Phenotype term name
- p_value: Statistical significance of phenotype
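    Example (illustrative; queries the live IMPC Solr API):
        impc_df = fetch_impc_phenotypes(["Cep290", "Bbs1"])  # example mouse symbols
        significant = impc_df.filter(pl.col("p_value") < 1e-4)  # example threshold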
"""
if not mouse_gene_symbols:
logger.info("fetch_impc_phenotypes_skip", reason="no_mouse_genes")
return pl.DataFrame({
"mouse_gene": [],
"mp_term_id": [],
"mp_term_name": [],
"p_value": [],
})
logger.info("fetch_impc_phenotypes_start", gene_count=len(mouse_gene_symbols))
# Query in batches of 50 to avoid overloading API
batch_size = 50
all_results = []
for i in range(0, len(mouse_gene_symbols), batch_size):
batch = mouse_gene_symbols[i:i + batch_size]
try:
batch_df = _query_impc_batch(batch)
all_results.append(batch_df)
except Exception as e:
logger.warning("impc_batch_failed", batch_index=i // batch_size, error=str(e))
# Continue with other batches
if not all_results:
logger.warning("fetch_impc_phenotypes_no_results")
return pl.DataFrame({
"mouse_gene": [],
"mp_term_id": [],
"mp_term_name": [],
"p_value": [],
})
# Combine all batches
result = pl.concat(all_results, how="vertical_relaxed").unique()
logger.info("fetch_impc_phenotypes_complete", total_phenotypes=len(result))
return result
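
For orientation, a rough sketch of how a caller might wire these fetchers together. The import path, example gene IDs, and variable names are assumptions; the actual orchestration and the downstream transform live in the other changed files, which are not shown in this view.

```python
# Hedged sketch: assumes fetch.py is importable as `fetch`; gene IDs are placeholders.
import polars as pl
from fetch import (
    fetch_ortholog_mapping,
    fetch_mgi_phenotypes,
    fetch_zfin_phenotypes,
    fetch_impc_phenotypes,
)

gene_ids = ["ENSG00000198707", "ENSG00000174483"]  # example Ensembl gene IDs

orthologs = fetch_ortholog_mapping(gene_ids)

# Collect the mapped model-organism symbols (NULL rows mean "no ortholog")
mouse_symbols = (
    orthologs.filter(pl.col("mouse_ortholog").is_not_null())
    .get_column("mouse_ortholog")
    .to_list()
)
zebrafish_symbols = (
    orthologs.filter(pl.col("zebrafish_ortholog").is_not_null())
    .get_column("zebrafish_ortholog")
    .to_list()
)

mgi = fetch_mgi_phenotypes(mouse_symbols)
zfin = fetch_zfin_phenotypes(zebrafish_symbols)
impc = fetch_impc_phenotypes(mouse_symbols)
# transform.py (not shown here) filters these for sensory/cilia-relevant
# phenotypes and produces the confidence-weighted scores.
```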