Implement a hybrid approach (VLM name extraction + CV detection) that
replaces the unreliable VLM coordinate system with name-based verification.
Key Features:
- VLM extracts signature names (周寶蓮, 魏興海, etc.)
- CV or PDF text layer detects regions
- VLM verifies each region against expected names
- Signatures saved with person names: signature_周寶蓮.png
- Duplicate prevention and rejection handling
Test Results:
- 5 PDF pages tested
- 7/10 signatures extracted (70% recall)
- 100% precision (no false positives)
- No blank regions extracted (previous issue resolved)
Files:
- extract_pages_from_csv.py: Extract pages from CSV (tested: 100 files)
- extract_signatures_hybrid.py: Hybrid extraction (current working solution)
- extract_handwriting.py: CV-only approach (component)
- extract_signatures_vlm.py: Deprecated VLM coordinate approach
- PROJECT_DOCUMENTATION.md: Complete project history and results
- SESSION_INIT.md: Session handoff documentation
- SESSION_CHECKLIST.md: Status checklist
- NEW_SESSION_PROMPT.txt: Template for next session
- HOW_TO_CONTINUE.txt: Visual handoff guide
- COMMIT_SUMMARY.md: Commit preparation guide
- README.md: Quick start guide
- README_page_extraction.md: Page extraction docs
- README_hybrid_extraction.md: Hybrid approach docs
- .gitignore: Exclude diagnostic scripts and outputs
Known Limitations:
- 30% of signatures missed due to conservative CV parameters
- Text layer method untested (all test PDFs are scanned images)
- Performance: ~24 seconds per PDF
Next Steps:
- Tune CV parameters for higher recall
- Test with larger dataset (100+ files)
- Process full dataset (86,073 files)
🤖 Generated with Claude Code
544 lines · 17 KiB · Python
#!/usr/bin/env python3
|
|
"""
Hybrid signature extraction using VLM name recognition + text layer/CV detection.

Workflow:
1. VLM extracts signature names from the document
2. Try a PDF text layer search for those names (precise coordinates)
3. Fall back to computer vision if there is no text layer
4. Extract regions around the detected locations
5. VLM verifies that each region contains the specific signature
"""
|
|
|
|
import cv2
|
|
import numpy as np
|
|
import os
|
|
import sys
|
|
import json
|
|
import base64
|
|
import requests
|
|
import re
|
|
from pathlib import Path
|
|
from datetime import datetime
|
|
import fitz # PyMuPDF
|
|
import csv
|
|
|
|
# --- Filesystem layout -------------------------------------------------------
# Single-page PDFs are read from PDF_INPUT_PATH; verified signature crops go to
# OUTPUT_PATH and crops the VLM rejected go to REJECTED_PATH.
PDF_INPUT_PATH = "/Volumes/NV2/PDF-Processing/signature-image-output"
OUTPUT_PATH = f"{PDF_INPUT_PATH}/signatures"
REJECTED_PATH = f"{OUTPUT_PATH}/rejected"

# Path of the CSV run log; assigned by main() once the run starts.
LOG_FILE = None

# --- Ollama vision model -----------------------------------------------------
OLLAMA_URL = "http://192.168.30.36:11434"
OLLAMA_MODEL = "qwen2.5vl:32b"

# --- Rendering ---------------------------------------------------------------
# Resolution used when rasterising PDF pages.
DPI = 300
|
|
|
|
|
|
def encode_image_to_base64(image_array):
    """Encode an OpenCV image array as a base64 JPEG string.

    Args:
        image_array: Image array in OpenCV's BGR channel order (for example
            the result of ``cv2.imread``).

    Returns:
        The JPEG-compressed image, base64-encoded and decoded to ``str``.

    Raises:
        ValueError: If OpenCV reports that the JPEG encoding failed.
    """
    # cv2.imencode expects BGR input, so the array is encoded as-is. Converting
    # to RGB first would swap the red and blue channels in the resulting JPEG.
    ok, buffer = cv2.imencode('.jpg', image_array)
    if not ok:
        raise ValueError("Failed to encode image as JPEG")
    return base64.b64encode(buffer).decode('utf-8')
