#!/usr/bin/env python3 """ Step 5: 從 PDF 提取會計師印刷姓名 流程: 1. 從資料庫讀取簽名記錄,按 (PDF, page) 分組 2. 對每個頁面重新執行 YOLO 獲取簽名框座標 3. 對整頁執行 PaddleOCR 提取印刷文字 4. 過濾出候選姓名(2-4 個中文字) 5. 配對簽名與最近的印刷姓名 6. 更新資料庫的 accountant_name 欄位 """ import sqlite3 import json import re import sys import time from pathlib import Path from typing import Optional, List, Dict, Tuple from collections import defaultdict from tqdm import tqdm import numpy as np import cv2 import fitz # PyMuPDF # 加入父目錄到路徑以便匯入 sys.path.insert(0, str(Path(__file__).parent.parent)) from paddleocr_client import PaddleOCRClient # 路徑配置 PDF_BASE = Path("/Volumes/NV2/PDF-Processing/total-pdf") YOLO_MODEL_PATH = Path("/Volumes/NV2/pdf_recognize/models/best.pt") DB_PATH = Path("/Volumes/NV2/PDF-Processing/signature-analysis/signature_analysis.db") REPORTS_PATH = Path("/Volumes/NV2/PDF-Processing/signature-analysis/reports") # 處理配置 DPI = 150 CONFIDENCE_THRESHOLD = 0.5 NAME_SEARCH_MARGIN = 200 # 簽名框周圍搜索姓名的像素範圍 PROGRESS_SAVE_INTERVAL = 100 # 每處理 N 個頁面保存一次進度 # 中文姓名正則 CHINESE_NAME_PATTERN = re.compile(r'^[\u4e00-\u9fff]{2,4}$') def find_pdf_file(filename: str) -> Optional[str]: """搜尋 PDF 檔案路徑""" # 先在 batch_* 子目錄尋找 for batch_dir in sorted(PDF_BASE.glob("batch_*")): pdf_path = batch_dir / filename if pdf_path.exists(): return str(pdf_path) # 再在頂層目錄尋找 pdf_path = PDF_BASE / filename if pdf_path.exists(): return str(pdf_path) return None def render_pdf_page(pdf_path: str, page_num: int) -> Optional[np.ndarray]: """渲染 PDF 頁面為圖像""" try: doc = fitz.open(pdf_path) if page_num < 1 or page_num > len(doc): doc.close() return None page = doc[page_num - 1] mat = fitz.Matrix(DPI / 72, DPI / 72) pix = page.get_pixmap(matrix=mat, alpha=False) image = np.frombuffer(pix.samples, dtype=np.uint8) image = image.reshape(pix.height, pix.width, pix.n) doc.close() return image except Exception as e: print(f"渲染失敗: {pdf_path} page {page_num}: {e}") return None def detect_signatures_yolo(image: np.ndarray, model) -> List[Dict]: """使用 YOLO 偵測簽名框""" results = model(image, conf=CONFIDENCE_THRESHOLD, verbose=False) signatures = [] for r in results: for box in r.boxes: x1, y1, x2, y2 = map(int, box.xyxy[0].cpu().numpy()) conf = float(box.conf[0].cpu().numpy()) signatures.append({ 'x': x1, 'y': y1, 'width': x2 - x1, 'height': y2 - y1, 'confidence': conf, 'center_x': (x1 + x2) / 2, 'center_y': (y1 + y2) / 2 }) # 按位置排序(上到下,左到右) signatures.sort(key=lambda s: (s['y'], s['x'])) return signatures def extract_text_candidates(image: np.ndarray, ocr_client: PaddleOCRClient) -> List[Dict]: """從圖像中提取所有文字候選""" try: results = ocr_client.ocr(image) candidates = [] for result in results: text = result.get('text', '').strip() box = result.get('box', []) confidence = result.get('confidence', 0.0) if not box or not text: continue # 計算邊界框中心 xs = [point[0] for point in box] ys = [point[1] for point in box] center_x = sum(xs) / len(xs) center_y = sum(ys) / len(ys) candidates.append({ 'text': text, 'center_x': center_x, 'center_y': center_y, 'x': min(xs), 'y': min(ys), 'width': max(xs) - min(xs), 'height': max(ys) - min(ys), 'confidence': confidence }) return candidates except Exception as e: print(f"OCR 失敗: {e}") return [] def filter_name_candidates(candidates: List[Dict]) -> List[Dict]: """過濾出可能是姓名的文字(2-4 個中文字,不含數字標點)""" names = [] for c in candidates: text = c['text'] # 移除空白和標點 text_clean = re.sub(r'[\s\:\:\,\,\.\。]', '', text) if CHINESE_NAME_PATTERN.match(text_clean): c['text_clean'] = text_clean names.append(c) return names def match_signature_to_name( sig: Dict, name_candidates: List[Dict], margin: int = NAME_SEARCH_MARGIN ) -> Optional[str]: """為簽名框配對最近的姓名候選""" sig_center_x = sig['center_x'] sig_center_y = sig['center_y'] # 過濾出在搜索範圍內的姓名 nearby_names = [] for name in name_candidates: dx = abs(name['center_x'] - sig_center_x) dy = abs(name['center_y'] - sig_center_y) # 在 margin 範圍內 if dx <= margin + sig['width']/2 and dy <= margin + sig['height']/2: distance = (dx**2 + dy**2) ** 0.5 nearby_names.append((name, distance)) if not nearby_names: return None # 返回距離最近的 nearby_names.sort(key=lambda x: x[1]) return nearby_names[0][0]['text_clean'] def get_pages_to_process(conn: sqlite3.Connection) -> List[Tuple[str, int, List[int]]]: """ 從資料庫獲取需要處理的 (PDF, page) 組合 Returns: List of (source_pdf, page_number, [signature_ids]) """ cursor = conn.cursor() # 查詢尚未有 accountant_name 的簽名,按 (PDF, page) 分組 cursor.execute(''' SELECT source_pdf, page_number, GROUP_CONCAT(signature_id) FROM signatures WHERE accountant_name IS NULL OR accountant_name = '' GROUP BY source_pdf, page_number ORDER BY source_pdf, page_number ''') pages = [] for row in cursor.fetchall(): source_pdf, page_number, sig_ids_str = row sig_ids = [int(x) for x in sig_ids_str.split(',')] pages.append((source_pdf, page_number, sig_ids)) return pages def update_signature_names( conn: sqlite3.Connection, updates: List[Tuple[int, str, int, int, int, int]] ): """ 更新資料庫中的簽名姓名和座標 Args: updates: List of (signature_id, accountant_name, x, y, width, height) """ cursor = conn.cursor() # 確保 signature_boxes 表存在 cursor.execute(''' CREATE TABLE IF NOT EXISTS signature_boxes ( signature_id INTEGER PRIMARY KEY, x INTEGER, y INTEGER, width INTEGER, height INTEGER, FOREIGN KEY (signature_id) REFERENCES signatures(signature_id) ) ''') for sig_id, name, x, y, w, h in updates: # 更新姓名 cursor.execute(''' UPDATE signatures SET accountant_name = ? WHERE signature_id = ? ''', (name, sig_id)) # 更新或插入座標 cursor.execute(''' INSERT OR REPLACE INTO signature_boxes (signature_id, x, y, width, height) VALUES (?, ?, ?, ?, ?) ''', (sig_id, x, y, w, h)) conn.commit() def process_page( source_pdf: str, page_number: int, sig_ids: List[int], yolo_model, ocr_client: PaddleOCRClient, conn: sqlite3.Connection ) -> Dict: """ 處理單一頁面:偵測簽名框、提取姓名、配對 Returns: 處理結果統計 """ result = { 'source_pdf': source_pdf, 'page_number': page_number, 'num_signatures': len(sig_ids), 'matched': 0, 'unmatched': 0, 'error': None } # 找 PDF 檔案 pdf_path = find_pdf_file(source_pdf) if pdf_path is None: result['error'] = 'PDF not found' return result # 渲染頁面 image = render_pdf_page(pdf_path, page_number) if image is None: result['error'] = 'Render failed' return result # YOLO 偵測簽名框 sig_boxes = detect_signatures_yolo(image, yolo_model) if len(sig_boxes) != len(sig_ids): # 簽名數量不匹配,嘗試按順序配對 pass # OCR 提取文字 text_candidates = extract_text_candidates(image, ocr_client) # 過濾出姓名候選 name_candidates = filter_name_candidates(text_candidates) # 配對簽名與姓名 updates = [] for i, (sig_id, sig_box) in enumerate(zip(sig_ids, sig_boxes)): matched_name = match_signature_to_name(sig_box, name_candidates) if matched_name: result['matched'] += 1 else: result['unmatched'] += 1 matched_name = '' # 空字串表示未配對 updates.append(( sig_id, matched_name, sig_box['x'], sig_box['y'], sig_box['width'], sig_box['height'] )) # 如果 YOLO 偵測數量少於記錄數量,處理剩餘的 if len(sig_boxes) < len(sig_ids): for sig_id in sig_ids[len(sig_boxes):]: updates.append((sig_id, '', 0, 0, 0, 0)) result['unmatched'] += 1 # 更新資料庫 update_signature_names(conn, updates) return result def main(): print("=" * 60) print("Step 5: 從 PDF 提取會計師印刷姓名") print("=" * 60) # 確保報告目錄存在 REPORTS_PATH.mkdir(parents=True, exist_ok=True) # 連接資料庫 print("\n連接資料庫...") conn = sqlite3.connect(DB_PATH) # 獲取需要處理的頁面 print("查詢待處理頁面...") pages = get_pages_to_process(conn) print(f"共 {len(pages)} 個頁面待處理") if not pages: print("沒有需要處理的頁面") conn.close() return # 初始化 YOLO print("\n載入 YOLO 模型...") from ultralytics import YOLO yolo_model = YOLO(str(YOLO_MODEL_PATH)) # 初始化 OCR 客戶端 print("連接 PaddleOCR 伺服器...") ocr_client = PaddleOCRClient() if not ocr_client.health_check(): print("錯誤: PaddleOCR 伺服器無法連接") print("請確認伺服器 http://192.168.30.36:5555 正在運行") conn.close() return print("OCR 伺服器連接成功") # 統計 stats = { 'total_pages': len(pages), 'processed': 0, 'matched': 0, 'unmatched': 0, 'errors': 0, 'start_time': time.time() } # 處理每個頁面 print(f"\n開始處理 {len(pages)} 個頁面...") for source_pdf, page_number, sig_ids in tqdm(pages, desc="處理頁面"): result = process_page( source_pdf, page_number, sig_ids, yolo_model, ocr_client, conn ) stats['processed'] += 1 stats['matched'] += result['matched'] stats['unmatched'] += result['unmatched'] if result['error']: stats['errors'] += 1 # 定期保存進度報告 if stats['processed'] % PROGRESS_SAVE_INTERVAL == 0: elapsed = time.time() - stats['start_time'] rate = stats['processed'] / elapsed remaining = (stats['total_pages'] - stats['processed']) / rate if rate > 0 else 0 print(f"\n進度: {stats['processed']}/{stats['total_pages']} " f"({stats['processed']/stats['total_pages']*100:.1f}%)") print(f"配對成功: {stats['matched']}, 未配對: {stats['unmatched']}") print(f"預估剩餘時間: {remaining/60:.1f} 分鐘") # 最終統計 elapsed = time.time() - stats['start_time'] stats['elapsed_seconds'] = elapsed print("\n" + "=" * 60) print("處理完成") print("=" * 60) print(f"總頁面數: {stats['total_pages']}") print(f"處理成功: {stats['processed']}") print(f"配對成功: {stats['matched']}") print(f"未配對: {stats['unmatched']}") print(f"錯誤: {stats['errors']}") print(f"耗時: {elapsed/60:.1f} 分鐘") # 保存報告 report_path = REPORTS_PATH / "name_extraction_report.json" with open(report_path, 'w', encoding='utf-8') as f: json.dump(stats, f, indent=2, ensure_ascii=False) print(f"\n報告已儲存: {report_path}") conn.close() if __name__ == "__main__": main()