939a348da4
Paper draft includes all sections (Abstract through Conclusion), 36 references, and supporting scripts. Key methodology: Cosine similarity + dHash dual-method verification with thresholds calibrated against known-replication firm (Firm A). Includes: - 8 section markdown files (paper_a_*.md) - Ablation study script (ResNet-50 vs VGG-16 vs EfficientNet-B0) - Recalibrated classification script (84,386 PDFs, 5-tier system) - Figure generation and Word export scripts - Citation renumbering script ([1]-[36]) - Signature analysis pipeline (12 steps) - YOLO extraction scripts Three rounds of AI review completed (GPT-5.4, Claude Opus 4.6, Gemini 3 Pro). Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
433 lines
12 KiB
Python
433 lines
12 KiB
Python
#!/usr/bin/env python3
|
||
"""
|
||
Step 5: 從 PDF 提取會計師印刷姓名
|
||
|
||
流程:
|
||
1. 從資料庫讀取簽名記錄,按 (PDF, page) 分組
|
||
2. 對每個頁面重新執行 YOLO 獲取簽名框座標
|
||
3. 對整頁執行 PaddleOCR 提取印刷文字
|
||
4. 過濾出候選姓名(2-4 個中文字)
|
||
5. 配對簽名與最近的印刷姓名
|
||
6. 更新資料庫的 accountant_name 欄位
|
||
"""
|
||
|
||
import sqlite3
|
||
import json
|
||
import re
|
||
import sys
|
||
import time
|
||
from pathlib import Path
|
||
from typing import Optional, List, Dict, Tuple
|
||
from collections import defaultdict
|
||
from tqdm import tqdm
|
||
import numpy as np
|
||
import cv2
|
||
import fitz # PyMuPDF
|
||
|
||
# 加入父目錄到路徑以便匯入
|
||
sys.path.insert(0, str(Path(__file__).parent.parent))
|
||
from paddleocr_client import PaddleOCRClient
|
||
|
||
# 路徑配置
|
||
PDF_BASE = Path("/Volumes/NV2/PDF-Processing/total-pdf")
|
||
YOLO_MODEL_PATH = Path("/Volumes/NV2/pdf_recognize/models/best.pt")
|
||
DB_PATH = Path("/Volumes/NV2/PDF-Processing/signature-analysis/signature_analysis.db")
|
||
REPORTS_PATH = Path("/Volumes/NV2/PDF-Processing/signature-analysis/reports")
|
||
|
||
# 處理配置
|
||
DPI = 150
|
||
CONFIDENCE_THRESHOLD = 0.5
|
||
NAME_SEARCH_MARGIN = 200 # 簽名框周圍搜索姓名的像素範圍
|
||
PROGRESS_SAVE_INTERVAL = 100 # 每處理 N 個頁面保存一次進度
|
||
|
||
# 中文姓名正則
|
||
CHINESE_NAME_PATTERN = re.compile(r'^[\u4e00-\u9fff]{2,4}$')
|
||
|
||
|
||
def find_pdf_file(filename: str) -> Optional[str]:
|
||
"""搜尋 PDF 檔案路徑"""
|
||
# 先在 batch_* 子目錄尋找
|
||
for batch_dir in sorted(PDF_BASE.glob("batch_*")):
|
||
pdf_path = batch_dir / filename
|
||
if pdf_path.exists():
|
||
return str(pdf_path)
|
||
|
||
# 再在頂層目錄尋找
|
||
pdf_path = PDF_BASE / filename
|
||
if pdf_path.exists():
|
||
return str(pdf_path)
|
||
|
||
return None
|
||
|
||
|
||
def render_pdf_page(pdf_path: str, page_num: int) -> Optional[np.ndarray]:
|
||
"""渲染 PDF 頁面為圖像"""
|
||
try:
|
||
doc = fitz.open(pdf_path)
|
||
if page_num < 1 or page_num > len(doc):
|
||
doc.close()
|
||
return None
|
||
|
||
page = doc[page_num - 1]
|
||
mat = fitz.Matrix(DPI / 72, DPI / 72)
|
||
pix = page.get_pixmap(matrix=mat, alpha=False)
|
||
image = np.frombuffer(pix.samples, dtype=np.uint8)
|
||
image = image.reshape(pix.height, pix.width, pix.n)
|
||
doc.close()
|
||
return image
|
||
except Exception as e:
|
||
print(f"渲染失敗: {pdf_path} page {page_num}: {e}")
|
||
return None
|
||
|
||
|
||
def detect_signatures_yolo(image: np.ndarray, model) -> List[Dict]:
|
||
"""使用 YOLO 偵測簽名框"""
|
||
results = model(image, conf=CONFIDENCE_THRESHOLD, verbose=False)
|
||
|
||
signatures = []
|
||
for r in results:
|
||
for box in r.boxes:
|
||
x1, y1, x2, y2 = map(int, box.xyxy[0].cpu().numpy())
|
||
conf = float(box.conf[0].cpu().numpy())
|
||
signatures.append({
|
||
'x': x1,
|
||
'y': y1,
|
||
'width': x2 - x1,
|
||
'height': y2 - y1,
|
||
'confidence': conf,
|
||
'center_x': (x1 + x2) / 2,
|
||
'center_y': (y1 + y2) / 2
|
||
})
|
||
|
||
# 按位置排序(上到下,左到右)
|
||
signatures.sort(key=lambda s: (s['y'], s['x']))
|
||
|
||
return signatures
|
||
|
||
|
||
def extract_text_candidates(image: np.ndarray, ocr_client: PaddleOCRClient) -> List[Dict]:
|
||
"""從圖像中提取所有文字候選"""
|
||
try:
|
||
results = ocr_client.ocr(image)
|
||
|
||
candidates = []
|
||
for result in results:
|
||
text = result.get('text', '').strip()
|
||
box = result.get('box', [])
|
||
confidence = result.get('confidence', 0.0)
|
||
|
||
if not box or not text:
|
||
continue
|
||
|
||
# 計算邊界框中心
|
||
xs = [point[0] for point in box]
|
||
ys = [point[1] for point in box]
|
||
center_x = sum(xs) / len(xs)
|
||
center_y = sum(ys) / len(ys)
|
||
|
||
candidates.append({
|
||
'text': text,
|
||
'center_x': center_x,
|
||
'center_y': center_y,
|
||
'x': min(xs),
|
||
'y': min(ys),
|
||
'width': max(xs) - min(xs),
|
||
'height': max(ys) - min(ys),
|
||
'confidence': confidence
|
||
})
|
||
|
||
return candidates
|
||
except Exception as e:
|
||
print(f"OCR 失敗: {e}")
|
||
return []
|
||
|
||
|
||
def filter_name_candidates(candidates: List[Dict]) -> List[Dict]:
|
||
"""過濾出可能是姓名的文字(2-4 個中文字,不含數字標點)"""
|
||
names = []
|
||
for c in candidates:
|
||
text = c['text']
|
||
# 移除空白和標點
|
||
text_clean = re.sub(r'[\s\:\:\,\,\.\。]', '', text)
|
||
|
||
if CHINESE_NAME_PATTERN.match(text_clean):
|
||
c['text_clean'] = text_clean
|
||
names.append(c)
|
||
|
||
return names
|
||
|
||
|
||
def match_signature_to_name(
|
||
sig: Dict,
|
||
name_candidates: List[Dict],
|
||
margin: int = NAME_SEARCH_MARGIN
|
||
) -> Optional[str]:
|
||
"""為簽名框配對最近的姓名候選"""
|
||
sig_center_x = sig['center_x']
|
||
sig_center_y = sig['center_y']
|
||
|
||
# 過濾出在搜索範圍內的姓名
|
||
nearby_names = []
|
||
for name in name_candidates:
|
||
dx = abs(name['center_x'] - sig_center_x)
|
||
dy = abs(name['center_y'] - sig_center_y)
|
||
|
||
# 在 margin 範圍內
|
||
if dx <= margin + sig['width']/2 and dy <= margin + sig['height']/2:
|
||
distance = (dx**2 + dy**2) ** 0.5
|
||
nearby_names.append((name, distance))
|
||
|
||
if not nearby_names:
|
||
return None
|
||
|
||
# 返回距離最近的
|
||
nearby_names.sort(key=lambda x: x[1])
|
||
return nearby_names[0][0]['text_clean']
|
||
|
||
|
||
def get_pages_to_process(conn: sqlite3.Connection) -> List[Tuple[str, int, List[int]]]:
|
||
"""
|
||
從資料庫獲取需要處理的 (PDF, page) 組合
|
||
|
||
Returns:
|
||
List of (source_pdf, page_number, [signature_ids])
|
||
"""
|
||
cursor = conn.cursor()
|
||
|
||
# 查詢尚未有 accountant_name 的簽名,按 (PDF, page) 分組
|
||
cursor.execute('''
|
||
SELECT source_pdf, page_number, GROUP_CONCAT(signature_id)
|
||
FROM signatures
|
||
WHERE accountant_name IS NULL OR accountant_name = ''
|
||
GROUP BY source_pdf, page_number
|
||
ORDER BY source_pdf, page_number
|
||
''')
|
||
|
||
pages = []
|
||
for row in cursor.fetchall():
|
||
source_pdf, page_number, sig_ids_str = row
|
||
sig_ids = [int(x) for x in sig_ids_str.split(',')]
|
||
pages.append((source_pdf, page_number, sig_ids))
|
||
|
||
return pages
|
||
|
||
|
||
def update_signature_names(
|
||
conn: sqlite3.Connection,
|
||
updates: List[Tuple[int, str, int, int, int, int]]
|
||
):
|
||
"""
|
||
更新資料庫中的簽名姓名和座標
|
||
|
||
Args:
|
||
updates: List of (signature_id, accountant_name, x, y, width, height)
|
||
"""
|
||
cursor = conn.cursor()
|
||
|
||
# 確保 signature_boxes 表存在
|
||
cursor.execute('''
|
||
CREATE TABLE IF NOT EXISTS signature_boxes (
|
||
signature_id INTEGER PRIMARY KEY,
|
||
x INTEGER,
|
||
y INTEGER,
|
||
width INTEGER,
|
||
height INTEGER,
|
||
FOREIGN KEY (signature_id) REFERENCES signatures(signature_id)
|
||
)
|
||
''')
|
||
|
||
for sig_id, name, x, y, w, h in updates:
|
||
# 更新姓名
|
||
cursor.execute('''
|
||
UPDATE signatures SET accountant_name = ? WHERE signature_id = ?
|
||
''', (name, sig_id))
|
||
|
||
# 更新或插入座標
|
||
cursor.execute('''
|
||
INSERT OR REPLACE INTO signature_boxes (signature_id, x, y, width, height)
|
||
VALUES (?, ?, ?, ?, ?)
|
||
''', (sig_id, x, y, w, h))
|
||
|
||
conn.commit()
|
||
|
||
|
||
def process_page(
|
||
source_pdf: str,
|
||
page_number: int,
|
||
sig_ids: List[int],
|
||
yolo_model,
|
||
ocr_client: PaddleOCRClient,
|
||
conn: sqlite3.Connection
|
||
) -> Dict:
|
||
"""
|
||
處理單一頁面:偵測簽名框、提取姓名、配對
|
||
|
||
Returns:
|
||
處理結果統計
|
||
"""
|
||
result = {
|
||
'source_pdf': source_pdf,
|
||
'page_number': page_number,
|
||
'num_signatures': len(sig_ids),
|
||
'matched': 0,
|
||
'unmatched': 0,
|
||
'error': None
|
||
}
|
||
|
||
# 找 PDF 檔案
|
||
pdf_path = find_pdf_file(source_pdf)
|
||
if pdf_path is None:
|
||
result['error'] = 'PDF not found'
|
||
return result
|
||
|
||
# 渲染頁面
|
||
image = render_pdf_page(pdf_path, page_number)
|
||
if image is None:
|
||
result['error'] = 'Render failed'
|
||
return result
|
||
|
||
# YOLO 偵測簽名框
|
||
sig_boxes = detect_signatures_yolo(image, yolo_model)
|
||
|
||
if len(sig_boxes) != len(sig_ids):
|
||
# 簽名數量不匹配,嘗試按順序配對
|
||
pass
|
||
|
||
# OCR 提取文字
|
||
text_candidates = extract_text_candidates(image, ocr_client)
|
||
|
||
# 過濾出姓名候選
|
||
name_candidates = filter_name_candidates(text_candidates)
|
||
|
||
# 配對簽名與姓名
|
||
updates = []
|
||
|
||
for i, (sig_id, sig_box) in enumerate(zip(sig_ids, sig_boxes)):
|
||
matched_name = match_signature_to_name(sig_box, name_candidates)
|
||
|
||
if matched_name:
|
||
result['matched'] += 1
|
||
else:
|
||
result['unmatched'] += 1
|
||
matched_name = '' # 空字串表示未配對
|
||
|
||
updates.append((
|
||
sig_id,
|
||
matched_name,
|
||
sig_box['x'],
|
||
sig_box['y'],
|
||
sig_box['width'],
|
||
sig_box['height']
|
||
))
|
||
|
||
# 如果 YOLO 偵測數量少於記錄數量,處理剩餘的
|
||
if len(sig_boxes) < len(sig_ids):
|
||
for sig_id in sig_ids[len(sig_boxes):]:
|
||
updates.append((sig_id, '', 0, 0, 0, 0))
|
||
result['unmatched'] += 1
|
||
|
||
# 更新資料庫
|
||
update_signature_names(conn, updates)
|
||
|
||
return result
|
||
|
||
|
||
def main():
|
||
print("=" * 60)
|
||
print("Step 5: 從 PDF 提取會計師印刷姓名")
|
||
print("=" * 60)
|
||
|
||
# 確保報告目錄存在
|
||
REPORTS_PATH.mkdir(parents=True, exist_ok=True)
|
||
|
||
# 連接資料庫
|
||
print("\n連接資料庫...")
|
||
conn = sqlite3.connect(DB_PATH)
|
||
|
||
# 獲取需要處理的頁面
|
||
print("查詢待處理頁面...")
|
||
pages = get_pages_to_process(conn)
|
||
print(f"共 {len(pages)} 個頁面待處理")
|
||
|
||
if not pages:
|
||
print("沒有需要處理的頁面")
|
||
conn.close()
|
||
return
|
||
|
||
# 初始化 YOLO
|
||
print("\n載入 YOLO 模型...")
|
||
from ultralytics import YOLO
|
||
yolo_model = YOLO(str(YOLO_MODEL_PATH))
|
||
|
||
# 初始化 OCR 客戶端
|
||
print("連接 PaddleOCR 伺服器...")
|
||
ocr_client = PaddleOCRClient()
|
||
if not ocr_client.health_check():
|
||
print("錯誤: PaddleOCR 伺服器無法連接")
|
||
print("請確認伺服器 http://192.168.30.36:5555 正在運行")
|
||
conn.close()
|
||
return
|
||
print("OCR 伺服器連接成功")
|
||
|
||
# 統計
|
||
stats = {
|
||
'total_pages': len(pages),
|
||
'processed': 0,
|
||
'matched': 0,
|
||
'unmatched': 0,
|
||
'errors': 0,
|
||
'start_time': time.time()
|
||
}
|
||
|
||
# 處理每個頁面
|
||
print(f"\n開始處理 {len(pages)} 個頁面...")
|
||
|
||
for source_pdf, page_number, sig_ids in tqdm(pages, desc="處理頁面"):
|
||
result = process_page(
|
||
source_pdf, page_number, sig_ids,
|
||
yolo_model, ocr_client, conn
|
||
)
|
||
|
||
stats['processed'] += 1
|
||
stats['matched'] += result['matched']
|
||
stats['unmatched'] += result['unmatched']
|
||
if result['error']:
|
||
stats['errors'] += 1
|
||
|
||
# 定期保存進度報告
|
||
if stats['processed'] % PROGRESS_SAVE_INTERVAL == 0:
|
||
elapsed = time.time() - stats['start_time']
|
||
rate = stats['processed'] / elapsed
|
||
remaining = (stats['total_pages'] - stats['processed']) / rate if rate > 0 else 0
|
||
|
||
print(f"\n進度: {stats['processed']}/{stats['total_pages']} "
|
||
f"({stats['processed']/stats['total_pages']*100:.1f}%)")
|
||
print(f"配對成功: {stats['matched']}, 未配對: {stats['unmatched']}")
|
||
print(f"預估剩餘時間: {remaining/60:.1f} 分鐘")
|
||
|
||
# 最終統計
|
||
elapsed = time.time() - stats['start_time']
|
||
stats['elapsed_seconds'] = elapsed
|
||
|
||
print("\n" + "=" * 60)
|
||
print("處理完成")
|
||
print("=" * 60)
|
||
print(f"總頁面數: {stats['total_pages']}")
|
||
print(f"處理成功: {stats['processed']}")
|
||
print(f"配對成功: {stats['matched']}")
|
||
print(f"未配對: {stats['unmatched']}")
|
||
print(f"錯誤: {stats['errors']}")
|
||
print(f"耗時: {elapsed/60:.1f} 分鐘")
|
||
|
||
# 保存報告
|
||
report_path = REPORTS_PATH / "name_extraction_report.json"
|
||
with open(report_path, 'w', encoding='utf-8') as f:
|
||
json.dump(stats, f, indent=2, ensure_ascii=False)
|
||
print(f"\n報告已儲存: {report_path}")
|
||
|
||
conn.close()
|
||
|
||
|
||
if __name__ == "__main__":
|
||
main()
|