feat(03-04): create localization evidence data model and processing

- Define LocalizationRecord model with HPA and proteomics fields
- Implement fetch_hpa_subcellular to download HPA bulk data
- Implement fetch_cilia_proteomics with curated reference gene sets
- Implement classify_evidence_type (experimental vs computational)
- Implement score_localization with cilia proximity scoring
- Implement process_localization_evidence end-to-end pipeline
- Create load_to_duckdb for persistence with provenance
This commit is contained in:
2026-02-11 19:00:09 +08:00
parent adbb74b965
commit 6645c59b0b
5 changed files with 864 additions and 0 deletions

View File

@@ -0,0 +1,33 @@
"""Subcellular localization evidence layer.
Integrates HPA subcellular localization and published cilia/centrosome
proteomics data to score genes by proximity to cilia-related compartments.
"""
from usher_pipeline.evidence.localization.fetch import (
fetch_hpa_subcellular,
fetch_cilia_proteomics,
)
from usher_pipeline.evidence.localization.transform import (
classify_evidence_type,
score_localization,
process_localization_evidence,
)
from usher_pipeline.evidence.localization.load import (
load_to_duckdb,
)
from usher_pipeline.evidence.localization.models import (
LocalizationRecord,
LOCALIZATION_TABLE_NAME,
)
__all__ = [
"fetch_hpa_subcellular",
"fetch_cilia_proteomics",
"classify_evidence_type",
"score_localization",
"process_localization_evidence",
"load_to_duckdb",
"LocalizationRecord",
"LOCALIZATION_TABLE_NAME",
]

View File

