#!/usr/bin/env python3
"""
YOLO Full PDF Signature Scanner

Scans all PDFs to detect handwritten signatures using a trained YOLOv11n model.
Supports multi-process GPU acceleration and checkpoint resumption.

Features:
- Skip first page of each PDF
- Stop scanning once signature is found
- Extract and save signature crops with red stamp removal
- Progress checkpoint for resumption
- Detailed statistics report
"""

import argparse
import csv
import functools
import json
import os
import sys
import time
from concurrent.futures import ProcessPoolExecutor, as_completed
from datetime import datetime
from pathlib import Path
from typing import Optional

import cv2
import fitz  # PyMuPDF
import numpy as np

# Will be imported in worker processes
# from ultralytics import YOLO

# Configuration
DPI = 150  # Lower DPI for faster processing (150 vs 300)
CONFIDENCE_THRESHOLD = 0.5
PROGRESS_SAVE_INTERVAL = 100  # Save progress every N files
CSV_FIELDS = ['filename', 'page', 'num_signatures', 'confidence_avg']


def remove_red_stamp(image: np.ndarray) -> np.ndarray:
    """Return a copy of an RGB image with red (stamp-coloured) pixels set to white.

    Red is matched in HSV space with two hue bands (0-10 and 160-180) because
    red sits at both ends of the hue axis. The mask is dilated once with a 3x3
    kernel so pixels bordering the matched red areas are whitened as well.

    Args:
        image: H x W x 3 uint8 array in RGB channel order.

    Returns:
        A new array of the same shape; the input is not modified.
    """
    # cv2.cvtColor rejects empty arrays, so hand an empty crop straight back.
    if image.size == 0:
        return image.copy()

    hsv = cv2.cvtColor(image, cv2.COLOR_RGB2HSV)

    # Red color ranges in HSV
    lower_red1 = np.array([0, 50, 50])
    upper_red1 = np.array([10, 255, 255])
    lower_red2 = np.array([160, 50, 50])
    upper_red2 = np.array([180, 255, 255])

    mask1 = cv2.inRange(hsv, lower_red1, upper_red1)
    mask2 = cv2.inRange(hsv, lower_red2, upper_red2)
    red_mask = cv2.bitwise_or(mask1, mask2)

    kernel = np.ones((3, 3), np.uint8)
    red_mask = cv2.dilate(red_mask, kernel, iterations=1)

    result = image.copy()
    result[red_mask > 0] = [255, 255, 255]
    return result


def render_pdf_page(doc, page_num: int, dpi: int = DPI) -> Optional[np.ndarray]:
    """Render one page of an open PyMuPDF document to an image array.

    Args:
        doc: An open ``fitz`` document.
        page_num: Zero-based page index.
        dpi: Render resolution; the page is scaled by ``dpi / 72``.

    Returns:
        A ``(height, width, channels)`` uint8 array built from the pixmap
        samples, or ``None`` if the page could not be rendered.
    """
    try:
        page = doc[page_num]
        mat = fitz.Matrix(dpi / 72, dpi / 72)
        pix = page.get_pixmap(matrix=mat, alpha=False)
        image = np.frombuffer(pix.samples, dtype=np.uint8)
        return image.reshape(pix.height, pix.width, pix.n)
    except Exception:
        # Deliberate best effort: an unrenderable page is skipped by the
        # caller rather than failing the whole PDF.
        return None


@functools.lru_cache(maxsize=1)
def _load_model(model_path: str):
    """Load the YOLO model once per process and reuse it for every later PDF."""
    # Import here to avoid issues with multiprocessing
    from ultralytics import YOLO

    return YOLO(model_path)


def _detect_signatures(model, image: np.ndarray, conf_threshold: float) -> list[dict]:
    """Run the detector on one page image and return its boxes as plain dicts."""
    # NOTE(review): the rendered page is RGB, while ultralytics appears to
    # treat numpy inputs as BGR - confirm this matches how the model was trained.
    signatures = []
    for detection in model(image, conf=conf_threshold, verbose=False):
        for box in detection.boxes:
            x1, y1, x2, y2 = map(int, box.xyxy[0].cpu().numpy())
            signatures.append({
                'box': (x1, y1, x2 - x1, y2 - y1),
                'xyxy': (x1, y1, x2, y2),
                'confidence': float(box.conf[0].cpu().numpy()),
            })
    return signatures


