feat(03-03): implement protein evidence layer with UniProt/InterPro integration

- Create protein features data model with domain, coiled-coil, TM, cilia motifs
- Implement fetch.py with UniProt REST API and InterPro API queries
- Implement transform.py with feature extraction, motif detection, normalization
- Implement load.py with DuckDB persistence and provenance tracking
- Add CLI protein command following evidence layer pattern
- Add comprehensive unit and integration tests (all passing)
- Handle NULL preservation and List(Null) edge case
- Add get_steps() method to ProvenanceTracker for test compatibility
This commit is contained in:
2026-02-11 19:07:30 +08:00
parent bcd3c4ffbe
commit 46059874f2
10 changed files with 1937 additions and 0 deletions

View File

@@ -0,0 +1,46 @@
"""Protein sequence and structure features evidence layer.
Extracts protein features from UniProt and InterPro:
- Protein length, domain composition, domain count
- Coiled-coil regions, transmembrane domains, scaffold/adaptor domains
- Cilia-associated motifs detected via domain keyword matching
- Normalized composite protein score (0-1)
Evidence follows fetch -> transform -> load pattern with checkpoint-restart.
"""
from usher_pipeline.evidence.protein.models import (
ProteinFeatureRecord,
PROTEIN_TABLE_NAME,
CILIA_DOMAIN_KEYWORDS,
SCAFFOLD_DOMAIN_TYPES,
)
from usher_pipeline.evidence.protein.fetch import (
fetch_uniprot_features,
fetch_interpro_domains,
)
from usher_pipeline.evidence.protein.transform import (
extract_protein_features,
detect_cilia_motifs,
normalize_protein_features,
process_protein_evidence,
)
from usher_pipeline.evidence.protein.load import (
load_to_duckdb,
query_cilia_candidates,
)
__all__ = [
"ProteinFeatureRecord",
"PROTEIN_TABLE_NAME",
"CILIA_DOMAIN_KEYWORDS",
"SCAFFOLD_DOMAIN_TYPES",
"fetch_uniprot_features",
"fetch_interpro_domains",
"extract_protein_features",
"detect_cilia_motifs",
"normalize_protein_features",
"process_protein_evidence",
"load_to_duckdb",
"query_cilia_candidates",
]

View File

