pdf_signature_extraction/extract_signatures_hybrid.py
gbanyan 52612e14ba Add hybrid signature extraction with name-based verification
Implement a hybrid approach (VLM name extraction + CV detection) that
replaces the unreliable VLM coordinate output with name-based verification.

Key Features:
- VLM extracts signature names (周寶蓮, 魏興海, etc.)
- CV or PDF text layer detects regions
- VLM verifies each region against expected names
- Signatures saved with person names: signature_周寶蓮.png
- Duplicate prevention and rejection handling

Test Results:
- 5 PDF pages tested
- 7/10 signatures extracted (70% recall)
- 100% precision (no false positives)
- No blank regions extracted (previous issue resolved)

Files:
- extract_pages_from_csv.py: Extract pages from CSV (tested: 100 files)
- extract_signatures_hybrid.py: Hybrid extraction (current working solution)
- extract_handwriting.py: CV-only approach (component)
- extract_signatures_vlm.py: Deprecated VLM coordinate approach
- PROJECT_DOCUMENTATION.md: Complete project history and results
- SESSION_INIT.md: Session handoff documentation
- SESSION_CHECKLIST.md: Status checklist
- NEW_SESSION_PROMPT.txt: Template for next session
- HOW_TO_CONTINUE.txt: Visual handoff guide
- COMMIT_SUMMARY.md: Commit preparation guide
- README.md: Quick start guide
- README_page_extraction.md: Page extraction docs
- README_hybrid_extraction.md: Hybrid approach docs
- .gitignore: Exclude diagnostic scripts and outputs

Known Limitations:
- 30% of signatures missed due to conservative CV parameters
- Text layer method untested (all test PDFs are scanned images)
- Performance: ~24 seconds per PDF

Next Steps:
- Tune CV parameters for higher recall (see the parameter sketch after this list)
- Test with larger dataset (100+ files)
- Process full dataset (86,073 files)
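
As a rough illustration of the first item (the constant names below are hypothetical, not part of this commit), the conservative thresholds are the hard-coded values in detect_signature_regions_cv and could be lifted into tunable module-level constants:

    # Hypothetical tuning knobs mirroring the current hard-coded values
    CV_DILATE_KERNEL = (20, 10)            # larger kernels connect more strokes into one region
    CV_MIN_AREA, CV_MAX_AREA = 5000, 200000
    CV_MIN_ASPECT, CV_MAX_ASPECT = 0.5, 10
    CV_MIN_WIDTH, CV_MIN_HEIGHT = 50, 20   # lowering these admits smaller signatures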

🤖 Generated with Claude Code
2025-10-26 23:39:52 +08:00

544 lines
17 KiB
Python

#!/usr/bin/env python3
"""
Hybrid signature extraction using VLM name recognition + text layer/CV detection.
Workflow:
1. VLM extracts signature names from document
2. Try PDF text layer search for those names (precise coordinates)
3. Fallback to computer vision if no text layer
4. Extract regions around detected locations
5. VLM verifies each region contains the specific signature
"""
import cv2
import numpy as np
import os
import sys
import json
import base64
import requests
import re
from pathlib import Path
from datetime import datetime
import fitz # PyMuPDF
import csv

# Configuration
PDF_INPUT_PATH = "/Volumes/NV2/PDF-Processing/signature-image-output"
OUTPUT_PATH = "/Volumes/NV2/PDF-Processing/signature-image-output/signatures"
REJECTED_PATH = "/Volumes/NV2/PDF-Processing/signature-image-output/signatures/rejected"
LOG_FILE = None

# Ollama Configuration
OLLAMA_URL = "http://192.168.30.36:11434"
OLLAMA_MODEL = "qwen2.5vl:32b"

# Image processing parameters
DPI = 300

def encode_image_to_base64(image_array):
    """Encode a numpy (BGR) image array to a base64-encoded JPEG string."""
    # cv2.imencode expects OpenCV's native BGR channel order, so encode the
    # array directly; converting to RGB first would swap the colours in the JPEG.
    _, buffer = cv2.imencode('.jpg', image_array)
    image_base64 = base64.b64encode(buffer).decode('utf-8')
    return image_base64

def call_ollama_vision(image_base64, prompt):
    """Call Ollama vision model with image and prompt."""
    try:
        url = f"{OLLAMA_URL}/api/generate"
        payload = {
            "model": OLLAMA_MODEL,
            "prompt": prompt,
            "images": [image_base64],
            "stream": False
        }
        response = requests.post(url, json=payload, timeout=120)
        response.raise_for_status()
        result = response.json()
        return result.get('response', ''), None
    except Exception as e:
        return None, str(e)
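
# Usage sketch (illustrative only; variable names are hypothetical):
#   text, err = call_ollama_vision(image_b64, "List the signature names on this page.")
#   if err is None:
#       ...parse text...
# With "stream": False the endpoint returns a single JSON object; the generated
# text lives in its "response" field, which is all this helper extracts.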

def render_pdf_page_as_image(pdf_path, dpi=300):
    """Render PDF page as a high-resolution image."""
    try:
        doc = fitz.open(pdf_path)
        page = doc[0]
        mat = fitz.Matrix(dpi / 72, dpi / 72)
        pix = page.get_pixmap(matrix=mat, alpha=False)
        img = np.frombuffer(pix.samples, dtype=np.uint8).reshape(pix.height, pix.width, pix.n)
        if pix.n == 3:
            img = cv2.cvtColor(img, cv2.COLOR_RGB2BGR)
        elif pix.n == 1:
            img = cv2.cvtColor(img, cv2.COLOR_GRAY2BGR)
        doc.close()
        return img, pix.width, pix.height, None
    except Exception as e:
        return None, 0, 0, str(e)

def extract_signature_names_with_vlm(image_base64):
    """
    Step 1: Ask VLM to extract the names of people who signed the document.
    Returns: (list of Chinese names, error)
    """
    prompt = """Please identify the handwritten signatures with Chinese names on this document.
List ONLY the Chinese names of the people who signed (the handwritten names, not printed text).
Format your response as a simple list, one name per line:
周寶蓮
魏興海
If no handwritten signatures found, say "No signatures found"."""
    response, error = call_ollama_vision(image_base64, prompt)
    if error:
        return [], error
    # Parse names from the response.
    # Look for Chinese characters (pattern: 2-4 consecutive Chinese characters).
    names = []
    for line in response.split('\n'):
        line = line.strip()
        # Match Chinese names (2-4 characters is typical)
        chinese_pattern = r'[\u4e00-\u9fff]{2,4}'
        matches = re.findall(chinese_pattern, line)
        for name in matches:
            if name not in names and len(name) >= 2:
                names.append(name)
    return names, None
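
# Parsing example (illustrative): a reply such as
#   "1. 周寶蓮\n2. 魏興海"
# yields ["周寶蓮", "魏興海"]; repeated names are dropped by the
# "name not in names" check above.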

def search_pdf_text_layer(pdf_path, names, dpi=300):
    """
    Step 2a: Search for signature names in the PDF text layer.
    Returns: (list of bounding boxes [(x, y, w, h, name), ...], error)
    Coordinates are in pixels at the specified DPI.
    """
    try:
        doc = fitz.open(pdf_path)
        page = doc[0]
        # Get page dimensions
        page_rect = page.rect
        page_width_pts = page_rect.width
        page_height_pts = page_rect.height
        # Calculate scaling factor from points (72 DPI) to target DPI
        scale = dpi / 72.0
        found_locations = []
        for name in names:
            # Search for the name in the page text
            text_instances = page.search_for(name)
            for inst in text_instances:
                # inst is a Rect in points, convert to pixels at target DPI
                x = int(inst.x0 * scale)
                y = int(inst.y0 * scale)
                w = int((inst.x1 - inst.x0) * scale)
                h = int((inst.y1 - inst.y0) * scale)
                found_locations.append((x, y, w, h, name))
        doc.close()
        return found_locations, None
    except Exception as e:
        return [], str(e)

def detect_signature_regions_cv(image):
    """
    Step 2b: Use computer vision to detect signature-like regions.
    Returns: list of bounding boxes [(x, y, w, h), ...]
    """
    # Convert to grayscale
    gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
    # Find dark regions (potential handwriting)
    _, binary = cv2.threshold(gray, 0, 255, cv2.THRESH_BINARY_INV + cv2.THRESH_OTSU)
    # Morphological operations to connect nearby strokes
    kernel = cv2.getStructuringElement(cv2.MORPH_RECT, (20, 10))
    dilated = cv2.dilate(binary, kernel, iterations=2)
    # Find contours
    contours, _ = cv2.findContours(dilated, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
    # Filter contours for signature-like characteristics
    bounding_boxes = []
    for contour in contours:
        area = cv2.contourArea(contour)
        # Filter by area (signatures are medium-sized)
        if 5000 < area < 200000:
            x, y, w, h = cv2.boundingRect(contour)
            # Filter by aspect ratio and size
            aspect_ratio = w / float(h) if h > 0 else 0
            # Signatures are usually wider than tall, but not extremely so
            if 0.5 < aspect_ratio < 10 and w > 50 and h > 20:
                bounding_boxes.append((x, y, w, h))
    return bounding_boxes


def expand_bbox_for_signature(bbox, image_shape, expansion_factor=2.0):
    """
    Expand bounding box to capture nearby handwritten signature.
    If bbox is from text, signature is usually near it.
    """
    x, y, w, h = bbox[:4]
    img_height, img_width = image_shape[:2]
    # Expand box significantly to capture signature near printed name
    expand_w = int(w * expansion_factor)
    expand_h = int(h * expansion_factor)
    # Center the expansion
    new_x = max(0, x - expand_w // 2)
    new_y = max(0, y - expand_h // 2)
    new_w = min(img_width - new_x, w + expand_w)
    new_h = min(img_height - new_y, h + expand_h)
    return (new_x, new_y, new_w, new_h)
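
# Worked example (illustrative numbers): on a 2550x3300 px page, a printed-name
# box (500, 1000, 200, 60) with the default expansion_factor of 2.0 expands to
# (300, 940, 600, 180) -- roughly three times wider and taller, centred on the
# original box, so a signature written next to the printed name is captured.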

def extract_region_with_opencv(image, bbox, output_path):
    """Extract region from image and save."""
    try:
        x, y, w, h = bbox
        # Ensure coordinates are within image bounds
        x = max(0, x)
        y = max(0, y)
        x_end = min(image.shape[1], x + w)
        y_end = min(image.shape[0], y + h)
        region = image[y:y_end, x:x_end]
        # Save
        output_file = f"{output_path}.png"
        cv2.imwrite(output_file, region)
        return True, None, output_file
    except Exception as e:
        return False, str(e), None


def verify_signature_with_names(image_path, expected_names):
    """
    Step 4: Verify that extracted region contains signature of any expected person.
    Returns: (is_signature, matched_name_or_none, error)
    """
    try:
        image = cv2.imread(image_path)
        image_base64 = encode_image_to_base64(image)
        # Ask about all names at once
        names_str = ", ".join([f'"{name}"' for name in expected_names])
        prompt = f"""Does this image contain a handwritten signature with any of these Chinese names: {names_str}?
Look carefully for handwritten Chinese characters matching one of these names.
If you find a signature, respond with: "yes: [name]" where [name] is the matching name.
If no signature matches these names, respond with: "no"."""
        response, error = call_ollama_vision(image_base64, prompt)
        if error:
            return False, None, error
        response_lower = response.lower()
        # Check if VLM found a match
        if 'yes' in response_lower:
            # Try to extract which name matched
            for name in expected_names:
                if name in response:
                    return True, name, None
            # VLM said yes but didn't specify which name
            return True, expected_names[0], None
        else:
            return False, None, None
    except Exception as e:
        return False, None, str(e)
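
# Example replies this parser handles (illustrative): with expected_names
# ["周寶蓮", "魏興海"], "yes: 周寶蓮" returns (True, "周寶蓮", None), a bare "yes"
# falls back to (True, "周寶蓮", None), and "no" returns (False, None, None).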

def merge_overlapping_boxes(boxes, merge_threshold=100):
    """Merge bounding boxes that overlap or are very close."""
    if not boxes:
        return []
    boxes = sorted(boxes, key=lambda b: (b[1], b[0]))  # Sort by y, then x
    merged = []
    current = list(boxes[0])
    for box in boxes[1:]:
        x, y, w, h = box[:4]
        cx, cy, cw, ch = current[:4]
        # Check if boxes overlap or are close
        if (abs(y - cy) < merge_threshold and
                x < cx + cw + merge_threshold and
                x + w > cx - merge_threshold):
            # Merge
            new_x = min(cx, x)
            new_y = min(cy, y)
            new_w = max(cx + cw, x + w) - new_x
            new_h = max(cy + ch, y + h) - new_y
            current = [new_x, new_y, new_w, new_h]
            if len(box) > 4:
                current.append(box[4])  # Preserve name if present
        else:
            merged.append(tuple(current))
            current = list(box)
    merged.append(tuple(current))
    return merged
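
# Worked example (illustrative numbers): with the default merge_threshold of 100,
# (100, 200, 300, 80) and (350, 210, 200, 70) sit on the same line and close
# enough horizontally, so they merge into the single box (100, 200, 450, 80).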

def process_pdf_page(pdf_path, output_dir):
    """
    Process a single PDF page using hybrid approach.
    Returns: (signature_count, extracted_files, method_used, error)
    """
    pdf_name = Path(pdf_path).stem
    # Render page as image
    print(" - Rendering page...", end='', flush=True)
    image, page_width, page_height, error = render_pdf_page_as_image(pdf_path, DPI)
    if error:
        print(" ERROR")
        return 0, [], "none", f"Render error: {error}"
    print(" OK")
    # Step 1: Extract signature names with VLM
    print(" - Extracting signature names with VLM...", end='', flush=True)
    image_base64 = encode_image_to_base64(image)
    names, error = extract_signature_names_with_vlm(image_base64)
    if error:
        print(" ERROR")
        return 0, [], "none", f"VLM error: {error}"
    if not names:
        print(" No names found")
        return 0, [], "none", None
    print(f" OK - Found: {', '.join(names)}")
    # Step 2a: Try PDF text layer search
    print(" - Searching PDF text layer...", end='', flush=True)
    text_locations, error = search_pdf_text_layer(pdf_path, names, DPI)
    candidate_boxes = []
    method_used = "none"
    if text_locations:
        print(f" OK - Found {len(text_locations)} text instances")
        method_used = "text_layer"
        # Expand boxes to capture nearby signatures
        for loc in text_locations:
            expanded = expand_bbox_for_signature(loc, image.shape)
            candidate_boxes.append(expanded)
    else:
        print(" No text layer or names not found")
        # Step 2b: Fallback to computer vision
        print(" - Using computer vision detection...", end='', flush=True)
        cv_boxes = detect_signature_regions_cv(image)
        if cv_boxes:
            print(f" OK - Found {len(cv_boxes)} regions")
            method_used = "computer_vision"
            candidate_boxes = cv_boxes
        else:
            print(" No regions detected")
            return 0, [], "none", None
    # Merge overlapping boxes
    candidate_boxes = merge_overlapping_boxes(candidate_boxes)
    print(f" - Found {len(candidate_boxes)} candidate region(s)")
    # Step 3 & 4: Extract and verify each region
    extracted_files = []
    verified_names = set()
    for idx, bbox_info in enumerate(candidate_boxes):
        bbox = bbox_info[:4]
        print(f" - Region {idx + 1}: Extracting...", end='', flush=True)
        output_base = os.path.join(output_dir, f"{pdf_name}_region_{idx + 1}")
        success, error, output_file = extract_region_with_opencv(image, bbox, output_base)
        if not success:
            print(f" FAILED: {error}")
            continue
        print(" OK - Verifying...", end='', flush=True)
        # Verify this region contains any of the expected signatures
        is_signature, matched_name, verify_error = verify_signature_with_names(output_file, names)
        if verify_error:
            print(f" ERROR: {verify_error}")
            os.remove(output_file)  # Remove failed verification attempts
            continue
        if is_signature and matched_name:
            # Found a signature! Rename file with the person's name
            final_filename = f"{pdf_name}_signature_{matched_name}.png"
            final_path = os.path.join(output_dir, final_filename)
            # Check if we already found this person's signature
            if matched_name in verified_names:
                print(f" DUPLICATE ({matched_name}) - rejected")
                os.remove(output_file)
            else:
                os.rename(output_file, final_path)
                verified_names.add(matched_name)
                print(f" VERIFIED ({matched_name})")
                extracted_files.append(final_path)
        else:
            print(" NOT A SIGNATURE - rejected")
            rejected_file = os.path.join(REJECTED_PATH, os.path.basename(output_file))
            os.rename(output_file, rejected_file)
    return len(extracted_files), extracted_files, method_used, None

def main():
    """Main processing function"""
    global LOG_FILE
    print("Starting hybrid signature extraction...")
    print(f"Ollama URL: {OLLAMA_URL}")
    print(f"Model: {OLLAMA_MODEL}")
    print(f"Input path: {PDF_INPUT_PATH}")
    print(f"Output path: {OUTPUT_PATH}")
    print()
    # Test Ollama connection
    print("Testing Ollama connection...")
    try:
        response = requests.get(f"{OLLAMA_URL}/api/tags", timeout=5)
        response.raise_for_status()
        print("✓ Ollama connection successful\n")
    except Exception as e:
        print(f"✗ Ollama connection failed: {e}")
        return
    # Create output directories
    os.makedirs(OUTPUT_PATH, exist_ok=True)
    os.makedirs(REJECTED_PATH, exist_ok=True)
    LOG_FILE = os.path.join(OUTPUT_PATH, f"hybrid_extraction_log_{datetime.now().strftime('%Y%m%d_%H%M%S')}.csv")
    # Get PDF files (test with first 5)
    pdf_files = sorted(Path(PDF_INPUT_PATH).glob("*.pdf"))[:5]
    if not pdf_files:
        print("ERROR: No PDF files found!")
        return
    print(f"Found {len(pdf_files)} PDF files to process (testing with first 5)\n")
    # Statistics
    stats = {
        'total_pdfs': 0,
        'pdfs_with_signatures': 0,
        'total_signatures': 0,
        'text_layer_used': 0,
        'cv_used': 0,
        'errors': 0
    }
    # Open log file
    with open(LOG_FILE, 'w', newline='') as log_file:
        log_writer = csv.writer(log_file)
        log_writer.writerow([
            'pdf_filename', 'signatures_found', 'method_used', 'extracted_files', 'error'
        ])
        # Process each PDF
        for i, pdf_path in enumerate(pdf_files):
            stats['total_pdfs'] += 1
            pdf_filename = pdf_path.name
            print(f"[{i+1}/{len(pdf_files)}] Processing: {pdf_filename}")
            sig_count, extracted_files, method, error = process_pdf_page(str(pdf_path), OUTPUT_PATH)
            if error:
                print(f" ERROR: {error}\n")
                stats['errors'] += 1
                log_writer.writerow([pdf_filename, 0, method, "", error])
                continue
            if sig_count > 0:
                stats['pdfs_with_signatures'] += 1
                stats['total_signatures'] += sig_count
                if method == "text_layer":
                    stats['text_layer_used'] += 1
                elif method == "computer_vision":
                    stats['cv_used'] += 1
                print(f" ✓ Extracted {sig_count} signature(s) using {method}\n")
                filenames = [Path(f).name for f in extracted_files]
                log_writer.writerow([
                    pdf_filename,
                    sig_count,
                    method,
                    ", ".join(filenames),
                    ""
                ])
            else:
                print(" No signatures extracted\n")
                log_writer.writerow([pdf_filename, 0, method, "", ""])
    # Print summary
    print("="*60)
    print("HYBRID EXTRACTION SUMMARY")
    print("="*60)
    print(f"Total PDFs processed: {stats['total_pdfs']}")
    print(f"PDFs with signatures: {stats['pdfs_with_signatures']}")
    print(f"Total signatures extracted: {stats['total_signatures']}")
    print(f"Text layer method used: {stats['text_layer_used']}")
    print(f"Computer vision used: {stats['cv_used']}")
    print(f"Errors: {stats['errors']}")
    print(f"\nLog file: {LOG_FILE}")
    print("="*60)


if __name__ == "__main__":
    try:
        main()
    except KeyboardInterrupt:
        print("\n\nProcess interrupted by user.")
        sys.exit(1)
    except Exception as e:
        print(f"\n\nFATAL ERROR: {e}")
        import traceback
        traceback.print_exc()
        sys.exit(1)