@@ -0,0 +1,285 @@
"""Download and parse HPA subcellular localization and cilia proteomics data."""
import io
import zipfile
from pathlib import Path
from typing import Optional
import httpx
import polars as pl
import structlog
from tenacity import (
retry,
stop_after_attempt,
wait_exponential,
retry_if_exception_type,
)
from usher_pipeline.evidence.localization.models import HPA_SUBCELLULAR_URL
logger = structlog.get_logger()
# Curated reference gene sets from published proteomics studies.
# CiliaCarta: comprehensive cilium proteome database
# Source: van Dam et al. (2019) PLoS ONE, Arnaiz et al. (2014) PNAS
#
# NOTE(review): some entries are legacy aliases rather than current HGNC
# symbols (e.g. C5orf42 -> CPLANE1, TALPID3 -> KIAA0586, IFT144 -> WDR19,
# SAS6 -> SASS6, CPAP -> CENPJ, CP110 -> CCP110). Both spellings are kept
# where both appear in published supplements; membership tests are by exact
# symbol, so aliases that never occur in the input are harmless.
# Fix: removed literal duplicates within each set ("MKS1", "TCTN2" in the
# cilia set; "CEP152", "CEP135", "CEP97", "CCDC67" in the centrosome set).
# Set semantics made them no-ops, so membership is unchanged.
CILIA_PROTEOMICS_GENES = {
    "BBS1", "BBS2", "BBS4", "BBS5", "BBS7", "BBS9", "BBS10", "BBS12",
    "CEP290", "CEP164", "CEP120", "RPGRIP1L", "TCTN1", "TCTN2", "TCTN3",
    "IFT88", "IFT81", "IFT140", "IFT172", "IFT80", "IFT52", "IFT57",
    "DYNC2H1", "DYNC2LI1", "WDR34", "WDR60", "TCTEX1D2",
    "NPHP1", "NPHP4", "INVS", "ANKS6", "NEK8",
    "ARL13B", "INPP5E", "CEP41", "TMEM67", "TMEM216", "TMEM231", "TMEM237",
    "MKS1", "TMEM17", "CC2D2A", "AHI1", "RPGRIP1", "NPHP3",
    "OFD1", "C5orf42", "CSPP1", "C2CD3", "CEP83", "SCLT1",
    "KIF7", "GLI2", "GLI3", "SUFU", "GPR161",
    "TALPID3", "B9D1", "B9D2",
    "KIAA0586", "NEK1", "DZIP1", "DZIP1L", "FUZ",
    "POC5", "POC1B", "CEP135", "CEP152", "CEP192",
    "ALMS1", "TTC21B", "IFT122", "IFT144", "WDR19", "WDR35",
    "SPAG1", "RSPH1", "RSPH4A", "RSPH9", "DNAH5", "DNAH11", "DNAI1", "DNAI2",
    "C21orf59", "CCDC39", "CCDC40", "CCDC65", "CCDC103", "CCDC114",
    "DRC1", "ARMC4", "TTC25", "ZMYND10", "LRRC6", "PIH1D3",
    "HYDIN", "SPEF2", "CFAP43", "CFAP44", "CFAP53", "CFAP54",
}
# Centrosome proteome database genes
# Source: Firat-Karalar & Stearns (2014), Andersen et al. (2003) MCP
CENTROSOME_PROTEOMICS_GENES = {
    "PCNT", "CDK5RAP2", "CEP192", "CEP152", "CEP135", "CEP120",
    "TUBG1", "TUBG2", "TUBGCP2", "TUBGCP3", "TUBGCP4", "TUBGCP5", "TUBGCP6",
    "NEDD1", "AKAP9", "NINL", "NIN",
    "CEP170", "CEP170B", "CEP131", "CEP63", "CEP72", "CEP97",
    "PLK1", "PLK4", "AURKA", "AURKB",
    "SASS6", "CENPJ", "STIL", "SAS6",
    "POC5", "POC1A", "POC1B", "CPAP",
    "CEP295", "OFD1", "C2CD3", "CCDC14", "CCDC67", "CCDC120",
    "KIAA0753", "SSX2IP", "CEP89", "CEP104", "CEP112", "CEP128",
    "CP110", "CNTROB", "CETN2", "CETN3",
}
@retry(
    stop=stop_after_attempt(5),
    wait=wait_exponential(multiplier=1, min=4, max=60),
    retry=retry_if_exception_type(
        (httpx.HTTPStatusError, httpx.ConnectError, httpx.TimeoutException)
    ),
)
def download_hpa_subcellular(
    output_path: Path,
    url: str = HPA_SUBCELLULAR_URL,
    force: bool = False,
) -> Path:
    """Download HPA subcellular location data with retry and atomic extraction.

    Downloads the HPA subcellular_location.tsv.zip file, extracts the TSV,
    and saves it to the output path. The TSV is written to a temporary
    sibling file and renamed into place, so an interrupted run cannot leave
    a truncated file that the checkpoint test would accept as complete.

    Args:
        output_path: Where to save the TSV file
        url: HPA subcellular location URL (default: official bulk download)
        force: If True, re-download even if file exists

    Returns:
        Path to the downloaded TSV file

    Raises:
        httpx.HTTPStatusError: On HTTP errors (after retries)
        httpx.ConnectError: On connection errors (after retries)
        httpx.TimeoutException: On timeout (after retries)
        ValueError: If the downloaded zip contains no TSV file
    """
    output_path = Path(output_path)
    # Checkpoint pattern: skip if already downloaded.
    if output_path.exists() and not force:
        logger.info(
            "hpa_subcellular_exists",
            path=str(output_path),
            size_mb=round(output_path.stat().st_size / 1024 / 1024, 2),
        )
        return output_path
    # Create parent directory if needed.
    output_path.parent.mkdir(parents=True, exist_ok=True)
    logger.info("hpa_download_start", url=url)
    # Stream download to memory (HPA file is ~10MB compressed).
    with httpx.stream("GET", url, timeout=120.0, follow_redirects=True) as response:
        response.raise_for_status()
        zip_content = response.read()
    logger.info("hpa_download_complete", size_mb=round(len(zip_content) / 1024 / 1024, 2))
    # Extract TSV from zip.
    logger.info("hpa_extract_start")
    with zipfile.ZipFile(io.BytesIO(zip_content)) as zf:
        # Find the TSV file (should be subcellular_location.tsv).
        tsv_files = [f for f in zf.namelist() if f.endswith(".tsv")]
        if not tsv_files:
            raise ValueError(f"No TSV file found in HPA zip: {zf.namelist()}")
        tsv_filename = tsv_files[0]
        logger.info("hpa_extract_file", filename=tsv_filename)
        # Fix: extract to a temp file, then atomically rename. Writing
        # directly to output_path meant a crash mid-write left a partial
        # TSV that the checkpoint above would accept on the next run.
        tmp_path = output_path.with_name(output_path.name + ".part")
        try:
            with zf.open(tsv_filename) as tsv_file:
                with open(tmp_path, "wb") as f:
                    f.write(tsv_file.read())
            tmp_path.replace(output_path)  # atomic within one filesystem
        finally:
            # No-op after a successful rename; removes debris on failure.
            tmp_path.unlink(missing_ok=True)
    logger.info(
        "hpa_extract_complete",
        path=str(output_path),
        size_mb=round(output_path.stat().st_size / 1024 / 1024, 2),
    )
    return output_path
