Files
pdf_signature_extraction/signature_analysis/05_extract_names.py
T
gbanyan 939a348da4 Add Paper A (IEEE TAI) complete draft with Firm A-calibrated dual-method classification
Paper draft includes all sections (Abstract through Conclusion), 36 references,
and supporting scripts. Key methodology: Cosine similarity + dHash dual-method
verification with thresholds calibrated against known-replication firm (Firm A).

Includes:
- 8 section markdown files (paper_a_*.md)
- Ablation study script (ResNet-50 vs VGG-16 vs EfficientNet-B0)
- Recalibrated classification script (84,386 PDFs, 5-tier system)
- Figure generation and Word export scripts
- Citation renumbering script ([1]-[36])
- Signature analysis pipeline (12 steps)
- YOLO extraction scripts

Three rounds of AI review completed (GPT-5.4, Claude Opus 4.6, Gemini 3 Pro).

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-06 23:05:33 +08:00

433 lines
12 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""
Step 5: Extract the accountants' printed names from PDFs.

Pipeline:
1. Read signature records from the database, grouped by (PDF, page)
2. Re-run YOLO on each page to obtain the signature box coordinates
3. Run PaddleOCR on the whole page to extract the printed text
4. Keep the name candidates (2-4 Chinese characters)
5. Pair each signature with the nearest printed name
6. Update the accountant_name column in the database
"""
import sqlite3
import json
import re
import sys
import time
from pathlib import Path
from typing import Optional, List, Dict, Tuple
from collections import defaultdict
from tqdm import tqdm
import numpy as np
import cv2
import fitz # PyMuPDF
# 加入父目錄到路徑以便匯入
sys.path.insert(0, str(Path(__file__).parent.parent))
from paddleocr_client import PaddleOCRClient
# Path configuration
PDF_BASE = Path("/Volumes/NV2/PDF-Processing/total-pdf")
YOLO_MODEL_PATH = Path("/Volumes/NV2/pdf_recognize/models/best.pt")
DB_PATH = Path("/Volumes/NV2/PDF-Processing/signature-analysis/signature_analysis.db")
REPORTS_PATH = Path("/Volumes/NV2/PDF-Processing/signature-analysis/reports")
# Processing configuration
# DPI: resolution at which PDF pages are rasterised (used as DPI / 72 zoom).
DPI = 150
# CONFIDENCE_THRESHOLD: passed as `conf` to the YOLO model call.
CONFIDENCE_THRESHOLD = 0.5
NAME_SEARCH_MARGIN = 200 # pixel range around a signature box searched for a name
PROGRESS_SAVE_INTERVAL = 100 # a progress summary is printed every N processed pages
# Chinese-name pattern: the whole string must be 2-4 characters in the
# U+4E00..U+9FFF range.
CHINESE_NAME_PATTERN = re.compile(r'^[\u4e00-\u9fff]{2,4}$')
def find_pdf_file(filename: str) -> Optional[str]:
    """Locate *filename* under ``PDF_BASE`` and return its path as a string.

    The ``batch_*`` subdirectories are probed first, in sorted order, and the
    top-level ``PDF_BASE`` directory is probed last.

    Returns:
        The first existing path as ``str``, or ``None`` when no candidate
        exists.
    """
    # One ordered list of parent directories: batches first, base dir last.
    search_dirs = [*sorted(PDF_BASE.glob("batch_*")), PDF_BASE]
    for directory in search_dirs:
        candidate = directory / filename
        if candidate.exists():
            return str(candidate)
    return None
def render_pdf_page(pdf_path: str, page_num: int) -> Optional[np.ndarray]:
    """Render one page of a PDF into an image array.

    Args:
        pdf_path: Path of the PDF file to open.
        page_num: 1-based page number.

    Returns:
        A ``uint8`` array shaped ``(pix.height, pix.width, pix.n)`` built from
        the pixmap samples, or ``None`` when ``page_num`` is out of range or
        when opening/rendering raises (the error is printed, not propagated).
    """
    try:
        doc = fitz.open(pdf_path)
        try:
            if page_num < 1 or page_num > len(doc):
                return None
            page = doc[page_num - 1]  # fitz pages are indexed from 0
            # Scale by DPI / 72 so the page is rasterised at the configured DPI.
            zoom = DPI / 72
            pix = page.get_pixmap(matrix=fitz.Matrix(zoom, zoom), alpha=False)
            image = np.frombuffer(pix.samples, dtype=np.uint8)
            return image.reshape(pix.height, pix.width, pix.n)
        finally:
            # Always release the document, including when rendering raises;
            # previously the handle leaked on every failure path.
            doc.close()
    except Exception as e:
        # Best-effort: a page that cannot be rendered is reported and skipped.
        print(f"渲染失敗: {pdf_path} page {page_num}: {e}")
        return None
def detect_signatures_yolo(image: np.ndarray, model) -> List[Dict]:
    """Run the YOLO model on *image* and return the detected boxes.

    Each detection is a dict with integer ``x``, ``y``, ``width``, ``height``
    (derived from the box's ``xyxy`` corners), the float ``confidence`` and
    the box centre as ``center_x`` / ``center_y``.

    Returns:
        The detections ordered top-to-bottom, then left-to-right.
    """
    detections = []
    for result in model(image, conf=CONFIDENCE_THRESHOLD, verbose=False):
        for box in result.boxes:
            left, top, right, bottom = (
                int(value) for value in box.xyxy[0].cpu().numpy()
            )
            score = float(box.conf[0].cpu().numpy())
            detections.append({
                'x': left,
                'y': top,
                'width': right - left,
                'height': bottom - top,
                'confidence': score,
                'center_x': (left + right) / 2,
                'center_y': (top + bottom) / 2
            })
    # Order by position: top edge first, left edge as the tie-breaker.
    return sorted(detections, key=lambda d: (d['y'], d['x']))
def extract_text_candidates(image: np.ndarray, ocr_client: PaddleOCRClient) -> List[Dict]:
    """Run OCR on *image* and return one dict per recognised text item.

    Items with an empty ``text`` (after stripping) or an empty ``box`` are
    skipped. Each returned dict carries the stripped ``text``, the mean of the
    box points as ``center_x`` / ``center_y``, the axis-aligned bounds
    (``x``, ``y``, ``width``, ``height``) and the OCR ``confidence``.

    Returns:
        The list of candidates, or an empty list if anything raises (the
        error is printed, not propagated).
    """
    try:
        found = []
        for item in ocr_client.ocr(image):
            text = item.get('text', '').strip()
            box = item.get('box', [])
            confidence = item.get('confidence', 0.0)
            if not (box and text):
                continue

            # Reduce the box points to a centre and an axis-aligned rectangle.
            xs = [point[0] for point in box]
            ys = [point[1] for point in box]
            left, right = min(xs), max(xs)
            top, bottom = min(ys), max(ys)
            found.append({
                'text': text,
                'center_x': sum(xs) / len(xs),
                'center_y': sum(ys) / len(ys),
                'x': left,
                'y': top,
                'width': right - left,
                'height': bottom - top,
                'confidence': confidence
            })
        return found
    except Exception as e:
        print(f"OCR 失敗: {e}")
        return []
def filter_name_candidates(candidates: List[Dict]) -> List[Dict]:
    """Keep the OCR candidates whose text looks like a Chinese personal name.

    Whitespace and separator punctuation are stripped from each candidate's
    ``text``; the candidate is kept when the remainder fully matches
    ``CHINESE_NAME_PATTERN`` (2-4 Chinese characters).

    Args:
        candidates: Dicts with at least a ``'text'`` key.

    Returns:
        The accepted candidates, in input order. Each accepted dict is
        annotated in place with ``'text_clean'`` (the stripped text); the
        returned list holds the same dict objects that were passed in.
    """
    names = []
    for c in candidates:
        # Strip whitespace, ASCII ":" "," "." and backslash, the ideographic
        # full stop (U+3002), and the full-width colon/comma (U+FF1A, U+FF0C).
        # The full-width forms are what OCR of Chinese text normally yields,
        # and were previously not removed, so e.g. a name followed by a
        # full-width comma was rejected.
        text_clean = re.sub(r'[\s:,.\\\u3002\uff1a\uff0c]', '', c['text'])
        if CHINESE_NAME_PATTERN.match(text_clean):
            c['text_clean'] = text_clean
            names.append(c)
    return names
def match_signature_to_name(
    sig: Dict,
    name_candidates: List[Dict],
    margin: int = NAME_SEARCH_MARGIN
) -> Optional[str]:
    """Return the cleaned text of the name candidate closest to a signature.

    A candidate is eligible when its centre lies within ``margin`` plus half
    the signature's width horizontally and ``margin`` plus half its height
    vertically of the signature centre. Among the eligible candidates the one
    with the smallest Euclidean centre-to-centre distance wins; on a tie the
    earlier candidate is kept.

    Returns:
        The winner's ``'text_clean'``, or ``None`` when nothing is in range.
    """
    cx = sig['center_x']
    cy = sig['center_y']

    closest = None
    closest_distance = None
    for candidate in name_candidates:
        dx = abs(candidate['center_x'] - cx)
        dy = abs(candidate['center_y'] - cy)
        # Skip anything outside the search window around the signature box.
        if not (dx <= margin + sig['width']/2 and dy <= margin + sig['height']/2):
            continue
        distance = (dx**2 + dy**2) ** 0.5
        # Strict "<" keeps the first candidate seen when distances are equal.
        if closest_distance is None or distance < closest_distance:
            closest = candidate
            closest_distance = distance

    if closest is None:
        return None
    return closest['text_clean']
def get_pages_to_process(conn: sqlite3.Connection) -> List[Tuple[str, int, List[int]]]:
    """Fetch the (PDF, page) groups whose signatures still lack a name.

    Only rows of ``signatures`` whose ``accountant_name`` is NULL or the empty
    string are selected; they are grouped by ``(source_pdf, page_number)`` and
    returned in that order.

    Returns:
        List of ``(source_pdf, page_number, [signature_ids])``.
    """
    rows = conn.cursor().execute('''
SELECT source_pdf, page_number, GROUP_CONCAT(signature_id)
FROM signatures
WHERE accountant_name IS NULL OR accountant_name = ''
GROUP BY source_pdf, page_number
ORDER BY source_pdf, page_number
''').fetchall()
    # GROUP_CONCAT yields a comma-separated string of ids; parse it to ints.
    return [
        (pdf_name, page_no, [int(token) for token in id_csv.split(',')])
        for pdf_name, page_no, id_csv in rows
    ]
def update_signature_names(
    conn: sqlite3.Connection,
    updates: List[Tuple[int, str, int, int, int, int]]
):
    """Write matched names and signature box coordinates to the database.

    Creates the ``signature_boxes`` table if it does not exist, sets
    ``signatures.accountant_name`` for every update, and inserts or replaces
    the matching ``signature_boxes`` row.

    The whole batch is applied atomically: every tuple is unpacked before any
    statement runs, and the writes execute inside ``with conn`` so they are
    committed together on success and rolled back if a statement raises.
    (Previously a failure part-way through left the earlier rows pending in
    the open transaction, where a later commit would persist them.)

    Args:
        updates: List of (signature_id, accountant_name, x, y, width, height)

    Raises:
        ValueError: If an entry of ``updates`` does not have six fields.
        sqlite3.Error: If a statement fails; the batch is rolled back.
    """
    # Validate/unpack up front so a malformed tuple fails before any write.
    name_rows = []
    box_rows = []
    for sig_id, name, x, y, w, h in updates:
        name_rows.append((name, sig_id))
        box_rows.append((sig_id, x, y, w, h))

    # The connection context manager commits on success, rolls back on error.
    with conn:
        cursor = conn.cursor()
        # Make sure the signature_boxes table exists.
        cursor.execute('''
CREATE TABLE IF NOT EXISTS signature_boxes (
signature_id INTEGER PRIMARY KEY,
x INTEGER,
y INTEGER,
width INTEGER,
height INTEGER,
FOREIGN KEY (signature_id) REFERENCES signatures(signature_id)
)
''')
        # Update the names.
        cursor.executemany('''
UPDATE signatures SET accountant_name = ? WHERE signature_id = ?
''', name_rows)
        # Insert or replace the coordinates.
        cursor.executemany('''
INSERT OR REPLACE INTO signature_boxes (signature_id, x, y, width, height)
VALUES (?, ?, ?, ?, ?)
''', box_rows)
def process_page(
    source_pdf: str,
    page_number: int,
    sig_ids: List[int],
    yolo_model,
    ocr_client: PaddleOCRClient,
    conn: sqlite3.Connection
) -> Dict:
    """Process one page: detect signature boxes, OCR names, pair and store.

    Signature ids and detected boxes are paired by position (``zip``), so
    surplus boxes are ignored and surplus ids are recorded as unmatched with
    an all-zero box. Unmatched signatures are stored with an empty name.

    Returns:
        A summary dict with ``source_pdf``, ``page_number``,
        ``num_signatures``, ``matched``, ``unmatched`` and ``error``
        (``None``, ``'PDF not found'`` or ``'Render failed'``). Nothing is
        written to the database when ``error`` is set.
    """
    summary = {
        'source_pdf': source_pdf,
        'page_number': page_number,
        'num_signatures': len(sig_ids),
        'matched': 0,
        'unmatched': 0,
        'error': None
    }

    # Locate the PDF file.
    pdf_path = find_pdf_file(source_pdf)
    if pdf_path is None:
        summary['error'] = 'PDF not found'
        return summary

    # Rasterise the page.
    page_image = render_pdf_page(pdf_path, page_number)
    if page_image is None:
        summary['error'] = 'Render failed'
        return summary

    # Signature boxes first, then the printed-name candidates from OCR.
    detected_boxes = detect_signatures_yolo(page_image, yolo_model)
    names_on_page = filter_name_candidates(
        extract_text_candidates(page_image, ocr_client)
    )

    # Pair ids with boxes in order and look up the nearest name for each.
    rows = []
    for sig_id, box in zip(sig_ids, detected_boxes):
        name = match_signature_to_name(box, names_on_page) or ''  # '' = no match
        summary['matched' if name else 'unmatched'] += 1
        rows.append(
            (sig_id, name, box['x'], box['y'], box['width'], box['height'])
        )

    # Ids beyond the number of detected boxes get an empty name and zero box
    # (the slice is empty when YOLO found at least as many boxes as ids).
    for sig_id in sig_ids[len(detected_boxes):]:
        rows.append((sig_id, '', 0, 0, 0, 0))
        summary['unmatched'] += 1

    # Persist the results.
    update_signature_names(conn, rows)
    return summary
def main():
    """Run step 5 end to end: extract printed accountant names for all pages.

    Connects to the database at ``DB_PATH``, fetches the pending (PDF, page)
    groups, loads the YOLO model and the PaddleOCR client, processes every
    page with ``process_page``, prints a progress summary every
    ``PROGRESS_SAVE_INTERVAL`` pages and finally writes the run statistics to
    ``name_extraction_report.json`` under ``REPORTS_PATH``.

    Returns early (after printing a message) when there is nothing to process
    or the OCR server health check fails. The database connection is closed
    on every exit path, including when an exception propagates.
    """
    print("=" * 60)
    print("Step 5: 從 PDF 提取會計師印刷姓名")
    print("=" * 60)

    # Make sure the report directory exists.
    REPORTS_PATH.mkdir(parents=True, exist_ok=True)

    # Connect to the database.
    print("\n連接資料庫...")
    conn = sqlite3.connect(DB_PATH)
    try:
        # Fetch the pages that still need processing.
        print("查詢待處理頁面...")
        pages = get_pages_to_process(conn)
        print(f"{len(pages)} 個頁面待處理")
        if not pages:
            print("沒有需要處理的頁面")
            return

        # Initialise YOLO (imported lazily so it is only loaded when needed).
        print("\n載入 YOLO 模型...")
        from ultralytics import YOLO
        yolo_model = YOLO(str(YOLO_MODEL_PATH))

        # Initialise the OCR client.
        print("連接 PaddleOCR 伺服器...")
        ocr_client = PaddleOCRClient()
        if not ocr_client.health_check():
            print("錯誤: PaddleOCR 伺服器無法連接")
            print("請確認伺服器 http://192.168.30.36:5555 正在運行")
            return
        print("OCR 伺服器連接成功")

        # Run statistics; this dict is also what gets written to the report.
        stats = {
            'total_pages': len(pages),
            'processed': 0,
            'matched': 0,
            'unmatched': 0,
            'errors': 0,
            'start_time': time.time()
        }

        # Process every page.
        print(f"\n開始處理 {len(pages)} 個頁面...")
        for source_pdf, page_number, sig_ids in tqdm(pages, desc="處理頁面"):
            result = process_page(
                source_pdf, page_number, sig_ids,
                yolo_model, ocr_client, conn
            )
            stats['processed'] += 1
            stats['matched'] += result['matched']
            stats['unmatched'] += result['unmatched']
            if result['error']:
                stats['errors'] += 1

            # Periodic progress summary.
            if stats['processed'] % PROGRESS_SAVE_INTERVAL == 0:
                elapsed = time.time() - stats['start_time']
                # Guard against a zero interval on coarse clocks so the rate
                # computation cannot raise ZeroDivisionError.
                rate = stats['processed'] / elapsed if elapsed > 0 else 0
                remaining = (stats['total_pages'] - stats['processed']) / rate if rate > 0 else 0
                print(f"\n進度: {stats['processed']}/{stats['total_pages']} "
                      f"({stats['processed']/stats['total_pages']*100:.1f}%)")
                print(f"配對成功: {stats['matched']}, 未配對: {stats['unmatched']}")
                print(f"預估剩餘時間: {remaining/60:.1f} 分鐘")

        # Final statistics.
        elapsed = time.time() - stats['start_time']
        stats['elapsed_seconds'] = elapsed
        print("\n" + "=" * 60)
        print("處理完成")
        print("=" * 60)
        print(f"總頁面數: {stats['total_pages']}")
        print(f"處理成功: {stats['processed']}")
        print(f"配對成功: {stats['matched']}")
        print(f"未配對: {stats['unmatched']}")
        print(f"錯誤: {stats['errors']}")
        print(f"耗時: {elapsed/60:.1f} 分鐘")

        # Save the report.
        report_path = REPORTS_PATH / "name_extraction_report.json"
        with open(report_path, 'w', encoding='utf-8') as f:
            json.dump(stats, f, indent=2, ensure_ascii=False)
        print(f"\n報告已儲存: {report_path}")
    finally:
        # Single close point covering early returns and exceptions alike.
        conn.close()


if __name__ == "__main__":
    main()