From 63e3cccd3c8a1509ecce8b5e17167dea051567a6 Mon Sep 17 00:00:00 2001
From: gbanyan
Date: Fri, 13 Feb 2026 21:14:16 +0800
Subject: [PATCH] fix: implement real checkpoint persistence for literature
 layer

Previously, checkpoints only logged a message but never wrote to
DuckDB, so all progress was lost if the process was killed. Now partial
results are persisted to a literature_partial table every batch_size
genes (default 500). On restart the CLI loads the partial results and
resumes where it left off. The partial table is cleaned up after
successful completion.

Co-Authored-By: Claude Opus 4.6
---
 src/usher_pipeline/cli/evidence_cmd.py | 30 ++++++++++++++++++-
 .../evidence/literature/fetch.py       | 23 +++++++++-----
 .../evidence/literature/transform.py   |  2 ++
 3 files changed, 46 insertions(+), 9 deletions(-)

diff --git a/src/usher_pipeline/cli/evidence_cmd.py b/src/usher_pipeline/cli/evidence_cmd.py
index a4a7be9..a54ecb5 100644
--- a/src/usher_pipeline/cli/evidence_cmd.py
+++ b/src/usher_pipeline/cli/evidence_cmd.py
@@ -1110,6 +1110,27 @@ def literature(ctx, force, email, api_key, batch_size):
     ))
     click.echo()
 
+    # Load partial checkpoint if it exists (for resume after interrupt)
+    partial_checkpoint = None
+    if store.has_checkpoint('literature_partial'):
+        partial_checkpoint = store.load_dataframe('literature_partial')
+        if partial_checkpoint is not None and partial_checkpoint.height > 0:
+            click.echo(click.style(
+                f"  Resuming from partial checkpoint: {partial_checkpoint.height} genes already fetched",
+                fg='cyan'
+            ))
+        else:
+            partial_checkpoint = None
+
+    # Define checkpoint callback to persist partial results to DuckDB
+    def save_partial_checkpoint(partial_df: pl.DataFrame):
+        store.save_dataframe(
+            df=partial_df,
+            table_name="literature_partial",
+            description="Partial literature fetch checkpoint (in-progress)",
+            replace=True,
+        )
+
     # Process literature evidence
     click.echo("Fetching and processing literature evidence from PubMed...")
     click.echo(f"  Email: {email}")
@@ -1124,7 +1145,8 @@ def literature(ctx, force, email, api_key, batch_size):
         email=email,
         api_key=api_key,
         batch_size=batch_size,
-        checkpoint_df=None,  # Future: load partial checkpoint if exists
+        checkpoint_df=partial_checkpoint,
+        checkpoint_callback=save_partial_checkpoint,
     )
     click.echo(click.style(
         f"  Processed {len(df)} genes",
@@ -1156,6 +1178,12 @@ def literature(ctx, force, email, api_key, batch_size):
         provenance=provenance,
         description="PubMed literature evidence with context-specific queries and quality-weighted scoring"
     )
+    # Clean up partial checkpoint after successful full load
+    try:
+        store.conn.execute("DROP TABLE IF EXISTS literature_partial")
+        store.conn.execute("DELETE FROM _checkpoints WHERE table_name = 'literature_partial'")
+    except Exception:
+        pass  # Non-critical cleanup
     click.echo(click.style(
         f"  Saved to 'literature_evidence' table",
         fg='green'
diff --git a/src/usher_pipeline/evidence/literature/fetch.py b/src/usher_pipeline/evidence/literature/fetch.py
index 0629a10..b23bfc8 100644
--- a/src/usher_pipeline/evidence/literature/fetch.py
+++ b/src/usher_pipeline/evidence/literature/fetch.py
@@ -153,6 +153,7 @@ def fetch_literature_evidence(
     api_key: Optional[str] = None,
     batch_size: int = 500,
     checkpoint_df: Optional[pl.DataFrame] = None,
+    checkpoint_callback=None,
 ) -> pl.DataFrame:
     """Fetch literature evidence for all genes with progress tracking and checkpointing.
 
@@ -168,6 +169,7 @@ def fetch_literature_evidence(
         api_key: Optional NCBI API key for 10 req/sec rate limit
         batch_size: Save checkpoint every N genes (default: 500)
         checkpoint_df: Optional partial results DataFrame to resume from
+        checkpoint_callback: Optional callable(pl.DataFrame) to persist partial results
 
     Returns:
         DataFrame with columns: gene_symbol, total_pubmed_count, cilia_context_count,
@@ -175,6 +177,7 @@ def fetch_literature_evidence(
         direct_experimental_count, hts_screen_count.
         NULL values indicate failed queries (API errors), not zero publications.
     """
+    all_gene_symbols = gene_symbols
     # Estimate time
     queries_per_gene = 6  # total + 4 contexts + direct + hts
     total_queries = len(gene_symbols) * queries_per_gene
@@ -205,6 +208,8 @@ def fetch_literature_evidence(
     else:
         results = []
 
+    total_all = len(all_gene_symbols)
+
     # Process genes with progress logging
     for i, gene_symbol in enumerate(gene_symbols, start=1):
         # Query PubMed for this gene
@@ -218,21 +223,23 @@ def fetch_literature_evidence(
 
         # Log progress every 100 genes
         if i % 100 == 0:
-            pct = (i / len(gene_symbols)) * 100
+            pct = (len(results) / total_all) * 100
             logger.info(
                 "pubmed_fetch_progress",
-                processed=i,
-                total=len(gene_symbols),
+                processed=len(results),
+                total=total_all,
                 percent=round(pct, 1),
                 gene_symbol=gene_symbol,
             )
 
-        # Checkpoint every batch_size genes
-        if i % batch_size == 0:
+        # Checkpoint every batch_size genes: persist to DuckDB
+        if i % batch_size == 0 and checkpoint_callback is not None:
+            checkpoint_partial = pl.DataFrame(results)
+            checkpoint_callback(checkpoint_partial)
             logger.info(
-                "pubmed_fetch_checkpoint",
-                processed=i,
-                total=len(gene_symbols),
+                "pubmed_fetch_checkpoint_saved",
+                processed=len(results),
+                total=total_all,
                 batch_size=batch_size,
             )
 
diff --git a/src/usher_pipeline/evidence/literature/transform.py b/src/usher_pipeline/evidence/literature/transform.py
index 80b3d4b..e70b04e 100644
--- a/src/usher_pipeline/evidence/literature/transform.py
+++ b/src/usher_pipeline/evidence/literature/transform.py
@@ -208,6 +208,7 @@ def process_literature_evidence(
     api_key: Optional[str] = None,
     batch_size: int = 500,
     checkpoint_df: Optional[pl.DataFrame] = None,
+    checkpoint_callback=None,
 ) -> pl.DataFrame:
     """End-to-end literature evidence processing pipeline.
 
@@ -256,6 +257,7 @@ def process_literature_evidence(
         api_key=api_key,
         batch_size=batch_size,
         checkpoint_df=checkpoint_df,
+        checkpoint_callback=checkpoint_callback,
     )
 
     # Step 3: Classify evidence tiers
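
Note for reviewers: the sketch below is a minimal, self-contained model of the checkpoint contract this patch establishes; it is not code from the repository. The fake per-gene result and the dict-backed "store" are stand-ins for the six PubMed queries and the DuckDB-backed save_partial_checkpoint, and it assumes the pre-existing checkpoint_df branch filters out genes already present in the partial results, which the resume path relies on. The progress change has the same motivation: after a resume, i counts only the remaining genes, so the old i / len(gene_symbols) formula would report 1/2 = 50% when 3 of 4 genes are actually done, while len(results) / total_all correctly reports 75%.

import polars as pl

def fetch_with_checkpoints(genes, batch_size=2,
                           checkpoint_df=None, checkpoint_callback=None):
    # Capture the full list before filtering, mirroring the patch's
    # all_gene_symbols / total_all so progress is measured over ALL genes.
    all_genes = genes
    if checkpoint_df is not None and checkpoint_df.height > 0:
        results = checkpoint_df.to_dicts()
        done = {r["gene_symbol"] for r in results}
        genes = [g for g in genes if g not in done]
    else:
        results = []

    for i, gene in enumerate(genes, start=1):
        # Stand-in for the real per-gene PubMed queries.
        results.append({"gene_symbol": gene, "total_pubmed_count": len(gene)})
        if i % batch_size == 0 and checkpoint_callback is not None:
            # Persist everything fetched so far, resumed rows included.
            checkpoint_callback(pl.DataFrame(results))
    return pl.DataFrame(results)

# Simulate a kill between runs; the "store" is just a dict here.
store = {}

def save(df):
    store["literature_partial"] = df

fetch_with_checkpoints(["MYO7A", "USH2A"], checkpoint_callback=save)
resumed = fetch_with_checkpoints(
    ["MYO7A", "USH2A", "CDH23", "PCDH15"],
    checkpoint_df=store["literature_partial"],
    checkpoint_callback=save,
)
assert resumed.height == 4  # 2 resumed + 2 newly fetched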
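
On the cleanup hunk in evidence_cmd.py: dropping the partial table and deleting its row from the _checkpoints registry are both idempotent, which is what makes the broad try/except defensible as non-critical. A small sketch under assumptions: the _checkpoints schema is guessed from the DELETE statement (only the table_name column is visible in the patch), and the pipeline's store holds a persistent connection rather than the in-memory one used here.

import duckdb

con = duckdb.connect()  # in-memory, for illustration only
con.execute("CREATE TABLE _checkpoints (table_name VARCHAR)")  # hypothetical minimal schema
con.execute("CREATE TABLE literature_partial (gene_symbol VARCHAR)")
con.execute("INSERT INTO _checkpoints VALUES ('literature_partial')")

# The patch's two cleanup statements. DROP ... IF EXISTS is a no-op when
# no checkpoint was ever written, and the DELETE simply matches zero
# rows, so running the pair twice (or after a clean first run) is harmless.
for _ in range(2):
    con.execute("DROP TABLE IF EXISTS literature_partial")
    con.execute("DELETE FROM _checkpoints WHERE table_name = 'literature_partial'")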