#!/usr/bin/env python3
"""
Step 5: Extract accountant names from PDFs - full processing version.

Pipeline:
1. Read signature records from the database, grouped by (PDF, page).
2. Re-run YOLO on each page to obtain signature box coordinates.
3. Run PaddleOCR on the whole page to extract text.
4. Filter name candidates (2-4 Chinese characters).
5. Pair each signature with the nearest name candidate.
6. Update the database and generate a report.
"""

import sqlite3
import json
import re
import sys
import time
from pathlib import Path
from typing import Optional, List, Dict, Tuple
from collections import defaultdict
from datetime import datetime

from tqdm import tqdm
import numpy as np
import fitz  # PyMuPDF

# Make the parent directory importable so the local OCR client can be found.
sys.path.insert(0, str(Path(__file__).parent.parent))
from paddleocr_client import PaddleOCRClient

# Path configuration
PDF_BASE = Path("/Volumes/NV2/PDF-Processing/total-pdf")
YOLO_MODEL_PATH = Path("/Volumes/NV2/pdf_recognize/models/best.pt")
DB_PATH = Path("/Volumes/NV2/PDF-Processing/signature-analysis/signature_analysis.db")
REPORTS_PATH = Path("/Volumes/NV2/PDF-Processing/signature-analysis/reports")

# Processing configuration
DPI = 150
CONFIDENCE_THRESHOLD = 0.5
NAME_SEARCH_MARGIN = 200
PROGRESS_SAVE_INTERVAL = 100
BATCH_COMMIT_SIZE = 50

# A name candidate is exactly 2-4 CJK unified ideographs.
CHINESE_NAME_PATTERN = re.compile(r'^[\u4e00-\u9fff]{2,4}$')

# Whitespace and punctuation stripped from OCR text before name matching.
# Covers ASCII ':' ',' '.' plus the full-width colon (U+FF1A), full-width
# comma (U+FF0C), ideographic full stop (U+3002) and ideographic comma (U+3001).
_NAME_NOISE_PATTERN = re.compile(r'[\s:\uff1a,\uff0c.\u3002\u3001]')

# Common words that look like names but are not.
EXCLUDE_WORDS = {'會計', '會計師', '事務所', '師', '聯合', '出具報告'}


def find_pdf_file(filename: str) -> Optional[str]:
    """Locate a PDF by file name under ``PDF_BASE``.

    The ``batch_*`` sub-directories are searched first, in sorted order, and
    then ``PDF_BASE`` itself.

    Args:
        filename: File name of the PDF (no directory part).

    Returns:
        The path of the first match as a string, or ``None`` if not found.
    """
    search_dirs = sorted(PDF_BASE.glob("batch_*"))
    search_dirs.append(PDF_BASE)
    for directory in search_dirs:
        candidate = directory / filename
        if candidate.exists():
            return str(candidate)
    return None


def render_pdf_page(pdf_path: str, page_num: int) -> Optional[np.ndarray]:
    """Render one PDF page to an image array at ``DPI``.

    Args:
        pdf_path: Path of the PDF file.
        page_num: 1-based page number.

    Returns:
        A ``uint8`` array shaped ``(height, width, channels)``, or ``None``
        when the page number is out of range or rendering fails.
    """
    try:
        # The context manager closes the document even if rendering raises.
        with fitz.open(pdf_path) as doc:
            if page_num < 1 or page_num > len(doc):
                return None
            page = doc[page_num - 1]
            zoom = DPI / 72  # PDF user space is 72 units per inch
            pix = page.get_pixmap(matrix=fitz.Matrix(zoom, zoom), alpha=False)
            image = np.frombuffer(pix.samples, dtype=np.uint8)
            return image.reshape(pix.height, pix.width, pix.n)
    except Exception:
        # Deliberate best-effort: the caller reports 'Render failed' for the
        # page and moves on instead of aborting the whole run.
        return None