|
|
|
|
|
|
def call_ollama_vision(image_base64, prompt):
    """Send one base64 image and a text prompt to the Ollama generate endpoint.

    Returns:
        ``(response_text, None)`` when the request succeeds, where
        ``response_text`` is the ``response`` field of the JSON reply (empty
        string if the field is absent), or ``(None, error_message)`` if any
        exception is raised along the way.
    """
    try:
        endpoint = f"{OLLAMA_URL}/api/generate"
        request_body = {
            "model": OLLAMA_MODEL,
            "prompt": prompt,
            "images": [image_base64],
            "stream": False,
        }
        reply = requests.post(endpoint, json=request_body, timeout=120)
        reply.raise_for_status()
        return reply.json().get('response', ''), None
    except Exception as exc:
        return None, str(exc)
|
|
|
|
|
|
def render_pdf_page_as_image(pdf_path, dpi=300):
    """Render the first page of a PDF as a high-resolution image array.

    Args:
        pdf_path: Path of the PDF file to open.
        dpi: Target resolution; the page is scaled by ``dpi / 72``.

    Returns:
        ``(image, width, height, None)`` on success, where ``image`` is a
        numpy array (converted to BGR when the pixmap has 3 or 1 channels) and
        ``width``/``height`` are the pixmap dimensions in pixels, or
        ``(None, 0, 0, error_message)`` if anything fails.
    """
    try:
        doc = fitz.open(pdf_path)
        try:
            page = doc[0]
            mat = fitz.Matrix(dpi / 72, dpi / 72)
            pix = page.get_pixmap(matrix=mat, alpha=False)
            img = np.frombuffer(pix.samples, dtype=np.uint8).reshape(
                pix.height, pix.width, pix.n
            )

            # OpenCV works in BGR, so convert from the pixmap's channel layout.
            if pix.n == 3:
                img = cv2.cvtColor(img, cv2.COLOR_RGB2BGR)
            elif pix.n == 1:
                img = cv2.cvtColor(img, cv2.COLOR_GRAY2BGR)

            return img, pix.width, pix.height, None
        finally:
            # Close the document even when rendering raises, so the file
            # handle is not leaked across a long batch run.
            doc.close()
    except Exception as e:
        return None, 0, 0, str(e)
|
|
|
|
|
|
def extract_signature_names_with_vlm(image_base64):
    """Step 1: ask the VLM for the names of the people who signed the document.

    Args:
        image_base64: Base64-encoded page image passed on to the vision model.

    Returns:
        ``(names, None)`` on success, where ``names`` is a list of unique
        runs of 2-4 Chinese characters in order of first appearance in the
        model's reply, or ``([], error_message)`` if the VLM call failed.
    """
    prompt = """Please identify the handwritten signatures with Chinese names on this document.

List ONLY the Chinese names of the people who signed (the handwritten names, not printed text).

Format your response as a simple list, one name per line:
周寶蓮
魏興海

If no handwritten signatures found, say "No signatures found"."""

    response, error = call_ollama_vision(image_base64, prompt)

    if error:
        return [], error

    # The pattern cannot match across a newline, so scanning the whole reply
    # finds the same names as scanning it line by line. `response or ''`
    # guards against a None reply paired with an empty error string.
    name_pattern = re.compile(r'[\u4e00-\u9fff]{2,4}')
    matches = name_pattern.findall(response or '')

    # dict.fromkeys removes duplicates while keeping first-seen order.
    names = list(dict.fromkeys(matches))

    return names, None
|
|
|
|
|
|
def search_pdf_text_layer(pdf_path, names, dpi=300):
    """Step 2a: search the first page's text layer for the signature names.

    Args:
        pdf_path: Path of the PDF file to open.
        names: Names to search for on the page.
        dpi: Resolution that the returned pixel coordinates refer to.

    Returns:
        ``(locations, None)`` on success, where ``locations`` is a list of
        ``(x, y, w, h, name)`` tuples in pixels at ``dpi``, or
        ``([], error_message)`` if anything fails.
    """
    try:
        doc = fitz.open(pdf_path)
        try:
            page = doc[0]

            # Search results are in points (72 per inch); scale to pixels.
            scale = dpi / 72.0

            found_locations = []
            for name in names:
                for inst in page.search_for(name):
                    x = int(inst.x0 * scale)
                    y = int(inst.y0 * scale)
                    w = int((inst.x1 - inst.x0) * scale)
                    h = int((inst.y1 - inst.y0) * scale)
                    found_locations.append((x, y, w, h, name))

            return found_locations, None
        finally:
            # Close the document even if the search raises.
            doc.close()
    except Exception as e:
        return [], str(e)