def _save_signature_crops(image: np.ndarray, signatures: list[dict], pdf_path: str,
                          page_num: int, output_dir: str) -> None:
    """Write one stamp-cleaned PNG per detected box into ``<output_dir>/images``."""
    height, width = image.shape[:2]
    base_name = Path(pdf_path).stem
    for i, sig in enumerate(signatures):
        x1, y1, x2, y2 = sig['xyxy']
        # Clamp both corners independently so a box starting outside the page
        # cannot extend the crop past its real far edge.
        left, top = max(0, x1), max(0, y1)
        right, bottom = min(width, x2), min(height, y2)
        if right <= left or bottom <= top:
            continue  # Degenerate box: nothing to crop.

        crop_no_stamp = remove_red_stamp(image[top:bottom, left:right])

        # Save to output directory
        crop_filename = f"{base_name}_page{page_num + 1}_sig{i + 1}.png"
        crop_path = os.path.join(output_dir, "images", crop_filename)
        cv2.imwrite(crop_path, cv2.cvtColor(crop_no_stamp, cv2.COLOR_RGB2BGR))


def scan_single_pdf(args: tuple) -> dict:
    """
    Scan a single PDF for signatures.

    The first page is skipped, and scanning stops at the first page on which
    the model reports at least one detection; crops for that page are saved.

    Args:
        args: Tuple of (pdf_path, model_path, output_dir, conf_threshold)

    Returns:
        Result dictionary with signature info. Failures while opening or
        scanning the PDF are reported in its ``'error'`` field.

    Raises:
        Any exception raised while importing ultralytics or loading the model.
        Those are environment problems rather than per-file ones, so they are
        not recorded against the PDF.
    """
    pdf_path, model_path, output_dir, conf_threshold = args

    result = {
        'filename': os.path.basename(pdf_path),
        'source_dir': os.path.basename(os.path.dirname(pdf_path)),
        'has_signature': False,
        'page': None,
        'num_signatures': 0,
        'confidence_avg': 0.0,
        'error': None
    }

    # Cached per process, so each worker loads the weights only once.
    model = _load_model(model_path)

    try:
        doc = fitz.open(pdf_path)
        try:
            # Skip first page, scan remaining pages
            for page_num in range(1, len(doc)):  # Start from page 2 (index 1)
                image = render_pdf_page(doc, page_num)
                if image is None:
                    continue

                signatures = _detect_signatures(model, image, conf_threshold)
                if not signatures:
                    continue

                # Found signatures! Record and stop scanning
                result['has_signature'] = True
                result['page'] = page_num + 1  # 1-indexed
                result['num_signatures'] = len(signatures)
                result['confidence_avg'] = (
                    sum(s['confidence'] for s in signatures) / len(signatures)
                )
                _save_signature_crops(image, signatures, pdf_path, page_num, output_dir)
                break
        finally:
            # Close the document on every path, including mid-scan failures.
            doc.close()
    except Exception as e:
        result['error'] = str(e)

    return result


def collect_pdf_files(input_dirs: list[str]) -> list[str]:
    """Collect all PDF files from input directories.

    A directory containing ``batch_*`` subdirectories is searched only inside
    those subdirectories; otherwise its own ``*.pdf`` files are used.

    Args:
        input_dirs: Directories to search.

    Returns:
        Sorted list of PDF paths with exact duplicates removed.
    """
    pdf_files = set()
    for input_dir in input_dirs:
        input_path = Path(input_dir)
        if not input_path.is_dir():
            print(f"Warning: Directory not found: {input_dir}")
            continue

        # Only real directories count as batches; a stray file such as
        # "batch_1.pdf" must not hide the PDFs of a flat directory.
        batch_dirs = sorted(p for p in input_path.glob("batch_*") if p.is_dir())
        for search_dir in batch_dirs or [input_path]:
            pdf_files.update(str(pdf_file) for pdf_file in search_dir.glob("*.pdf"))

    return sorted(pdf_files)


def load_progress(progress_file: str) -> set:
    """Load completed files from progress checkpoint.

    Returns:
        The set stored under ``'completed_files'``, or an empty set when the
        checkpoint is missing or unreadable (a warning is printed in the
        latter case so a restart from zero is not silent).
    """
    if not os.path.exists(progress_file):
        return set()
    try:
        with open(progress_file, 'r') as f:
            data = json.load(f)
        return set(data.get('completed_files', []))
    except (OSError, ValueError, AttributeError, TypeError) as e:
        # ValueError covers malformed JSON; AttributeError/TypeError cover a
        # file that parses but does not have the expected structure.
        print(f"Warning: Could not read progress file {progress_file}: {e}")
        return set()