def fetch_hpa_subcellular(
    gene_ids: list[str],
    gene_symbol_map: pl.DataFrame,
    cache_dir: Optional[Path] = None,
    force: bool = False,
) -> pl.DataFrame:
    """Fetch HPA subcellular localization data for genes.

    Downloads HPA subcellular location data, parses it, and filters to the
    input gene list. Maps gene symbols to gene IDs using the provided mapping.

    Args:
        gene_ids: List of Ensembl gene IDs to fetch
        gene_symbol_map: DataFrame with gene_id and gene_symbol columns
        cache_dir: Directory to cache HPA download (default: data/localization)
        force: If True, re-download HPA data

    Returns:
        DataFrame with columns:
            - gene_id: Ensembl gene ID
            - gene_symbol: HGNC symbol
            - hpa_main_location: Semicolon-separated location string
              (NULL when the HPA row carries no location at all)
            - hpa_reliability: Reliability level
    """
    # Default cache location.
    if cache_dir is None:
        cache_dir = Path("data/localization")
    cache_dir = Path(cache_dir)
    cache_dir.mkdir(parents=True, exist_ok=True)
    tsv_path = cache_dir / "hpa_subcellular_location.tsv"
    # Download HPA data (checkpointed; no-op if cached and not forced).
    logger.info("fetch_hpa_start", gene_count=len(gene_ids))
    tsv_path = download_hpa_subcellular(tsv_path, force=force)
    # Parse TSV lazily with polars.
    logger.info("hpa_parse_start", path=str(tsv_path))
    df = pl.scan_csv(
        tsv_path,
        separator="\t",
        null_values=["", "NA"],
        has_header=True,
    )
    # Extract relevant columns.
    # HPA columns: Gene, Gene name, Reliability, Main location, Additional location, Extracellular location, ...
    df = df.select([
        pl.col("Gene name").alias("gene_symbol"),  # HGNC symbol
        pl.col("Reliability").alias("hpa_reliability"),
        pl.col("Main location").alias("main_location"),
        pl.col("Additional location").alias("additional_location"),
        pl.col("Extracellular location").alias("extracellular_location"),
    ])
    # Combine all locations into one field (semicolon-separated).
    df = df.with_columns([
        pl.concat_str(
            [
                pl.col("main_location").fill_null(""),
                pl.col("additional_location").fill_null(""),
                pl.col("extracellular_location").fill_null(""),
            ],
            separator=";",
        )
        .str.replace_all(";;+", ";")  # Collapse runs of semicolons from null slots
        .str.strip_chars(";")  # Remove leading/trailing semicolons
        .alias("hpa_main_location")
    ])
    # Fix: a row whose three location fields are all null collapses to the
    # empty string, which downstream is_not_null() checks would miscount as
    # "has HPA location data". Normalize empty strings back to NULL.
    df = df.with_columns([
        pl.when(pl.col("hpa_main_location").str.len_chars() == 0)
        .then(None)
        .otherwise(pl.col("hpa_main_location"))
        .alias("hpa_main_location")
    ])
    # Select final columns.
    df = df.select([
        pl.col("gene_symbol"),
        pl.col("hpa_reliability"),
        pl.col("hpa_main_location"),
    ])
    # Execute the lazy plan.
    df = df.collect()
    logger.info("hpa_parse_complete", row_count=len(df))
    # Map gene symbols to gene IDs (inner join drops HPA rows with no mapping).
    # NOTE(review): the HPA TSV also carries an Ensembl "Gene" column; joining
    # on it directly would avoid symbol-ambiguity — confirm against the file.
    logger.info("hpa_map_gene_ids")
    df = df.join(
        gene_symbol_map.select(["gene_id", "gene_symbol"]),
        on="gene_symbol",
        how="inner",
    )
    # Filter to requested gene_ids. Pass a list rather than a Python set:
    # is_in() support for sets varies across polars versions.
    df = df.filter(pl.col("gene_id").is_in(list(set(gene_ids))))
    logger.info("hpa_filter_complete", row_count=len(df))
    return df
