#!/usr/bin/env python3
"""Signature cleanup and accountant assignment.

1. Flag PDFs with more than two detected signatures and keep the best two.
2. Assign each kept signature to an accountant, by OCR name or by coordinates.
3. Build the ``accountants`` table.
"""

import json
import os
import sqlite3
from collections import defaultdict
from datetime import datetime

from opencc import OpenCC

# Simplified -> Traditional Chinese converter.
cc_s2t = OpenCC('s2t')

DB_PATH = '/Volumes/NV2/PDF-Processing/signature-analysis/signature_analysis.db'
REPORT_DIR = '/Volumes/NV2/PDF-Processing/signature-analysis/reports'


def get_connection():
    """Open the SQLite database at DB_PATH with name-addressable rows."""
    conn = sqlite3.connect(DB_PATH)
    conn.row_factory = sqlite3.Row
    return conn


def add_columns_if_needed(conn):
    """Add the columns this script writes to ``signatures`` if they are missing.

    ``accountant_id`` is included because build_accountants_table() updates
    it; without the column that UPDATE would fail after all other work is done.
    """
    cur = conn.cursor()

    # PRAGMA table_info returns one row per column; index 1 is the column name.
    cur.execute("PRAGMA table_info(signatures)")
    existing = {row[1] for row in cur.fetchall()}

    wanted = (
        ('is_valid', 'INTEGER DEFAULT 1'),
        ('assigned_accountant', 'TEXT'),
        ('accountant_id', 'INTEGER'),
    )
    for column, declaration in wanted:
        if column not in existing:
            # Column names/types are constants above, never user input.
            cur.execute(f"ALTER TABLE signatures ADD COLUMN {column} {declaration}")
            print(f"已添加 {column} 欄位")

    conn.commit()


def create_accountants_table(conn):
    """Create the ``accountants`` table if it does not exist yet."""
    cur = conn.cursor()
    cur.execute("""
        CREATE TABLE IF NOT EXISTS accountants (
            accountant_id INTEGER PRIMARY KEY AUTOINCREMENT,
            name TEXT UNIQUE NOT NULL,
            signature_count INTEGER DEFAULT 0,
            firm TEXT,
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
        )
    """)
    conn.commit()
    print("accountants 表已建立")


def get_pdf_signatures(conn):
    """Load all signatures grouped by source PDF.

    Returns:
        A dict mapping ``source_pdf`` to a list of row dicts, ordered by
        page number and then by box ``y``. Because of the LEFT JOIN, a
        signature without a row in ``signature_boxes`` has ``None`` for
        ``x``, ``y``, ``width`` and ``height``.
    """
    cur = conn.cursor()
    cur.execute("""
        SELECT s.signature_id, s.source_pdf, s.page_number,
               s.accountant_name, s.excel_accountant1, s.excel_accountant2,
               s.excel_firm,
               sb.x, sb.y, sb.width, sb.height
        FROM signatures s
        LEFT JOIN signature_boxes sb ON s.signature_id = sb.signature_id
        ORDER BY s.source_pdf, s.page_number, sb.y
    """)

    pdf_sigs = defaultdict(list)
    for row in cur.fetchall():
        pdf_sigs[row['source_pdf']].append(dict(row))
    return pdf_sigs


def normalize_name(name):
    """Convert a name to Traditional Chinese; return None for empty input."""
    if not name:
        return None
    return cc_s2t.convert(name)


def names_match(ocr_name, excel_name):
    """Return True if the OCR name equals the Excel name.

    The comparison is tried verbatim first and then with the OCR name
    converted from Simplified to Traditional Chinese. An empty or missing
    name on either side never matches.
    """
    if not ocr_name or not excel_name:
        return False
    if ocr_name == excel_name:
        return True
    return normalize_name(ocr_name) == excel_name