@@ -0,0 +1,274 @@
"""Fetch protein features from UniProt and InterPro APIs."""
import time
from typing import List
import httpx
import polars as pl
import structlog
from tenacity import (
retry,
stop_after_attempt,
wait_exponential,
retry_if_exception_type,
)
logger = structlog.get_logger()
# UniProt REST API base URL
UNIPROT_API_BASE = "https://rest.uniprot.org"
# InterPro REST API base URL
INTERPRO_API_BASE = "https://www.ebi.ac.uk/interpro/api"
@retry(
stop=stop_after_attempt(5),
wait=wait_exponential(multiplier=1, min=4, max=60),
retry=retry_if_exception_type(
(httpx.HTTPStatusError, httpx.ConnectError, httpx.TimeoutException)
),
)
def fetch_uniprot_features(uniprot_ids: List[str]) -> pl.DataFrame:
"""Query UniProt REST API for protein features.
Fetches protein length, domain annotations, coiled-coil regions,
and transmembrane regions from UniProt in batches.
Args:
uniprot_ids: List of UniProt accession IDs
Returns:
DataFrame with columns:
- uniprot_id: UniProt accession
- protein_length: Amino acid length
- domain_names: List of domain names
- coiled_coil_count: Number of coiled-coil regions
- transmembrane_count: Number of transmembrane regions
Raises:
httpx.HTTPStatusError: On HTTP errors (after retries)
httpx.ConnectError: On connection errors (after retries)
httpx.TimeoutException: On timeout (after retries)
"""
if not uniprot_ids:
return pl.DataFrame({
"uniprot_id": [],
"protein_length": [],
"domain_names": [],
"coiled_coil_count": [],
"transmembrane_count": [],
})
logger.info("uniprot_fetch_start", accession_count=len(uniprot_ids))
# UniProt batch size recommendation: 100 accessions per request
batch_size = 100
all_records = []
for i in range(0, len(uniprot_ids), batch_size):
batch = uniprot_ids[i:i + batch_size]
batch_num = (i // batch_size) + 1
total_batches = (len(uniprot_ids) + batch_size - 1) // batch_size
logger.info(
"uniprot_batch_start",
batch=batch_num,
total_batches=total_batches,
batch_size=len(batch),
)
# Query UniProt API with accession list
# Use search endpoint with fields parameter
query = " OR ".join(f"accession:{acc}" for acc in batch)
url = f"{UNIPROT_API_BASE}/uniprotkb/search"
params = {
"query": query,
"format": "json",
"fields": "accession,length,ft_domain,ft_coiled,ft_transmem,annotation_score",
"size": batch_size,
}
with httpx.Client(timeout=30.0) as client:
response = client.get(url, params=params)
response.raise_for_status()
data = response.json()
# Parse results
results = data.get("results", [])
# Create lookup for fast access
found_accessions = set()
for entry in results:
accession = entry.get("primaryAccession")
if not accession:
continue
found_accessions.add(accession)
# Extract protein length
length = entry.get("sequence", {}).get("length")
# Extract domain names from ft_domain features
domain_names = []
for feature in entry.get("features", []):
if feature.get("type") == "Domain":
description = feature.get("description", "")
if description:
domain_names.append(description)
# Count coiled-coil regions
coiled_coil_count = sum(
1 for feature in entry.get("features", [])
if feature.get("type") == "Coiled coil"
)
# Count transmembrane regions
transmembrane_count = sum(
1 for feature in entry.get("features", [])
if feature.get("type") == "Transmembrane"
)
all_records.append({
"uniprot_id": accession,
"protein_length": length,
"domain_names": domain_names,
"coiled_coil_count": coiled_coil_count,
"transmembrane_count": transmembrane_count,
})
# Add NULL records for accessions not found
for acc in batch:
if acc not in found_accessions:
all_records.append({
"uniprot_id": acc,
"protein_length": None,
"domain_names": [],
"coiled_coil_count": None,
"transmembrane_count": None,
})
# Rate limiting: UniProt allows 200 requests/second
# With batches of 100, this gives us 20K accessions/second
# Conservative: 5 requests/second = 500 accessions/second
time.sleep(0.2)
logger.info(
"uniprot_fetch_complete",
total_accessions=len(uniprot_ids),
records_found=sum(1 for r in all_records if r["protein_length"] is not None),
)
return pl.DataFrame(all_records)
@retry(
stop=stop_after_attempt(5),
wait=wait_exponential(multiplier=1, min=4, max=60),
retry=retry_if_exception_type(
(httpx.HTTPStatusError, httpx.ConnectError, httpx.TimeoutException)
),
)
def fetch_interpro_domains(uniprot_ids: List[str]) -> pl.DataFrame:
"""Query InterPro API for domain annotations.
Fetches detailed domain classification from InterPro to supplement
UniProt annotations.
Args:
uniprot_ids: List of UniProt accession IDs
Returns:
DataFrame with columns:
- uniprot_id: UniProt accession
- domain_names: List of InterPro domain names
- interpro_ids: List of InterPro accession IDs
Raises:
httpx.HTTPStatusError: On HTTP errors (after retries)
httpx.ConnectError: On connection errors (after retries)
httpx.TimeoutException: On timeout (after retries)
"""
if not uniprot_ids:
return pl.DataFrame({
"uniprot_id": [],
"domain_names": [],
"interpro_ids": [],
})
logger.info("interpro_fetch_start", accession_count=len(uniprot_ids))
# InterPro requires per-protein queries
# Use conservative rate limiting: 10 req/sec as recommended
all_records = []
# For large datasets (>10K proteins), log warning about potential slowness
if len(uniprot_ids) > 10000:
logger.warning(
"interpro_large_dataset",
accession_count=len(uniprot_ids),
estimated_time_min=len(uniprot_ids) / 10 / 60,
note="Consider InterPro bulk download for large datasets",
)
with httpx.Client(timeout=30.0) as client:
for idx, accession in enumerate(uniprot_ids, 1):
if idx % 100 == 0:
logger.info(
"interpro_progress",
processed=idx,
total=len(uniprot_ids),
percent=round(idx / len(uniprot_ids) * 100, 1),
)
# Query InterPro API
url = f"{INTERPRO_API_BASE}/entry/interpro/protein/uniprot/{accession}"
try:
response = client.get(url, params={"format": "json"})
response.raise_for_status()
data = response.json()
# Parse InterPro entries
domain_names = []
interpro_ids = []
if "results" in data:
for entry in data["results"]:
metadata = entry.get("metadata", {})
interpro_id = metadata.get("accession")
name = metadata.get("name", {}).get("name", "")
if interpro_id:
interpro_ids.append(interpro_id)
if name:
domain_names.append(name)
all_records.append({
"uniprot_id": accession,
"domain_names": domain_names,
"interpro_ids": interpro_ids,
})
except httpx.HTTPStatusError as e:
if e.response.status_code == 404:
# Protein not found in InterPro - this is OK
all_records.append({
"uniprot_id": accession,
"domain_names": [],
"interpro_ids": [],
})
else:
# Other HTTP errors - re-raise for retry
raise
# Rate limiting: 10 requests/second
time.sleep(0.1)
logger.info(
"interpro_fetch_complete",
total_accessions=len(uniprot_ids),
records_found=sum(1 for r in all_records if r["domain_names"]),
)
return pl.DataFrame(all_records)

View File

@@ -0,0 +1,128 @@
"""Load protein features to DuckDB with provenance tracking."""
from typing import Optional
import polars as pl
import structlog
from usher_pipeline.persistence import PipelineStore, ProvenanceTracker
logger = structlog.get_logger()
def load_to_duckdb(
df: pl.DataFrame,
store: PipelineStore,
provenance: ProvenanceTracker,
description: str = "",
) -> None:
"""Save protein features DataFrame to DuckDB with provenance.
Creates or replaces the protein_features table (idempotent).
Records provenance step with summary statistics.
Args:
df: Processed protein features DataFrame with normalized scores
store: PipelineStore instance for DuckDB persistence
provenance: ProvenanceTracker instance for metadata recording
description: Optional description for checkpoint metadata
"""
logger.info("protein_load_start", row_count=len(df))
# Calculate summary statistics for provenance
total_genes = len(df)
with_uniprot = df.filter(pl.col("uniprot_id").is_not_null()).height
null_uniprot = total_genes - with_uniprot
cilia_domain_count = df.filter(pl.col("has_cilia_domain") == True).height
scaffold_domain_count = df.filter(pl.col("scaffold_adaptor_domain") == True).height
coiled_coil_count = df.filter(pl.col("coiled_coil") == True).height
tm_domain_count = df.filter(pl.col("transmembrane_count") > 0).height
# Mean domain count (only for proteins with data)
mean_domain_count = (
df.filter(pl.col("domain_count").is_not_null())
.select(pl.col("domain_count").mean())
.item()
)
if mean_domain_count is not None:
mean_domain_count = round(mean_domain_count, 2)
# Save to DuckDB with CREATE OR REPLACE (idempotent)
store.save_dataframe(
df=df,
table_name="protein_features",
description=description or "Protein features from UniProt/InterPro with domain composition and cilia motif detection",
replace=True,
)
# Record provenance step with details
provenance.record_step("load_protein_features", {
"total_genes": total_genes,
"with_uniprot": with_uniprot,
"null_uniprot": null_uniprot,
"cilia_domain_count": cilia_domain_count,
"scaffold_domain_count": scaffold_domain_count,
"coiled_coil_count": coiled_coil_count,
"transmembrane_domain_count": tm_domain_count,
"mean_domain_count": mean_domain_count,
})
logger.info(
"protein_load_complete",
row_count=total_genes,
with_uniprot=with_uniprot,
cilia_domains=cilia_domain_count,
scaffold_domains=scaffold_domain_count,
coiled_coils=coiled_coil_count,
)
def query_cilia_candidates(
store: PipelineStore,
) -> pl.DataFrame:
"""Query genes with cilia-associated protein features.
Identifies candidate genes with:
- Cilia domain annotations, OR
- Both coiled-coil regions AND scaffold/adaptor domains
This combination is enriched in known cilia proteins and provides
structural evidence for potential cilia involvement.
Args:
store: PipelineStore instance
Returns:
DataFrame with candidate genes sorted by protein_score_normalized
Columns: gene_id, gene_symbol, protein_length, domain_count,
coiled_coil, has_cilia_domain, scaffold_adaptor_domain,
protein_score_normalized
"""
logger.info("protein_query_cilia_candidates")
# Query DuckDB for genes with cilia features
df = store.execute_query(
"""
SELECT
gene_id,
gene_symbol,
uniprot_id,
protein_length,
domain_count,
coiled_coil,
transmembrane_count,
scaffold_adaptor_domain,
has_cilia_domain,
has_sensory_domain,
protein_score_normalized
FROM protein_features
WHERE has_cilia_domain = TRUE
OR (coiled_coil = TRUE AND scaffold_adaptor_domain = TRUE)
ORDER BY protein_score_normalized DESC NULLS LAST
"""
)
logger.info("protein_query_complete", result_count=len(df))
return df

View File

@@ -0,0 +1,71 @@
"""Data models for protein sequence and structure features."""
from pydantic import BaseModel
# DuckDB table name for protein features
PROTEIN_TABLE_NAME = "protein_features"
# Cilia-associated domain keywords for motif detection
# These are structural patterns found in known cilia proteins
# Pattern matching does NOT presuppose cilia involvement - it flags features
# associated with cilia biology for further investigation
CILIA_DOMAIN_KEYWORDS = [
"IFT",
"intraflagellar",
"BBSome",
"ciliary",
"cilia",
"basal body",
"centrosome",
"transition zone",
"axoneme",
]
# Scaffold and adaptor domain types commonly found in Usher proteins
# These domains mediate protein-protein interactions and are enriched in
# cilia-associated proteins
SCAFFOLD_DOMAIN_TYPES = [
"PDZ",
"SH3",
"Ankyrin",
"WD40",
"Coiled coil",
"SAM",
"FERM",
"Harmonin",
]
class ProteinFeatureRecord(BaseModel):
"""Protein features for a single gene.
Attributes:
gene_id: Ensembl gene ID (e.g., ENSG00000...)
gene_symbol: HGNC gene symbol
uniprot_id: UniProt accession (NULL if no mapping)
protein_length: Amino acid length (NULL if no UniProt entry)
domain_count: Total number of annotated domains (NULL if no data)
coiled_coil: Has coiled-coil region (NULL if no data)
coiled_coil_count: Number of coiled-coil regions (NULL if no data)
transmembrane_count: Number of transmembrane regions (NULL if no data)
scaffold_adaptor_domain: Has PDZ, SH3, ankyrin, WD40, or similar scaffold domain (NULL if no data)
has_cilia_domain: Has IFT, BBSome, ciliary targeting, or transition zone domain (NULL if no data)
has_sensory_domain: Has stereocilia, photoreceptor, or sensory-associated domain (NULL if no data)
protein_score_normalized: Composite protein feature score 0-1 (NULL if no UniProt entry)
CRITICAL: NULL values represent missing data and are preserved as None.
Do NOT convert NULL to 0.0 - "unknown" is semantically different from "no domain".
"""
gene_id: str
gene_symbol: str
uniprot_id: str | None = None
protein_length: int | None = None
domain_count: int | None = None
coiled_coil: bool | None = None
coiled_coil_count: int | None = None
transmembrane_count: int | None = None
scaffold_adaptor_domain: bool | None = None
has_cilia_domain: bool | None = None
has_sensory_domain: bool | None = None
protein_score_normalized: float | None = None

View File

@@ -0,0 +1,411 @@
"""Transform and normalize protein features."""
from typing import List
import polars as pl
import structlog
from usher_pipeline.evidence.protein.models import (
CILIA_DOMAIN_KEYWORDS,
SCAFFOLD_DOMAIN_TYPES,
)
from usher_pipeline.evidence.protein.fetch import (
fetch_uniprot_features,
fetch_interpro_domains,
)
logger = structlog.get_logger()
def extract_protein_features(
uniprot_df: pl.DataFrame,
interpro_df: pl.DataFrame,
) -> pl.DataFrame:
"""Extract and combine protein features from UniProt and InterPro data.
Joins UniProt and InterPro data, computes domain counts, sets flags
for coiled-coil and transmembrane regions.
Args:
uniprot_df: DataFrame from fetch_uniprot_features with UniProt data
interpro_df: DataFrame from fetch_interpro_domains with InterPro data
Returns:
DataFrame with combined features:
- uniprot_id, protein_length, domain_count, coiled_coil,
coiled_coil_count, transmembrane_count, domain_names (combined)
"""
logger.info(
"extract_protein_features_start",
uniprot_rows=len(uniprot_df),
interpro_rows=len(interpro_df),
)
# Join UniProt and InterPro on uniprot_id
# Use left join to preserve all UniProt entries
combined = uniprot_df.join(
interpro_df,
on="uniprot_id",
how="left",
suffix="_interpro",
)
# Combine domain names from both sources (deduplicate)
# Use list.concat instead of + operator
combined = combined.with_columns([
# Combine domain name lists
pl.when(
pl.col("domain_names_interpro").is_not_null() &
(pl.col("domain_names_interpro").list.len() > 0)
)
.then(
pl.col("domain_names").list.concat(pl.col("domain_names_interpro"))
.list.unique()
)
.otherwise(pl.col("domain_names"))
.alias("domain_names_combined"),
])
# Compute domain count from combined sources
combined = combined.with_columns([
pl.col("domain_names_combined").list.len().alias("domain_count"),
])
# Set coiled_coil boolean from count
combined = combined.with_columns([
pl.when(pl.col("coiled_coil_count").is_not_null())
.then(pl.col("coiled_coil_count") > 0)
.otherwise(None)
.alias("coiled_coil"),
])
# Select final columns
result = combined.select([
"uniprot_id",
"protein_length",
"domain_count",
"coiled_coil",
"coiled_coil_count",
"transmembrane_count",
pl.col("domain_names_combined").alias("domain_names"),
])
logger.info(
"extract_protein_features_complete",
total_rows=len(result),
with_domains=result.filter(pl.col("domain_count") > 0).height,
)
return result
def detect_cilia_motifs(df: pl.DataFrame) -> pl.DataFrame:
"""Detect cilia-associated motifs via domain keyword matching.
Scans domain names for cilia-related keywords, scaffold/adaptor domains,
and sensory-related domains. Pattern matching does NOT presuppose cilia
involvement - it flags structural features for further investigation.
Args:
df: DataFrame with domain_names column (list of domain names)
Returns:
DataFrame with added columns:
- has_cilia_domain: Boolean flag for cilia-associated domains
- scaffold_adaptor_domain: Boolean flag for scaffold domains
- has_sensory_domain: Boolean flag for sensory domains
"""
logger.info("detect_cilia_motifs_start", row_count=len(df))
# Ensure domain_names is List(String), not List(Null)
# This handles edge case where all proteins have empty domain lists
if "domain_names" in df.columns:
current_dtype = df.schema["domain_names"]
if str(current_dtype) == "List(Null)" or "Null" in str(current_dtype):
df = df.with_columns([
pl.col("domain_names").cast(pl.List(pl.String)).alias("domain_names")
])
# Sensory domain keywords
sensory_keywords = [
"stereocilia",
"photoreceptor",
"usher",
"harmonin",
"cadherin 23",
"protocadherin",
]
# Helper function to check if any keyword matches any domain (case-insensitive)
def create_keyword_matcher(keywords: List[str]) -> pl.Expr:
"""Create polars expression that checks if any keyword matches any domain."""
# For each domain in the list, check if it contains any keyword
# Case-insensitive substring matching
conditions = []
for keyword in keywords:
conditions.append(
pl.col("domain_names")
.list.eval(
pl.when(pl.element().is_not_null())
.then(pl.element().str.to_lowercase().str.contains(keyword.lower()))
.otherwise(False)
)
.list.any()
)
# Return True if ANY condition matches
if conditions:
return pl.any_horizontal(conditions)
else:
return pl.lit(False)
# Detect cilia domains
df = df.with_columns([
pl.when(
pl.col("domain_names").is_not_null() &
(pl.col("domain_names").list.len() > 0)
)
.then(create_keyword_matcher(CILIA_DOMAIN_KEYWORDS))
.otherwise(None)
.alias("has_cilia_domain"),
])
# Detect scaffold/adaptor domains
df = df.with_columns([
pl.when(
pl.col("domain_names").is_not_null() &
(pl.col("domain_names").list.len() > 0)
)
.then(create_keyword_matcher(SCAFFOLD_DOMAIN_TYPES))
.otherwise(None)
.alias("scaffold_adaptor_domain"),
])
# Detect sensory domains
df = df.with_columns([
pl.when(
pl.col("domain_names").is_not_null() &
(pl.col("domain_names").list.len() > 0)
)
.then(create_keyword_matcher(sensory_keywords))
.otherwise(None)
.alias("has_sensory_domain"),
])
# Log summary
cilia_count = df.filter(pl.col("has_cilia_domain") == True).height
scaffold_count = df.filter(pl.col("scaffold_adaptor_domain") == True).height
sensory_count = df.filter(pl.col("has_sensory_domain") == True).height
logger.info(
"detect_cilia_motifs_complete",
cilia_domain_count=cilia_count,
scaffold_domain_count=scaffold_count,
sensory_domain_count=sensory_count,
)
return df
def normalize_protein_features(df: pl.DataFrame) -> pl.DataFrame:
"""Normalize protein features to 0-1 range and compute composite score.
Normalizes continuous features (protein_length, domain_count, transmembrane_count)
and computes weighted composite score. NULL preservation: genes without UniProt
entries get NULL scores (not 0.0).
Args:
df: DataFrame with extracted protein features
Returns:
DataFrame with added protein_score_normalized column (0-1 range)
"""
logger.info("normalize_protein_features_start", row_count=len(df))
# Filter to proteins with data for normalization stats
measured = df.filter(pl.col("protein_length").is_not_null())
if len(measured) == 0:
# No proteins with data - return with NULL scores
return df.with_columns([
pl.lit(None).cast(pl.Float64).alias("length_rank"),
pl.lit(None).cast(pl.Float64).alias("domain_rank"),
pl.lit(None).cast(pl.Float64).alias("transmembrane_normalized"),
pl.lit(None).cast(pl.Float64).alias("protein_score_normalized"),
])
# Normalize protein length via log-transform rank percentile
# Log-transform handles skewed distribution, rank percentile gives 0-1
df = df.with_columns([
pl.when(pl.col("protein_length").is_not_null())
.then(
pl.col("protein_length").log10().rank(method="average") /
measured.select(pl.col("protein_length").log10().rank(method="average").max()).item()
)
.otherwise(None)
.alias("length_rank"),
])
# Normalize domain count via rank percentile
df = df.with_columns([
pl.when(pl.col("domain_count").is_not_null())
.then(
pl.col("domain_count").rank(method="average") /
measured.select(pl.col("domain_count").rank(method="average").max()).item()
)
.otherwise(None)
.alias("domain_rank"),
])
# Normalize transmembrane count (cap at 20, then divide)
df = df.with_columns([
pl.when(pl.col("transmembrane_count").is_not_null())
.then(
pl.min_horizontal(pl.col("transmembrane_count"), pl.lit(20)) / 20.0
)
.otherwise(None)
.alias("transmembrane_normalized"),
])
# Convert boolean flags to 0/1 for composite score
df = df.with_columns([
pl.col("coiled_coil").cast(pl.Float64).fill_null(0.0).alias("coiled_coil_score"),
pl.col("has_cilia_domain").cast(pl.Float64).fill_null(0.0).alias("cilia_score"),
pl.col("scaffold_adaptor_domain").cast(pl.Float64).fill_null(0.0).alias("scaffold_score"),
])
# Composite protein score (weighted combination)
# Weights: length 15%, domain 20%, coiled-coil 20%, TM 20%, cilia 15%, scaffold 10%
# NULL if no protein_length (i.e., no UniProt entry)
df = df.with_columns([
pl.when(pl.col("protein_length").is_not_null())
.then(
(0.15 * pl.col("length_rank").fill_null(0.0)) +
(0.20 * pl.col("domain_rank").fill_null(0.0)) +
(0.20 * pl.col("coiled_coil_score")) +
(0.20 * pl.col("transmembrane_normalized").fill_null(0.0)) +
(0.15 * pl.col("cilia_score")) +
(0.10 * pl.col("scaffold_score"))
)
.otherwise(None)
.alias("protein_score_normalized"),
])
# Drop temporary scoring columns
df = df.drop([
"length_rank",
"domain_rank",
"transmembrane_normalized",
"coiled_coil_score",
"cilia_score",
"scaffold_score",
])
# Log summary statistics
score_stats = df.filter(pl.col("protein_score_normalized").is_not_null()).select([
pl.col("protein_score_normalized").min().alias("min"),
pl.col("protein_score_normalized").max().alias("max"),
pl.col("protein_score_normalized").mean().alias("mean"),
])
if len(score_stats) > 0:
logger.info(
"normalize_protein_features_complete",
scored_proteins=df.filter(pl.col("protein_score_normalized").is_not_null()).height,
score_min=round(score_stats["min"][0], 3),
score_max=round(score_stats["max"][0], 3),
score_mean=round(score_stats["mean"][0], 3),
)
else:
logger.info("normalize_protein_features_complete", scored_proteins=0)
return df
def process_protein_evidence(
gene_ids: List[str],
uniprot_mapping: pl.DataFrame,
) -> pl.DataFrame:
"""End-to-end protein evidence processing pipeline.
Composes: map gene IDs -> fetch UniProt -> fetch InterPro ->
extract features -> detect motifs -> normalize -> collect
Args:
gene_ids: List of Ensembl gene IDs
uniprot_mapping: DataFrame with gene_id, gene_symbol, uniprot_id columns
Returns:
Materialized DataFrame ready for DuckDB storage with columns:
- gene_id, gene_symbol, uniprot_id, protein_length, domain_count,
coiled_coil, coiled_coil_count, transmembrane_count,
scaffold_adaptor_domain, has_cilia_domain, has_sensory_domain,
protein_score_normalized
"""
logger.info("process_protein_evidence_start", gene_count=len(gene_ids))
# Filter mapping to requested genes
gene_map = uniprot_mapping.filter(pl.col("gene_id").is_in(gene_ids))
# Extract UniProt IDs (filter out NULLs)
uniprot_ids = (
gene_map
.filter(pl.col("uniprot_id").is_not_null())
.select("uniprot_id")
.unique()
.to_series()
.to_list()
)
logger.info(
"uniprot_mapping_complete",
total_genes=len(gene_ids),
mapped_genes=len(uniprot_ids),
)
# Fetch UniProt features
uniprot_df = fetch_uniprot_features(uniprot_ids)
# Fetch InterPro domains
interpro_df = fetch_interpro_domains(uniprot_ids)
# Extract features
features_df = extract_protein_features(uniprot_df, interpro_df)
# Detect cilia motifs
features_df = detect_cilia_motifs(features_df)
# Normalize features
features_df = normalize_protein_features(features_df)
# Join back to gene mapping to get gene_id and gene_symbol
# Use right join to preserve all genes (including those without UniProt mapping)
result = features_df.join(
gene_map.select(["gene_id", "gene_symbol", "uniprot_id"]),
on="uniprot_id",
how="right",
)
# Select final columns in order
result = result.select([
"gene_id",
"gene_symbol",
"uniprot_id",
"protein_length",
"domain_count",
"coiled_coil",
"coiled_coil_count",
"transmembrane_count",
"scaffold_adaptor_domain",
"has_cilia_domain",
"has_sensory_domain",
"protein_score_normalized",
])
logger.info(
"process_protein_evidence_complete",
total_genes=len(result),
with_uniprot=result.filter(pl.col("uniprot_id").is_not_null()).height,
with_scores=result.filter(pl.col("protein_score_normalized").is_not_null()).height,
)
return result

View File

@@ -37,6 +37,7 @@ class ProvenanceTracker:
details: Optional dictionary of additional details
"""
step = {
"name": step_name,
"step_name": step_name,
"timestamp": datetime.now(timezone.utc).isoformat(),
}
@@ -44,6 +45,15 @@ class ProvenanceTracker:
step["details"] = details
self.processing_steps.append(step)
def get_steps(self) -> list[dict]:
"""
Get all recorded processing steps.
Returns:
List of processing step dictionaries
"""
return self.processing_steps
def create_metadata(self) -> dict:
"""
Create full provenance metadata dictionary.

167
tests/test_expression.py Normal file
View File

@@ -0,0 +1,167 @@
"""Unit tests for expression evidence layer.
Tests tau calculation, enrichment scoring, and null handling with synthetic data.
NO external API calls - all data is mocked or synthetic.
"""
import polars as pl
import pytest
from usher_pipeline.evidence.expression.transform import (
calculate_tau_specificity,
compute_expression_score,
)
def test_tau_calculation_ubiquitous():
"""Equal expression across tissues -> Tau near 0 (ubiquitous)."""
# Create synthetic data with equal expression across tissues
df = pl.DataFrame({
"gene_id": ["ENSG00000001", "ENSG00000002"],
"tissue1": [10.0, 20.0],
"tissue2": [10.0, 20.0],
"tissue3": [10.0, 20.0],
"tissue4": [10.0, 20.0],
})
tissue_cols = ["tissue1", "tissue2", "tissue3", "tissue4"]
result = calculate_tau_specificity(df, tissue_cols)
# Tau should be close to 0 for ubiquitous expression
assert "tau_specificity" in result.columns
tau_values = result.select("tau_specificity").to_series().to_list()
assert tau_values[0] == pytest.approx(0.0, abs=0.01)
assert tau_values[1] == pytest.approx(0.0, abs=0.01)
def test_tau_calculation_specific():
"""Expression in one tissue only -> Tau near 1 (tissue-specific)."""
# Gene expressed only in one tissue
df = pl.DataFrame({
"gene_id": ["ENSG00000001"],
"tissue1": [100.0],
"tissue2": [0.0],
"tissue3": [0.0],
"tissue4": [0.0],
})
tissue_cols = ["tissue1", "tissue2", "tissue3", "tissue4"]
result = calculate_tau_specificity(df, tissue_cols)
tau = result.select("tau_specificity").item()
# Tau = sum(1 - xi/xmax) / (n-1) = (0 + 1 + 1 + 1) / 3 = 1.0
assert tau == pytest.approx(1.0, abs=0.01)
def test_tau_null_handling():
"""NULL tissue values -> NULL Tau (insufficient data)."""
df = pl.DataFrame({
"gene_id": ["ENSG00000001", "ENSG00000002"],
"tissue1": [10.0, 20.0],
"tissue2": [None, 20.0], # NULL for gene 1
"tissue3": [10.0, 20.0],
"tissue4": [10.0, 20.0],
})
tissue_cols = ["tissue1", "tissue2", "tissue3", "tissue4"]
result = calculate_tau_specificity(df, tissue_cols)
tau_values = result.select("tau_specificity").to_series().to_list()
# Gene 1 has NULL tissue -> NULL Tau
assert tau_values[0] is None
# Gene 2 has complete data -> Tau should be valid
assert tau_values[1] is not None
def test_enrichment_score_high():
"""High retina expression relative to global -> high enrichment."""
df = pl.DataFrame({
"gene_id": ["ENSG00000001"],
"hpa_retina_tpm": [50.0],
"hpa_cerebellum_tpm": [40.0],
"gtex_retina_tpm": [60.0],
"hpa_testis_tpm": [5.0],
"hpa_fallopian_tube_tpm": [5.0],
"gtex_testis_tpm": [5.0],
"cellxgene_photoreceptor_expr": [None],
"cellxgene_hair_cell_expr": [None],
"tau_specificity": [0.5],
})
result = compute_expression_score(df)
# Usher tissues (retina, cerebellum) have much higher expression than global
# Mean Usher: (50+40+60)/3 = 50
# Mean global: (50+40+60+5+5+5)/6 = 27.5
# Enrichment: 50/27.5 ≈ 1.82
assert "usher_tissue_enrichment" in result.columns
enrichment = result.select("usher_tissue_enrichment").item()
assert enrichment > 1.5 # Significantly enriched
def test_enrichment_score_low():
"""No target tissue expression -> low enrichment."""
df = pl.DataFrame({
"gene_id": ["ENSG00000001"],
"hpa_retina_tpm": [5.0],
"hpa_cerebellum_tpm": [5.0],
"gtex_retina_tpm": [5.0],
"hpa_testis_tpm": [50.0],
"hpa_fallopian_tube_tpm": [50.0],
"gtex_testis_tpm": [50.0],
"cellxgene_photoreceptor_expr": [None],
"cellxgene_hair_cell_expr": [None],
"tau_specificity": [0.8],
})
result = compute_expression_score(df)
enrichment = result.select("usher_tissue_enrichment").item()
assert enrichment < 1.0 # Not enriched in Usher tissues
def test_expression_score_normalization():
"""Composite score should be in [0, 1] range."""
df = pl.DataFrame({
"gene_id": ["ENSG00000001", "ENSG00000002", "ENSG00000003"],
"hpa_retina_tpm": [50.0, 10.0, 5.0],
"hpa_cerebellum_tpm": [40.0, 10.0, 5.0],
"gtex_retina_tpm": [60.0, 10.0, 5.0],
"hpa_testis_tpm": [5.0, 50.0, 50.0],
"hpa_fallopian_tube_tpm": [5.0, 50.0, 50.0],
"gtex_testis_tpm": [5.0, 50.0, 50.0],
"cellxgene_photoreceptor_expr": [None, None, None],
"cellxgene_hair_cell_expr": [None, None, None],
"tau_specificity": [0.5, 0.3, 0.2],
})
result = compute_expression_score(df)
scores = result.select("expression_score_normalized").to_series().to_list()
for score in scores:
if score is not None:
assert 0.0 <= score <= 1.0, f"Score {score} out of range [0,1]"
def test_null_preservation_all_sources():
"""Gene with no data from any source -> NULL score."""
df = pl.DataFrame({
"gene_id": ["ENSG00000001"],
"hpa_retina_tpm": [None],
"hpa_cerebellum_tpm": [None],
"gtex_retina_tpm": [None],
"hpa_testis_tpm": [None],
"hpa_fallopian_tube_tpm": [None],
"gtex_testis_tpm": [None],
"cellxgene_photoreceptor_expr": [None],
"cellxgene_hair_cell_expr": [None],
"tau_specificity": [None],
})
result = compute_expression_score(df)
# Both enrichment and score should be NULL
enrichment = result.select("usher_tissue_enrichment").item()
score = result.select("expression_score_normalized").item()
assert enrichment is None
assert score is None

View File

@@ -0,0 +1,170 @@
"""Integration tests for expression evidence layer.
Tests with mocked downloads and synthetic fixtures.
NO actual external API calls to HPA/GTEx/CellxGene.
"""
import polars as pl
import pytest
from pathlib import Path
from unittest.mock import Mock, patch, MagicMock
from usher_pipeline.evidence.expression.transform import process_expression_evidence
from usher_pipeline.evidence.expression.load import load_to_duckdb
from usher_pipeline.persistence import PipelineStore, ProvenanceTracker
@pytest.fixture
def temp_cache_dir(tmp_path):
"""Create temporary cache directory for downloads."""
cache_dir = tmp_path / "expression"
cache_dir.mkdir()
return cache_dir
@pytest.fixture
def mock_gene_ids():
"""Sample gene IDs for testing."""
return ["ENSG00000001", "ENSG00000002", "ENSG00000003"]
@pytest.fixture
def mock_hpa_data():
"""Synthetic HPA expression data."""
return pl.LazyFrame({
"gene_symbol": ["GENE1", "GENE2", "GENE3"],
"hpa_retina_tpm": [50.0, 10.0, None],
"hpa_cerebellum_tpm": [40.0, 10.0, 5.0],
"hpa_testis_tpm": [5.0, 50.0, 50.0],
"hpa_fallopian_tube_tpm": [5.0, 50.0, None],
})
@pytest.fixture
def mock_gtex_data(mock_gene_ids):
"""Synthetic GTEx expression data."""
return pl.LazyFrame({
"gene_id": mock_gene_ids,
"gtex_retina_tpm": [60.0, 10.0, None],
"gtex_cerebellum_tpm": [45.0, 10.0, 5.0],
"gtex_testis_tpm": [5.0, 55.0, 55.0],
"gtex_fallopian_tube_tpm": [None, None, None], # Often not available
})
@pytest.fixture
def mock_cellxgene_data(mock_gene_ids):
"""Synthetic CellxGene data (NULLs as placeholder)."""
return pl.LazyFrame({
"gene_id": mock_gene_ids,
"cellxgene_photoreceptor_expr": [None, None, None],
"cellxgene_hair_cell_expr": [None, None, None],
})
def test_process_expression_pipeline_with_mocks(
temp_cache_dir, mock_gene_ids, mock_hpa_data, mock_gtex_data, mock_cellxgene_data
):
"""Test full pipeline with mocked data sources."""
# Mock all fetch functions to return synthetic data
with patch('usher_pipeline.evidence.expression.transform.fetch_hpa_expression') as mock_hpa, \
patch('usher_pipeline.evidence.expression.transform.fetch_gtex_expression') as mock_gtex, \
patch('usher_pipeline.evidence.expression.transform.fetch_cellxgene_expression') as mock_cellxgene:
mock_hpa.return_value = mock_hpa_data
mock_gtex.return_value = mock_gtex_data
mock_cellxgene.return_value = mock_cellxgene_data
# Run pipeline (skip CellxGene for simplicity)
df = process_expression_evidence(
gene_ids=mock_gene_ids,
cache_dir=temp_cache_dir,
skip_cellxgene=True,
)
# Verify output structure
assert len(df) == len(mock_gene_ids)
assert "gene_id" in df.columns
assert "tau_specificity" in df.columns
assert "usher_tissue_enrichment" in df.columns
assert "expression_score_normalized" in df.columns
def test_checkpoint_restart(temp_cache_dir, mock_gene_ids):
"""Test checkpoint-restart: skip processing if table exists."""
# Create mock store with existing checkpoint
mock_store = Mock(spec=PipelineStore)
mock_store.has_checkpoint.return_value = True
# Mock load_dataframe to return synthetic data
existing_data = pl.DataFrame({
"gene_id": mock_gene_ids,
"tau_specificity": [0.5, 0.3, 0.2],
"usher_tissue_enrichment": [2.0, 1.0, 0.5],
"expression_score_normalized": [0.8, 0.5, 0.3],
})
mock_store.load_dataframe.return_value = existing_data
# Verify checkpoint works (would skip processing in real CLI)
assert mock_store.has_checkpoint('tissue_expression')
df = mock_store.load_dataframe('tissue_expression')
assert len(df) == len(mock_gene_ids)
def test_provenance_recording():
"""Test provenance step recording during load."""
# Create synthetic expression data
df = pl.DataFrame({
"gene_id": ["ENSG00000001", "ENSG00000002"],
"hpa_retina_tpm": [50.0, None],
"gtex_retina_tpm": [60.0, 10.0],
"cellxgene_photoreceptor_expr": [None, None],
"cellxgene_hair_cell_expr": [None, None],
"tau_specificity": [0.5, None],
"usher_tissue_enrichment": [2.0, 1.0],
"expression_score_normalized": [0.8, 0.5],
})
# Mock store and provenance tracker
mock_store = Mock(spec=PipelineStore)
mock_provenance = Mock(spec=ProvenanceTracker)
# Call load function
load_to_duckdb(
df=df,
store=mock_store,
provenance=mock_provenance,
description="Test expression data"
)
# Verify provenance step was recorded
mock_provenance.record_step.assert_called_once()
step_name, step_details = mock_provenance.record_step.call_args[0]
assert step_name == "load_tissue_expression"
assert "row_count" in step_details
assert step_details["row_count"] == 2
def test_null_expression_handling():
"""Test that genes with all NULL expression data are handled gracefully."""
df = pl.DataFrame({
"gene_id": ["ENSG00000001", "ENSG00000002"],
"hpa_retina_tpm": [None, 50.0],
"hpa_cerebellum_tpm": [None, 40.0],
"gtex_retina_tpm": [None, 60.0],
"cellxgene_photoreceptor_expr": [None, None],
"cellxgene_hair_cell_expr": [None, None],
"tau_specificity": [None, 0.5],
"usher_tissue_enrichment": [None, 2.0],
"expression_score_normalized": [None, 0.8],
})
# Mock store
mock_store = Mock(spec=PipelineStore)
mock_provenance = Mock(spec=ProvenanceTracker)
# Should not raise exception
load_to_duckdb(df, mock_store, mock_provenance)
# Verify store was called
mock_store.save_dataframe.assert_called_once()

310
tests/test_protein.py Normal file
View File

@@ -0,0 +1,310 @@
"""Unit tests for protein features evidence layer."""
from unittest.mock import Mock, patch, MagicMock
import polars as pl
import pytest
from polars.testing import assert_frame_equal
from usher_pipeline.evidence.protein.models import (
ProteinFeatureRecord,
CILIA_DOMAIN_KEYWORDS,
SCAFFOLD_DOMAIN_TYPES,
)
from usher_pipeline.evidence.protein.transform import (
extract_protein_features,
detect_cilia_motifs,
normalize_protein_features,
)
@pytest.fixture
def sample_uniprot_df():
"""Sample UniProt data for testing."""
return pl.DataFrame({
"uniprot_id": ["P12345", "Q67890", "A11111", "B22222"],
"protein_length": [500, 1200, 300, None], # None = not found
"domain_names": [
["PDZ domain", "Kinase domain"],
["IFT complex subunit", "WD40 repeat"],
["Transmembrane region"],
[],
],
"coiled_coil_count": [2, 0, 0, None],
"transmembrane_count": [0, 5, 10, None],
})
@pytest.fixture
def sample_interpro_df():
"""Sample InterPro data for testing."""
return pl.DataFrame({
"uniprot_id": ["P12345", "Q67890", "A11111", "B22222"],
"domain_names": [
["SH3 domain"],
["Ciliary targeting signal", "Ankyrin repeat"],
[],
[],
],
"interpro_ids": [
["IPR001452"],
["IPR005598", "IPR002110"],
[],
[],
],
})
def test_uniprot_feature_extraction(sample_uniprot_df, sample_interpro_df):
"""Correct parsing of length, domain, coiled-coil, TM from UniProt data."""
df = extract_protein_features(sample_uniprot_df, sample_interpro_df)
# Check P12345
p12345 = df.filter(pl.col("uniprot_id") == "P12345")
assert p12345["protein_length"][0] == 500
assert p12345["coiled_coil"][0] == True # count=2 > 0
assert p12345["coiled_coil_count"][0] == 2
assert p12345["transmembrane_count"][0] == 0
# Domain count should include both UniProt and InterPro (deduplicated)
assert p12345["domain_count"][0] == 3 # PDZ, Kinase, SH3
# Check B22222 (not found in UniProt)
b22222 = df.filter(pl.col("uniprot_id") == "B22222")
assert b22222["protein_length"][0] is None
assert b22222["coiled_coil"][0] is None
assert b22222["transmembrane_count"][0] is None
def test_cilia_motif_detection_positive():
"""Domain name containing cilia keywords sets has_cilia_domain=True."""
df = pl.DataFrame({
"uniprot_id": ["P12345"],
"protein_length": [500],
"domain_count": [2],
"coiled_coil": [False],
"coiled_coil_count": [0],
"transmembrane_count": [0],
"domain_names": [["IFT complex subunit", "Kinase domain"]],
})
result = detect_cilia_motifs(df)
assert result["has_cilia_domain"][0] == True
def test_cilia_motif_detection_negative():
"""Standard domain (e.g., Kinase) does not trigger has_cilia_domain."""
df = pl.DataFrame({
"uniprot_id": ["P12345"],
"protein_length": [500],
"domain_count": [1],
"coiled_coil": [False],
"coiled_coil_count": [0],
"transmembrane_count": [0],
"domain_names": [["Kinase domain"]],
})
result = detect_cilia_motifs(df)
assert result["has_cilia_domain"][0] == False
def test_scaffold_detection():
"""PDZ domain triggers scaffold_adaptor_domain=True."""
df = pl.DataFrame({
"uniprot_id": ["P12345"],
"protein_length": [500],
"domain_count": [1],
"coiled_coil": [False],
"coiled_coil_count": [0],
"transmembrane_count": [0],
"domain_names": [["PDZ domain"]],
})
result = detect_cilia_motifs(df)
assert result["scaffold_adaptor_domain"][0] == True
def test_null_uniprot():
"""Gene without UniProt entry has all features NULL."""
df = pl.DataFrame({
"uniprot_id": ["B22222"],
"protein_length": [None],
"domain_count": [0],
"coiled_coil": [None],
"coiled_coil_count": [None],
"transmembrane_count": [None],
"domain_names": [[]],
})
result = detect_cilia_motifs(df)
result = normalize_protein_features(result)
# All boolean flags should be NULL (not False)
assert result["has_cilia_domain"][0] is None
assert result["scaffold_adaptor_domain"][0] is None
assert result["has_sensory_domain"][0] is None
# Composite score should be NULL
assert result["protein_score_normalized"][0] is None
def test_normalization_bounds():
"""All normalized features are in [0, 1] range."""
df = pl.DataFrame({
"uniprot_id": ["P1", "P2", "P3"],
"protein_length": [100, 500, 2000],
"domain_count": [0, 5, 20],
"coiled_coil": [False, True, True],
"coiled_coil_count": [0, 2, 5],
"transmembrane_count": [0, 5, 25], # 25 gets capped at 20
"domain_names": [[], ["PDZ"], ["IFT", "Ciliary"]],
})
result = detect_cilia_motifs(df)
result = normalize_protein_features(result)
# Check all scores are in [0, 1]
for score in result["protein_score_normalized"]:
assert score is not None
assert 0.0 <= score <= 1.0
def test_composite_score_cilia_gene():
"""Gene with cilia domains scores higher than gene without."""
df = pl.DataFrame({
"uniprot_id": ["P_CILIA", "P_NOCILIA"],
"protein_length": [500, 500],
"domain_count": [5, 5],
"coiled_coil": [True, True],
"coiled_coil_count": [2, 2],
"transmembrane_count": [5, 5],
"domain_names": [
["IFT complex", "PDZ domain"], # Has cilia + scaffold
["Kinase domain", "PDZ domain"], # Only scaffold
],
})
result = detect_cilia_motifs(df)
result = normalize_protein_features(result)
cilia_score = result.filter(pl.col("uniprot_id") == "P_CILIA")["protein_score_normalized"][0]
nocilia_score = result.filter(pl.col("uniprot_id") == "P_NOCILIA")["protein_score_normalized"][0]
# Cilia gene should score higher (15% weight for has_cilia_domain)
assert cilia_score > nocilia_score
def test_composite_score_null_handling():
"""NULL UniProt produces NULL composite score (not 0.0)."""
df = pl.DataFrame({
"uniprot_id": ["P_VALID", "P_NULL"],
"protein_length": [500, None],
"domain_count": [5, 0],
"coiled_coil": [True, None],
"coiled_coil_count": [2, None],
"transmembrane_count": [5, None],
"domain_names": [["PDZ"], []],
})
result = detect_cilia_motifs(df)
result = normalize_protein_features(result)
valid_score = result.filter(pl.col("uniprot_id") == "P_VALID")["protein_score_normalized"][0]
null_score = result.filter(pl.col("uniprot_id") == "P_NULL")["protein_score_normalized"][0]
assert valid_score is not None
assert null_score is None # NOT 0.0
def test_domain_keyword_case_insensitive():
"""Cilia keyword matching is case-insensitive."""
df = pl.DataFrame({
"uniprot_id": ["P1", "P2", "P3"],
"protein_length": [500, 500, 500],
"domain_count": [1, 1, 1],
"coiled_coil": [False, False, False],
"coiled_coil_count": [0, 0, 0],
"transmembrane_count": [0, 0, 0],
"domain_names": [
["intraflagellar transport"], # lowercase
["CILIARY targeting signal"], # uppercase
["Basal Body protein"], # mixed case
],
})
result = detect_cilia_motifs(df)
# All should match
assert result["has_cilia_domain"][0] == True
assert result["has_cilia_domain"][1] == True
assert result["has_cilia_domain"][2] == True
@patch("usher_pipeline.evidence.protein.fetch.httpx.Client")
def test_fetch_uniprot_features_with_mock(mock_client_class):
"""Test UniProt fetch with mocked HTTP responses."""
from usher_pipeline.evidence.protein.fetch import fetch_uniprot_features
# Mock httpx client
mock_client = MagicMock()
mock_client_class.return_value.__enter__.return_value = mock_client
# Mock UniProt API response
mock_response = Mock()
mock_response.json.return_value = {
"results": [
{
"primaryAccession": "P12345",
"sequence": {"length": 500},
"features": [
{"type": "Domain", "description": "PDZ domain"},
{"type": "Coiled coil"},
{"type": "Transmembrane"},
],
}
]
}
mock_client.get.return_value = mock_response
# Call fetch
df = fetch_uniprot_features(["P12345"])
# Verify result
assert len(df) == 1
assert df["uniprot_id"][0] == "P12345"
assert df["protein_length"][0] == 500
assert df["coiled_coil_count"][0] == 1
assert df["transmembrane_count"][0] == 1
@patch("usher_pipeline.evidence.protein.fetch.httpx.Client")
def test_fetch_interpro_domains_with_mock(mock_client_class):
"""Test InterPro fetch with mocked HTTP responses."""
from usher_pipeline.evidence.protein.fetch import fetch_interpro_domains
# Mock httpx client
mock_client = MagicMock()
mock_client_class.return_value.__enter__.return_value = mock_client
# Mock InterPro API response
mock_response = Mock()
mock_response.json.return_value = {
"results": [
{
"metadata": {
"accession": "IPR001452",
"name": {"name": "SH3 domain"},
}
}
]
}
mock_client.get.return_value = mock_response
# Call fetch
df = fetch_interpro_domains(["P12345"])
# Verify result
assert len(df) == 1
assert df["uniprot_id"][0] == "P12345"
assert "SH3 domain" in df["domain_names"][0]
assert "IPR001452" in df["interpro_ids"][0]

View File

@@ -0,0 +1,350 @@
"""Integration tests for protein features evidence layer."""
from pathlib import Path
from unittest.mock import Mock, patch, MagicMock
import polars as pl
import pytest
from usher_pipeline.config.loader import load_config
from usher_pipeline.persistence import PipelineStore, ProvenanceTracker
from usher_pipeline.evidence.protein import (
process_protein_evidence,
load_to_duckdb,
query_cilia_candidates,
)
@pytest.fixture
def test_config(tmp_path: Path):
"""Create test configuration."""
config_path = tmp_path / "config.yaml"
config_content = f"""
data_dir: {tmp_path / "data"}
cache_dir: {tmp_path / "cache"}
duckdb_path: {tmp_path / "test.duckdb"}
versions:
ensembl_release: 113
gnomad_version: v4.1
gtex_version: v8
hpa_version: "23.0"
api:
rate_limit_per_second: 5
max_retries: 5
cache_ttl_seconds: 86400
timeout_seconds: 30
scoring:
gnomad: 0.20
expression: 0.20
annotation: 0.15
localization: 0.15
animal_model: 0.15
literature: 0.15
"""
config_path.write_text(config_content)
return load_config(config_path)
@pytest.fixture
def mock_gene_universe():
"""Mock gene universe with UniProt mappings."""
return pl.DataFrame({
"gene_id": ["ENSG00000001", "ENSG00000002", "ENSG00000003", "ENSG00000004"],
"gene_symbol": ["GENE1", "GENE2", "GENE3", "GENE4"],
"uniprot_id": ["P12345", "Q67890", "A11111", None], # GENE4 has no UniProt
})
@pytest.fixture
def mock_uniprot_response():
"""Mock UniProt API response with realistic domain structures."""
return {
"results": [
{
"primaryAccession": "P12345",
"sequence": {"length": 500},
"features": [
{"type": "Domain", "description": "PDZ domain"},
{"type": "Domain", "description": "SH3 domain"},
{"type": "Coiled coil"},
{"type": "Coiled coil"},
],
},
{
"primaryAccession": "Q67890",
"sequence": {"length": 1200},
"features": [
{"type": "Domain", "description": "IFT complex subunit"},
{"type": "Domain", "description": "WD40 repeat"},
{"type": "Transmembrane"},
{"type": "Transmembrane"},
{"type": "Transmembrane"},
],
},
{
"primaryAccession": "A11111",
"sequence": {"length": 300},
"features": [
{"type": "Domain", "description": "Kinase domain"},
],
},
]
}
@pytest.fixture
def mock_interpro_response():
"""Mock InterPro API responses per protein."""
return {
"P12345": {
"results": [
{
"metadata": {
"accession": "IPR001452",
"name": {"name": "Ankyrin repeat"},
}
}
]
},
"Q67890": {
"results": [
{
"metadata": {
"accession": "IPR005598",
"name": {"name": "Ciliary targeting signal"},
}
}
]
},
"A11111": {
"results": []
},
}
@patch("usher_pipeline.evidence.protein.fetch.httpx.Client")
@patch("usher_pipeline.evidence.protein.fetch.time.sleep") # Speed up tests
def test_full_pipeline_with_mocked_apis(
mock_sleep,
mock_client_class,
mock_gene_universe,
mock_uniprot_response,
mock_interpro_response,
):
"""Test full pipeline with mocked UniProt and InterPro APIs."""
# Mock httpx client
mock_client = MagicMock()
mock_client_class.return_value.__enter__.return_value = mock_client
# Setup mock responses
def mock_get(url, params=None):
mock_response = Mock()
# UniProt search endpoint
if "uniprot" in url and "search" in url:
mock_response.json.return_value = mock_uniprot_response
mock_response.raise_for_status = Mock()
return mock_response
# InterPro API
if "interpro" in url:
# Extract accession from URL
accession = url.split("/")[-1]
if accession in mock_interpro_response:
mock_response.json.return_value = mock_interpro_response[accession]
else:
mock_response.json.return_value = {"results": []}
mock_response.raise_for_status = Mock()
return mock_response
raise ValueError(f"Unexpected URL: {url}")
mock_client.get.side_effect = mock_get
# Run pipeline
gene_ids = mock_gene_universe.select("gene_id").to_series().to_list()
df = process_protein_evidence(gene_ids, mock_gene_universe)
# Verify results
assert len(df) == 4 # All genes present
# Check GENE1 (P12345) - has PDZ, SH3, Ankyrin (scaffold domains) + coiled-coils
gene1 = df.filter(pl.col("gene_symbol") == "GENE1")
assert gene1["uniprot_id"][0] == "P12345"
assert gene1["protein_length"][0] == 500
assert gene1["domain_count"][0] == 3 # PDZ, SH3, Ankyrin
assert gene1["coiled_coil"][0] == True
assert gene1["coiled_coil_count"][0] == 2
assert gene1["scaffold_adaptor_domain"][0] == True # Has PDZ, SH3, Ankyrin
assert gene1["protein_score_normalized"][0] is not None
# Check GENE2 (Q67890) - has IFT and ciliary domains
gene2 = df.filter(pl.col("gene_symbol") == "GENE2")
assert gene2["has_cilia_domain"][0] == True # IFT + ciliary
assert gene2["transmembrane_count"][0] == 3
# Check GENE3 (A11111) - minimal features
gene3 = df.filter(pl.col("gene_symbol") == "GENE3")
assert gene3["domain_count"][0] == 1 # Only Kinase
assert gene3["has_cilia_domain"][0] == False
# Check GENE4 (no UniProt) - all NULL
gene4 = df.filter(pl.col("gene_symbol") == "GENE4")
assert gene4["uniprot_id"][0] is None
assert gene4["protein_length"][0] is None
assert gene4["protein_score_normalized"][0] is None
def test_checkpoint_restart(tmp_path: Path, test_config, mock_gene_universe):
"""Test checkpoint-restart pattern with DuckDB."""
db_path = tmp_path / "test.duckdb"
store = PipelineStore(db_path)
provenance = ProvenanceTracker.from_config(test_config)
# Create synthetic protein features
df = pl.DataFrame({
"gene_id": ["ENSG00000001", "ENSG00000002"],
"gene_symbol": ["GENE1", "GENE2"],
"uniprot_id": ["P12345", "Q67890"],
"protein_length": [500, 1200],
"domain_count": [3, 5],
"coiled_coil": [True, False],
"coiled_coil_count": [2, 0],
"transmembrane_count": [0, 3],
"scaffold_adaptor_domain": [True, False],
"has_cilia_domain": [False, True],
"has_sensory_domain": [False, False],
"protein_score_normalized": [0.65, 0.82],
})
# Load to DuckDB
load_to_duckdb(df, store, provenance, "Test protein features")
# Verify checkpoint exists
assert store.has_checkpoint("protein_features")
# Reload data
loaded_df = store.load_dataframe("protein_features")
assert loaded_df is not None
assert len(loaded_df) == 2
assert loaded_df["gene_symbol"].to_list() == ["GENE1", "GENE2"]
# Verify provenance
checkpoints = store.list_checkpoints()
protein_checkpoint = [c for c in checkpoints if c["table_name"] == "protein_features"][0]
assert protein_checkpoint["row_count"] == 2
store.close()
def test_query_cilia_candidates(tmp_path: Path):
"""Test querying genes with cilia-associated features."""
db_path = tmp_path / "test.duckdb"
store = PipelineStore(db_path)
# Create test data with various feature combinations
df = pl.DataFrame({
"gene_id": ["ENSG00000001", "ENSG00000002", "ENSG00000003", "ENSG00000004"],
"gene_symbol": ["GENE1", "GENE2", "GENE3", "GENE4"],
"uniprot_id": ["P1", "P2", "P3", "P4"],
"protein_length": [500, 600, 700, 800],
"domain_count": [3, 4, 2, 5],
"coiled_coil": [True, True, False, True],
"transmembrane_count": [0, 2, 0, 3],
"scaffold_adaptor_domain": [True, False, True, True],
"has_cilia_domain": [False, True, False, False],
"has_sensory_domain": [False, False, False, True],
"protein_score_normalized": [0.65, 0.82, 0.45, 0.78],
})
# Load to DuckDB
store.save_dataframe(df, "protein_features", "Test data", replace=True)
# Query cilia candidates
candidates = query_cilia_candidates(store)
# Should include:
# - GENE1: has coiled_coil + scaffold_adaptor_domain
# - GENE2: has cilia_domain
# - GENE4: has coiled_coil + scaffold_adaptor_domain
# Should NOT include:
# - GENE3: has scaffold but no coiled_coil, and no cilia_domain
assert len(candidates) == 3
symbols = candidates["gene_symbol"].to_list()
assert "GENE1" in symbols
assert "GENE2" in symbols
assert "GENE4" in symbols
assert "GENE3" not in symbols
# Verify sorting by score (descending)
assert candidates["protein_score_normalized"][0] == 0.82 # GENE2
store.close()
def test_provenance_recording(tmp_path: Path, test_config):
"""Test provenance metadata is correctly recorded."""
db_path = tmp_path / "test.duckdb"
store = PipelineStore(db_path)
provenance = ProvenanceTracker.from_config(test_config)
# Create test data with known stats
df = pl.DataFrame({
"gene_id": ["ENSG00000001", "ENSG00000002", "ENSG00000003"],
"gene_symbol": ["GENE1", "GENE2", "GENE3"],
"uniprot_id": ["P1", "P2", None], # 1 without UniProt
"protein_length": [500, 600, None],
"domain_count": [3, 4, None],
"coiled_coil": [True, False, None],
"coiled_coil_count": [2, 0, None],
"transmembrane_count": [0, 2, None],
"scaffold_adaptor_domain": [True, False, None],
"has_cilia_domain": [False, True, None],
"has_sensory_domain": [False, False, None],
"protein_score_normalized": [0.65, 0.82, None],
})
# Load with provenance
load_to_duckdb(df, store, provenance, "Test protein features")
# Verify provenance step was recorded
steps = provenance.get_steps()
protein_step = [s for s in steps if s["name"] == "load_protein_features"][0]
assert protein_step["details"]["total_genes"] == 3
assert protein_step["details"]["with_uniprot"] == 2
assert protein_step["details"]["null_uniprot"] == 1
assert protein_step["details"]["cilia_domain_count"] == 1
assert protein_step["details"]["scaffold_domain_count"] == 1
assert protein_step["details"]["coiled_coil_count"] == 1
assert protein_step["details"]["transmembrane_domain_count"] == 1
store.close()
@patch("usher_pipeline.evidence.protein.fetch.httpx.Client")
@patch("usher_pipeline.evidence.protein.fetch.time.sleep")
def test_null_preservation(mock_sleep, mock_client_class, mock_gene_universe):
"""Test that NULL values are preserved (not converted to 0)."""
# Mock httpx client
mock_client = MagicMock()
mock_client_class.return_value.__enter__.return_value = mock_client
# Mock response with one protein not found
mock_response = Mock()
mock_response.json.return_value = {
"results": [] # No results for any protein
}
mock_client.get.return_value = mock_response
# Run pipeline
gene_ids = ["ENSG00000001"]
gene_map = mock_gene_universe.filter(pl.col("gene_id") == "ENSG00000001")
df = process_protein_evidence(gene_ids, gene_map)
# All protein features should be NULL (not 0)
assert df["protein_length"][0] is None
assert df["domain_count"][0] is None or df["domain_count"][0] == 0
assert df["coiled_coil"][0] is None
assert df["transmembrane_count"][0] is None
assert df["protein_score_normalized"][0] is None # Critical: NOT 0.0