def save_progress(progress_file: str, completed: set, total: int, start_time: float):
    """Save progress checkpoint.

    The data is written to a temporary sibling file and then moved into place,
    so an interrupted save cannot leave a truncated checkpoint behind.

    Args:
        progress_file: Destination path of the JSON checkpoint.
        completed: Names of the files processed so far.
        total: Total number of PDFs in the scan.
        start_time: ``time.time()`` value taken when the scan started.
    """
    elapsed = time.time() - start_time
    data = {
        'last_updated': datetime.now().isoformat(),
        'total_pdfs': total,
        'processed': len(completed),
        'remaining': total - len(completed),
        'elapsed_seconds': elapsed,
        'completed_files': list(completed)
    }
    tmp_file = f"{progress_file}.tmp"
    with open(tmp_file, 'w') as f:
        json.dump(data, f)
    os.replace(tmp_file, progress_file)


def main():
    """Parse arguments, scan all PDFs in parallel and write CSV/report/checkpoint.

    Completed files are tracked by base name. CSV rows are written as results
    arrive, so every file listed in the checkpoint already has its row on
    disk. With ``--resume`` the rows are appended to the existing CSV. The
    report and printed summary count only the files scanned in this run.
    """
    parser = argparse.ArgumentParser(description='YOLO Full PDF Signature Scanner')
    parser.add_argument('--input', nargs='+', required=True, help='Input directories containing PDFs')
    parser.add_argument('--output', required=True, help='Output directory for results')
    parser.add_argument('--model', default='best.pt', help='Path to YOLO model')
    parser.add_argument('--workers', type=int, default=4, help='Number of parallel workers')
    parser.add_argument('--conf', type=float, default=CONFIDENCE_THRESHOLD, help='Confidence threshold')
    parser.add_argument('--resume', action='store_true', help='Resume from checkpoint')
    args = parser.parse_args()

    # Setup output directories
    output_dir = Path(args.output)
    output_dir.mkdir(parents=True, exist_ok=True)
    (output_dir / "images").mkdir(exist_ok=True)

    progress_file = str(output_dir / "progress.json")
    csv_file = str(output_dir / "yolo_signatures.csv")
    report_file = str(output_dir / "scan_report.json")

    print("=" * 70)
    print("YOLO Full PDF Signature Scanner")
    print("=" * 70)
    print(f"Input directories: {args.input}")
    print(f"Output directory: {args.output}")
    print(f"Model: {args.model}")
    print(f"Workers: {args.workers}")
    print(f"Confidence threshold: {args.conf}")
    print(f"Resume mode: {args.resume}")
    print("=" * 70)

    # Collect all PDF files
    print("\nCollecting PDF files...")
    all_pdfs = collect_pdf_files(args.input)
    total_pdfs = len(all_pdfs)
    print(f"Found {total_pdfs} PDF files")

    # Load progress if resuming
    completed_files = set()
    if args.resume:
        completed_files = load_progress(progress_file)
        print(f"Resuming from checkpoint: {len(completed_files)} files already processed")

    # Filter out already processed files
    pdfs_to_process = [p for p in all_pdfs if os.path.basename(p) not in completed_files]
    print(f"PDFs to process: {len(pdfs_to_process)}")

    if not pdfs_to_process:
        print("All files already processed!")
        return

    # Prepare arguments for workers
    work_args = [
        (pdf_path, args.model, str(output_dir), args.conf)
        for pdf_path in pdfs_to_process
    ]

    # Statistics
    results_with_sig = []
    results_without_sig = []
    errors = []
    source_stats = {}

    start_time = time.time()
    # Captured once: the checkpoint file grows during the run, so re-reading
    # it later would distort the rate calculation.
    initial_completed = len(completed_files)
    processed_count = initial_completed

    # Keep rows from the interrupted run when resuming; otherwise start fresh.
    append_csv = (
        args.resume
        and os.path.exists(csv_file)
        and os.path.getsize(csv_file) > 0
    )

    # Process with multiprocessing
    print(f"\nStarting scan with {args.workers} workers...")
    print("-" * 70)

    with open(csv_file, 'a' if append_csv else 'w', newline='') as csv_handle:
        writer = csv.DictWriter(csv_handle, fieldnames=CSV_FIELDS)
        if not append_csv:
            writer.writeheader()
        # Flush before workers start so no buffered data is shared with them.
        csv_handle.flush()

        with ProcessPoolExecutor(max_workers=args.workers) as executor:
            futures = {executor.submit(scan_single_pdf, arg): arg[0] for arg in work_args}

            for future in as_completed(futures):
                pdf_path = futures[future]
                filename = os.path.basename(pdf_path)

                try:
                    result = future.result()
                except Exception as e:
                    # Not marked completed, so a resumed run retries the file.
                    print(f"Error processing {filename}: {e}")
                    errors.append({'filename': filename, 'error': str(e)})
                    continue

                # Update statistics
                source_dir = result['source_dir']
                stats = source_stats.setdefault(source_dir, {'scanned': 0, 'with_sig': 0})
                stats['scanned'] += 1

                if result['error']:
                    errors.append(result)
                elif result['has_signature']:
                    results_with_sig.append(result)
                    stats['with_sig'] += 1
                    # Persist the row before the file can enter a checkpoint.
                    writer.writerow({
                        'filename': result['filename'],
                        'page': result['page'],
                        'num_signatures': result['num_signatures'],
                        'confidence_avg': round(result['confidence_avg'], 4)
                    })
                    csv_handle.flush()
                else:
                    results_without_sig.append(result)

                # Track completion
                completed_files.add(filename)
                processed_count += 1

                # Progress output
                elapsed = time.time() - start_time
                rate = (processed_count - initial_completed) / elapsed if elapsed > 0 else 0
                remaining = max(0, total_pdfs - processed_count)
                eta = remaining / rate / 3600 if rate > 0 else 0

                status = "SIG" if result['has_signature'] else "---"
                print(f"[{processed_count}/{total_pdfs}] {status} {filename[:50]:50s} "
                      f"({rate:.1f}/s, ETA: {eta:.1f}h)")

                # Save progress periodically
                if processed_count % PROGRESS_SAVE_INTERVAL == 0:
                    try:
                        save_progress(progress_file, completed_files, total_pdfs, start_time)
                    except OSError as e:
                        # A failed periodic checkpoint should not abort the scan.
                        print(f"Warning: Could not save progress: {e}")

    # Final progress save
    save_progress(progress_file, completed_files, total_pdfs, start_time)

    # Generate report
    elapsed_total = time.time() - start_time
    total_signatures = sum(r['num_signatures'] for r in results_with_sig)
    rate_total = len(pdfs_to_process) / elapsed_total if elapsed_total > 0 else 0
    report = {
        'scan_date': datetime.now().isoformat(),
        'total_pdfs': total_pdfs,
        'with_signature': len(results_with_sig),
        'without_signature': len(results_without_sig),
        'errors': len(errors),
        'signature_rate': f"{len(results_with_sig) / total_pdfs * 100:.2f}%" if total_pdfs > 0 else "0%",
        'total_signatures_extracted': total_signatures,
        'processing_time_hours': round(elapsed_total / 3600, 2),
        'processing_rate_per_second': round(rate_total, 2) if elapsed_total > 0 else 0,
        'source_breakdown': source_stats,
        'model': args.model,
        'confidence_threshold': args.conf,
        'workers': args.workers,
        'skipped_from_checkpoint': initial_completed
    }

    with open(report_file, 'w') as f:
        json.dump(report, f, indent=2)

    # Print summary (total_pdfs is non-zero here: an empty work list returned early)
    print("\n" + "=" * 70)
    print("SCAN COMPLETE")
    print("=" * 70)
    print(f"Total PDFs scanned: {total_pdfs}")
    print(f"With signature: {len(results_with_sig)} ({len(results_with_sig)/total_pdfs*100:.1f}%)")
    print(f"Without signature: {len(results_without_sig)} ({len(results_without_sig)/total_pdfs*100:.1f}%)")
    print(f"Errors: {len(errors)}")
    print(f"Total signatures: {total_signatures}")
    print(f"Processing time: {elapsed_total/3600:.2f} hours")
    print(f"Processing rate: {rate_total:.1f} PDFs/second")
    print("-" * 70)
    print(f"Results saved to: {output_dir}")
    print("=" * 70)


if __name__ == "__main__":
    main()