From 5af63eab465944229d2251da6bd58e27d81863c3 Mon Sep 17 00:00:00 2001
From: gbanyan
Date: Thu, 12 Feb 2026 04:00:21 +0800
Subject: [PATCH] feat(05-02): implement reproducibility report module with JSON and Markdown output

- Create ReproducibilityReport dataclass with all metadata fields
- Implement generate_reproducibility_report function
- Extract parameters from PipelineConfig (scoring weights, data versions)
- Capture software environment (Python, polars, duckdb versions)
- Build filtering steps from ProvenanceTracker
- Compute tier statistics from tiered DataFrame
- Support optional validation metrics
- to_json: write indented JSON for a machine-readable report
- to_markdown: write headers and tables for a human-readable report
- 7 tests covering all report fields, formats, and edge cases
---
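
Reviewer note (below the `---`, so ignored by `git am` and not part of the
commit message): a minimal usage sketch, assuming a populated
`PipelineConfig`, a tiered polars DataFrame with a `confidence_tier` column,
and a `ProvenanceTracker` from earlier pipeline phases. The variable names
and output paths are illustrative only.

    from pathlib import Path

    from usher_pipeline.output.reproducibility import generate_reproducibility_report

    # `config`, `tiered_df`, and `provenance` are assumed to exist already.
    report = generate_reproducibility_report(
        config=config,
        tiered_df=tiered_df,
        provenance=provenance,
        validation_result=None,  # optional; pass a validation dict when available
    )
    report.to_json(Path("reports/report.json"))
    report.to_markdown(Path("reports/report.md"))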
 src/usher_pipeline/output/reproducibility.py | 320 +++++++++++++++++++
 tests/test_reproducibility.py                | 245 ++++++++++++++
 2 files changed, 565 insertions(+)
 create mode 100644 src/usher_pipeline/output/reproducibility.py
 create mode 100644 tests/test_reproducibility.py

diff --git a/src/usher_pipeline/output/reproducibility.py b/src/usher_pipeline/output/reproducibility.py
new file mode 100644
index 0000000..97db8c2
--- /dev/null
+++ b/src/usher_pipeline/output/reproducibility.py
@@ -0,0 +1,320 @@
+"""Reproducibility report generation for pipeline runs."""
+
+import json
+import sys
+import uuid
+from dataclasses import dataclass, field
+from datetime import datetime, timezone
+from pathlib import Path
+
+import duckdb
+import polars as pl
+
+from usher_pipeline.config.schema import PipelineConfig
+from usher_pipeline.persistence.provenance import ProvenanceTracker
+
+
+@dataclass
+class FilteringStep:
+    """Record of a data filtering/processing step."""
+
+    step_name: str
+    input_count: int
+    output_count: int
+    criteria: str
+
+
+@dataclass
+class ReproducibilityReport:
+    """
+    Comprehensive reproducibility report for a pipeline run.
+
+    Contains all information needed to reproduce the analysis:
+    - Pipeline version and parameters
+    - Data source versions
+    - Software environment
+    - Filtering steps with gene counts
+    - Validation metrics
+    - Tier statistics
+    """
+
+    run_id: str
+    timestamp: str
+    pipeline_version: str
+    parameters: dict
+    data_versions: dict
+    software_environment: dict
+    filtering_steps: list[FilteringStep] = field(default_factory=list)
+    validation_metrics: dict = field(default_factory=dict)
+    tier_statistics: dict = field(default_factory=dict)
+
+    def to_dict(self) -> dict:
+        """
+        Convert report to dictionary.
+
+        Returns:
+            Dictionary representation of the report
+        """
+        return {
+            "run_id": self.run_id,
+            "timestamp": self.timestamp,
+            "pipeline_version": self.pipeline_version,
+            "parameters": self.parameters,
+            "data_versions": self.data_versions,
+            "software_environment": self.software_environment,
+            "filtering_steps": [
+                {
+                    "step_name": step.step_name,
+                    "input_count": step.input_count,
+                    "output_count": step.output_count,
+                    "criteria": step.criteria,
+                }
+                for step in self.filtering_steps
+            ],
+            "validation_metrics": self.validation_metrics,
+            "tier_statistics": self.tier_statistics,
+        }
+
+    def to_json(self, path: Path) -> Path:
+        """
+        Write report as JSON file.
+
+        Args:
+            path: Output path for JSON file
+
+        Returns:
+            Path to the written file
+        """
+        path.parent.mkdir(parents=True, exist_ok=True)
+
+        with open(path, "w") as f:
+            json.dump(self.to_dict(), f, indent=2, default=str)
+
+        return path
+
+    def to_markdown(self, path: Path) -> Path:
+        """
+        Write report as human-readable Markdown file.
+
+        Args:
+            path: Output path for Markdown file
+
+        Returns:
+            Path to the written file
+        """
+        path.parent.mkdir(parents=True, exist_ok=True)
+
+        lines = [
+            "# Pipeline Reproducibility Report",
+            "",
+            f"**Run ID:** `{self.run_id}`",
+            f"**Timestamp:** {self.timestamp}",
+            f"**Pipeline Version:** {self.pipeline_version}",
+            "",
+            "## Parameters",
+            "",
+        ]
+
+        # Add scoring weights if available (heading included only when
+        # weights are present, to avoid an empty section)
+        if "gnomad" in self.parameters:
+            lines.extend([
+                "**Scoring Weights:**",
+                "",
+                f"- gnomAD: {self.parameters['gnomad']:.2f}",
+                f"- Expression: {self.parameters['expression']:.2f}",
+                f"- Annotation: {self.parameters['annotation']:.2f}",
+                f"- Localization: {self.parameters['localization']:.2f}",
+                f"- Animal Model: {self.parameters['animal_model']:.2f}",
+                f"- Literature: {self.parameters['literature']:.2f}",
+                "",
+            ])
+
+        # Add data versions
+        lines.extend([
+            "## Data Versions",
+            "",
+        ])
+
+        for key, value in self.data_versions.items():
+            lines.append(f"- **{key}:** {value}")
+
+        lines.append("")
+
+        # Add software environment
+        lines.extend([
+            "## Software Environment",
+            "",
+        ])
+
+        for key, value in self.software_environment.items():
+            lines.append(f"- **{key}:** {value}")
+
+        lines.append("")
+
+        # Add filtering steps if available
+        if self.filtering_steps:
+            lines.extend([
+                "## Filtering Steps",
+                "",
+                "| Step | Input Count | Output Count | Criteria |",
+                "|------|-------------|--------------|----------|",
+            ])
+
+            for step in self.filtering_steps:
+                lines.append(
+                    f"| {step.step_name} | {step.input_count} | "
+                    f"{step.output_count} | {step.criteria} |"
+                )
+
+            lines.append("")
+
+        # Add tier statistics
+        lines.extend([
+            "## Tier Statistics",
+            "",
+            f"- **Total Candidates:** {self.tier_statistics.get('total', 0)}",
+            f"- **HIGH:** {self.tier_statistics.get('high', 0)}",
+            f"- **MEDIUM:** {self.tier_statistics.get('medium', 0)}",
+            f"- **LOW:** {self.tier_statistics.get('low', 0)}",
+            "",
+        ])
+
+        # Add validation metrics if available
+        if self.validation_metrics:
+            lines.extend([
+                "## Validation Metrics",
+                "",
+            ])
+
+            for key, value in self.validation_metrics.items():
+                if isinstance(value, float):
+                    lines.append(f"- **{key}:** {value:.3f}")
+                else:
+                    lines.append(f"- **{key}:** {value}")
+
+            lines.append("")
+
+        # Write to file
+        with open(path, "w") as f:
+            f.write("\n".join(lines))
+
+        return path
+
+
+def generate_reproducibility_report(
+    config: PipelineConfig,
+    tiered_df: pl.DataFrame,
+    provenance: ProvenanceTracker,
+    validation_result: dict | None = None,
+) -> ReproducibilityReport:
+    """
+    Generate comprehensive reproducibility report.
+
+    Args:
+        config: Pipeline configuration
+        tiered_df: Scored and tiered DataFrame
+        provenance: Provenance tracker with processing steps
+        validation_result: Optional validation results dictionary
+
+    Returns:
+        ReproducibilityReport instance
+
+    Notes:
+        - Extracts parameters from config (scoring weights, data versions)
+        - Computes tier statistics from tiered_df
+        - Builds filtering steps from provenance steps
+        - Captures software versions (Python, polars, duckdb)
+        - Generates a unique run ID
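+
+    Example (illustrative; assumes a config, tiered frame, and tracker
+    from earlier pipeline stages):
+
+        report = generate_reproducibility_report(config, tiered_df, tracker)
+        report.to_json(Path("report.json"))
+        report.to_markdown(Path("report.md"))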
+    """
+    # Generate run ID
+    run_id = str(uuid.uuid4())
+
+    # Get current timestamp
+    timestamp = datetime.now(timezone.utc).isoformat()
+
+    # Extract pipeline version from provenance
+    pipeline_version = provenance.pipeline_version
+
+    # Extract parameters from config
+    parameters = config.scoring.model_dump()
+
+    # Extract data versions from config
+    data_versions = config.versions.model_dump()
+
+    # Build software environment
+    software_environment = {
+        "python": sys.version.split()[0],
+        "polars": pl.__version__,
+        "duckdb": duckdb.__version__,
+    }
+
+    # Build filtering steps from provenance
+    filtering_steps = []
+    for step in provenance.get_steps():
+        details = step.get("details", {})
+
+        # Extract counts if available
+        input_count = details.get("input_count", 0)
+        output_count = details.get("output_count", 0)
+        criteria = details.get("criteria", "")
+
+        filtering_steps.append(
+            FilteringStep(
+                step_name=step["step_name"],
+                input_count=input_count,
+                output_count=output_count,
+                criteria=criteria,
+            )
+        )
+
+    # Compute tier statistics
+    total = tiered_df.height
+    high = 0
+    medium = 0
+    low = 0
+
+    if "confidence_tier" in tiered_df.columns:
+        tier_counts = tiered_df.group_by("confidence_tier").agg(
+            pl.len().alias("count")
+        )
+
+        for row in tier_counts.to_dicts():
+            tier = row["confidence_tier"]
+            count = row["count"]
+
+            if tier == "HIGH":
+                high = count
+            elif tier == "MEDIUM":
+                medium = count
+            elif tier == "LOW":
+                low = count
+
+    tier_statistics = {
+        "total": total,
+        "high": high,
+        "medium": medium,
+        "low": low,
+    }
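+    # For example: a 100-row frame tiered 30/40/30 into HIGH/MEDIUM/LOW
+    # produces {"total": 100, "high": 30, "medium": 40, "low": 30} here
+    # (the same distribution the test suite asserts).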
"pipeline.db", + versions=DataSourceVersions( + ensembl_release=113, + gnomad_version="v4.1", + gtex_version="v8", + hpa_version="23.0", + ), + api=APIConfig(), + scoring=ScoringWeights( + gnomad=0.20, + expression=0.20, + annotation=0.15, + localization=0.15, + animal_model=0.15, + literature=0.15, + ), + ) + + +@pytest.fixture +def mock_provenance(mock_config): + """Create mock provenance tracker.""" + provenance = ProvenanceTracker( + pipeline_version="0.1.0", + config=mock_config, + ) + + # Record some processing steps + provenance.record_step( + "gene_universe_fetch", + details={ + "input_count": 0, + "output_count": 20000, + "criteria": "Human protein-coding genes from Ensembl", + }, + ) + + provenance.record_step( + "gnomad_filtering", + details={ + "input_count": 20000, + "output_count": 19500, + "criteria": "Remove genes with quality flags", + }, + ) + + return provenance + + +@pytest.fixture +def synthetic_tiered_df(): + """Create synthetic tiered DataFrame.""" + return pl.DataFrame({ + "gene_id": [f"ENSG{i:011d}" for i in range(100)], + "gene_symbol": [f"GENE{i}" for i in range(100)], + "composite_score": [0.1 + i * 0.008 for i in range(100)], + "confidence_tier": ( + ["HIGH"] * 30 + ["MEDIUM"] * 40 + ["LOW"] * 30 + ), + }) + + +def test_generate_report_has_all_fields( + mock_config, mock_provenance, synthetic_tiered_df +): + """Test that report contains all required fields.""" + report = generate_reproducibility_report( + config=mock_config, + tiered_df=synthetic_tiered_df, + provenance=mock_provenance, + validation_result=None, + ) + + # Check all required fields exist + assert report.run_id is not None + assert report.timestamp is not None + assert report.pipeline_version == "0.1.0" + assert report.parameters is not None + assert report.data_versions is not None + assert report.software_environment is not None + assert report.tier_statistics is not None + + +def test_report_to_json_parseable( + mock_config, mock_provenance, synthetic_tiered_df, tmp_path +): + """Test that JSON output is valid and parseable.""" + report = generate_reproducibility_report( + config=mock_config, + tiered_df=synthetic_tiered_df, + provenance=mock_provenance, + ) + + json_path = tmp_path / "report.json" + report.to_json(json_path) + + # Read back and verify it's valid JSON + with open(json_path) as f: + data = json.load(f) + + # Verify expected keys + assert "run_id" in data + assert "timestamp" in data + assert "pipeline_version" in data + assert "parameters" in data + assert "data_versions" in data + assert "software_environment" in data + assert "filtering_steps" in data + assert "tier_statistics" in data + + +def test_report_to_markdown_has_headers( + mock_config, mock_provenance, synthetic_tiered_df, tmp_path +): + """Test that Markdown output contains required sections.""" + report = generate_reproducibility_report( + config=mock_config, + tiered_df=synthetic_tiered_df, + provenance=mock_provenance, + ) + + md_path = tmp_path / "report.md" + report.to_markdown(md_path) + + # Read content + content = md_path.read_text() + + # Verify headers + assert "# Pipeline Reproducibility Report" in content + assert "## Parameters" in content + assert "## Data Versions" in content + assert "## Filtering Steps" in content + assert "## Tier Statistics" in content + assert "## Software Environment" in content + + +def test_report_tier_statistics_match( + mock_config, mock_provenance, synthetic_tiered_df +): + """Test that tier statistics match DataFrame counts.""" + report = generate_reproducibility_report( 
+        scoring=ScoringWeights(
+            gnomad=0.20,
+            expression=0.20,
+            annotation=0.15,
+            localization=0.15,
+            animal_model=0.15,
+            literature=0.15,
+        ),
+    )
+
+
+@pytest.fixture
+def mock_provenance(mock_config):
+    """Create mock provenance tracker."""
+    provenance = ProvenanceTracker(
+        pipeline_version="0.1.0",
+        config=mock_config,
+    )
+
+    # Record some processing steps
+    provenance.record_step(
+        "gene_universe_fetch",
+        details={
+            "input_count": 0,
+            "output_count": 20000,
+            "criteria": "Human protein-coding genes from Ensembl",
+        },
+    )
+
+    provenance.record_step(
+        "gnomad_filtering",
+        details={
+            "input_count": 20000,
+            "output_count": 19500,
+            "criteria": "Remove genes with quality flags",
+        },
+    )
+
+    return provenance
+
+
+@pytest.fixture
+def synthetic_tiered_df():
+    """Create synthetic tiered DataFrame."""
+    return pl.DataFrame({
+        "gene_id": [f"ENSG{i:011d}" for i in range(100)],
+        "gene_symbol": [f"GENE{i}" for i in range(100)],
+        "composite_score": [0.1 + i * 0.008 for i in range(100)],
+        "confidence_tier": (
+            ["HIGH"] * 30 + ["MEDIUM"] * 40 + ["LOW"] * 30
+        ),
+    })
+
+
+def test_generate_report_has_all_fields(
+    mock_config, mock_provenance, synthetic_tiered_df
+):
+    """Test that report contains all required fields."""
+    report = generate_reproducibility_report(
+        config=mock_config,
+        tiered_df=synthetic_tiered_df,
+        provenance=mock_provenance,
+        validation_result=None,
+    )
+
+    # Check all required fields exist
+    assert report.run_id is not None
+    assert report.timestamp is not None
+    assert report.pipeline_version == "0.1.0"
+    assert report.parameters is not None
+    assert report.data_versions is not None
+    assert report.software_environment is not None
+    assert report.tier_statistics is not None
+
+
+def test_report_to_json_parseable(
+    mock_config, mock_provenance, synthetic_tiered_df, tmp_path
+):
+    """Test that JSON output is valid and parseable."""
+    report = generate_reproducibility_report(
+        config=mock_config,
+        tiered_df=synthetic_tiered_df,
+        provenance=mock_provenance,
+    )
+
+    json_path = tmp_path / "report.json"
+    report.to_json(json_path)
+
+    # Read back and verify it's valid JSON
+    with open(json_path) as f:
+        data = json.load(f)
+
+    # Verify expected keys
+    assert "run_id" in data
+    assert "timestamp" in data
+    assert "pipeline_version" in data
+    assert "parameters" in data
+    assert "data_versions" in data
+    assert "software_environment" in data
+    assert "filtering_steps" in data
+    assert "tier_statistics" in data
+
+
+def test_report_to_markdown_has_headers(
+    mock_config, mock_provenance, synthetic_tiered_df, tmp_path
+):
+    """Test that Markdown output contains required sections."""
+    report = generate_reproducibility_report(
+        config=mock_config,
+        tiered_df=synthetic_tiered_df,
+        provenance=mock_provenance,
+    )
+
+    md_path = tmp_path / "report.md"
+    report.to_markdown(md_path)
+
+    # Read content
+    content = md_path.read_text()
+
+    # Verify headers
+    assert "# Pipeline Reproducibility Report" in content
+    assert "## Parameters" in content
+    assert "## Data Versions" in content
+    assert "## Filtering Steps" in content
+    assert "## Tier Statistics" in content
+    assert "## Software Environment" in content
+
+
+def test_report_tier_statistics_match(
+    mock_config, mock_provenance, synthetic_tiered_df
+):
+    """Test that tier statistics match DataFrame counts."""
+    report = generate_reproducibility_report(
+        config=mock_config,
+        tiered_df=synthetic_tiered_df,
+        provenance=mock_provenance,
+    )
+
+    # Verify total matches
+    assert report.tier_statistics["total"] == synthetic_tiered_df.height
+
+    # Verify tier counts
+    assert report.tier_statistics["high"] == 30
+    assert report.tier_statistics["medium"] == 40
+    assert report.tier_statistics["low"] == 30
+
+    # Verify sum
+    tier_sum = (
+        report.tier_statistics["high"]
+        + report.tier_statistics["medium"]
+        + report.tier_statistics["low"]
+    )
+    assert tier_sum == report.tier_statistics["total"]
+
+
+def test_report_includes_validation_when_provided(
+    mock_config, mock_provenance, synthetic_tiered_df
+):
+    """Test that validation metrics are included when provided."""
+    validation_result = {
+        "median_percentile": 0.85,
+        "top_quartile_fraction": 0.92,
+        "validation_passed": True,
+    }
+
+    report = generate_reproducibility_report(
+        config=mock_config,
+        tiered_df=synthetic_tiered_df,
+        provenance=mock_provenance,
+        validation_result=validation_result,
+    )
+
+    # Verify validation metrics are present
+    assert "median_percentile" in report.validation_metrics
+    assert report.validation_metrics["median_percentile"] == 0.85
+    assert report.validation_metrics["top_quartile_fraction"] == 0.92
+    assert report.validation_metrics["validation_passed"] is True
+
+
+def test_report_without_validation(
+    mock_config, mock_provenance, synthetic_tiered_df
+):
+    """Test that report generates without error when validation_result is None."""
+    report = generate_reproducibility_report(
+        config=mock_config,
+        tiered_df=synthetic_tiered_df,
+        provenance=mock_provenance,
+        validation_result=None,
+    )
+
+    # Should have empty validation metrics
+    assert report.validation_metrics == {}
+
+
+def test_report_software_versions(
+    mock_config, mock_provenance, synthetic_tiered_df
+):
+    """Test that software environment contains expected keys."""
+    report = generate_reproducibility_report(
+        config=mock_config,
+        tiered_df=synthetic_tiered_df,
+        provenance=mock_provenance,
+    )
+
+    # Verify software versions are captured
+    assert "python" in report.software_environment
+    assert "polars" in report.software_environment
+    assert "duckdb" in report.software_environment
+
+    # Verify they're not empty
+    assert report.software_environment["python"] != ""
+    assert report.software_environment["polars"] != ""
+    assert report.software_environment["duckdb"] != ""
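
--
Note (after the patch body, so not applied by git): for the fixtures above,
`to_markdown` emits output of roughly this shape (abridged; run ID and
timestamp are placeholders, and the Data Versions, Software Environment,
and Filtering Steps sections appear where the ellipsis is):

    # Pipeline Reproducibility Report

    **Run ID:** `<uuid4>`
    **Timestamp:** <ISO-8601 UTC timestamp>
    **Pipeline Version:** 0.1.0

    ## Parameters

    **Scoring Weights:**

    - gnomAD: 0.20
    - Expression: 0.20
    - Annotation: 0.15
    - Localization: 0.15
    - Animal Model: 0.15
    - Literature: 0.15

    ...

    ## Tier Statistics

    - **Total Candidates:** 100
    - **HIGH:** 30
    - **MEDIUM:** 40
    - **LOW:** 30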