939a348da4
Paper draft includes all sections (Abstract through Conclusion), 36 references, and supporting scripts. Key methodology: Cosine similarity + dHash dual-method verification with thresholds calibrated against known-replication firm (Firm A). Includes: - 8 section markdown files (paper_a_*.md) - Ablation study script (ResNet-50 vs VGG-16 vs EfficientNet-B0) - Recalibrated classification script (84,386 PDFs, 5-tier system) - Figure generation and Word export scripts - Citation renumbering script ([1]-[36]) - Signature analysis pipeline (12 steps) - YOLO extraction scripts Three rounds of AI review completed (GPT-5.4, Claude Opus 4.6, Gemini 3 Pro). Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
381 lines
12 KiB
Python
381 lines
12 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
YOLO Signature Extraction from VLM Index
|
|
|
|
Extracts signatures from PDF pages specified in master_signatures.csv.
|
|
Uses VLM-filtered index + YOLO for precise localization and cropping.
|
|
|
|
Pipeline:
|
|
CSV Index → Load specified page → YOLO Detection → Crop & Remove Red Stamp → Output
|
|
"""
|
|
|
|
import argparse
|
|
import csv
|
|
import json
|
|
import os
|
|
import sys
|
|
import time
|
|
from concurrent.futures import ProcessPoolExecutor, as_completed
|
|
from datetime import datetime
|
|
from pathlib import Path
|
|
from typing import Optional
|
|
|
|
import cv2
|
|
import fitz # PyMuPDF
|
|
import numpy as np
|
|
|
|
# Configuration
DPI = 150  # resolution used when rasterizing PDF pages (dots per inch)
CONFIDENCE_THRESHOLD = 0.5  # default YOLO detection confidence cutoff (mirrors --conf default)
PROGRESS_SAVE_INTERVAL = 500  # write a progress.json checkpoint every N processed entries
|
|
|
|
|
|
def remove_red_stamp(image: np.ndarray) -> np.ndarray:
    """Paint red stamp pixels of an RGB image white and return the result.

    Red is detected in HSV space; because hue wraps around OpenCV's 0-180
    scale, two hue bands are combined. The mask is dilated by one pixel so
    anti-aliased stamp edges are removed as well. The input is not modified.
    """
    hsv = cv2.cvtColor(image, cv2.COLOR_RGB2HSV)

    # Red occupies both ends of the hue axis (0-10 and 160-180).
    bands = (
        (np.array([0, 50, 50]), np.array([10, 255, 255])),
        (np.array([160, 50, 50]), np.array([180, 255, 255])),
    )
    red_mask = cv2.bitwise_or(
        cv2.inRange(hsv, *bands[0]),
        cv2.inRange(hsv, *bands[1]),
    )

    # Grow the mask slightly to catch soft/anti-aliased edges of the stamp.
    red_mask = cv2.dilate(red_mask, np.ones((3, 3), np.uint8), iterations=1)

    cleaned = image.copy()
    cleaned[red_mask > 0] = [255, 255, 255]
    return cleaned
|
|
|
|
|
|
def render_pdf_page(pdf_path: str, page_num: int, dpi: int = DPI) -> Optional[np.ndarray]:
    """Render one PDF page (1-indexed) to a numpy image array.

    Returns None when the page number is out of range or any error occurs
    while opening or rendering the document (best-effort by design).
    """
    try:
        doc = fitz.open(pdf_path)
        try:
            if not 1 <= page_num <= len(doc):
                return None
            zoom = dpi / 72  # PDF user space is 72 dpi
            pix = doc[page_num - 1].get_pixmap(matrix=fitz.Matrix(zoom, zoom), alpha=False)
            flat = np.frombuffer(pix.samples, dtype=np.uint8)
            return flat.reshape(pix.height, pix.width, pix.n)
        finally:
            doc.close()
    except Exception:
        return None
|
|
|
|
|
|
def find_pdf_file(filename: str, pdf_base: str) -> Optional[str]:
    """Locate *filename* under *pdf_base*.

    Batch subdirectories (``batch_*``, searched in sorted order) take
    precedence over the base directory itself. Returns the path as a
    string, or None when the file is nowhere to be found.
    """
    root = Path(pdf_base)

    # Batch subdirectories first, then the flat base directory.
    candidates = [batch / filename for batch in sorted(root.glob("batch_*"))]
    candidates.append(root / filename)

    for candidate in candidates:
        if candidate.exists():
            return str(candidate)
    return None
|
|
|
|
|
|
def process_single_entry(args: tuple) -> dict:
    """
    Process a single CSV entry: render page, detect signatures, crop and save.

    Args:
        args: Tuple of (row_dict, model_path, pdf_base, output_dir, conf_threshold)

    Returns:
        Result dictionary with keys: filename, page, num_signatures,
        confidence_avg, image_files (list of saved crop filenames), and
        error (message string, or None on success).
    """
    row, model_path, pdf_base, output_dir, conf_threshold = args

    # Imported lazily so the (heavy) ultralytics package is only loaded
    # inside worker processes that actually run detections.
    from ultralytics import YOLO

    filename = row['filename']
    page_num = int(row['page'])
    base_name = Path(filename).stem

    result = {
        'filename': filename,
        'page': page_num,
        'num_signatures': 0,
        'confidence_avg': 0.0,
        'image_files': [],
        'error': None
    }

    try:
        # Find PDF
        pdf_path = find_pdf_file(filename, pdf_base)
        if pdf_path is None:
            result['error'] = 'PDF not found'
            return result

        # Render page
        image = render_pdf_page(pdf_path, page_num)
        if image is None:
            result['error'] = 'Render failed'
            return result

        # FIX: load the YOLO model once per worker process and reuse it.
        # The original reloaded the weights from disk for every CSV entry,
        # which dominates runtime when a worker handles many entries.
        # A function attribute keeps the cache self-contained.
        cache = process_single_entry.__dict__.setdefault('_model_cache', {})
        model = cache.get(model_path)
        if model is None:
            model = YOLO(model_path)
            cache[model_path] = model

        results = model(image, conf=conf_threshold, verbose=False)

        # Collect detections as (x, y, w, h) boxes with confidences.
        signatures = []
        for r in results:
            for box in r.boxes:
                x1, y1, x2, y2 = map(int, box.xyxy[0].cpu().numpy())
                conf = float(box.conf[0].cpu().numpy())
                signatures.append({
                    'box': (x1, y1, x2 - x1, y2 - y1),
                    'confidence': conf
                })

        if not signatures:
            result['num_signatures'] = 0
            return result

        # Sort signatures by position (top-left to bottom-right)
        signatures.sort(key=lambda s: (s['box'][1], s['box'][0]))

        result['num_signatures'] = len(signatures)
        result['confidence_avg'] = sum(s['confidence'] for s in signatures) / len(signatures)

        # Extract and save crops
        image_files = []
        for i, sig in enumerate(signatures):
            x, y, w, h = sig['box']
            x = max(0, x)
            y = max(0, y)
            x2 = min(image.shape[1], x + w)
            y2 = min(image.shape[0], y + h)

            # Skip boxes that collapsed to zero area after clamping to the
            # page bounds; cv2.imwrite would fail on an empty crop.
            if x2 <= x or y2 <= y:
                continue

            crop = image[y:y2, x:x2]
            crop_clean = remove_red_stamp(crop)

            crop_filename = f"{base_name}_page{page_num}_sig{i + 1}.png"
            crop_path = os.path.join(output_dir, "images", crop_filename)
            # Rendered page is RGB; OpenCV writes BGR, so convert on save.
            cv2.imwrite(crop_path, cv2.cvtColor(crop_clean, cv2.COLOR_RGB2BGR))

            image_files.append(crop_filename)

        result['image_files'] = image_files

    except Exception as e:
        # Best-effort per-entry processing: record the failure and move on
        # so one bad PDF cannot kill the whole batch.
        result['error'] = str(e)

    return result
|
|
|
|
|
|
def load_progress(progress_file: str) -> set:
    """Return the set of completed entry keys from a checkpoint file.

    A missing, unreadable, or malformed checkpoint yields an empty set,
    so a fresh run can always proceed.
    """
    try:
        with open(progress_file, 'r') as f:
            return set(json.load(f).get('completed_keys', []))
    except Exception:
        return set()
|
|
|
|
|
|
def save_progress(progress_file: str, completed: set, total: int, start_time: float):
    """Write a JSON checkpoint describing extraction progress so far."""
    done = len(completed)
    checkpoint = {
        'last_updated': datetime.now().isoformat(),
        'total_entries': total,
        'processed': done,
        'remaining': total - done,
        'elapsed_seconds': time.time() - start_time,
        'completed_keys': list(completed),
    }
    with open(progress_file, 'w') as f:
        json.dump(checkpoint, f)
|
|
|
|
|
|
def main():
    """CLI entry point.

    Reads the CSV index, fans entries out to a process pool for
    render/detect/crop, checkpoints progress periodically, and writes an
    extraction-results CSV plus a JSON summary report.
    """
    parser = argparse.ArgumentParser(description='YOLO Signature Extraction from VLM Index')
    parser.add_argument('--csv', required=True, help='Path to master_signatures.csv')
    parser.add_argument('--pdf-base', required=True, help='Base directory containing PDFs')
    parser.add_argument('--output', required=True, help='Output directory')
    parser.add_argument('--model', default='best.pt', help='Path to YOLO model')
    parser.add_argument('--workers', type=int, default=8, help='Number of parallel workers')
    parser.add_argument('--conf', type=float, default=0.5, help='Confidence threshold')
    parser.add_argument('--resume', action='store_true', help='Resume from checkpoint')
    args = parser.parse_args()

    # Setup output directories
    output_dir = Path(args.output)
    output_dir.mkdir(parents=True, exist_ok=True)
    (output_dir / "images").mkdir(exist_ok=True)

    progress_file = str(output_dir / "progress.json")
    csv_output = str(output_dir / "extraction_results.csv")
    report_file = str(output_dir / "extraction_report.json")

    print("=" * 70)
    print("YOLO Signature Extraction from VLM Index")
    print("=" * 70)
    print(f"CSV Index: {args.csv}")
    print(f"PDF Base: {args.pdf_base}")
    print(f"Output: {args.output}")
    print(f"Model: {args.model}")
    print(f"Workers: {args.workers}")
    print(f"Confidence: {args.conf}")
    print("=" * 70)

    # Load CSV
    print("\nLoading CSV index...")
    with open(args.csv, 'r') as f:
        all_entries = list(csv.DictReader(f))

    total_entries = len(all_entries)
    print(f"Total entries: {total_entries}")

    # Load progress if resuming
    completed_keys = set()
    if args.resume:
        completed_keys = load_progress(progress_file)
        print(f"Resuming: {len(completed_keys)} entries already processed")

    # One CSV row is identified across runs by filename + page (the same
    # PDF may appear with several pages).
    def entry_key(row):
        return f"{row['filename']}_{row['page']}"

    entries_to_process = [e for e in all_entries if entry_key(e) not in completed_keys]
    print(f"Entries to process: {len(entries_to_process)}")

    if not entries_to_process:
        print("All entries already processed!")
        return

    # Prepare work arguments
    work_args = [
        (entry, args.model, args.pdf_base, str(output_dir), args.conf)
        for entry in entries_to_process
    ]

    # Result buckets
    results_success = []
    results_no_sig = []
    errors = []

    start_time = time.time()
    processed_count = len(completed_keys)
    # FIX: the original recomputed len(load_progress(...)) on every finished
    # future — one disk read per entry, and once a mid-run checkpoint was
    # saved the reloaded file also contained keys finished THIS run, which
    # skewed the rate/ETA. Capture the baseline once, before the loop.
    already_done = processed_count

    print(f"\nStarting extraction with {args.workers} workers...")
    print("-" * 70)

    with ProcessPoolExecutor(max_workers=args.workers) as executor:
        futures = {executor.submit(process_single_entry, arg): arg[0] for arg in work_args}

        for future in as_completed(futures):
            entry = futures[future]
            key = entry_key(entry)

            try:
                result = future.result()

                if result['error']:
                    errors.append(result)
                elif result['num_signatures'] > 0:
                    results_success.append(result)
                else:
                    results_no_sig.append(result)

                completed_keys.add(key)
                processed_count += 1

                # Progress output; rate counts only entries from this run.
                elapsed = time.time() - start_time
                rate = (processed_count - already_done) / elapsed if elapsed > 0 else 0
                eta = (total_entries - processed_count) / rate / 60 if rate > 0 else 0

                status = f"SIG({result['num_signatures']})" if result['num_signatures'] > 0 else "---"
                if result['error']:
                    status = "ERR"

                print(f"[{processed_count}/{total_entries}] {status:8s} {result['filename'][:45]:45s} "
                      f"({rate:.1f}/s, ETA: {eta:.1f}m)")

                # Periodic checkpoint so --resume can skip finished work
                if processed_count % PROGRESS_SAVE_INTERVAL == 0:
                    save_progress(progress_file, completed_keys, total_entries, start_time)

            except Exception as e:
                # The worker itself crashed (distinct from a handled
                # per-entry error reported in result['error']).
                print(f"Error: {e}")
                errors.append({'filename': entry['filename'], 'error': str(e)})

    # Final progress save
    save_progress(progress_file, completed_keys, total_entries, start_time)

    # Write CSV results
    print("\nWriting results CSV...")
    with open(csv_output, 'w', newline='') as f:
        writer = csv.DictWriter(f, fieldnames=[
            'filename', 'page', 'num_signatures', 'confidence_avg', 'image_files'
        ])
        writer.writeheader()
        for r in results_success:
            writer.writerow({
                'filename': r['filename'],
                'page': r['page'],
                'num_signatures': r['num_signatures'],
                'confidence_avg': round(r['confidence_avg'], 4),
                'image_files': ','.join(r['image_files'])
            })

    # Generate report
    elapsed_total = time.time() - start_time
    total_sigs = sum(r['num_signatures'] for r in results_success)
    # Guard against a (theoretical) zero elapsed time.
    final_rate = len(entries_to_process) / elapsed_total if elapsed_total > 0 else 0.0

    report = {
        'extraction_date': datetime.now().isoformat(),
        'total_index_entries': total_entries,
        'with_signatures_detected': len(results_success),
        'no_signatures_detected': len(results_no_sig),
        'errors': len(errors),
        'total_signatures_extracted': total_sigs,
        'detection_rate': f"{len(results_success) / total_entries * 100:.2f}%" if total_entries > 0 else "0%",
        'processing_time_minutes': round(elapsed_total / 60, 2),
        'processing_rate_per_second': round(final_rate, 2),
        'model': args.model,
        'confidence_threshold': args.conf,
        'workers': args.workers
    }

    with open(report_file, 'w') as f:
        json.dump(report, f, indent=2)

    # Print summary (total_entries > 0 is guaranteed here: we returned
    # earlier when there was nothing to process).
    print("\n" + "=" * 70)
    print("EXTRACTION COMPLETE")
    print("=" * 70)
    print(f"Total index entries: {total_entries}")
    print(f"With signatures: {len(results_success)} ({len(results_success)/total_entries*100:.1f}%)")
    print(f"No signatures detected: {len(results_no_sig)} ({len(results_no_sig)/total_entries*100:.1f}%)")
    print(f"Errors: {len(errors)}")
    print(f"Total signatures: {total_sigs}")
    print(f"Processing time: {elapsed_total/60:.1f} minutes")
    print(f"Rate: {final_rate:.1f} entries/second")
    print("-" * 70)
    print(f"Results saved to: {output_dir}")
    print("=" * 70)
|
|
|
|
|
|
# Script entry point: run the extraction CLI only when executed directly.
if __name__ == "__main__":
    main()
|