feat(03-03): implement protein evidence layer with UniProt/InterPro integration

- Create protein features data model with domain, coiled-coil, TM, cilia motifs
- Implement fetch.py with UniProt REST API and InterPro API queries
- Implement transform.py with feature extraction, motif detection, normalization
- Implement load.py with DuckDB persistence and provenance tracking
- Add CLI protein command following evidence layer pattern
- Add comprehensive unit and integration tests (all passing)
- Handle NULL preservation and List(Null) edge case
- Add get_steps() method to ProvenanceTracker for test compatibility
This commit is contained in:
2026-02-11 19:07:30 +08:00
parent bcd3c4ffbe
commit 46059874f2
10 changed files with 1937 additions and 0 deletions

167
tests/test_expression.py Normal file
View File

@@ -0,0 +1,167 @@
"""Unit tests for expression evidence layer.
Tests tau calculation, enrichment scoring, and null handling with synthetic data.
NO external API calls - all data is mocked or synthetic.
"""
import polars as pl
import pytest
from usher_pipeline.evidence.expression.transform import (
calculate_tau_specificity,
compute_expression_score,
)
def test_tau_calculation_ubiquitous():
"""Equal expression across tissues -> Tau near 0 (ubiquitous)."""
# Create synthetic data with equal expression across tissues
df = pl.DataFrame({
"gene_id": ["ENSG00000001", "ENSG00000002"],
"tissue1": [10.0, 20.0],
"tissue2": [10.0, 20.0],
"tissue3": [10.0, 20.0],
"tissue4": [10.0, 20.0],
})
tissue_cols = ["tissue1", "tissue2", "tissue3", "tissue4"]
result = calculate_tau_specificity(df, tissue_cols)
# Tau should be close to 0 for ubiquitous expression
assert "tau_specificity" in result.columns
tau_values = result.select("tau_specificity").to_series().to_list()
assert tau_values[0] == pytest.approx(0.0, abs=0.01)
assert tau_values[1] == pytest.approx(0.0, abs=0.01)
def test_tau_calculation_specific():
"""Expression in one tissue only -> Tau near 1 (tissue-specific)."""
# Gene expressed only in one tissue
df = pl.DataFrame({
"gene_id": ["ENSG00000001"],
"tissue1": [100.0],
"tissue2": [0.0],
"tissue3": [0.0],
"tissue4": [0.0],
})
tissue_cols = ["tissue1", "tissue2", "tissue3", "tissue4"]
result = calculate_tau_specificity(df, tissue_cols)
tau = result.select("tau_specificity").item()
# Tau = sum(1 - xi/xmax) / (n-1) = (0 + 1 + 1 + 1) / 3 = 1.0
assert tau == pytest.approx(1.0, abs=0.01)
def test_tau_null_handling():
"""NULL tissue values -> NULL Tau (insufficient data)."""
df = pl.DataFrame({
"gene_id": ["ENSG00000001", "ENSG00000002"],
"tissue1": [10.0, 20.0],
"tissue2": [None, 20.0], # NULL for gene 1
"tissue3": [10.0, 20.0],
"tissue4": [10.0, 20.0],
})
tissue_cols = ["tissue1", "tissue2", "tissue3", "tissue4"]
result = calculate_tau_specificity(df, tissue_cols)
tau_values = result.select("tau_specificity").to_series().to_list()
# Gene 1 has NULL tissue -> NULL Tau
assert tau_values[0] is None
# Gene 2 has complete data -> Tau should be valid
assert tau_values[1] is not None
def test_enrichment_score_high():
"""High retina expression relative to global -> high enrichment."""
df = pl.DataFrame({
"gene_id": ["ENSG00000001"],
"hpa_retina_tpm": [50.0],
"hpa_cerebellum_tpm": [40.0],
"gtex_retina_tpm": [60.0],
"hpa_testis_tpm": [5.0],
"hpa_fallopian_tube_tpm": [5.0],
"gtex_testis_tpm": [5.0],
"cellxgene_photoreceptor_expr": [None],
"cellxgene_hair_cell_expr": [None],
"tau_specificity": [0.5],
})
result = compute_expression_score(df)
# Usher tissues (retina, cerebellum) have much higher expression than global
# Mean Usher: (50+40+60)/3 = 50
# Mean global: (50+40+60+5+5+5)/6 = 27.5
# Enrichment: 50/27.5 ≈ 1.82
assert "usher_tissue_enrichment" in result.columns
enrichment = result.select("usher_tissue_enrichment").item()
assert enrichment > 1.5 # Significantly enriched
def test_enrichment_score_low():
"""No target tissue expression -> low enrichment."""
df = pl.DataFrame({
"gene_id": ["ENSG00000001"],
"hpa_retina_tpm": [5.0],
"hpa_cerebellum_tpm": [5.0],
"gtex_retina_tpm": [5.0],
"hpa_testis_tpm": [50.0],
"hpa_fallopian_tube_tpm": [50.0],
"gtex_testis_tpm": [50.0],
"cellxgene_photoreceptor_expr": [None],
"cellxgene_hair_cell_expr": [None],
"tau_specificity": [0.8],
})
result = compute_expression_score(df)
enrichment = result.select("usher_tissue_enrichment").item()
assert enrichment < 1.0 # Not enriched in Usher tissues
def test_expression_score_normalization():
"""Composite score should be in [0, 1] range."""
df = pl.DataFrame({
"gene_id": ["ENSG00000001", "ENSG00000002", "ENSG00000003"],
"hpa_retina_tpm": [50.0, 10.0, 5.0],
"hpa_cerebellum_tpm": [40.0, 10.0, 5.0],
"gtex_retina_tpm": [60.0, 10.0, 5.0],
"hpa_testis_tpm": [5.0, 50.0, 50.0],
"hpa_fallopian_tube_tpm": [5.0, 50.0, 50.0],
"gtex_testis_tpm": [5.0, 50.0, 50.0],
"cellxgene_photoreceptor_expr": [None, None, None],
"cellxgene_hair_cell_expr": [None, None, None],
"tau_specificity": [0.5, 0.3, 0.2],
})
result = compute_expression_score(df)
scores = result.select("expression_score_normalized").to_series().to_list()
for score in scores:
if score is not None:
assert 0.0 <= score <= 1.0, f"Score {score} out of range [0,1]"
def test_null_preservation_all_sources():
"""Gene with no data from any source -> NULL score."""
df = pl.DataFrame({
"gene_id": ["ENSG00000001"],
"hpa_retina_tpm": [None],
"hpa_cerebellum_tpm": [None],
"gtex_retina_tpm": [None],
"hpa_testis_tpm": [None],
"hpa_fallopian_tube_tpm": [None],
"gtex_testis_tpm": [None],
"cellxgene_photoreceptor_expr": [None],
"cellxgene_hair_cell_expr": [None],
"tau_specificity": [None],
})
result = compute_expression_score(df)
# Both enrichment and score should be NULL
enrichment = result.select("usher_tissue_enrichment").item()
score = result.select("expression_score_normalized").item()
assert enrichment is None
assert score is None

