---
phase: 01-data-infrastructure
plan: 03
type: execute
wave: 2
depends_on: ["01-01"]
files_modified:
  - src/usher_pipeline/persistence/__init__.py
  - src/usher_pipeline/persistence/duckdb_store.py
  - src/usher_pipeline/persistence/provenance.py
  - tests/test_persistence.py
autonomous: true

must_haves:
  truths:
    - "DuckDB database stores DataFrames as tables and exports to Parquet"
    - "Checkpoint system detects existing tables to enable restart-from-checkpoint without re-downloading"
    - "Provenance metadata captures pipeline version, data source versions, timestamps, and config hash"
    - "Provenance sidecar JSON file is saved alongside every pipeline output"
  artifacts:
    - path: "src/usher_pipeline/persistence/duckdb_store.py"
      provides: "DuckDB-based storage with checkpoint-restart capability"
      contains: "class PipelineStore"
    - path: "src/usher_pipeline/persistence/provenance.py"
      provides: "Provenance metadata creation and persistence"
      contains: "class ProvenanceTracker"
  key_links:
    - from: "src/usher_pipeline/persistence/duckdb_store.py"
      to: "duckdb"
      via: "duckdb.connect for file-based database"
      pattern: "duckdb\\.connect"
    - from: "src/usher_pipeline/persistence/provenance.py"
      to: "src/usher_pipeline/config/schema.py"
      via: "reads PipelineConfig for version info and config hash"
      pattern: "config_hash|PipelineConfig"
    - from: "src/usher_pipeline/persistence/duckdb_store.py"
      to: "src/usher_pipeline/persistence/provenance.py"
      via: "attaches provenance metadata when saving checkpoints"
      pattern: "provenance|ProvenanceTracker"
---

Build the DuckDB persistence layer for intermediate results with checkpoint-restart capability, and the provenance metadata system for reproducibility tracking.

Purpose: The pipeline fetches expensive API data that takes hours. DuckDB persistence enables restart-from-checkpoint so failures don't require re-downloading everything (INFRA-07).
Provenance tracking ensures every output is traceable to specific data versions, config parameters, and timestamps (INFRA-06).

Output: PipelineStore class for DuckDB operations and ProvenanceTracker for metadata.

@/Users/gbanyan/.claude/get-shit-done/workflows/execute-plan.md
@/Users/gbanyan/.claude/get-shit-done/templates/summary.md

@.planning/PROJECT.md
@.planning/ROADMAP.md
@.planning/STATE.md
@.planning/phases/01-data-infrastructure/01-RESEARCH.md
@.planning/phases/01-data-infrastructure/01-01-SUMMARY.md

Task 1: Create DuckDB persistence layer with checkpoint-restart

src/usher_pipeline/persistence/__init__.py src/usher_pipeline/persistence/duckdb_store.py

1. Create `src/usher_pipeline/persistence/duckdb_store.py` with `PipelineStore` class:
   - Constructor takes `db_path: Path`. Creates parent directories. Connects via `duckdb.connect(str(db_path))`. Creates internal `_checkpoints` metadata table on init: `CREATE TABLE IF NOT EXISTS _checkpoints (table_name VARCHAR PRIMARY KEY, created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, row_count INTEGER, description VARCHAR)`.
   - `save_dataframe(df: polars.DataFrame | pandas.DataFrame, table_name: str, description: str = "", replace: bool = True)`: Detect DataFrame type. For polars, use `conn.execute("CREATE OR REPLACE TABLE ... AS SELECT * FROM df")` via DuckDB's native polars support. For pandas, same pattern. Update _checkpoints metadata with row count and description. Use parameterized queries for metadata insert.
   - `load_dataframe(table_name: str, as_polars: bool = True) -> polars.DataFrame | pandas.DataFrame | None`: Load table. Return None if table doesn't exist (catch duckdb.CatalogException). Return polars by default (use `conn.execute("SELECT * FROM ...").pl()`), pandas if as_polars=False.
   - `has_checkpoint(table_name: str) -> bool`: Check _checkpoints table for existence.
   - `list_checkpoints() -> list[dict]`: Return all checkpoint metadata (table_name, created_at, row_count, description).
   - `delete_checkpoint(table_name: str)`: Drop table and remove from _checkpoints.
   - `export_parquet(table_name: str, output_path: Path)`: Export table to Parquet file using `COPY ... TO ... (FORMAT PARQUET)`. Create parent dirs.
   - `execute_query(query: str, params: list = None) -> polars.DataFrame`: Execute arbitrary SQL, return polars DataFrame.
   - `close()`: Close DuckDB connection. Implement `__enter__` and `__exit__` for context manager support.
   - `@classmethod from_config(cls, config: PipelineConfig)`: Create from config's duckdb_path.
2. Create `src/usher_pipeline/persistence/__init__.py` exporting PipelineStore and ProvenanceTracker.