def detect_signatures_yolo(image: np.ndarray, model) -> List[Dict]:
    """Detect signature boxes on a page image with a YOLO model.

    Args:
        image: Page image array.
        model: Callable YOLO model; invoked as
            ``model(image, conf=..., verbose=False)``.

    Returns:
        One dict per detected box with keys ``x``, ``y``, ``width``,
        ``height``, ``confidence``, ``center_x`` and ``center_y``, sorted
        top-to-bottom and then left-to-right.
    """
    results = model(image, conf=CONFIDENCE_THRESHOLD, verbose=False)
    signatures = []
    for r in results:
        for box in r.boxes:
            x1, y1, x2, y2 = map(int, box.xyxy[0].cpu().numpy())
            conf = float(box.conf[0].cpu().numpy())
            signatures.append({
                'x': x1,
                'y': y1,
                'width': x2 - x1,
                'height': y2 - y1,
                'confidence': conf,
                'center_x': (x1 + x2) / 2,
                'center_y': (y1 + y2) / 2
            })
    signatures.sort(key=lambda s: (s['y'], s['x']))
    return signatures


def extract_and_filter_names(image: np.ndarray, ocr_client: PaddleOCRClient) -> List[Dict]:
    """OCR a page image and keep the text items that look like names.

    Each OCR item is stripped of whitespace and common punctuation (both
    ASCII and full-width forms); it is kept when the remainder is 2-4 Chinese
    characters and is not listed in ``EXCLUDE_WORDS``.

    Args:
        image: Page image array.
        ocr_client: Client whose ``ocr(image)`` returns an iterable of dicts
            with ``'text'`` and ``'box'`` entries.

    Returns:
        A list of dicts with ``text``, ``center_x`` and ``center_y``. An
        empty list is returned if the OCR call raises.
    """
    try:
        results = ocr_client.ocr(image)
    except Exception:
        # Best-effort: an OCR failure leaves the page with no candidates.
        return []

    candidates = []
    for result in results or []:
        text = result.get('text', '').strip()
        box = result.get('box', [])
        if not box or not text:
            continue

        text_clean = _NAME_NOISE_PATTERN.sub('', text)
        if not CHINESE_NAME_PATTERN.match(text_clean) or text_clean in EXCLUDE_WORDS:
            continue

        # The candidate position is the mean of the box's corner points.
        xs = [point[0] for point in box]
        ys = [point[1] for point in box]
        candidates.append({
            'text': text_clean,
            'center_x': sum(xs) / len(xs),
            'center_y': sum(ys) / len(ys),
        })
    return candidates


def match_signature_to_name(sig: Dict, name_candidates: List[Dict]) -> Optional[str]:
    """Pick the name candidate closest to a signature box.

    A candidate is eligible when its centre lies within ``NAME_SEARCH_MARGIN``
    pixels of the signature box edges on both axes. Among eligible candidates
    the one with the smallest centre-to-centre Euclidean distance wins; ties
    go to the earliest candidate in ``name_candidates``.

    Args:
        sig: Signature dict with ``center_x``, ``center_y``, ``width`` and
            ``height``.
        name_candidates: Dicts with ``text``, ``center_x`` and ``center_y``.

    Returns:
        The chosen name text, or ``None`` if no candidate is in range.
    """
    max_dx = NAME_SEARCH_MARGIN + sig['width'] / 2
    max_dy = NAME_SEARCH_MARGIN + sig['height'] / 2

    nearby = []
    for name in name_candidates:
        dx = abs(name['center_x'] - sig['center_x'])
        dy = abs(name['center_y'] - sig['center_y'])
        if dx <= max_dx and dy <= max_dy:
            nearby.append((name['text'], (dx**2 + dy**2) ** 0.5))

    if not nearby:
        return None
    # min() returns the first minimal element, matching a stable sort.
    return min(nearby, key=lambda item: item[1])[0]