|
|
|
|
|
|
def detect_signature_regions_cv(image, *, min_area=5000, max_area=200000,
                                min_aspect_ratio=0.5, max_aspect_ratio=10,
                                min_width=50, min_height=20,
                                kernel_size=(20, 10), dilate_iterations=2):
    """Step 2b: use computer vision to detect signature-like regions.

    The page is Otsu-thresholded (inverted, so dark ink becomes foreground),
    dilated with a rectangular kernel to join nearby strokes, and the external
    contours are filtered by area, aspect ratio and size. The defaults are the
    previously hard-coded values; they are exposed so the detector can be
    tuned without editing this function.

    Args:
        image: BGR page image.
        min_area, max_area: Exclusive bounds on the contour area.
        min_aspect_ratio, max_aspect_ratio: Exclusive bounds on width/height
            of the contour's bounding rectangle.
        min_width, min_height: Exclusive lower bounds on the bounding
            rectangle's size.
        kernel_size: ``(width, height)`` of the dilation kernel.
        dilate_iterations: Number of dilation passes.

    Returns:
        List of bounding boxes ``[(x, y, w, h), ...]``.
    """
    gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)

    # Find dark regions (potential handwriting).
    _, binary = cv2.threshold(gray, 0, 255, cv2.THRESH_BINARY_INV + cv2.THRESH_OTSU)

    # Connect nearby strokes so one signature becomes one blob.
    kernel = cv2.getStructuringElement(cv2.MORPH_RECT, kernel_size)
    dilated = cv2.dilate(binary, kernel, iterations=dilate_iterations)

    contours, _ = cv2.findContours(dilated, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)

    bounding_boxes = []
    for contour in contours:
        area = cv2.contourArea(contour)
        if not (min_area < area < max_area):
            continue

        x, y, w, h = cv2.boundingRect(contour)
        aspect_ratio = w / float(h) if h > 0 else 0

        # Signatures are usually wider than tall, but not extremely so.
        if (min_aspect_ratio < aspect_ratio < max_aspect_ratio
                and w > min_width and h > min_height):
            bounding_boxes.append((x, y, w, h))

    return bounding_boxes
|
|
|
|
|
|
def expand_bbox_for_signature(bbox, image_shape, expansion_factor=2.0):
    """Grow a bounding box symmetrically to capture a nearby signature.

    The box is widened by ``int(w * expansion_factor)`` and heightened by
    ``int(h * expansion_factor)``, centred on the original box, and then
    clipped to the image.

    Args:
        bbox: ``(x, y, w, h, ...)``; only the first four values are used.
        image_shape: Image shape whose first two entries are height and width.
        expansion_factor: Multiple of the original size to add in each axis.

    Returns:
        ``(x, y, w, h)`` of the expanded box, clipped to the image bounds.
        Width and height are never negative.
    """
    x, y, w, h = bbox[:4]
    img_height, img_width = image_shape[:2]

    expand_w = int(w * expansion_factor)
    expand_h = int(h * expansion_factor)

    # Work with edges rather than (origin, size): clamping only the origin
    # while keeping the full size would push the far edge past the intended
    # expansion for boxes near the left/top border.
    left = x - expand_w // 2
    top = y - expand_h // 2
    right = left + w + expand_w
    bottom = top + h + expand_h

    left = min(max(0, left), img_width)
    top = min(max(0, top), img_height)
    right = min(max(left, right), img_width)
    bottom = min(max(top, bottom), img_height)

    return (left, top, right - left, bottom - top)
|
|
|
|
|
|
def extract_region_with_opencv(image, bbox, output_path):
    """Crop a region from the image and save it as a PNG.

    Args:
        image: Source image array (rows, columns, ...).
        bbox: ``(x, y, w, h, ...)`` in pixels; only the first four values are
            used, and the box is clipped to the image bounds.
        output_path: Destination path without extension; ``.png`` is appended.

    Returns:
        ``(True, None, output_file)`` when the crop was written, otherwise
        ``(False, error_message, None)``.
    """
    try:
        x, y, w, h = bbox[:4]

        # Ensure coordinates are within image bounds.
        x = max(0, x)
        y = max(0, y)
        x_end = min(image.shape[1], x + w)
        y_end = min(image.shape[0], y + h)

        region = image[y:y_end, x:x_end]
        if region.size == 0:
            return False, "Empty region after clipping to image bounds", None

        output_file = f"{output_path}.png"
        # cv2.imwrite signals failure by returning False rather than raising,
        # so the result must be checked before reporting success.
        if not cv2.imwrite(output_file, region):
            return False, f"Failed to write image: {output_file}", None

        return True, None, output_file
    except Exception as e:
        return False, str(e), None