View File

@@ -0,0 +1,170 @@
"""Integration tests for expression evidence layer.
Tests with mocked downloads and synthetic fixtures.
NO actual external API calls to HPA/GTEx/CellxGene.
"""
import polars as pl
import pytest
from pathlib import Path
from unittest.mock import Mock, patch, MagicMock
from usher_pipeline.evidence.expression.transform import process_expression_evidence
from usher_pipeline.evidence.expression.load import load_to_duckdb
from usher_pipeline.persistence import PipelineStore, ProvenanceTracker
@pytest.fixture
def temp_cache_dir(tmp_path):
"""Create temporary cache directory for downloads."""
cache_dir = tmp_path / "expression"
cache_dir.mkdir()
return cache_dir
@pytest.fixture
def mock_gene_ids():
"""Sample gene IDs for testing."""
return ["ENSG00000001", "ENSG00000002", "ENSG00000003"]
@pytest.fixture
def mock_hpa_data():
"""Synthetic HPA expression data."""
return pl.LazyFrame({
"gene_symbol": ["GENE1", "GENE2", "GENE3"],
"hpa_retina_tpm": [50.0, 10.0, None],
"hpa_cerebellum_tpm": [40.0, 10.0, 5.0],
"hpa_testis_tpm": [5.0, 50.0, 50.0],
"hpa_fallopian_tube_tpm": [5.0, 50.0, None],
})
@pytest.fixture
def mock_gtex_data(mock_gene_ids):
"""Synthetic GTEx expression data."""
return pl.LazyFrame({
"gene_id": mock_gene_ids,
"gtex_retina_tpm": [60.0, 10.0, None],
"gtex_cerebellum_tpm": [45.0, 10.0, 5.0],
"gtex_testis_tpm": [5.0, 55.0, 55.0],
"gtex_fallopian_tube_tpm": [None, None, None], # Often not available
})
@pytest.fixture
def mock_cellxgene_data(mock_gene_ids):
"""Synthetic CellxGene data (NULLs as placeholder)."""
return pl.LazyFrame({
"gene_id": mock_gene_ids,
"cellxgene_photoreceptor_expr": [None, None, None],
"cellxgene_hair_cell_expr": [None, None, None],
})
def test_process_expression_pipeline_with_mocks(
temp_cache_dir, mock_gene_ids, mock_hpa_data, mock_gtex_data, mock_cellxgene_data
):
"""Test full pipeline with mocked data sources."""
# Mock all fetch functions to return synthetic data
with patch('usher_pipeline.evidence.expression.transform.fetch_hpa_expression') as mock_hpa, \
patch('usher_pipeline.evidence.expression.transform.fetch_gtex_expression') as mock_gtex, \
patch('usher_pipeline.evidence.expression.transform.fetch_cellxgene_expression') as mock_cellxgene:
mock_hpa.return_value = mock_hpa_data
mock_gtex.return_value = mock_gtex_data
mock_cellxgene.return_value = mock_cellxgene_data
# Run pipeline (skip CellxGene for simplicity)
df = process_expression_evidence(
gene_ids=mock_gene_ids,
cache_dir=temp_cache_dir,
skip_cellxgene=True,
)
# Verify output structure
assert len(df) == len(mock_gene_ids)
assert "gene_id" in df.columns
assert "tau_specificity" in df.columns
assert "usher_tissue_enrichment" in df.columns
assert "expression_score_normalized" in df.columns
def test_checkpoint_restart(temp_cache_dir, mock_gene_ids):
"""Test checkpoint-restart: skip processing if table exists."""
# Create mock store with existing checkpoint
mock_store = Mock(spec=PipelineStore)
mock_store.has_checkpoint.return_value = True
# Mock load_dataframe to return synthetic data
existing_data = pl.DataFrame({
"gene_id": mock_gene_ids,
"tau_specificity": [0.5, 0.3, 0.2],
"usher_tissue_enrichment": [2.0, 1.0, 0.5],
"expression_score_normalized": [0.8, 0.5, 0.3],
})
mock_store.load_dataframe.return_value = existing_data
# Verify checkpoint works (would skip processing in real CLI)
assert mock_store.has_checkpoint('tissue_expression')
df = mock_store.load_dataframe('tissue_expression')
assert len(df) == len(mock_gene_ids)
def test_provenance_recording():
"""Test provenance step recording during load."""
# Create synthetic expression data
df = pl.DataFrame({
"gene_id": ["ENSG00000001", "ENSG00000002"],
"hpa_retina_tpm": [50.0, None],
"gtex_retina_tpm": [60.0, 10.0],
"cellxgene_photoreceptor_expr": [None, None],
"cellxgene_hair_cell_expr": [None, None],
"tau_specificity": [0.5, None],
"usher_tissue_enrichment": [2.0, 1.0],
"expression_score_normalized": [0.8, 0.5],
})
# Mock store and provenance tracker
mock_store = Mock(spec=PipelineStore)
mock_provenance = Mock(spec=ProvenanceTracker)
# Call load function
load_to_duckdb(
df=df,
store=mock_store,
provenance=mock_provenance,
description="Test expression data"
)
# Verify provenance step was recorded
mock_provenance.record_step.assert_called_once()
step_name, step_details = mock_provenance.record_step.call_args[0]
assert step_name == "load_tissue_expression"
assert "row_count" in step_details
assert step_details["row_count"] == 2
def test_null_expression_handling():
"""Test that genes with all NULL expression data are handled gracefully."""
df = pl.DataFrame({
"gene_id": ["ENSG00000001", "ENSG00000002"],
"hpa_retina_tpm": [None, 50.0],
"hpa_cerebellum_tpm": [None, 40.0],
"gtex_retina_tpm": [None, 60.0],
"cellxgene_photoreceptor_expr": [None, None],
"cellxgene_hair_cell_expr": [None, None],
"tau_specificity": [None, 0.5],
"usher_tissue_enrichment": [None, 2.0],
"expression_score_normalized": [None, 0.8],
})
# Mock store
mock_store = Mock(spec=PipelineStore)
mock_provenance = Mock(spec=ProvenanceTracker)
# Should not raise exception
load_to_duckdb(df, mock_store, mock_provenance)
# Verify store was called
mock_store.save_dataframe.assert_called_once()

