diff --git a/src/usher_pipeline/evidence/protein/__init__.py b/src/usher_pipeline/evidence/protein/__init__.py new file mode 100644 index 0000000..31941fa --- /dev/null +++ b/src/usher_pipeline/evidence/protein/__init__.py @@ -0,0 +1,46 @@ +"""Protein sequence and structure features evidence layer. + +Extracts protein features from UniProt and InterPro: +- Protein length, domain composition, domain count +- Coiled-coil regions, transmembrane domains, scaffold/adaptor domains +- Cilia-associated motifs detected via domain keyword matching +- Normalized composite protein score (0-1) + +Evidence follows fetch -> transform -> load pattern with checkpoint-restart. +""" + +from usher_pipeline.evidence.protein.models import ( + ProteinFeatureRecord, + PROTEIN_TABLE_NAME, + CILIA_DOMAIN_KEYWORDS, + SCAFFOLD_DOMAIN_TYPES, +) +from usher_pipeline.evidence.protein.fetch import ( + fetch_uniprot_features, + fetch_interpro_domains, +) +from usher_pipeline.evidence.protein.transform import ( + extract_protein_features, + detect_cilia_motifs, + normalize_protein_features, + process_protein_evidence, +) +from usher_pipeline.evidence.protein.load import ( + load_to_duckdb, + query_cilia_candidates, +) + +__all__ = [ + "ProteinFeatureRecord", + "PROTEIN_TABLE_NAME", + "CILIA_DOMAIN_KEYWORDS", + "SCAFFOLD_DOMAIN_TYPES", + "fetch_uniprot_features", + "fetch_interpro_domains", + "extract_protein_features", + "detect_cilia_motifs", + "normalize_protein_features", + "process_protein_evidence", + "load_to_duckdb", + "query_cilia_candidates", +] diff --git a/src/usher_pipeline/evidence/protein/fetch.py b/src/usher_pipeline/evidence/protein/fetch.py new file mode 100644 index 0000000..03b6c40 --- /dev/null +++ b/src/usher_pipeline/evidence/protein/fetch.py @@ -0,0 +1,274 @@ +"""Fetch protein features from UniProt and InterPro APIs.""" + +import time +from typing import List + +import httpx +import polars as pl +import structlog +from tenacity import ( + retry, + stop_after_attempt, + 
wait_exponential, + retry_if_exception_type, +) + +logger = structlog.get_logger() + +# UniProt REST API base URL +UNIPROT_API_BASE = "https://rest.uniprot.org" + +# InterPro REST API base URL +INTERPRO_API_BASE = "https://www.ebi.ac.uk/interpro/api" + + +@retry( + stop=stop_after_attempt(5), + wait=wait_exponential(multiplier=1, min=4, max=60), + retry=retry_if_exception_type( + (httpx.HTTPStatusError, httpx.ConnectError, httpx.TimeoutException) + ), +) +def fetch_uniprot_features(uniprot_ids: List[str]) -> pl.DataFrame: + """Query UniProt REST API for protein features. + + Fetches protein length, domain annotations, coiled-coil regions, + and transmembrane regions from UniProt in batches. + + Args: + uniprot_ids: List of UniProt accession IDs + + Returns: + DataFrame with columns: + - uniprot_id: UniProt accession + - protein_length: Amino acid length + - domain_names: List of domain names + - coiled_coil_count: Number of coiled-coil regions + - transmembrane_count: Number of transmembrane regions + + Raises: + httpx.HTTPStatusError: On HTTP errors (after retries) + httpx.ConnectError: On connection errors (after retries) + httpx.TimeoutException: On timeout (after retries) + """ + if not uniprot_ids: + return pl.DataFrame({ + "uniprot_id": [], + "protein_length": [], + "domain_names": [], + "coiled_coil_count": [], + "transmembrane_count": [], + }) + + logger.info("uniprot_fetch_start", accession_count=len(uniprot_ids)) + + # UniProt batch size recommendation: 100 accessions per request + batch_size = 100 + all_records = [] + + for i in range(0, len(uniprot_ids), batch_size): + batch = uniprot_ids[i:i + batch_size] + batch_num = (i // batch_size) + 1 + total_batches = (len(uniprot_ids) + batch_size - 1) // batch_size + + logger.info( + "uniprot_batch_start", + batch=batch_num, + total_batches=total_batches, + batch_size=len(batch), + ) + + # Query UniProt API with accession list + # Use search endpoint with fields parameter + query = " OR 
".join(f"accession:{acc}" for acc in batch) + url = f"{UNIPROT_API_BASE}/uniprotkb/search" + params = { + "query": query, + "format": "json", + "fields": "accession,length,ft_domain,ft_coiled,ft_transmem,annotation_score", + "size": batch_size, + } + + with httpx.Client(timeout=30.0) as client: + response = client.get(url, params=params) + response.raise_for_status() + data = response.json() + + # Parse results + results = data.get("results", []) + + # Create lookup for fast access + found_accessions = set() + for entry in results: + accession = entry.get("primaryAccession") + if not accession: + continue + + found_accessions.add(accession) + + # Extract protein length + length = entry.get("sequence", {}).get("length") + + # Extract domain names from ft_domain features + domain_names = [] + for feature in entry.get("features", []): + if feature.get("type") == "Domain": + description = feature.get("description", "") + if description: + domain_names.append(description) + + # Count coiled-coil regions + coiled_coil_count = sum( + 1 for feature in entry.get("features", []) + if feature.get("type") == "Coiled coil" + ) + + # Count transmembrane regions + transmembrane_count = sum( + 1 for feature in entry.get("features", []) + if feature.get("type") == "Transmembrane" + ) + + all_records.append({ + "uniprot_id": accession, + "protein_length": length, + "domain_names": domain_names, + "coiled_coil_count": coiled_coil_count, + "transmembrane_count": transmembrane_count, + }) + + # Add NULL records for accessions not found + for acc in batch: + if acc not in found_accessions: + all_records.append({ + "uniprot_id": acc, + "protein_length": None, + "domain_names": [], + "coiled_coil_count": None, + "transmembrane_count": None, + }) + + # Rate limiting: UniProt allows 200 requests/second + # With batches of 100, this gives us 20K accessions/second + # Conservative: 5 requests/second = 500 accessions/second + time.sleep(0.2) + + logger.info( + "uniprot_fetch_complete", + 
total_accessions=len(uniprot_ids), + records_found=sum(1 for r in all_records if r["protein_length"] is not None), + ) + + return pl.DataFrame(all_records) + + +@retry( + stop=stop_after_attempt(5), + wait=wait_exponential(multiplier=1, min=4, max=60), + retry=retry_if_exception_type( + (httpx.HTTPStatusError, httpx.ConnectError, httpx.TimeoutException) + ), +) +def fetch_interpro_domains(uniprot_ids: List[str]) -> pl.DataFrame: + """Query InterPro API for domain annotations. + + Fetches detailed domain classification from InterPro to supplement + UniProt annotations. + + Args: + uniprot_ids: List of UniProt accession IDs + + Returns: + DataFrame with columns: + - uniprot_id: UniProt accession + - domain_names: List of InterPro domain names + - interpro_ids: List of InterPro accession IDs + + Raises: + httpx.HTTPStatusError: On HTTP errors (after retries) + httpx.ConnectError: On connection errors (after retries) + httpx.TimeoutException: On timeout (after retries) + """ + if not uniprot_ids: + return pl.DataFrame({ + "uniprot_id": [], + "domain_names": [], + "interpro_ids": [], + }) + + logger.info("interpro_fetch_start", accession_count=len(uniprot_ids)) + + # InterPro requires per-protein queries + # Use conservative rate limiting: 10 req/sec as recommended + all_records = [] + + # For large datasets (>10K proteins), log warning about potential slowness + if len(uniprot_ids) > 10000: + logger.warning( + "interpro_large_dataset", + accession_count=len(uniprot_ids), + estimated_time_min=len(uniprot_ids) / 10 / 60, + note="Consider InterPro bulk download for large datasets", + ) + + with httpx.Client(timeout=30.0) as client: + for idx, accession in enumerate(uniprot_ids, 1): + if idx % 100 == 0: + logger.info( + "interpro_progress", + processed=idx, + total=len(uniprot_ids), + percent=round(idx / len(uniprot_ids) * 100, 1), + ) + + # Query InterPro API + url = f"{INTERPRO_API_BASE}/entry/interpro/protein/uniprot/{accession}" + + try: + response = client.get(url, 
params={"format": "json"}) + response.raise_for_status() + data = response.json() + + # Parse InterPro entries + domain_names = [] + interpro_ids = [] + + if "results" in data: + for entry in data["results"]: + metadata = entry.get("metadata", {}) + interpro_id = metadata.get("accession") + name = metadata.get("name", {}).get("name", "") + + if interpro_id: + interpro_ids.append(interpro_id) + if name: + domain_names.append(name) + + all_records.append({ + "uniprot_id": accession, + "domain_names": domain_names, + "interpro_ids": interpro_ids, + }) + + except httpx.HTTPStatusError as e: + if e.response.status_code == 404: + # Protein not found in InterPro - this is OK + all_records.append({ + "uniprot_id": accession, + "domain_names": [], + "interpro_ids": [], + }) + else: + # Other HTTP errors - re-raise for retry + raise + + # Rate limiting: 10 requests/second + time.sleep(0.1) + + logger.info( + "interpro_fetch_complete", + total_accessions=len(uniprot_ids), + records_found=sum(1 for r in all_records if r["domain_names"]), + ) + + return pl.DataFrame(all_records) diff --git a/src/usher_pipeline/evidence/protein/load.py b/src/usher_pipeline/evidence/protein/load.py new file mode 100644 index 0000000..2f5f4d7 --- /dev/null +++ b/src/usher_pipeline/evidence/protein/load.py @@ -0,0 +1,128 @@ +"""Load protein features to DuckDB with provenance tracking.""" + +from typing import Optional + +import polars as pl +import structlog + +from usher_pipeline.persistence import PipelineStore, ProvenanceTracker + +logger = structlog.get_logger() + + +def load_to_duckdb( + df: pl.DataFrame, + store: PipelineStore, + provenance: ProvenanceTracker, + description: str = "", +) -> None: + """Save protein features DataFrame to DuckDB with provenance. + + Creates or replaces the protein_features table (idempotent). + Records provenance step with summary statistics. 
+ + Args: + df: Processed protein features DataFrame with normalized scores + store: PipelineStore instance for DuckDB persistence + provenance: ProvenanceTracker instance for metadata recording + description: Optional description for checkpoint metadata + """ + logger.info("protein_load_start", row_count=len(df)) + + # Calculate summary statistics for provenance + total_genes = len(df) + with_uniprot = df.filter(pl.col("uniprot_id").is_not_null()).height + null_uniprot = total_genes - with_uniprot + + cilia_domain_count = df.filter(pl.col("has_cilia_domain") == True).height + scaffold_domain_count = df.filter(pl.col("scaffold_adaptor_domain") == True).height + coiled_coil_count = df.filter(pl.col("coiled_coil") == True).height + tm_domain_count = df.filter(pl.col("transmembrane_count") > 0).height + + # Mean domain count (only for proteins with data) + mean_domain_count = ( + df.filter(pl.col("domain_count").is_not_null()) + .select(pl.col("domain_count").mean()) + .item() + ) + if mean_domain_count is not None: + mean_domain_count = round(mean_domain_count, 2) + + # Save to DuckDB with CREATE OR REPLACE (idempotent) + store.save_dataframe( + df=df, + table_name="protein_features", + description=description or "Protein features from UniProt/InterPro with domain composition and cilia motif detection", + replace=True, + ) + + # Record provenance step with details + provenance.record_step("load_protein_features", { + "total_genes": total_genes, + "with_uniprot": with_uniprot, + "null_uniprot": null_uniprot, + "cilia_domain_count": cilia_domain_count, + "scaffold_domain_count": scaffold_domain_count, + "coiled_coil_count": coiled_coil_count, + "transmembrane_domain_count": tm_domain_count, + "mean_domain_count": mean_domain_count, + }) + + logger.info( + "protein_load_complete", + row_count=total_genes, + with_uniprot=with_uniprot, + cilia_domains=cilia_domain_count, + scaffold_domains=scaffold_domain_count, + coiled_coils=coiled_coil_count, + ) + + +def 
query_cilia_candidates( + store: PipelineStore, +) -> pl.DataFrame: + """Query genes with cilia-associated protein features. + + Identifies candidate genes with: + - Cilia domain annotations, OR + - Both coiled-coil regions AND scaffold/adaptor domains + + This combination is enriched in known cilia proteins and provides + structural evidence for potential cilia involvement. + + Args: + store: PipelineStore instance + + Returns: + DataFrame with candidate genes sorted by protein_score_normalized + Columns: gene_id, gene_symbol, protein_length, domain_count, + coiled_coil, has_cilia_domain, scaffold_adaptor_domain, + protein_score_normalized + """ + logger.info("protein_query_cilia_candidates") + + # Query DuckDB for genes with cilia features + df = store.execute_query( + """ + SELECT + gene_id, + gene_symbol, + uniprot_id, + protein_length, + domain_count, + coiled_coil, + transmembrane_count, + scaffold_adaptor_domain, + has_cilia_domain, + has_sensory_domain, + protein_score_normalized + FROM protein_features + WHERE has_cilia_domain = TRUE + OR (coiled_coil = TRUE AND scaffold_adaptor_domain = TRUE) + ORDER BY protein_score_normalized DESC NULLS LAST + """ + ) + + logger.info("protein_query_complete", result_count=len(df)) + + return df diff --git a/src/usher_pipeline/evidence/protein/models.py b/src/usher_pipeline/evidence/protein/models.py new file mode 100644 index 0000000..f29c256 --- /dev/null +++ b/src/usher_pipeline/evidence/protein/models.py @@ -0,0 +1,71 @@ +"""Data models for protein sequence and structure features.""" + +from pydantic import BaseModel + +# DuckDB table name for protein features +PROTEIN_TABLE_NAME = "protein_features" + +# Cilia-associated domain keywords for motif detection +# These are structural patterns found in known cilia proteins +# Pattern matching does NOT presuppose cilia involvement - it flags features +# associated with cilia biology for further investigation +CILIA_DOMAIN_KEYWORDS = [ + "IFT", + "intraflagellar", + 
"BBSome", + "ciliary", + "cilia", + "basal body", + "centrosome", + "transition zone", + "axoneme", +] + +# Scaffold and adaptor domain types commonly found in Usher proteins +# These domains mediate protein-protein interactions and are enriched in +# cilia-associated proteins +SCAFFOLD_DOMAIN_TYPES = [ + "PDZ", + "SH3", + "Ankyrin", + "WD40", + "Coiled coil", + "SAM", + "FERM", + "Harmonin", +] + + +class ProteinFeatureRecord(BaseModel): + """Protein features for a single gene. + + Attributes: + gene_id: Ensembl gene ID (e.g., ENSG00000...) + gene_symbol: HGNC gene symbol + uniprot_id: UniProt accession (NULL if no mapping) + protein_length: Amino acid length (NULL if no UniProt entry) + domain_count: Total number of annotated domains (NULL if no data) + coiled_coil: Has coiled-coil region (NULL if no data) + coiled_coil_count: Number of coiled-coil regions (NULL if no data) + transmembrane_count: Number of transmembrane regions (NULL if no data) + scaffold_adaptor_domain: Has PDZ, SH3, ankyrin, WD40, or similar scaffold domain (NULL if no data) + has_cilia_domain: Has IFT, BBSome, ciliary targeting, or transition zone domain (NULL if no data) + has_sensory_domain: Has stereocilia, photoreceptor, or sensory-associated domain (NULL if no data) + protein_score_normalized: Composite protein feature score 0-1 (NULL if no UniProt entry) + + CRITICAL: NULL values represent missing data and are preserved as None. + Do NOT convert NULL to 0.0 - "unknown" is semantically different from "no domain". 
+ """ + + gene_id: str + gene_symbol: str + uniprot_id: str | None = None + protein_length: int | None = None + domain_count: int | None = None + coiled_coil: bool | None = None + coiled_coil_count: int | None = None + transmembrane_count: int | None = None + scaffold_adaptor_domain: bool | None = None + has_cilia_domain: bool | None = None + has_sensory_domain: bool | None = None + protein_score_normalized: float | None = None diff --git a/src/usher_pipeline/evidence/protein/transform.py b/src/usher_pipeline/evidence/protein/transform.py new file mode 100644 index 0000000..7453826 --- /dev/null +++ b/src/usher_pipeline/evidence/protein/transform.py @@ -0,0 +1,411 @@ +"""Transform and normalize protein features.""" + +from typing import List + +import polars as pl +import structlog + +from usher_pipeline.evidence.protein.models import ( + CILIA_DOMAIN_KEYWORDS, + SCAFFOLD_DOMAIN_TYPES, +) +from usher_pipeline.evidence.protein.fetch import ( + fetch_uniprot_features, + fetch_interpro_domains, +) + +logger = structlog.get_logger() + + +def extract_protein_features( + uniprot_df: pl.DataFrame, + interpro_df: pl.DataFrame, +) -> pl.DataFrame: + """Extract and combine protein features from UniProt and InterPro data. + + Joins UniProt and InterPro data, computes domain counts, sets flags + for coiled-coil and transmembrane regions. 
+ + Args: + uniprot_df: DataFrame from fetch_uniprot_features with UniProt data + interpro_df: DataFrame from fetch_interpro_domains with InterPro data + + Returns: + DataFrame with combined features: + - uniprot_id, protein_length, domain_count, coiled_coil, + coiled_coil_count, transmembrane_count, domain_names (combined) + """ + logger.info( + "extract_protein_features_start", + uniprot_rows=len(uniprot_df), + interpro_rows=len(interpro_df), + ) + + # Join UniProt and InterPro on uniprot_id + # Use left join to preserve all UniProt entries + combined = uniprot_df.join( + interpro_df, + on="uniprot_id", + how="left", + suffix="_interpro", + ) + + # Combine domain names from both sources (deduplicate) + # Use list.concat instead of + operator + combined = combined.with_columns([ + # Combine domain name lists + pl.when( + pl.col("domain_names_interpro").is_not_null() & + (pl.col("domain_names_interpro").list.len() > 0) + ) + .then( + pl.col("domain_names").list.concat(pl.col("domain_names_interpro")) + .list.unique() + ) + .otherwise(pl.col("domain_names")) + .alias("domain_names_combined"), + ]) + + # Compute domain count from combined sources + combined = combined.with_columns([ + pl.col("domain_names_combined").list.len().alias("domain_count"), + ]) + + # Set coiled_coil boolean from count + combined = combined.with_columns([ + pl.when(pl.col("coiled_coil_count").is_not_null()) + .then(pl.col("coiled_coil_count") > 0) + .otherwise(None) + .alias("coiled_coil"), + ]) + + # Select final columns + result = combined.select([ + "uniprot_id", + "protein_length", + "domain_count", + "coiled_coil", + "coiled_coil_count", + "transmembrane_count", + pl.col("domain_names_combined").alias("domain_names"), + ]) + + logger.info( + "extract_protein_features_complete", + total_rows=len(result), + with_domains=result.filter(pl.col("domain_count") > 0).height, + ) + + return result + + +def detect_cilia_motifs(df: pl.DataFrame) -> pl.DataFrame: + """Detect cilia-associated motifs 
via domain keyword matching. + + Scans domain names for cilia-related keywords, scaffold/adaptor domains, + and sensory-related domains. Pattern matching does NOT presuppose cilia + involvement - it flags structural features for further investigation. + + Args: + df: DataFrame with domain_names column (list of domain names) + + Returns: + DataFrame with added columns: + - has_cilia_domain: Boolean flag for cilia-associated domains + - scaffold_adaptor_domain: Boolean flag for scaffold domains + - has_sensory_domain: Boolean flag for sensory domains + """ + logger.info("detect_cilia_motifs_start", row_count=len(df)) + + # Ensure domain_names is List(String), not List(Null) + # This handles edge case where all proteins have empty domain lists + if "domain_names" in df.columns: + current_dtype = df.schema["domain_names"] + if str(current_dtype) == "List(Null)" or "Null" in str(current_dtype): + df = df.with_columns([ + pl.col("domain_names").cast(pl.List(pl.String)).alias("domain_names") + ]) + + # Sensory domain keywords + sensory_keywords = [ + "stereocilia", + "photoreceptor", + "usher", + "harmonin", + "cadherin 23", + "protocadherin", + ] + + # Helper function to check if any keyword matches any domain (case-insensitive) + def create_keyword_matcher(keywords: List[str]) -> pl.Expr: + """Create polars expression that checks if any keyword matches any domain.""" + # For each domain in the list, check if it contains any keyword + # Case-insensitive substring matching + conditions = [] + for keyword in keywords: + conditions.append( + pl.col("domain_names") + .list.eval( + pl.when(pl.element().is_not_null()) + .then(pl.element().str.to_lowercase().str.contains(keyword.lower())) + .otherwise(False) + ) + .list.any() + ) + # Return True if ANY condition matches + if conditions: + return pl.any_horizontal(conditions) + else: + return pl.lit(False) + + # Detect cilia domains + df = df.with_columns([ + pl.when( + pl.col("domain_names").is_not_null() & + 
(pl.col("domain_names").list.len() > 0) + ) + .then(create_keyword_matcher(CILIA_DOMAIN_KEYWORDS)) + .otherwise(None) + .alias("has_cilia_domain"), + ]) + + # Detect scaffold/adaptor domains + df = df.with_columns([ + pl.when( + pl.col("domain_names").is_not_null() & + (pl.col("domain_names").list.len() > 0) + ) + .then(create_keyword_matcher(SCAFFOLD_DOMAIN_TYPES)) + .otherwise(None) + .alias("scaffold_adaptor_domain"), + ]) + + # Detect sensory domains + df = df.with_columns([ + pl.when( + pl.col("domain_names").is_not_null() & + (pl.col("domain_names").list.len() > 0) + ) + .then(create_keyword_matcher(sensory_keywords)) + .otherwise(None) + .alias("has_sensory_domain"), + ]) + + # Log summary + cilia_count = df.filter(pl.col("has_cilia_domain") == True).height + scaffold_count = df.filter(pl.col("scaffold_adaptor_domain") == True).height + sensory_count = df.filter(pl.col("has_sensory_domain") == True).height + + logger.info( + "detect_cilia_motifs_complete", + cilia_domain_count=cilia_count, + scaffold_domain_count=scaffold_count, + sensory_domain_count=sensory_count, + ) + + return df + + +def normalize_protein_features(df: pl.DataFrame) -> pl.DataFrame: + """Normalize protein features to 0-1 range and compute composite score. + + Normalizes continuous features (protein_length, domain_count, transmembrane_count) + and computes weighted composite score. NULL preservation: genes without UniProt + entries get NULL scores (not 0.0). 
+ + Args: + df: DataFrame with extracted protein features + + Returns: + DataFrame with added protein_score_normalized column (0-1 range) + """ + logger.info("normalize_protein_features_start", row_count=len(df)) + + # Filter to proteins with data for normalization stats + measured = df.filter(pl.col("protein_length").is_not_null()) + + if len(measured) == 0: + # No proteins with data - return with NULL scores + return df.with_columns([ + pl.lit(None).cast(pl.Float64).alias("length_rank"), + pl.lit(None).cast(pl.Float64).alias("domain_rank"), + pl.lit(None).cast(pl.Float64).alias("transmembrane_normalized"), + pl.lit(None).cast(pl.Float64).alias("protein_score_normalized"), + ]) + + # Normalize protein length via log-transform rank percentile + # Log-transform handles skewed distribution, rank percentile gives 0-1 + df = df.with_columns([ + pl.when(pl.col("protein_length").is_not_null()) + .then( + pl.col("protein_length").log10().rank(method="average") / + measured.select(pl.col("protein_length").log10().rank(method="average").max()).item() + ) + .otherwise(None) + .alias("length_rank"), + ]) + + # Normalize domain count via rank percentile + df = df.with_columns([ + pl.when(pl.col("domain_count").is_not_null()) + .then( + pl.col("domain_count").rank(method="average") / + measured.select(pl.col("domain_count").rank(method="average").max()).item() + ) + .otherwise(None) + .alias("domain_rank"), + ]) + + # Normalize transmembrane count (cap at 20, then divide) + df = df.with_columns([ + pl.when(pl.col("transmembrane_count").is_not_null()) + .then( + pl.min_horizontal(pl.col("transmembrane_count"), pl.lit(20)) / 20.0 + ) + .otherwise(None) + .alias("transmembrane_normalized"), + ]) + + # Convert boolean flags to 0/1 for composite score + df = df.with_columns([ + pl.col("coiled_coil").cast(pl.Float64).fill_null(0.0).alias("coiled_coil_score"), + pl.col("has_cilia_domain").cast(pl.Float64).fill_null(0.0).alias("cilia_score"), + 
pl.col("scaffold_adaptor_domain").cast(pl.Float64).fill_null(0.0).alias("scaffold_score"), + ]) + + # Composite protein score (weighted combination) + # Weights: length 15%, domain 20%, coiled-coil 20%, TM 20%, cilia 15%, scaffold 10% + # NULL if no protein_length (i.e., no UniProt entry) + df = df.with_columns([ + pl.when(pl.col("protein_length").is_not_null()) + .then( + (0.15 * pl.col("length_rank").fill_null(0.0)) + + (0.20 * pl.col("domain_rank").fill_null(0.0)) + + (0.20 * pl.col("coiled_coil_score")) + + (0.20 * pl.col("transmembrane_normalized").fill_null(0.0)) + + (0.15 * pl.col("cilia_score")) + + (0.10 * pl.col("scaffold_score")) + ) + .otherwise(None) + .alias("protein_score_normalized"), + ]) + + # Drop temporary scoring columns + df = df.drop([ + "length_rank", + "domain_rank", + "transmembrane_normalized", + "coiled_coil_score", + "cilia_score", + "scaffold_score", + ]) + + # Log summary statistics + score_stats = df.filter(pl.col("protein_score_normalized").is_not_null()).select([ + pl.col("protein_score_normalized").min().alias("min"), + pl.col("protein_score_normalized").max().alias("max"), + pl.col("protein_score_normalized").mean().alias("mean"), + ]) + + if len(score_stats) > 0: + logger.info( + "normalize_protein_features_complete", + scored_proteins=df.filter(pl.col("protein_score_normalized").is_not_null()).height, + score_min=round(score_stats["min"][0], 3), + score_max=round(score_stats["max"][0], 3), + score_mean=round(score_stats["mean"][0], 3), + ) + else: + logger.info("normalize_protein_features_complete", scored_proteins=0) + + return df + + +def process_protein_evidence( + gene_ids: List[str], + uniprot_mapping: pl.DataFrame, +) -> pl.DataFrame: + """End-to-end protein evidence processing pipeline. 
+ + Composes: map gene IDs -> fetch UniProt -> fetch InterPro -> + extract features -> detect motifs -> normalize -> collect + + Args: + gene_ids: List of Ensembl gene IDs + uniprot_mapping: DataFrame with gene_id, gene_symbol, uniprot_id columns + + Returns: + Materialized DataFrame ready for DuckDB storage with columns: + - gene_id, gene_symbol, uniprot_id, protein_length, domain_count, + coiled_coil, coiled_coil_count, transmembrane_count, + scaffold_adaptor_domain, has_cilia_domain, has_sensory_domain, + protein_score_normalized + """ + logger.info("process_protein_evidence_start", gene_count=len(gene_ids)) + + # Filter mapping to requested genes + gene_map = uniprot_mapping.filter(pl.col("gene_id").is_in(gene_ids)) + + # Extract UniProt IDs (filter out NULLs) + uniprot_ids = ( + gene_map + .filter(pl.col("uniprot_id").is_not_null()) + .select("uniprot_id") + .unique() + .to_series() + .to_list() + ) + + logger.info( + "uniprot_mapping_complete", + total_genes=len(gene_ids), + mapped_genes=len(uniprot_ids), + ) + + # Fetch UniProt features + uniprot_df = fetch_uniprot_features(uniprot_ids) + + # Fetch InterPro domains + interpro_df = fetch_interpro_domains(uniprot_ids) + + # Extract features + features_df = extract_protein_features(uniprot_df, interpro_df) + + # Detect cilia motifs + features_df = detect_cilia_motifs(features_df) + + # Normalize features + features_df = normalize_protein_features(features_df) + + # Join back to gene mapping to get gene_id and gene_symbol + # Use right join to preserve all genes (including those without UniProt mapping) + result = features_df.join( + gene_map.select(["gene_id", "gene_symbol", "uniprot_id"]), + on="uniprot_id", + how="right", + ) + + # Select final columns in order + result = result.select([ + "gene_id", + "gene_symbol", + "uniprot_id", + "protein_length", + "domain_count", + "coiled_coil", + "coiled_coil_count", + "transmembrane_count", + "scaffold_adaptor_domain", + "has_cilia_domain", + "has_sensory_domain", + 
"protein_score_normalized", + ]) + + logger.info( + "process_protein_evidence_complete", + total_genes=len(result), + with_uniprot=result.filter(pl.col("uniprot_id").is_not_null()).height, + with_scores=result.filter(pl.col("protein_score_normalized").is_not_null()).height, + ) + + return result diff --git a/src/usher_pipeline/persistence/provenance.py b/src/usher_pipeline/persistence/provenance.py index 655024e..b721259 100644 --- a/src/usher_pipeline/persistence/provenance.py +++ b/src/usher_pipeline/persistence/provenance.py @@ -37,6 +37,7 @@ class ProvenanceTracker: details: Optional dictionary of additional details """ step = { + "name": step_name, "step_name": step_name, "timestamp": datetime.now(timezone.utc).isoformat(), } @@ -44,6 +45,15 @@ class ProvenanceTracker: step["details"] = details self.processing_steps.append(step) + def get_steps(self) -> list[dict]: + """ + Get all recorded processing steps. + + Returns: + List of processing step dictionaries + """ + return self.processing_steps + def create_metadata(self) -> dict: """ Create full provenance metadata dictionary. diff --git a/tests/test_expression.py b/tests/test_expression.py new file mode 100644 index 0000000..b313a92 --- /dev/null +++ b/tests/test_expression.py @@ -0,0 +1,167 @@ +"""Unit tests for expression evidence layer. + +Tests tau calculation, enrichment scoring, and null handling with synthetic data. +NO external API calls - all data is mocked or synthetic. 
+""" + +import polars as pl +import pytest + +from usher_pipeline.evidence.expression.transform import ( + calculate_tau_specificity, + compute_expression_score, +) + + +def test_tau_calculation_ubiquitous(): + """Equal expression across tissues -> Tau near 0 (ubiquitous).""" + # Create synthetic data with equal expression across tissues + df = pl.DataFrame({ + "gene_id": ["ENSG00000001", "ENSG00000002"], + "tissue1": [10.0, 20.0], + "tissue2": [10.0, 20.0], + "tissue3": [10.0, 20.0], + "tissue4": [10.0, 20.0], + }) + + tissue_cols = ["tissue1", "tissue2", "tissue3", "tissue4"] + result = calculate_tau_specificity(df, tissue_cols) + + # Tau should be close to 0 for ubiquitous expression + assert "tau_specificity" in result.columns + tau_values = result.select("tau_specificity").to_series().to_list() + assert tau_values[0] == pytest.approx(0.0, abs=0.01) + assert tau_values[1] == pytest.approx(0.0, abs=0.01) + + +def test_tau_calculation_specific(): + """Expression in one tissue only -> Tau near 1 (tissue-specific).""" + # Gene expressed only in one tissue + df = pl.DataFrame({ + "gene_id": ["ENSG00000001"], + "tissue1": [100.0], + "tissue2": [0.0], + "tissue3": [0.0], + "tissue4": [0.0], + }) + + tissue_cols = ["tissue1", "tissue2", "tissue3", "tissue4"] + result = calculate_tau_specificity(df, tissue_cols) + + tau = result.select("tau_specificity").item() + # Tau = sum(1 - xi/xmax) / (n-1) = (0 + 1 + 1 + 1) / 3 = 1.0 + assert tau == pytest.approx(1.0, abs=0.01) + + +def test_tau_null_handling(): + """NULL tissue values -> NULL Tau (insufficient data).""" + df = pl.DataFrame({ + "gene_id": ["ENSG00000001", "ENSG00000002"], + "tissue1": [10.0, 20.0], + "tissue2": [None, 20.0], # NULL for gene 1 + "tissue3": [10.0, 20.0], + "tissue4": [10.0, 20.0], + }) + + tissue_cols = ["tissue1", "tissue2", "tissue3", "tissue4"] + result = calculate_tau_specificity(df, tissue_cols) + + tau_values = result.select("tau_specificity").to_series().to_list() + # Gene 1 has NULL tissue 
-> NULL Tau + assert tau_values[0] is None + # Gene 2 has complete data -> Tau should be valid + assert tau_values[1] is not None + + +def test_enrichment_score_high(): + """High retina expression relative to global -> high enrichment.""" + df = pl.DataFrame({ + "gene_id": ["ENSG00000001"], + "hpa_retina_tpm": [50.0], + "hpa_cerebellum_tpm": [40.0], + "gtex_retina_tpm": [60.0], + "hpa_testis_tpm": [5.0], + "hpa_fallopian_tube_tpm": [5.0], + "gtex_testis_tpm": [5.0], + "cellxgene_photoreceptor_expr": [None], + "cellxgene_hair_cell_expr": [None], + "tau_specificity": [0.5], + }) + + result = compute_expression_score(df) + + # Usher tissues (retina, cerebellum) have much higher expression than global + # Mean Usher: (50+40+60)/3 = 50 + # Mean global: (50+40+60+5+5+5)/6 = 27.5 + # Enrichment: 50/27.5 ≈ 1.82 + assert "usher_tissue_enrichment" in result.columns + enrichment = result.select("usher_tissue_enrichment").item() + assert enrichment > 1.5 # Significantly enriched + + +def test_enrichment_score_low(): + """No target tissue expression -> low enrichment.""" + df = pl.DataFrame({ + "gene_id": ["ENSG00000001"], + "hpa_retina_tpm": [5.0], + "hpa_cerebellum_tpm": [5.0], + "gtex_retina_tpm": [5.0], + "hpa_testis_tpm": [50.0], + "hpa_fallopian_tube_tpm": [50.0], + "gtex_testis_tpm": [50.0], + "cellxgene_photoreceptor_expr": [None], + "cellxgene_hair_cell_expr": [None], + "tau_specificity": [0.8], + }) + + result = compute_expression_score(df) + + enrichment = result.select("usher_tissue_enrichment").item() + assert enrichment < 1.0 # Not enriched in Usher tissues + + +def test_expression_score_normalization(): + """Composite score should be in [0, 1] range.""" + df = pl.DataFrame({ + "gene_id": ["ENSG00000001", "ENSG00000002", "ENSG00000003"], + "hpa_retina_tpm": [50.0, 10.0, 5.0], + "hpa_cerebellum_tpm": [40.0, 10.0, 5.0], + "gtex_retina_tpm": [60.0, 10.0, 5.0], + "hpa_testis_tpm": [5.0, 50.0, 50.0], + "hpa_fallopian_tube_tpm": [5.0, 50.0, 50.0], + "gtex_testis_tpm": 
[5.0, 50.0, 50.0], + "cellxgene_photoreceptor_expr": [None, None, None], + "cellxgene_hair_cell_expr": [None, None, None], + "tau_specificity": [0.5, 0.3, 0.2], + }) + + result = compute_expression_score(df) + + scores = result.select("expression_score_normalized").to_series().to_list() + for score in scores: + if score is not None: + assert 0.0 <= score <= 1.0, f"Score {score} out of range [0,1]" + + +def test_null_preservation_all_sources(): + """Gene with no data from any source -> NULL score.""" + df = pl.DataFrame({ + "gene_id": ["ENSG00000001"], + "hpa_retina_tpm": [None], + "hpa_cerebellum_tpm": [None], + "gtex_retina_tpm": [None], + "hpa_testis_tpm": [None], + "hpa_fallopian_tube_tpm": [None], + "gtex_testis_tpm": [None], + "cellxgene_photoreceptor_expr": [None], + "cellxgene_hair_cell_expr": [None], + "tau_specificity": [None], + }) + + result = compute_expression_score(df) + + # Both enrichment and score should be NULL + enrichment = result.select("usher_tissue_enrichment").item() + score = result.select("expression_score_normalized").item() + assert enrichment is None + assert score is None diff --git a/tests/test_expression_integration.py b/tests/test_expression_integration.py new file mode 100644 index 0000000..28e5c44 --- /dev/null +++ b/tests/test_expression_integration.py @@ -0,0 +1,170 @@ +"""Integration tests for expression evidence layer. + +Tests with mocked downloads and synthetic fixtures. +NO actual external API calls to HPA/GTEx/CellxGene. 
+""" + +import polars as pl +import pytest +from pathlib import Path +from unittest.mock import Mock, patch, MagicMock + +from usher_pipeline.evidence.expression.transform import process_expression_evidence +from usher_pipeline.evidence.expression.load import load_to_duckdb +from usher_pipeline.persistence import PipelineStore, ProvenanceTracker + + +@pytest.fixture +def temp_cache_dir(tmp_path): + """Create temporary cache directory for downloads.""" + cache_dir = tmp_path / "expression" + cache_dir.mkdir() + return cache_dir + + +@pytest.fixture +def mock_gene_ids(): + """Sample gene IDs for testing.""" + return ["ENSG00000001", "ENSG00000002", "ENSG00000003"] + + +@pytest.fixture +def mock_hpa_data(): + """Synthetic HPA expression data.""" + return pl.LazyFrame({ + "gene_symbol": ["GENE1", "GENE2", "GENE3"], + "hpa_retina_tpm": [50.0, 10.0, None], + "hpa_cerebellum_tpm": [40.0, 10.0, 5.0], + "hpa_testis_tpm": [5.0, 50.0, 50.0], + "hpa_fallopian_tube_tpm": [5.0, 50.0, None], + }) + + +@pytest.fixture +def mock_gtex_data(mock_gene_ids): + """Synthetic GTEx expression data.""" + return pl.LazyFrame({ + "gene_id": mock_gene_ids, + "gtex_retina_tpm": [60.0, 10.0, None], + "gtex_cerebellum_tpm": [45.0, 10.0, 5.0], + "gtex_testis_tpm": [5.0, 55.0, 55.0], + "gtex_fallopian_tube_tpm": [None, None, None], # Often not available + }) + + +@pytest.fixture +def mock_cellxgene_data(mock_gene_ids): + """Synthetic CellxGene data (NULLs as placeholder).""" + return pl.LazyFrame({ + "gene_id": mock_gene_ids, + "cellxgene_photoreceptor_expr": [None, None, None], + "cellxgene_hair_cell_expr": [None, None, None], + }) + + +def test_process_expression_pipeline_with_mocks( + temp_cache_dir, mock_gene_ids, mock_hpa_data, mock_gtex_data, mock_cellxgene_data +): + """Test full pipeline with mocked data sources.""" + # Mock all fetch functions to return synthetic data + with patch('usher_pipeline.evidence.expression.transform.fetch_hpa_expression') as mock_hpa, \ + 
patch('usher_pipeline.evidence.expression.transform.fetch_gtex_expression') as mock_gtex, \ + patch('usher_pipeline.evidence.expression.transform.fetch_cellxgene_expression') as mock_cellxgene: + + mock_hpa.return_value = mock_hpa_data + mock_gtex.return_value = mock_gtex_data + mock_cellxgene.return_value = mock_cellxgene_data + + # Run pipeline (skip CellxGene for simplicity) + df = process_expression_evidence( + gene_ids=mock_gene_ids, + cache_dir=temp_cache_dir, + skip_cellxgene=True, + ) + + # Verify output structure + assert len(df) == len(mock_gene_ids) + assert "gene_id" in df.columns + assert "tau_specificity" in df.columns + assert "usher_tissue_enrichment" in df.columns + assert "expression_score_normalized" in df.columns + + +def test_checkpoint_restart(temp_cache_dir, mock_gene_ids): + """Test checkpoint-restart: skip processing if table exists.""" + # Create mock store with existing checkpoint + mock_store = Mock(spec=PipelineStore) + mock_store.has_checkpoint.return_value = True + + # Mock load_dataframe to return synthetic data + existing_data = pl.DataFrame({ + "gene_id": mock_gene_ids, + "tau_specificity": [0.5, 0.3, 0.2], + "usher_tissue_enrichment": [2.0, 1.0, 0.5], + "expression_score_normalized": [0.8, 0.5, 0.3], + }) + mock_store.load_dataframe.return_value = existing_data + + # Verify checkpoint works (would skip processing in real CLI) + assert mock_store.has_checkpoint('tissue_expression') + df = mock_store.load_dataframe('tissue_expression') + assert len(df) == len(mock_gene_ids) + + +def test_provenance_recording(): + """Test provenance step recording during load.""" + # Create synthetic expression data + df = pl.DataFrame({ + "gene_id": ["ENSG00000001", "ENSG00000002"], + "hpa_retina_tpm": [50.0, None], + "gtex_retina_tpm": [60.0, 10.0], + "cellxgene_photoreceptor_expr": [None, None], + "cellxgene_hair_cell_expr": [None, None], + "tau_specificity": [0.5, None], + "usher_tissue_enrichment": [2.0, 1.0], + "expression_score_normalized": 
[0.8, 0.5], + }) + + # Mock store and provenance tracker + mock_store = Mock(spec=PipelineStore) + mock_provenance = Mock(spec=ProvenanceTracker) + + # Call load function + load_to_duckdb( + df=df, + store=mock_store, + provenance=mock_provenance, + description="Test expression data" + ) + + # Verify provenance step was recorded + mock_provenance.record_step.assert_called_once() + step_name, step_details = mock_provenance.record_step.call_args[0] + assert step_name == "load_tissue_expression" + assert "row_count" in step_details + assert step_details["row_count"] == 2 + + +def test_null_expression_handling(): + """Test that genes with all NULL expression data are handled gracefully.""" + df = pl.DataFrame({ + "gene_id": ["ENSG00000001", "ENSG00000002"], + "hpa_retina_tpm": [None, 50.0], + "hpa_cerebellum_tpm": [None, 40.0], + "gtex_retina_tpm": [None, 60.0], + "cellxgene_photoreceptor_expr": [None, None], + "cellxgene_hair_cell_expr": [None, None], + "tau_specificity": [None, 0.5], + "usher_tissue_enrichment": [None, 2.0], + "expression_score_normalized": [None, 0.8], + }) + + # Mock store + mock_store = Mock(spec=PipelineStore) + mock_provenance = Mock(spec=ProvenanceTracker) + + # Should not raise exception + load_to_duckdb(df, mock_store, mock_provenance) + + # Verify store was called + mock_store.save_dataframe.assert_called_once() diff --git a/tests/test_protein.py b/tests/test_protein.py new file mode 100644 index 0000000..f563754 --- /dev/null +++ b/tests/test_protein.py @@ -0,0 +1,310 @@ +"""Unit tests for protein features evidence layer.""" + +from unittest.mock import Mock, patch, MagicMock +import polars as pl +import pytest +from polars.testing import assert_frame_equal + +from usher_pipeline.evidence.protein.models import ( + ProteinFeatureRecord, + CILIA_DOMAIN_KEYWORDS, + SCAFFOLD_DOMAIN_TYPES, +) +from usher_pipeline.evidence.protein.transform import ( + extract_protein_features, + detect_cilia_motifs, + normalize_protein_features, +) + + 
+@pytest.fixture +def sample_uniprot_df(): + """Sample UniProt data for testing.""" + return pl.DataFrame({ + "uniprot_id": ["P12345", "Q67890", "A11111", "B22222"], + "protein_length": [500, 1200, 300, None], # None = not found + "domain_names": [ + ["PDZ domain", "Kinase domain"], + ["IFT complex subunit", "WD40 repeat"], + ["Transmembrane region"], + [], + ], + "coiled_coil_count": [2, 0, 0, None], + "transmembrane_count": [0, 5, 10, None], + }) + + +@pytest.fixture +def sample_interpro_df(): + """Sample InterPro data for testing.""" + return pl.DataFrame({ + "uniprot_id": ["P12345", "Q67890", "A11111", "B22222"], + "domain_names": [ + ["SH3 domain"], + ["Ciliary targeting signal", "Ankyrin repeat"], + [], + [], + ], + "interpro_ids": [ + ["IPR001452"], + ["IPR005598", "IPR002110"], + [], + [], + ], + }) + + +def test_uniprot_feature_extraction(sample_uniprot_df, sample_interpro_df): + """Correct parsing of length, domain, coiled-coil, TM from UniProt data.""" + df = extract_protein_features(sample_uniprot_df, sample_interpro_df) + + # Check P12345 + p12345 = df.filter(pl.col("uniprot_id") == "P12345") + assert p12345["protein_length"][0] == 500 + assert p12345["coiled_coil"][0] == True # count=2 > 0 + assert p12345["coiled_coil_count"][0] == 2 + assert p12345["transmembrane_count"][0] == 0 + # Domain count should include both UniProt and InterPro (deduplicated) + assert p12345["domain_count"][0] == 3 # PDZ, Kinase, SH3 + + # Check B22222 (not found in UniProt) + b22222 = df.filter(pl.col("uniprot_id") == "B22222") + assert b22222["protein_length"][0] is None + assert b22222["coiled_coil"][0] is None + assert b22222["transmembrane_count"][0] is None + + +def test_cilia_motif_detection_positive(): + """Domain name containing cilia keywords sets has_cilia_domain=True.""" + df = pl.DataFrame({ + "uniprot_id": ["P12345"], + "protein_length": [500], + "domain_count": [2], + "coiled_coil": [False], + "coiled_coil_count": [0], + "transmembrane_count": [0], + 
"domain_names": [["IFT complex subunit", "Kinase domain"]], + }) + + result = detect_cilia_motifs(df) + + assert result["has_cilia_domain"][0] == True + + +def test_cilia_motif_detection_negative(): + """Standard domain (e.g., Kinase) does not trigger has_cilia_domain.""" + df = pl.DataFrame({ + "uniprot_id": ["P12345"], + "protein_length": [500], + "domain_count": [1], + "coiled_coil": [False], + "coiled_coil_count": [0], + "transmembrane_count": [0], + "domain_names": [["Kinase domain"]], + }) + + result = detect_cilia_motifs(df) + + assert result["has_cilia_domain"][0] == False + + +def test_scaffold_detection(): + """PDZ domain triggers scaffold_adaptor_domain=True.""" + df = pl.DataFrame({ + "uniprot_id": ["P12345"], + "protein_length": [500], + "domain_count": [1], + "coiled_coil": [False], + "coiled_coil_count": [0], + "transmembrane_count": [0], + "domain_names": [["PDZ domain"]], + }) + + result = detect_cilia_motifs(df) + + assert result["scaffold_adaptor_domain"][0] == True + + +def test_null_uniprot(): + """Gene without UniProt entry has all features NULL.""" + df = pl.DataFrame({ + "uniprot_id": ["B22222"], + "protein_length": [None], + "domain_count": [0], + "coiled_coil": [None], + "coiled_coil_count": [None], + "transmembrane_count": [None], + "domain_names": [[]], + }) + + result = detect_cilia_motifs(df) + result = normalize_protein_features(result) + + # All boolean flags should be NULL (not False) + assert result["has_cilia_domain"][0] is None + assert result["scaffold_adaptor_domain"][0] is None + assert result["has_sensory_domain"][0] is None + # Composite score should be NULL + assert result["protein_score_normalized"][0] is None + + +def test_normalization_bounds(): + """All normalized features are in [0, 1] range.""" + df = pl.DataFrame({ + "uniprot_id": ["P1", "P2", "P3"], + "protein_length": [100, 500, 2000], + "domain_count": [0, 5, 20], + "coiled_coil": [False, True, True], + "coiled_coil_count": [0, 2, 5], + "transmembrane_count": [0, 
5, 25], # 25 gets capped at 20 + "domain_names": [[], ["PDZ"], ["IFT", "Ciliary"]], + }) + + result = detect_cilia_motifs(df) + result = normalize_protein_features(result) + + # Check all scores are in [0, 1] + for score in result["protein_score_normalized"]: + assert score is not None + assert 0.0 <= score <= 1.0 + + +def test_composite_score_cilia_gene(): + """Gene with cilia domains scores higher than gene without.""" + df = pl.DataFrame({ + "uniprot_id": ["P_CILIA", "P_NOCILIA"], + "protein_length": [500, 500], + "domain_count": [5, 5], + "coiled_coil": [True, True], + "coiled_coil_count": [2, 2], + "transmembrane_count": [5, 5], + "domain_names": [ + ["IFT complex", "PDZ domain"], # Has cilia + scaffold + ["Kinase domain", "PDZ domain"], # Only scaffold + ], + }) + + result = detect_cilia_motifs(df) + result = normalize_protein_features(result) + + cilia_score = result.filter(pl.col("uniprot_id") == "P_CILIA")["protein_score_normalized"][0] + nocilia_score = result.filter(pl.col("uniprot_id") == "P_NOCILIA")["protein_score_normalized"][0] + + # Cilia gene should score higher (15% weight for has_cilia_domain) + assert cilia_score > nocilia_score + + +def test_composite_score_null_handling(): + """NULL UniProt produces NULL composite score (not 0.0).""" + df = pl.DataFrame({ + "uniprot_id": ["P_VALID", "P_NULL"], + "protein_length": [500, None], + "domain_count": [5, 0], + "coiled_coil": [True, None], + "coiled_coil_count": [2, None], + "transmembrane_count": [5, None], + "domain_names": [["PDZ"], []], + }) + + result = detect_cilia_motifs(df) + result = normalize_protein_features(result) + + valid_score = result.filter(pl.col("uniprot_id") == "P_VALID")["protein_score_normalized"][0] + null_score = result.filter(pl.col("uniprot_id") == "P_NULL")["protein_score_normalized"][0] + + assert valid_score is not None + assert null_score is None # NOT 0.0 + + +def test_domain_keyword_case_insensitive(): + """Cilia keyword matching is case-insensitive.""" + df = 
pl.DataFrame({ + "uniprot_id": ["P1", "P2", "P3"], + "protein_length": [500, 500, 500], + "domain_count": [1, 1, 1], + "coiled_coil": [False, False, False], + "coiled_coil_count": [0, 0, 0], + "transmembrane_count": [0, 0, 0], + "domain_names": [ + ["intraflagellar transport"], # lowercase + ["CILIARY targeting signal"], # uppercase + ["Basal Body protein"], # mixed case + ], + }) + + result = detect_cilia_motifs(df) + + # All should match + assert result["has_cilia_domain"][0] == True + assert result["has_cilia_domain"][1] == True + assert result["has_cilia_domain"][2] == True + + +@patch("usher_pipeline.evidence.protein.fetch.httpx.Client") +def test_fetch_uniprot_features_with_mock(mock_client_class): + """Test UniProt fetch with mocked HTTP responses.""" + from usher_pipeline.evidence.protein.fetch import fetch_uniprot_features + + # Mock httpx client + mock_client = MagicMock() + mock_client_class.return_value.__enter__.return_value = mock_client + + # Mock UniProt API response + mock_response = Mock() + mock_response.json.return_value = { + "results": [ + { + "primaryAccession": "P12345", + "sequence": {"length": 500}, + "features": [ + {"type": "Domain", "description": "PDZ domain"}, + {"type": "Coiled coil"}, + {"type": "Transmembrane"}, + ], + } + ] + } + mock_client.get.return_value = mock_response + + # Call fetch + df = fetch_uniprot_features(["P12345"]) + + # Verify result + assert len(df) == 1 + assert df["uniprot_id"][0] == "P12345" + assert df["protein_length"][0] == 500 + assert df["coiled_coil_count"][0] == 1 + assert df["transmembrane_count"][0] == 1 + + +@patch("usher_pipeline.evidence.protein.fetch.httpx.Client") +def test_fetch_interpro_domains_with_mock(mock_client_class): + """Test InterPro fetch with mocked HTTP responses.""" + from usher_pipeline.evidence.protein.fetch import fetch_interpro_domains + + # Mock httpx client + mock_client = MagicMock() + mock_client_class.return_value.__enter__.return_value = mock_client + + # Mock InterPro 
API response + mock_response = Mock() + mock_response.json.return_value = { + "results": [ + { + "metadata": { + "accession": "IPR001452", + "name": {"name": "SH3 domain"}, + } + } + ] + } + mock_client.get.return_value = mock_response + + # Call fetch + df = fetch_interpro_domains(["P12345"]) + + # Verify result + assert len(df) == 1 + assert df["uniprot_id"][0] == "P12345" + assert "SH3 domain" in df["domain_names"][0] + assert "IPR001452" in df["interpro_ids"][0] diff --git a/tests/test_protein_integration.py b/tests/test_protein_integration.py new file mode 100644 index 0000000..88c27b2 --- /dev/null +++ b/tests/test_protein_integration.py @@ -0,0 +1,350 @@ +"""Integration tests for protein features evidence layer.""" + +from pathlib import Path +from unittest.mock import Mock, patch, MagicMock +import polars as pl +import pytest + +from usher_pipeline.config.loader import load_config +from usher_pipeline.persistence import PipelineStore, ProvenanceTracker +from usher_pipeline.evidence.protein import ( + process_protein_evidence, + load_to_duckdb, + query_cilia_candidates, +) + + +@pytest.fixture +def test_config(tmp_path: Path): + """Create test configuration.""" + config_path = tmp_path / "config.yaml" + config_content = f""" +data_dir: {tmp_path / "data"} +cache_dir: {tmp_path / "cache"} +duckdb_path: {tmp_path / "test.duckdb"} +versions: + ensembl_release: 113 + gnomad_version: v4.1 + gtex_version: v8 + hpa_version: "23.0" +api: + rate_limit_per_second: 5 + max_retries: 5 + cache_ttl_seconds: 86400 + timeout_seconds: 30 +scoring: + gnomad: 0.20 + expression: 0.20 + annotation: 0.15 + localization: 0.15 + animal_model: 0.15 + literature: 0.15 +""" + config_path.write_text(config_content) + return load_config(config_path) + + +@pytest.fixture +def mock_gene_universe(): + """Mock gene universe with UniProt mappings.""" + return pl.DataFrame({ + "gene_id": ["ENSG00000001", "ENSG00000002", "ENSG00000003", "ENSG00000004"], + "gene_symbol": ["GENE1", "GENE2", 
"GENE3", "GENE4"], + "uniprot_id": ["P12345", "Q67890", "A11111", None], # GENE4 has no UniProt + }) + + +@pytest.fixture +def mock_uniprot_response(): + """Mock UniProt API response with realistic domain structures.""" + return { + "results": [ + { + "primaryAccession": "P12345", + "sequence": {"length": 500}, + "features": [ + {"type": "Domain", "description": "PDZ domain"}, + {"type": "Domain", "description": "SH3 domain"}, + {"type": "Coiled coil"}, + {"type": "Coiled coil"}, + ], + }, + { + "primaryAccession": "Q67890", + "sequence": {"length": 1200}, + "features": [ + {"type": "Domain", "description": "IFT complex subunit"}, + {"type": "Domain", "description": "WD40 repeat"}, + {"type": "Transmembrane"}, + {"type": "Transmembrane"}, + {"type": "Transmembrane"}, + ], + }, + { + "primaryAccession": "A11111", + "sequence": {"length": 300}, + "features": [ + {"type": "Domain", "description": "Kinase domain"}, + ], + }, + ] + } + + +@pytest.fixture +def mock_interpro_response(): + """Mock InterPro API responses per protein.""" + return { + "P12345": { + "results": [ + { + "metadata": { + "accession": "IPR001452", + "name": {"name": "Ankyrin repeat"}, + } + } + ] + }, + "Q67890": { + "results": [ + { + "metadata": { + "accession": "IPR005598", + "name": {"name": "Ciliary targeting signal"}, + } + } + ] + }, + "A11111": { + "results": [] + }, + } + + +@patch("usher_pipeline.evidence.protein.fetch.httpx.Client") +@patch("usher_pipeline.evidence.protein.fetch.time.sleep") # Speed up tests +def test_full_pipeline_with_mocked_apis( + mock_sleep, + mock_client_class, + mock_gene_universe, + mock_uniprot_response, + mock_interpro_response, +): + """Test full pipeline with mocked UniProt and InterPro APIs.""" + # Mock httpx client + mock_client = MagicMock() + mock_client_class.return_value.__enter__.return_value = mock_client + + # Setup mock responses + def mock_get(url, params=None): + mock_response = Mock() + + # UniProt search endpoint + if "uniprot" in url and 
"search" in url: + mock_response.json.return_value = mock_uniprot_response + mock_response.raise_for_status = Mock() + return mock_response + + # InterPro API + if "interpro" in url: + # Extract accession from URL + accession = url.split("/")[-1] + if accession in mock_interpro_response: + mock_response.json.return_value = mock_interpro_response[accession] + else: + mock_response.json.return_value = {"results": []} + mock_response.raise_for_status = Mock() + return mock_response + + raise ValueError(f"Unexpected URL: {url}") + + mock_client.get.side_effect = mock_get + + # Run pipeline + gene_ids = mock_gene_universe.select("gene_id").to_series().to_list() + df = process_protein_evidence(gene_ids, mock_gene_universe) + + # Verify results + assert len(df) == 4 # All genes present + + # Check GENE1 (P12345) - has PDZ, SH3, Ankyrin (scaffold domains) + coiled-coils + gene1 = df.filter(pl.col("gene_symbol") == "GENE1") + assert gene1["uniprot_id"][0] == "P12345" + assert gene1["protein_length"][0] == 500 + assert gene1["domain_count"][0] == 3 # PDZ, SH3, Ankyrin + assert gene1["coiled_coil"][0] == True + assert gene1["coiled_coil_count"][0] == 2 + assert gene1["scaffold_adaptor_domain"][0] == True # Has PDZ, SH3, Ankyrin + assert gene1["protein_score_normalized"][0] is not None + + # Check GENE2 (Q67890) - has IFT and ciliary domains + gene2 = df.filter(pl.col("gene_symbol") == "GENE2") + assert gene2["has_cilia_domain"][0] == True # IFT + ciliary + assert gene2["transmembrane_count"][0] == 3 + + # Check GENE3 (A11111) - minimal features + gene3 = df.filter(pl.col("gene_symbol") == "GENE3") + assert gene3["domain_count"][0] == 1 # Only Kinase + assert gene3["has_cilia_domain"][0] == False + + # Check GENE4 (no UniProt) - all NULL + gene4 = df.filter(pl.col("gene_symbol") == "GENE4") + assert gene4["uniprot_id"][0] is None + assert gene4["protein_length"][0] is None + assert gene4["protein_score_normalized"][0] is None + + +def test_checkpoint_restart(tmp_path: Path, 
test_config, mock_gene_universe): + """Test checkpoint-restart pattern with DuckDB.""" + db_path = tmp_path / "test.duckdb" + store = PipelineStore(db_path) + provenance = ProvenanceTracker.from_config(test_config) + + # Create synthetic protein features + df = pl.DataFrame({ + "gene_id": ["ENSG00000001", "ENSG00000002"], + "gene_symbol": ["GENE1", "GENE2"], + "uniprot_id": ["P12345", "Q67890"], + "protein_length": [500, 1200], + "domain_count": [3, 5], + "coiled_coil": [True, False], + "coiled_coil_count": [2, 0], + "transmembrane_count": [0, 3], + "scaffold_adaptor_domain": [True, False], + "has_cilia_domain": [False, True], + "has_sensory_domain": [False, False], + "protein_score_normalized": [0.65, 0.82], + }) + + # Load to DuckDB + load_to_duckdb(df, store, provenance, "Test protein features") + + # Verify checkpoint exists + assert store.has_checkpoint("protein_features") + + # Reload data + loaded_df = store.load_dataframe("protein_features") + assert loaded_df is not None + assert len(loaded_df) == 2 + assert loaded_df["gene_symbol"].to_list() == ["GENE1", "GENE2"] + + # Verify provenance + checkpoints = store.list_checkpoints() + protein_checkpoint = [c for c in checkpoints if c["table_name"] == "protein_features"][0] + assert protein_checkpoint["row_count"] == 2 + + store.close() + + +def test_query_cilia_candidates(tmp_path: Path): + """Test querying genes with cilia-associated features.""" + db_path = tmp_path / "test.duckdb" + store = PipelineStore(db_path) + + # Create test data with various feature combinations + df = pl.DataFrame({ + "gene_id": ["ENSG00000001", "ENSG00000002", "ENSG00000003", "ENSG00000004"], + "gene_symbol": ["GENE1", "GENE2", "GENE3", "GENE4"], + "uniprot_id": ["P1", "P2", "P3", "P4"], + "protein_length": [500, 600, 700, 800], + "domain_count": [3, 4, 2, 5], + "coiled_coil": [True, True, False, True], + "transmembrane_count": [0, 2, 0, 3], + "scaffold_adaptor_domain": [True, False, True, True], + "has_cilia_domain": [False, True, 
False, False], + "has_sensory_domain": [False, False, False, True], + "protein_score_normalized": [0.65, 0.82, 0.45, 0.78], + }) + + # Load to DuckDB + store.save_dataframe(df, "protein_features", "Test data", replace=True) + + # Query cilia candidates + candidates = query_cilia_candidates(store) + + # Should include: + # - GENE1: has coiled_coil + scaffold_adaptor_domain + # - GENE2: has cilia_domain + # - GENE4: has coiled_coil + scaffold_adaptor_domain + # Should NOT include: + # - GENE3: has scaffold but no coiled_coil, and no cilia_domain + + assert len(candidates) == 3 + symbols = candidates["gene_symbol"].to_list() + assert "GENE1" in symbols + assert "GENE2" in symbols + assert "GENE4" in symbols + assert "GENE3" not in symbols + + # Verify sorting by score (descending) + assert candidates["protein_score_normalized"][0] == 0.82 # GENE2 + + store.close() + + +def test_provenance_recording(tmp_path: Path, test_config): + """Test provenance metadata is correctly recorded.""" + db_path = tmp_path / "test.duckdb" + store = PipelineStore(db_path) + provenance = ProvenanceTracker.from_config(test_config) + + # Create test data with known stats + df = pl.DataFrame({ + "gene_id": ["ENSG00000001", "ENSG00000002", "ENSG00000003"], + "gene_symbol": ["GENE1", "GENE2", "GENE3"], + "uniprot_id": ["P1", "P2", None], # 1 without UniProt + "protein_length": [500, 600, None], + "domain_count": [3, 4, None], + "coiled_coil": [True, False, None], + "coiled_coil_count": [2, 0, None], + "transmembrane_count": [0, 2, None], + "scaffold_adaptor_domain": [True, False, None], + "has_cilia_domain": [False, True, None], + "has_sensory_domain": [False, False, None], + "protein_score_normalized": [0.65, 0.82, None], + }) + + # Load with provenance + load_to_duckdb(df, store, provenance, "Test protein features") + + # Verify provenance step was recorded + steps = provenance.get_steps() + protein_step = [s for s in steps if s["name"] == "load_protein_features"][0] + + assert 
protein_step["details"]["total_genes"] == 3
+    assert protein_step["details"]["with_uniprot"] == 2
+    assert protein_step["details"]["null_uniprot"] == 1
+    assert protein_step["details"]["cilia_domain_count"] == 1
+    assert protein_step["details"]["scaffold_domain_count"] == 1
+    assert protein_step["details"]["coiled_coil_count"] == 1
+    assert protein_step["details"]["transmembrane_domain_count"] == 1
+
+    store.close()
+
+
+@patch("usher_pipeline.evidence.protein.fetch.httpx.Client")
+@patch("usher_pipeline.evidence.protein.fetch.time.sleep")
+def test_null_preservation(mock_sleep, mock_client_class, mock_gene_universe):
+    """Test that NULL values are preserved (not converted to 0)."""
+    # Mock httpx client
+    mock_client = MagicMock()
+    mock_client_class.return_value.__enter__.return_value = mock_client
+
+    # Mock response with one protein not found
+    mock_response = Mock()
+    mock_response.json.return_value = {
+        "results": []  # No results for any protein
+    }
+    mock_client.get.return_value = mock_response
+
+    # Run pipeline
+    gene_ids = ["ENSG00000001"]
+    gene_map = mock_gene_universe.filter(pl.col("gene_id") == "ENSG00000001")
+    df = process_protein_evidence(gene_ids, gene_map)
+
+    # All protein features should be NULL (domain_count may legitimately be 0 for an empty domain list)
+    assert df["protein_length"][0] is None
+    assert df["domain_count"][0] is None or df["domain_count"][0] == 0
+    assert df["coiled_coil"][0] is None
+    assert df["transmembrane_count"][0] is None
+    assert df["protein_score_normalized"][0] is None  # Critical: NOT 0.0