Files
genomic-consultant/clinvar_acmg_annotate.py
gbanyan d13d58df8b Refactor: Replace scaffolding with working analysis scripts
- Add trio_analysis.py for trio-based variant analysis with de novo detection
- Add clinvar_acmg_annotate.py for ClinVar/ACMG annotation
- Add gwas_comprehensive.py with 201 SNPs across 18 categories
- Add pharmgkb_full_analysis.py for pharmacogenomics analysis
- Add gwas_trait_lookup.py for basic GWAS trait lookup
- Add pharmacogenomics.py for basic PGx analysis
- Remove unused scaffolding code (src/, configs/, docs/, tests/)
- Update README.md with new documentation

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-12-01 22:36:02 +08:00

449 lines
16 KiB
Python

#!/usr/bin/env python3
"""
ClinVar Annotation and ACMG Classification Script
Integrates ClinVar lookup with ACMG auto-classification for trio analysis.
"""
import gzip
import re
import sys
from collections import defaultdict
from dataclasses import dataclass, field
from typing import Dict, List, Optional, Set, Tuple
from pathlib import Path
# Put the sibling ``src`` directory at the front of the import path so the
# optional project package below can be found when it is present.
sys.path.insert(0, str(Path(__file__).parent / "src"))

# The project modules are optional: record whether they could be imported and
# warn (on stdout) when the script has to rely on its built-in classifier.
try:
    from genomic_consultant.acmg.tagger import ACMGConfig, tag_variant, _is_lof
    from genomic_consultant.utils.models import Variant, EvidenceTag, SuggestedClassification
except ImportError:
    HAS_PROJECT_MODULES = False
    print("Warning: Project modules not found, using built-in ACMG classification")
else:
    HAS_PROJECT_MODULES = True
@dataclass
class ClinVarEntry:
    """A single ClinVar record for one alternate allele.

    The ``cln*`` fields hold the raw strings of the ClinVar INFO keys with
    the same (upper-case) names; a key that is absent from the record is
    stored as an empty string by ``load_clinvar_vcf``.
    """

    # Locus and alleles exactly as written in the ClinVar VCF.
    chrom: str
    pos: int
    ref: str
    alt: str

    # Raw INFO values.
    clnsig: str      # CLNSIG: clinical significance
    clndn: str       # CLNDN: disease name
    clnrevstat: str  # CLNREVSTAT: review status
    clnvc: str       # CLNVC: variant type

    # Presumably an allele frequency; nothing in this file sets it, so it
    # stays None unless a caller fills it in.
    af: Optional[float] = None
@dataclass
class AnnotatedVariant:
    """One alternate allele of a VCF record together with its annotations.

    Only the locus and alleles are required; every annotation defaults to
    ``None`` (or an empty container) and is filled in as the analysis
    proceeds.
    """

    # Locus and alleles.
    chrom: str
    pos: int
    ref: str
    alt: str

    # SnpEff-derived annotation.
    gene: Optional[str] = None
    effect: Optional[str] = None
    impact: Optional[str] = None

    # Sample name -> raw GT string for the record this allele came from.
    genotypes: Dict[str, str] = field(default_factory=dict)

    # ClinVar lookup result (left as None when there is no match).
    clinvar_sig: Optional[str] = None
    clinvar_disease: Optional[str] = None
    clinvar_review: Optional[str] = None

    # ACMG classification and the evidence strings that led to it.
    acmg_class: Optional[str] = None
    acmg_evidence: List[str] = field(default_factory=list)

    # de_novo, compound_het, hom_rec, etc.
    inheritance_pattern: Optional[str] = None

    @property
    def variant_id(self) -> str:
        """Return the ``chrom-pos-ref-alt`` identifier of this variant."""
        return f"{self.chrom}-{self.pos}-{self.ref}-{self.alt}"
