#!/usr/bin/env python3 """ Hybrid signature extraction using VLM name recognition + text layer/CV detection. Workflow: 1. VLM extracts signature names from document 2. Try PDF text layer search for those names (precise coordinates) 3. Fallback to computer vision if no text layer 4. Extract regions around detected locations 5. VLM verifies each region contains the specific signature """ import cv2 import numpy as np import os import sys import json import base64 import requests import re from pathlib import Path from datetime import datetime import fitz # PyMuPDF import csv # Configuration PDF_INPUT_PATH = "/Volumes/NV2/PDF-Processing/signature-image-output" OUTPUT_PATH = "/Volumes/NV2/PDF-Processing/signature-image-output/signatures" REJECTED_PATH = "/Volumes/NV2/PDF-Processing/signature-image-output/signatures/rejected" LOG_FILE = None # Ollama Configuration OLLAMA_URL = "http://192.168.30.36:11434" OLLAMA_MODEL = "qwen2.5vl:32b" # Image processing parameters DPI = 300 def encode_image_to_base64(image_array): """Encode numpy image array to base64 string.""" image_rgb = cv2.cvtColor(image_array, cv2.COLOR_BGR2RGB) _, buffer = cv2.imencode('.jpg', image_rgb) image_base64 = base64.b64encode(buffer).decode('utf-8') return image_base64 def call_ollama_vision(image_base64, prompt): """Call Ollama vision model with image and prompt.""" try: url = f"{OLLAMA_URL}/api/generate" payload = { "model": OLLAMA_MODEL, "prompt": prompt, "images": [image_base64], "stream": False } response = requests.post(url, json=payload, timeout=120) response.raise_for_status() result = response.json() return result.get('response', ''), None except Exception as e: return None, str(e) def render_pdf_page_as_image(pdf_path, dpi=300): """Render PDF page as a high-resolution image.""" try: doc = fitz.open(pdf_path) page = doc[0] mat = fitz.Matrix(dpi / 72, dpi / 72) pix = page.get_pixmap(matrix=mat, alpha=False) img = np.frombuffer(pix.samples, dtype=np.uint8).reshape(pix.height, pix.width, pix.n) if pix.n == 3: img = cv2.cvtColor(img, cv2.COLOR_RGB2BGR) elif pix.n == 1: img = cv2.cvtColor(img, cv2.COLOR_GRAY2BGR) doc.close() return img, pix.width, pix.height, None except Exception as e: return None, 0, 0, str(e) def extract_signature_names_with_vlm(image_base64): """ Step 1: Ask VLM to extract the names of people who signed the document. Returns: list of Chinese names """ prompt = """Please identify the handwritten signatures with Chinese names on this document. List ONLY the Chinese names of the people who signed (the handwritten names, not printed text). Format your response as a simple list, one name per line: 周寶蓮 魏興海 If no handwritten signatures found, say "No signatures found".""" response, error = call_ollama_vision(image_base64, prompt) if error: return [], error # Parse names from response # Look for Chinese characters (pattern: 2-4 consecutive Chinese characters) names = [] for line in response.split('\n'): line = line.strip() # Match Chinese names (2-4 characters is typical) chinese_pattern = r'[\u4e00-\u9fff]{2,4}' matches = re.findall(chinese_pattern, line) for name in matches: if name not in names and len(name) >= 2: names.append(name) return names, None def search_pdf_text_layer(pdf_path, names, dpi=300): """ Step 2a: Search for signature names in PDF text layer. Returns: list of bounding boxes [(x, y, w, h, name), ...] Coordinates are in pixels at specified DPI. """ try: doc = fitz.open(pdf_path) page = doc[0] # Get page dimensions page_rect = page.rect page_width_pts = page_rect.width page_height_pts = page_rect.height # Calculate scaling factor from points (72 DPI) to target DPI scale = dpi / 72.0 found_locations = [] for name in names: # Search for the name in the page text text_instances = page.search_for(name) for inst in text_instances: # inst is a Rect in points, convert to pixels at target DPI x = int(inst.x0 * scale) y = int(inst.y0 * scale) w = int((inst.x1 - inst.x0) * scale) h = int((inst.y1 - inst.y0) * scale) found_locations.append((x, y, w, h, name)) doc.close() return found_locations, None except Exception as e: return [], str(e) def detect_signature_regions_cv(image): """ Step 2b: Use computer vision to detect signature-like regions. Returns: list of bounding boxes [(x, y, w, h), ...] """ # Convert to grayscale gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY) # Find dark regions (potential handwriting) _, binary = cv2.threshold(gray, 0, 255, cv2.THRESH_BINARY_INV + cv2.THRESH_OTSU) # Morphological operations to connect nearby strokes kernel = cv2.getStructuringElement(cv2.MORPH_RECT, (20, 10)) dilated = cv2.dilate(binary, kernel, iterations=2) # Find contours contours, _ = cv2.findContours(dilated, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE) # Filter contours for signature-like characteristics bounding_boxes = [] for contour in contours: area = cv2.contourArea(contour) # Filter by area (signatures are medium-sized) if 5000 < area < 200000: x, y, w, h = cv2.boundingRect(contour) # Filter by aspect ratio and size aspect_ratio = w / float(h) if h > 0 else 0 # Signatures are usually wider than tall, but not extremely so if 0.5 < aspect_ratio < 10 and w > 50 and h > 20: bounding_boxes.append((x, y, w, h)) return bounding_boxes def expand_bbox_for_signature(bbox, image_shape, expansion_factor=2.0): """ Expand bounding box to capture nearby handwritten signature. If bbox is from text, signature is usually near it. """ x, y, w, h = bbox[:4] img_height, img_width = image_shape[:2] # Expand box significantly to capture signature near printed name expand_w = int(w * expansion_factor) expand_h = int(h * expansion_factor) # Center the expansion new_x = max(0, x - expand_w // 2) new_y = max(0, y - expand_h // 2) new_w = min(img_width - new_x, w + expand_w) new_h = min(img_height - new_y, h + expand_h) return (new_x, new_y, new_w, new_h) def extract_region_with_opencv(image, bbox, output_path): """Extract region from image and save.""" try: x, y, w, h = bbox # Ensure coordinates are within image bounds x = max(0, x) y = max(0, y) x_end = min(image.shape[1], x + w) y_end = min(image.shape[0], y + h) region = image[y:y_end, x:x_end] # Save output_file = f"{output_path}.png" cv2.imwrite(output_file, region) return True, None, output_file except Exception as e: return False, str(e), None def verify_signature_with_names(image_path, expected_names): """ Step 4: Verify that extracted region contains signature of any expected person. Returns: (is_signature, matched_name_or_none, error) """ try: image = cv2.imread(image_path) image_base64 = encode_image_to_base64(image) # Ask about all names at once names_str = ", ".join([f'"{name}"' for name in expected_names]) prompt = f"""Does this image contain a handwritten signature with any of these Chinese names: {names_str}? Look carefully for handwritten Chinese characters matching one of these names. If you find a signature, respond with: "yes: [name]" where [name] is the matching name. If no signature matches these names, respond with: "no".""" response, error = call_ollama_vision(image_base64, prompt) if error: return False, None, error response_lower = response.lower() # Check if VLM found a match if 'yes' in response_lower: # Try to extract which name matched for name in expected_names: if name in response: return True, name, None # VLM said yes but didn't specify which name return True, expected_names[0], None else: return False, None, None except Exception as e: return False, None, str(e) def merge_overlapping_boxes(boxes, merge_threshold=100): """Merge bounding boxes that overlap or are very close.""" if not boxes: return [] boxes = sorted(boxes, key=lambda b: (b[1], b[0])) # Sort by y, then x merged = [] current = list(boxes[0]) for box in boxes[1:]: x, y, w, h = box[:4] cx, cy, cw, ch = current[:4] # Check if boxes overlap or are close if (abs(y - cy) < merge_threshold and x < cx + cw + merge_threshold and x + w > cx - merge_threshold): # Merge new_x = min(cx, x) new_y = min(cy, y) new_w = max(cx + cw, x + w) - new_x new_h = max(cy + ch, y + h) - new_y current = [new_x, new_y, new_w, new_h] if len(box) > 4: current.append(box[4]) # Preserve name if present else: merged.append(tuple(current)) current = list(box) merged.append(tuple(current)) return merged def process_pdf_page(pdf_path, output_dir): """ Process a single PDF page using hybrid approach. Returns: (signature_count, extracted_files, method_used, error) """ pdf_name = Path(pdf_path).stem # Render page as image print(" - Rendering page...", end='', flush=True) image, page_width, page_height, error = render_pdf_page_as_image(pdf_path, DPI) if error: print(f" ERROR") return 0, [], "none", f"Render error: {error}" print(" OK") # Step 1: Extract signature names with VLM print(" - Extracting signature names with VLM...", end='', flush=True) image_base64 = encode_image_to_base64(image) names, error = extract_signature_names_with_vlm(image_base64) if error: print(f" ERROR") return 0, [], "none", f"VLM error: {error}" if not names: print(" No names found") return 0, [], "none", None print(f" OK - Found: {', '.join(names)}") # Step 2a: Try PDF text layer search print(" - Searching PDF text layer...", end='', flush=True) text_locations, error = search_pdf_text_layer(pdf_path, names, DPI) candidate_boxes = [] method_used = "none" if text_locations: print(f" OK - Found {len(text_locations)} text instances") method_used = "text_layer" # Expand boxes to capture nearby signatures for loc in text_locations: expanded = expand_bbox_for_signature(loc, image.shape) candidate_boxes.append(expanded) else: print(" No text layer or names not found") # Step 2b: Fallback to computer vision print(" - Using computer vision detection...", end='', flush=True) cv_boxes = detect_signature_regions_cv(image) if cv_boxes: print(f" OK - Found {len(cv_boxes)} regions") method_used = "computer_vision" candidate_boxes = cv_boxes else: print(" No regions detected") return 0, [], "none", None # Merge overlapping boxes candidate_boxes = merge_overlapping_boxes(candidate_boxes) print(f" - Found {len(candidate_boxes)} candidate region(s)") # Step 3 & 4: Extract and verify each region extracted_files = [] verified_names = set() for idx, bbox_info in enumerate(candidate_boxes): bbox = bbox_info[:4] print(f" - Region {idx + 1}: Extracting...", end='', flush=True) output_base = os.path.join(output_dir, f"{pdf_name}_region_{idx + 1}") success, error, output_file = extract_region_with_opencv(image, bbox, output_base) if not success: print(f" FAILED: {error}") continue print(f" OK - Verifying...", end='', flush=True) # Verify this region contains any of the expected signatures is_signature, matched_name, verify_error = verify_signature_with_names(output_file, names) if verify_error: print(f" ERROR: {verify_error}") os.remove(output_file) # Remove failed verification attempts continue if is_signature and matched_name: # Found a signature! Rename file with the person's name final_filename = f"{pdf_name}_signature_{matched_name}.png" final_path = os.path.join(output_dir, final_filename) # Check if we already found this person's signature if matched_name in verified_names: print(f" DUPLICATE ({matched_name}) - rejected") os.remove(output_file) else: os.rename(output_file, final_path) verified_names.add(matched_name) print(f" VERIFIED ({matched_name})") extracted_files.append(final_path) else: print(f" NOT A SIGNATURE - rejected") rejected_file = os.path.join(REJECTED_PATH, os.path.basename(output_file)) os.rename(output_file, rejected_file) return len(extracted_files), extracted_files, method_used, None def main(): """Main processing function""" global LOG_FILE print(f"Starting hybrid signature extraction...") print(f"Ollama URL: {OLLAMA_URL}") print(f"Model: {OLLAMA_MODEL}") print(f"Input path: {PDF_INPUT_PATH}") print(f"Output path: {OUTPUT_PATH}") print() # Test Ollama connection print("Testing Ollama connection...") try: response = requests.get(f"{OLLAMA_URL}/api/tags", timeout=5) response.raise_for_status() print("✓ Ollama connection successful\n") except Exception as e: print(f"✗ Ollama connection failed: {e}") return # Create output directories os.makedirs(OUTPUT_PATH, exist_ok=True) os.makedirs(REJECTED_PATH, exist_ok=True) LOG_FILE = os.path.join(OUTPUT_PATH, f"hybrid_extraction_log_{datetime.now().strftime('%Y%m%d_%H%M%S')}.csv") # Get PDF files (test with first 5) pdf_files = sorted(Path(PDF_INPUT_PATH).glob("*.pdf"))[:5] if not pdf_files: print("ERROR: No PDF files found!") return print(f"Found {len(pdf_files)} PDF files to process (testing with first 5)\n") # Statistics stats = { 'total_pdfs': 0, 'pdfs_with_signatures': 0, 'total_signatures': 0, 'text_layer_used': 0, 'cv_used': 0, 'errors': 0 } # Open log file with open(LOG_FILE, 'w', newline='') as log_file: log_writer = csv.writer(log_file) log_writer.writerow([ 'pdf_filename', 'signatures_found', 'method_used', 'extracted_files', 'error' ]) # Process each PDF for i, pdf_path in enumerate(pdf_files): stats['total_pdfs'] += 1 pdf_filename = pdf_path.name print(f"[{i+1}/{len(pdf_files)}] Processing: {pdf_filename}") sig_count, extracted_files, method, error = process_pdf_page(str(pdf_path), OUTPUT_PATH) if error: print(f" ERROR: {error}\n") stats['errors'] += 1 log_writer.writerow([pdf_filename, 0, method, "", error]) continue if sig_count > 0: stats['pdfs_with_signatures'] += 1 stats['total_signatures'] += sig_count if method == "text_layer": stats['text_layer_used'] += 1 elif method == "computer_vision": stats['cv_used'] += 1 print(f" ✓ Extracted {sig_count} signature(s) using {method}\n") filenames = [Path(f).name for f in extracted_files] log_writer.writerow([ pdf_filename, sig_count, method, ", ".join(filenames), "" ]) else: print(f" No signatures extracted\n") log_writer.writerow([pdf_filename, 0, method, "", ""]) # Print summary print("="*60) print("HYBRID EXTRACTION SUMMARY") print("="*60) print(f"Total PDFs processed: {stats['total_pdfs']}") print(f"PDFs with signatures: {stats['pdfs_with_signatures']}") print(f"Total signatures extracted: {stats['total_signatures']}") print(f"Text layer method used: {stats['text_layer_used']}") print(f"Computer vision used: {stats['cv_used']}") print(f"Errors: {stats['errors']}") print(f"\nLog file: {LOG_FILE}") print("="*60) if __name__ == "__main__": try: main() except KeyboardInterrupt: print("\n\nProcess interrupted by user.") sys.exit(1) except Exception as e: print(f"\n\nFATAL ERROR: {e}") import traceback traceback.print_exc() sys.exit(1)