docs(01-data-infrastructure): create phase plan

This commit is contained in:
2026-02-11 16:04:42 +08:00
parent 982f7f5a9b
commit cab2f5fc66
5 changed files with 726 additions and 3 deletions

View File

@@ -0,0 +1,179 @@
---
phase: 01-data-infrastructure
plan: 03
type: execute
wave: 2
depends_on: ["01-01"]
files_modified:
- src/usher_pipeline/persistence/__init__.py
- src/usher_pipeline/persistence/duckdb_store.py
- src/usher_pipeline/persistence/provenance.py
- tests/test_persistence.py
autonomous: true
must_haves:
truths:
- "DuckDB database stores DataFrames as tables and exports to Parquet"
- "Checkpoint system detects existing tables to enable restart-from-checkpoint without re-downloading"
- "Provenance metadata captures pipeline version, data source versions, timestamps, and config hash"
- "Provenance sidecar JSON file is saved alongside every pipeline output"
artifacts:
- path: "src/usher_pipeline/persistence/duckdb_store.py"
provides: "DuckDB-based storage with checkpoint-restart capability"
contains: "class PipelineStore"
- path: "src/usher_pipeline/persistence/provenance.py"
provides: "Provenance metadata creation and persistence"
contains: "class ProvenanceTracker"
key_links:
- from: "src/usher_pipeline/persistence/duckdb_store.py"
to: "duckdb"
via: "duckdb.connect for file-based database"
pattern: "duckdb\\.connect"
- from: "src/usher_pipeline/persistence/provenance.py"
to: "src/usher_pipeline/config/schema.py"
via: "reads PipelineConfig for version info and config hash"
pattern: "config_hash|PipelineConfig"
- from: "src/usher_pipeline/persistence/duckdb_store.py"
to: "src/usher_pipeline/persistence/provenance.py"
via: "attaches provenance metadata when saving checkpoints"
pattern: "provenance|ProvenanceTracker"
---
<objective>
Build the DuckDB persistence layer for intermediate results with checkpoint-restart capability, and the provenance metadata system for reproducibility tracking.
Purpose: The pipeline fetches expensive API data that takes hours. DuckDB persistence enables restart-from-checkpoint so failures don't require re-downloading everything (INFRA-07). Provenance tracking ensures every output is traceable to specific data versions, config parameters, and timestamps (INFRA-06).
Output: PipelineStore class for DuckDB operations and ProvenanceTracker for metadata.
</objective>
<execution_context>
@/Users/gbanyan/.claude/get-shit-done/workflows/execute-plan.md
@/Users/gbanyan/.claude/get-shit-done/templates/summary.md
</execution_context>
<context>
@.planning/PROJECT.md
@.planning/ROADMAP.md
@.planning/STATE.md
@.planning/phases/01-data-infrastructure/01-RESEARCH.md
@.planning/phases/01-data-infrastructure/01-01-SUMMARY.md
</context>
<tasks>
<task type="auto">
<name>Task 1: Create DuckDB persistence layer with checkpoint-restart</name>
<files>
src/usher_pipeline/persistence/__init__.py
src/usher_pipeline/persistence/duckdb_store.py
</files>
<action>
1. Create `src/usher_pipeline/persistence/duckdb_store.py` with `PipelineStore` class:
- Constructor takes `db_path: Path`. Creates parent directories. Connects via `duckdb.connect(str(db_path))`. Creates internal `_checkpoints` metadata table on init: `CREATE TABLE IF NOT EXISTS _checkpoints (table_name VARCHAR PRIMARY KEY, created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, row_count INTEGER, description VARCHAR)`.
- `save_dataframe(df: polars.DataFrame | pandas.DataFrame, table_name: str, description: str = "", replace: bool = True)`: Detect DataFrame type. For polars, use `conn.execute("CREATE OR REPLACE TABLE ... AS SELECT * FROM df")` via DuckDB's native polars support. For pandas, same pattern. Update _checkpoints metadata with row count and description. Use parameterized queries for metadata insert.
- `load_dataframe(table_name: str, as_polars: bool = True) -> polars.DataFrame | pandas.DataFrame | None`: Load table. Return None if table doesn't exist (catch duckdb.CatalogException). Return polars by default (use `conn.execute("SELECT * FROM ...").pl()`), pandas if as_polars=False.
- `has_checkpoint(table_name: str) -> bool`: Check _checkpoints table for existence.
- `list_checkpoints() -> list[dict]`: Return all checkpoint metadata (table_name, created_at, row_count, description).
- `delete_checkpoint(table_name: str)`: Drop table and remove from _checkpoints.
- `export_parquet(table_name: str, output_path: Path)`: Export table to Parquet file using `COPY ... TO ... (FORMAT PARQUET)`. Create parent dirs.
- `execute_query(query: str, params: list = None) -> polars.DataFrame`: Execute arbitrary SQL, return polars DataFrame.
- `close()`: Close DuckDB connection. Implement `__enter__` and `__exit__` for context manager support.
- `@classmethod from_config(cls, config: PipelineConfig)`: Create from config's duckdb_path.
2. Create `src/usher_pipeline/persistence/__init__.py` exporting PipelineStore and ProvenanceTracker.
</action>
<verify>
cd /Users/gbanyan/Project/usher-exploring && python -c "
from usher_pipeline.persistence.duckdb_store import PipelineStore
from pathlib import Path
import tempfile, polars as pl
with tempfile.TemporaryDirectory() as tmp:
store = PipelineStore(Path(tmp) / 'test.duckdb')
df = pl.DataFrame({'gene': ['BRCA1', 'TP53'], 'score': [0.95, 0.88]})
store.save_dataframe(df, 'test_genes', 'test data')
assert store.has_checkpoint('test_genes')
loaded = store.load_dataframe('test_genes')
assert loaded.shape == (2, 2)
assert not store.has_checkpoint('nonexistent')
store.close()
print('PipelineStore: all basic operations work')
"
</verify>
<done>
- PipelineStore creates DuckDB file, saves/loads polars DataFrames
- Checkpoint system tracks which tables exist with metadata
- has_checkpoint returns True for existing tables, False for missing
- Context manager support works for clean connection handling
</done>
</task>
<task type="auto">
<name>Task 2: Create provenance tracker with tests for both modules</name>
<files>
src/usher_pipeline/persistence/provenance.py
tests/test_persistence.py
</files>
<action>
1. Create `src/usher_pipeline/persistence/provenance.py` with `ProvenanceTracker`:
- Constructor takes `pipeline_version: str`, `config: PipelineConfig`.
- Stores config_hash from config.config_hash(), data_source_versions from config.versions.model_dump().
- `record_step(step_name: str, details: dict = None)`: Appends to internal processing_steps list with timestamp, step_name, and optional details dict.
- `create_metadata() -> dict`: Returns full provenance dict: pipeline_version, data_source_versions, config_hash, created_at (ISO timestamp), processing_steps list.
- `save_sidecar(output_path: Path)`: Saves provenance metadata as JSON sidecar file at `output_path.with_suffix('.provenance.json')`. Uses json.dumps with indent=2, default=str for Path serialization.
- `save_to_store(store: PipelineStore)`: Saves provenance metadata to a `_provenance` table in DuckDB (flattened: version, config_hash, created_at, steps_json).
- `@staticmethod load_sidecar(sidecar_path: Path) -> dict`: Reads and returns provenance JSON.
- `@staticmethod from_config(config: PipelineConfig, version: str = None) -> ProvenanceTracker`: Creates tracker, using version from usher_pipeline.__version__ if not provided.
2. Create `tests/test_persistence.py` with pytest tests using tmp_path:
DuckDB Store tests:
- test_store_creates_database: PipelineStore creates .duckdb file at specified path
- test_save_and_load_polars: save polars DataFrame, load back, verify shape and values
- test_save_and_load_pandas: save pandas DataFrame, load back as pandas (as_polars=False)
- test_checkpoint_lifecycle: save -> has_checkpoint True -> delete -> has_checkpoint False
- test_list_checkpoints: save 3 tables, list_checkpoints returns 3 entries with metadata
- test_export_parquet: save DataFrame, export to Parquet, verify Parquet file exists and is readable by polars
- test_load_nonexistent_returns_none: load_dataframe for non-existent table returns None
- test_context_manager: use `with PipelineStore(...) as store:` and verify operations work
Provenance tests:
- test_provenance_metadata_structure: create ProvenanceTracker, verify metadata dict has all required keys
- test_provenance_records_steps: record 2 steps, verify they appear in metadata with timestamps
- test_provenance_sidecar_roundtrip: save sidecar, load sidecar, verify content matches
- test_provenance_config_hash_included: verify config_hash in metadata matches config.config_hash()
- test_provenance_save_to_store: save to PipelineStore, verify _provenance table exists
For tests needing PipelineConfig, create a minimal test fixture that builds one with tmp_path directories.
</action>
<verify>
cd /Users/gbanyan/Project/usher-exploring && pytest tests/test_persistence.py -v
</verify>
<done>
- ProvenanceTracker creates metadata with pipeline version, data source versions, config hash, timestamps, and processing steps
- Sidecar JSON saves alongside outputs and round-trips correctly
- All 13 persistence tests pass (8 DuckDB store + 5 provenance)
</done>
</task>
</tasks>
<verification>
1. `pytest tests/test_persistence.py -v` -- all tests pass
2. `python -c "from usher_pipeline.persistence import PipelineStore, ProvenanceTracker"` -- imports work
3. DuckDB checkpoint-restart verified: save -> check -> load cycle works
4. Provenance sidecar JSON created with all required metadata fields
</verification>
<success_criteria>
- DuckDB store saves/loads DataFrames with checkpoint metadata tracking
- Checkpoint-restart pattern works: has_checkpoint -> skip expensive re-fetch
- Provenance tracker captures all required metadata (INFRA-06): pipeline version, data source versions, timestamps, config hash, processing steps
- Parquet export works for downstream compatibility
- All tests pass
</success_criteria>
<output>
After completion, create `.planning/phases/01-data-infrastructure/01-03-SUMMARY.md`
</output>