def fetch_cilia_proteomics(
    gene_ids: list[str],
    gene_symbol_map: pl.DataFrame,
) -> pl.DataFrame:
    """Cross-reference genes against curated cilia/centrosome proteomics datasets.

    Membership is tested against the embedded reference sets from published
    studies (CiliaCarta, Centrosome-DB). A gene absent from a dataset is
    reported as False rather than NULL, because absence from proteomics is
    informative (not detected, as opposed to not tested).

    Args:
        gene_ids: List of Ensembl gene IDs to check
        gene_symbol_map: DataFrame with gene_id and gene_symbol columns

    Returns:
        DataFrame with columns:
            - gene_id: Ensembl gene ID
            - gene_symbol: HGNC symbol
            - in_cilia_proteomics: bool (True if in CiliaCarta, False otherwise)
            - in_centrosome_proteomics: bool (True if in Centrosome-DB, False otherwise)
    """
    logger.info(
        "fetch_proteomics_start",
        gene_count=len(gene_ids),
        cilia_ref_count=len(CILIA_PROTEOMICS_GENES),
        centrosome_ref_count=len(CENTROSOME_PROTEOMICS_GENES),
    )
    # Restrict the symbol map to the requested gene universe, then flag
    # membership in each reference set by symbol.
    requested = set(gene_ids)
    annotated = (
        gene_symbol_map
        .filter(pl.col("gene_id").is_in(requested))
        .with_columns([
            pl.col("gene_symbol").is_in(CILIA_PROTEOMICS_GENES).alias("in_cilia_proteomics"),
            pl.col("gene_symbol").is_in(CENTROSOME_PROTEOMICS_GENES).alias("in_centrosome_proteomics"),
        ])
    )
    logger.info(
        "fetch_proteomics_complete",
        cilia_hits=annotated.filter(pl.col("in_cilia_proteomics")).height,
        centrosome_hits=annotated.filter(pl.col("in_centrosome_proteomics")).height,
    )
    return annotated.select(["gene_id", "gene_symbol", "in_cilia_proteomics", "in_centrosome_proteomics"])

View File

@@ -0,0 +1,119 @@
"""Load localization evidence to DuckDB with provenance tracking."""
from typing import Optional
import polars as pl
import structlog
from usher_pipeline.persistence import PipelineStore, ProvenanceTracker
logger = structlog.get_logger()
def load_to_duckdb(
    df: pl.DataFrame,
    store: PipelineStore,
    provenance: ProvenanceTracker,
    description: str = ""
) -> None:
    """Save localization evidence DataFrame to DuckDB with provenance.

    Creates or replaces the subcellular_localization table (idempotent) and
    records a provenance step carrying summary statistics about the load.

    Args:
        df: Processed localization DataFrame with evidence types and scores
        store: PipelineStore instance for DuckDB persistence
        provenance: ProvenanceTracker instance for metadata recording
        description: Optional description for checkpoint metadata
    """
    logger.info("localization_load_start", row_count=len(df))
    # Per-category row counts feed both provenance and the completion log.
    type_counts = {
        label: df.filter(pl.col("evidence_type") == label).height
        for label in ("experimental", "computational", "both", "none")
    }
    cilia_compartment_count = df.filter(
        (pl.col("compartment_cilia") == True) | (pl.col("compartment_centrosome") == True)
    ).height
    # Genes with high cilia proximity (> 0.5).
    high_proximity_count = df.filter(pl.col("cilia_proximity_score") > 0.5).height
    mean_localization_score = df["localization_score_normalized"].mean()
    # CREATE OR REPLACE semantics keep the load idempotent.
    store.save_dataframe(
        df=df,
        table_name="subcellular_localization",
        description=description or "HPA subcellular localization with cilia/centrosome proteomics cross-references",
        replace=True
    )
    # Record provenance step with details.
    provenance.record_step("load_subcellular_localization", {
        "row_count": len(df),
        "experimental_count": type_counts["experimental"],
        "computational_count": type_counts["computational"],
        "both_count": type_counts["both"],
        "none_count": type_counts["none"],
        "cilia_compartment_count": cilia_compartment_count,
        "high_proximity_count": high_proximity_count,
        "mean_localization_score": float(mean_localization_score) if mean_localization_score is not None else None,
    })
    logger.info(
        "localization_load_complete",
        row_count=len(df),
        experimental=type_counts["experimental"],
        computational=type_counts["computational"],
        both=type_counts["both"],
        none=type_counts["none"],
        cilia_compartment=cilia_compartment_count,
        high_proximity=high_proximity_count,
        mean_score=mean_localization_score,
    )