def load_clinvar_vcf(clinvar_path: str) -> Dict[str, ClinVarEntry]:
    """Load a ClinVar VCF into a lookup dictionary.

    Args:
        clinvar_path: Path to the ClinVar VCF. A path ending in ``.gz`` is
            read through ``gzip``; anything else is read as plain text.

    Returns:
        A dict mapping ``"{chrom}-{pos}-{ref}-{alt}"`` to a ``ClinVarEntry``,
        with one key per alternate allele of each record. The chromosome
        name is used exactly as written in the file. If the same key occurs
        more than once the later record replaces the earlier one.

    Records with fewer than 8 tab-separated columns, or whose POS is not an
    integer, are skipped rather than aborting the whole load.
    """
    print(f"Loading ClinVar database from {clinvar_path}...")
    clinvar_db = {}
    open_func = gzip.open if clinvar_path.endswith('.gz') else open
    count = 0

    # Explicit text mode and encoding: gzip.open defaults to binary, and the
    # locale default encoding must not decide how the file is decoded.
    # Undecodable bytes are replaced so one bad byte cannot abort the load.
    with open_func(clinvar_path, 'rt', encoding='utf-8', errors='replace') as f:
        for line in f:
            if line.startswith('#'):
                continue

            parts = line.strip().split('\t')
            if len(parts) < 8:
                continue

            chrom, pos, _, ref, alt, _, _, info = parts[:8]

            try:
                position = int(pos)
            except ValueError:
                # Malformed record: skip it like any other unparsable line.
                continue

            # Parse INFO into key -> value; flag-style items without '=' are
            # ignored because only keyed values are needed below.
            info_dict = {}
            for item in info.split(';'):
                if '=' in item:
                    k, v = item.split('=', 1)
                    info_dict[k] = v

            clnsig = info_dict.get('CLNSIG', '')
            clndn = info_dict.get('CLNDN', '')
            clnrevstat = info_dict.get('CLNREVSTAT', '')
            clnvc = info_dict.get('CLNVC', '')

            # One entry per alternate allele; all alleles of a record share
            # the record's INFO values.
            for a in alt.split(','):
                key = f"{chrom}-{pos}-{ref}-{a}"
                clinvar_db[key] = ClinVarEntry(
                    chrom=chrom,
                    pos=position,
                    ref=ref,
                    alt=a,
                    clnsig=clnsig,
                    clndn=clndn,
                    clnrevstat=clnrevstat,
                    clnvc=clnvc
                )
                # Counts alleles read, including ones that replaced a
                # duplicate key.
                count += 1

    print(f"Loaded {count} ClinVar entries")
    return clinvar_db
def parse_snpeff_annotation(info: str, allele: Optional[str] = None) -> Dict:
    """Extract gene/effect/impact/HGVS from the SnpEff ``ANN`` INFO entry.

    Args:
        info: The raw INFO column of a VCF record.
        allele: Optional ALT allele. When given, the first ANN entry whose
            leading ``Allele`` sub-field equals this value is used, which
            matters at multi-allelic sites. When omitted, the first ANN
            entry is used regardless of allele.

    Returns:
        A dict with the keys ``gene``, ``effect``, ``impact``, ``hgvs_c`` and
        ``hgvs_p``. Every value is ``None`` when INFO has no ``ANN`` key,
        when no entry matches ``allele``, or when the chosen entry has fewer
        than four ``|``-separated sub-fields.
    """
    result = {
        'gene': None,
        'effect': None,
        'impact': None,
        'hgvs_c': None,
        'hgvs_p': None,
    }

    # Anchor on the start of INFO or a ';' so that a key merely ending in
    # "ANN" (e.g. "XANN=") is not mistaken for the SnpEff field.
    ann_match = re.search(r'(?:^|;)ANN=([^;]+)', info)
    if not ann_match:
        return result

    for annotation in ann_match.group(1).split(','):
        parts = annotation.split('|')
        if allele is not None and parts[0] != allele:
            continue

        # Sub-field order: Allele | Annotation | Impact | Gene_Name | ...
        # with HGVS.c and HGVS.p at positions 9 and 10.
        if len(parts) >= 4:
            result['effect'] = parts[1]
            result['impact'] = parts[2]
            result['gene'] = parts[3]
        if len(parts) > 9:
            result['hgvs_c'] = parts[9]
        if len(parts) > 10:
            result['hgvs_p'] = parts[10]
        # Only the first (matching) entry is used.
        break

    return result