|
|
|
|
|
|
def verify_signature_with_names(image_path, expected_names):
    """Step 4: ask the VLM whether a cropped region holds an expected signature.

    Args:
        image_path: Path of the cropped region image on disk.
        expected_names: Names the VLM is asked to look for.

    Returns:
        ``(is_signature, matched_name_or_none, error)``. ``error`` is ``None``
        unless the image could not be read, the VLM call failed, or another
        exception occurred.
    """
    try:
        image = cv2.imread(image_path)
        if image is None:
            # cv2.imread returns None instead of raising for unreadable files.
            return False, None, f"Could not read image: {image_path}"
        image_base64 = encode_image_to_base64(image)

        # Ask about all names at once.
        names_str = ", ".join(f'"{name}"' for name in expected_names)
        prompt = f"""Does this image contain a handwritten signature with any of these Chinese names: {names_str}?

Look carefully for handwritten Chinese characters matching one of these names.

If you find a signature, respond with: "yes: [name]" where [name] is the matching name.
If no signature matches these names, respond with: "no"."""

        response, error = call_ollama_vision(image_base64, prompt)

        if error:
            return False, None, error

        response = response or ''

        # Match "yes" as a whole word so words such as "eyes" do not count as
        # a positive answer. re.ASCII keeps a word boundary between "yes" and
        # an immediately following Chinese character.
        if not re.search(r'\byes\b', response, re.IGNORECASE | re.ASCII):
            return False, None, None

        # Try longer names first so a name that is a prefix of another
        # expected name cannot shadow the longer one.
        for name in sorted(expected_names, key=len, reverse=True):
            if name in response:
                return True, name, None

        # VLM said yes but didn't specify which name.
        # NOTE(review): attributing the region to the first expected name is a
        # guess when several names are expected - confirm this is intended.
        return True, expected_names[0], None

    except Exception as e:
        return False, None, str(e)
|
|
|
|
|
|
def merge_overlapping_boxes(boxes, merge_threshold=100):
    """Merge bounding boxes that overlap or are very close.

    Boxes are sorted by ``(y, x)`` and folded in a single pass: each box is
    compared with the box currently being accumulated and merged into it when
    their top edges are within ``merge_threshold`` and their horizontal extents
    are within ``merge_threshold`` of each other.

    NOTE(review): because each box is only compared with the current
    accumulator, two close boxes separated in sort order by a distant one are
    not merged.

    Args:
        boxes: Sequence of ``(x, y, w, h)`` or ``(x, y, w, h, name)`` boxes.
        merge_threshold: Maximum gap, in pixels, for two boxes to be merged.

    Returns:
        List of merged boxes as tuples. A merged box carries the name of the
        most recently merged named box, if any of its members had one.
    """
    if not boxes:
        return []

    ordered = sorted(boxes, key=lambda b: (b[1], b[0]))  # Sort by y, then x
    merged = []
    current = list(ordered[0])

    for box in ordered[1:]:
        x, y, w, h = box[:4]
        cx, cy, cw, ch = current[:4]

        is_close = (abs(y - cy) < merge_threshold
                    and x < cx + cw + merge_threshold
                    and x + w > cx - merge_threshold)
        if not is_close:
            merged.append(tuple(current))
            current = list(box)
            continue

        new_x = min(cx, x)
        new_y = min(cy, y)
        new_w = max(cx + cw, x + w) - new_x
        new_h = max(cy + ch, y + h) - new_y

        # Preserve a name if either box has one: prefer the incoming box's
        # name, otherwise keep the one already attached to the accumulator.
        label = list(box[4:5]) or list(current[4:5])
        current = [new_x, new_y, new_w, new_h] + label

    merged.append(tuple(current))
    return merged