def score_signature(sig, excel_acc1, excel_acc2):
    """Score a detected signature; higher means more likely a real signature.

    Components:
        +100  the OCR name matches either Excel accountant
        +20   the box has a plausible size (30 < w < 500 and 20 < h < 200)
        +y/100, capped at 15 — larger y scores higher
        -30   the box is oversized (w > 300 or h > 150), possibly a stamp
    """
    score = 0
    ocr_name = sig.get('accountant_name', '')

    if names_match(ocr_name, excel_acc1) or names_match(ocr_name, excel_acc2):
        score += 100

    # Missing box data (None from the LEFT JOIN) is treated as 0.
    width = sig.get('width', 0) or 0
    height = sig.get('height', 0) or 0
    if 30 < width < 500 and 20 < height < 200:
        score += 20

    y = sig.get('y', 0) or 0
    score += min(y / 100, 15)

    if width > 300 or height > 150:
        score -= 30

    return score


def select_best_two(signatures, excel_acc1, excel_acc2):
    """Return the two highest-scoring signatures.

    Lists with two or fewer entries are returned unchanged. Ties keep their
    input order because the sort is stable.
    """
    if len(signatures) <= 2:
        return signatures

    ranked = sorted(
        signatures,
        key=lambda sig: score_signature(sig, excel_acc1, excel_acc2),
        reverse=True,
    )
    return ranked[:2]


def assign_to_accountant(sig1, sig2, excel_acc1, excel_acc2):
    """Pair two signatures with the two Excel accountants.

    An OCR name match on either signature decides the pairing (the other
    signature gets the remaining accountant). Without any OCR match the
    signature with the smaller ``y`` is paired with ``excel_acc1``.

    Returns:
        ``[(sig1, accountant), (sig2, accountant)]``.
    """
    ocr1 = sig1.get('accountant_name', '')
    ocr2 = sig2.get('accountant_name', '')

    straight = [(sig1, excel_acc1), (sig2, excel_acc2)]
    swapped = [(sig1, excel_acc2), (sig2, excel_acc1)]

    # Method A: OCR name match.
    if names_match(ocr1, excel_acc1):
        return straight
    if names_match(ocr1, excel_acc2):
        return swapped
    if names_match(ocr2, excel_acc1):
        return swapped
    if names_match(ocr2, excel_acc2):
        return straight

    # Method B: by Y coordinate (assumes accountant 1 signs above accountant 2).
    y1 = sig1.get('y', 0) or 0
    y2 = sig2.get('y', 0) or 0
    return straight if y1 <= y2 else swapped


def _match_excel_name(ocr_name, excel_acc1, excel_acc2):
    """Return whichever Excel accountant the OCR name matches, else None."""
    if names_match(ocr_name, excel_acc1):
        return excel_acc1
    if names_match(ocr_name, excel_acc2):
        return excel_acc2
    return None


def _tally_method(sig, acc, stats):
    """Count an assignment as OCR-matched or Y-coordinate-assigned."""
    if names_match(sig.get('accountant_name', ''), acc):
        stats['ocr_matched'] += 1
    else:
        stats['y_coordinate_assigned'] += 1