def get_genotype_class(gt: str) -> str:
    """Classify a VCF GT string as MISSING, HOM_REF, HET or HOM_ALT.

    Alleles may be separated by ``/`` or ``|`` and any ploidy is accepted.

    Returns:
        'MISSING'  - empty GT, no called allele, or only reference alleles
                     next to a missing one (no alternate allele observed).
        'HOM_REF'  - every allele is called and is ``0``.
        'HOM_ALT'  - every allele is called and all are the same non-zero
                     allele (a haploid ``1`` included).
        'HET'      - anything else: ref/alt, two different alternate alleles
                     such as ``1/2``, or an alternate allele next to a
                     missing one such as ``./1``.
    """
    alleles = re.split(r'[/|]', gt) if gt else []
    called = [a for a in alleles if a not in ('.', '')]
    if not called:
        return 'MISSING'

    partially_missing = len(called) < len(alleles)

    if all(a == '0' for a in called):
        # "./0" shows no alternate allele, so it must not count as a variant
        # genotype, and it is not a confident homozygous reference either.
        return 'MISSING' if partially_missing else 'HOM_REF'

    if partially_missing:
        # An alternate allele was observed but the genotype is incomplete.
        return 'HET'

    # Homozygous only when all alleles are the same alternate allele.
    return 'HOM_ALT' if len(set(called)) == 1 else 'HET'
class ACMGClassifier:
    """Small rule-based classifier that emits ACMG-style evidence tags.

    Evidence strings have the form ``"<CODE>: <description>"``. A
    non-conflicting ClinVar pathogenic/benign assertion, when present,
    decides the final class; otherwise a few combinations of the collected
    evidence codes do.
    """

    # Genes used for the PVS1 tag when the caller supplies no gene set.
    _DEFAULT_LOF_GENES = frozenset({
        'BRCA1', 'BRCA2', 'TP53', 'PTEN', 'MLH1', 'MSH2', 'MSH6', 'PMS2',
        'APC', 'MEN1', 'RB1', 'VHL', 'WT1', 'NF1', 'NF2', 'TSC1', 'TSC2'
    })

    # Effect substrings that mark a variant as a null (loss-of-function) change.
    _LOF_KEYWORDS = ('frameshift', 'stop_gained', 'splice_acceptor', 'splice_donor', 'start_lost')

    def __init__(self, lof_genes: Optional[Set[str]] = None):
        """Create a classifier.

        Args:
            lof_genes: Gene symbols treated as loss-of-function sensitive.
                ``None`` selects the built-in list; an explicit empty set
                means "no such genes". Symbols are upper-cased because gene
                names are upper-cased before lookup.
        """
        if lof_genes is None:
            lof_genes = self._DEFAULT_LOF_GENES
        self.lof_genes = {gene.upper() for gene in lof_genes}
        # Allele-frequency cut-offs named after the BA1/BS1/PM2 criteria.
        # No method of this class reads them.
        self.ba1_af = 0.05
        self.bs1_af = 0.01
        self.pm2_af = 0.0005

    def classify(self, variant: "AnnotatedVariant", is_de_novo: bool = False) -> Tuple[str, List[str]]:
        """Collect evidence for ``variant`` and derive its class.

        Args:
            variant: Object exposing ``clinvar_sig``, ``effect``, ``gene``
                and ``impact`` attributes.
            is_de_novo: Whether the variant was called de novo in the proband.

        Returns:
            ``(classification, evidence)``: the class name and the list of
            evidence strings in the order they were collected.
        """
        evidence = []

        # ClinVar evidence. "Conflicting_interpretations_of_pathogenicity"
        # contains the substring "pathogenic" but is not a pathogenic
        # assertion, so conflicting records contribute no PP5/BP6 tag.
        sig_lower = (variant.clinvar_sig or '').lower()
        if sig_lower and 'conflicting' not in sig_lower:
            if 'pathogenic' in sig_lower and 'likely' not in sig_lower:
                evidence.append("PP5: ClinVar pathogenic")
            elif 'likely_pathogenic' in sig_lower:
                evidence.append("PP5: ClinVar likely pathogenic")
            elif 'benign' in sig_lower and 'likely' not in sig_lower:
                evidence.append("BP6: ClinVar benign")
            elif 'likely_benign' in sig_lower:
                evidence.append("BP6: ClinVar likely benign")

        # Loss of function in LoF-sensitive gene (PVS1)
        if variant.effect and variant.gene:
            effect_lower = variant.effect.lower()
            if any(k in effect_lower for k in self._LOF_KEYWORDS):
                if variant.gene.upper() in self.lof_genes:
                    evidence.append("PVS1: Null variant in LoF-sensitive gene")
                else:
                    evidence.append("PVS1_moderate: Null variant (gene not confirmed LoF-sensitive)")

        # De novo (PS2)
        if is_de_novo:
            evidence.append("PS2: De novo variant")

        # Impact-based evidence
        if variant.impact == 'HIGH':
            evidence.append("PM4: Protein length change (HIGH impact)")
        elif variant.impact == 'MODERATE':
            if variant.effect and 'missense' in variant.effect.lower():
                evidence.append("PP3: Computational evidence (missense)")

        classification = self._determine_class(evidence, variant.clinvar_sig)
        return classification, evidence

    def _determine_class(self, evidence: List[str], clinvar_sig: Optional[str]) -> str:
        """Map evidence strings plus the ClinVar significance to a class.

        Returns one of 'Pathogenic', 'Likely Pathogenic', 'Likely Benign',
        'Benign' or 'VUS'.
        """
        # A non-conflicting ClinVar assertion takes precedence over the rules.
        # NOTE(review): the ClinVar review status is not consulted here.
        if clinvar_sig:
            sig_lower = clinvar_sig.lower()
            if 'conflicting' not in sig_lower:
                if 'pathogenic' in sig_lower:
                    return 'Likely Pathogenic' if 'likely' in sig_lower else 'Pathogenic'
                if 'benign' in sig_lower:
                    return 'Likely Benign' if 'likely' in sig_lower else 'Benign'

        # Compare the code before the first ':' instead of searching the
        # joined text, so description wording can never look like a code.
        # "PVS1_moderate" is deliberately not counted as PVS1.
        codes = {item.split(':', 1)[0] for item in evidence}
        has_pvs1 = 'PVS1' in codes
        has_ps2 = 'PS2' in codes
        has_pm4 = 'PM4' in codes
        has_pp = any(code.startswith('PP') for code in codes)
        has_bp = any(code.startswith('BP') for code in codes)

        if has_pvs1 and has_ps2:
            return 'Pathogenic'
        if has_pvs1 or (has_ps2 and has_pm4):
            return 'Likely Pathogenic'
        if has_bp and not has_pp and not has_pvs1:
            return 'Likely Benign'
        return 'VUS'