def get_pages_to_process(conn: sqlite3.Connection) -> List[Tuple[str, int, List[int]]]:
    """Fetch the pages that still have signatures without an accountant name.

    Args:
        conn: Open connection to the signature database.

    Returns:
        ``(source_pdf, page_number, signature_ids)`` tuples ordered by PDF
        and page. ``signature_ids`` is sorted ascending.
    """
    cursor = conn.cursor()
    cursor.execute('''
        SELECT source_pdf, page_number, GROUP_CONCAT(signature_id)
        FROM signatures
        WHERE accountant_name IS NULL OR accountant_name = ''
        GROUP BY source_pdf, page_number
        ORDER BY source_pdf, page_number
    ''')

    pages = []
    for source_pdf, page_number, sig_ids_str in cursor.fetchall():
        if not sig_ids_str:
            # GROUP_CONCAT yields NULL when every signature_id in the group is NULL.
            continue
        # SQLite does not guarantee the concatenation order of GROUP_CONCAT;
        # sort so the positional pairing done later is deterministic.
        sig_ids = sorted(int(x) for x in sig_ids_str.split(','))
        pages.append((source_pdf, page_number, sig_ids))
    return pages


def process_page(
    source_pdf: str,
    page_number: int,
    sig_ids: List[int],
    yolo_model,
    ocr_client: PaddleOCRClient
) -> Dict:
    """Process a single page: detect signatures, OCR names, and pair them.

    Args:
        source_pdf: PDF file name as stored in the database.
        page_number: 1-based page number.
        sig_ids: Database signature ids recorded for this page.
        yolo_model: YOLO model passed to ``detect_signatures_yolo``.
        ocr_client: OCR client passed to ``extract_and_filter_names``.

    Returns:
        A dict with ``source_pdf``, ``page_number``, ``num_signatures``,
        ``matched``, ``unmatched``, ``error`` (``None`` on success) and
        ``updates``: a list of ``(signature_id, name, x, y, width, height)``
        tuples. When ``error`` is set, ``updates`` is empty and neither
        counter is incremented.
    """
    result = {
        'source_pdf': source_pdf,
        'page_number': page_number,
        'num_signatures': len(sig_ids),
        'matched': 0,
        'unmatched': 0,
        'error': None,
        'updates': []
    }

    pdf_path = find_pdf_file(source_pdf)
    if pdf_path is None:
        result['error'] = 'PDF not found'
        return result

    image = render_pdf_page(pdf_path, page_number)
    if image is None:
        result['error'] = 'Render failed'
        return result

    sig_boxes = detect_signatures_yolo(image, yolo_model)
    name_candidates = extract_and_filter_names(image, ocr_client)

    # NOTE(review): ids are paired with boxes purely by position (i-th id with
    # the i-th box in top-to-bottom, left-to-right order). This presumably
    # mirrors the order in which the ids were created - confirm upstream.
    for i, sig_id in enumerate(sig_ids):
        if i >= len(sig_boxes):
            # Fewer boxes detected than ids on record: placeholder row.
            result['updates'].append((sig_id, '', 0, 0, 0, 0))
            result['unmatched'] += 1
            continue

        sig = sig_boxes[i]
        matched_name = match_signature_to_name(sig, name_candidates)
        if matched_name:
            result['matched'] += 1
        else:
            result['unmatched'] += 1
            matched_name = ''
        result['updates'].append((
            sig_id, matched_name,
            sig['x'], sig['y'], sig['width'], sig['height']
        ))

    return result