|
|
|
|
|
|
def process_pdf_page(pdf_path, output_dir):
    """Process a single PDF page using the hybrid approach.

    The page is rendered, the VLM is asked for the signer names, candidate
    regions are located via the PDF text layer (falling back to computer
    vision), and each candidate crop is verified by the VLM. Verified crops
    are saved as ``<pdf>_signature_<name>.png`` in ``output_dir``; crops the
    VLM rejects are moved to ``REJECTED_PATH``.

    Args:
        pdf_path: Path of the PDF to process (its first page is used).
        output_dir: Directory that receives the extracted signature images.

    Returns:
        ``(signature_count, extracted_files, method_used, error)`` where
        ``method_used`` is ``"text_layer"``, ``"computer_vision"`` or
        ``"none"`` and ``error`` is ``None`` unless rendering or name
        extraction failed.
    """
    def _discard(path):
        # Best-effort cleanup: the crop may already be gone.
        try:
            os.remove(path)
        except FileNotFoundError:
            pass

    pdf_name = Path(pdf_path).stem

    # Render page as image
    print(" - Rendering page...", end='', flush=True)
    image, _, _, error = render_pdf_page_as_image(pdf_path, DPI)
    if error:
        print(" ERROR")
        return 0, [], "none", f"Render error: {error}"
    print(" OK")

    # Step 1: Extract signature names with VLM
    print(" - Extracting signature names with VLM...", end='', flush=True)
    image_base64 = encode_image_to_base64(image)
    names, error = extract_signature_names_with_vlm(image_base64)

    if error:
        print(" ERROR")
        return 0, [], "none", f"VLM error: {error}"

    if not names:
        print(" No names found")
        return 0, [], "none", None

    print(f" OK - Found: {', '.join(names)}")

    # Step 2a: Try PDF text layer search. A search error is treated the same
    # as "nothing found" and falls through to computer vision.
    print(" - Searching PDF text layer...", end='', flush=True)
    text_locations, _text_error = search_pdf_text_layer(pdf_path, names, DPI)

    candidate_boxes = []
    method_used = "none"

    if text_locations:
        print(f" OK - Found {len(text_locations)} text instances")
        method_used = "text_layer"

        # Expand boxes to capture nearby signatures
        for loc in text_locations:
            candidate_boxes.append(expand_bbox_for_signature(loc, image.shape))
    else:
        print(" No text layer or names not found")

        # Step 2b: Fallback to computer vision
        print(" - Using computer vision detection...", end='', flush=True)
        cv_boxes = detect_signature_regions_cv(image)

        if not cv_boxes:
            print(" No regions detected")
            return 0, [], "none", None

        print(f" OK - Found {len(cv_boxes)} regions")
        method_used = "computer_vision"
        candidate_boxes = cv_boxes

    # Merge overlapping boxes
    candidate_boxes = merge_overlapping_boxes(candidate_boxes)

    print(f" - Found {len(candidate_boxes)} candidate region(s)")

    # Do not depend on the caller having created the destination folders.
    os.makedirs(output_dir, exist_ok=True)
    os.makedirs(REJECTED_PATH, exist_ok=True)

    # Step 3 & 4: Extract and verify each region
    extracted_files = []
    verified_names = set()

    for idx, bbox_info in enumerate(candidate_boxes):
        bbox = bbox_info[:4]

        print(f" - Region {idx + 1}: Extracting...", end='', flush=True)

        output_base = os.path.join(output_dir, f"{pdf_name}_region_{idx + 1}")
        success, error, output_file = extract_region_with_opencv(image, bbox, output_base)

        if not success:
            print(f" FAILED: {error}")
            continue

        print(" OK - Verifying...", end='', flush=True)

        # Verify this region contains any of the expected signatures
        is_signature, matched_name, verify_error = verify_signature_with_names(output_file, names)

        if verify_error:
            print(f" ERROR: {verify_error}")
            _discard(output_file)  # Remove failed verification attempts
            continue

        if is_signature and matched_name:
            # Found a signature! Rename file with the person's name
            final_filename = f"{pdf_name}_signature_{matched_name}.png"
            final_path = os.path.join(output_dir, final_filename)

            # Check if we already found this person's signature
            if matched_name in verified_names:
                print(f" DUPLICATE ({matched_name}) - rejected")
                _discard(output_file)
            else:
                # os.replace overwrites a leftover file from an earlier run on
                # every platform, where os.rename can fail.
                os.replace(output_file, final_path)
                verified_names.add(matched_name)
                print(f" VERIFIED ({matched_name})")
                extracted_files.append(final_path)
        else:
            print(" NOT A SIGNATURE - rejected")
            rejected_file = os.path.join(REJECTED_PATH, os.path.basename(output_file))
            os.replace(output_file, rejected_file)

    return len(extracted_files), extracted_files, method_used, None