def _alternate_chrom_name(chrom: str) -> str:
    """Return the chromosome in the other common naming style ('chr1' <-> '1')."""
    if chrom.startswith('chr'):
        bare = chrom[3:]
        return 'MT' if bare == 'M' else bare
    return 'chrM' if chrom == 'MT' else f"chr{chrom}"


def _allele_specific_gt(gt: str, alt_number: int) -> str:
    """Rewrite a GT string so that only ALT ``alt_number`` counts as non-reference."""
    target = str(alt_number)
    # The capturing group keeps the separators, so alleles sit at even indices.
    tokens = re.split(r'([/|])', gt)
    for i in range(0, len(tokens), 2):
        if tokens[i] != '.':
            tokens[i] = '1' if tokens[i] == target else '0'
    return ''.join(tokens)


def _resolve_trio(samples: List[str], proband_idx: int, father_idx: int, mother_idx: int) -> Tuple[str, str, str]:
    """Map the three sample indices to sample names (proband, father, mother)."""
    names = []
    # An index past the end falls back to column 0/1/2 for the
    # proband/father/mother respectively.
    for role, idx, fallback in (('proband', proband_idx, 0),
                                ('father', father_idx, 1),
                                ('mother', mother_idx, 2)):
        chosen = idx if idx < len(samples) else fallback
        if chosen >= len(samples):
            raise ValueError(
                f"Cannot select {role} sample (index {idx}): "
                f"VCF header lists only {len(samples)} sample(s)"
            )
        names.append(samples[chosen])
    return names[0], names[1], names[2]


