Files
pdf_signature_extraction/signature_analysis/07_cleanup_and_assign.py
gbanyan 939a348da4 Add Paper A (IEEE TAI) complete draft with Firm A-calibrated dual-method classification
Paper draft includes all sections (Abstract through Conclusion), 36 references,
and supporting scripts. Key methodology: Cosine similarity + dHash dual-method
verification with thresholds calibrated against known-replication firm (Firm A).

Includes:
- 8 section markdown files (paper_a_*.md)
- Ablation study script (ResNet-50 vs VGG-16 vs EfficientNet-B0)
- Recalibrated classification script (84,386 PDFs, 5-tier system)
- Figure generation and Word export scripts
- Citation renumbering script ([1]-[36])
- Signature analysis pipeline (12 steps)
- YOLO extraction scripts

Three rounds of AI review completed (GPT-5.4, Claude Opus 4.6, Gemini 3 Pro).

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-06 23:05:33 +08:00

451 lines
14 KiB
Python

#!/usr/bin/env python3
"""
簽名清理與會計師歸檔
1. 標記 sig_count > 2 的 PDF,篩選最佳 2 個簽名
2. 用 OCR 或座標歸檔到會計師
3. 建立 accountants 表
"""
import sqlite3
import json
from collections import defaultdict
from datetime import datetime
from opencc import OpenCC
# 簡繁轉換
cc_s2t = OpenCC('s2t')
DB_PATH = '/Volumes/NV2/PDF-Processing/signature-analysis/signature_analysis.db'
REPORT_DIR = '/Volumes/NV2/PDF-Processing/signature-analysis/reports'
def get_connection():
conn = sqlite3.connect(DB_PATH)
conn.row_factory = sqlite3.Row
return conn
def add_columns_if_needed(conn):
"""添加新欄位"""
cur = conn.cursor()
# 檢查現有欄位
cur.execute("PRAGMA table_info(signatures)")
columns = [row[1] for row in cur.fetchall()]
if 'is_valid' not in columns:
cur.execute("ALTER TABLE signatures ADD COLUMN is_valid INTEGER DEFAULT 1")
print("已添加 is_valid 欄位")
if 'assigned_accountant' not in columns:
cur.execute("ALTER TABLE signatures ADD COLUMN assigned_accountant TEXT")
print("已添加 assigned_accountant 欄位")
conn.commit()
def create_accountants_table(conn):
"""建立 accountants 表"""
cur = conn.cursor()
cur.execute("""
CREATE TABLE IF NOT EXISTS accountants (
accountant_id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT UNIQUE NOT NULL,
signature_count INTEGER DEFAULT 0,
firm TEXT,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
""")
conn.commit()
print("accountants 表已建立")
def get_pdf_signatures(conn):
"""取得每份 PDF 的簽名資料"""
cur = conn.cursor()
cur.execute("""
SELECT s.signature_id, s.source_pdf, s.page_number, s.accountant_name,
s.excel_accountant1, s.excel_accountant2, s.excel_firm,
sb.x, sb.y, sb.width, sb.height
FROM signatures s
LEFT JOIN signature_boxes sb ON s.signature_id = sb.signature_id
ORDER BY s.source_pdf, s.page_number, sb.y
""")
pdf_sigs = defaultdict(list)
for row in cur.fetchall():
pdf_sigs[row['source_pdf']].append(dict(row))
return pdf_sigs
def normalize_name(name):
"""正規化姓名(簡轉繁)"""
if not name:
return None
return cc_s2t.convert(name)
def names_match(ocr_name, excel_name):
"""檢查 OCR 姓名是否與 Excel 姓名匹配"""
if not ocr_name or not excel_name:
return False
# 精確匹配
if ocr_name == excel_name:
return True
# 簡繁轉換後匹配
ocr_trad = normalize_name(ocr_name)
if ocr_trad == excel_name:
return True
return False
def score_signature(sig, excel_acc1, excel_acc2):
"""為簽名評分"""
score = 0
ocr_name = sig.get('accountant_name', '')
# 1. OCR 姓名匹配 (+100)
if names_match(ocr_name, excel_acc1) or names_match(ocr_name, excel_acc2):
score += 100
# 2. 合理尺寸 (+20)
width = sig.get('width', 0) or 0
height = sig.get('height', 0) or 0
if 30 < width < 500 and 20 < height < 200:
score += 20
# 3. 頁面位置 - Y 座標越大分數越高 (最多 +15)
y = sig.get('y', 0) or 0
score += min(y / 100, 15)
# 4. 如果尺寸過大(可能是印章),扣分
if width > 300 or height > 150:
score -= 30
return score
def select_best_two(signatures, excel_acc1, excel_acc2):
"""選擇最佳的 2 個簽名"""
if len(signatures) <= 2:
return signatures
scored = []
for sig in signatures:
score = score_signature(sig, excel_acc1, excel_acc2)
scored.append((sig, score))
# 按分數排序
scored.sort(key=lambda x: -x[1])
# 取前 2 個
return [s[0] for s in scored[:2]]
def assign_to_accountant(sig1, sig2, excel_acc1, excel_acc2):
"""將簽名歸檔到會計師"""
ocr1 = sig1.get('accountant_name', '')
ocr2 = sig2.get('accountant_name', '')
# 方法 A: OCR 姓名匹配
if names_match(ocr1, excel_acc1):
return [(sig1, excel_acc1), (sig2, excel_acc2)]
elif names_match(ocr1, excel_acc2):
return [(sig1, excel_acc2), (sig2, excel_acc1)]
elif names_match(ocr2, excel_acc1):
return [(sig1, excel_acc2), (sig2, excel_acc1)]
elif names_match(ocr2, excel_acc2):
return [(sig1, excel_acc1), (sig2, excel_acc2)]
# 方法 B: 按 Y 座標(假設會計師1 在上)
y1 = sig1.get('y', 0) or 0
y2 = sig2.get('y', 0) or 0
if y1 <= y2:
return [(sig1, excel_acc1), (sig2, excel_acc2)]
else:
return [(sig1, excel_acc2), (sig2, excel_acc1)]
def process_all_pdfs(conn):
"""處理所有 PDF"""
print("正在載入簽名資料...")
pdf_sigs = get_pdf_signatures(conn)
print(f"{len(pdf_sigs)} 份 PDF")
cur = conn.cursor()
stats = {
'total_pdfs': len(pdf_sigs),
'sig_count_1': 0,
'sig_count_2': 0,
'sig_count_gt2': 0,
'valid_signatures': 0,
'invalid_signatures': 0,
'ocr_matched': 0,
'y_coordinate_assigned': 0,
'no_excel_data': 0,
}
assignments = [] # (signature_id, assigned_accountant, is_valid)
for pdf_name, sigs in pdf_sigs.items():
sig_count = len(sigs)
excel_acc1 = sigs[0].get('excel_accountant1') if sigs else None
excel_acc2 = sigs[0].get('excel_accountant2') if sigs else None
if not excel_acc1 and not excel_acc2:
# 無 Excel 資料
stats['no_excel_data'] += 1
for sig in sigs:
assignments.append((sig['signature_id'], None, 1))
continue
if sig_count == 1:
stats['sig_count_1'] += 1
# 只有 1 個簽名,保留但無法確定是哪位會計師
sig = sigs[0]
ocr_name = sig.get('accountant_name', '')
if names_match(ocr_name, excel_acc1):
assignments.append((sig['signature_id'], excel_acc1, 1))
stats['ocr_matched'] += 1
elif names_match(ocr_name, excel_acc2):
assignments.append((sig['signature_id'], excel_acc2, 1))
stats['ocr_matched'] += 1
else:
# 無法確定,暫時不指派
assignments.append((sig['signature_id'], None, 1))
stats['valid_signatures'] += 1
elif sig_count == 2:
stats['sig_count_2'] += 1
# 正常情況
sig1, sig2 = sigs[0], sigs[1]
pairs = assign_to_accountant(sig1, sig2, excel_acc1, excel_acc2)
for sig, acc in pairs:
assignments.append((sig['signature_id'], acc, 1))
stats['valid_signatures'] += 1
# 統計匹配方式
ocr_name = sig.get('accountant_name', '')
if names_match(ocr_name, acc):
stats['ocr_matched'] += 1
else:
stats['y_coordinate_assigned'] += 1
else:
stats['sig_count_gt2'] += 1
# 需要篩選
best_two = select_best_two(sigs, excel_acc1, excel_acc2)
# 標記有效/無效
valid_ids = {s['signature_id'] for s in best_two}
for sig in sigs:
if sig['signature_id'] in valid_ids:
is_valid = 1
stats['valid_signatures'] += 1
else:
is_valid = 0
stats['invalid_signatures'] += 1
assignments.append((sig['signature_id'], None, is_valid))
# 歸檔有效的 2 個
if len(best_two) == 2:
sig1, sig2 = best_two[0], best_two[1]
pairs = assign_to_accountant(sig1, sig2, excel_acc1, excel_acc2)
for sig, acc in pairs:
assignments.append((sig['signature_id'], acc, 1))
ocr_name = sig.get('accountant_name', '')
if names_match(ocr_name, acc):
stats['ocr_matched'] += 1
else:
stats['y_coordinate_assigned'] += 1
elif len(best_two) == 1:
sig = best_two[0]
ocr_name = sig.get('accountant_name', '')
if names_match(ocr_name, excel_acc1):
assignments.append((sig['signature_id'], excel_acc1, 1))
elif names_match(ocr_name, excel_acc2):
assignments.append((sig['signature_id'], excel_acc2, 1))
else:
assignments.append((sig['signature_id'], None, 1))
# 批量更新資料庫
print(f"正在更新 {len(assignments)} 筆簽名...")
for sig_id, acc, is_valid in assignments:
cur.execute("""
UPDATE signatures
SET assigned_accountant = ?, is_valid = ?
WHERE signature_id = ?
""", (acc, is_valid, sig_id))
conn.commit()
return stats
def build_accountants_table(conn):
"""建立會計師表"""
cur = conn.cursor()
# 清空現有資料
cur.execute("DELETE FROM accountants")
# 收集所有會計師姓名
cur.execute("""
SELECT assigned_accountant, excel_firm, COUNT(*) as cnt
FROM signatures
WHERE assigned_accountant IS NOT NULL AND is_valid = 1
GROUP BY assigned_accountant
""")
accountants = {}
for row in cur.fetchall():
name = row[0]
firm = row[1]
count = row[2]
if name not in accountants:
accountants[name] = {'count': 0, 'firms': defaultdict(int)}
accountants[name]['count'] += count
if firm:
accountants[name]['firms'][firm] += count
# 插入 accountants 表
for name, data in accountants.items():
# 找出最常見的事務所
main_firm = None
if data['firms']:
main_firm = max(data['firms'].items(), key=lambda x: x[1])[0]
cur.execute("""
INSERT INTO accountants (name, signature_count, firm)
VALUES (?, ?, ?)
""", (name, data['count'], main_firm))
conn.commit()
# 更新 signatures 的 accountant_id
cur.execute("""
UPDATE signatures
SET accountant_id = (
SELECT accountant_id FROM accountants
WHERE accountants.name = signatures.assigned_accountant
)
WHERE assigned_accountant IS NOT NULL
""")
conn.commit()
return len(accountants)
def generate_report(stats, accountant_count):
"""生成報告"""
report = {
'generated_at': datetime.now().isoformat(),
'summary': {
'total_pdfs': stats['total_pdfs'],
'pdfs_with_1_sig': stats['sig_count_1'],
'pdfs_with_2_sigs': stats['sig_count_2'],
'pdfs_with_gt2_sigs': stats['sig_count_gt2'],
'pdfs_without_excel': stats['no_excel_data'],
},
'signatures': {
'valid': stats['valid_signatures'],
'invalid': stats['invalid_signatures'],
'total': stats['valid_signatures'] + stats['invalid_signatures'],
},
'assignment_method': {
'ocr_matched': stats['ocr_matched'],
'y_coordinate': stats['y_coordinate_assigned'],
},
'accountants': {
'total_unique': accountant_count,
}
}
# 儲存 JSON
json_path = f"{REPORT_DIR}/signature_cleanup_report.json"
with open(json_path, 'w', encoding='utf-8') as f:
json.dump(report, f, ensure_ascii=False, indent=2)
# 儲存 Markdown
md_path = f"{REPORT_DIR}/signature_cleanup_report.md"
with open(md_path, 'w', encoding='utf-8') as f:
f.write("# 簽名清理與歸檔報告\n\n")
f.write(f"生成時間: {report['generated_at']}\n\n")
f.write("## PDF 分布\n\n")
f.write("| 類型 | 數量 |\n")
f.write("|------|------|\n")
f.write(f"| 總 PDF 數 | {stats['total_pdfs']} |\n")
f.write(f"| 1 個簽名 | {stats['sig_count_1']} |\n")
f.write(f"| 2 個簽名 (正常) | {stats['sig_count_2']} |\n")
f.write(f"| >2 個簽名 (需篩選) | {stats['sig_count_gt2']} |\n")
f.write(f"| 無 Excel 資料 | {stats['no_excel_data']} |\n")
f.write("\n## 簽名統計\n\n")
f.write("| 類型 | 數量 |\n")
f.write("|------|------|\n")
f.write(f"| 有效簽名 | {stats['valid_signatures']} |\n")
f.write(f"| 無效簽名 (誤判) | {stats['invalid_signatures']} |\n")
f.write("\n## 歸檔方式\n\n")
f.write("| 方式 | 數量 |\n")
f.write("|------|------|\n")
f.write(f"| OCR 姓名匹配 | {stats['ocr_matched']} |\n")
f.write(f"| Y 座標推斷 | {stats['y_coordinate_assigned']} |\n")
f.write(f"\n## 會計師\n\n")
f.write(f"唯一會計師數: **{accountant_count}**\n")
print(f"報告已儲存: {json_path}")
print(f"報告已儲存: {md_path}")
return report
def main():
print("=" * 60)
print("簽名清理與會計師歸檔")
print("=" * 60)
conn = get_connection()
# 1. 準備資料庫
print("\n[1/4] 準備資料庫...")
add_columns_if_needed(conn)
create_accountants_table(conn)
# 2. 處理所有 PDF
print("\n[2/4] 處理 PDF 簽名...")
stats = process_all_pdfs(conn)
# 3. 建立 accountants 表
print("\n[3/4] 建立會計師表...")
accountant_count = build_accountants_table(conn)
# 4. 生成報告
print("\n[4/4] 生成報告...")
report = generate_report(stats, accountant_count)
conn.close()
print("\n" + "=" * 60)
print("完成!")
print("=" * 60)
print(f"有效簽名: {stats['valid_signatures']}")
print(f"無效簽名: {stats['invalid_signatures']}")
print(f"唯一會計師: {accountant_count}")
if __name__ == '__main__':
main()