Files
pdf_signature_extraction/yolo_extract_from_index.py
T
gbanyan 939a348da4 Add Paper A (IEEE TAI) complete draft with Firm A-calibrated dual-method classification
Paper draft includes all sections (Abstract through Conclusion), 36 references,
and supporting scripts. Key methodology: Cosine similarity + dHash dual-method
verification with thresholds calibrated against known-replication firm (Firm A).

Includes:
- 8 section markdown files (paper_a_*.md)
- Ablation study script (ResNet-50 vs VGG-16 vs EfficientNet-B0)
- Recalibrated classification script (84,386 PDFs, 5-tier system)
- Figure generation and Word export scripts
- Citation renumbering script ([1]-[36])
- Signature analysis pipeline (12 steps)
- YOLO extraction scripts

Three rounds of AI review completed (GPT-5.4, Claude Opus 4.6, Gemini 3 Pro).

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-06 23:05:33 +08:00

381 lines
12 KiB
Python

#!/usr/bin/env python3
"""
YOLO Signature Extraction from VLM Index
Extracts signatures from PDF pages specified in master_signatures.csv.
Uses VLM-filtered index + YOLO for precise localization and cropping.
Pipeline:
CSV Index → Load specified page → YOLO Detection → Crop & Remove Red Stamp → Output
"""
import argparse
import csv
import json
import os
import sys
import time
from concurrent.futures import ProcessPoolExecutor, as_completed
from datetime import datetime
from pathlib import Path
from typing import Optional
import cv2
import fitz # PyMuPDF
import numpy as np
# Configuration
# Default render resolution in dots per inch; used as the default `dpi`
# argument of render_pdf_page().
DPI = 150
# Minimum detection confidence; same value as the default of the --conf
# command-line option.
CONFIDENCE_THRESHOLD = 0.5
# main() writes a progress checkpoint whenever the processed-entry count is a
# multiple of this number.
PROGRESS_SAVE_INTERVAL = 500
def remove_red_stamp(image: np.ndarray) -> np.ndarray:
    """Return a copy of an RGB image with red-hued pixels painted white.

    Two HSV bands are treated as red: hue 0-10 and hue 160-180, each with
    saturation and value of at least 50. The union of both masks is dilated
    once with a 3x3 kernel before the matching pixels are overwritten.

    Args:
        image: Image in RGB channel order (it is converted with
            ``cv2.COLOR_RGB2HSV``).

    Returns:
        A new array; the input array is left untouched.
    """
    hsv_image = cv2.cvtColor(image, cv2.COLOR_RGB2HSV)

    # (lower bound, upper bound) per band, in H, S, V order.
    red_bands = (
        ([0, 50, 50], [10, 255, 255]),
        ([160, 50, 50], [180, 255, 255]),
    )
    low_band, high_band = (
        cv2.inRange(hsv_image, np.array(lower), np.array(upper))
        for lower, upper in red_bands
    )

    stamp_mask = cv2.dilate(
        cv2.bitwise_or(low_band, high_band),
        np.ones((3, 3), np.uint8),
        iterations=1,
    )

    cleaned = image.copy()
    cleaned[stamp_mask > 0] = [255, 255, 255]
    return cleaned
def render_pdf_page(pdf_path: str, page_num: int, dpi: int = DPI) -> Optional[np.ndarray]:
    """Render a specific PDF page to an image array.

    Args:
        pdf_path: Path of the PDF file to open.
        page_num: 1-based page number.
        dpi: Render resolution; the page is scaled by ``dpi / 72``.

    Returns:
        A ``uint8`` array of shape ``(height, width, channels)`` built from
        the pixmap samples (rendered with ``alpha=False``), or ``None`` when
        the page number is out of range or the file cannot be opened or
        rendered.
    """
    try:
        doc = fitz.open(pdf_path)
    except Exception:
        # Best-effort contract: an unreadable file is reported as None.
        return None

    try:
        if page_num < 1 or page_num > len(doc):
            return None
        page = doc[page_num - 1]  # Convert to 0-indexed
        zoom = dpi / 72
        pix = page.get_pixmap(matrix=fitz.Matrix(zoom, zoom), alpha=False)
        image = np.frombuffer(pix.samples, dtype=np.uint8)
        return image.reshape(pix.height, pix.width, pix.n)
    except Exception:
        return None
    finally:
        # Close on every path, including a failed render, so the document
        # handle is never leaked.
        doc.close()
def find_pdf_file(filename: str, pdf_base: str) -> Optional[str]:
    """Locate ``filename`` under ``pdf_base`` and return its path as a string.

    Lookup order: each ``batch_*`` entry directly under ``pdf_base`` (in
    sorted order), then ``pdf_base`` itself. The first candidate that exists
    wins; ``None`` is returned when none does.
    """
    root = Path(pdf_base)
    candidates = [batch / filename for batch in sorted(root.glob("batch_*"))]
    candidates.append(root / filename)
    return next((str(path) for path in candidates if path.exists()), None)