def _infer_inheritance(proband_gt: str, father_gt: str, mother_gt: str) -> Tuple[bool, Optional[str]]:
    """Return ``(is_de_novo, inheritance_label)`` from the trio genotype classes."""
    if (proband_gt in ('HET', 'HOM_ALT')
            and father_gt == 'HOM_REF' and mother_gt == 'HOM_REF'):
        return True, 'de_novo'
    if proband_gt == 'HOM_ALT' and father_gt == 'HET' and mother_gt == 'HET':
        return False, 'homozygous_recessive'
    if proband_gt == 'HET':
        if father_gt in ('HET', 'HOM_ALT') and mother_gt == 'HOM_REF':
            return False, 'paternal'
        if mother_gt in ('HET', 'HOM_ALT') and father_gt == 'HOM_REF':
            return False, 'maternal'
    return False, None


def _write_trio_report(output_path: str, snpeff_vcf: str, clinvar_path: str,
                       samples: List[str], total_variants: int,
                       pathogenic_variants: List["AnnotatedVariant"]) -> None:
    """Write the tab-separated variant table and the summary to ``output_path``."""
    with open(output_path, 'w', encoding='utf-8') as f:
        f.write("# ClinVar & ACMG Classification Report\n")
        f.write(f"# Input: {snpeff_vcf}\n")
        f.write(f"# ClinVar: {clinvar_path}\n")
        f.write(f"# Samples: {', '.join(samples)}\n")
        f.write(f"# Total variants processed: {total_variants}\n\n")

        f.write("## CLINICALLY RELEVANT VARIANTS\n\n")
        f.write("CHROM\tPOS\tREF\tALT\tGENE\tEFFECT\tIMPACT\tINHERITANCE\tCLINVAR_SIG\tCLINVAR_DISEASE\tACMG_CLASS\tACMG_EVIDENCE\n")

        # Pathogenic first, then Likely Pathogenic, then the rest; ties are
        # broken by chromosome name (string order) and position.
        for v in sorted(pathogenic_variants, key=lambda x: (x.acmg_class != 'Pathogenic',
                                                             x.acmg_class != 'Likely Pathogenic',
                                                             x.chrom, x.pos)):
            f.write(f"{v.chrom}\t{v.pos}\t{v.ref}\t{v.alt}\t")
            f.write(f"{v.gene or 'N/A'}\t{v.effect or 'N/A'}\t{v.impact or 'N/A'}\t")
            f.write(f"{v.inheritance_pattern or 'N/A'}\t")
            f.write(f"{v.clinvar_sig or 'N/A'}\t")
            f.write(f"{v.clinvar_disease or 'N/A'}\t")
            f.write(f"{v.acmg_class}\t")
            f.write(f"{'; '.join(v.acmg_evidence)}\n")

        # Summary statistics
        f.write("\n## SUMMARY\n")
        f.write(f"Total variants in proband: {total_variants}\n")
        f.write(f"Clinically relevant variants: {len(pathogenic_variants)}\n")

        # Count by ACMG class
        acmg_counts = defaultdict(int)
        for v in pathogenic_variants:
            acmg_counts[v.acmg_class] += 1
        f.write("\nBy ACMG Classification:\n")
        for cls in ['Pathogenic', 'Likely Pathogenic', 'VUS', 'Likely Benign', 'Benign']:
            if cls in acmg_counts:
                f.write(f"  {cls}: {acmg_counts[cls]}\n")

        # Count by inheritance
        inh_counts = defaultdict(int)
        for v in pathogenic_variants:
            inh_counts[v.inheritance_pattern or 'unknown'] += 1
        f.write("\nBy Inheritance Pattern:\n")
        for inh, count in sorted(inh_counts.items()):
            f.write(f"  {inh}: {count}\n")

        # ClinVar matches
        clinvar_match = sum(1 for v in pathogenic_variants if v.clinvar_sig)
        f.write(f"\nVariants with ClinVar annotation: {clinvar_match}\n")


