#!/usr/bin/env python3 """ Consanguinity Analysis Script Analyzes parental relatedness in trio WES data using: 1. Runs of Homozygosity (ROH) in proband 2. Identity-by-State (IBS) between parents 3. Kinship coefficient estimation 4. F-statistic (inbreeding coefficient) """ import gzip import re import sys from collections import defaultdict from dataclasses import dataclass, field from typing import List, Dict, Tuple, Optional from datetime import datetime # GRCh37 chromosome lengths (from VCF header) CHROMOSOME_LENGTHS = { "1": 249250621, "2": 243199373, "3": 198022430, "4": 191154276, "5": 180915260, "6": 171115067, "7": 159138663, "8": 146364022, "9": 141213431, "10": 135534747, "11": 135006516, "12": 133851895, "13": 115169878, "14": 107349540, "15": 102531392, "16": 90354753, "17": 81195210, "18": 78077248, "19": 59128983, "20": 63025520, "21": 48129895, "22": 51304566 } AUTOSOMES = set(str(i) for i in range(1, 23)) @dataclass class VariantSite: """Variant site for consanguinity analysis""" chrom: str pos: int ref: str alt: str genotypes: Dict[int, str] # sample_idx -> genotype class (HOM_REF, HET, HOM_ALT) @dataclass class ROHSegment: """Run of Homozygosity segment""" chrom: str start: int end: int snp_count: int @property def length_bp(self) -> int: return self.end - self.start @property def length_mb(self) -> float: return self.length_bp / 1_000_000 @dataclass class ConsanguinityResults: """Results from consanguinity analysis""" # ROH results roh_segments: List[ROHSegment] = field(default_factory=list) total_roh_mb: float = 0.0 count_gt_1mb: int = 0 count_gt_5mb: int = 0 count_gt_10mb: int = 0 longest_segment_mb: float = 0.0 # IBS results ibs0: int = 0 ibs1: int = 0 ibs2: int = 0 total_comparable: int = 0 # Kinship kinship_king: float = 0.0 kinship_simple: float = 0.0 # F-statistic f_statistic: float = 0.0 observed_het_rate: float = 0.0 expected_het_rate: float = 0.0 informative_sites: int = 0 # Stats total_variants: int = 0 filtered_variants: int = 0 def get_genotype_class(gt: str) -> str: """Classify genotype as HOM_REF, HET, HOM_ALT, or MISSING""" if gt in ['./.', '.|.', '.', '']: return 'MISSING' alleles = re.split('[/|]', gt) alleles = [a for a in alleles if a != '.'] if not alleles: return 'MISSING' if all(a == '0' for a in alleles): return 'HOM_REF' elif all(a != '0' for a in alleles): return 'HOM_ALT' else: return 'HET' def parse_vcf_for_consanguinity(vcf_path: str, min_gq: int = 20, min_dp: int = 10) -> Tuple[List[str], List[VariantSite], int]: """ Parse VCF extracting only high-quality biallelic autosomal SNPs. Returns (sample_names, variant_sites, total_raw_variants) """ samples = [] variants = [] total_raw = 0 open_func = gzip.open if vcf_path.endswith('.gz') else open mode = 'rt' if vcf_path.endswith('.gz') else 'r' with open_func(vcf_path, mode) as f: for line in f: if line.startswith('##'): continue elif line.startswith('#CHROM'): parts = line.strip().split('\t') samples = parts[9:] continue total_raw += 1 parts = line.strip().split('\t') if len(parts) < 10: continue chrom, pos, _, ref, alt, qual, filt, info, fmt = parts[:9] gt_fields = parts[9:] # Filter: autosomal chromosomes only chrom_clean = chrom.replace('chr', '') if chrom_clean not in AUTOSOMES: continue # Filter: biallelic SNPs only if ',' in alt: # Multi-allelic continue if len(ref) != 1 or len(alt) != 1: # Not SNP continue # Filter: PASS or . if filt not in ['PASS', '.', '']: continue # Parse FORMAT field indices fmt_fields = fmt.split(':') gt_idx = fmt_fields.index('GT') if 'GT' in fmt_fields else 0 gq_idx = fmt_fields.index('GQ') if 'GQ' in fmt_fields else -1 dp_idx = fmt_fields.index('DP') if 'DP' in fmt_fields else -1 # Parse genotypes with quality filtering genotypes = {} all_pass_quality = True for i in range(len(samples)): if i >= len(gt_fields): all_pass_quality = False break gt_parts = gt_fields[i].split(':') gt = gt_parts[gt_idx] if gt_idx < len(gt_parts) else './.' gt_class = get_genotype_class(gt) if gt_class == 'MISSING': all_pass_quality = False break # Check GQ if gq_idx >= 0 and gq_idx < len(gt_parts): try: gq = int(gt_parts[gq_idx]) if gq < min_gq: all_pass_quality = False break except (ValueError, TypeError): pass # If GQ not parseable, don't fail # Check DP if dp_idx >= 0 and dp_idx < len(gt_parts): try: dp = int(gt_parts[dp_idx]) if dp < min_dp: all_pass_quality = False break except (ValueError, TypeError): pass # If DP not parseable, don't fail genotypes[i] = gt_class if not all_pass_quality: continue if len(genotypes) != len(samples): continue variant = VariantSite( chrom=chrom_clean, pos=int(pos), ref=ref, alt=alt, genotypes=genotypes ) variants.append(variant) return samples, variants, total_raw def calculate_roh_segments(variants: List[VariantSite], proband_idx: int = 2, min_snps: int = 25, min_length_kb: int = 1000, max_gap_kb: int = 500) -> List[ROHSegment]: """ Detect Runs of Homozygosity in proband using WES-optimized approach. Only counts HOM_ALT (homozygous alternate) sites, not HOM_REF, as HOM_REF at most WES sites is uninformative. """ # Group variants by chromosome - only HOM_ALT sites by_chrom = defaultdict(list) for v in variants: gt = v.genotypes.get(proband_idx) # Only count HOM_ALT - these are informative for autozygosity if gt == 'HOM_ALT': by_chrom[v.chrom].append(v) roh_segments = [] for chrom in sorted(by_chrom.keys(), key=lambda x: int(x)): chrom_variants = sorted(by_chrom[chrom], key=lambda x: x.pos) if len(chrom_variants) < min_snps: continue # Find homozygous stretches current_start = chrom_variants[0].pos current_end = chrom_variants[0].pos current_count = 1 for i in range(1, len(chrom_variants)): prev_pos = chrom_variants[i-1].pos curr_pos = chrom_variants[i].pos gap = (curr_pos - prev_pos) / 1000 # kb if gap <= max_gap_kb: # Extend current segment current_end = curr_pos current_count += 1 else: # Check if current segment qualifies length_kb = (current_end - current_start) / 1000 if current_count >= min_snps and length_kb >= min_length_kb: roh_segments.append(ROHSegment( chrom=chrom, start=current_start, end=current_end, snp_count=current_count )) # Start new segment current_start = curr_pos current_end = curr_pos current_count = 1 # Check final segment length_kb = (current_end - current_start) / 1000 if current_count >= min_snps and length_kb >= min_length_kb: roh_segments.append(ROHSegment( chrom=chrom, start=current_start, end=current_end, snp_count=current_count )) return roh_segments def calculate_ibs_between_parents(variants: List[VariantSite], mother_idx: int = 0, father_idx: int = 1) -> Tuple[int, int, int, int]: """ Calculate Identity-by-State sharing between parents. Returns (ibs0, ibs1, ibs2, total_comparable) """ ibs0 = 0 # Opposite homozygotes (AA vs BB) ibs1 = 0 # One hom, one het ibs2 = 0 # Identical genotypes for v in variants: mother_gt = v.genotypes.get(mother_idx) father_gt = v.genotypes.get(father_idx) if mother_gt == 'MISSING' or father_gt == 'MISSING': continue # IBS classification if mother_gt == father_gt: ibs2 += 1 elif mother_gt == 'HET' or father_gt == 'HET': ibs1 += 1 else: # Both homozygous but different (HOM_REF vs HOM_ALT) ibs0 += 1 total = ibs0 + ibs1 + ibs2 return ibs0, ibs1, ibs2, total def calculate_kinship_coefficient(ibs0: int, ibs1: int, ibs2: int) -> Tuple[float, float]: """ Calculate kinship coefficient estimates. Returns (king_robust, simple_estimate) """ total = ibs0 + ibs1 + ibs2 if total == 0: return 0.0, 0.0 # KING-robust style estimator denominator = ibs1 + 2 * ibs2 if denominator > 0: kinship_king = (ibs2 - 2 * ibs0) / denominator else: kinship_king = 0.0 # Simple IBS-based estimator ibs2_frac = ibs2 / total ibs0_frac = ibs0 / total kinship_simple = 0.5 * (ibs2_frac - ibs0_frac) return kinship_king, kinship_simple def calculate_f_statistic(variants: List[VariantSite], proband_idx: int = 2, mother_idx: int = 0, father_idx: int = 1) -> Tuple[float, float, float, int]: """ Calculate F-statistic (inbreeding coefficient) from trio data. Uses sites where both parents are HET (most informative). Returns (f_statistic, observed_het_rate, expected_het_rate, informative_sites) """ # Find sites where both parents are heterozygous informative_sites = 0 proband_het_count = 0 for v in variants: mother_gt = v.genotypes.get(mother_idx) father_gt = v.genotypes.get(father_idx) proband_gt = v.genotypes.get(proband_idx) # Both parents must be HET for most informative sites if mother_gt == 'HET' and father_gt == 'HET': informative_sites += 1 if proband_gt == 'HET': proband_het_count += 1 if informative_sites == 0: return 0.0, 0.0, 0.5, 0 # Expected: 50% HET at HET x HET sites under Mendelian inheritance expected_het_rate = 0.5 observed_het_rate = proband_het_count / informative_sites # F = 1 - (observed / expected) f_statistic = 1 - (observed_het_rate / expected_het_rate) return f_statistic, observed_het_rate, expected_het_rate, informative_sites def interpret_roh(total_roh_mb: float, count_gt_5mb: int) -> Tuple[str, str]: """Interpret ROH results. Returns (level, interpretation_zh)""" if total_roh_mb < 10: return "normal", "正常範圍,無親緣關係證據" elif total_roh_mb < 30: return "possible_distant", "輕度升高,可能存在遠親關係(三代表親或更遠)" elif total_roh_mb < 70: return "probable", "明顯升高,可能為二代表親或更近親緣" elif total_roh_mb < 150: return "strong", "高度升高,符合一代表親(first cousin)預期" else: return "very_close", "極度升高,可能為叔姪/姑侄關係或更近" def interpret_kinship(kinship: float) -> Tuple[str, str]: """Interpret kinship coefficient. Returns (level, interpretation_zh)""" if kinship < 0.02: return "unrelated", "無親緣關係" elif kinship < 0.04: return "distant", "可能為遠親(三代表親或更遠)" elif kinship < 0.08: return "second_cousin", "可能為二代表親" elif kinship < 0.15: return "first_cousin", "可能為一代表親(first cousins)" else: return "close", "近親關係(半同胞或更近)" def interpret_f_statistic(f_stat: float) -> Tuple[str, str]: """Interpret F-statistic. Returns (level, interpretation_zh)""" if f_stat < -0.05: return "outbred", "負值:雜合度高於預期,無近親繁殖,父母來自不同血緣背景" elif f_stat < 0.01: return "none", "無近親繁殖證據" elif f_stat < 0.03: return "minimal", "極輕微,可能為遠親" elif f_stat < 0.06: return "moderate", "中度,符合二代表親子代預期" elif f_stat < 0.10: return "elevated", "升高,符合一代表親子代預期" else: return "high", "顯著升高,近親程度高於一代表親" def run_analysis(vcf_path: str, mother_idx: int = 0, father_idx: int = 1, proband_idx: int = 2, min_gq: int = 20, min_dp: int = 10) -> ConsanguinityResults: """Run complete consanguinity analysis""" print(f"解析 VCF: {vcf_path}") samples, variants, total_raw = parse_vcf_for_consanguinity(vcf_path, min_gq, min_dp) print(f"樣本: {samples}") print(f"原始變異數: {total_raw:,}") print(f"過濾後變異數: {len(variants):,}") results = ConsanguinityResults() results.total_variants = total_raw results.filtered_variants = len(variants) # ROH Analysis print("\n計算 ROH...") roh_segments = calculate_roh_segments(variants, proband_idx) results.roh_segments = roh_segments results.total_roh_mb = sum(s.length_mb for s in roh_segments) results.count_gt_1mb = sum(1 for s in roh_segments if s.length_mb > 1) results.count_gt_5mb = sum(1 for s in roh_segments if s.length_mb > 5) results.count_gt_10mb = sum(1 for s in roh_segments if s.length_mb > 10) results.longest_segment_mb = max((s.length_mb for s in roh_segments), default=0) print(f" ROH 區段數: {len(roh_segments)}") print(f" ROH 總長度: {results.total_roh_mb:.2f} Mb") # IBS Analysis print("\n計算父母 IBS...") ibs0, ibs1, ibs2, total = calculate_ibs_between_parents(variants, mother_idx, father_idx) results.ibs0 = ibs0 results.ibs1 = ibs1 results.ibs2 = ibs2 results.total_comparable = total if total > 0: print(f" IBS0: {ibs0:,} ({100*ibs0/total:.2f}%)") print(f" IBS1: {ibs1:,} ({100*ibs1/total:.2f}%)") print(f" IBS2: {ibs2:,} ({100*ibs2/total:.2f}%)") # Kinship coefficient print("\n計算親緣係數...") kinship_king, kinship_simple = calculate_kinship_coefficient(ibs0, ibs1, ibs2) results.kinship_king = kinship_king results.kinship_simple = kinship_simple print(f" KING-robust: {kinship_king:.4f}") print(f" Simple: {kinship_simple:.4f}") # F-statistic print("\n計算 F 統計量...") f_stat, obs_het, exp_het, info_sites = calculate_f_statistic( variants, proband_idx, mother_idx, father_idx ) results.f_statistic = f_stat results.observed_het_rate = obs_het results.expected_het_rate = exp_het results.informative_sites = info_sites print(f" F-statistic: {f_stat:.4f}") print(f" Informative sites: {info_sites:,}") return results, samples def generate_report(results: ConsanguinityResults, samples: List[str], vcf_path: str, output_path: str, mother_idx: int = 0, father_idx: int = 1, proband_idx: int = 2): """Generate Chinese report""" # Interpretations roh_level, roh_interp = interpret_roh(results.total_roh_mb, results.count_gt_5mb) kin_level, kin_interp = interpret_kinship(results.kinship_king) f_level, f_interp = interpret_f_statistic(results.f_statistic) with open(output_path, 'w', encoding='utf-8') as f: f.write("=" * 80 + "\n") f.write(" 親緣關係與近親婚配分析報告\n") f.write(" Parental Relatedness Analysis Report\n") f.write("=" * 80 + "\n\n") f.write(f"分析日期: {datetime.now().strftime('%Y-%m-%d %H:%M')}\n") f.write(f"輸入文件: {vcf_path}\n\n") f.write("樣本資訊:\n") f.write(f" - 母親 (Mother): {samples[mother_idx]} (index {mother_idx})\n") f.write(f" - 父親 (Father): {samples[father_idx]} (index {father_idx})\n") f.write(f" - 先證者 (Proband): {samples[proband_idx]} (index {proband_idx})\n") f.write("\n" + "=" * 80 + "\n") f.write("一、數據摘要\n") f.write("=" * 80 + "\n\n") f.write(f"原始變異位點數: {results.total_variants:,}\n") f.write(f"過濾後高質量 SNP: {results.filtered_variants:,}\n") f.write(f"過濾條件: GQ>=20, DP>=10, 雙等位基因 SNP, 常染色體\n") f.write("\n" + "=" * 80 + "\n") f.write("二、先證者純合子區段分析 (ROH Analysis)\n") f.write("=" * 80 + "\n\n") f.write("ROH 區段統計:\n") f.write(f" - ROH 區段總數: {len(results.roh_segments)}\n") f.write(f" - ROH 總長度: {results.total_roh_mb:.2f} Mb\n") f.write(f" - 最長區段: {results.longest_segment_mb:.2f} Mb\n") if results.roh_segments: avg_length = results.total_roh_mb / len(results.roh_segments) f.write(f" - 平均區段長度: {avg_length:.2f} Mb\n") f.write("\n區段長度分布:\n") f.write(f" - > 1 Mb 區段: {results.count_gt_1mb} 個\n") f.write(f" - > 5 Mb 區段: {results.count_gt_5mb} 個\n") f.write(f" - > 10 Mb 區段: {results.count_gt_10mb} 個\n") f.write(f"\n【解讀】{roh_interp}\n") f.write("\n參考值:\n") f.write(" - 非近親父母: 5-10 Mb (多為短區段)\n") f.write(" - 二代表親父母: 30-70 Mb\n") f.write(" - 一代表親父母: 100-150 Mb (含多個長區段)\n") f.write("\n" + "=" * 80 + "\n") f.write("三、父母間 IBS (Identity-by-State) 分析\n") f.write("=" * 80 + "\n\n") f.write("IBS 統計:\n") f.write(f" - 可比較位點數: {results.total_comparable:,}\n") if results.total_comparable > 0: ibs0_pct = 100 * results.ibs0 / results.total_comparable ibs1_pct = 100 * results.ibs1 / results.total_comparable ibs2_pct = 100 * results.ibs2 / results.total_comparable f.write(f" - IBS0 (完全不匹配): {results.ibs0:,} ({ibs0_pct:.2f}%)\n") f.write(f" - IBS1 (部分匹配): {results.ibs1:,} ({ibs1_pct:.2f}%)\n") f.write(f" - IBS2 (完全匹配): {results.ibs2:,} ({ibs2_pct:.2f}%)\n") f.write(f"\n【解讀】{kin_interp}\n") f.write("\n參考值:\n") f.write(" IBS2% IBS0%\n") f.write(" 非親緣關係: 70-75% 2-3%\n") f.write(" 一代表親: 80-85% <1%\n") f.write(" 二代表親: 77-80% ~1.5%\n") f.write("\n" + "=" * 80 + "\n") f.write("四、親緣係數估算 (Kinship Coefficient)\n") f.write("=" * 80 + "\n\n") f.write("計算結果:\n") f.write(f" - KING-robust 估算: {results.kinship_king:.4f}\n") f.write(f" - 簡化 IBS 估算: {results.kinship_simple:.4f}\n") f.write(f"\n【解讀】{kin_interp}\n") f.write("\n參考值:\n") f.write(" - 無親緣關係: ~0 (±0.02)\n") f.write(" - 二代表親: ~0.0156\n") f.write(" - 一代表親: ~0.0625\n") f.write(" - 半同胞: ~0.125\n") f.write(" - 全同胞/親子: ~0.25\n") f.write("\n" + "=" * 80 + "\n") f.write("五、近親繁殖係數 (F-statistic / Inbreeding Coefficient)\n") f.write("=" * 80 + "\n\n") f.write("先證者 F 統計量:\n") f.write(f" - 資訊位點數 (父母皆 HET): {results.informative_sites:,}\n") f.write(f" - 觀察雜合率: {results.observed_het_rate:.4f}\n") f.write(f" - 預期雜合率: {results.expected_het_rate:.4f}\n") f.write(f" - F 統計量: {results.f_statistic:.4f}\n") f.write(f"\n【解讀】{f_interp}\n") f.write("\n參考值:\n") f.write(" - 非近親: F ≈ 0\n") f.write(" - 二代表親子代: F ≈ 0.0156\n") f.write(" - 一代表親子代: F ≈ 0.0625\n") f.write(" - 叔姪/姑侄子代: F ≈ 0.125\n") f.write("\n" + "=" * 80 + "\n") f.write("六、綜合評估與結論\n") f.write("=" * 80 + "\n\n") # Comprehensive assessment - F-statistic is the most reliable indicator # If F is strongly negative, parents are definitely not consanguineous if f_level == "outbred" or results.f_statistic < -0.05: f.write("綜合評估:父母無親緣關係\n\n") f.write("F 統計量為負值({:.4f}),顯示先證者的雜合度高於預期。\n".format(results.f_statistic)) f.write("這是父母來自不同血緣背景的強力證據,明確排除近親婚配。\n") f.write("\n這是非近親婚配的典型結果。\n") evidence_count = 0 else: evidence_count = 0 if roh_level != "normal": evidence_count += 1 if kin_level != "unrelated": evidence_count += 1 if f_level not in ["none", "outbred"]: evidence_count += 1 if evidence_count == 0: f.write("綜合評估:無明顯親緣關係證據\n\n") f.write("三項指標均在正常範圍內,父母之間不存在明顯的血緣關係。\n") f.write("這是非近親婚配的典型結果。\n") elif evidence_count == 1: f.write("綜合評估:可能存在遠親關係\n\n") f.write("一項指標顯示輕微異常,可能存在遠親關係(如三代表親或更遠)。\n") f.write("這種程度的親緣關係在臨床上通常不具顯著影響。\n") elif evidence_count == 2: f.write("綜合評估:可能存在近親關係\n\n") f.write("多項指標顯示異常,父母之間可能存在二代表親或更近的血緣關係。\n") f.write("建議進一步諮詢遺傳諮詢師評估隱性遺傳疾病風險。\n") else: f.write("綜合評估:存在明顯近親關係\n\n") f.write("所有指標均顯示異常,父母之間存在明確的血緣關係。\n") f.write("強烈建議諮詢遺傳諮詢師,詳細評估子代的隱性遺傳疾病風險。\n") f.write("\n【臨床建議】\n") if evidence_count >= 2: f.write("- 建議進行遺傳諮詢,討論隱性遺傳疾病風險\n") f.write("- 可考慮針對常見隱性遺傳疾病進行帶因者篩檢\n") f.write("- 若有特定遺傳疾病家族史,應重點關注相關基因\n") else: f.write("- 無特殊臨床建議\n") f.write("- 如有家族遺傳疾病史,仍建議進行相關諮詢\n") # ROH segment details if results.roh_segments: f.write("\n" + "=" * 80 + "\n") f.write("附錄:ROH 區段詳細列表\n") f.write("=" * 80 + "\n\n") f.write(f"{'染色體':<8}{'起始位置':>15}{'終止位置':>15}{'長度(Mb)':>12}{'SNP數量':>10}\n") f.write("-" * 60 + "\n") for seg in sorted(results.roh_segments, key=lambda x: (int(x.chrom), x.start)): f.write(f"chr{seg.chrom:<5}{seg.start:>15,}{seg.end:>15,}{seg.length_mb:>12.2f}{seg.snp_count:>10}\n") f.write("\n" + "=" * 80 + "\n") f.write(" 分析完成\n") f.write("=" * 80 + "\n") print(f"\n報告已儲存: {output_path}") def main(): if len(sys.argv) < 2: print("用法: python consanguinity_analysis.py [output_path]") print("") print("範例:") print(" python consanguinity_analysis.py trio_joint.vcf.gz") print(" python consanguinity_analysis.py trio_joint.vcf.gz results.txt") sys.exit(1) vcf_path = sys.argv[1] output_path = sys.argv[2] if len(sys.argv) > 2 else vcf_path.replace('.vcf', '_consanguinity.txt').replace('.gz', '') # Sample indices: 0=mother, 1=father, 2=proband results, samples = run_analysis(vcf_path, mother_idx=0, father_idx=1, proband_idx=2) generate_report(results, samples, vcf_path, output_path, mother_idx=0, father_idx=1, proband_idx=2) if __name__ == '__main__': main()