def process_all_pdfs(conn):
    """Decide validity and accountant for every signature and store the result.

    Writes ``assigned_accountant`` and ``is_valid`` on ``signatures`` and
    commits.

    Returns:
        A dict of counters describing what was processed.
    """
    print("正在載入簽名資料...")
    pdf_sigs = get_pdf_signatures(conn)
    print(f"共 {len(pdf_sigs)} 份 PDF")

    cur = conn.cursor()

    stats = {
        'total_pdfs': len(pdf_sigs),
        'sig_count_1': 0,
        'sig_count_2': 0,
        'sig_count_gt2': 0,
        'valid_signatures': 0,
        'invalid_signatures': 0,
        'ocr_matched': 0,
        'y_coordinate_assigned': 0,
        'no_excel_data': 0,
    }

    # signature_id -> (assigned_accountant, is_valid). Keyed by id so every
    # signature is updated exactly once and the progress count is accurate.
    assignments = {}

    for sigs in pdf_sigs.values():
        sig_count = len(sigs)
        excel_acc1 = sigs[0].get('excel_accountant1') if sigs else None
        excel_acc2 = sigs[0].get('excel_accountant2') if sigs else None

        if not excel_acc1 and not excel_acc2:
            # No Excel data: keep every signature, unassigned. These PDFs
            # are not counted in the per-count or valid/invalid statistics.
            stats['no_excel_data'] += 1
            for sig in sigs:
                assignments[sig['signature_id']] = (None, 1)
            continue

        if sig_count == 1:
            stats['sig_count_1'] += 1
            # A single signature is kept; it is only assigned when the OCR
            # name identifies the accountant.
            sig = sigs[0]
            acc = _match_excel_name(
                sig.get('accountant_name', ''), excel_acc1, excel_acc2
            )
            if acc is not None:
                stats['ocr_matched'] += 1
            assignments[sig['signature_id']] = (acc, 1)
            stats['valid_signatures'] += 1

        elif sig_count == 2:
            stats['sig_count_2'] += 1
            # Normal case: one signature per accountant.
            pairs = assign_to_accountant(sigs[0], sigs[1], excel_acc1, excel_acc2)
            for sig, acc in pairs:
                assignments[sig['signature_id']] = (acc, 1)
                stats['valid_signatures'] += 1
                _tally_method(sig, acc, stats)

        else:
            stats['sig_count_gt2'] += 1
            # More than two detections: keep the best two, invalidate the rest.
            best_two = select_best_two(sigs, excel_acc1, excel_acc2)
            valid_ids = {s['signature_id'] for s in best_two}

            for sig in sigs:
                if sig['signature_id'] in valid_ids:
                    stats['valid_signatures'] += 1
                else:
                    stats['invalid_signatures'] += 1
                    assignments[sig['signature_id']] = (None, 0)

            # select_best_two() returns exactly two entries when given more
            # than two, so the pair can be assigned directly.
            pairs = assign_to_accountant(
                best_two[0], best_two[1], excel_acc1, excel_acc2
            )
            for sig, acc in pairs:
                assignments[sig['signature_id']] = (acc, 1)
                _tally_method(sig, acc, stats)

    # Batch-update the database.
    print(f"正在更新 {len(assignments)} 筆簽名...")
    cur.executemany(
        """
        UPDATE signatures
        SET assigned_accountant = ?, is_valid = ?
        WHERE signature_id = ?
        """,
        [(acc, is_valid, sig_id) for sig_id, (acc, is_valid) in assignments.items()],
    )
    conn.commit()
    return stats


def build_accountants_table(conn):
    """Rebuild ``accountants`` from the assigned, valid signatures.

    Each accountant gets the total number of valid signatures and the firm
    that appears most often on those signatures. Afterwards
    ``signatures.accountant_id`` is pointed at the new rows.

    Returns:
        The number of distinct accountants inserted.
    """
    cur = conn.cursor()

    # Start from an empty table.
    cur.execute("DELETE FROM accountants")

    # Group by firm as well: grouping by name alone would leave excel_firm an
    # arbitrary value and defeat the "most common firm" selection below.
    cur.execute("""
        SELECT assigned_accountant, excel_firm, COUNT(*) as cnt
        FROM signatures
        WHERE assigned_accountant IS NOT NULL AND is_valid = 1
        GROUP BY assigned_accountant, excel_firm
    """)

    accountants = {}
    for name, firm, count in cur.fetchall():
        entry = accountants.setdefault(
            name, {'count': 0, 'firms': defaultdict(int)}
        )
        entry['count'] += count
        if firm:
            entry['firms'][firm] += count

    rows = []
    for name, data in accountants.items():
        firms = data['firms']
        # Most frequent firm, or None when no firm is recorded.
        main_firm = max(firms, key=firms.get) if firms else None
        rows.append((name, data['count'], main_firm))

    cur.executemany(
        """
        INSERT INTO accountants (name, signature_count, firm)
        VALUES (?, ?, ?)
        """,
        rows,
    )
    conn.commit()

    # Link each assigned signature to its accountant row.
    cur.execute("""
        UPDATE signatures
        SET accountant_id = (
            SELECT accountant_id FROM accountants
            WHERE accountants.name = signatures.assigned_accountant
        )
        WHERE assigned_accountant IS NOT NULL
    """)
    conn.commit()

    return len(accountants)