def _print_top_candidates(pathogenic_variants: List["AnnotatedVariant"], limit: int = 20) -> None:
    """Print up to ``limit`` Pathogenic / Likely Pathogenic variants to stdout."""
    print("\n=== TOP PATHOGENIC CANDIDATES ===\n")
    top_variants = [v for v in pathogenic_variants
                    if v.acmg_class in ['Pathogenic', 'Likely Pathogenic']][:limit]
    for v in top_variants:
        print(f"{v.chrom}:{v.pos} {v.ref}>{v.alt}")
        print(f"  Gene: {v.gene} | Effect: {v.effect}")
        print(f"  Inheritance: {v.inheritance_pattern}")
        print(f"  ClinVar: {v.clinvar_sig or 'Not found'}")
        if v.clinvar_disease:
            print(f"  Disease: {v.clinvar_disease[:80]}...")
        print(f"  ACMG: {v.acmg_class}")
        print(f"  Evidence: {'; '.join(v.acmg_evidence)}")
        print()


def analyze_trio_with_clinvar(
    snpeff_vcf: str,
    clinvar_path: str,
    output_path: str,
    proband_idx: int = 0,
    father_idx: int = 1,
    mother_idx: int = 2
):
    """Annotate a trio VCF with ClinVar and ACMG classes and write a report.

    Every ALT allele carried by the proband is looked up in ClinVar,
    classified with ``ACMGClassifier`` and given an inheritance label from
    the trio genotypes. Alleles kept for the report are those with a ClinVar
    significance containing "pathogenic", an ACMG class of Pathogenic or
    Likely Pathogenic, or a de novo call with HIGH/MODERATE impact.

    Args:
        snpeff_vcf: SnpEff-annotated VCF (optionally ``.gz``) with the trio.
        clinvar_path: ClinVar VCF passed to ``load_clinvar_vcf``.
        output_path: Where the text report is written.
        proband_idx, father_idx, mother_idx: Zero-based positions of the
            samples among the VCF sample columns. An index past the end
            falls back to column 0, 1 or 2 respectively.

    Raises:
        ValueError: If a data record precedes the ``#CHROM`` header, or the
            header has too few samples to resolve the trio.

    Notes:
        * At multi-allelic sites an ALT allele is reported only when the
          proband GT contains its index, and the trio genotypes are judged
          per allele (other ALT alleles count as "not this allele").
        * The SnpEff annotation is whatever ``parse_snpeff_annotation``
          returns for the record, so at multi-allelic sites it may describe
          a different ALT allele than the one reported.
        * A ClinVar miss is retried with the other chromosome naming style
          ('chr1' <-> '1', 'chrM' <-> 'MT').
    """
    clinvar_db = load_clinvar_vcf(clinvar_path)
    classifier = ACMGClassifier()

    print(f"Processing {snpeff_vcf}...")

    samples = []
    trio = None
    # Only a count is needed for the report, so proband variants that are
    # not clinically relevant are not kept in memory.
    total_in_proband = 0
    pathogenic_variants = []

    open_func = gzip.open if snpeff_vcf.endswith('.gz') else open

    with open_func(snpeff_vcf, 'rt', encoding='utf-8', errors='replace') as f:
        for line in f:
            if line.startswith('##'):
                continue
            if line.startswith('#CHROM'):
                samples = line.strip().split('\t')[9:]
                trio = _resolve_trio(samples, proband_idx, father_idx, mother_idx)
                continue

            parts = line.strip().split('\t')
            if len(parts) < 10:
                continue
            if trio is None:
                raise ValueError(f"{snpeff_vcf}: data record found before the #CHROM header line")

            chrom, pos, _, ref, alt, _qual, _filt, info, fmt = parts[:9]
            gt_fields = parts[9:]

            # Per-sample GT strings; a short row or a sample entry without a
            # GT sub-field is recorded as missing.
            fmt_parts = fmt.split(':')
            gt_idx = fmt_parts.index('GT') if 'GT' in fmt_parts else 0
            genotypes = {}
            for i, sample in enumerate(samples):
                gt_data = gt_fields[i].split(':') if i < len(gt_fields) else []
                genotypes[sample] = gt_data[gt_idx] if gt_idx < len(gt_data) else './.'

            # Only process records where the proband carries a variant.
            trio_gts = [genotypes.get(name, './.') for name in trio]
            if get_genotype_class(trio_gts[0]) in ('HOM_REF', 'MISSING'):
                continue

            # Parsed only for records that survive the proband filter.
            ann = parse_snpeff_annotation(info)
            position = int(pos)

            alts = alt.split(',')
            multiallelic = len(alts) > 1
            proband_alleles = set(re.split(r'[/|]', trio_gts[0]))

            for alt_number, a in enumerate(alts, start=1):
                allele_gts = trio_gts
                if multiallelic:
                    # Skip ALT alleles the proband does not carry, and judge
                    # each family member for this allele only.
                    if str(alt_number) not in proband_alleles:
                        continue
                    allele_gts = [_allele_specific_gt(gt, alt_number) for gt in trio_gts]

                proband_gt, father_gt, mother_gt = (get_genotype_class(gt) for gt in allele_gts)
                is_de_novo, inheritance = _infer_inheritance(proband_gt, father_gt, mother_gt)

                # ClinVar lookup, retried with the other chromosome naming
                # style when the literal key is absent.
                clinvar_entry = clinvar_db.get(f"{chrom}-{pos}-{ref}-{a}")
                if clinvar_entry is None:
                    clinvar_entry = clinvar_db.get(f"{_alternate_chrom_name(chrom)}-{pos}-{ref}-{a}")

                variant = AnnotatedVariant(
                    chrom=chrom,
                    pos=position,
                    ref=ref,
                    alt=a,
                    gene=ann['gene'],
                    effect=ann['effect'],
                    impact=ann['impact'],
                    genotypes=genotypes,
                    inheritance_pattern=inheritance
                )

                if clinvar_entry:
                    variant.clinvar_sig = clinvar_entry.clnsig
                    variant.clinvar_disease = clinvar_entry.clndn
                    variant.clinvar_review = clinvar_entry.clnrevstat

                acmg_class, evidence = classifier.classify(variant, is_de_novo)
                variant.acmg_class = acmg_class
                variant.acmg_evidence = evidence

                # Filter for clinically relevant variants
                if (variant.clinvar_sig and 'pathogenic' in variant.clinvar_sig.lower()) or \
                        acmg_class in ['Pathogenic', 'Likely Pathogenic'] or \
                        (is_de_novo and ann['impact'] in ['HIGH', 'MODERATE']):
                    pathogenic_variants.append(variant)

                total_in_proband += 1

    print(f"Writing report to {output_path}...")
    _write_trio_report(output_path, snpeff_vcf, clinvar_path, samples,
                       total_in_proband, pathogenic_variants)

    print("\nAnalysis complete!")
    print(f"Clinically relevant variants: {len(pathogenic_variants)}")
    print(f"Report saved to: {output_path}")

    _print_top_candidates(pathogenic_variants)