cd /Users/gbanyan/Project/usher-exploring && python -c "
from usher_pipeline.persistence.duckdb_store import PipelineStore
from pathlib import Path
import tempfile, polars as pl

with tempfile.TemporaryDirectory() as tmp:
    store = PipelineStore(Path(tmp) / 'test.duckdb')
    df = pl.DataFrame({'gene': ['BRCA1', 'TP53'], 'score': [0.95, 0.88]})
    store.save_dataframe(df, 'test_genes', 'test data')
    assert store.has_checkpoint('test_genes')
    loaded = store.load_dataframe('test_genes')
    assert loaded.shape == (2, 2)
    assert not store.has_checkpoint('nonexistent')
    store.close()
    print('PipelineStore: all basic operations work')
"

- PipelineStore creates DuckDB file, saves/loads polars DataFrames
- Checkpoint system tracks which tables exist with metadata
- has_checkpoint returns True for existing tables, False for missing
- Context manager support works for clean connection handling

Task 2: Create provenance tracker with tests for both modules

src/usher_pipeline/persistence/provenance.py tests/test_persistence.py

1. Create `src/usher_pipeline/persistence/provenance.py` with `ProvenanceTracker`:
   - Constructor takes `pipeline_version: str`, `config: PipelineConfig`.
   - Stores config_hash from config.config_hash(), data_source_versions from config.versions.model_dump().
   - `record_step(step_name: str, details: dict = None)`: Appends to internal processing_steps list with timestamp, step_name, and optional details dict.
   - `create_metadata() -> dict`: Returns full provenance dict: pipeline_version, data_source_versions, config_hash, created_at (ISO timestamp), processing_steps list.
   - `save_sidecar(output_path: Path)`: Saves provenance metadata as JSON sidecar file at `output_path.with_suffix('.provenance.json')`. Uses json.dumps with indent=2, default=str for Path serialization.
   - `save_to_store(store: PipelineStore)`: Saves provenance metadata to a `_provenance` table in DuckDB (flattened: version, config_hash, created_at, steps_json).
   - `@staticmethod load_sidecar(sidecar_path: Path) -> dict`: Reads and returns provenance JSON.
   - `@staticmethod from_config(config: PipelineConfig, version: str = None) -> ProvenanceTracker`: Creates tracker, using version from usher_pipeline.__version__ if not provided.
2. Create `tests/test_persistence.py` with pytest tests using tmp_path:

   DuckDB Store tests:
   - test_store_creates_database: PipelineStore creates .duckdb file at specified path
   - test_save_and_load_polars: save polars DataFrame, load back, verify shape and values
   - test_save_and_load_pandas: save pandas DataFrame, load back as pandas (as_polars=False)
   - test_checkpoint_lifecycle: save -> has_checkpoint True -> delete -> has_checkpoint False
   - test_list_checkpoints: save 3 tables, list_checkpoints returns 3 entries with metadata
   - test_export_parquet: save DataFrame, export to Parquet, verify Parquet file exists and is readable by polars
   - test_load_nonexistent_returns_none: load_dataframe for non-existent table returns None
   - test_context_manager: use `with PipelineStore(...) as store:` and verify operations work

   Provenance tests:
   - test_provenance_metadata_structure: create ProvenanceTracker, verify metadata dict has all required keys
   - test_provenance_records_steps: record 2 steps, verify they appear in metadata with timestamps
   - test_provenance_sidecar_roundtrip: save sidecar, load sidecar, verify content matches
   - test_provenance_config_hash_included: verify config_hash in metadata matches config.config_hash()
   - test_provenance_save_to_store: save to PipelineStore, verify _provenance table exists

   For tests needing PipelineConfig, create a minimal test fixture that builds one with tmp_path directories.

cd /Users/gbanyan/Project/usher-exploring && pytest tests/test_persistence.py -v

- ProvenanceTracker creates metadata with pipeline version, data source versions, config hash, timestamps, and processing steps
- Sidecar JSON saves alongside outputs and round-trips correctly
- All 13 persistence tests pass (8 DuckDB store + 5 provenance)

1. `pytest tests/test_persistence.py -v` -- all tests pass
2. `python -c "from usher_pipeline.persistence import PipelineStore, ProvenanceTracker"` -- imports work
3. DuckDB checkpoint-restart verified: save -> check -> load cycle works
4. Provenance sidecar JSON created with all required metadata fields

- DuckDB store saves/loads DataFrames with checkpoint metadata tracking
- Checkpoint-restart pattern works: has_checkpoint -> skip expensive re-fetch
- Provenance tracker captures all required metadata (INFRA-06): pipeline version, data source versions, timestamps, config hash, processing steps
- Parquet export works for downstream compatibility
- All tests pass

After completion, create `.planning/phases/01-data-infrastructure/01-03-SUMMARY.md`