# Loaded YOLO models keyed by weights path. Every worker process has its own
# copy of this dict, so the weights are loaded at most once per process
# instead of once per CSV entry.
_MODEL_CACHE: dict = {}


def process_single_entry(args: tuple) -> dict:
    """
    Process a single CSV entry: render page, detect signatures, crop and save.

    Args:
        args: Tuple of (row_dict, model_path, pdf_base, output_dir, conf_threshold).
            ``row_dict`` must provide 'filename' and 'page' (1-based).

    Returns:
        Result dictionary with keys 'filename', 'page', 'num_signatures',
        'confidence_avg', 'image_files' and 'error'. 'error' is None on
        success and a message string when the entry could not be processed.

    Raises:
        ValueError: If ``row_dict['page']`` is not an integer string (raised
            before the error-capturing section starts).
    """
    row, model_path, pdf_base, output_dir, conf_threshold = args
    # Imported inside the function, i.e. when a worker first runs an entry,
    # not when this module is imported.
    from ultralytics import YOLO

    filename = row['filename']
    page_num = int(row['page'])
    base_name = Path(filename).stem
    result = {
        'filename': filename,
        'page': page_num,
        'num_signatures': 0,
        'confidence_avg': 0.0,
        'image_files': [],
        'error': None
    }
    try:
        # Find PDF
        pdf_path = find_pdf_file(filename, pdf_base)
        if pdf_path is None:
            result['error'] = 'PDF not found'
            return result

        # Render page
        image = render_pdf_page(pdf_path, page_num)
        if image is None:
            result['error'] = 'Render failed'
            return result

        # Load the model once per process, then reuse it.
        model = _MODEL_CACHE.get(model_path)
        if model is None:
            model = _MODEL_CACHE[model_path] = YOLO(model_path)

        # NOTE(review): `image` is handled as RGB everywhere in this file (it
        # is converted with COLOR_RGB2BGR before saving), while Ultralytics
        # documents BGR for numpy inputs -- confirm whether the channel order
        # matters for this model before changing it.
        detections = model(image, conf=conf_threshold, verbose=False)

        signatures = []
        for detection in detections:
            for box in detection.boxes:
                x1, y1, x2, y2 = map(int, box.xyxy[0].cpu().numpy())
                signatures.append({
                    'box': (x1, y1, x2, y2),
                    'confidence': float(box.conf[0].cpu().numpy())
                })

        if not signatures:
            return result

        # Sort signatures by position (top-left to bottom-right)
        signatures.sort(key=lambda s: (s['box'][1], s['box'][0]))
        result['num_signatures'] = len(signatures)
        result['confidence_avg'] = sum(s['confidence'] for s in signatures) / len(signatures)

        # Extract and save crops
        height, width = image.shape[:2]
        image_files = []
        for index, sig in enumerate(signatures, start=1):
            x1, y1, x2, y2 = sig['box']
            # Clamp each corner on its own. Deriving the far edge from the
            # already-clamped origin plus the width would shift it outwards
            # for a box that starts at a negative coordinate.
            x1, y1 = max(0, x1), max(0, y1)
            x2, y2 = min(width, x2), min(height, y2)
            crop = image[y1:y2, x1:x2]
            crop_clean = remove_red_stamp(crop)

            crop_filename = f"{base_name}_page{page_num}_sig{index}.png"
            crop_path = os.path.join(output_dir, "images", crop_filename)
            # cv2.imwrite reports failure through its return value rather
            # than an exception; do not list a file that was never written.
            if not cv2.imwrite(crop_path, cv2.cvtColor(crop_clean, cv2.COLOR_RGB2BGR)):
                raise OSError(f"Failed to write crop image: {crop_path}")
            image_files.append(crop_filename)
        result['image_files'] = image_files
    except Exception as e:
        result['error'] = str(e)
    return result
def load_progress(progress_file: str) -> set:
    """Load completed entry keys from the progress checkpoint.

    Args:
        progress_file: Path of the JSON checkpoint written by save_progress.

    Returns:
        The set built from the checkpoint's 'completed_keys' list. An empty
        set is returned when the file does not exist, cannot be read or
        parsed, or does not have the expected structure; in every case other
        than a missing file a warning is printed to stderr, because the
        caller will then reprocess everything.
    """
    try:
        with open(progress_file, 'r') as f:
            data = json.load(f)
    except FileNotFoundError:
        # No checkpoint yet: the normal situation on a first run.
        return set()
    except (OSError, ValueError) as exc:
        # ValueError covers json.JSONDecodeError and UnicodeDecodeError.
        print(f"Warning: ignoring unreadable progress file {progress_file}: {exc}",
              file=sys.stderr)
        return set()

    if not isinstance(data, dict):
        print(f"Warning: ignoring progress file {progress_file}: unexpected structure",
              file=sys.stderr)
        return set()

    try:
        return set(data.get('completed_keys', []))
    except TypeError as exc:
        # 'completed_keys' is not iterable or holds unhashable items.
        print(f"Warning: ignoring progress file {progress_file}: {exc}",
              file=sys.stderr)
        return set()