if __name__ == '__main__':
    # Positional command-line arguments, all optional:
    #   1: SnpEff-annotated trio VCF    2: ClinVar VCF    3: report path
    #   4, 5, 6: sample column indices of proband, father and mother
    cli_args = sys.argv[1:]

    def _cli_value(position, fallback):
        """Return the argument at ``position``, or ``fallback`` when absent."""
        return cli_args[position] if position < len(cli_args) else fallback

    snpeff_vcf = _cli_value(0, '/Volumes/NV2/genomics_analysis/vcf/trio_joint.snpeff.vcf')
    clinvar_path = _cli_value(1, '/Volumes/NV2/genomics_reference/clinvar/clinvar_GRCh37.vcf.gz')
    output_path = _cli_value(2, '/Volumes/NV2/genomics_analysis/clinvar_acmg_report.txt')

    # VCF sample order: NV0066-08_S33 (idx 0), NV0066-09_S34 (idx 1), NV0066-10_S35 (idx 2)
    # Correct mapping: S35 = proband (II-3), S33 = parent, S34 = parent
    proband_idx = int(_cli_value(3, 2))  # S35 is proband
    father_idx = int(_cli_value(4, 0))   # S33
    mother_idx = int(_cli_value(5, 1))   # S34

    analyze_trio_with_clinvar(snpeff_vcf, clinvar_path, output_path, proband_idx, father_idx, mother_idx)