usher-exploring/.planning/phases/01-data-infrastructure/01-03-PLAN.md

---
phase: 01-data-infrastructure
plan: 03
type: execute
wave: 2
depends_on:
  - "01-01"
files_modified:
  - src/usher_pipeline/persistence/__init__.py
  - src/usher_pipeline/persistence/duckdb_store.py
  - src/usher_pipeline/persistence/provenance.py
  - tests/test_persistence.py
autonomous: true
must_haves:
  truths:
    - "DuckDB database stores DataFrames as tables and exports to Parquet"
    - "Checkpoint system detects existing tables to enable restart-from-checkpoint without re-downloading"
    - "Provenance metadata captures pipeline version, data source versions, timestamps, and config hash"
    - "Provenance sidecar JSON file is saved alongside every pipeline output"
  artifacts:
    - path: "src/usher_pipeline/persistence/duckdb_store.py"
      provides: "DuckDB-based storage with checkpoint-restart capability"
      contains: "class PipelineStore"
    - path: "src/usher_pipeline/persistence/provenance.py"
      provides: "Provenance metadata creation and persistence"
      contains: "class ProvenanceTracker"
  key_links:
    - from: "src/usher_pipeline/persistence/duckdb_store.py"
      to: "duckdb"
      via: "duckdb.connect for file-based database"
      pattern: "duckdb.connect"
    - from: "src/usher_pipeline/persistence/provenance.py"
      to: "src/usher_pipeline/config/schema.py"
      via: "reads PipelineConfig for version info and config hash"
      pattern: "config_hash|PipelineConfig"
    - from: "src/usher_pipeline/persistence/duckdb_store.py"
      to: "src/usher_pipeline/persistence/provenance.py"
      via: "attaches provenance metadata when saving checkpoints"
      pattern: "provenance|ProvenanceTracker"
---
Build the DuckDB persistence layer for intermediate results with checkpoint-restart capability, and the provenance metadata system for reproducibility tracking.

Purpose: The pipeline fetches API data that takes hours to download. DuckDB persistence enables restart-from-checkpoint, so a failure does not require re-downloading everything (INFRA-07). Provenance tracking ensures every output is traceable to specific data versions, config parameters, and timestamps (INFRA-06).

Output: PipelineStore class for DuckDB operations and ProvenanceTracker for metadata.

<execution_context> @/Users/gbanyan/.claude/get-shit-done/workflows/execute-plan.md @/Users/gbanyan/.claude/get-shit-done/templates/summary.md </execution_context>

@.planning/PROJECT.md
@.planning/ROADMAP.md
@.planning/STATE.md
@.planning/phases/01-data-infrastructure/01-RESEARCH.md
@.planning/phases/01-data-infrastructure/01-01-SUMMARY.md

Task 1: Create DuckDB persistence layer with checkpoint-restart

Files: src/usher_pipeline/persistence/__init__.py, src/usher_pipeline/persistence/duckdb_store.py

1. Create `src/usher_pipeline/persistence/duckdb_store.py` with `PipelineStore` class:
   - Constructor takes `db_path: Path`. Creates parent directories. Connects via `duckdb.connect(str(db_path))`. Creates internal `_checkpoints` metadata table on init: `CREATE TABLE IF NOT EXISTS _checkpoints (table_name VARCHAR PRIMARY KEY, created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, row_count INTEGER, description VARCHAR)`.
   - `save_dataframe(df: polars.DataFrame | pandas.DataFrame, table_name: str, description: str = "", replace: bool = True)`: Detect DataFrame type. For polars, use `conn.execute("CREATE OR REPLACE TABLE ... AS SELECT * FROM df")` via DuckDB's native polars support. For pandas, use the same pattern. Update `_checkpoints` metadata with row count and description. Use parameterized queries for the metadata insert.
   - `load_dataframe(table_name: str, as_polars: bool = True) -> polars.DataFrame | pandas.DataFrame | None`: Load table. Return None if the table doesn't exist (catch `duckdb.CatalogException`). Return polars by default (use `conn.execute("SELECT * FROM ...").pl()`), pandas if `as_polars=False`.
   - `has_checkpoint(table_name: str) -> bool`: Check `_checkpoints` table for existence.
   - `list_checkpoints() -> list[dict]`: Return all checkpoint metadata (table_name, created_at, row_count, description).
   - `delete_checkpoint(table_name: str)`: Drop table and remove from `_checkpoints`.
   - `export_parquet(table_name: str, output_path: Path)`: Export table to Parquet file using `COPY ... TO ... (FORMAT PARQUET)`. Create parent dirs.
   - `execute_query(query: str, params: list | None = None) -> polars.DataFrame`: Execute arbitrary SQL, return polars DataFrame.
   - `close()`: Close DuckDB connection. Implement `__enter__` and `__exit__` for context manager support.
   - `@classmethod from_config(cls, config: PipelineConfig)`: Create from config's duckdb_path.
2. Create `src/usher_pipeline/persistence/__init__.py` exporting PipelineStore and ProvenanceTracker. Note that `provenance.py` does not exist until Task 2, and importing any submodule runs the package `__init__.py` first, so add the ProvenanceTracker export only once `provenance.py` is in place; otherwise the Task 1 check fails on import.
Verify:

cd /Users/gbanyan/Project/usher-exploring && python -c "
from usher_pipeline.persistence.duckdb_store import PipelineStore
from pathlib import Path
import tempfile, polars as pl

with tempfile.TemporaryDirectory() as tmp:
    store = PipelineStore(Path(tmp) / 'test.duckdb')
    df = pl.DataFrame({'gene': ['BRCA1', 'TP53'], 'score': [0.95, 0.88]})
    store.save_dataframe(df, 'test_genes', 'test data')
    assert store.has_checkpoint('test_genes')
    loaded = store.load_dataframe('test_genes')
    assert loaded.shape == (2, 2)
    assert not store.has_checkpoint('nonexistent')
    store.close()
    print('PipelineStore: all basic operations work')
"

Done when:
- PipelineStore creates DuckDB file, saves/loads polars DataFrames
- Checkpoint system tracks which tables exist with metadata
- has_checkpoint returns True for existing tables, False for missing
- Context manager support works for clean connection handling

Task 2: Create provenance tracker with tests for both modules

Files: src/usher_pipeline/persistence/provenance.py, tests/test_persistence.py

1. Create `src/usher_pipeline/persistence/provenance.py` with `ProvenanceTracker`:
   - Constructor takes `pipeline_version: str`, `config: PipelineConfig`.
   - Stores config_hash from `config.config_hash()`, data_source_versions from `config.versions.model_dump()`.
   - `record_step(step_name: str, details: dict | None = None)`: Appends to internal processing_steps list with timestamp, step_name, and optional details dict.
   - `create_metadata() -> dict`: Returns full provenance dict: pipeline_version, data_source_versions, config_hash, created_at (ISO timestamp), processing_steps list.
   - `save_sidecar(output_path: Path)`: Saves provenance metadata as JSON sidecar file at `output_path.with_suffix('.provenance.json')`. Uses json.dumps with indent=2, default=str for Path serialization.
   - `save_to_store(store: PipelineStore)`: Saves provenance metadata to a `_provenance` table in DuckDB (flattened: version, config_hash, created_at, steps_json).
   - `@staticmethod load_sidecar(sidecar_path: Path) -> dict`: Reads and returns provenance JSON.
   - `@staticmethod from_config(config: PipelineConfig, version: str | None = None) -> ProvenanceTracker`: Creates tracker, using version from `usher_pipeline.__version__` if not provided.
2. Create `tests/test_persistence.py` with pytest tests using tmp_path:

   DuckDB Store tests:
   - test_store_creates_database: PipelineStore creates .duckdb file at specified path
   - test_save_and_load_polars: save polars DataFrame, load back, verify shape and values
   - test_save_and_load_pandas: save pandas DataFrame, load back as pandas (as_polars=False)
   - test_checkpoint_lifecycle: save -> has_checkpoint True -> delete -> has_checkpoint False
   - test_list_checkpoints: save 3 tables, list_checkpoints returns 3 entries with metadata
   - test_export_parquet: save DataFrame, export to Parquet, verify Parquet file exists and is readable by polars
   - test_load_nonexistent_returns_none: load_dataframe for non-existent table returns None
   - test_context_manager: use `with PipelineStore(...) as store:` and verify operations work

   Provenance tests:
   - test_provenance_metadata_structure: create ProvenanceTracker, verify metadata dict has all required keys
   - test_provenance_records_steps: record 2 steps, verify they appear in metadata with timestamps
   - test_provenance_sidecar_roundtrip: save sidecar, load sidecar, verify content matches
   - test_provenance_config_hash_included: verify config_hash in metadata matches config.config_hash()
   - test_provenance_save_to_store: save to PipelineStore, verify _provenance table exists

For tests needing PipelineConfig, create a minimal test fixture that builds one with tmp_path directories.
Verify:

cd /Users/gbanyan/Project/usher-exploring && pytest tests/test_persistence.py -v

Done when:
- ProvenanceTracker creates metadata with pipeline version, data source versions, config hash, timestamps, and processing steps
- Sidecar JSON saves alongside outputs and round-trips correctly
- All 13 persistence tests pass (8 DuckDB store + 5 provenance)

Overall verification:
1. `pytest tests/test_persistence.py -v` -- all tests pass
2. `python -c "from usher_pipeline.persistence import PipelineStore, ProvenanceTracker"` -- imports work
3. DuckDB checkpoint-restart verified: save -> check -> load cycle works
4. Provenance sidecar JSON created with all required metadata fields
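The provenance metadata and sidecar naming described in Task 2 could look roughly like the sketch below. It uses only the standard library and makes several assumptions: `ProvenanceSketch` is a hypothetical stand-in for `ProvenanceTracker`, a plain dict replaces `PipelineConfig`, a SHA-256 over canonical JSON stands in for `config.config_hash()` (whose real definition is not given here), and the version string and source names are illustrative values only.

```python
import hashlib
import json
import tempfile
from datetime import datetime, timezone
from pathlib import Path
from typing import Optional


class ProvenanceSketch:
    """Hypothetical stand-in for the planned ProvenanceTracker."""

    def __init__(self, pipeline_version: str, config: dict) -> None:
        self.pipeline_version = pipeline_version
        # Assumption: SHA-256 over canonical JSON stands in for config.config_hash().
        canonical = json.dumps(config, sort_keys=True, default=str)
        self.config_hash = hashlib.sha256(canonical.encode("utf-8")).hexdigest()
        # Assumption: a "versions" key stands in for config.versions.model_dump().
        self.data_source_versions = dict(config.get("versions", {}))
        self.processing_steps = []

    def record_step(self, step_name: str, details: Optional[dict] = None) -> None:
        self.processing_steps.append(
            {
                "step_name": step_name,
                "timestamp": datetime.now(timezone.utc).isoformat(),
                "details": details or {},
            }
        )

    def create_metadata(self) -> dict:
        return {
            "pipeline_version": self.pipeline_version,
            "data_source_versions": self.data_source_versions,
            "config_hash": self.config_hash,
            "created_at": datetime.now(timezone.utc).isoformat(),
            "processing_steps": self.processing_steps,
        }

    def save_sidecar(self, output_path: Path) -> Path:
        # with_suffix replaces the final suffix:
        # genes.parquet -> genes.provenance.json
        sidecar = output_path.with_suffix(".provenance.json")
        sidecar.parent.mkdir(parents=True, exist_ok=True)
        sidecar.write_text(
            json.dumps(self.create_metadata(), indent=2, default=str)
        )
        return sidecar


with tempfile.TemporaryDirectory() as tmp:
    # Illustrative values only; not real pipeline configuration.
    tracker = ProvenanceSketch("0.0.0-example", {"versions": {"example_source": "v1"}})
    tracker.record_step("download", {"rows": 2})
    sidecar_path = tracker.save_sidecar(Path(tmp) / "genes.parquet")
    sidecar_name = sidecar_path.name
    loaded = json.loads(sidecar_path.read_text())
```

Because `Path.with_suffix` replaces the existing extension, the sidecar for `genes.parquet` is named `genes.provenance.json`, not `genes.parquet.provenance.json`. The round-trip test may want to assert on that name explicitly.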

<success_criteria>

- DuckDB store saves/loads DataFrames with checkpoint metadata tracking
- Checkpoint-restart pattern works: has_checkpoint -> skip expensive re-fetch
- Provenance tracker captures all required metadata (INFRA-06): pipeline version, data source versions, timestamps, config hash, processing steps
- Parquet export works for downstream compatibility
- All tests pass

</success_criteria>
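The "has_checkpoint -> skip expensive re-fetch" criterion amounts to a small piece of control flow, sketched here with the standard library only. `InMemoryStore` and `fetch_or_load` are hypothetical names: the store is a dict-backed stand-in that borrows the planned method names `has_checkpoint`, `save_dataframe`, and `load_dataframe`, and it holds lists of rows where the real store would hold DataFrames.

```python
from typing import Callable, Dict, List


class InMemoryStore:
    """Hypothetical dict-backed stand-in using the planned method names."""

    def __init__(self) -> None:
        self._tables: Dict[str, List[dict]] = {}

    def has_checkpoint(self, table_name: str) -> bool:
        return table_name in self._tables

    def save_dataframe(self, rows: List[dict], table_name: str, description: str = "") -> None:
        self._tables[table_name] = list(rows)

    def load_dataframe(self, table_name: str):
        return self._tables.get(table_name)


def fetch_or_load(store, table_name: str, fetch: Callable[[], List[dict]]) -> List[dict]:
    """Reuse a checkpoint when present; otherwise fetch once and save it."""
    if store.has_checkpoint(table_name):
        return store.load_dataframe(table_name)
    rows = fetch()
    store.save_dataframe(rows, table_name, description="fetched from source")
    return rows


calls = []


def expensive_fetch() -> List[dict]:
    # Stands in for an API download that takes hours.
    calls.append("fetch")
    return [{"gene": "BRCA1", "score": 0.95}, {"gene": "TP53", "score": 0.88}]


store = InMemoryStore()
first = fetch_or_load(store, "test_genes", expensive_fetch)
second = fetch_or_load(store, "test_genes", expensive_fetch)  # served from checkpoint
print(len(calls))  # prints 1: the fetch ran only on the first call
```

With a file-backed store the same helper would also skip the fetch after a process restart, which is the behaviour the plan is after.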
After completion, create `.planning/phases/01-data-infrastructure/01-03-SUMMARY.md`