def save_updates_to_db(conn: sqlite3.Connection, updates: List[Tuple]):
    """Write a batch of name/box updates to the database in one transaction.

    Creates the ``signature_boxes`` table if needed, sets ``accountant_name``
    for every update, and stores the box for every update that has a real
    (non-zero-sized) box. The batch is committed on success and rolled back
    if any statement raises.

    Args:
        conn: Open connection to the signature database.
        updates: ``(signature_id, name, x, y, width, height)`` tuples.
    """
    name_rows = [(name, sig_id) for sig_id, name, _x, _y, _w, _h in updates]
    # Placeholder rows are (0, 0, 0, 0). Test the size rather than x so a
    # genuine box touching the left page edge (x == 0) is still stored.
    box_rows = [
        (sig_id, x, y, w, h)
        for sig_id, _name, x, y, w, h in updates
        if w > 0 and h > 0
    ]

    with conn:  # commit on success, roll back on error
        cursor = conn.cursor()
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS signature_boxes (
                signature_id INTEGER PRIMARY KEY,
                x INTEGER,
                y INTEGER,
                width INTEGER,
                height INTEGER,
                FOREIGN KEY (signature_id) REFERENCES signatures(signature_id)
            )
        ''')
        cursor.executemany(
            'UPDATE signatures SET accountant_name = ? WHERE signature_id = ?',
            name_rows
        )
        cursor.executemany('''
            INSERT OR REPLACE INTO signature_boxes (signature_id, x, y, width, height)
            VALUES (?, ?, ?, ?, ?)
        ''', box_rows)


def generate_report(stats: Dict, output_path: Path):
    """Write the processing report as JSON plus a Markdown companion file.

    Args:
        stats: Run statistics. Reads ``total_pages``, ``processed``,
            ``total_sigs``, ``matched``, ``unmatched``, ``errors`` and
            ``elapsed_seconds``, plus the optional ``name_distribution`` and
            ``error_samples``.
        output_path: Destination of the JSON report; the Markdown report is
            written next to it with a ``.md`` suffix.

    Returns:
        The report dict that was serialised.
    """
    total_sigs = stats['total_sigs']
    report = {
        'title': '會計師姓名提取報告',
        'generated_at': datetime.now().isoformat(),
        'summary': {
            'total_pages': stats['total_pages'],
            'processed_pages': stats['processed'],
            'total_signatures': total_sigs,
            'matched_signatures': stats['matched'],
            'unmatched_signatures': stats['unmatched'],
            'match_rate': f"{stats['matched']/total_sigs*100:.1f}%" if total_sigs > 0 else "N/A",
            'errors': stats['errors'],
            'elapsed_seconds': stats['elapsed_seconds'],
            'elapsed_human': f"{stats['elapsed_seconds']/3600:.1f} 小時"
        },
        'methodology': {
            'step1': 'YOLO 模型偵測簽名框座標',
            'step2': 'PaddleOCR 整頁 OCR 提取文字',
            'step3': '過濾 2-4 個中文字作為姓名候選',
            'step4': f'在簽名框周圍 {NAME_SEARCH_MARGIN}px 範圍內配對最近的姓名',
            'dpi': DPI,
            'yolo_confidence': CONFIDENCE_THRESHOLD
        },
        'name_distribution': stats.get('name_distribution', {}),
        'error_samples': stats.get('error_samples', [])
    }

    with open(output_path, 'w', encoding='utf-8') as f:
        json.dump(report, f, indent=2, ensure_ascii=False)

    # Also emit a Markdown version of the same report.
    md_path = output_path.with_suffix('.md')
    with open(md_path, 'w', encoding='utf-8') as f:
        f.write(f"# {report['title']}\n\n")
        f.write(f"生成時間: {report['generated_at']}\n\n")
        f.write("## 摘要\n\n")
        f.write("| 指標 | 數值 |\n|------|------|\n")
        for k, v in report['summary'].items():
            f.write(f"| {k} | {v} |\n")
        f.write("\n## 方法論\n\n")
        for k, v in report['methodology'].items():
            f.write(f"- **{k}**: {v}\n")
        f.write("\n## 姓名分布 (Top 50)\n\n")
        names = sorted(report['name_distribution'].items(), key=lambda x: -x[1])[:50]
        for name, count in names:
            f.write(f"- {name}: {count}\n")

    return report


def _print_progress(stats: Dict):
    """Print a progress line with an estimate of the remaining time."""
    elapsed = time.time() - stats['start_time']
    rate = stats['processed'] / elapsed if elapsed > 0 else 0
    remaining = (stats['total_pages'] - stats['processed']) / rate if rate > 0 else 0
    print(f"\n[進度] {stats['processed']:,}/{stats['total_pages']:,} "
          f"({stats['processed']/stats['total_pages']*100:.1f}%) | "
          f"配對: {stats['matched']:,} | "
          f"剩餘: {remaining/60:.1f} 分鐘")


def _print_summary(stats: Dict, report_path: Path):
    """Print the end-of-run summary and the report locations."""
    total_sigs = stats['total_sigs']
    match_pct = stats['matched'] / total_sigs * 100 if total_sigs > 0 else 0.0
    print("\n" + "=" * 70)
    print("處理完成!")
    print("=" * 70)
    print(f"總頁面: {stats['total_pages']:,}")
    print(f"總簽名: {total_sigs:,}")
    print(f"配對成功: {stats['matched']:,} ({match_pct:.1f}%)")
    print(f"未配對: {stats['unmatched']:,}")
    print(f"錯誤: {stats['errors']:,}")
    print(f"耗時: {stats['elapsed_seconds']/3600:.2f} 小時")
    print(f"\n報告已儲存:")
    print(f" - {report_path}")
    print(f" - {report_path.with_suffix('.md')}")


def _run_extraction(conn: sqlite3.Connection):
    """Run the full pipeline on an open connection (the caller closes it)."""
    pages = get_pages_to_process(conn)
    print(f"\n待處理頁面: {len(pages):,}")
    if not pages:
        print("沒有需要處理的頁面")
        return

    # Load YOLO; imported lazily so it is only loaded when there is work.
    print("\n載入 YOLO 模型...")
    from ultralytics import YOLO
    yolo_model = YOLO(str(YOLO_MODEL_PATH))

    # Connect to the OCR server.
    print("連接 PaddleOCR 伺服器...")
    ocr_client = PaddleOCRClient()
    if not ocr_client.health_check():
        print("錯誤: PaddleOCR 伺服器無法連接")
        return
    print("OCR 伺服器連接成功\n")

    stats = {
        'total_pages': len(pages),
        'processed': 0,
        'total_sigs': sum(len(p[2]) for p in pages),
        'matched': 0,
        'unmatched': 0,
        'errors': 0,
        'error_samples': [],
        'name_distribution': defaultdict(int),
        'start_time': time.time()
    }
    all_updates = []

    for source_pdf, page_number, sig_ids in tqdm(pages, desc="處理頁面"):
        result = process_page(source_pdf, page_number, sig_ids, yolo_model, ocr_client)
        stats['processed'] += 1
        stats['matched'] += result['matched']
        stats['unmatched'] += result['unmatched']

        if result['error']:
            stats['errors'] += 1
            # Keep only the first 20 failures as samples for the report.
            if len(stats['error_samples']) < 20:
                stats['error_samples'].append({
                    'pdf': source_pdf,
                    'page': page_number,
                    'error': result['error']
                })
        else:
            all_updates.extend(result['updates'])
            for update in result['updates']:
                if update[1]:  # a name was matched
                    stats['name_distribution'][update[1]] += 1

        # Commit in batches.
        if len(all_updates) >= BATCH_COMMIT_SIZE:
            save_updates_to_db(conn, all_updates)
            all_updates = []

        # Periodic progress output.
        if stats['processed'] % PROGRESS_SAVE_INTERVAL == 0:
            _print_progress(stats)

    # Commit the final partial batch.
    if all_updates:
        save_updates_to_db(conn, all_updates)

    stats['elapsed_seconds'] = time.time() - stats['start_time']
    # A plain dict serialises cleanly to JSON.
    stats['name_distribution'] = dict(stats['name_distribution'])

    print("\n生成報告...")
    report_path = REPORTS_PATH / "name_extraction_report.json"
    generate_report(stats, report_path)
    _print_summary(stats, report_path)


def main():
    """Entry point: run the pipeline and always close the database."""
    print("=" * 70)
    print("Step 5: 從 PDF 提取會計師姓名 - 完整處理")
    print("=" * 70)
    print(f"開始時間: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")

    REPORTS_PATH.mkdir(parents=True, exist_ok=True)

    conn = sqlite3.connect(DB_PATH)
    try:
        _run_extraction(conn)
    finally:
        # Close the connection on every exit path, including exceptions.
        conn.close()


if __name__ == "__main__":
    main()