From 320953eec3e1c29d189f8ff819b57fa453ffc5d7 Mon Sep 17 00:00:00 2001 From: gbanyan Date: Mon, 5 Jan 2026 22:32:08 +0800 Subject: [PATCH] chore: save local changes --- .serena/.gitignore | 1 + .serena/project.yml | 84 +++++ consanguinity_analysis.py | 683 ++++++++++++++++++++++++++++++++++++++ 3 files changed, 768 insertions(+) create mode 100644 .serena/.gitignore create mode 100644 .serena/project.yml create mode 100644 consanguinity_analysis.py diff --git a/.serena/.gitignore b/.serena/.gitignore new file mode 100644 index 0000000..14d86ad --- /dev/null +++ b/.serena/.gitignore @@ -0,0 +1 @@ +/cache diff --git a/.serena/project.yml b/.serena/project.yml new file mode 100644 index 0000000..21e9896 --- /dev/null +++ b/.serena/project.yml @@ -0,0 +1,84 @@ +# list of languages for which language servers are started; choose from: +# al bash clojure cpp csharp csharp_omnisharp +# dart elixir elm erlang fortran go +# haskell java julia kotlin lua markdown +# nix perl php python python_jedi r +# rego ruby ruby_solargraph rust scala swift +# terraform typescript typescript_vts yaml zig +# Note: +# - For C, use cpp +# - For JavaScript, use typescript +# Special requirements: +# - csharp: Requires the presence of a .sln file in the project folder. +# When using multiple languages, the first language server that supports a given file will be used for that file. +# The first language is the default language and the respective language server will be used as a fallback. +# Note that when using the JetBrains backend, language servers are not used and this list is correspondingly ignored. +languages: +- python + +# the encoding used by text files in the project +# For a list of possible encodings, see https://docs.python.org/3.11/library/codecs.html#standard-encodings +encoding: "utf-8" + +# whether to use the project's gitignore file to ignore files +# Added on 2025-04-07 +ignore_all_files_in_gitignore: true + +# list of additional paths to ignore +# same syntax as gitignore, so you can use * and ** +# Was previously called `ignored_dirs`, please update your config if you are using that. +# Added (renamed) on 2025-04-07 +ignored_paths: [] + +# whether the project is in read-only mode +# If set to true, all editing tools will be disabled and attempts to use them will result in an error +# Added on 2025-04-18 +read_only: false + +# list of tool names to exclude. We recommend not excluding any tools, see the readme for more details. +# Below is the complete list of tools for convenience. +# To make sure you have the latest list of tools, and to view their descriptions, +# execute `uv run scripts/print_tool_overview.py`. +# +# * `activate_project`: Activates a project by name. +# * `check_onboarding_performed`: Checks whether project onboarding was already performed. +# * `create_text_file`: Creates/overwrites a file in the project directory. +# * `delete_lines`: Deletes a range of lines within a file. +# * `delete_memory`: Deletes a memory from Serena's project-specific memory store. +# * `execute_shell_command`: Executes a shell command. +# * `find_referencing_code_snippets`: Finds code snippets in which the symbol at the given location is referenced. +# * `find_referencing_symbols`: Finds symbols that reference the symbol at the given location (optionally filtered by type). +# * `find_symbol`: Performs a global (or local) search for symbols with/containing a given name/substring (optionally filtered by type). +# * `get_current_config`: Prints the current configuration of the agent, including the active and available projects, tools, contexts, and modes. +# * `get_symbols_overview`: Gets an overview of the top-level symbols defined in a given file. +# * `initial_instructions`: Gets the initial instructions for the current project. +# Should only be used in settings where the system prompt cannot be set, +# e.g. in clients you have no control over, like Claude Desktop. +# * `insert_after_symbol`: Inserts content after the end of the definition of a given symbol. +# * `insert_at_line`: Inserts content at a given line in a file. +# * `insert_before_symbol`: Inserts content before the beginning of the definition of a given symbol. +# * `list_dir`: Lists files and directories in the given directory (optionally with recursion). +# * `list_memories`: Lists memories in Serena's project-specific memory store. +# * `onboarding`: Performs onboarding (identifying the project structure and essential tasks, e.g. for testing or building). +# * `prepare_for_new_conversation`: Provides instructions for preparing for a new conversation (in order to continue with the necessary context). +# * `read_file`: Reads a file within the project directory. +# * `read_memory`: Reads the memory with the given name from Serena's project-specific memory store. +# * `remove_project`: Removes a project from the Serena configuration. +# * `replace_lines`: Replaces a range of lines within a file with new content. +# * `replace_symbol_body`: Replaces the full definition of a symbol. +# * `restart_language_server`: Restarts the language server, may be necessary when edits not through Serena happen. +# * `search_for_pattern`: Performs a search for a pattern in the project. +# * `summarize_changes`: Provides instructions for summarizing the changes made to the codebase. +# * `switch_modes`: Activates modes by providing a list of their names +# * `think_about_collected_information`: Thinking tool for pondering the completeness of collected information. +# * `think_about_task_adherence`: Thinking tool for determining whether the agent is still on track with the current task. +# * `think_about_whether_you_are_done`: Thinking tool for determining whether the task is truly completed. +# * `write_memory`: Writes a named memory (for future reference) to Serena's project-specific memory store. +excluded_tools: [] + +# initial prompt for the project. It will always be given to the LLM upon activating the project +# (contrary to the memories, which are loaded on demand). +initial_prompt: "" + +project_name: "genomic-consultant" +included_optional_tools: [] diff --git a/consanguinity_analysis.py b/consanguinity_analysis.py new file mode 100644 index 0000000..e020066 --- /dev/null +++ b/consanguinity_analysis.py @@ -0,0 +1,683 @@ +#!/usr/bin/env python3 +""" +Consanguinity Analysis Script +Analyzes parental relatedness in trio WES data using: +1. Runs of Homozygosity (ROH) in proband +2. Identity-by-State (IBS) between parents +3. Kinship coefficient estimation +4. F-statistic (inbreeding coefficient) +""" + +import gzip +import re +import sys +from collections import defaultdict +from dataclasses import dataclass, field +from typing import List, Dict, Tuple, Optional +from datetime import datetime + +# GRCh37 chromosome lengths (from VCF header) +CHROMOSOME_LENGTHS = { + "1": 249250621, "2": 243199373, "3": 198022430, "4": 191154276, + "5": 180915260, "6": 171115067, "7": 159138663, "8": 146364022, + "9": 141213431, "10": 135534747, "11": 135006516, "12": 133851895, + "13": 115169878, "14": 107349540, "15": 102531392, "16": 90354753, + "17": 81195210, "18": 78077248, "19": 59128983, "20": 63025520, + "21": 48129895, "22": 51304566 +} + +AUTOSOMES = set(str(i) for i in range(1, 23)) + + +@dataclass +class VariantSite: + """Variant site for consanguinity analysis""" + chrom: str + pos: int + ref: str + alt: str + genotypes: Dict[int, str] # sample_idx -> genotype class (HOM_REF, HET, HOM_ALT) + + +@dataclass +class ROHSegment: + """Run of Homozygosity segment""" + chrom: str + start: int + end: int + snp_count: int + + @property + def length_bp(self) -> int: + return self.end - self.start + + @property + def length_mb(self) -> float: + return self.length_bp / 1_000_000 + + +@dataclass +class ConsanguinityResults: + """Results from consanguinity analysis""" + # ROH results + roh_segments: List[ROHSegment] = field(default_factory=list) + total_roh_mb: float = 0.0 + count_gt_1mb: int = 0 + count_gt_5mb: int = 0 + count_gt_10mb: int = 0 + longest_segment_mb: float = 0.0 + + # IBS results + ibs0: int = 0 + ibs1: int = 0 + ibs2: int = 0 + total_comparable: int = 0 + + # Kinship + kinship_king: float = 0.0 + kinship_simple: float = 0.0 + + # F-statistic + f_statistic: float = 0.0 + observed_het_rate: float = 0.0 + expected_het_rate: float = 0.0 + informative_sites: int = 0 + + # Stats + total_variants: int = 0 + filtered_variants: int = 0 + + +def get_genotype_class(gt: str) -> str: + """Classify genotype as HOM_REF, HET, HOM_ALT, or MISSING""" + if gt in ['./.', '.|.', '.', '']: + return 'MISSING' + + alleles = re.split('[/|]', gt) + alleles = [a for a in alleles if a != '.'] + + if not alleles: + return 'MISSING' + + if all(a == '0' for a in alleles): + return 'HOM_REF' + elif all(a != '0' for a in alleles): + return 'HOM_ALT' + else: + return 'HET' + + +def parse_vcf_for_consanguinity(vcf_path: str, min_gq: int = 20, min_dp: int = 10) -> Tuple[List[str], List[VariantSite], int]: + """ + Parse VCF extracting only high-quality biallelic autosomal SNPs. + Returns (sample_names, variant_sites, total_raw_variants) + """ + samples = [] + variants = [] + total_raw = 0 + + open_func = gzip.open if vcf_path.endswith('.gz') else open + mode = 'rt' if vcf_path.endswith('.gz') else 'r' + + with open_func(vcf_path, mode) as f: + for line in f: + if line.startswith('##'): + continue + elif line.startswith('#CHROM'): + parts = line.strip().split('\t') + samples = parts[9:] + continue + + total_raw += 1 + parts = line.strip().split('\t') + if len(parts) < 10: + continue + + chrom, pos, _, ref, alt, qual, filt, info, fmt = parts[:9] + gt_fields = parts[9:] + + # Filter: autosomal chromosomes only + chrom_clean = chrom.replace('chr', '') + if chrom_clean not in AUTOSOMES: + continue + + # Filter: biallelic SNPs only + if ',' in alt: # Multi-allelic + continue + if len(ref) != 1 or len(alt) != 1: # Not SNP + continue + + # Filter: PASS or . + if filt not in ['PASS', '.', '']: + continue + + # Parse FORMAT field indices + fmt_fields = fmt.split(':') + gt_idx = fmt_fields.index('GT') if 'GT' in fmt_fields else 0 + gq_idx = fmt_fields.index('GQ') if 'GQ' in fmt_fields else -1 + dp_idx = fmt_fields.index('DP') if 'DP' in fmt_fields else -1 + + # Parse genotypes with quality filtering + genotypes = {} + all_pass_quality = True + + for i in range(len(samples)): + if i >= len(gt_fields): + all_pass_quality = False + break + + gt_parts = gt_fields[i].split(':') + gt = gt_parts[gt_idx] if gt_idx < len(gt_parts) else './.' + + gt_class = get_genotype_class(gt) + if gt_class == 'MISSING': + all_pass_quality = False + break + + # Check GQ + if gq_idx >= 0 and gq_idx < len(gt_parts): + try: + gq = int(gt_parts[gq_idx]) + if gq < min_gq: + all_pass_quality = False + break + except (ValueError, TypeError): + pass # If GQ not parseable, don't fail + + # Check DP + if dp_idx >= 0 and dp_idx < len(gt_parts): + try: + dp = int(gt_parts[dp_idx]) + if dp < min_dp: + all_pass_quality = False + break + except (ValueError, TypeError): + pass # If DP not parseable, don't fail + + genotypes[i] = gt_class + + if not all_pass_quality: + continue + + if len(genotypes) != len(samples): + continue + + variant = VariantSite( + chrom=chrom_clean, + pos=int(pos), + ref=ref, + alt=alt, + genotypes=genotypes + ) + variants.append(variant) + + return samples, variants, total_raw + + +def calculate_roh_segments(variants: List[VariantSite], proband_idx: int = 2, + min_snps: int = 25, min_length_kb: int = 1000, + max_gap_kb: int = 500) -> List[ROHSegment]: + """ + Detect Runs of Homozygosity in proband using WES-optimized approach. + Only counts HOM_ALT (homozygous alternate) sites, not HOM_REF, + as HOM_REF at most WES sites is uninformative. + """ + # Group variants by chromosome - only HOM_ALT sites + by_chrom = defaultdict(list) + for v in variants: + gt = v.genotypes.get(proband_idx) + # Only count HOM_ALT - these are informative for autozygosity + if gt == 'HOM_ALT': + by_chrom[v.chrom].append(v) + + roh_segments = [] + + for chrom in sorted(by_chrom.keys(), key=lambda x: int(x)): + chrom_variants = sorted(by_chrom[chrom], key=lambda x: x.pos) + + if len(chrom_variants) < min_snps: + continue + + # Find homozygous stretches + current_start = chrom_variants[0].pos + current_end = chrom_variants[0].pos + current_count = 1 + + for i in range(1, len(chrom_variants)): + prev_pos = chrom_variants[i-1].pos + curr_pos = chrom_variants[i].pos + gap = (curr_pos - prev_pos) / 1000 # kb + + if gap <= max_gap_kb: + # Extend current segment + current_end = curr_pos + current_count += 1 + else: + # Check if current segment qualifies + length_kb = (current_end - current_start) / 1000 + if current_count >= min_snps and length_kb >= min_length_kb: + roh_segments.append(ROHSegment( + chrom=chrom, + start=current_start, + end=current_end, + snp_count=current_count + )) + + # Start new segment + current_start = curr_pos + current_end = curr_pos + current_count = 1 + + # Check final segment + length_kb = (current_end - current_start) / 1000 + if current_count >= min_snps and length_kb >= min_length_kb: + roh_segments.append(ROHSegment( + chrom=chrom, + start=current_start, + end=current_end, + snp_count=current_count + )) + + return roh_segments + + +def calculate_ibs_between_parents(variants: List[VariantSite], + mother_idx: int = 0, + father_idx: int = 1) -> Tuple[int, int, int, int]: + """ + Calculate Identity-by-State sharing between parents. + Returns (ibs0, ibs1, ibs2, total_comparable) + """ + ibs0 = 0 # Opposite homozygotes (AA vs BB) + ibs1 = 0 # One hom, one het + ibs2 = 0 # Identical genotypes + + for v in variants: + mother_gt = v.genotypes.get(mother_idx) + father_gt = v.genotypes.get(father_idx) + + if mother_gt == 'MISSING' or father_gt == 'MISSING': + continue + + # IBS classification + if mother_gt == father_gt: + ibs2 += 1 + elif mother_gt == 'HET' or father_gt == 'HET': + ibs1 += 1 + else: + # Both homozygous but different (HOM_REF vs HOM_ALT) + ibs0 += 1 + + total = ibs0 + ibs1 + ibs2 + return ibs0, ibs1, ibs2, total + + +def calculate_kinship_coefficient(ibs0: int, ibs1: int, ibs2: int) -> Tuple[float, float]: + """ + Calculate kinship coefficient estimates. + Returns (king_robust, simple_estimate) + """ + total = ibs0 + ibs1 + ibs2 + if total == 0: + return 0.0, 0.0 + + # KING-robust style estimator + denominator = ibs1 + 2 * ibs2 + if denominator > 0: + kinship_king = (ibs2 - 2 * ibs0) / denominator + else: + kinship_king = 0.0 + + # Simple IBS-based estimator + ibs2_frac = ibs2 / total + ibs0_frac = ibs0 / total + kinship_simple = 0.5 * (ibs2_frac - ibs0_frac) + + return kinship_king, kinship_simple + + +def calculate_f_statistic(variants: List[VariantSite], + proband_idx: int = 2, + mother_idx: int = 0, + father_idx: int = 1) -> Tuple[float, float, float, int]: + """ + Calculate F-statistic (inbreeding coefficient) from trio data. + Uses sites where both parents are HET (most informative). + Returns (f_statistic, observed_het_rate, expected_het_rate, informative_sites) + """ + # Find sites where both parents are heterozygous + informative_sites = 0 + proband_het_count = 0 + + for v in variants: + mother_gt = v.genotypes.get(mother_idx) + father_gt = v.genotypes.get(father_idx) + proband_gt = v.genotypes.get(proband_idx) + + # Both parents must be HET for most informative sites + if mother_gt == 'HET' and father_gt == 'HET': + informative_sites += 1 + if proband_gt == 'HET': + proband_het_count += 1 + + if informative_sites == 0: + return 0.0, 0.0, 0.5, 0 + + # Expected: 50% HET at HET x HET sites under Mendelian inheritance + expected_het_rate = 0.5 + observed_het_rate = proband_het_count / informative_sites + + # F = 1 - (observed / expected) + f_statistic = 1 - (observed_het_rate / expected_het_rate) + + return f_statistic, observed_het_rate, expected_het_rate, informative_sites + + +def interpret_roh(total_roh_mb: float, count_gt_5mb: int) -> Tuple[str, str]: + """Interpret ROH results. Returns (level, interpretation_zh)""" + if total_roh_mb < 10: + return "normal", "正常範圍,無親緣關係證據" + elif total_roh_mb < 30: + return "possible_distant", "輕度升高,可能存在遠親關係(三代表親或更遠)" + elif total_roh_mb < 70: + return "probable", "明顯升高,可能為二代表親或更近親緣" + elif total_roh_mb < 150: + return "strong", "高度升高,符合一代表親(first cousin)預期" + else: + return "very_close", "極度升高,可能為叔姪/姑侄關係或更近" + + +def interpret_kinship(kinship: float) -> Tuple[str, str]: + """Interpret kinship coefficient. Returns (level, interpretation_zh)""" + if kinship < 0.02: + return "unrelated", "無親緣關係" + elif kinship < 0.04: + return "distant", "可能為遠親(三代表親或更遠)" + elif kinship < 0.08: + return "second_cousin", "可能為二代表親" + elif kinship < 0.15: + return "first_cousin", "可能為一代表親(first cousins)" + else: + return "close", "近親關係(半同胞或更近)" + + +def interpret_f_statistic(f_stat: float) -> Tuple[str, str]: + """Interpret F-statistic. Returns (level, interpretation_zh)""" + if f_stat < -0.05: + return "outbred", "負值:雜合度高於預期,無近親繁殖,父母來自不同血緣背景" + elif f_stat < 0.01: + return "none", "無近親繁殖證據" + elif f_stat < 0.03: + return "minimal", "極輕微,可能為遠親" + elif f_stat < 0.06: + return "moderate", "中度,符合二代表親子代預期" + elif f_stat < 0.10: + return "elevated", "升高,符合一代表親子代預期" + else: + return "high", "顯著升高,近親程度高於一代表親" + + +def run_analysis(vcf_path: str, mother_idx: int = 0, father_idx: int = 1, + proband_idx: int = 2, min_gq: int = 20, min_dp: int = 10) -> ConsanguinityResults: + """Run complete consanguinity analysis""" + print(f"解析 VCF: {vcf_path}") + samples, variants, total_raw = parse_vcf_for_consanguinity(vcf_path, min_gq, min_dp) + + print(f"樣本: {samples}") + print(f"原始變異數: {total_raw:,}") + print(f"過濾後變異數: {len(variants):,}") + + results = ConsanguinityResults() + results.total_variants = total_raw + results.filtered_variants = len(variants) + + # ROH Analysis + print("\n計算 ROH...") + roh_segments = calculate_roh_segments(variants, proband_idx) + results.roh_segments = roh_segments + results.total_roh_mb = sum(s.length_mb for s in roh_segments) + results.count_gt_1mb = sum(1 for s in roh_segments if s.length_mb > 1) + results.count_gt_5mb = sum(1 for s in roh_segments if s.length_mb > 5) + results.count_gt_10mb = sum(1 for s in roh_segments if s.length_mb > 10) + results.longest_segment_mb = max((s.length_mb for s in roh_segments), default=0) + + print(f" ROH 區段數: {len(roh_segments)}") + print(f" ROH 總長度: {results.total_roh_mb:.2f} Mb") + + # IBS Analysis + print("\n計算父母 IBS...") + ibs0, ibs1, ibs2, total = calculate_ibs_between_parents(variants, mother_idx, father_idx) + results.ibs0 = ibs0 + results.ibs1 = ibs1 + results.ibs2 = ibs2 + results.total_comparable = total + + if total > 0: + print(f" IBS0: {ibs0:,} ({100*ibs0/total:.2f}%)") + print(f" IBS1: {ibs1:,} ({100*ibs1/total:.2f}%)") + print(f" IBS2: {ibs2:,} ({100*ibs2/total:.2f}%)") + + # Kinship coefficient + print("\n計算親緣係數...") + kinship_king, kinship_simple = calculate_kinship_coefficient(ibs0, ibs1, ibs2) + results.kinship_king = kinship_king + results.kinship_simple = kinship_simple + print(f" KING-robust: {kinship_king:.4f}") + print(f" Simple: {kinship_simple:.4f}") + + # F-statistic + print("\n計算 F 統計量...") + f_stat, obs_het, exp_het, info_sites = calculate_f_statistic( + variants, proband_idx, mother_idx, father_idx + ) + results.f_statistic = f_stat + results.observed_het_rate = obs_het + results.expected_het_rate = exp_het + results.informative_sites = info_sites + print(f" F-statistic: {f_stat:.4f}") + print(f" Informative sites: {info_sites:,}") + + return results, samples + + +def generate_report(results: ConsanguinityResults, samples: List[str], + vcf_path: str, output_path: str, + mother_idx: int = 0, father_idx: int = 1, proband_idx: int = 2): + """Generate Chinese report""" + + # Interpretations + roh_level, roh_interp = interpret_roh(results.total_roh_mb, results.count_gt_5mb) + kin_level, kin_interp = interpret_kinship(results.kinship_king) + f_level, f_interp = interpret_f_statistic(results.f_statistic) + + with open(output_path, 'w', encoding='utf-8') as f: + f.write("=" * 80 + "\n") + f.write(" 親緣關係與近親婚配分析報告\n") + f.write(" Parental Relatedness Analysis Report\n") + f.write("=" * 80 + "\n\n") + + f.write(f"分析日期: {datetime.now().strftime('%Y-%m-%d %H:%M')}\n") + f.write(f"輸入文件: {vcf_path}\n\n") + + f.write("樣本資訊:\n") + f.write(f" - 母親 (Mother): {samples[mother_idx]} (index {mother_idx})\n") + f.write(f" - 父親 (Father): {samples[father_idx]} (index {father_idx})\n") + f.write(f" - 先證者 (Proband): {samples[proband_idx]} (index {proband_idx})\n") + + f.write("\n" + "=" * 80 + "\n") + f.write("一、數據摘要\n") + f.write("=" * 80 + "\n\n") + + f.write(f"原始變異位點數: {results.total_variants:,}\n") + f.write(f"過濾後高質量 SNP: {results.filtered_variants:,}\n") + f.write(f"過濾條件: GQ>=20, DP>=10, 雙等位基因 SNP, 常染色體\n") + + f.write("\n" + "=" * 80 + "\n") + f.write("二、先證者純合子區段分析 (ROH Analysis)\n") + f.write("=" * 80 + "\n\n") + + f.write("ROH 區段統計:\n") + f.write(f" - ROH 區段總數: {len(results.roh_segments)}\n") + f.write(f" - ROH 總長度: {results.total_roh_mb:.2f} Mb\n") + f.write(f" - 最長區段: {results.longest_segment_mb:.2f} Mb\n") + if results.roh_segments: + avg_length = results.total_roh_mb / len(results.roh_segments) + f.write(f" - 平均區段長度: {avg_length:.2f} Mb\n") + + f.write("\n區段長度分布:\n") + f.write(f" - > 1 Mb 區段: {results.count_gt_1mb} 個\n") + f.write(f" - > 5 Mb 區段: {results.count_gt_5mb} 個\n") + f.write(f" - > 10 Mb 區段: {results.count_gt_10mb} 個\n") + + f.write(f"\n【解讀】{roh_interp}\n") + + f.write("\n參考值:\n") + f.write(" - 非近親父母: 5-10 Mb (多為短區段)\n") + f.write(" - 二代表親父母: 30-70 Mb\n") + f.write(" - 一代表親父母: 100-150 Mb (含多個長區段)\n") + + f.write("\n" + "=" * 80 + "\n") + f.write("三、父母間 IBS (Identity-by-State) 分析\n") + f.write("=" * 80 + "\n\n") + + f.write("IBS 統計:\n") + f.write(f" - 可比較位點數: {results.total_comparable:,}\n") + + if results.total_comparable > 0: + ibs0_pct = 100 * results.ibs0 / results.total_comparable + ibs1_pct = 100 * results.ibs1 / results.total_comparable + ibs2_pct = 100 * results.ibs2 / results.total_comparable + + f.write(f" - IBS0 (完全不匹配): {results.ibs0:,} ({ibs0_pct:.2f}%)\n") + f.write(f" - IBS1 (部分匹配): {results.ibs1:,} ({ibs1_pct:.2f}%)\n") + f.write(f" - IBS2 (完全匹配): {results.ibs2:,} ({ibs2_pct:.2f}%)\n") + + f.write(f"\n【解讀】{kin_interp}\n") + + f.write("\n參考值:\n") + f.write(" IBS2% IBS0%\n") + f.write(" 非親緣關係: 70-75% 2-3%\n") + f.write(" 一代表親: 80-85% <1%\n") + f.write(" 二代表親: 77-80% ~1.5%\n") + + f.write("\n" + "=" * 80 + "\n") + f.write("四、親緣係數估算 (Kinship Coefficient)\n") + f.write("=" * 80 + "\n\n") + + f.write("計算結果:\n") + f.write(f" - KING-robust 估算: {results.kinship_king:.4f}\n") + f.write(f" - 簡化 IBS 估算: {results.kinship_simple:.4f}\n") + + f.write(f"\n【解讀】{kin_interp}\n") + + f.write("\n參考值:\n") + f.write(" - 無親緣關係: ~0 (±0.02)\n") + f.write(" - 二代表親: ~0.0156\n") + f.write(" - 一代表親: ~0.0625\n") + f.write(" - 半同胞: ~0.125\n") + f.write(" - 全同胞/親子: ~0.25\n") + + f.write("\n" + "=" * 80 + "\n") + f.write("五、近親繁殖係數 (F-statistic / Inbreeding Coefficient)\n") + f.write("=" * 80 + "\n\n") + + f.write("先證者 F 統計量:\n") + f.write(f" - 資訊位點數 (父母皆 HET): {results.informative_sites:,}\n") + f.write(f" - 觀察雜合率: {results.observed_het_rate:.4f}\n") + f.write(f" - 預期雜合率: {results.expected_het_rate:.4f}\n") + f.write(f" - F 統計量: {results.f_statistic:.4f}\n") + + f.write(f"\n【解讀】{f_interp}\n") + + f.write("\n參考值:\n") + f.write(" - 非近親: F ≈ 0\n") + f.write(" - 二代表親子代: F ≈ 0.0156\n") + f.write(" - 一代表親子代: F ≈ 0.0625\n") + f.write(" - 叔姪/姑侄子代: F ≈ 0.125\n") + + f.write("\n" + "=" * 80 + "\n") + f.write("六、綜合評估與結論\n") + f.write("=" * 80 + "\n\n") + + # Comprehensive assessment - F-statistic is the most reliable indicator + # If F is strongly negative, parents are definitely not consanguineous + if f_level == "outbred" or results.f_statistic < -0.05: + f.write("綜合評估:父母無親緣關係\n\n") + f.write("F 統計量為負值({:.4f}),顯示先證者的雜合度高於預期。\n".format(results.f_statistic)) + f.write("這是父母來自不同血緣背景的強力證據,明確排除近親婚配。\n") + f.write("\n這是非近親婚配的典型結果。\n") + evidence_count = 0 + else: + evidence_count = 0 + if roh_level != "normal": + evidence_count += 1 + if kin_level != "unrelated": + evidence_count += 1 + if f_level not in ["none", "outbred"]: + evidence_count += 1 + + if evidence_count == 0: + f.write("綜合評估:無明顯親緣關係證據\n\n") + f.write("三項指標均在正常範圍內,父母之間不存在明顯的血緣關係。\n") + f.write("這是非近親婚配的典型結果。\n") + elif evidence_count == 1: + f.write("綜合評估:可能存在遠親關係\n\n") + f.write("一項指標顯示輕微異常,可能存在遠親關係(如三代表親或更遠)。\n") + f.write("這種程度的親緣關係在臨床上通常不具顯著影響。\n") + elif evidence_count == 2: + f.write("綜合評估:可能存在近親關係\n\n") + f.write("多項指標顯示異常,父母之間可能存在二代表親或更近的血緣關係。\n") + f.write("建議進一步諮詢遺傳諮詢師評估隱性遺傳疾病風險。\n") + else: + f.write("綜合評估:存在明顯近親關係\n\n") + f.write("所有指標均顯示異常,父母之間存在明確的血緣關係。\n") + f.write("強烈建議諮詢遺傳諮詢師,詳細評估子代的隱性遺傳疾病風險。\n") + + f.write("\n【臨床建議】\n") + if evidence_count >= 2: + f.write("- 建議進行遺傳諮詢,討論隱性遺傳疾病風險\n") + f.write("- 可考慮針對常見隱性遺傳疾病進行帶因者篩檢\n") + f.write("- 若有特定遺傳疾病家族史,應重點關注相關基因\n") + else: + f.write("- 無特殊臨床建議\n") + f.write("- 如有家族遺傳疾病史,仍建議進行相關諮詢\n") + + # ROH segment details + if results.roh_segments: + f.write("\n" + "=" * 80 + "\n") + f.write("附錄:ROH 區段詳細列表\n") + f.write("=" * 80 + "\n\n") + + f.write(f"{'染色體':<8}{'起始位置':>15}{'終止位置':>15}{'長度(Mb)':>12}{'SNP數量':>10}\n") + f.write("-" * 60 + "\n") + + for seg in sorted(results.roh_segments, key=lambda x: (int(x.chrom), x.start)): + f.write(f"chr{seg.chrom:<5}{seg.start:>15,}{seg.end:>15,}{seg.length_mb:>12.2f}{seg.snp_count:>10}\n") + + f.write("\n" + "=" * 80 + "\n") + f.write(" 分析完成\n") + f.write("=" * 80 + "\n") + + print(f"\n報告已儲存: {output_path}") + + +def main(): + if len(sys.argv) < 2: + print("用法: python consanguinity_analysis.py [output_path]") + print("") + print("範例:") + print(" python consanguinity_analysis.py trio_joint.vcf.gz") + print(" python consanguinity_analysis.py trio_joint.vcf.gz results.txt") + sys.exit(1) + + vcf_path = sys.argv[1] + output_path = sys.argv[2] if len(sys.argv) > 2 else vcf_path.replace('.vcf', '_consanguinity.txt').replace('.gz', '') + + # Sample indices: 0=mother, 1=father, 2=proband + results, samples = run_analysis(vcf_path, mother_idx=0, father_idx=1, proband_idx=2) + generate_report(results, samples, vcf_path, output_path, + mother_idx=0, father_idx=1, proband_idx=2) + + +if __name__ == '__main__': + main()