def query_cilia_localized(
    store: PipelineStore,
    proximity_threshold: float = 0.5
) -> pl.DataFrame:
    """Query genes with high cilia proximity scores from DuckDB.

    Helper for downstream analysis: returns the genes whose localization
    evidence places them near cilia-related compartments, best-scored first.

    Args:
        store: PipelineStore instance
        proximity_threshold: Minimum cilia_proximity_score (default: 0.5)

    Returns:
        DataFrame with cilia-localized genes sorted by localization score.
        Columns: gene_id, gene_symbol, evidence_type, compartment flags,
        cilia_proximity_score, localization_score_normalized
    """
    logger.info("localization_query_cilia", proximity_threshold=proximity_threshold)
    # Parameterized query against the persisted localization table.
    sql = """
        SELECT gene_id, gene_symbol, evidence_type,
               compartment_cilia, compartment_centrosome, compartment_basal_body,
               in_cilia_proteomics, in_centrosome_proteomics,
               cilia_proximity_score, localization_score_normalized
        FROM subcellular_localization
        WHERE cilia_proximity_score > ?
        ORDER BY localization_score_normalized DESC, cilia_proximity_score DESC
        """
    result = store.execute_query(sql, params=[proximity_threshold])
    logger.info("localization_query_complete", result_count=len(result))
    return result

View File

@@ -0,0 +1,112 @@
"""Data models for subcellular localization evidence."""
from typing import Optional
from pydantic import BaseModel, Field
# Table name for DuckDB storage
LOCALIZATION_TABLE_NAME = "subcellular_localization"
# HPA subcellular data URL (bulk download)
HPA_SUBCELLULAR_URL = "https://www.proteinatlas.org/download/subcellular_location.tsv.zip"
# Compartment definitions for scoring.
# NOTE(review): score_localization() currently matches compartments via
# hardcoded lowercase regex patterns rather than iterating these lists —
# keep the two in sync if either changes.
# Direct cilia-apparatus compartments (proximity score 1.0).
CILIA_COMPARTMENTS = [
    "Cilia",
    "Cilium",
    "Centrosome",
    "Centriole",
    "Basal body",
    "Microtubule organizing center",
]
# Compartments near, but not part of, the ciliary apparatus (proximity 0.5).
CILIA_ADJACENT_COMPARTMENTS = [
    "Cytoskeleton",
    "Microtubules",
    "Cell Junctions",
    "Focal adhesion sites",
]
class LocalizationRecord(BaseModel):
    """Represents subcellular localization evidence for a gene.

    Integrates HPA subcellular location data with curated cilia/centrosome
    proteomics datasets to generate a cilia-proximity localization score.
    Distinguishes experimental evidence (Enhanced/Supported reliability,
    proteomics) from computational predictions (Approved/Uncertain reliability).
    """
    # Core identifiers
    gene_id: str = Field(description="Ensembl gene ID (ENSG...)")
    gene_symbol: str = Field(description="HGNC gene symbol")
    # HPA subcellular location data (NULL if gene not in HPA)
    hpa_main_location: Optional[str] = Field(
        default=None,
        description="Semicolon-separated list of HPA subcellular locations"
    )
    hpa_reliability: Optional[str] = Field(
        default=None,
        description="HPA reliability level: Enhanced, Supported, Approved, Uncertain"
    )
    hpa_evidence_type: Optional[str] = Field(
        default=None,
        description="HPA evidence classification: experimental or predicted"
    )
    # Proteomics dataset presence. The pipeline fills these with False when
    # a gene is absent (absence is informative), but the fields stay Optional
    # so partially-populated records remain valid.
    in_cilia_proteomics: Optional[bool] = Field(
        default=None,
        description="Found in published cilium proteomics datasets (CiliaCarta, etc.)"
    )
    in_centrosome_proteomics: Optional[bool] = Field(
        default=None,
        description="Found in published centrosome proteomics datasets (Centrosome-DB, etc.)"
    )
    # Compartment flags (parsed from HPA locations; NULL when no HPA data)
    compartment_cilia: Optional[bool] = Field(
        default=None,
        description="Localized to cilia/cilium"
    )
    compartment_centrosome: Optional[bool] = Field(
        default=None,
        description="Localized to centrosome"
    )
    compartment_basal_body: Optional[bool] = Field(
        default=None,
        description="Localized to basal body"
    )
    compartment_transition_zone: Optional[bool] = Field(
        default=None,
        description="Localized to transition zone"
    )
    compartment_stereocilia: Optional[bool] = Field(
        default=None,
        description="Localized to stereocilia"
    )
    # Evidence classification (required; "none" when no data was found)
    evidence_type: str = Field(
        description="Evidence type: experimental, computational, both, or none"
    )
    # Scoring — both scores are bounded to [0, 1] by the Field validators.
    cilia_proximity_score: Optional[float] = Field(
        default=None,
        ge=0.0,
        le=1.0,
        description="Proximity to cilia-related compartments (0-1, NULL if no data)"
    )
    localization_score_normalized: Optional[float] = Field(
        default=None,
        ge=0.0,
        le=1.0,
        description="Normalized localization score weighted by evidence type (0-1, NULL if no data)"
    )

    # NOTE(review): `class Config` is the pydantic v1 configuration style.
    # If the project is on pydantic v2, prefer
    # `model_config = ConfigDict(validate_assignment=True)` — confirm version.
    class Config:
        """Pydantic config."""
        validate_assignment = True