310
tests/test_protein.py Normal file
View File

@@ -0,0 +1,310 @@
"""Unit tests for protein features evidence layer."""
from unittest.mock import Mock, patch, MagicMock
import polars as pl
import pytest
from polars.testing import assert_frame_equal
from usher_pipeline.evidence.protein.models import (
ProteinFeatureRecord,
CILIA_DOMAIN_KEYWORDS,
SCAFFOLD_DOMAIN_TYPES,
)
from usher_pipeline.evidence.protein.transform import (
extract_protein_features,
detect_cilia_motifs,
normalize_protein_features,
)
@pytest.fixture
def sample_uniprot_df():
"""Sample UniProt data for testing."""
return pl.DataFrame({
"uniprot_id": ["P12345", "Q67890", "A11111", "B22222"],
"protein_length": [500, 1200, 300, None], # None = not found
"domain_names": [
["PDZ domain", "Kinase domain"],
["IFT complex subunit", "WD40 repeat"],
["Transmembrane region"],
[],
],
"coiled_coil_count": [2, 0, 0, None],
"transmembrane_count": [0, 5, 10, None],
})
@pytest.fixture
def sample_interpro_df():
"""Sample InterPro data for testing."""
return pl.DataFrame({
"uniprot_id": ["P12345", "Q67890", "A11111", "B22222"],
"domain_names": [
["SH3 domain"],
["Ciliary targeting signal", "Ankyrin repeat"],
[],
[],
],
"interpro_ids": [
["IPR001452"],
["IPR005598", "IPR002110"],
[],
[],
],
})
def test_uniprot_feature_extraction(sample_uniprot_df, sample_interpro_df):
"""Correct parsing of length, domain, coiled-coil, TM from UniProt data."""
df = extract_protein_features(sample_uniprot_df, sample_interpro_df)
# Check P12345
p12345 = df.filter(pl.col("uniprot_id") == "P12345")
assert p12345["protein_length"][0] == 500
assert p12345["coiled_coil"][0] == True # count=2 > 0
assert p12345["coiled_coil_count"][0] == 2
assert p12345["transmembrane_count"][0] == 0
# Domain count should include both UniProt and InterPro (deduplicated)
assert p12345["domain_count"][0] == 3 # PDZ, Kinase, SH3
# Check B22222 (not found in UniProt)
b22222 = df.filter(pl.col("uniprot_id") == "B22222")
assert b22222["protein_length"][0] is None
assert b22222["coiled_coil"][0] is None
assert b22222["transmembrane_count"][0] is None
def test_cilia_motif_detection_positive():
"""Domain name containing cilia keywords sets has_cilia_domain=True."""
df = pl.DataFrame({
"uniprot_id": ["P12345"],
"protein_length": [500],
"domain_count": [2],
"coiled_coil": [False],
"coiled_coil_count": [0],
"transmembrane_count": [0],
"domain_names": [["IFT complex subunit", "Kinase domain"]],
})
result = detect_cilia_motifs(df)
assert result["has_cilia_domain"][0] == True
def test_cilia_motif_detection_negative():
"""Standard domain (e.g., Kinase) does not trigger has_cilia_domain."""
df = pl.DataFrame({
"uniprot_id": ["P12345"],
"protein_length": [500],
"domain_count": [1],
"coiled_coil": [False],
"coiled_coil_count": [0],
"transmembrane_count": [0],
"domain_names": [["Kinase domain"]],
})
result = detect_cilia_motifs(df)
assert result["has_cilia_domain"][0] == False
def test_scaffold_detection():
"""PDZ domain triggers scaffold_adaptor_domain=True."""
df = pl.DataFrame({
"uniprot_id": ["P12345"],
"protein_length": [500],
"domain_count": [1],
"coiled_coil": [False],
"coiled_coil_count": [0],
"transmembrane_count": [0],
"domain_names": [["PDZ domain"]],
})
result = detect_cilia_motifs(df)
assert result["scaffold_adaptor_domain"][0] == True
def test_null_uniprot():
"""Gene without UniProt entry has all features NULL."""
df = pl.DataFrame({
"uniprot_id": ["B22222"],
"protein_length": [None],
"domain_count": [0],
"coiled_coil": [None],
"coiled_coil_count": [None],
"transmembrane_count": [None],
"domain_names": [[]],
})
result = detect_cilia_motifs(df)
result = normalize_protein_features(result)
# All boolean flags should be NULL (not False)
assert result["has_cilia_domain"][0] is None
assert result["scaffold_adaptor_domain"][0] is None
assert result["has_sensory_domain"][0] is None
# Composite score should be NULL
assert result["protein_score_normalized"][0] is None
def test_normalization_bounds():
"""All normalized features are in [0, 1] range."""
df = pl.DataFrame({
"uniprot_id": ["P1", "P2", "P3"],
"protein_length": [100, 500, 2000],
"domain_count": [0, 5, 20],
"coiled_coil": [False, True, True],
"coiled_coil_count": [0, 2, 5],
"transmembrane_count": [0, 5, 25], # 25 gets capped at 20
"domain_names": [[], ["PDZ"], ["IFT", "Ciliary"]],
})
result = detect_cilia_motifs(df)
result = normalize_protein_features(result)
# Check all scores are in [0, 1]
for score in result["protein_score_normalized"]:
assert score is not None
assert 0.0 <= score <= 1.0
def test_composite_score_cilia_gene():
"""Gene with cilia domains scores higher than gene without."""
df = pl.DataFrame({
"uniprot_id": ["P_CILIA", "P_NOCILIA"],
"protein_length": [500, 500],
"domain_count": [5, 5],
"coiled_coil": [True, True],
"coiled_coil_count": [2, 2],
"transmembrane_count": [5, 5],
"domain_names": [
["IFT complex", "PDZ domain"], # Has cilia + scaffold
["Kinase domain", "PDZ domain"], # Only scaffold
],
})
result = detect_cilia_motifs(df)
result = normalize_protein_features(result)
cilia_score = result.filter(pl.col("uniprot_id") == "P_CILIA")["protein_score_normalized"][0]
nocilia_score = result.filter(pl.col("uniprot_id") == "P_NOCILIA")["protein_score_normalized"][0]
# Cilia gene should score higher (15% weight for has_cilia_domain)
assert cilia_score > nocilia_score
def test_composite_score_null_handling():
"""NULL UniProt produces NULL composite score (not 0.0)."""
df = pl.DataFrame({
"uniprot_id": ["P_VALID", "P_NULL"],
"protein_length": [500, None],
"domain_count": [5, 0],
"coiled_coil": [True, None],
"coiled_coil_count": [2, None],
"transmembrane_count": [5, None],
"domain_names": [["PDZ"], []],
})
result = detect_cilia_motifs(df)
result = normalize_protein_features(result)
valid_score = result.filter(pl.col("uniprot_id") == "P_VALID")["protein_score_normalized"][0]
null_score = result.filter(pl.col("uniprot_id") == "P_NULL")["protein_score_normalized"][0]
assert valid_score is not None
assert null_score is None # NOT 0.0
def test_domain_keyword_case_insensitive():
"""Cilia keyword matching is case-insensitive."""
df = pl.DataFrame({
"uniprot_id": ["P1", "P2", "P3"],
"protein_length": [500, 500, 500],
"domain_count": [1, 1, 1],
"coiled_coil": [False, False, False],
"coiled_coil_count": [0, 0, 0],
"transmembrane_count": [0, 0, 0],
"domain_names": [
["intraflagellar transport"], # lowercase
["CILIARY targeting signal"], # uppercase
["Basal Body protein"], # mixed case
],
})
result = detect_cilia_motifs(df)
# All should match
assert result["has_cilia_domain"][0] == True
assert result["has_cilia_domain"][1] == True
assert result["has_cilia_domain"][2] == True
@patch("usher_pipeline.evidence.protein.fetch.httpx.Client")
def test_fetch_uniprot_features_with_mock(mock_client_class):
"""Test UniProt fetch with mocked HTTP responses."""
from usher_pipeline.evidence.protein.fetch import fetch_uniprot_features
# Mock httpx client
mock_client = MagicMock()
mock_client_class.return_value.__enter__.return_value = mock_client
# Mock UniProt API response
mock_response = Mock()
mock_response.json.return_value = {
"results": [
{
"primaryAccession": "P12345",
"sequence": {"length": 500},
"features": [
{"type": "Domain", "description": "PDZ domain"},
{"type": "Coiled coil"},
{"type": "Transmembrane"},
],
}
]
}
mock_client.get.return_value = mock_response
# Call fetch
df = fetch_uniprot_features(["P12345"])
# Verify result
assert len(df) == 1
assert df["uniprot_id"][0] == "P12345"
assert df["protein_length"][0] == 500
assert df["coiled_coil_count"][0] == 1
assert df["transmembrane_count"][0] == 1
@patch("usher_pipeline.evidence.protein.fetch.httpx.Client")
def test_fetch_interpro_domains_with_mock(mock_client_class):
"""Test InterPro fetch with mocked HTTP responses."""
from usher_pipeline.evidence.protein.fetch import fetch_interpro_domains
# Mock httpx client
mock_client = MagicMock()
mock_client_class.return_value.__enter__.return_value = mock_client
# Mock InterPro API response
mock_response = Mock()
mock_response.json.return_value = {
"results": [
{
"metadata": {
"accession": "IPR001452",
"name": {"name": "SH3 domain"},
}
}
]
}
mock_client.get.return_value = mock_response
# Call fetch
df = fetch_interpro_domains(["P12345"])
# Verify result
assert len(df) == 1
assert df["uniprot_id"][0] == "P12345"
assert "SH3 domain" in df["domain_names"][0]
assert "IPR001452" in df["interpro_ids"][0]

