- Implemented comprehensive feature analysis based on size, stroke length, and regularity - Size-based scoring: height >50px indicates handwriting - Stroke length ratio: >0.4 indicates handwriting - Irregularity metrics: low compactness/solidity indicates handwriting - Successfully tested on sample PDF with 2 signatures (楊智惠, 張志銘) - Created detailed documentation: CURRENT_STATUS.md and NEW_SESSION_HANDOFF.md - Stable PaddleOCR 2.7.3 configuration documented (numpy 1.26.4, opencv 4.6.0.66) - Prepared research plan for PP-OCRv5 upgrade investigation 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude <noreply@anthropic.com>
416 lines
13 KiB
Python
416 lines
13 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
PaddleOCR Signature Extraction - Improved Pipeline
|
|
|
|
Implements:
|
|
- Method B: Region Merging (merge nearby regions to avoid splits)
|
|
- Method E: Two-Stage Approach (second OCR pass on regions)
|
|
|
|
Pipeline:
|
|
1. PaddleOCR detects printed text on full page
|
|
2. Mask printed text with padding
|
|
3. Detect candidate regions
|
|
4. Merge nearby regions (METHOD B)
|
|
5. For each region: Run OCR again to remove remaining printed text (METHOD E)
|
|
6. VLM verification (optional)
|
|
7. Save cleaned handwriting regions
|
|
"""
|
|
|
|
import fitz # PyMuPDF
|
|
import numpy as np
|
|
import cv2
|
|
from pathlib import Path
|
|
from paddleocr_client import create_ocr_client
|
|
from typing import List, Dict, Tuple
|
|
import base64
|
|
import requests
|
|
|
|
# ---------------------------------------------------------------------------
# Pipeline configuration
# ---------------------------------------------------------------------------

# Input PDF, output folder, and the resolution used when rasterising the page
# (the render matrix is scaled by DPI/72).
TEST_PDF = "/Volumes/NV2/PDF-Processing/signature-image-output/201301_1324_AI1_page3.pdf"
OUTPUT_DIR = "/Volumes/NV2/PDF-Processing/signature-image-output/paddleocr_improved"
DPI = 300

# Stage 1 masking: every detected text box is grown by this many pixels on
# each side before it is painted over.
MASKING_PADDING = 25

# Candidate filter: bounding-box area bounds and width/height ratio bounds
# that a contour must satisfy to be kept as a candidate region.
MIN_REGION_AREA, MAX_REGION_AREA = 3000, 300000
MIN_ASPECT_RATIO, MAX_ASPECT_RATIO = 0.3, 15.0

# METHOD B: largest horizontal / vertical gap (in pixels) across which two
# regions are still merged into one.
MERGE_DISTANCE_HORIZONTAL, MERGE_DISTANCE_VERTICAL = 100, 50

# Optional VLM filter: when enabled, each cleaned region is sent to the
# Ollama endpoint below and dropped if the model answers "not handwriting".
USE_VLM_VERIFICATION = False
OLLAMA_URL = "http://192.168.30.36:11434"
OLLAMA_MODEL = "qwen2.5vl:32b"
|
|
|
|
|
|
def merge_nearby_regions(regions: List[Dict],
                         h_distance: int = 100,
                         v_distance: int = 50) -> List[Dict]:
    """
    Merge regions that are close to each other (METHOD B).

    Regions are processed top to bottom. Each not-yet-consumed region seeds a
    group; every later region whose horizontal gap to the group's current
    bounding box is at most ``h_distance`` and whose vertical gap is at most
    ``v_distance`` is absorbed, and the bounding box grows to cover it.
    Overlapping boxes have a gap of 0 on that axis.

    This is a single forward pass per seed: a region that was too far from
    the group when it was examined is not re-examined after the group grows.

    Args:
        regions: List of region dicts with 'box': (x, y, w, h)
        h_distance: Maximum horizontal distance between regions to merge
        v_distance: Maximum vertical distance between regions to merge

    Returns:
        List of merged regions. Each entry is a new dict with keys
        'box' (x, y, w, h), 'area' (w * h), 'aspect_ratio' (w / h, or 0 when
        h is 0) and 'merged_count' (number of input regions in the group).
        Other keys of the input dicts are not carried over.
    """
    if not regions:
        return []

    # Sort regions by y-coordinate (top to bottom)
    regions = sorted(regions, key=lambda r: r['box'][1])

    merged = []
    skip_indices = set()

    for i, region1 in enumerate(regions):
        if i in skip_indices:
            continue

        x1, y1, w1, h1 = region1['box']
        merged_count = 1

        for j in range(i + 1, len(regions)):
            if j in skip_indices:
                continue

            x2, y2, w2, h2 = regions[j]['box']

            # Gap between the two boxes along each axis (0 when they overlap)
            h_dist = max(0, max(x1, x2) - min(x1 + w1, x2 + w2))
            v_dist = max(0, max(y1, y2) - min(y1 + h1, y2 + h2))

            if h_dist <= h_distance and v_dist <= v_distance:
                skip_indices.add(j)
                merged_count += 1

                # Capture the far edges BEFORE moving the origin; deriving
                # them from an already-updated x1/y1 would truncate the box
                # whenever the absorbed region starts further left/up.
                right = max(x1 + w1, x2 + w2)
                bottom = max(y1 + h1, y2 + h2)
                x1 = min(x1, x2)
                y1 = min(y1, y2)
                w1 = right - x1
                h1 = bottom - y1

        merged.append({
            'box': (x1, y1, w1, h1),
            'area': w1 * h1,
            'aspect_ratio': w1 / h1 if h1 > 0 else 0,
            'merged_count': merged_count,
        })

    return merged
|
|
|
|
|
|
def clean_region_with_ocr(region_image: np.ndarray,
                          ocr_client,
                          padding: int = 10) -> np.ndarray:
    """
    Remove printed text from a region using second OCR pass (METHOD E).

    Every text box reported by ``ocr_client.get_text_boxes(region_image)`` is
    grown by ``padding`` pixels on each side, clipped to the image, and filled
    with white (255) on a copy of the region.

    Args:
        region_image: The region image to clean
        ocr_client: PaddleOCR client; only ``get_text_boxes(image)`` is used,
            and it is expected to yield (x, y, w, h) boxes
        padding: Padding around detected text boxes

    Returns:
        Cleaned region with printed text masked. When no text is detected, or
        when anything in the OCR/masking step raises, the input array itself
        is returned unchanged (best-effort behaviour).
    """
    try:
        # Run OCR on this specific region
        text_boxes = ocr_client.get_text_boxes(region_image)

        if not text_boxes:
            return region_image  # No text found, return as-is

        # Mask detected printed text on a copy so the caller's array is intact
        cleaned = region_image.copy()
        img_h, img_w = cleaned.shape[:2]

        for (x, y, w, h) in text_boxes:
            # int() is defensive in case the client returns non-integer
            # coordinates -- TODO confirm what get_text_boxes yields.
            x, y, w, h = int(x), int(y), int(w), int(h)

            # Clamp each edge independently. Deriving the far edge from the
            # clamped origin plus (w + 2*padding) would push the mask past
            # x + w + padding whenever the box sits within `padding` of the
            # left/top border, wiping out pixels that are not printed text.
            left = max(0, x - padding)
            top = max(0, y - padding)
            right = min(img_w, x + w + padding)
            bottom = min(img_h, y + h + padding)

            if right > left and bottom > top:
                cleaned[top:bottom, left:right] = 255  # Fill with white

        return cleaned

    except Exception as e:
        # Deliberate best-effort: a failed cleaning pass must not abort the
        # pipeline, so fall back to the uncleaned region.
        print(f" Warning: OCR cleaning failed: {e}")
        return region_image
|
|
|
|
|
|
def verify_handwriting_with_vlm(image: np.ndarray) -> Tuple[bool, float]:
    """
    Use VLM to verify if image contains handwriting.

    The image is PNG-encoded, base64'd and posted to ``{OLLAMA_URL}/api/generate``
    with model ``OLLAMA_MODEL`` and a prompt asking for ``yes``/``no`` plus a
    0-100 confidence score. The lower-cased reply is then parsed.

    Args:
        image: Region image (RGB numpy array); it is cast to uint8 before
            encoding

    Returns:
        (is_handwriting: bool, confidence: float)

        * ``is_handwriting`` is True when the reply contains the standalone
          word "yes".
        * ``confidence`` is the first numeric token in [0, 100] divided by
          100, or 0.5 when no such token is present.
        * On any failure (encoding, HTTP, malformed reply) a warning is
          printed and ``(True, 0.5)`` is returned so the region is kept.
    """
    try:
        import re
        from io import BytesIO

        from PIL import Image

        # Convert image to base64-encoded PNG
        pil_image = Image.fromarray(image.astype(np.uint8))
        buffered = BytesIO()
        pil_image.save(buffered, format="PNG")
        image_base64 = base64.b64encode(buffered.getvalue()).decode('utf-8')

        # Ask VLM
        prompt = """Does this image contain handwritten text or a handwritten signature?