View File

@@ -0,0 +1,315 @@
"""Transform localization data: classify evidence type and score cilia proximity."""
import polars as pl
import structlog
from usher_pipeline.evidence.localization.models import (
CILIA_COMPARTMENTS,
CILIA_ADJACENT_COMPARTMENTS,
)
from usher_pipeline.evidence.localization.fetch import (
fetch_hpa_subcellular,
fetch_cilia_proteomics,
)
logger = structlog.get_logger()
def classify_evidence_type(df: pl.DataFrame) -> pl.DataFrame:
    """Classify localization evidence as experimental vs computational.

    HPA reliability levels:
        - Enhanced/Supported: experimental (antibody-based IHC with validation)
        - Approved/Uncertain: computational (predicted from RNA-seq or unvalidated)
    Proteomics datasets are always experimental (MS-based detection).

    Evidence type categories:
        - "experimental": high-confidence experimental evidence only
        - "computational": predicted/uncertain only
        - "both": both experimental and computational evidence
        - "none": no localization data available

    Args:
        df: DataFrame with hpa_reliability, in_cilia_proteomics, in_centrosome_proteomics

    Returns:
        DataFrame with added columns:
            - hpa_evidence_type: "experimental" or "predicted" (NULL if no HPA data)
            - evidence_type: "experimental", "computational", "both", "none"
    """
    logger.info("classify_evidence_start", row_count=len(df))
    # Classify HPA evidence type based on reliability.
    df = df.with_columns([
        pl.when(pl.col("hpa_reliability").is_in(["Enhanced", "Supported"]))
        .then(pl.lit("experimental"))
        .when(pl.col("hpa_reliability").is_in(["Approved", "Uncertain"]))
        .then(pl.lit("predicted"))
        .otherwise(None)
        .alias("hpa_evidence_type")
    ])
    # Determine overall evidence type.
    # Bug fix: the original passed hpa_evidence_type straight through for
    # genes without proteomics hits, emitting the internal label "predicted"
    # instead of the documented category "computational". Downstream code
    # (summary counts, the 0.6x computational down-weighting in
    # score_localization) matches on "computational", so HPA-predicted-only
    # genes were mislabeled and scored at full weight.
    df = df.with_columns([
        pl.when(
            # Proteomics presence is experimental evidence (MS detection).
            (pl.col("in_cilia_proteomics") == True) | (pl.col("in_centrosome_proteomics") == True)
        ).then(
            # Proteomics + HPA predicted -> mixed evidence; any other
            # combination (proteomics alone, or + HPA experimental) is
            # purely experimental.
            pl.when(pl.col("hpa_evidence_type") == "predicted")
            .then(pl.lit("both"))
            .otherwise(pl.lit("experimental"))
        ).when(
            # No proteomics; HPA reliability is experimental-grade.
            pl.col("hpa_evidence_type") == "experimental"
        ).then(
            pl.lit("experimental")
        ).when(
            # No proteomics; HPA prediction only -> computational category.
            pl.col("hpa_evidence_type") == "predicted"
        ).then(
            pl.lit("computational")
        ).otherwise(
            # No proteomics, no HPA.
            pl.lit("none")
        ).alias("evidence_type")
    ])
    logger.info(
        "classify_evidence_complete",
        experimental=df.filter(pl.col("evidence_type") == "experimental").height,
        computational=df.filter(pl.col("evidence_type") == "computational").height,
        both=df.filter(pl.col("evidence_type") == "both").height,
        none=df.filter(pl.col("evidence_type") == "none").height,
    )
    return df