def generate_report(stats, accountant_count):
    """Write the JSON and Markdown reports into REPORT_DIR.

    Args:
        stats: Counter dict returned by process_all_pdfs().
        accountant_count: Number of distinct accountants.

    Returns:
        The report dict that was serialized to JSON.
    """
    report = {
        'generated_at': datetime.now().isoformat(),
        'summary': {
            'total_pdfs': stats['total_pdfs'],
            'pdfs_with_1_sig': stats['sig_count_1'],
            'pdfs_with_2_sigs': stats['sig_count_2'],
            'pdfs_with_gt2_sigs': stats['sig_count_gt2'],
            'pdfs_without_excel': stats['no_excel_data'],
        },
        'signatures': {
            'valid': stats['valid_signatures'],
            'invalid': stats['invalid_signatures'],
            'total': stats['valid_signatures'] + stats['invalid_signatures'],
        },
        'assignment_method': {
            'ocr_matched': stats['ocr_matched'],
            'y_coordinate': stats['y_coordinate_assigned'],
        },
        'accountants': {
            'total_unique': accountant_count,
        }
    }

    # Do not lose a finished run just because the report folder is missing.
    os.makedirs(REPORT_DIR, exist_ok=True)

    # JSON report.
    json_path = f"{REPORT_DIR}/signature_cleanup_report.json"
    with open(json_path, 'w', encoding='utf-8') as f:
        json.dump(report, f, ensure_ascii=False, indent=2)

    # Markdown report.
    md_path = f"{REPORT_DIR}/signature_cleanup_report.md"
    md_lines = [
        "# 簽名清理與歸檔報告\n\n",
        f"生成時間: {report['generated_at']}\n\n",
        "## PDF 分布\n\n",
        "| 類型 | 數量 |\n",
        "|------|------|\n",
        f"| 總 PDF 數 | {stats['total_pdfs']} |\n",
        f"| 1 個簽名 | {stats['sig_count_1']} |\n",
        f"| 2 個簽名 (正常) | {stats['sig_count_2']} |\n",
        f"| >2 個簽名 (需篩選) | {stats['sig_count_gt2']} |\n",
        f"| 無 Excel 資料 | {stats['no_excel_data']} |\n",
        "\n## 簽名統計\n\n",
        "| 類型 | 數量 |\n",
        "|------|------|\n",
        f"| 有效簽名 | {stats['valid_signatures']} |\n",
        f"| 無效簽名 (誤判) | {stats['invalid_signatures']} |\n",
        "\n## 歸檔方式\n\n",
        "| 方式 | 數量 |\n",
        "|------|------|\n",
        f"| OCR 姓名匹配 | {stats['ocr_matched']} |\n",
        f"| Y 座標推斷 | {stats['y_coordinate_assigned']} |\n",
        "\n## 會計師\n\n",
        f"唯一會計師數: **{accountant_count}**\n",
    ]
    with open(md_path, 'w', encoding='utf-8') as f:
        f.writelines(md_lines)

    print(f"報告已儲存: {json_path}")
    print(f"報告已儲存: {md_path}")

    return report


def main():
    """Run the four pipeline steps and print a short summary."""
    print("=" * 60)
    print("簽名清理與會計師歸檔")
    print("=" * 60)

    conn = get_connection()
    try:
        # 1. Prepare the database schema.
        print("\n[1/4] 準備資料庫...")
        add_columns_if_needed(conn)
        create_accountants_table(conn)

        # 2. Process every PDF.
        print("\n[2/4] 處理 PDF 簽名...")
        stats = process_all_pdfs(conn)

        # 3. Build the accountants table.
        print("\n[3/4] 建立會計師表...")
        accountant_count = build_accountants_table(conn)

        # 4. Write the reports.
        print("\n[4/4] 生成報告...")
        generate_report(stats, accountant_count)
    finally:
        # Release the connection even when a step fails.
        conn.close()

    print("\n" + "=" * 60)
    print("完成!")
    print("=" * 60)
    print(f"有效簽名: {stats['valid_signatures']}")
    print(f"無效簽名: {stats['invalid_signatures']}")
    print(f"唯一會計師: {accountant_count}")


if __name__ == '__main__':
    main()