View File

@@ -0,0 +1,350 @@
"""Integration tests for protein features evidence layer."""
from pathlib import Path
from unittest.mock import Mock, patch, MagicMock
import polars as pl
import pytest
from usher_pipeline.config.loader import load_config
from usher_pipeline.persistence import PipelineStore, ProvenanceTracker
from usher_pipeline.evidence.protein import (
process_protein_evidence,
load_to_duckdb,
query_cilia_candidates,
)
@pytest.fixture
def test_config(tmp_path: Path):
"""Create test configuration."""
config_path = tmp_path / "config.yaml"
config_content = f"""
data_dir: {tmp_path / "data"}
cache_dir: {tmp_path / "cache"}
duckdb_path: {tmp_path / "test.duckdb"}
versions:
ensembl_release: 113
gnomad_version: v4.1
gtex_version: v8
hpa_version: "23.0"
api:
rate_limit_per_second: 5
max_retries: 5
cache_ttl_seconds: 86400
timeout_seconds: 30
scoring:
gnomad: 0.20
expression: 0.20
annotation: 0.15
localization: 0.15
animal_model: 0.15
literature: 0.15
"""
config_path.write_text(config_content)
return load_config(config_path)
@pytest.fixture
def mock_gene_universe():
"""Mock gene universe with UniProt mappings."""
return pl.DataFrame({
"gene_id": ["ENSG00000001", "ENSG00000002", "ENSG00000003", "ENSG00000004"],
"gene_symbol": ["GENE1", "GENE2", "GENE3", "GENE4"],
"uniprot_id": ["P12345", "Q67890", "A11111", None], # GENE4 has no UniProt
})
@pytest.fixture
def mock_uniprot_response():
"""Mock UniProt API response with realistic domain structures."""
return {
"results": [
{
"primaryAccession": "P12345",
"sequence": {"length": 500},
"features": [
{"type": "Domain", "description": "PDZ domain"},
{"type": "Domain", "description": "SH3 domain"},
{"type": "Coiled coil"},
{"type": "Coiled coil"},
],
},
{
"primaryAccession": "Q67890",
"sequence": {"length": 1200},
"features": [
{"type": "Domain", "description": "IFT complex subunit"},
{"type": "Domain", "description": "WD40 repeat"},
{"type": "Transmembrane"},
{"type": "Transmembrane"},
{"type": "Transmembrane"},
],
},
{
"primaryAccession": "A11111",
"sequence": {"length": 300},
"features": [
{"type": "Domain", "description": "Kinase domain"},
],
},
]
}
@pytest.fixture
def mock_interpro_response():
"""Mock InterPro API responses per protein."""
return {
"P12345": {
"results": [
{
"metadata": {
"accession": "IPR001452",
"name": {"name": "Ankyrin repeat"},
}
}
]
},
"Q67890": {
"results": [
{
"metadata": {
"accession": "IPR005598",
"name": {"name": "Ciliary targeting signal"},
}
}
]
},
"A11111": {
"results": []
},
}
@patch("usher_pipeline.evidence.protein.fetch.httpx.Client")
@patch("usher_pipeline.evidence.protein.fetch.time.sleep") # Speed up tests
def test_full_pipeline_with_mocked_apis(
mock_sleep,
mock_client_class,
mock_gene_universe,
mock_uniprot_response,
mock_interpro_response,
):
"""Test full pipeline with mocked UniProt and InterPro APIs."""
# Mock httpx client
mock_client = MagicMock()
mock_client_class.return_value.__enter__.return_value = mock_client
# Setup mock responses
def mock_get(url, params=None):
mock_response = Mock()
# UniProt search endpoint
if "uniprot" in url and "search" in url:
mock_response.json.return_value = mock_uniprot_response
mock_response.raise_for_status = Mock()
return mock_response
# InterPro API
if "interpro" in url:
# Extract accession from URL
accession = url.split("/")[-1]
if accession in mock_interpro_response:
mock_response.json.return_value = mock_interpro_response[accession]
else:
mock_response.json.return_value = {"results": []}
mock_response.raise_for_status = Mock()
return mock_response
raise ValueError(f"Unexpected URL: {url}")
mock_client.get.side_effect = mock_get
# Run pipeline
gene_ids = mock_gene_universe.select("gene_id").to_series().to_list()
df = process_protein_evidence(gene_ids, mock_gene_universe)
# Verify results
assert len(df) == 4 # All genes present
# Check GENE1 (P12345) - has PDZ, SH3, Ankyrin (scaffold domains) + coiled-coils
gene1 = df.filter(pl.col("gene_symbol") == "GENE1")
assert gene1["uniprot_id"][0] == "P12345"
assert gene1["protein_length"][0] == 500
assert gene1["domain_count"][0] == 3 # PDZ, SH3, Ankyrin
assert gene1["coiled_coil"][0] == True
assert gene1["coiled_coil_count"][0] == 2
assert gene1["scaffold_adaptor_domain"][0] == True # Has PDZ, SH3, Ankyrin
assert gene1["protein_score_normalized"][0] is not None
# Check GENE2 (Q67890) - has IFT and ciliary domains
gene2 = df.filter(pl.col("gene_symbol") == "GENE2")
assert gene2["has_cilia_domain"][0] == True # IFT + ciliary
assert gene2["transmembrane_count"][0] == 3
# Check GENE3 (A11111) - minimal features
gene3 = df.filter(pl.col("gene_symbol") == "GENE3")
assert gene3["domain_count"][0] == 1 # Only Kinase
assert gene3["has_cilia_domain"][0] == False
# Check GENE4 (no UniProt) - all NULL
gene4 = df.filter(pl.col("gene_symbol") == "GENE4")
assert gene4["uniprot_id"][0] is None
assert gene4["protein_length"][0] is None
assert gene4["protein_score_normalized"][0] is None
def test_checkpoint_restart(tmp_path: Path, test_config, mock_gene_universe):
"""Test checkpoint-restart pattern with DuckDB."""
db_path = tmp_path / "test.duckdb"
store = PipelineStore(db_path)
provenance = ProvenanceTracker.from_config(test_config)
# Create synthetic protein features
df = pl.DataFrame({
"gene_id": ["ENSG00000001", "ENSG00000002"],
"gene_symbol": ["GENE1", "GENE2"],
"uniprot_id": ["P12345", "Q67890"],
"protein_length": [500, 1200],
"domain_count": [3, 5],
"coiled_coil": [True, False],
"coiled_coil_count": [2, 0],
"transmembrane_count": [0, 3],
"scaffold_adaptor_domain": [True, False],
"has_cilia_domain": [False, True],
"has_sensory_domain": [False, False],
"protein_score_normalized": [0.65, 0.82],
})
# Load to DuckDB
load_to_duckdb(df, store, provenance, "Test protein features")
# Verify checkpoint exists
assert store.has_checkpoint("protein_features")
# Reload data
loaded_df = store.load_dataframe("protein_features")
assert loaded_df is not None
assert len(loaded_df) == 2
assert loaded_df["gene_symbol"].to_list() == ["GENE1", "GENE2"]
# Verify provenance
checkpoints = store.list_checkpoints()
protein_checkpoint = [c for c in checkpoints if c["table_name"] == "protein_features"][0]
assert protein_checkpoint["row_count"] == 2
store.close()
def test_query_cilia_candidates(tmp_path: Path):
"""Test querying genes with cilia-associated features."""
db_path = tmp_path / "test.duckdb"
store = PipelineStore(db_path)
# Create test data with various feature combinations
df = pl.DataFrame({
"gene_id": ["ENSG00000001", "ENSG00000002", "ENSG00000003", "ENSG00000004"],
"gene_symbol": ["GENE1", "GENE2", "GENE3", "GENE4"],
"uniprot_id": ["P1", "P2", "P3", "P4"],
"protein_length": [500, 600, 700, 800],
"domain_count": [3, 4, 2, 5],
"coiled_coil": [True, True, False, True],
"transmembrane_count": [0, 2, 0, 3],
"scaffold_adaptor_domain": [True, False, True, True],
"has_cilia_domain": [False, True, False, False],
"has_sensory_domain": [False, False, False, True],
"protein_score_normalized": [0.65, 0.82, 0.45, 0.78],
})
# Load to DuckDB
store.save_dataframe(df, "protein_features", "Test data", replace=True)
# Query cilia candidates
candidates = query_cilia_candidates(store)
# Should include:
# - GENE1: has coiled_coil + scaffold_adaptor_domain
# - GENE2: has cilia_domain
# - GENE4: has coiled_coil + scaffold_adaptor_domain
# Should NOT include:
# - GENE3: has scaffold but no coiled_coil, and no cilia_domain
assert len(candidates) == 3
symbols = candidates["gene_symbol"].to_list()
assert "GENE1" in symbols
assert "GENE2" in symbols
assert "GENE4" in symbols
assert "GENE3" not in symbols
# Verify sorting by score (descending)
assert candidates["protein_score_normalized"][0] == 0.82 # GENE2
store.close()
def test_provenance_recording(tmp_path: Path, test_config):
"""Test provenance metadata is correctly recorded."""
db_path = tmp_path / "test.duckdb"
store = PipelineStore(db_path)
provenance = ProvenanceTracker.from_config(test_config)
# Create test data with known stats
df = pl.DataFrame({
"gene_id": ["ENSG00000001", "ENSG00000002", "ENSG00000003"],
"gene_symbol": ["GENE1", "GENE2", "GENE3"],
"uniprot_id": ["P1", "P2", None], # 1 without UniProt
"protein_length": [500, 600, None],
"domain_count": [3, 4, None],
"coiled_coil": [True, False, None],
"coiled_coil_count": [2, 0, None],
"transmembrane_count": [0, 2, None],
"scaffold_adaptor_domain": [True, False, None],
"has_cilia_domain": [False, True, None],
"has_sensory_domain": [False, False, None],
"protein_score_normalized": [0.65, 0.82, None],
})
# Load with provenance
load_to_duckdb(df, store, provenance, "Test protein features")
# Verify provenance step was recorded
steps = provenance.get_steps()
protein_step = [s for s in steps if s["name"] == "load_protein_features"][0]
assert protein_step["details"]["total_genes"] == 3
assert protein_step["details"]["with_uniprot"] == 2
assert protein_step["details"]["null_uniprot"] == 1
assert protein_step["details"]["cilia_domain_count"] == 1
assert protein_step["details"]["scaffold_domain_count"] == 1
assert protein_step["details"]["coiled_coil_count"] == 1
assert protein_step["details"]["transmembrane_domain_count"] == 1
store.close()
@patch("usher_pipeline.evidence.protein.fetch.httpx.Client")
@patch("usher_pipeline.evidence.protein.fetch.time.sleep")
def test_null_preservation(mock_sleep, mock_client_class, mock_gene_universe):
"""Test that NULL values are preserved (not converted to 0)."""
# Mock httpx client
mock_client = MagicMock()
mock_client_class.return_value.__enter__.return_value = mock_client
# Mock response with one protein not found
mock_response = Mock()
mock_response.json.return_value = {
"results": [] # No results for any protein
}
mock_client.get.return_value = mock_response
# Run pipeline
gene_ids = ["ENSG00000001"]
gene_map = mock_gene_universe.filter(pl.col("gene_id") == "ENSG00000001")
df = process_protein_evidence(gene_ids, gene_map)
# All protein features should be NULL (not 0)
assert df["protein_length"][0] is None
assert df["domain_count"][0] is None or df["domain_count"][0] == 0
assert df["coiled_coil"][0] is None
assert df["transmembrane_count"][0] is None
assert df["protein_score_normalized"][0] is None # Critical: NOT 0.0