feat(03-05): implement animal model evidence fetch and transform

- models.py: AnimalModelRecord with ortholog confidence, phenotype flags, and normalized scoring
- fetch.py: Retrieve orthologs from HCOP, phenotypes from MGI/ZFIN/IMPC with retry
- transform.py: Filter sensory/cilia-relevant phenotypes, score with confidence weighting
- Ortholog confidence: HIGH (8+ sources), MEDIUM (4-7), LOW (1-3)
- Scoring: mouse +0.4, zebrafish +0.3, IMPC +0.3, weighted by confidence
- NULL preservation: no ortholog = NULL score (not zero)
This commit is contained in:
2026-02-11 19:00:24 +08:00
parent 8aa66987f8
commit 0e389c7e41
4 changed files with 1019 additions and 0 deletions

View File

@@ -0,0 +1,42 @@
"""Animal model phenotype evidence layer.
Retrieves knockout/perturbation phenotypes from:
- MGI (Mouse Genome Informatics) - mouse phenotypes
- ZFIN (Zebrafish Information Network) - zebrafish phenotypes
- IMPC (International Mouse Phenotyping Consortium) - mouse phenotypes
Maps human genes to model organism orthologs with confidence scoring,
filters for sensory/cilia-relevant phenotypes, and scores evidence.
"""
from usher_pipeline.evidence.animal_models.models import (
AnimalModelRecord,
ANIMAL_TABLE_NAME,
SENSORY_MP_KEYWORDS,
SENSORY_ZP_KEYWORDS,
)
from usher_pipeline.evidence.animal_models.fetch import (
fetch_ortholog_mapping,
fetch_mgi_phenotypes,
fetch_zfin_phenotypes,
fetch_impc_phenotypes,
)
from usher_pipeline.evidence.animal_models.transform import (
filter_sensory_phenotypes,
score_animal_evidence,
process_animal_model_evidence,
)
__all__ = [
"AnimalModelRecord",
"ANIMAL_TABLE_NAME",
"SENSORY_MP_KEYWORDS",
"SENSORY_ZP_KEYWORDS",
"fetch_ortholog_mapping",
"fetch_mgi_phenotypes",
"fetch_zfin_phenotypes",
"fetch_impc_phenotypes",
"filter_sensory_phenotypes",
"score_animal_evidence",
"process_animal_model_evidence",
]

View File

