Files
pdf_signature_extraction/signature_analysis/05_extract_names_full.py
gbanyan 939a348da4 Add Paper A (IEEE TAI) complete draft with Firm A-calibrated dual-method classification
Paper draft includes all sections (Abstract through Conclusion), 36 references,
and supporting scripts. Key methodology: Cosine similarity + dHash dual-method
verification with thresholds calibrated against known-replication firm (Firm A).

Includes:
- 8 section markdown files (paper_a_*.md)
- Ablation study script (ResNet-50 vs VGG-16 vs EfficientNet-B0)
- Recalibrated classification script (84,386 PDFs, 5-tier system)
- Figure generation and Word export scripts
- Citation renumbering script ([1]-[36])
- Signature analysis pipeline (12 steps)
- YOLO extraction scripts

Three rounds of AI review completed (GPT-5.4, Claude Opus 4.6, Gemini 3 Pro).

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-06 23:05:33 +08:00

403 lines
13 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""
Step 5: Extract accountant names from PDFs - full processing version.

Workflow:
1. Read signature records from the database, grouped by (PDF, page)
2. Re-run YOLO on each page to obtain the signature box coordinates
3. Run PaddleOCR over the whole page to extract text
4. Filter out name candidates (2-4 Chinese characters)
5. Pair each signature with the nearest name
6. Update the database and generate a report
"""
import sqlite3
import json
import re
import sys
import time
from pathlib import Path
from typing import Optional, List, Dict, Tuple
from collections import defaultdict
from datetime import datetime
from tqdm import tqdm
import numpy as np
import fitz # PyMuPDF
# 加入父目錄到路徑
sys.path.insert(0, str(Path(__file__).parent.parent))
from paddleocr_client import PaddleOCRClient
# Path configuration (absolute locations on the processing machine)
PDF_BASE = Path("/Volumes/NV2/PDF-Processing/total-pdf")
YOLO_MODEL_PATH = Path("/Volumes/NV2/pdf_recognize/models/best.pt")
DB_PATH = Path("/Volumes/NV2/PDF-Processing/signature-analysis/signature_analysis.db")
REPORTS_PATH = Path("/Volumes/NV2/PDF-Processing/signature-analysis/reports")
# Processing configuration
# DPI: render resolution; render_pdf_page scales pages by DPI / 72
# CONFIDENCE_THRESHOLD: passed to the YOLO model as its `conf` argument
# NAME_SEARCH_MARGIN: extra reach (px) around a signature box when pairing names
# PROGRESS_SAVE_INTERVAL: number of pages between progress printouts in main()
# BATCH_COMMIT_SIZE: pending update rows that trigger a database commit in main()
DPI = 150
CONFIDENCE_THRESHOLD = 0.5
NAME_SEARCH_MARGIN = 200
PROGRESS_SAVE_INTERVAL = 100
BATCH_COMMIT_SIZE = 50
# Chinese name pattern: the whole string must be 2-4 characters in U+4E00-U+9FFF
CHINESE_NAME_PATTERN = re.compile(r'^[\u4e00-\u9fff]{2,4}$')
# Common words that must not be treated as names
# NOTE(review): the '' entry can never equal a candidate (candidates are 2-4
# characters long); it may be a term whose characters were lost - confirm.
EXCLUDE_WORDS = {'會計', '會計師', '事務所', '', '聯合', '出具報告'}
def find_pdf_file(filename: str) -> Optional[str]:
    """Locate *filename* under ``PDF_BASE`` and return its path as a string.

    Every ``batch_*`` entry directly under ``PDF_BASE`` is probed first, in
    sorted order; ``PDF_BASE`` itself is the last place checked.

    Returns:
        The first existing path as ``str``, or ``None`` when no candidate
        location contains the file.
    """
    search_roots = sorted(PDF_BASE.glob("batch_*"))
    search_roots.append(PDF_BASE)
    for root in search_roots:
        candidate = root / filename
        if candidate.exists():
            return str(candidate)
    return None
def render_pdf_page(pdf_path: str, page_num: int) -> Optional[np.ndarray]:
    """Render one page of a PDF into a pixel array at ``DPI``.

    Args:
        pdf_path: Path of the PDF file to open.
        page_num: 1-based page number.

    Returns:
        A ``uint8`` array shaped ``(height, width, channels)`` built from the
        pixmap samples (rendered without an alpha channel), or ``None`` when
        the page number is out of range or opening/rendering fails.
        NOTE(review): the channel order is whatever PyMuPDF emits (presumably
        RGB) - confirm it matches what downstream consumers expect.
    """
    try:
        doc = fitz.open(pdf_path)
    except Exception:
        # Best effort: an unreadable file is reported as a failed render
        return None
    try:
        if page_num < 1 or page_num > len(doc):
            return None
        page = doc[page_num - 1]
        # Scale factor from the 72-per-inch base resolution to the target DPI
        zoom = DPI / 72
        pix = page.get_pixmap(matrix=fitz.Matrix(zoom, zoom), alpha=False)
        image = np.frombuffer(pix.samples, dtype=np.uint8)
        return image.reshape(pix.height, pix.width, pix.n)
    except Exception:
        return None
    finally:
        # Always release the document, including when rendering raised
        doc.close()
def detect_signatures_yolo(image: np.ndarray, model) -> List[Dict]:
    """Detect signature boxes in *image* with the given YOLO model.

    The model is called with ``conf=CONFIDENCE_THRESHOLD``. Each detected box
    becomes a dict holding the integer top-left corner (``x``, ``y``), the
    ``width`` and ``height``, the detection ``confidence`` and the box centre
    (``center_x``, ``center_y``).

    Returns:
        The detections ordered by ``(y, x)``, i.e. top-to-bottom and then
        left-to-right.
    """
    detections = []
    for result in model(image, conf=CONFIDENCE_THRESHOLD, verbose=False):
        for box in result.boxes:
            left, top, right, bottom = (int(v) for v in box.xyxy[0].cpu().numpy())
            score = float(box.conf[0].cpu().numpy())
            detections.append({
                'x': left,
                'y': top,
                'width': right - left,
                'height': bottom - top,
                'confidence': score,
                'center_x': (left + right) / 2,
                'center_y': (top + bottom) / 2,
            })
    return sorted(detections, key=lambda d: (d['y'], d['x']))
# Characters removed from OCR text before the name test: whitespace, the ASCII
# and full-width colon / comma / full stop, and the ideographic comma. The
# full-width forms are written as escapes so they survive re-encoding.
_NAME_NOISE_PATTERN = re.compile(r'[\s:\uff1a,\uff0c.\u3002\u3001]')


def extract_and_filter_names(image: np.ndarray, ocr_client: PaddleOCRClient) -> List[Dict]:
    """Run OCR on *image* and keep the text boxes that look like names.

    Args:
        image: Page image handed to ``ocr_client.ocr``.
        ocr_client: Client whose ``ocr`` method returns an iterable of dicts
            with ``'text'`` and ``'box'`` entries (``'box'`` being a sequence
            of ``(x, y)`` points).

    Returns:
        One dict per candidate with the cleaned ``text`` and the centre of
        its box (``center_x``, ``center_y``, the mean of the box points).
        A candidate must match ``CHINESE_NAME_PATTERN`` after clean-up and
        must not be in ``EXCLUDE_WORDS``. An OCR failure yields ``[]``.
    """
    try:
        results = ocr_client.ocr(image)
    except Exception:
        # Best effort: a failed OCR call means no name candidates for the page
        return []

    candidates = []
    for result in results or []:
        text = result.get('text', '').strip()
        box = result.get('box', [])
        if not box or not text:
            continue
        # Strip whitespace and punctuation that OCR glues onto names
        text_clean = _NAME_NOISE_PATTERN.sub('', text)
        if not CHINESE_NAME_PATTERN.match(text_clean) or text_clean in EXCLUDE_WORDS:
            continue
        xs = [point[0] for point in box]
        ys = [point[1] for point in box]
        candidates.append({
            'text': text_clean,
            'center_x': sum(xs) / len(xs),
            'center_y': sum(ys) / len(ys),
        })
    return candidates
def match_signature_to_name(
    sig: Dict, name_candidates: List[Dict], margin: Optional[float] = None
) -> Optional[str]:
    """Return the text of the name candidate closest to a signature box.

    A candidate is eligible when its centre lies within ``margin`` plus half
    the box width horizontally and ``margin`` plus half the box height
    vertically of the signature centre. Among eligible candidates the one
    with the smallest Euclidean centre-to-centre distance wins; on a tie the
    earlier candidate in *name_candidates* is kept.

    Args:
        sig: Signature box with ``center_x``, ``center_y``, ``width`` and
            ``height`` entries.
        name_candidates: Dicts with ``text``, ``center_x`` and ``center_y``.
        margin: Search margin around the box; defaults to
            ``NAME_SEARCH_MARGIN`` when omitted.

    Returns:
        The matched candidate's ``text``, or ``None`` if nothing is in range.
    """
    if margin is None:
        margin = NAME_SEARCH_MARGIN
    reach_x = margin + sig['width'] / 2
    reach_y = margin + sig['height'] / 2

    best_name = None
    best_distance = None
    for name in name_candidates:
        dx = abs(name['center_x'] - sig['center_x'])
        dy = abs(name['center_y'] - sig['center_y'])
        if dx > reach_x or dy > reach_y:
            continue
        distance = (dx**2 + dy**2) ** 0.5
        # Strict comparison keeps the first candidate on equal distances
        if best_distance is None or distance < best_distance:
            best_name = name['text']
            best_distance = distance
    return best_name
def get_pages_to_process(conn: sqlite3.Connection) -> List[Tuple[str, int, List[int]]]:
    """Fetch the pages that still have signatures without an accountant name.

    Rows of ``signatures`` whose ``accountant_name`` is NULL or empty are
    grouped by ``(source_pdf, page_number)``.

    Args:
        conn: Open connection to the signature database.

    Returns:
        ``(source_pdf, page_number, signature_ids)`` tuples ordered by PDF
        and page. The IDs of each page are sorted ascending: the order
        produced by ``GROUP_CONCAT`` is not guaranteed, and callers pair the
        IDs with detected boxes by position, so a fixed order keeps that
        pairing reproducible.
        NOTE(review): ascending ID order presumably mirrors the order in
        which the signatures were first detected - confirm.
    """
    cursor = conn.cursor()
    cursor.execute('''
        SELECT source_pdf, page_number, GROUP_CONCAT(signature_id)
        FROM signatures
        WHERE accountant_name IS NULL OR accountant_name = ''
        GROUP BY source_pdf, page_number
        ORDER BY source_pdf, page_number
    ''')
    return [
        (source_pdf, page_number, sorted(int(x) for x in sig_ids_str.split(',')))
        for source_pdf, page_number, sig_ids_str in cursor.fetchall()
    ]
def process_page(
    source_pdf: str, page_number: int, sig_ids: List[int],
    yolo_model, ocr_client: PaddleOCRClient
) -> Dict:
    """Process one page: detect signature boxes, OCR names and pair them.

    Signature IDs are paired with detected boxes by position: the i-th ID
    gets the i-th box returned by ``detect_signatures_yolo``. IDs without a
    corresponding box are recorded with an empty name and an all-zero box.

    Returns:
        A dict with the page identity, ``num_signatures``, the ``matched`` /
        ``unmatched`` counters, an ``error`` message (``None`` on success)
        and ``updates``, a list of
        ``(signature_id, name, x, y, width, height)`` tuples. When the PDF
        cannot be found or rendered, ``error`` is set and ``updates`` stays
        empty.
    """
    outcome = {
        'source_pdf': source_pdf,
        'page_number': page_number,
        'num_signatures': len(sig_ids),
        'matched': 0,
        'unmatched': 0,
        'error': None,
        'updates': []
    }

    pdf_path = find_pdf_file(source_pdf)
    if pdf_path is None:
        outcome['error'] = 'PDF not found'
        return outcome

    image = render_pdf_page(pdf_path, page_number)
    if image is None:
        outcome['error'] = 'Render failed'
        return outcome

    boxes = detect_signatures_yolo(image, yolo_model)
    names = extract_and_filter_names(image, ocr_client)

    for index, sig_id in enumerate(sig_ids):
        if index >= len(boxes):
            # More stored signatures than detected boxes: placeholder row
            outcome['updates'].append((sig_id, '', 0, 0, 0, 0))
            outcome['unmatched'] += 1
            continue

        box = boxes[index]
        name = match_signature_to_name(box, names)
        if name:
            outcome['matched'] += 1
        else:
            outcome['unmatched'] += 1
            name = ''
        outcome['updates'].append((
            sig_id, name,
            box['x'], box['y'], box['width'], box['height']
        ))

    return outcome
def save_updates_to_db(conn: sqlite3.Connection, updates: List[Tuple]):
    """Write a batch of name and box updates to the database and commit.

    Creates the ``signature_boxes`` table if it does not exist, sets
    ``signatures.accountant_name`` for every update, and stores the box
    coordinates of every update that carries a real box.

    Args:
        conn: Open connection to the signature database.
        updates: ``(signature_id, name, x, y, width, height)`` tuples. An
            all-zero box is the "no box detected" placeholder and is not
            stored.

    Raises:
        sqlite3.Error: re-raised after rolling back the batch.
    """
    cursor = conn.cursor()
    cursor.execute('''
        CREATE TABLE IF NOT EXISTS signature_boxes (
            signature_id INTEGER PRIMARY KEY,
            x INTEGER, y INTEGER, width INTEGER, height INTEGER,
            FOREIGN KEY (signature_id) REFERENCES signatures(signature_id)
        )
    ''')

    name_rows = [(name, sig_id) for sig_id, name, _x, _y, _w, _h in updates]
    # Skip only the (0, 0, 0, 0) placeholder; a box touching the left edge
    # has x == 0 and must still be stored.
    box_rows = [
        (sig_id, x, y, w, h)
        for sig_id, _name, x, y, w, h in updates
        if any((x, y, w, h))
    ]

    try:
        cursor.executemany(
            'UPDATE signatures SET accountant_name = ? WHERE signature_id = ?',
            name_rows,
        )
        cursor.executemany('''
            INSERT OR REPLACE INTO signature_boxes (signature_id, x, y, width, height)
            VALUES (?, ?, ?, ?, ?)
        ''', box_rows)
        conn.commit()
    except Exception:
        # Do not leave a half-written batch pending on the connection
        conn.rollback()
        raise
def generate_report(stats: Dict, output_path: Path):
    """Write the processing report as JSON plus a Markdown companion file.

    Args:
        stats: Run statistics; reads ``total_pages``, ``processed``,
            ``total_sigs``, ``matched``, ``unmatched``, ``errors`` and
            ``elapsed_seconds``, plus the optional ``name_distribution`` and
            ``error_samples`` entries.
        output_path: Destination of the JSON report. The Markdown report is
            written next to it with the suffix replaced by ``.md``.

    Returns:
        The report dict that was serialised to JSON.
    """
    total_sigs = stats['total_sigs']
    if total_sigs > 0:
        match_rate = f"{stats['matched']/total_sigs*100:.1f}%"
    else:
        match_rate = "N/A"

    summary = {
        'total_pages': stats['total_pages'],
        'processed_pages': stats['processed'],
        'total_signatures': total_sigs,
        'matched_signatures': stats['matched'],
        'unmatched_signatures': stats['unmatched'],
        'match_rate': match_rate,
        'errors': stats['errors'],
        'elapsed_seconds': stats['elapsed_seconds'],
        'elapsed_human': f"{stats['elapsed_seconds']/3600:.1f} 小時"
    }
    methodology = {
        'step1': 'YOLO 模型偵測簽名框座標',
        'step2': 'PaddleOCR 整頁 OCR 提取文字',
        'step3': '過濾 2-4 個中文字作為姓名候選',
        'step4': f'在簽名框周圍 {NAME_SEARCH_MARGIN}px 範圍內配對最近的姓名',
        'dpi': DPI,
        'yolo_confidence': CONFIDENCE_THRESHOLD
    }
    report = {
        'title': '會計師姓名提取報告',
        'generated_at': datetime.now().isoformat(),
        'summary': summary,
        'methodology': methodology,
        'name_distribution': stats.get('name_distribution', {}),
        'error_samples': stats.get('error_samples', [])
    }

    with open(output_path, 'w', encoding='utf-8') as json_file:
        json.dump(report, json_file, indent=2, ensure_ascii=False)

    # Also emit a Markdown version of the report
    md_path = output_path.with_suffix('.md')
    with open(md_path, 'w', encoding='utf-8') as md_file:
        lines = [
            f"# {report['title']}\n\n",
            f"生成時間: {report['generated_at']}\n\n",
            "## 摘要\n\n",
            "| 指標 | 數值 |\n|------|------|\n",
        ]
        lines.extend(f"| {key} | {value} |\n" for key, value in summary.items())
        lines.append("\n## 方法論\n\n")
        lines.extend(f"- **{key}**: {value}\n" for key, value in methodology.items())
        lines.append("\n## 姓名分布 (Top 50)\n\n")
        # Most frequent names first; only the first 50 are listed
        top_names = sorted(
            report['name_distribution'].items(), key=lambda item: -item[1]
        )[:50]
        lines.extend(f"- {name}: {count}\n" for name, count in top_names)
        md_file.write("".join(lines))

    return report
def main():
    """Run the full name-extraction job and write the final report.

    Reads the pending pages from the database, loads the YOLO model, checks
    that the PaddleOCR server is reachable, processes every page, commits
    the collected updates in batches once at least ``BATCH_COMMIT_SIZE`` rows
    are pending, and finally writes the JSON/Markdown report and prints a
    summary. Pages that fail (PDF missing, render failure) are counted as
    errors and contribute no updates.

    The database connection is closed on every exit path, including when
    model loading or page processing raises.
    """
    print("=" * 70)
    print("Step 5: 從 PDF 提取會計師姓名 - 完整處理")
    print("=" * 70)
    print(f"開始時間: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
    REPORTS_PATH.mkdir(parents=True, exist_ok=True)

    # Connect to the database
    conn = sqlite3.connect(DB_PATH)
    try:
        pages = get_pages_to_process(conn)
        print(f"\n待處理頁面: {len(pages):,}")
        if not pages:
            print("沒有需要處理的頁面")
            return

        # Load YOLO (imported lazily so the import cost is only paid when
        # there is work to do)
        print("\n載入 YOLO 模型...")
        from ultralytics import YOLO
        yolo_model = YOLO(str(YOLO_MODEL_PATH))

        # Connect to the OCR server
        print("連接 PaddleOCR 伺服器...")
        ocr_client = PaddleOCRClient()
        if not ocr_client.health_check():
            print("錯誤: PaddleOCR 伺服器無法連接")
            return
        print("OCR 伺服器連接成功\n")

        # Running statistics
        stats = {
            'total_pages': len(pages),
            'processed': 0,
            'total_sigs': sum(len(p[2]) for p in pages),
            'matched': 0,
            'unmatched': 0,
            'errors': 0,
            'error_samples': [],
            'name_distribution': defaultdict(int),
            'start_time': time.time()
        }
        all_updates = []

        # Process every page
        for source_pdf, page_number, sig_ids in tqdm(pages, desc="處理頁面"):
            result = process_page(source_pdf, page_number, sig_ids, yolo_model, ocr_client)
            stats['processed'] += 1
            stats['matched'] += result['matched']
            stats['unmatched'] += result['unmatched']

            if result['error']:
                stats['errors'] += 1
                # Keep only the first 20 failures as samples for the report
                if len(stats['error_samples']) < 20:
                    stats['error_samples'].append({
                        'pdf': source_pdf,
                        'page': page_number,
                        'error': result['error']
                    })
            else:
                all_updates.extend(result['updates'])
                for update in result['updates']:
                    if update[1]:  # a name was matched
                        stats['name_distribution'][update[1]] += 1

            # Batch commit
            if len(all_updates) >= BATCH_COMMIT_SIZE:
                save_updates_to_db(conn, all_updates)
                all_updates = []

            # Periodic progress output
            if stats['processed'] % PROGRESS_SAVE_INTERVAL == 0:
                elapsed = time.time() - stats['start_time']
                # Guard against a zero elapsed time on coarse clocks
                rate = stats['processed'] / elapsed if elapsed > 0 else 0
                remaining = (stats['total_pages'] - stats['processed']) / rate if rate > 0 else 0
                print(f"\n[進度] {stats['processed']:,}/{stats['total_pages']:,} "
                      f"({stats['processed']/stats['total_pages']*100:.1f}%) | "
                      f"配對: {stats['matched']:,} | "
                      f"剩餘: {remaining/60:.1f} 分鐘")

        # Commit the final partial batch
        if all_updates:
            save_updates_to_db(conn, all_updates)

        stats['elapsed_seconds'] = time.time() - stats['start_time']
        # Plain dict so the distribution serialises cleanly in the report
        stats['name_distribution'] = dict(stats['name_distribution'])

        # Generate the report
        print("\n生成報告...")
        report_path = REPORTS_PATH / "name_extraction_report.json"
        generate_report(stats, report_path)

        print("\n" + "=" * 70)
        print("處理完成!")
        print("=" * 70)
        print(f"總頁面: {stats['total_pages']:,}")
        print(f"總簽名: {stats['total_sigs']:,}")
        print(f"配對成功: {stats['matched']:,} ({stats['matched']/stats['total_sigs']*100:.1f}%)")
        print(f"未配對: {stats['unmatched']:,}")
        print(f"錯誤: {stats['errors']:,}")
        print(f"耗時: {stats['elapsed_seconds']/3600:.2f} 小時")
        print(f"\n報告已儲存:")
        print(f" - {report_path}")
        print(f" - {report_path.with_suffix('.md')}")
    finally:
        conn.close()


if __name__ == "__main__":
    main()