---
phase: 01-data-infrastructure
plan: 03
type: execute
wave: 2
depends_on: ["01-01"]
files_modified:
  - src/usher_pipeline/persistence/__init__.py
  - src/usher_pipeline/persistence/duckdb_store.py
  - src/usher_pipeline/persistence/provenance.py
  - tests/test_persistence.py
autonomous: true

must_haves:
  truths:
    - "DuckDB database stores DataFrames as tables and exports to Parquet"
    - "Checkpoint system detects existing tables to enable restart-from-checkpoint without re-downloading"
    - "Provenance metadata captures pipeline version, data source versions, timestamps, and config hash"
    - "Provenance sidecar JSON file is saved alongside every pipeline output"
  artifacts:
    - path: "src/usher_pipeline/persistence/duckdb_store.py"
      provides: "DuckDB-based storage with checkpoint-restart capability"
      contains: "class PipelineStore"
    - path: "src/usher_pipeline/persistence/provenance.py"
      provides: "Provenance metadata creation and persistence"
      contains: "class ProvenanceTracker"
  key_links:
    - from: "src/usher_pipeline/persistence/duckdb_store.py"
      to: "duckdb"
      via: "duckdb.connect for file-based database"
      pattern: "duckdb\\.connect"
    - from: "src/usher_pipeline/persistence/provenance.py"
      to: "src/usher_pipeline/config/schema.py"
      via: "reads PipelineConfig for version info and config hash"
      pattern: "config_hash|PipelineConfig"
    - from: "src/usher_pipeline/persistence/duckdb_store.py"
      to: "src/usher_pipeline/persistence/provenance.py"
      via: "attaches provenance metadata when saving checkpoints"
      pattern: "provenance|ProvenanceTracker"
---

<objective>
Build the DuckDB persistence layer for intermediate results with checkpoint-restart capability, and the provenance metadata system for reproducibility tracking.

Purpose: The pipeline fetches API data that takes hours to download. DuckDB persistence enables restart-from-checkpoint, so a failure does not require re-downloading everything (INFRA-07). Provenance tracking ensures every output is traceable to specific data versions, config parameters, and timestamps (INFRA-06).

Output: PipelineStore class for DuckDB operations and ProvenanceTracker for metadata.
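
The restart-from-checkpoint flow described in the Purpose can be sketched as below. This is only an illustration under stated assumptions: `InMemoryStore` is a hypothetical dict-backed stand-in for `PipelineStore` (so the sketch runs without DuckDB), rows are plain lists of dicts rather than DataFrames, and `fetch_or_load` and `slow_fetch` are invented helper names.

```python
from typing import Callable, Optional


class InMemoryStore:
    """Hypothetical stand-in for PipelineStore; keeps tables in a dict."""

    def __init__(self) -> None:
        self._tables = {}

    def has_checkpoint(self, table_name: str) -> bool:
        return table_name in self._tables

    def save_dataframe(self, rows: list, table_name: str) -> None:
        self._tables[table_name] = rows

    def load_dataframe(self, table_name: str) -> Optional[list]:
        return self._tables.get(table_name)


def fetch_or_load(store: InMemoryStore, table_name: str, fetch: Callable[[], list]) -> list:
    """Call `fetch` only when no checkpoint exists for `table_name`."""
    if store.has_checkpoint(table_name):
        return store.load_dataframe(table_name)
    rows = fetch()
    store.save_dataframe(rows, table_name)
    return rows


calls = []


def slow_fetch() -> list:
    """Placeholder for an expensive API download; records each invocation."""
    calls.append("fetched")
    return [{"gene": "BRCA1", "score": 0.95}, {"gene": "TP53", "score": 0.88}]


store = InMemoryStore()
first = fetch_or_load(store, "gene_scores", slow_fetch)   # runs the fetch
second = fetch_or_load(store, "gene_scores", slow_fetch)  # served from the checkpoint
```

The second call never reaches `slow_fetch`, which is the property a restarted run relies on after a failure.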
</objective>

<execution_context>
@/Users/gbanyan/.claude/get-shit-done/workflows/execute-plan.md
@/Users/gbanyan/.claude/get-shit-done/templates/summary.md
</execution_context>

<context>
@.planning/PROJECT.md
@.planning/ROADMAP.md
@.planning/STATE.md
@.planning/phases/01-data-infrastructure/01-RESEARCH.md
@.planning/phases/01-data-infrastructure/01-01-SUMMARY.md
</context>

<tasks>

<task type="auto">
<name>Task 1: Create DuckDB persistence layer with checkpoint-restart</name>
<files>
src/usher_pipeline/persistence/__init__.py
src/usher_pipeline/persistence/duckdb_store.py
</files>
<action>
1. Create `src/usher_pipeline/persistence/duckdb_store.py` with `PipelineStore` class:
   - Constructor takes `db_path: Path`. Creates parent directories. Connects via `duckdb.connect(str(db_path))`. Creates the internal `_checkpoints` metadata table on init: `CREATE TABLE IF NOT EXISTS _checkpoints (table_name VARCHAR PRIMARY KEY, created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, row_count INTEGER, description VARCHAR)`.
   - `save_dataframe(df: polars.DataFrame | pandas.DataFrame, table_name: str, description: str = "", replace: bool = True)`: Detect the DataFrame type. For both polars and pandas, use `conn.execute("CREATE OR REPLACE TABLE ... AS SELECT * FROM df")`; DuckDB scans in-scope polars and pandas DataFrames natively. Update `_checkpoints` with the row count and description, using parameterized queries for the metadata insert. Table names cannot be bound as query parameters, so validate `table_name` before interpolating it into SQL.
   - `load_dataframe(table_name: str, as_polars: bool = True) -> polars.DataFrame | pandas.DataFrame | None`: Load the table. Return None if the table doesn't exist (catch `duckdb.CatalogException`). Return polars by default (use `conn.execute("SELECT * FROM ...").pl()`), pandas if `as_polars=False` (use `.df()`).
   - `has_checkpoint(table_name: str) -> bool`: Return True if `_checkpoints` contains a row for `table_name`.
   - `list_checkpoints() -> list[dict]`: Return all checkpoint metadata (table_name, created_at, row_count, description).
   - `delete_checkpoint(table_name: str)`: Drop the table and remove its row from `_checkpoints`.
   - `export_parquet(table_name: str, output_path: Path)`: Export the table to a Parquet file using `COPY ... TO ... (FORMAT PARQUET)`. Create parent dirs.
   - `execute_query(query: str, params: list | None = None) -> polars.DataFrame`: Execute arbitrary SQL and return a polars DataFrame.
   - `close()`: Close the DuckDB connection. Implement `__enter__` and `__exit__` for context manager support.
   - `@classmethod from_config(cls, config: PipelineConfig)`: Create from the config's duckdb_path.

2. Create `src/usher_pipeline/persistence/__init__.py` exporting PipelineStore and ProvenanceTracker. `provenance.py` is only created in Task 2, so add the ProvenanceTracker export once that module exists; importing it earlier makes the Task 1 verify step fail with an ImportError.
</action>
<verify>
cd /Users/gbanyan/Project/usher-exploring && python -c "
from usher_pipeline.persistence.duckdb_store import PipelineStore
from pathlib import Path
import tempfile, polars as pl

with tempfile.TemporaryDirectory() as tmp:
    store = PipelineStore(Path(tmp) / 'test.duckdb')
    df = pl.DataFrame({'gene': ['BRCA1', 'TP53'], 'score': [0.95, 0.88]})
    store.save_dataframe(df, 'test_genes', 'test data')
    assert store.has_checkpoint('test_genes')
    loaded = store.load_dataframe('test_genes')
    assert loaded.shape == (2, 2)
    assert not store.has_checkpoint('nonexistent')
    store.close()
    print('PipelineStore: all basic operations work')
"
</verify>
<done>
- PipelineStore creates DuckDB file, saves/loads polars DataFrames
- Checkpoint system tracks which tables exist with metadata
- has_checkpoint returns True for existing tables, False for missing
- Context manager support works for clean connection handling
</done>
</task>

<task type="auto">
<name>Task 2: Create provenance tracker with tests for both modules</name>
<files>
src/usher_pipeline/persistence/provenance.py
tests/test_persistence.py
</files>
<action>
1. Create `src/usher_pipeline/persistence/provenance.py` with `ProvenanceTracker`:
   - Constructor takes `pipeline_version: str`, `config: PipelineConfig`.
   - Stores config_hash from `config.config_hash()` and data_source_versions from `config.versions.model_dump()`.
   - `record_step(step_name: str, details: dict | None = None)`: Appends an entry to the internal processing_steps list with timestamp, step_name, and the optional details dict.
   - `create_metadata() -> dict`: Returns the full provenance dict: pipeline_version, data_source_versions, config_hash, created_at (ISO timestamp), processing_steps list.
   - `save_sidecar(output_path: Path)`: Saves provenance metadata as a JSON sidecar file at `output_path.with_suffix('.provenance.json')`. `with_suffix` replaces the final suffix, so e.g. `genes.parquet` yields `genes.provenance.json`. Uses json.dumps with indent=2 and default=str for Path serialization.
   - `save_to_store(store: PipelineStore)`: Saves provenance metadata to a `_provenance` table in DuckDB (flattened: version, config_hash, created_at, steps_json).
   - `@staticmethod load_sidecar(sidecar_path: Path) -> dict`: Reads and returns the provenance JSON.
   - `@staticmethod from_config(config: PipelineConfig, version: str | None = None) -> ProvenanceTracker`: Creates a tracker, using the version from `usher_pipeline.__version__` if none is provided.

2. Create `tests/test_persistence.py` with pytest tests using tmp_path:

   DuckDB Store tests:
   - test_store_creates_database: PipelineStore creates .duckdb file at specified path
   - test_save_and_load_polars: save polars DataFrame, load back, verify shape and values
   - test_save_and_load_pandas: save pandas DataFrame, load back as pandas (as_polars=False)
   - test_checkpoint_lifecycle: save -> has_checkpoint True -> delete -> has_checkpoint False
   - test_list_checkpoints: save 3 tables, list_checkpoints returns 3 entries with metadata
   - test_export_parquet: save DataFrame, export to Parquet, verify Parquet file exists and is readable by polars
   - test_load_nonexistent_returns_none: load_dataframe for non-existent table returns None
   - test_context_manager: use `with PipelineStore(...) as store:` and verify operations work

   Provenance tests:
   - test_provenance_metadata_structure: create ProvenanceTracker, verify metadata dict has all required keys
   - test_provenance_records_steps: record 2 steps, verify they appear in metadata with timestamps
   - test_provenance_sidecar_roundtrip: save sidecar, load sidecar, verify content matches
   - test_provenance_config_hash_included: verify config_hash in metadata matches config.config_hash()
   - test_provenance_save_to_store: save to PipelineStore, verify _provenance table exists

For tests needing PipelineConfig, create a minimal test fixture that builds one with tmp_path directories.
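
The tracker described in step 1 could be sketched with the standard library alone, as below. Several parts are assumptions made to keep the example self-contained: `ProvenanceSketch` takes plain values instead of a `PipelineConfig`, `hash_config` is a hypothetical stand-in for `config.config_hash()`, `created_at` is stamped at construction time, `save_sidecar` returns the sidecar path for convenience, and the version strings are placeholders.

```python
import hashlib
import json
import tempfile
from datetime import datetime, timezone
from pathlib import Path
from typing import Optional


def _now() -> str:
    """Current UTC time as an ISO 8601 string."""
    return datetime.now(timezone.utc).isoformat()


def hash_config(config: dict) -> str:
    """Hypothetical stand-in for config.config_hash(): SHA-256 of sorted JSON."""
    payload = json.dumps(config, sort_keys=True, default=str)
    return hashlib.sha256(payload.encode("utf-8")).hexdigest()


class ProvenanceSketch:
    """Illustrative stand-in for ProvenanceTracker."""

    def __init__(self, pipeline_version: str, config_hash: str, data_source_versions: dict) -> None:
        self.pipeline_version = pipeline_version
        self.config_hash = config_hash
        self.data_source_versions = data_source_versions
        self.created_at = _now()
        self.processing_steps = []

    def record_step(self, step_name: str, details: Optional[dict] = None) -> None:
        """Append a timestamped step entry."""
        self.processing_steps.append(
            {"step_name": step_name, "timestamp": _now(), "details": details or {}}
        )

    def create_metadata(self) -> dict:
        """Collect all provenance fields into one dict."""
        return {
            "pipeline_version": self.pipeline_version,
            "data_source_versions": self.data_source_versions,
            "config_hash": self.config_hash,
            "created_at": self.created_at,
            "processing_steps": self.processing_steps,
        }

    def save_sidecar(self, output_path: Path) -> Path:
        """Write metadata next to output_path, replacing its final suffix."""
        sidecar = output_path.with_suffix(".provenance.json")
        sidecar.parent.mkdir(parents=True, exist_ok=True)
        sidecar.write_text(json.dumps(self.create_metadata(), indent=2, default=str))
        return sidecar


config = {"data_dir": "data", "versions": {"source_a": "v1"}}  # placeholder values
tracker = ProvenanceSketch("0.1.0", hash_config(config), config["versions"])
tracker.record_step("download", {"rows": 2})

with tempfile.TemporaryDirectory() as tmp:
    sidecar = tracker.save_sidecar(Path(tmp) / "genes.parquet")
    sidecar_name = sidecar.name
    loaded = json.loads(sidecar.read_text())
```

Because every value in the metadata dict is JSON-native here, the sidecar round-trips to an equal dict, which is the property `test_provenance_sidecar_roundtrip` checks.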
</action>
<verify>
cd /Users/gbanyan/Project/usher-exploring && pytest tests/test_persistence.py -v
</verify>
<done>
- ProvenanceTracker creates metadata with pipeline version, data source versions, config hash, timestamps, and processing steps
- Sidecar JSON saves alongside outputs and round-trips correctly
- All 13 persistence tests pass (8 DuckDB store + 5 provenance)
</done>
</task>

</tasks>

<verification>
1. `pytest tests/test_persistence.py -v` -- all tests pass
2. `python -c "from usher_pipeline.persistence import PipelineStore, ProvenanceTracker"` -- imports work
3. DuckDB checkpoint-restart verified: save -> check -> load cycle works
4. Provenance sidecar JSON created with all required metadata fields
</verification>

<success_criteria>
- DuckDB store saves/loads DataFrames with checkpoint metadata tracking
- Checkpoint-restart pattern works: has_checkpoint -> skip expensive re-fetch
- Provenance tracker captures all required metadata (INFRA-06): pipeline version, data source versions, timestamps, config hash, processing steps
- Parquet export works for downstream compatibility
- All tests pass
</success_criteria>

<output>
After completion, create `.planning/phases/01-data-infrastructure/01-03-SUMMARY.md`
</output>