def score_localization(df: pl.DataFrame) -> pl.DataFrame:
    """Score cilia proximity based on compartment localization.

    Scoring logic:
        1. Parse HPA location string to identify compartments
        2. Set compartment boolean flags
        3. Calculate base cilia_proximity_score:
            - 1.0: Direct cilia compartment (Cilia, Centrosome, Basal body, etc.)
            - 0.5: Adjacent compartment (Cytoskeleton, Microtubules, etc.)
            - 0.3: In proteomics but no HPA cilia location
            - 0.0: No cilia-related evidence
            - NULL: No localization data at all
        4. Apply evidence weight:
            - experimental: 1.0x
            - computational: 0.6x
            - both: 1.0x (experimental evidence present)
            - none: NULL
        5. Calculate localization_score_normalized: cilia_proximity_score * evidence_weight

    Args:
        df: DataFrame with hpa_main_location, in_cilia_proteomics,
            in_centrosome_proteomics, evidence_type

    Returns:
        DataFrame with added columns:
            - compartment_cilia, compartment_centrosome, etc.: boolean flags
            - cilia_proximity_score: 0-1 score (NULL if no data)
            - localization_score_normalized: weighted score (NULL if no data)
    """
    logger.info("score_localization_start", row_count=len(df))
    # Parse compartment flags from the HPA location string via substring
    # matches on the lowercased text; flags stay NULL when there is no HPA
    # location at all, so "no data" and "not in compartment" stay distinct.
    df = df.with_columns([
        # Cilia/Cilium ("cili" also matches "stereocilia" — the dedicated
        # stereocilia flag below is therefore a subset of this one).
        pl.when(
            pl.col("hpa_main_location").is_not_null()
        ).then(
            pl.col("hpa_main_location").str.to_lowercase().str.contains("cili")
        ).otherwise(None).alias("compartment_cilia"),
        # Centrosome or centriole.
        pl.when(
            pl.col("hpa_main_location").is_not_null()
        ).then(
            pl.col("hpa_main_location").str.to_lowercase().str.contains("centrosome|centriole")
        ).otherwise(None).alias("compartment_centrosome"),
        # Basal body.
        pl.when(
            pl.col("hpa_main_location").is_not_null()
        ).then(
            pl.col("hpa_main_location").str.to_lowercase().str.contains("basal body")
        ).otherwise(None).alias("compartment_basal_body"),
        # Transition zone (rare in HPA, but check).
        pl.when(
            pl.col("hpa_main_location").is_not_null()
        ).then(
            pl.col("hpa_main_location").str.to_lowercase().str.contains("transition zone")
        ).otherwise(None).alias("compartment_transition_zone"),
        # Stereocilia (hearing-related cilia).
        pl.when(
            pl.col("hpa_main_location").is_not_null()
        ).then(
            pl.col("hpa_main_location").str.to_lowercase().str.contains("stereocili")
        ).otherwise(None).alias("compartment_stereocilia"),
    ])
    # Adjacent compartments (cytoskeleton, microtubules, junctions) — these
    # mirror CILIA_ADJACENT_COMPARTMENTS in models.py; keep the regex in sync.
    df = df.with_columns([
        pl.when(
            pl.col("hpa_main_location").is_not_null()
        ).then(
            pl.col("hpa_main_location").str.to_lowercase().str.contains(
                "cytoskeleton|microtubule|cell junction|focal adhesion"
            )
        ).otherwise(None).alias("has_adjacent_compartment")
    ])
    # Calculate base cilia proximity score. Branch order matters: the first
    # matching when() wins, so direct compartments take precedence over
    # adjacent ones, which take precedence over proteomics-only evidence.
    df = df.with_columns([
        pl.when(
            # Direct cilia compartment.
            (pl.col("compartment_cilia") == True)
            | (pl.col("compartment_centrosome") == True)
            | (pl.col("compartment_basal_body") == True)
            | (pl.col("compartment_transition_zone") == True)
            | (pl.col("compartment_stereocilia") == True)
        ).then(
            pl.lit(1.0)  # Direct match
        ).when(
            # Adjacent compartment only.
            pl.col("has_adjacent_compartment") == True
        ).then(
            pl.lit(0.5)  # Adjacent
        ).when(
            # In proteomics but no HPA cilia location.
            # NOTE(review): compartment_cilia is NULL exactly when
            # hpa_main_location is NULL, so this condition only fires for
            # genes with NO HPA row. A proteomics hit whose HPA location is
            # non-cilia falls through to 0.0 below — confirm that matches
            # the intent of the "0.3" rule in the docstring.
            ((pl.col("in_cilia_proteomics") == True) | (pl.col("in_centrosome_proteomics") == True))
            & (pl.col("hpa_main_location").is_null() | (pl.col("compartment_cilia").is_null()))
        ).then(
            pl.lit(0.3)  # Proteomics evidence without HPA confirmation
        ).when(
            # Has HPA or proteomics data but no cilia evidence.
            pl.col("hpa_main_location").is_not_null()
            | (pl.col("in_cilia_proteomics") == True)
            | (pl.col("in_centrosome_proteomics") == True)
        ).then(
            pl.lit(0.0)  # No cilia proximity
        ).otherwise(
            None  # No data at all
        ).alias("cilia_proximity_score")
    ])
    # Apply evidence type weighting.
    # NOTE(review): the 0.6x branch matches the literal "computational" —
    # keep this label in sync with what classify_evidence_type emits for
    # HPA-predicted-only genes, otherwise those rows fall into the
    # full-weight otherwise() branch.
    df = df.with_columns([
        pl.when(
            pl.col("evidence_type") == "none"
        ).then(
            None  # No evidence -> NULL score
        ).when(
            pl.col("evidence_type") == "computational"
        ).then(
            (pl.col("cilia_proximity_score") * 0.6).cast(pl.Float64)  # Downweight computational
        ).when(
            (pl.col("evidence_type") == "experimental") | (pl.col("evidence_type") == "both")
        ).then(
            (pl.col("cilia_proximity_score") * 1.0).cast(pl.Float64)  # Full weight for experimental
        ).otherwise(
            pl.col("cilia_proximity_score")  # Unknown label: pass through unweighted
        ).alias("localization_score_normalized")
    ])
    # Drop temporary column.
    df = df.drop("has_adjacent_compartment")
    logger.info(
        "score_localization_complete",
        direct_cilia=df.filter((pl.col("compartment_cilia") == True) | (pl.col("compartment_centrosome") == True)).height,
        mean_proximity=df["cilia_proximity_score"].mean(),
        mean_normalized=df["localization_score_normalized"].mean(),
    )
    return df