|
|
|
|
|
|
def main():
    """Run hybrid extraction over the first five PDFs in ``PDF_INPUT_PATH``.

    Checks that the Ollama server is reachable, creates the output folders,
    processes each PDF with ``process_pdf_page``, writes one CSV log row per
    PDF and prints a summary. Returns early (without raising) if Ollama is
    unreachable or no PDFs are found. Sets the module-level ``LOG_FILE``.
    """
    global LOG_FILE

    print("Starting hybrid signature extraction...")
    print(f"Ollama URL: {OLLAMA_URL}")
    print(f"Model: {OLLAMA_MODEL}")
    print(f"Input path: {PDF_INPUT_PATH}")
    print(f"Output path: {OUTPUT_PATH}")
    print()

    # Test Ollama connection
    print("Testing Ollama connection...")
    try:
        response = requests.get(f"{OLLAMA_URL}/api/tags", timeout=5)
        response.raise_for_status()
        print("✓ Ollama connection successful\n")
    except Exception as e:
        print(f"✗ Ollama connection failed: {e}")
        return

    # Create output directories
    os.makedirs(OUTPUT_PATH, exist_ok=True)
    os.makedirs(REJECTED_PATH, exist_ok=True)

    LOG_FILE = os.path.join(
        OUTPUT_PATH,
        f"hybrid_extraction_log_{datetime.now().strftime('%Y%m%d_%H%M%S')}.csv",
    )

    # Get PDF files (test with first 5)
    pdf_files = sorted(Path(PDF_INPUT_PATH).glob("*.pdf"))[:5]

    if not pdf_files:
        print("ERROR: No PDF files found!")
        return

    print(f"Found {len(pdf_files)} PDF files to process (testing with first 5)\n")

    # Statistics
    stats = {
        'total_pdfs': 0,
        'pdfs_with_signatures': 0,
        'total_signatures': 0,
        'text_layer_used': 0,
        'cv_used': 0,
        'errors': 0,
    }

    # The log records output filenames that contain Chinese names, so the
    # encoding is fixed to UTF-8 instead of depending on the platform locale.
    with open(LOG_FILE, 'w', newline='', encoding='utf-8') as log_file:
        log_writer = csv.writer(log_file)
        log_writer.writerow([
            'pdf_filename', 'signatures_found', 'method_used', 'extracted_files', 'error'
        ])

        # Process each PDF
        for i, pdf_path in enumerate(pdf_files):
            stats['total_pdfs'] += 1
            pdf_filename = pdf_path.name

            print(f"[{i+1}/{len(pdf_files)}] Processing: {pdf_filename}")

            sig_count, extracted_files, method, error = process_pdf_page(str(pdf_path), OUTPUT_PATH)

            if error:
                print(f" ERROR: {error}\n")
                stats['errors'] += 1
                log_writer.writerow([pdf_filename, 0, method, "", error])
                continue

            if sig_count > 0:
                stats['pdfs_with_signatures'] += 1
                stats['total_signatures'] += sig_count

                if method == "text_layer":
                    stats['text_layer_used'] += 1
                elif method == "computer_vision":
                    stats['cv_used'] += 1

                print(f" ✓ Extracted {sig_count} signature(s) using {method}\n")

                filenames = [Path(f).name for f in extracted_files]
                log_writer.writerow([
                    pdf_filename,
                    sig_count,
                    method,
                    ", ".join(filenames),
                    ""
                ])
            else:
                print(" No signatures extracted\n")
                log_writer.writerow([pdf_filename, 0, method, "", ""])

    # Print summary
    print("="*60)
    print("HYBRID EXTRACTION SUMMARY")
    print("="*60)
    print(f"Total PDFs processed: {stats['total_pdfs']}")
    print(f"PDFs with signatures: {stats['pdfs_with_signatures']}")
    print(f"Total signatures extracted: {stats['total_signatures']}")
    print(f"Text layer method used: {stats['text_layer_used']}")
    print(f"Computer vision used: {stats['cv_used']}")
    print(f"Errors: {stats['errors']}")
    print(f"\nLog file: {LOG_FILE}")
    print("="*60)
|
|
|
|
|
|
if __name__ == "__main__":
    # Script entry point: run main() and turn an interrupt or any unexpected
    # exception into exit status 1 (with a traceback for the latter).
    import traceback

    exit_status = 0
    try:
        main()
    except KeyboardInterrupt:
        print("\n\nProcess interrupted by user.")
        exit_status = 1
    except Exception as exc:
        print(f"\n\nFATAL ERROR: {exc}")
        traceback.print_exc()
        exit_status = 1

    if exit_status:
        sys.exit(exit_status)
|