def save_progress(progress_file: str, completed: set, total: int, start_time: float):
    """Save the progress checkpoint as JSON.

    The data is written to ``<progress_file>.tmp`` first and then moved over
    the real file with os.replace, so an interruption during the write
    cannot leave a truncated checkpoint behind.

    Args:
        progress_file: Destination path of the checkpoint.
        completed: Keys of the entries processed so far.
        total: Total number of entries in the index.
        start_time: ``time.time()`` value taken when the run started; used
            to compute 'elapsed_seconds'.
    """
    elapsed = time.time() - start_time
    data = {
        'last_updated': datetime.now().isoformat(),
        'total_entries': total,
        'processed': len(completed),
        'remaining': total - len(completed),
        'elapsed_seconds': elapsed,
        'completed_keys': list(completed)
    }
    tmp_file = f"{progress_file}.tmp"
    with open(tmp_file, 'w') as f:
        json.dump(data, f)
    os.replace(tmp_file, progress_file)
def main():
    """Command-line entry point: run the extraction over the CSV index.

    Reads the index CSV, skips entries recorded in ``progress.json`` when
    ``--resume`` is given, distributes the remaining entries over a process
    pool, and writes ``extraction_results.csv``, ``extraction_report.json``
    and ``progress.json`` into the output directory.

    Result rows are written to the CSV together with every progress
    checkpoint (and once more when the loop ends or is interrupted), so the
    rows on disk match the entries the checkpoint marks as done. When
    resuming with a non-empty checkpoint, rows are appended to the existing
    CSV instead of replacing it. The report and the printed summary count
    only the entries processed by the current invocation.
    """
    parser = argparse.ArgumentParser(description='YOLO Signature Extraction from VLM Index')
    parser.add_argument('--csv', required=True, help='Path to master_signatures.csv')
    parser.add_argument('--pdf-base', required=True, help='Base directory containing PDFs')
    parser.add_argument('--output', required=True, help='Output directory')
    parser.add_argument('--model', default='best.pt', help='Path to YOLO model')
    parser.add_argument('--workers', type=int, default=8, help='Number of parallel workers')
    parser.add_argument('--conf', type=float, default=CONFIDENCE_THRESHOLD, help='Confidence threshold')
    parser.add_argument('--resume', action='store_true', help='Resume from checkpoint')
    args = parser.parse_args()

    # Setup output directories
    output_dir = Path(args.output)
    output_dir.mkdir(parents=True, exist_ok=True)
    (output_dir / "images").mkdir(exist_ok=True)
    progress_file = str(output_dir / "progress.json")
    csv_output = str(output_dir / "extraction_results.csv")
    report_file = str(output_dir / "extraction_report.json")

    print("=" * 70)
    print("YOLO Signature Extraction from VLM Index")
    print("=" * 70)
    print(f"CSV Index: {args.csv}")
    print(f"PDF Base: {args.pdf_base}")
    print(f"Output: {args.output}")
    print(f"Model: {args.model}")
    print(f"Workers: {args.workers}")
    print(f"Confidence: {args.conf}")
    print("=" * 70)

    # Load CSV (newline='' is what the csv module expects for its files)
    print("\nLoading CSV index...")
    with open(args.csv, 'r', newline='') as f:
        all_entries = list(csv.DictReader(f))
    total_entries = len(all_entries)
    print(f"Total entries: {total_entries}")

    # Load progress if resuming
    completed_keys = set()
    if args.resume:
        completed_keys = load_progress(progress_file)
        print(f"Resuming: {len(completed_keys)} entries already processed")
    # Fixed baseline for the rate display. Re-reading the checkpoint file for
    # every entry is slow and gives a wrong baseline once a new checkpoint
    # has been written during this run.
    already_done = len(completed_keys)

    # Filter out completed entries
    def entry_key(row):
        return f"{row['filename']}_{row['page']}"

    entries_to_process = [e for e in all_entries if entry_key(e) not in completed_keys]
    print(f"Entries to process: {len(entries_to_process)}")
    if not entries_to_process:
        print("All entries already processed!")
        return

    # Prepare work arguments
    work_args = [
        (entry, args.model, args.pdf_base, str(output_dir), args.conf)
        for entry in entries_to_process
    ]

    # Results
    results_success = []
    results_no_sig = []
    errors = []
    start_time = time.time()
    processed_count = already_done

    result_fields = ['filename', 'page', 'num_signatures', 'confidence_avg', 'image_files']
    # Keep rows from earlier runs only when some entries are actually being
    # skipped; otherwise everything is reprocessed and the file starts fresh.
    append_results = (
        bool(completed_keys)
        and os.path.isfile(csv_output)
        and os.path.getsize(csv_output) > 0
    )

    print(f"\nStarting extraction with {args.workers} workers...")
    print("-" * 70)
    with open(csv_output, 'a' if append_results else 'w', newline='') as csv_file:
        writer = csv.DictWriter(csv_file, fieldnames=result_fields)
        if not append_results:
            writer.writeheader()
        # Empty the write buffer before any worker process is started.
        csv_file.flush()
        rows_written = 0

        def checkpoint():
            """Write result rows not yet on disk, then save the progress file."""
            nonlocal rows_written
            for r in results_success[rows_written:]:
                writer.writerow({
                    'filename': r['filename'],
                    'page': r['page'],
                    'num_signatures': r['num_signatures'],
                    'confidence_avg': round(r['confidence_avg'], 4),
                    'image_files': ','.join(r['image_files'])
                })
            rows_written = len(results_success)
            # Rows reach the disk before the checkpoint claims them as done.
            csv_file.flush()
            save_progress(progress_file, completed_keys, total_entries, start_time)

        try:
            with ProcessPoolExecutor(max_workers=args.workers) as executor:
                futures = {
                    executor.submit(process_single_entry, arg): arg[0]
                    for arg in work_args
                }
                for future in as_completed(futures):
                    entry = futures[future]
                    try:
                        result = future.result()
                    except Exception as e:
                        # The worker itself failed; the entry is not marked
                        # as completed, so a later --resume run retries it.
                        print(f"Error: {e}")
                        errors.append({'filename': entry['filename'], 'error': str(e)})
                        continue

                    if result['error']:
                        errors.append(result)
                    elif result['num_signatures'] > 0:
                        results_success.append(result)
                    else:
                        results_no_sig.append(result)
                    completed_keys.add(entry_key(entry))
                    processed_count += 1

                    # Progress output
                    elapsed = time.time() - start_time
                    rate = (processed_count - already_done) / elapsed if elapsed > 0 else 0
                    eta = (total_entries - processed_count) / rate / 60 if rate > 0 else 0
                    status = f"SIG({result['num_signatures']})" if result['num_signatures'] > 0 else "---"
                    if result['error']:
                        status = "ERR"
                    print(f"[{processed_count}/{total_entries}] {status:8s} {result['filename'][:45]:45s} "
                          f"({rate:.1f}/s, ETA: {eta:.1f}m)")

                    # Save progress
                    if processed_count % PROGRESS_SAVE_INTERVAL == 0:
                        checkpoint()
        finally:
            # Final save; also runs when the loop is interrupted, so finished
            # work is not lost.
            print("\nWriting results CSV...")
            checkpoint()

    # Generate report
    elapsed_total = time.time() - start_time
    total_sigs = sum(r['num_signatures'] for r in results_success)
    run_rate = len(entries_to_process) / elapsed_total if elapsed_total > 0 else 0
    report = {
        'extraction_date': datetime.now().isoformat(),
        'total_index_entries': total_entries,
        'with_signatures_detected': len(results_success),
        'no_signatures_detected': len(results_no_sig),
        'errors': len(errors),
        'total_signatures_extracted': total_sigs,
        'detection_rate': f"{len(results_success) / total_entries * 100:.2f}%" if total_entries > 0 else "0%",
        'processing_time_minutes': round(elapsed_total / 60, 2),
        'processing_rate_per_second': round(run_rate, 2) if elapsed_total > 0 else 0,
        'model': args.model,
        'confidence_threshold': args.conf,
        'workers': args.workers
    }
    with open(report_file, 'w') as f:
        json.dump(report, f, indent=2)

    # Print summary (total_entries is non-zero here: the function returns
    # earlier when there is nothing to process)
    print("\n" + "=" * 70)
    print("EXTRACTION COMPLETE")
    print("=" * 70)
    print(f"Total index entries: {total_entries}")
    print(f"With signatures: {len(results_success)} ({len(results_success)/total_entries*100:.1f}%)")
    print(f"No signatures detected: {len(results_no_sig)} ({len(results_no_sig)/total_entries*100:.1f}%)")
    print(f"Errors: {len(errors)}")
    print(f"Total signatures: {total_sigs}")
    print(f"Processing time: {elapsed_total/60:.1f} minutes")
    print(f"Rate: {run_rate:.1f} entries/second")
    print("-" * 70)
    print(f"Results saved to: {output_dir}")
    print("=" * 70)


if __name__ == "__main__":
    main()