Answer only 'yes' or 'no', followed by a confidence score 0-100.
Format: yes 95 OR no 80"""

        payload = {
            "model": OLLAMA_MODEL,
            "prompt": prompt,
            "images": [image_base64],
            "stream": False
        }

        response = requests.post(f"{OLLAMA_URL}/api/generate",
                                 json=payload, timeout=30)
        response.raise_for_status()
        answer = response.json()['response'].strip().lower()

        # Match "yes" as a whole word: a plain substring test would also
        # accept replies such as "no 90 (eyes of a stamp)" or "yesterday".
        is_handwriting = re.search(r'\byes\b', answer) is not None

        # Extract confidence: first token that parses as a number in [0, 100].
        # Surrounding punctuation is stripped so "95%", "95." or "(95)" count.
        confidence = 0.5
        for part in answer.split():
            try:
                conf = float(part.strip('.,;:!%()[]*'))
            except ValueError:
                continue
            if 0 <= conf <= 100:
                confidence = conf / 100
                break

        return is_handwriting, confidence

    except Exception as e:
        # Deliberate best-effort: if the VLM is unreachable or replies with
        # something unexpected, do not discard the candidate region.
        print(f" Warning: VLM verification failed: {e}")
        return True, 0.5  # Default to accepting the region
|
|
|
|
|
|
# ---------------------------------------------------------------------------
# Pipeline script: runs top to bottom at module level. Any failing setup step
# prints the error and terminates the process with exit status 1.
# `raise SystemExit(1)` is used instead of `exit(1)` because `exit` is a
# helper injected by the `site` module and is not guaranteed to exist.
# ---------------------------------------------------------------------------
print("="*80)
print("PaddleOCR Improved Pipeline - Region Merging + Two-Stage Cleaning")
print("="*80)

# Create output directory
Path(OUTPUT_DIR).mkdir(parents=True, exist_ok=True)

# Step 1: Connect to PaddleOCR
print("\n1. Connecting to PaddleOCR server...")
try:
    ocr_client = create_ocr_client()
    print(f" ✅ Connected: {ocr_client.server_url}")
except Exception as e:
    print(f" ❌ Error: {e}")
    raise SystemExit(1)

# Step 2: Render PDF (first page only) into a numpy image
print("\n2. Rendering PDF...")
try:
    # Context manager guarantees the document is closed even if rendering fails
    with fitz.open(TEST_PDF) as doc:
        page = doc[0]
        mat = fitz.Matrix(DPI/72, DPI/72)
        pix = page.get_pixmap(matrix=mat)
        original_image = np.frombuffer(pix.samples, dtype=np.uint8).reshape(
            pix.height, pix.width, pix.n)

        # Drop the alpha channel when the pixmap has four components
        if pix.n == 4:
            original_image = cv2.cvtColor(original_image, cv2.COLOR_RGBA2RGB)

        print(f" ✅ Rendered: {original_image.shape[1]}x{original_image.shape[0]}")
except Exception as e:
    print(f" ❌ Error: {e}")
    raise SystemExit(1)

# Step 3: Detect printed text (Stage 1)
print("\n3. Detecting printed text (Stage 1 OCR)...")
try:
    text_boxes = ocr_client.get_text_boxes(original_image)
    print(f" ✅ Detected {len(text_boxes)} text regions")
except Exception as e:
    print(f" ❌ Error: {e}")
    raise SystemExit(1)

# Step 4: Mask printed text with padding
print(f"\n4. Masking printed text (padding={MASKING_PADDING}px)...")
try:
    masked_image = original_image.copy()

    for (x, y, w, h) in text_boxes:
        # Grow the box by MASKING_PADDING, clipped to the image
        x_pad = max(0, x - MASKING_PADDING)
        y_pad = max(0, y - MASKING_PADDING)
        w_pad = min(masked_image.shape[1] - x_pad, w + 2*MASKING_PADDING)
        h_pad = min(masked_image.shape[0] - y_pad, h + 2*MASKING_PADDING)

        # NOTE(review): the mask is painted BLACK, but Step 5 uses an inverted
        # threshold (pixels <= 250 become foreground), so masked text boxes
        # are themselves picked up as candidate blobs. clean_region_with_ocr
        # masks in white. Confirm whether black is intentional here.
        cv2.rectangle(masked_image, (x_pad, y_pad),
                      (x_pad + w_pad, y_pad + h_pad), (0, 0, 0), -1)

    print(f" ✅ Masked {len(text_boxes)} regions")
except Exception as e:
    print(f" ❌ Error: {e}")
    raise SystemExit(1)

# Step 5: Detect candidate regions
print("\n5. Detecting candidate regions...")
try:
    gray = cv2.cvtColor(masked_image, cv2.COLOR_RGB2GRAY)
    # Everything that is not near-white becomes foreground
    _, binary = cv2.threshold(gray, 250, 255, cv2.THRESH_BINARY_INV)

    # Morphological closing joins nearby foreground pixels into blobs
    kernel = cv2.getStructuringElement(cv2.MORPH_RECT, (5, 5))
    morphed = cv2.morphologyEx(binary, cv2.MORPH_CLOSE, kernel, iterations=2)

    contours, _ = cv2.findContours(morphed, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)

    candidate_regions = []
    for contour in contours:
        x, y, w, h = cv2.boundingRect(contour)
        area = w * h
        aspect_ratio = w / h if h > 0 else 0

        # Keep only boxes within the configured area and aspect-ratio bounds
        if (MIN_REGION_AREA <= area <= MAX_REGION_AREA and
                MIN_ASPECT_RATIO <= aspect_ratio <= MAX_ASPECT_RATIO):
            candidate_regions.append({
                'box': (x, y, w, h),
                'area': area,
                'aspect_ratio': aspect_ratio
            })

    print(f" ✅ Found {len(candidate_regions)} candidate regions")
except Exception as e:
    print(f" ❌ Error: {e}")
    raise SystemExit(1)

# Step 6: Merge nearby regions (METHOD B)
print(f"\n6. Merging nearby regions (h_dist<={MERGE_DISTANCE_HORIZONTAL}, v_dist<={MERGE_DISTANCE_VERTICAL})...")
try:
    merged_regions = merge_nearby_regions(
        candidate_regions,
        h_distance=MERGE_DISTANCE_HORIZONTAL,
        v_distance=MERGE_DISTANCE_VERTICAL
    )
    print(f" ✅ Merged {len(candidate_regions)} → {len(merged_regions)} regions")

    for i, region in enumerate(merged_regions):
        if region['merged_count'] > 1:
            print(f" Region {i+1}: Merged {region['merged_count']} sub-regions")
except Exception as e:
    print(f" ❌ Error: {e}")
    import traceback
    traceback.print_exc()
    raise SystemExit(1)

# Step 7: Extract and clean each region (METHOD E)
print("\n7. Extracting and cleaning regions (Stage 2 OCR)...")
final_signatures = []

for i, region in enumerate(merged_regions):
    x, y, w, h = region['box']
    print(f"\n Region {i+1}/{len(merged_regions)}: ({x}, {y}, {w}, {h})")

    # Extract region from ORIGINAL image (not masked), with a small margin
    padding = 10
    x_pad = max(0, x - padding)
    y_pad = max(0, y - padding)
    w_pad = min(original_image.shape[1] - x_pad, w + 2*padding)
    h_pad = min(original_image.shape[0] - y_pad, h + 2*padding)

    region_img = original_image[y_pad:y_pad+h_pad, x_pad:x_pad+w_pad].copy()

    print(f" - Extracted: {region_img.shape[1]}x{region_img.shape[0]}px")

    # Clean with second OCR pass
    print(" - Running Stage 2 OCR to remove printed text...")
    cleaned_region = clean_region_with_ocr(region_img, ocr_client, padding=5)

    # VLM verification (optional)
    if USE_VLM_VERIFICATION:
        print(" - VLM verification...")
        is_handwriting, confidence = verify_handwriting_with_vlm(cleaned_region)
        print(f" - VLM says: {'✅ Handwriting' if is_handwriting else '❌ Not handwriting'} (confidence: {confidence:.2f})")

        if not is_handwriting:
            print(" - Skipping (not handwriting)")
            continue

    # Keep both the cleaned crop and the untouched crop for comparison
    final_signatures.append({
        'image': cleaned_region,
        'box': region['box'],
        'original_image': region_img
    })

    print(" ✅ Kept as signature candidate")

print(f"\n ✅ Final signatures: {len(final_signatures)}")

# Step 8: Save results
print("\n8. Saving results...")

for i, sig in enumerate(final_signatures):
    # Save cleaned signature (images are RGB in memory; OpenCV writes BGR)
    sig_path = Path(OUTPUT_DIR) / f"signature_{i+1:02d}_cleaned.png"
    cv2.imwrite(str(sig_path), cv2.cvtColor(sig['image'], cv2.COLOR_RGB2BGR))

    # Save original region for comparison
    orig_path = Path(OUTPUT_DIR) / f"signature_{i+1:02d}_original.png"
    cv2.imwrite(str(orig_path), cv2.cvtColor(sig['original_image'], cv2.COLOR_RGB2BGR))

    print(f" 📁 Signature {i+1}: {sig_path.name}")

# Save visualizations: kept regions in red, rejected regions in gray.
# Membership is decided by box. Comparing the whole region dict against
# {'box': ...} can never match because region dicts carry extra keys, which
# would leave every rectangle gray.
kept_boxes = {tuple(kept['box']) for kept in final_signatures}
vis_merged = original_image.copy()
for region in merged_regions:
    x, y, w, h = region['box']
    color = (255, 0, 0) if tuple(region['box']) in kept_boxes else (128, 128, 128)
    cv2.rectangle(vis_merged, (x, y), (x + w, y + h), color, 3)

vis_path = Path(OUTPUT_DIR) / "visualization_merged_regions.png"
cv2.imwrite(str(vis_path), cv2.cvtColor(vis_merged, cv2.COLOR_RGB2BGR))
print(f" 📁 Visualization: {vis_path.name}")

print("\n" + "="*80)
print("Pipeline completed!")
print(f"Results: {OUTPUT_DIR}")
print("="*80)
print("\nSummary:")
print(f" - Stage 1 OCR: {len(text_boxes)} text regions masked")
print(f" - Initial candidates: {len(candidate_regions)}")
print(f" - After merging: {len(merged_regions)}")
print(f" - Final signatures: {len(final_signatures)}")
print(" - Expected signatures: 2 (楊智惠, 張志銘)")
print("="*80)
|