684 lines
25 KiB
Python
684 lines
25 KiB
Python
#!/usr/bin/env python3
|
||
"""
|
||
Consanguinity Analysis Script
|
||
Analyzes parental relatedness in trio WES data using:
|
||
1. Runs of Homozygosity (ROH) in proband
|
||
2. Identity-by-State (IBS) between parents
|
||
3. Kinship coefficient estimation
|
||
4. F-statistic (inbreeding coefficient)
|
||
"""
|
||
|
||
import gzip
|
||
import re
|
||
import sys
|
||
from collections import defaultdict
|
||
from dataclasses import dataclass, field
|
||
from typing import List, Dict, Tuple, Optional
|
||
from datetime import datetime
|
||
|
||
# GRCh37 chromosome lengths (from VCF header)
|
||
CHROMOSOME_LENGTHS = {
|
||
"1": 249250621, "2": 243199373, "3": 198022430, "4": 191154276,
|
||
"5": 180915260, "6": 171115067, "7": 159138663, "8": 146364022,
|
||
"9": 141213431, "10": 135534747, "11": 135006516, "12": 133851895,
|
||
"13": 115169878, "14": 107349540, "15": 102531392, "16": 90354753,
|
||
"17": 81195210, "18": 78077248, "19": 59128983, "20": 63025520,
|
||
"21": 48129895, "22": 51304566
|
||
}
|
||
|
||
AUTOSOMES = set(str(i) for i in range(1, 23))
|
||
|
||
|
||
@dataclass
|
||
class VariantSite:
|
||
"""Variant site for consanguinity analysis"""
|
||
chrom: str
|
||
pos: int
|
||
ref: str
|
||
alt: str
|
||
genotypes: Dict[int, str] # sample_idx -> genotype class (HOM_REF, HET, HOM_ALT)
|
||
|
||
|
||
@dataclass
|
||
class ROHSegment:
|
||
"""Run of Homozygosity segment"""
|
||
chrom: str
|
||
start: int
|
||
end: int
|
||
snp_count: int
|
||
|
||
@property
|
||
def length_bp(self) -> int:
|
||
return self.end - self.start
|
||
|
||
@property
|
||
def length_mb(self) -> float:
|
||
return self.length_bp / 1_000_000
|
||
|
||
|
||
@dataclass
|
||
class ConsanguinityResults:
|
||
"""Results from consanguinity analysis"""
|
||
# ROH results
|
||
roh_segments: List[ROHSegment] = field(default_factory=list)
|
||
total_roh_mb: float = 0.0
|
||
count_gt_1mb: int = 0
|
||
count_gt_5mb: int = 0
|
||
count_gt_10mb: int = 0
|
||
longest_segment_mb: float = 0.0
|
||
|
||
# IBS results
|
||
ibs0: int = 0
|
||
ibs1: int = 0
|
||
ibs2: int = 0
|
||
total_comparable: int = 0
|
||
|
||
# Kinship
|
||
kinship_king: float = 0.0
|
||
kinship_simple: float = 0.0
|
||
|
||
# F-statistic
|
||
f_statistic: float = 0.0
|
||
observed_het_rate: float = 0.0
|
||
expected_het_rate: float = 0.0
|
||
informative_sites: int = 0
|
||
|
||
# Stats
|
||
total_variants: int = 0
|
||
filtered_variants: int = 0
|
||
|
||
|
||
def get_genotype_class(gt: str) -> str:
|
||
"""Classify genotype as HOM_REF, HET, HOM_ALT, or MISSING"""
|
||
if gt in ['./.', '.|.', '.', '']:
|
||
return 'MISSING'
|
||
|
||
alleles = re.split('[/|]', gt)
|
||
alleles = [a for a in alleles if a != '.']
|
||
|
||
if not alleles:
|
||
return 'MISSING'
|
||
|
||
if all(a == '0' for a in alleles):
|
||
return 'HOM_REF'
|
||
elif all(a != '0' for a in alleles):
|
||
return 'HOM_ALT'
|
||
else:
|
||
return 'HET'
|
||
|
||
|
||
def parse_vcf_for_consanguinity(vcf_path: str, min_gq: int = 20, min_dp: int = 10) -> Tuple[List[str], List[VariantSite], int]:
|
||
"""
|
||
Parse VCF extracting only high-quality biallelic autosomal SNPs.
|
||
Returns (sample_names, variant_sites, total_raw_variants)
|
||
"""
|
||
samples = []
|
||
variants = []
|
||
total_raw = 0
|
||
|
||
open_func = gzip.open if vcf_path.endswith('.gz') else open
|
||
mode = 'rt' if vcf_path.endswith('.gz') else 'r'
|
||
|
||
with open_func(vcf_path, mode) as f:
|
||
for line in f:
|
||
if line.startswith('##'):
|
||
continue
|
||
elif line.startswith('#CHROM'):
|
||
parts = line.strip().split('\t')
|
||
samples = parts[9:]
|
||
continue
|
||
|
||
total_raw += 1
|
||
parts = line.strip().split('\t')
|
||
if len(parts) < 10:
|
||
continue
|
||
|
||
chrom, pos, _, ref, alt, qual, filt, info, fmt = parts[:9]
|
||
gt_fields = parts[9:]
|
||
|
||
# Filter: autosomal chromosomes only
|
||
chrom_clean = chrom.replace('chr', '')
|
||
if chrom_clean not in AUTOSOMES:
|
||
continue
|
||
|
||
# Filter: biallelic SNPs only
|
||
if ',' in alt: # Multi-allelic
|
||
continue
|
||
if len(ref) != 1 or len(alt) != 1: # Not SNP
|
||
continue
|
||
|
||
# Filter: PASS or .
|
||
if filt not in ['PASS', '.', '']:
|
||
continue
|
||
|
||
# Parse FORMAT field indices
|
||
fmt_fields = fmt.split(':')
|
||
gt_idx = fmt_fields.index('GT') if 'GT' in fmt_fields else 0
|
||
gq_idx = fmt_fields.index('GQ') if 'GQ' in fmt_fields else -1
|
||
dp_idx = fmt_fields.index('DP') if 'DP' in fmt_fields else -1
|
||
|
||
# Parse genotypes with quality filtering
|
||
genotypes = {}
|
||
all_pass_quality = True
|
||
|
||
for i in range(len(samples)):
|
||
if i >= len(gt_fields):
|
||
all_pass_quality = False
|
||
break
|
||
|
||
gt_parts = gt_fields[i].split(':')
|
||
gt = gt_parts[gt_idx] if gt_idx < len(gt_parts) else './.'
|
||
|
||
gt_class = get_genotype_class(gt)
|
||
if gt_class == 'MISSING':
|
||
all_pass_quality = False
|
||
break
|
||
|
||
# Check GQ
|
||
if gq_idx >= 0 and gq_idx < len(gt_parts):
|
||
try:
|
||
gq = int(gt_parts[gq_idx])
|
||
if gq < min_gq:
|
||
all_pass_quality = False
|
||
break
|
||
except (ValueError, TypeError):
|
||
pass # If GQ not parseable, don't fail
|
||
|
||
# Check DP
|
||
if dp_idx >= 0 and dp_idx < len(gt_parts):
|
||
try:
|
||
dp = int(gt_parts[dp_idx])
|
||
if dp < min_dp:
|
||
all_pass_quality = False
|
||
break
|
||
except (ValueError, TypeError):
|
||
pass # If DP not parseable, don't fail
|
||
|
||
genotypes[i] = gt_class
|
||
|
||
if not all_pass_quality:
|
||
continue
|
||
|
||
if len(genotypes) != len(samples):
|
||
continue
|
||
|
||
variant = VariantSite(
|
||
chrom=chrom_clean,
|
||
pos=int(pos),
|
||
ref=ref,
|
||
alt=alt,
|
||
genotypes=genotypes
|
||
)
|
||
variants.append(variant)
|
||
|
||
return samples, variants, total_raw
|
||
|
||
|
||
def calculate_roh_segments(variants: List[VariantSite], proband_idx: int = 2,
|
||
min_snps: int = 25, min_length_kb: int = 1000,
|
||
max_gap_kb: int = 500) -> List[ROHSegment]:
|
||
"""
|
||
Detect Runs of Homozygosity in proband using WES-optimized approach.
|
||
Only counts HOM_ALT (homozygous alternate) sites, not HOM_REF,
|
||
as HOM_REF at most WES sites is uninformative.
|
||
"""
|
||
# Group variants by chromosome - only HOM_ALT sites
|
||
by_chrom = defaultdict(list)
|
||
for v in variants:
|
||
gt = v.genotypes.get(proband_idx)
|
||
# Only count HOM_ALT - these are informative for autozygosity
|
||
if gt == 'HOM_ALT':
|
||
by_chrom[v.chrom].append(v)
|
||
|
||
roh_segments = []
|
||
|
||
for chrom in sorted(by_chrom.keys(), key=lambda x: int(x)):
|
||
chrom_variants = sorted(by_chrom[chrom], key=lambda x: x.pos)
|
||
|
||
if len(chrom_variants) < min_snps:
|
||
continue
|
||
|
||
# Find homozygous stretches
|
||
current_start = chrom_variants[0].pos
|
||
current_end = chrom_variants[0].pos
|
||
current_count = 1
|
||
|
||
for i in range(1, len(chrom_variants)):
|
||
prev_pos = chrom_variants[i-1].pos
|
||
curr_pos = chrom_variants[i].pos
|
||
gap = (curr_pos - prev_pos) / 1000 # kb
|
||
|
||
if gap <= max_gap_kb:
|
||
# Extend current segment
|
||
current_end = curr_pos
|
||
current_count += 1
|
||
else:
|
||
# Check if current segment qualifies
|
||
length_kb = (current_end - current_start) / 1000
|
||
if current_count >= min_snps and length_kb >= min_length_kb:
|
||
roh_segments.append(ROHSegment(
|
||
chrom=chrom,
|
||
start=current_start,
|
||
end=current_end,
|
||
snp_count=current_count
|
||
))
|
||
|
||
# Start new segment
|
||
current_start = curr_pos
|
||
current_end = curr_pos
|
||
current_count = 1
|
||
|
||
# Check final segment
|
||
length_kb = (current_end - current_start) / 1000
|
||
if current_count >= min_snps and length_kb >= min_length_kb:
|
||
roh_segments.append(ROHSegment(
|
||
chrom=chrom,
|
||
start=current_start,
|
||
end=current_end,
|
||
snp_count=current_count
|
||
))
|
||
|
||
return roh_segments
|
||
|
||
|
||
def calculate_ibs_between_parents(variants: List[VariantSite],
|
||
mother_idx: int = 0,
|
||
father_idx: int = 1) -> Tuple[int, int, int, int]:
|
||
"""
|
||
Calculate Identity-by-State sharing between parents.
|
||
Returns (ibs0, ibs1, ibs2, total_comparable)
|
||
"""
|
||
ibs0 = 0 # Opposite homozygotes (AA vs BB)
|
||
ibs1 = 0 # One hom, one het
|
||
ibs2 = 0 # Identical genotypes
|
||
|
||
for v in variants:
|
||
mother_gt = v.genotypes.get(mother_idx)
|
||
father_gt = v.genotypes.get(father_idx)
|
||
|
||
if mother_gt == 'MISSING' or father_gt == 'MISSING':
|
||
continue
|
||
|
||
# IBS classification
|
||
if mother_gt == father_gt:
|
||
ibs2 += 1
|
||
elif mother_gt == 'HET' or father_gt == 'HET':
|
||
ibs1 += 1
|
||
else:
|
||
# Both homozygous but different (HOM_REF vs HOM_ALT)
|
||
ibs0 += 1
|
||
|
||
total = ibs0 + ibs1 + ibs2
|
||
return ibs0, ibs1, ibs2, total
|
||
|
||
|
||
def calculate_kinship_coefficient(ibs0: int, ibs1: int, ibs2: int) -> Tuple[float, float]:
|
||
"""
|
||
Calculate kinship coefficient estimates.
|
||
Returns (king_robust, simple_estimate)
|
||
"""
|
||
total = ibs0 + ibs1 + ibs2
|
||
if total == 0:
|
||
return 0.0, 0.0
|
||
|
||
# KING-robust style estimator
|
||
denominator = ibs1 + 2 * ibs2
|
||
if denominator > 0:
|
||
kinship_king = (ibs2 - 2 * ibs0) / denominator
|
||
else:
|
||
kinship_king = 0.0
|
||
|
||
# Simple IBS-based estimator
|
||
ibs2_frac = ibs2 / total
|
||
ibs0_frac = ibs0 / total
|
||
kinship_simple = 0.5 * (ibs2_frac - ibs0_frac)
|
||
|
||
return kinship_king, kinship_simple
|
||
|
||
|
||
def calculate_f_statistic(variants: List[VariantSite],
|
||
proband_idx: int = 2,
|
||
mother_idx: int = 0,
|
||
father_idx: int = 1) -> Tuple[float, float, float, int]:
|
||
"""
|
||
Calculate F-statistic (inbreeding coefficient) from trio data.
|
||
Uses sites where both parents are HET (most informative).
|
||
Returns (f_statistic, observed_het_rate, expected_het_rate, informative_sites)
|
||
"""
|
||
# Find sites where both parents are heterozygous
|
||
informative_sites = 0
|
||
proband_het_count = 0
|
||
|
||
for v in variants:
|
||
mother_gt = v.genotypes.get(mother_idx)
|
||
father_gt = v.genotypes.get(father_idx)
|
||
proband_gt = v.genotypes.get(proband_idx)
|
||
|
||
# Both parents must be HET for most informative sites
|
||
if mother_gt == 'HET' and father_gt == 'HET':
|
||
informative_sites += 1
|
||
if proband_gt == 'HET':
|
||
proband_het_count += 1
|
||
|
||
if informative_sites == 0:
|
||
return 0.0, 0.0, 0.5, 0
|
||
|
||
# Expected: 50% HET at HET x HET sites under Mendelian inheritance
|
||
expected_het_rate = 0.5
|
||
observed_het_rate = proband_het_count / informative_sites
|
||
|
||
# F = 1 - (observed / expected)
|
||
f_statistic = 1 - (observed_het_rate / expected_het_rate)
|
||
|
||
return f_statistic, observed_het_rate, expected_het_rate, informative_sites
|
||
|
||
|
||
def interpret_roh(total_roh_mb: float, count_gt_5mb: int) -> Tuple[str, str]:
|
||
"""Interpret ROH results. Returns (level, interpretation_zh)"""
|
||
if total_roh_mb < 10:
|
||
return "normal", "正常範圍,無親緣關係證據"
|
||
elif total_roh_mb < 30:
|
||
return "possible_distant", "輕度升高,可能存在遠親關係(三代表親或更遠)"
|
||
elif total_roh_mb < 70:
|
||
return "probable", "明顯升高,可能為二代表親或更近親緣"
|
||
elif total_roh_mb < 150:
|
||
return "strong", "高度升高,符合一代表親(first cousin)預期"
|
||
else:
|
||
return "very_close", "極度升高,可能為叔姪/姑侄關係或更近"
|
||
|
||
|
||
def interpret_kinship(kinship: float) -> Tuple[str, str]:
|
||
"""Interpret kinship coefficient. Returns (level, interpretation_zh)"""
|
||
if kinship < 0.02:
|
||
return "unrelated", "無親緣關係"
|
||
elif kinship < 0.04:
|
||
return "distant", "可能為遠親(三代表親或更遠)"
|
||
elif kinship < 0.08:
|
||
return "second_cousin", "可能為二代表親"
|
||
elif kinship < 0.15:
|
||
return "first_cousin", "可能為一代表親(first cousins)"
|
||
else:
|
||
return "close", "近親關係(半同胞或更近)"
|
||
|
||
|
||
def interpret_f_statistic(f_stat: float) -> Tuple[str, str]:
|
||
"""Interpret F-statistic. Returns (level, interpretation_zh)"""
|
||
if f_stat < -0.05:
|
||
return "outbred", "負值:雜合度高於預期,無近親繁殖,父母來自不同血緣背景"
|
||
elif f_stat < 0.01:
|
||
return "none", "無近親繁殖證據"
|
||
elif f_stat < 0.03:
|
||
return "minimal", "極輕微,可能為遠親"
|
||
elif f_stat < 0.06:
|
||
return "moderate", "中度,符合二代表親子代預期"
|
||
elif f_stat < 0.10:
|
||
return "elevated", "升高,符合一代表親子代預期"
|
||
else:
|
||
return "high", "顯著升高,近親程度高於一代表親"
|
||
|
||
|
||
def run_analysis(vcf_path: str, mother_idx: int = 0, father_idx: int = 1,
|
||
proband_idx: int = 2, min_gq: int = 20, min_dp: int = 10) -> ConsanguinityResults:
|
||
"""Run complete consanguinity analysis"""
|
||
print(f"解析 VCF: {vcf_path}")
|
||
samples, variants, total_raw = parse_vcf_for_consanguinity(vcf_path, min_gq, min_dp)
|
||
|
||
print(f"樣本: {samples}")
|
||
print(f"原始變異數: {total_raw:,}")
|
||
print(f"過濾後變異數: {len(variants):,}")
|
||
|
||
results = ConsanguinityResults()
|
||
results.total_variants = total_raw
|
||
results.filtered_variants = len(variants)
|
||
|
||
# ROH Analysis
|
||
print("\n計算 ROH...")
|
||
roh_segments = calculate_roh_segments(variants, proband_idx)
|
||
results.roh_segments = roh_segments
|
||
results.total_roh_mb = sum(s.length_mb for s in roh_segments)
|
||
results.count_gt_1mb = sum(1 for s in roh_segments if s.length_mb > 1)
|
||
results.count_gt_5mb = sum(1 for s in roh_segments if s.length_mb > 5)
|
||
results.count_gt_10mb = sum(1 for s in roh_segments if s.length_mb > 10)
|
||
results.longest_segment_mb = max((s.length_mb for s in roh_segments), default=0)
|
||
|
||
print(f" ROH 區段數: {len(roh_segments)}")
|
||
print(f" ROH 總長度: {results.total_roh_mb:.2f} Mb")
|
||
|
||
# IBS Analysis
|
||
print("\n計算父母 IBS...")
|
||
ibs0, ibs1, ibs2, total = calculate_ibs_between_parents(variants, mother_idx, father_idx)
|
||
results.ibs0 = ibs0
|
||
results.ibs1 = ibs1
|
||
results.ibs2 = ibs2
|
||
results.total_comparable = total
|
||
|
||
if total > 0:
|
||
print(f" IBS0: {ibs0:,} ({100*ibs0/total:.2f}%)")
|
||
print(f" IBS1: {ibs1:,} ({100*ibs1/total:.2f}%)")
|
||
print(f" IBS2: {ibs2:,} ({100*ibs2/total:.2f}%)")
|
||
|
||
# Kinship coefficient
|
||
print("\n計算親緣係數...")
|
||
kinship_king, kinship_simple = calculate_kinship_coefficient(ibs0, ibs1, ibs2)
|
||
results.kinship_king = kinship_king
|
||
results.kinship_simple = kinship_simple
|
||
print(f" KING-robust: {kinship_king:.4f}")
|
||
print(f" Simple: {kinship_simple:.4f}")
|
||
|
||
# F-statistic
|
||
print("\n計算 F 統計量...")
|
||
f_stat, obs_het, exp_het, info_sites = calculate_f_statistic(
|
||
variants, proband_idx, mother_idx, father_idx
|
||
)
|
||
results.f_statistic = f_stat
|
||
results.observed_het_rate = obs_het
|
||
results.expected_het_rate = exp_het
|
||
results.informative_sites = info_sites
|
||
print(f" F-statistic: {f_stat:.4f}")
|
||
print(f" Informative sites: {info_sites:,}")
|
||
|
||
return results, samples
|
||
|
||
|
||
def generate_report(results: ConsanguinityResults, samples: List[str],
|
||
vcf_path: str, output_path: str,
|
||
mother_idx: int = 0, father_idx: int = 1, proband_idx: int = 2):
|
||
"""Generate Chinese report"""
|
||
|
||
# Interpretations
|
||
roh_level, roh_interp = interpret_roh(results.total_roh_mb, results.count_gt_5mb)
|
||
kin_level, kin_interp = interpret_kinship(results.kinship_king)
|
||
f_level, f_interp = interpret_f_statistic(results.f_statistic)
|
||
|
||
with open(output_path, 'w', encoding='utf-8') as f:
|
||
f.write("=" * 80 + "\n")
|
||
f.write(" 親緣關係與近親婚配分析報告\n")
|
||
f.write(" Parental Relatedness Analysis Report\n")
|
||
f.write("=" * 80 + "\n\n")
|
||
|
||
f.write(f"分析日期: {datetime.now().strftime('%Y-%m-%d %H:%M')}\n")
|
||
f.write(f"輸入文件: {vcf_path}\n\n")
|
||
|
||
f.write("樣本資訊:\n")
|
||
f.write(f" - 母親 (Mother): {samples[mother_idx]} (index {mother_idx})\n")
|
||
f.write(f" - 父親 (Father): {samples[father_idx]} (index {father_idx})\n")
|
||
f.write(f" - 先證者 (Proband): {samples[proband_idx]} (index {proband_idx})\n")
|
||
|
||
f.write("\n" + "=" * 80 + "\n")
|
||
f.write("一、數據摘要\n")
|
||
f.write("=" * 80 + "\n\n")
|
||
|
||
f.write(f"原始變異位點數: {results.total_variants:,}\n")
|
||
f.write(f"過濾後高質量 SNP: {results.filtered_variants:,}\n")
|
||
f.write(f"過濾條件: GQ>=20, DP>=10, 雙等位基因 SNP, 常染色體\n")
|
||
|
||
f.write("\n" + "=" * 80 + "\n")
|
||
f.write("二、先證者純合子區段分析 (ROH Analysis)\n")
|
||
f.write("=" * 80 + "\n\n")
|
||
|
||
f.write("ROH 區段統計:\n")
|
||
f.write(f" - ROH 區段總數: {len(results.roh_segments)}\n")
|
||
f.write(f" - ROH 總長度: {results.total_roh_mb:.2f} Mb\n")
|
||
f.write(f" - 最長區段: {results.longest_segment_mb:.2f} Mb\n")
|
||
if results.roh_segments:
|
||
avg_length = results.total_roh_mb / len(results.roh_segments)
|
||
f.write(f" - 平均區段長度: {avg_length:.2f} Mb\n")
|
||
|
||
f.write("\n區段長度分布:\n")
|
||
f.write(f" - > 1 Mb 區段: {results.count_gt_1mb} 個\n")
|
||
f.write(f" - > 5 Mb 區段: {results.count_gt_5mb} 個\n")
|
||
f.write(f" - > 10 Mb 區段: {results.count_gt_10mb} 個\n")
|
||
|
||
f.write(f"\n【解讀】{roh_interp}\n")
|
||
|
||
f.write("\n參考值:\n")
|
||
f.write(" - 非近親父母: 5-10 Mb (多為短區段)\n")
|
||
f.write(" - 二代表親父母: 30-70 Mb\n")
|
||
f.write(" - 一代表親父母: 100-150 Mb (含多個長區段)\n")
|
||
|
||
f.write("\n" + "=" * 80 + "\n")
|
||
f.write("三、父母間 IBS (Identity-by-State) 分析\n")
|
||
f.write("=" * 80 + "\n\n")
|
||
|
||
f.write("IBS 統計:\n")
|
||
f.write(f" - 可比較位點數: {results.total_comparable:,}\n")
|
||
|
||
if results.total_comparable > 0:
|
||
ibs0_pct = 100 * results.ibs0 / results.total_comparable
|
||
ibs1_pct = 100 * results.ibs1 / results.total_comparable
|
||
ibs2_pct = 100 * results.ibs2 / results.total_comparable
|
||
|
||
f.write(f" - IBS0 (完全不匹配): {results.ibs0:,} ({ibs0_pct:.2f}%)\n")
|
||
f.write(f" - IBS1 (部分匹配): {results.ibs1:,} ({ibs1_pct:.2f}%)\n")
|
||
f.write(f" - IBS2 (完全匹配): {results.ibs2:,} ({ibs2_pct:.2f}%)\n")
|
||
|
||
f.write(f"\n【解讀】{kin_interp}\n")
|
||
|
||
f.write("\n參考值:\n")
|
||
f.write(" IBS2% IBS0%\n")
|
||
f.write(" 非親緣關係: 70-75% 2-3%\n")
|
||
f.write(" 一代表親: 80-85% <1%\n")
|
||
f.write(" 二代表親: 77-80% ~1.5%\n")
|
||
|
||
f.write("\n" + "=" * 80 + "\n")
|
||
f.write("四、親緣係數估算 (Kinship Coefficient)\n")
|
||
f.write("=" * 80 + "\n\n")
|
||
|
||
f.write("計算結果:\n")
|
||
f.write(f" - KING-robust 估算: {results.kinship_king:.4f}\n")
|
||
f.write(f" - 簡化 IBS 估算: {results.kinship_simple:.4f}\n")
|
||
|
||
f.write(f"\n【解讀】{kin_interp}\n")
|
||
|
||
f.write("\n參考值:\n")
|
||
f.write(" - 無親緣關係: ~0 (±0.02)\n")
|
||
f.write(" - 二代表親: ~0.0156\n")
|
||
f.write(" - 一代表親: ~0.0625\n")
|
||
f.write(" - 半同胞: ~0.125\n")
|
||
f.write(" - 全同胞/親子: ~0.25\n")
|
||
|
||
f.write("\n" + "=" * 80 + "\n")
|
||
f.write("五、近親繁殖係數 (F-statistic / Inbreeding Coefficient)\n")
|
||
f.write("=" * 80 + "\n\n")
|
||
|
||
f.write("先證者 F 統計量:\n")
|
||
f.write(f" - 資訊位點數 (父母皆 HET): {results.informative_sites:,}\n")
|
||
f.write(f" - 觀察雜合率: {results.observed_het_rate:.4f}\n")
|
||
f.write(f" - 預期雜合率: {results.expected_het_rate:.4f}\n")
|
||
f.write(f" - F 統計量: {results.f_statistic:.4f}\n")
|
||
|
||
f.write(f"\n【解讀】{f_interp}\n")
|
||
|
||
f.write("\n參考值:\n")
|
||
f.write(" - 非近親: F ≈ 0\n")
|
||
f.write(" - 二代表親子代: F ≈ 0.0156\n")
|
||
f.write(" - 一代表親子代: F ≈ 0.0625\n")
|
||
f.write(" - 叔姪/姑侄子代: F ≈ 0.125\n")
|
||
|
||
f.write("\n" + "=" * 80 + "\n")
|
||
f.write("六、綜合評估與結論\n")
|
||
f.write("=" * 80 + "\n\n")
|
||
|
||
# Comprehensive assessment - F-statistic is the most reliable indicator
|
||
# If F is strongly negative, parents are definitely not consanguineous
|
||
if f_level == "outbred" or results.f_statistic < -0.05:
|
||
f.write("綜合評估:父母無親緣關係\n\n")
|
||
f.write("F 統計量為負值({:.4f}),顯示先證者的雜合度高於預期。\n".format(results.f_statistic))
|
||
f.write("這是父母來自不同血緣背景的強力證據,明確排除近親婚配。\n")
|
||
f.write("\n這是非近親婚配的典型結果。\n")
|
||
evidence_count = 0
|
||
else:
|
||
evidence_count = 0
|
||
if roh_level != "normal":
|
||
evidence_count += 1
|
||
if kin_level != "unrelated":
|
||
evidence_count += 1
|
||
if f_level not in ["none", "outbred"]:
|
||
evidence_count += 1
|
||
|
||
if evidence_count == 0:
|
||
f.write("綜合評估:無明顯親緣關係證據\n\n")
|
||
f.write("三項指標均在正常範圍內,父母之間不存在明顯的血緣關係。\n")
|
||
f.write("這是非近親婚配的典型結果。\n")
|
||
elif evidence_count == 1:
|
||
f.write("綜合評估:可能存在遠親關係\n\n")
|
||
f.write("一項指標顯示輕微異常,可能存在遠親關係(如三代表親或更遠)。\n")
|
||
f.write("這種程度的親緣關係在臨床上通常不具顯著影響。\n")
|
||
elif evidence_count == 2:
|
||
f.write("綜合評估:可能存在近親關係\n\n")
|
||
f.write("多項指標顯示異常,父母之間可能存在二代表親或更近的血緣關係。\n")
|
||
f.write("建議進一步諮詢遺傳諮詢師評估隱性遺傳疾病風險。\n")
|
||
else:
|
||
f.write("綜合評估:存在明顯近親關係\n\n")
|
||
f.write("所有指標均顯示異常,父母之間存在明確的血緣關係。\n")
|
||
f.write("強烈建議諮詢遺傳諮詢師,詳細評估子代的隱性遺傳疾病風險。\n")
|
||
|
||
f.write("\n【臨床建議】\n")
|
||
if evidence_count >= 2:
|
||
f.write("- 建議進行遺傳諮詢,討論隱性遺傳疾病風險\n")
|
||
f.write("- 可考慮針對常見隱性遺傳疾病進行帶因者篩檢\n")
|
||
f.write("- 若有特定遺傳疾病家族史,應重點關注相關基因\n")
|
||
else:
|
||
f.write("- 無特殊臨床建議\n")
|
||
f.write("- 如有家族遺傳疾病史,仍建議進行相關諮詢\n")
|
||
|
||
# ROH segment details
|
||
if results.roh_segments:
|
||
f.write("\n" + "=" * 80 + "\n")
|
||
f.write("附錄:ROH 區段詳細列表\n")
|
||
f.write("=" * 80 + "\n\n")
|
||
|
||
f.write(f"{'染色體':<8}{'起始位置':>15}{'終止位置':>15}{'長度(Mb)':>12}{'SNP數量':>10}\n")
|
||
f.write("-" * 60 + "\n")
|
||
|
||
for seg in sorted(results.roh_segments, key=lambda x: (int(x.chrom), x.start)):
|
||
f.write(f"chr{seg.chrom:<5}{seg.start:>15,}{seg.end:>15,}{seg.length_mb:>12.2f}{seg.snp_count:>10}\n")
|
||
|
||
f.write("\n" + "=" * 80 + "\n")
|
||
f.write(" 分析完成\n")
|
||
f.write("=" * 80 + "\n")
|
||
|
||
print(f"\n報告已儲存: {output_path}")
|
||
|
||
|
||
def main():
|
||
if len(sys.argv) < 2:
|
||
print("用法: python consanguinity_analysis.py <vcf_path> [output_path]")
|
||
print("")
|
||
print("範例:")
|
||
print(" python consanguinity_analysis.py trio_joint.vcf.gz")
|
||
print(" python consanguinity_analysis.py trio_joint.vcf.gz results.txt")
|
||
sys.exit(1)
|
||
|
||
vcf_path = sys.argv[1]
|
||
output_path = sys.argv[2] if len(sys.argv) > 2 else vcf_path.replace('.vcf', '_consanguinity.txt').replace('.gz', '')
|
||
|
||
# Sample indices: 0=mother, 1=father, 2=proband
|
||
results, samples = run_analysis(vcf_path, mother_idx=0, father_idx=1, proband_idx=2)
|
||
generate_report(results, samples, vcf_path, output_path,
|
||
mother_idx=0, father_idx=1, proband_idx=2)
|
||
|
||
|
||
if __name__ == '__main__':
|
||
main()
|