110 lines
4.0 KiB
Python
110 lines
4.0 KiB
Python
from __future__ import annotations
|
|
|
|
import csv
|
|
from dataclasses import dataclass
|
|
from pathlib import Path
|
|
from typing import Dict, Iterable, List, Sequence
|
|
|
|
from genomic_consultant.utils.models import FilterConfig, Variant
|
|
|
|
|
|
@dataclass
class GenomicStore:
    """Lightweight wrapper around annotated variants.

    Holds an in-memory list of ``Variant`` records and exposes gene- and
    region-based lookups, each optionally narrowed by a ``FilterConfig``.
    """

    # Every variant known to the store, in the order it was loaded.
    variants: List[Variant]

    @classmethod
    def from_tsv(cls, path: Path) -> "GenomicStore":
        """Load variants from a flattened TSV generated by the annotation plan.

        Expected columns (most are optional; ``_row_to_variant`` decides how
        each one is read):
        CHROM POS REF ALT SYMBOL Consequence Protein_position PolyPhen SIFT
        CLIN_SIG AF gnomAD_AF SpliceAI CADD_PHRED

        Args:
            path: Location of the tab-separated file; the first line must be
                the header row.

        Returns:
            A store containing one ``Variant`` per data row.

        Raises:
            OSError: If the file cannot be opened.
        """
        # newline="" lets the csv module do its own line-ending handling (as
        # the csv documentation requires); the explicit encoding keeps parsing
        # independent of the platform's locale default.
        with Path(path).open(newline="", encoding="utf-8") as fh:
            reader = csv.DictReader(fh, delimiter="\t")
            # Rows are already dicts, so they are handed over without copying.
            variants = [_row_to_variant(row) for row in reader if row]
        return cls(variants=variants)

    def get_variants_by_gene(
        self, individual_id: str, genes: Sequence[str], filters: FilterConfig | None = None
    ) -> List[Variant]:
        """Return the variants whose gene symbol is in *genes*.

        Gene symbols are compared case-insensitively; variants without a gene
        never match.

        Args:
            individual_id: Accepted for interface compatibility; this method
                does not read it.
            genes: Gene symbols to select.
            filters: Optional filter settings; a default ``FilterConfig()`` is
                used when omitted.

        Returns:
            The matching variants that also pass *filters*, in store order.
        """
        filters = filters or FilterConfig()
        gene_set = {g.upper() for g in genes}
        in_genes = (v for v in self.variants if (v.gene or "").upper() in gene_set)
        return self._apply_filters(in_genes, filters)

    def get_variants_by_region(
        self, individual_id: str, chrom: str, start: int, end: int, filters: FilterConfig | None = None
    ) -> List[Variant]:
        """Return the variants located on *chrom* within ``[start, end]``.

        Both bounds are inclusive and the chromosome name is compared by exact
        string equality.

        Args:
            individual_id: Accepted for interface compatibility; this method
                does not read it.
            chrom: Chromosome name exactly as stored on the variants.
            start: First position of the region (inclusive).
            end: Last position of the region (inclusive).
            filters: Optional filter settings; a default ``FilterConfig()`` is
                used when omitted.

        Returns:
            The matching variants that also pass *filters*, in store order.
        """
        filters = filters or FilterConfig()
        in_region = (v for v in self.variants if v.chrom == chrom and start <= v.pos <= end)
        return self._apply_filters(in_region, filters)

    def _apply_filters(self, variants: Iterable[Variant], filters: FilterConfig) -> List[Variant]:
        """Keep only the variants that satisfy every active criterion in *filters*.

        A criterion is inactive when its setting is ``None`` (or empty, for the
        collection-valued ones). Variants with no allele frequency are never
        rejected by the frequency bounds.
        """
        # Loop-invariant: normalise the accepted ClinVar labels once instead of
        # rebuilding the set for every variant.
        accepted_significance = (
            {sig.lower() for sig in filters.clinvar_significance}
            if filters.clinvar_significance
            else None
        )

        out: List[Variant] = []
        for v in variants:
            af = v.allele_frequency
            if af is not None:
                if filters.max_af is not None and af > filters.max_af:
                    continue
                if filters.min_af is not None and af < filters.min_af:
                    continue
            if (
                accepted_significance is not None
                and (v.clinvar_significance or "").lower() not in accepted_significance
            ):
                continue
            if filters.consequence_includes and not _matches_any(v.consequence, filters.consequence_includes):
                continue
            if filters.consequence_excludes and _matches_any(v.consequence, filters.consequence_excludes):
                continue
            out.append(v)
        return out
|
|
|
|
|
|
def _matches_any(value: str | None, patterns: Sequence[str]) -> bool:
    """Report whether *value* contains any of *patterns* as a substring.

    The comparison is case-insensitive. A ``None`` value never matches, and an
    empty *patterns* sequence yields ``False``.
    """
    if value is None:
        return False
    haystack = value.lower()
    for needle in patterns:
        if needle.lower() in haystack:
            return True
    return False
|
|
|
|
|
|
def _parse_float(val: str | None) -> float | None:
    """Convert a TSV cell to ``float``, or ``None`` when it holds no number.

    ``None``, the empty string and ``"."`` are treated as missing; any other
    text that ``float()`` rejects also yields ``None``.
    """
    if val is None or val == "" or val == ".":
        return None
    try:
        number = float(val)
    except ValueError:
        # Unparseable text is treated the same as a missing value.
        return None
    else:
        return number
|
|
|
|
|
|
def _row_to_variant(row: Dict[str, str]) -> Variant:
    """Build a ``Variant`` from one parsed TSV row.

    ``POS`` is the only column that must be present. The chromosome is read
    from ``CHROM`` with ``#CHROM`` as a fallback, and the allele frequency is
    taken from ``AF`` unless that is missing or unparseable, in which case
    ``gnomAD_AF`` is used.

    Raises:
        KeyError: If the row has no ``POS`` column.
        ValueError: If ``POS`` is not an integer.
    """
    position = int(row["POS"])
    chromosome = row.get("CHROM") or row.get("#CHROM")

    # Parse every numeric column in one pass; missing cells become None.
    numeric = {
        column: _parse_float(row.get(column))
        for column in ("AF", "gnomAD_AF", "SpliceAI", "CADD_PHRED")
    }

    frequency = numeric["AF"]
    if frequency is None:
        frequency = numeric["gnomAD_AF"]

    extra = {
        "polyphen": row.get("PolyPhen"),
        "sift": row.get("SIFT"),
        "gnomad_af": numeric["gnomAD_AF"],
        "splice_ai_delta_score": numeric["SpliceAI"],
        "cadd_phred": numeric["CADD_PHRED"],
    }

    # Empty strings in the descriptive columns are normalised to None.
    return Variant(
        chrom=chromosome,
        pos=position,
        ref=row.get("REF"),
        alt=row.get("ALT"),
        gene=row.get("SYMBOL") or None,
        consequence=row.get("Consequence") or None,
        protein_change=row.get("Protein_position") or None,
        clinvar_significance=row.get("CLIN_SIG") or None,
        allele_frequency=frequency,
        annotations=extra,
    )
|