@@ -0,0 +1,497 @@
"""Fetch animal model phenotype data and ortholog mappings."""
import gzip
import io
from pathlib import Path
from typing import Optional
import httpx
import polars as pl
import structlog
from tenacity import (
retry,
stop_after_attempt,
wait_exponential,
retry_if_exception_type,
)
logger = structlog.get_logger()
# HCOP ortholog database URLs
HCOP_HUMAN_MOUSE_URL = "https://ftp.ebi.ac.uk/pub/databases/genenames/hcop/human_mouse_hcop_fifteen_column.txt.gz"
HCOP_HUMAN_ZEBRAFISH_URL = "https://ftp.ebi.ac.uk/pub/databases/genenames/hcop/human_zebrafish_hcop_fifteen_column.txt.gz"
# MGI phenotype report URL
MGI_GENE_PHENO_URL = "https://www.informatics.jax.org/downloads/reports/MGI_GenePheno.rpt"
# ZFIN phenotype data URL
ZFIN_PHENO_URL = "https://zfin.org/downloads/phenoGeneCleanData_fish.txt"
# IMPC API base URL
IMPC_API_BASE = "https://www.ebi.ac.uk/mi/impc/solr/genotype-phenotype/select"
@retry(
stop=stop_after_attempt(5),
wait=wait_exponential(multiplier=1, min=4, max=60),
retry=retry_if_exception_type(
(httpx.HTTPStatusError, httpx.ConnectError, httpx.TimeoutException)
),
)
def _download_gzipped(url: str) -> bytes:
"""Download and decompress a gzipped file.
Args:
url: URL to download
Returns:
Decompressed file content as bytes
"""
logger.info("download_start", url=url)
with httpx.stream("GET", url, timeout=120.0, follow_redirects=True) as response:
response.raise_for_status()
# Read compressed data
compressed_data = b""
for chunk in response.iter_bytes(chunk_size=8192):
compressed_data += chunk
# Decompress
logger.info("decompress_start", compressed_size_mb=round(len(compressed_data) / 1024 / 1024, 2))
decompressed = gzip.decompress(compressed_data)
logger.info("decompress_complete", decompressed_size_mb=round(len(decompressed) / 1024 / 1024, 2))
return decompressed
@retry(
stop=stop_after_attempt(5),
wait=wait_exponential(multiplier=1, min=4, max=60),
retry=retry_if_exception_type(
(httpx.HTTPStatusError, httpx.ConnectError, httpx.TimeoutException)
),
)
def _download_text(url: str) -> str:
"""Download a text file with retry.
Args:
url: URL to download
Returns:
File content as string
"""
logger.info("download_text_start", url=url)
with httpx.stream("GET", url, timeout=120.0, follow_redirects=True) as response:
response.raise_for_status()
content = response.text
logger.info("download_text_complete", size_mb=round(len(content) / 1024 / 1024, 2))
return content
def fetch_ortholog_mapping(gene_ids: list[str]) -> pl.DataFrame:
"""Fetch human-to-mouse and human-to-zebrafish ortholog mappings from HCOP.
Downloads HCOP ortholog data, assigns confidence scores based on number of
supporting databases, and handles one-to-many mappings by selecting the
ortholog with highest confidence.
Confidence scoring:
- HIGH: 8+ supporting databases
- MEDIUM: 4-7 supporting databases
- LOW: 1-3 supporting databases
Args:
gene_ids: List of human gene IDs (ENSG format)
Returns:
DataFrame with columns:
- gene_id: Human gene ID
- mouse_ortholog: Mouse gene symbol
- mouse_ortholog_confidence: HIGH/MEDIUM/LOW
- zebrafish_ortholog: Zebrafish gene symbol
- zebrafish_ortholog_confidence: HIGH/MEDIUM/LOW
"""
logger.info("fetch_ortholog_mapping_start", gene_count=len(gene_ids))
# Download human-mouse HCOP data
logger.info("fetch_hcop_mouse")
mouse_data = _download_gzipped(HCOP_HUMAN_MOUSE_URL)
mouse_df = pl.read_csv(
io.BytesIO(mouse_data),
separator="\t",
null_values=["", "NA"],
)
logger.info("hcop_mouse_columns", columns=mouse_df.columns)
# Parse mouse ortholog data
# HCOP columns: human_entrez_gene, human_ensembl_gene, hgnc_id, human_name, human_symbol,
# human_chr, human_assert_ids, mouse_entrez_gene, mouse_ensembl_gene,
# mgi_id, mouse_name, mouse_symbol, mouse_chr, mouse_assert_ids, support
mouse_orthologs = (
mouse_df
.filter(pl.col("human_ensembl_gene").is_in(gene_ids))
.select([
pl.col("human_ensembl_gene").alias("gene_id"),
pl.col("mouse_symbol").alias("mouse_ortholog"),
pl.col("support").str.split(",").list.len().alias("support_count"),
])
.with_columns([
pl.when(pl.col("support_count") >= 8)
.then(pl.lit("HIGH"))
.when(pl.col("support_count") >= 4)
.then(pl.lit("MEDIUM"))
.otherwise(pl.lit("LOW"))
.alias("mouse_ortholog_confidence")
])
.sort(["gene_id", "support_count"], descending=[False, True])
.group_by("gene_id")
.first()
.select(["gene_id", "mouse_ortholog", "mouse_ortholog_confidence"])
)
logger.info("mouse_orthologs_mapped", count=len(mouse_orthologs))
# Download human-zebrafish HCOP data
logger.info("fetch_hcop_zebrafish")
zebrafish_data = _download_gzipped(HCOP_HUMAN_ZEBRAFISH_URL)
zebrafish_df = pl.read_csv(
io.BytesIO(zebrafish_data),
separator="\t",
null_values=["", "NA"],
)
logger.info("hcop_zebrafish_columns", columns=zebrafish_df.columns)
# Parse zebrafish ortholog data
zebrafish_orthologs = (
zebrafish_df
.filter(pl.col("human_ensembl_gene").is_in(gene_ids))
.select([
pl.col("human_ensembl_gene").alias("gene_id"),
pl.col("zebrafish_symbol").alias("zebrafish_ortholog"),
pl.col("support").str.split(",").list.len().alias("support_count"),
])
.with_columns([
pl.when(pl.col("support_count") >= 8)
.then(pl.lit("HIGH"))
.when(pl.col("support_count") >= 4)
.then(pl.lit("MEDIUM"))
.otherwise(pl.lit("LOW"))
.alias("zebrafish_ortholog_confidence")
])
.sort(["gene_id", "support_count"], descending=[False, True])
.group_by("gene_id")
.first()
.select(["gene_id", "zebrafish_ortholog", "zebrafish_ortholog_confidence"])
)
logger.info("zebrafish_orthologs_mapped", count=len(zebrafish_orthologs))
# Create base DataFrame with all gene IDs
base_df = pl.DataFrame({"gene_id": gene_ids})
# Left join ortholog mappings
result = (
base_df
.join(mouse_orthologs, on="gene_id", how="left")
.join(zebrafish_orthologs, on="gene_id", how="left")
)
logger.info(
"fetch_ortholog_mapping_complete",
total_genes=len(result),
mouse_mapped=result.filter(pl.col("mouse_ortholog").is_not_null()).height,
zebrafish_mapped=result.filter(pl.col("zebrafish_ortholog").is_not_null()).height,
)
return result
def fetch_mgi_phenotypes(mouse_gene_symbols: list[str]) -> pl.DataFrame:
"""Fetch mouse phenotype data from MGI (Mouse Genome Informatics).
Downloads the MGI gene-phenotype report and extracts phenotype terms
for the specified mouse genes.
Args:
mouse_gene_symbols: List of mouse gene symbols
Returns:
DataFrame with columns:
- mouse_gene: Mouse gene symbol
- mp_term_id: Mammalian Phenotype term ID
- mp_term_name: Mammalian Phenotype term name
"""
if not mouse_gene_symbols:
logger.info("fetch_mgi_phenotypes_skip", reason="no_mouse_genes")
return pl.DataFrame({
"mouse_gene": [],
"mp_term_id": [],
"mp_term_name": [],
})
logger.info("fetch_mgi_phenotypes_start", gene_count=len(mouse_gene_symbols))
# Download MGI phenotype report
content = _download_text(MGI_GENE_PHENO_URL)
# Parse TSV (skip first line if it's a comment)
lines = content.strip().split("\n")
if lines[0].startswith("#"):
lines = lines[1:]
# Read as DataFrame
df = pl.read_csv(
io.StringIO("\n".join(lines)),
separator="\t",
null_values=["", "NA"],
has_header=True,
)
logger.info("mgi_raw_columns", columns=df.columns)
# MGI_GenePheno.rpt columns vary, but typically include:
# Allelic Composition, Allele Symbol(s), Genetic Background, Mammalian Phenotype ID, PubMed ID, MGI Marker Accession ID
# We need to identify the right columns
# Expected columns: marker symbol, MP ID, MP term
# Common column names: "Marker Symbol", "Mammalian Phenotype ID"
# Try to find the right columns
marker_col = None
mp_id_col = None
for col in df.columns:
col_lower = col.lower()
if "marker" in col_lower and "symbol" in col_lower:
marker_col = col
elif "mammalian phenotype id" in col_lower or "mp id" in col_lower:
mp_id_col = col
if marker_col is None or mp_id_col is None:
logger.warning("mgi_column_detection_failed", columns=df.columns)
# Return empty result
return pl.DataFrame({
"mouse_gene": [],
"mp_term_id": [],
"mp_term_name": [],
})
# Filter for genes of interest and extract phenotypes
# Note: MGI report may have one row per allele-phenotype combination
# We'll aggregate unique phenotypes per gene
result = (
df
.filter(pl.col(marker_col).is_in(mouse_gene_symbols))
.select([
pl.col(marker_col).alias("mouse_gene"),
pl.col(mp_id_col).alias("mp_term_id"),
pl.lit(None).alias("mp_term_name"), # Term name not in this report
])
.unique()
)
logger.info("fetch_mgi_phenotypes_complete", phenotype_count=len(result))
return result
def fetch_zfin_phenotypes(zebrafish_gene_symbols: list[str]) -> pl.DataFrame:
"""Fetch zebrafish phenotype data from ZFIN.
Downloads ZFIN phenotype data and extracts phenotype terms for the
specified zebrafish genes.
Args:
zebrafish_gene_symbols: List of zebrafish gene symbols
Returns:
DataFrame with columns:
- zebrafish_gene: Zebrafish gene symbol
- zp_term_id: Zebrafish Phenotype term ID (or descriptor)
- zp_term_name: Zebrafish Phenotype term name
"""
if not zebrafish_gene_symbols:
logger.info("fetch_zfin_phenotypes_skip", reason="no_zebrafish_genes")
return pl.DataFrame({
"zebrafish_gene": [],
"zp_term_id": [],
"zp_term_name": [],
})
logger.info("fetch_zfin_phenotypes_start", gene_count=len(zebrafish_gene_symbols))
# Download ZFIN phenotype data
content = _download_text(ZFIN_PHENO_URL)
# Parse TSV
df = pl.read_csv(
io.StringIO(content),
separator="\t",
null_values=["", "NA"],
has_header=True,
)
logger.info("zfin_raw_columns", columns=df.columns)
# ZFIN phenoGeneCleanData_fish.txt columns (typical):
# Gene Symbol, Gene ID, Affected Structure or Process 1 subterm ID, etc.
# Look for gene symbol and phenotype columns
gene_col = None
pheno_col = None
for col in df.columns:
col_lower = col.lower()
if "gene" in col_lower and ("symbol" in col_lower or "name" in col_lower):
gene_col = col
elif "phenotype" in col_lower or "structure" in col_lower or "process" in col_lower:
if pheno_col is None: # Take first phenotype-related column
pheno_col = col
if gene_col is None or pheno_col is None:
logger.warning("zfin_column_detection_failed", columns=df.columns)
return pl.DataFrame({
"zebrafish_gene": [],
"zp_term_id": [],
"zp_term_name": [],
})
# Filter and extract
result = (
df
.filter(pl.col(gene_col).is_in(zebrafish_gene_symbols))
.select([
pl.col(gene_col).alias("zebrafish_gene"),
pl.lit(None).alias("zp_term_id"),
pl.col(pheno_col).alias("zp_term_name"),
])
.unique()
)
logger.info("fetch_zfin_phenotypes_complete", phenotype_count=len(result))
return result
@retry(
stop=stop_after_attempt(3),
wait=wait_exponential(multiplier=1, min=2, max=30),
retry=retry_if_exception_type(
(httpx.HTTPStatusError, httpx.ConnectError, httpx.TimeoutException)
),
)
def _query_impc_batch(gene_symbols: list[str]) -> pl.DataFrame:
"""Query IMPC API for a batch of genes.
Args:
gene_symbols: List of mouse gene symbols (batch)
Returns:
DataFrame with IMPC phenotype data
"""
# Build query: marker_symbol:(gene1 OR gene2 OR ...)
query = "marker_symbol:(" + " OR ".join(gene_symbols) + ")"
params = {
"q": query,
"rows": 10000,
"wt": "json",
}
logger.info("impc_query_batch", gene_count=len(gene_symbols))
response = httpx.get(IMPC_API_BASE, params=params, timeout=60.0)
response.raise_for_status()
data = response.json()
docs = data.get("response", {}).get("docs", [])
if not docs:
return pl.DataFrame({
"mouse_gene": [],
"mp_term_id": [],
"mp_term_name": [],
"p_value": [],
})
# Extract relevant fields
records = []
for doc in docs:
gene = doc.get("marker_symbol")
mp_id = doc.get("mp_term_id")
mp_name = doc.get("mp_term_name")
p_value = doc.get("p_value")
if gene and mp_id:
records.append({
"mouse_gene": gene,
"mp_term_id": mp_id,
"mp_term_name": mp_name,
"p_value": p_value,
})
df = pl.DataFrame(records)
logger.info("impc_batch_complete", phenotype_count=len(df))
return df
def fetch_impc_phenotypes(mouse_gene_symbols: list[str]) -> pl.DataFrame:
"""Fetch mouse phenotype data from IMPC (International Mouse Phenotyping Consortium).
Queries the IMPC SOLR API in batches to get phenotype data for mouse genes.
Includes statistical significance (p-value) for each phenotype.
Args:
mouse_gene_symbols: List of mouse gene symbols
Returns:
DataFrame with columns:
- mouse_gene: Mouse gene symbol
- mp_term_id: Mammalian Phenotype term ID
- mp_term_name: Mammalian Phenotype term name
- p_value: Statistical significance of phenotype
"""
if not mouse_gene_symbols:
logger.info("fetch_impc_phenotypes_skip", reason="no_mouse_genes")
return pl.DataFrame({
"mouse_gene": [],
"mp_term_id": [],
"mp_term_name": [],
"p_value": [],
})
logger.info("fetch_impc_phenotypes_start", gene_count=len(mouse_gene_symbols))
# Query in batches of 50 to avoid overloading API
batch_size = 50
all_results = []
for i in range(0, len(mouse_gene_symbols), batch_size):
batch = mouse_gene_symbols[i:i + batch_size]
try:
batch_df = _query_impc_batch(batch)
all_results.append(batch_df)
except Exception as e:
logger.warning("impc_batch_failed", batch_index=i // batch_size, error=str(e))
# Continue with other batches
if not all_results:
logger.warning("fetch_impc_phenotypes_no_results")
return pl.DataFrame({
"mouse_gene": [],
"mp_term_id": [],
"mp_term_name": [],
"p_value": [],
})
# Combine all batches
result = pl.concat(all_results, how="vertical_relaxed").unique()
logger.info("fetch_impc_phenotypes_complete", total_phenotypes=len(result))
return result

View File

@@ -0,0 +1,119 @@
"""Data models for animal model phenotype evidence."""
from typing import Optional
from pydantic import BaseModel, Field
# Table name for DuckDB storage
ANIMAL_TABLE_NAME = "animal_model_phenotypes"
# Mammalian Phenotype (MP) ontology keywords for sensory/cilia relevance
SENSORY_MP_KEYWORDS = [
"hearing",
"deaf",
"vestibular",
"balance",
"retina",
"photoreceptor",
"vision",
"blind",
"cochlea",
"stereocilia",
"cilia",
"cilium",
"flagellum",
"situs inversus",
"laterality",
"hydrocephalus",
"kidney cyst",
"polycystic",
]
# Zebrafish Phenotype (ZP) ontology keywords for sensory/cilia relevance
SENSORY_ZP_KEYWORDS = [
"hearing",
"deaf",
"vestibular",
"balance",
"retina",
"photoreceptor",
"vision",
"blind",
"eye",
"ear",
"otolith",
"lateral line",
"cilia",
"cilium",
"flagellum",
"situs",
"laterality",
"hydrocephalus",
"kidney cyst",
"pronephros",
]
class AnimalModelRecord(BaseModel):
"""Record representing animal model phenotype evidence for a gene.
Attributes:
gene_id: Human gene ID (ENSG)
gene_symbol: Human gene symbol
mouse_ortholog: Mouse gene symbol (MGI ID or symbol)
mouse_ortholog_confidence: Ortholog confidence (HIGH/MEDIUM/LOW based on HCOP support)
zebrafish_ortholog: Zebrafish gene symbol (ZFIN ID or symbol)
zebrafish_ortholog_confidence: Ortholog confidence (HIGH/MEDIUM/LOW)
has_mouse_phenotype: Whether mouse ortholog has phenotypes in MGI
has_zebrafish_phenotype: Whether zebrafish ortholog has phenotypes in ZFIN
has_impc_phenotype: Whether mouse ortholog has phenotypes in IMPC
sensory_phenotype_count: Number of sensory-relevant phenotypes across all sources
phenotype_categories: Semicolon-separated list of matched phenotype terms
animal_model_score_normalized: Composite animal model evidence score (0-1 range)
"""
gene_id: str = Field(..., description="Human gene ID (ENSG)")
gene_symbol: str = Field(..., description="Human gene symbol")
mouse_ortholog: Optional[str] = Field(None, description="Mouse gene symbol/ID")
mouse_ortholog_confidence: Optional[str] = Field(
None,
description="Ortholog confidence: HIGH/MEDIUM/LOW"
)
zebrafish_ortholog: Optional[str] = Field(None, description="Zebrafish gene symbol/ID")
zebrafish_ortholog_confidence: Optional[str] = Field(
None,
description="Ortholog confidence: HIGH/MEDIUM/LOW"
)
has_mouse_phenotype: Optional[bool] = Field(
None,
description="Mouse ortholog has phenotypes in MGI"
)
has_zebrafish_phenotype: Optional[bool] = Field(
None,
description="Zebrafish ortholog has phenotypes in ZFIN"
)
has_impc_phenotype: Optional[bool] = Field(
None,
description="Mouse ortholog has phenotypes in IMPC"
)
sensory_phenotype_count: Optional[int] = Field(
None,
description="Number of sensory-relevant phenotypes"
)
phenotype_categories: Optional[str] = Field(
None,
description="Semicolon-separated matched phenotype terms"
)
animal_model_score_normalized: Optional[float] = Field(
None,
ge=0.0,
le=1.0,
description="Composite animal model evidence score (0-1)"
)

View File

@@ -0,0 +1,361 @@
"""Transform animal model phenotype data: filter and score."""
import polars as pl
import structlog
from usher_pipeline.evidence.animal_models.models import (
SENSORY_MP_KEYWORDS,
SENSORY_ZP_KEYWORDS,
)
from usher_pipeline.evidence.animal_models.fetch import (
fetch_ortholog_mapping,
fetch_mgi_phenotypes,
fetch_zfin_phenotypes,
fetch_impc_phenotypes,
)
logger = structlog.get_logger()
def filter_sensory_phenotypes(
phenotype_df: pl.DataFrame,
keywords: list[str],
term_column: str = "mp_term_name"
) -> pl.DataFrame:
"""Filter phenotypes for sensory/cilia relevance using keyword matching.
Performs case-insensitive substring matching against phenotype terms.
Returns only rows where the phenotype term matches at least one keyword.
Args:
phenotype_df: DataFrame with phenotype terms
keywords: List of keywords to match (e.g., SENSORY_MP_KEYWORDS)
term_column: Name of column containing phenotype term names
Returns:
Filtered DataFrame with only sensory-relevant phenotypes
"""
if phenotype_df.is_empty():
return phenotype_df
logger.info("filter_sensory_phenotypes_start", row_count=len(phenotype_df))
# Create case-insensitive match condition
# Match if ANY keyword appears as substring in term
match_condition = pl.lit(False)
for keyword in keywords:
match_condition = match_condition | pl.col(term_column).str.to_lowercase().str.contains(keyword.lower())
# Filter phenotypes
filtered = phenotype_df.filter(match_condition)
logger.info(
"filter_sensory_phenotypes_complete",
input_count=len(phenotype_df),
output_count=len(filtered),
filtered_pct=round(100 * len(filtered) / len(phenotype_df), 1) if len(phenotype_df) > 0 else 0,
)
return filtered
def score_animal_evidence(df: pl.DataFrame) -> pl.DataFrame:
"""Compute animal model evidence scores with ortholog confidence weighting.
Scoring formula:
- Base score = 0 if no phenotypes
- For each organism with sensory phenotypes:
* Mouse (MGI): +0.4 weighted by ortholog confidence
* Zebrafish (ZFIN): +0.3 weighted by ortholog confidence
* IMPC: +0.3 (independent confirmation bonus)
- Confidence weighting: HIGH=1.0, MEDIUM=0.7, LOW=0.4
- Multiply by log2(sensory_phenotype_count + 1) / log2(max_count + 1) to reward multiple phenotypes
- Clamp to [0, 1]
- NULL if no ortholog mapping exists
Args:
df: DataFrame with ortholog mappings and phenotype flags
Returns:
DataFrame with added animal_model_score_normalized column
"""
logger.info("score_animal_evidence_start", gene_count=len(df))
# Define confidence weights
confidence_weight = pl.when(pl.col("confidence") == "HIGH").then(1.0)\
.when(pl.col("confidence") == "MEDIUM").then(0.7)\
.when(pl.col("confidence") == "LOW").then(0.4)\
.otherwise(0.0)
# Score for mouse phenotypes (MGI)
mouse_score = (
pl.when(pl.col("has_mouse_phenotype") == True)
.then(
0.4 * pl.when(pl.col("mouse_ortholog_confidence") == "HIGH").then(1.0)
.when(pl.col("mouse_ortholog_confidence") == "MEDIUM").then(0.7)
.when(pl.col("mouse_ortholog_confidence") == "LOW").then(0.4)
.otherwise(0.0)
)
.otherwise(0.0)
)
# Score for zebrafish phenotypes (ZFIN)
zebrafish_score = (
pl.when(pl.col("has_zebrafish_phenotype") == True)
.then(
0.3 * pl.when(pl.col("zebrafish_ortholog_confidence") == "HIGH").then(1.0)
.when(pl.col("zebrafish_ortholog_confidence") == "MEDIUM").then(0.7)
.when(pl.col("zebrafish_ortholog_confidence") == "LOW").then(0.4)
.otherwise(0.0)
)
.otherwise(0.0)
)
# Score for IMPC phenotypes (independent confirmation)
impc_score = (
pl.when(pl.col("has_impc_phenotype") == True)
.then(0.3)
.otherwise(0.0)
)
# Combine scores
base_score = mouse_score + zebrafish_score + impc_score
# Get max sensory phenotype count for normalization
max_count = df.select(pl.col("sensory_phenotype_count").max()).item()
if max_count is None or max_count == 0:
max_count = 1 # Avoid division by zero
# Apply phenotype count scaling (diminishing returns via log)
# log2(count + 1) / log2(max_count + 1)
import math
max_log = math.log2(max_count + 1)
phenotype_scaling = (
pl.when(pl.col("sensory_phenotype_count").is_not_null())
.then((pl.col("sensory_phenotype_count") + 1).log(2) / max_log)
.otherwise(0.0)
)
# Final score: base_score * phenotype_scaling, clamped to [0, 1]
# NULL if no ortholog mapping
animal_model_score = (
pl.when(
pl.col("mouse_ortholog").is_null() & pl.col("zebrafish_ortholog").is_null()
)
.then(None)
.otherwise(
(base_score * phenotype_scaling).clip(0.0, 1.0)
)
.alias("animal_model_score_normalized")
)
result = df.with_columns([animal_model_score])
logger.info(
"score_animal_evidence_complete",
scored_genes=result.filter(pl.col("animal_model_score_normalized").is_not_null()).height,
null_genes=result.filter(pl.col("animal_model_score_normalized").is_null()).height,
)
return result
def process_animal_model_evidence(gene_ids: list[str]) -> pl.DataFrame:
"""End-to-end processing of animal model phenotype evidence.
Executes the full pipeline:
1. Fetch ortholog mappings (mouse and zebrafish)
2. Fetch phenotypes from MGI, ZFIN, and IMPC
3. Filter for sensory/cilia-relevant phenotypes
4. Aggregate phenotypes by gene
5. Score evidence with confidence weighting
Args:
gene_ids: List of human gene IDs (ENSG format)
Returns:
DataFrame with animal model evidence for each gene
"""
logger.info("process_animal_model_evidence_start", gene_count=len(gene_ids))
# Step 1: Fetch ortholog mappings
logger.info("step_1_fetch_orthologs")
orthologs = fetch_ortholog_mapping(gene_ids)
# Extract lists of orthologs to query
mouse_genes = orthologs.filter(pl.col("mouse_ortholog").is_not_null())["mouse_ortholog"].to_list()
zebrafish_genes = orthologs.filter(pl.col("zebrafish_ortholog").is_not_null())["zebrafish_ortholog"].to_list()
logger.info(
"orthologs_extracted",
mouse_count=len(mouse_genes),
zebrafish_count=len(zebrafish_genes),
)
# Step 2: Fetch phenotypes
logger.info("step_2_fetch_phenotypes")
# MGI phenotypes
mgi_pheno = fetch_mgi_phenotypes(mouse_genes)
mgi_sensory = filter_sensory_phenotypes(mgi_pheno, SENSORY_MP_KEYWORDS, "mp_term_name")
# ZFIN phenotypes
zfin_pheno = fetch_zfin_phenotypes(zebrafish_genes)
zfin_sensory = filter_sensory_phenotypes(zfin_pheno, SENSORY_ZP_KEYWORDS, "zp_term_name")
# IMPC phenotypes
impc_pheno = fetch_impc_phenotypes(mouse_genes)
impc_sensory = filter_sensory_phenotypes(impc_pheno, SENSORY_MP_KEYWORDS, "mp_term_name")
logger.info(
"phenotypes_filtered",
mgi_total=len(mgi_pheno),
mgi_sensory=len(mgi_sensory),
zfin_total=len(zfin_pheno),
zfin_sensory=len(zfin_sensory),
impc_total=len(impc_pheno),
impc_sensory=len(impc_sensory),
)
# Step 3: Aggregate phenotypes by gene
logger.info("step_3_aggregate_phenotypes")
# Count sensory phenotypes per mouse gene
if not mgi_sensory.is_empty():
mgi_counts = (
mgi_sensory
.group_by("mouse_gene")
.agg([
pl.col("mp_term_name").count().alias("mgi_phenotype_count"),
pl.col("mp_term_name").str.concat("; ").alias("mgi_terms"),
])
)
else:
mgi_counts = pl.DataFrame({
"mouse_gene": [],
"mgi_phenotype_count": [],
"mgi_terms": [],
})
# Count sensory phenotypes per zebrafish gene
if not zfin_sensory.is_empty():
zfin_counts = (
zfin_sensory
.group_by("zebrafish_gene")
.agg([
pl.col("zp_term_name").count().alias("zfin_phenotype_count"),
pl.col("zp_term_name").str.concat("; ").alias("zfin_terms"),
])
)
else:
zfin_counts = pl.DataFrame({
"zebrafish_gene": [],
"zfin_phenotype_count": [],
"zfin_terms": [],
})
# Count sensory phenotypes per mouse gene from IMPC
if not impc_sensory.is_empty():
impc_counts = (
impc_sensory
.group_by("mouse_gene")
.agg([
pl.col("mp_term_name").count().alias("impc_phenotype_count"),
pl.col("mp_term_name").str.concat("; ").alias("impc_terms"),
])
)
else:
impc_counts = pl.DataFrame({
"mouse_gene": [],
"impc_phenotype_count": [],
"impc_terms": [],
})
# Step 4: Join phenotype data with ortholog mappings
logger.info("step_4_join_data")
result = (
orthologs
# Join MGI phenotypes
.join(
mgi_counts,
left_on="mouse_ortholog",
right_on="mouse_gene",
how="left"
)
# Join ZFIN phenotypes
.join(
zfin_counts,
left_on="zebrafish_ortholog",
right_on="zebrafish_gene",
how="left"
)
# Join IMPC phenotypes
.join(
impc_counts,
left_on="mouse_ortholog",
right_on="mouse_gene",
how="left"
)
# Add flags
.with_columns([
(pl.col("mgi_phenotype_count") > 0).alias("has_mouse_phenotype"),
(pl.col("zfin_phenotype_count") > 0).alias("has_zebrafish_phenotype"),
(pl.col("impc_phenotype_count") > 0).alias("has_impc_phenotype"),
])
# Calculate total sensory phenotype count
.with_columns([
(
pl.col("mgi_phenotype_count").fill_null(0) +
pl.col("zfin_phenotype_count").fill_null(0) +
pl.col("impc_phenotype_count").fill_null(0)
).alias("sensory_phenotype_count")
])
# Combine phenotype terms
.with_columns([
pl.concat_str([
pl.col("mgi_terms").fill_null(""),
pl.col("zfin_terms").fill_null(""),
pl.col("impc_terms").fill_null(""),
], separator="; ").str.replace_all("; ; ", "; ").str.strip_chars("; ").alias("phenotype_categories")
])
# Set sensory_phenotype_count to NULL if zero (preserve NULL pattern)
.with_columns([
pl.when(pl.col("sensory_phenotype_count") == 0)
.then(None)
.otherwise(pl.col("sensory_phenotype_count"))
.alias("sensory_phenotype_count")
])
# Select final columns
.select([
"gene_id",
"mouse_ortholog",
"mouse_ortholog_confidence",
"zebrafish_ortholog",
"zebrafish_ortholog_confidence",
"has_mouse_phenotype",
"has_zebrafish_phenotype",
"has_impc_phenotype",
"sensory_phenotype_count",
"phenotype_categories",
])
)
# Step 5: Score evidence
logger.info("step_5_score_evidence")
result = score_animal_evidence(result)
logger.info(
"process_animal_model_evidence_complete",
total_genes=len(result),
with_orthologs=result.filter(
pl.col("mouse_ortholog").is_not_null() | pl.col("zebrafish_ortholog").is_not_null()
).height,
with_sensory_phenotypes=result.filter(
pl.col("sensory_phenotype_count").is_not_null()
).height,
)
return result