fix: implement real checkpoint persistence for literature layer

Previously, checkpoints only logged a message but never wrote to DuckDB,
causing all progress to be lost if the process was killed. Now partial
results are persisted to a literature_partial table every batch_size
genes (default 500).
On restart, the CLI loads the partial results and resumes from where it
left off. The partial table is cleaned up after successful completion.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-02-13 21:14:16 +08:00
parent dc36730cb4
commit 63e3cccd3c
3 changed files with 46 additions and 9 deletions

View File

@@ -1110,6 +1110,27 @@ def literature(ctx, force, email, api_key, batch_size):
)) ))
click.echo() click.echo()
# Load partial checkpoint if exists (for resume after interrupt)
partial_checkpoint = None
if store.has_checkpoint('literature_partial'):
partial_checkpoint = store.load_dataframe('literature_partial')
if partial_checkpoint is not None and partial_checkpoint.height > 0:
click.echo(click.style(
f" Resuming from partial checkpoint: {partial_checkpoint.height} genes already fetched",
fg='cyan'
))
else:
partial_checkpoint = None
# Define checkpoint callback to persist partial results to DuckDB
def save_partial_checkpoint(partial_df: pl.DataFrame):
store.save_dataframe(
df=partial_df,
table_name="literature_partial",
description="Partial literature fetch checkpoint (in-progress)",
replace=True,
)
# Process literature evidence # Process literature evidence
click.echo("Fetching and processing literature evidence from PubMed...") click.echo("Fetching and processing literature evidence from PubMed...")
click.echo(f" Email: {email}") click.echo(f" Email: {email}")
@@ -1124,7 +1145,8 @@ def literature(ctx, force, email, api_key, batch_size):
email=email, email=email,
api_key=api_key, api_key=api_key,
batch_size=batch_size, batch_size=batch_size,
checkpoint_df=None, # Future: load partial checkpoint if exists checkpoint_df=partial_checkpoint,
checkpoint_callback=save_partial_checkpoint,
) )
click.echo(click.style( click.echo(click.style(
f" Processed {len(df)} genes", f" Processed {len(df)} genes",
@@ -1156,6 +1178,12 @@ def literature(ctx, force, email, api_key, batch_size):
provenance=provenance, provenance=provenance,
description="PubMed literature evidence with context-specific queries and quality-weighted scoring" description="PubMed literature evidence with context-specific queries and quality-weighted scoring"
) )
# Clean up partial checkpoint after successful full load
try:
store.conn.execute("DROP TABLE IF EXISTS literature_partial")
store.conn.execute("DELETE FROM _checkpoints WHERE table_name = 'literature_partial'")
except Exception:
pass # Non-critical cleanup
click.echo(click.style( click.echo(click.style(
f" Saved to 'literature_evidence' table", f" Saved to 'literature_evidence' table",
fg='green' fg='green'

View File

@@ -153,6 +153,7 @@ def fetch_literature_evidence(
api_key: Optional[str] = None, api_key: Optional[str] = None,
batch_size: int = 500, batch_size: int = 500,
checkpoint_df: Optional[pl.DataFrame] = None, checkpoint_df: Optional[pl.DataFrame] = None,
checkpoint_callback=None,
) -> pl.DataFrame: ) -> pl.DataFrame:
"""Fetch literature evidence for all genes with progress tracking and checkpointing. """Fetch literature evidence for all genes with progress tracking and checkpointing.
@@ -168,6 +169,7 @@ def fetch_literature_evidence(
api_key: Optional NCBI API key for 10 req/sec rate limit api_key: Optional NCBI API key for 10 req/sec rate limit
batch_size: Save checkpoint every N genes (default: 500) batch_size: Save checkpoint every N genes (default: 500)
checkpoint_df: Optional partial results DataFrame to resume from checkpoint_df: Optional partial results DataFrame to resume from
checkpoint_callback: Optional callable(pl.DataFrame) to persist partial results
Returns: Returns:
DataFrame with columns: gene_symbol, total_pubmed_count, cilia_context_count, DataFrame with columns: gene_symbol, total_pubmed_count, cilia_context_count,
@@ -175,6 +177,7 @@ def fetch_literature_evidence(
direct_experimental_count, hts_screen_count. direct_experimental_count, hts_screen_count.
NULL values indicate failed queries (API errors), not zero publications. NULL values indicate failed queries (API errors), not zero publications.
""" """
all_gene_symbols = gene_symbols
# Estimate time # Estimate time
queries_per_gene = 6 # total + 4 contexts + direct + hts queries_per_gene = 6 # total + 4 contexts + direct + hts
total_queries = len(gene_symbols) * queries_per_gene total_queries = len(gene_symbols) * queries_per_gene
@@ -205,6 +208,8 @@ def fetch_literature_evidence(
else: else:
results = [] results = []
total_all = len(all_gene_symbols)
# Process genes with progress logging # Process genes with progress logging
for i, gene_symbol in enumerate(gene_symbols, start=1): for i, gene_symbol in enumerate(gene_symbols, start=1):
# Query PubMed for this gene # Query PubMed for this gene
@@ -218,21 +223,23 @@ def fetch_literature_evidence(
# Log progress every 100 genes # Log progress every 100 genes
if i % 100 == 0: if i % 100 == 0:
pct = (i / len(gene_symbols)) * 100 pct = (len(results) / total_all) * 100
logger.info( logger.info(
"pubmed_fetch_progress", "pubmed_fetch_progress",
processed=i, processed=len(results),
total=len(gene_symbols), total=total_all,
percent=round(pct, 1), percent=round(pct, 1),
gene_symbol=gene_symbol, gene_symbol=gene_symbol,
) )
# Checkpoint every batch_size genes # Checkpoint every batch_size genes — persist to DuckDB
if i % batch_size == 0: if i % batch_size == 0 and checkpoint_callback is not None:
checkpoint_partial = pl.DataFrame(results)
checkpoint_callback(checkpoint_partial)
logger.info( logger.info(
"pubmed_fetch_checkpoint", "pubmed_fetch_checkpoint_saved",
processed=i, processed=len(results),
total=len(gene_symbols), total=total_all,
batch_size=batch_size, batch_size=batch_size,
) )

View File

@@ -208,6 +208,7 @@ def process_literature_evidence(
api_key: Optional[str] = None, api_key: Optional[str] = None,
batch_size: int = 500, batch_size: int = 500,
checkpoint_df: Optional[pl.DataFrame] = None, checkpoint_df: Optional[pl.DataFrame] = None,
checkpoint_callback=None,
) -> pl.DataFrame: ) -> pl.DataFrame:
"""End-to-end literature evidence processing pipeline. """End-to-end literature evidence processing pipeline.
@@ -256,6 +257,7 @@ def process_literature_evidence(
api_key=api_key, api_key=api_key,
batch_size=batch_size, batch_size=batch_size,
checkpoint_df=checkpoint_df, checkpoint_df=checkpoint_df,
checkpoint_callback=checkpoint_callback,
) )
# Step 3: Classify evidence tiers # Step 3: Classify evidence tiers