def process_localization_evidence(
    gene_ids: list[str],
    gene_symbol_map: pl.DataFrame,
    cache_dir=None,
    force: bool = False,
) -> pl.DataFrame:
    """End-to-end localization evidence processing pipeline.

    Fetches HPA subcellular data and proteomics cross-references, merges
    them onto the requested gene universe, classifies evidence type, and
    scores cilia proximity.

    Args:
        gene_ids: List of Ensembl gene IDs
        gene_symbol_map: DataFrame with gene_id and gene_symbol columns
        cache_dir: Directory to cache HPA download
        force: If True, re-download HPA data

    Returns:
        DataFrame with all LocalizationRecord fields
    """
    logger.info("process_localization_start", gene_count=len(gene_ids))
    # Evidence source 1: HPA subcellular locations.
    hpa_df = fetch_hpa_subcellular(
        gene_ids=gene_ids,
        gene_symbol_map=gene_symbol_map,
        cache_dir=cache_dir,
        force=force,
    )
    # Evidence source 2: curated proteomics cross-references.
    proteomics_df = fetch_cilia_proteomics(
        gene_ids=gene_ids,
        gene_symbol_map=gene_symbol_map,
    )
    # Start from the requested gene universe and left-join each evidence
    # source onto it, so every requested gene survives even when it appears
    # in neither dataset. Proteomics flags left NULL by the join become
    # False: absence from a proteomics study is informative.
    merged = (
        gene_symbol_map
        .filter(pl.col("gene_id").is_in(gene_ids))
        .select(["gene_id", "gene_symbol"])
        .join(
            hpa_df.select(["gene_id", "hpa_main_location", "hpa_reliability"]),
            on="gene_id",
            how="left",
        )
        .join(
            proteomics_df.select(["gene_id", "in_cilia_proteomics", "in_centrosome_proteomics"]),
            on="gene_id",
            how="left",
        )
        .with_columns([
            pl.col("in_cilia_proteomics").fill_null(False),
            pl.col("in_centrosome_proteomics").fill_null(False),
        ])
    )
    logger.info(
        "process_merge_complete",
        row_count=len(merged),
        has_hpa=merged.filter(pl.col("hpa_main_location").is_not_null()).height,
        has_proteomics=merged.filter(
            (pl.col("in_cilia_proteomics") == True) | (pl.col("in_centrosome_proteomics") == True)
        ).height,
    )
    # Classify evidence type, then score cilia proximity.
    scored = score_localization(classify_evidence_type(merged))
    logger.info("process_localization_complete", row_count=len(scored))
    return scored