939a348da4
Paper draft includes all sections (Abstract through Conclusion), 36 references, and supporting scripts. Key methodology: Cosine similarity + dHash dual-method verification with thresholds calibrated against known-replication firm (Firm A). Includes: - 8 section markdown files (paper_a_*.md) - Ablation study script (ResNet-50 vs VGG-16 vs EfficientNet-B0) - Recalibrated classification script (84,386 PDFs, 5-tier system) - Figure generation and Word export scripts - Citation renumbering script ([1]-[36]) - Signature analysis pipeline (12 steps) - YOLO extraction scripts Three rounds of AI review completed (GPT-5.4, Claude Opus 4.6, Gemini 3 Pro). Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
451 lines
14 KiB
Python
451 lines
14 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
簽名清理與會計師歸檔
|
|
|
|
1. 標記 sig_count > 2 的 PDF,篩選最佳 2 個簽名
|
|
2. 用 OCR 或座標歸檔到會計師
|
|
3. 建立 accountants 表
|
|
"""
|
|
|
|
import sqlite3
|
|
import json
|
|
from collections import defaultdict
|
|
from datetime import datetime
|
|
from opencc import OpenCC
|
|
|
|
# 簡繁轉換
|
|
cc_s2t = OpenCC('s2t')
|
|
|
|
DB_PATH = '/Volumes/NV2/PDF-Processing/signature-analysis/signature_analysis.db'
|
|
REPORT_DIR = '/Volumes/NV2/PDF-Processing/signature-analysis/reports'
|
|
|
|
|
|
def get_connection():
|
|
conn = sqlite3.connect(DB_PATH)
|
|
conn.row_factory = sqlite3.Row
|
|
return conn
|
|
|
|
|
|
def add_columns_if_needed(conn):
|
|
"""添加新欄位"""
|
|
cur = conn.cursor()
|
|
|
|
# 檢查現有欄位
|
|
cur.execute("PRAGMA table_info(signatures)")
|
|
columns = [row[1] for row in cur.fetchall()]
|
|
|
|
if 'is_valid' not in columns:
|
|
cur.execute("ALTER TABLE signatures ADD COLUMN is_valid INTEGER DEFAULT 1")
|
|
print("已添加 is_valid 欄位")
|
|
|
|
if 'assigned_accountant' not in columns:
|
|
cur.execute("ALTER TABLE signatures ADD COLUMN assigned_accountant TEXT")
|
|
print("已添加 assigned_accountant 欄位")
|
|
|
|
conn.commit()
|
|
|
|
|
|
def create_accountants_table(conn):
|
|
"""建立 accountants 表"""
|
|
cur = conn.cursor()
|
|
cur.execute("""
|
|
CREATE TABLE IF NOT EXISTS accountants (
|
|
accountant_id INTEGER PRIMARY KEY AUTOINCREMENT,
|
|
name TEXT UNIQUE NOT NULL,
|
|
signature_count INTEGER DEFAULT 0,
|
|
firm TEXT,
|
|
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
|
|
)
|
|
""")
|
|
conn.commit()
|
|
print("accountants 表已建立")
|
|
|
|
|
|
def get_pdf_signatures(conn):
|
|
"""取得每份 PDF 的簽名資料"""
|
|
cur = conn.cursor()
|
|
cur.execute("""
|
|
SELECT s.signature_id, s.source_pdf, s.page_number, s.accountant_name,
|
|
s.excel_accountant1, s.excel_accountant2, s.excel_firm,
|
|
sb.x, sb.y, sb.width, sb.height
|
|
FROM signatures s
|
|
LEFT JOIN signature_boxes sb ON s.signature_id = sb.signature_id
|
|
ORDER BY s.source_pdf, s.page_number, sb.y
|
|
""")
|
|
|
|
pdf_sigs = defaultdict(list)
|
|
for row in cur.fetchall():
|
|
pdf_sigs[row['source_pdf']].append(dict(row))
|
|
|
|
return pdf_sigs
|
|
|
|
|
|
def normalize_name(name):
|
|
"""正規化姓名(簡轉繁)"""
|
|
if not name:
|
|
return None
|
|
return cc_s2t.convert(name)
|
|
|
|
|
|
def names_match(ocr_name, excel_name):
|
|
"""檢查 OCR 姓名是否與 Excel 姓名匹配"""
|
|
if not ocr_name or not excel_name:
|
|
return False
|
|
|
|
# 精確匹配
|
|
if ocr_name == excel_name:
|
|
return True
|
|
|
|
# 簡繁轉換後匹配
|
|
ocr_trad = normalize_name(ocr_name)
|
|
if ocr_trad == excel_name:
|
|
return True
|
|
|
|
return False
|
|
|
|
|
|
def score_signature(sig, excel_acc1, excel_acc2):
|
|
"""為簽名評分"""
|
|
score = 0
|
|
ocr_name = sig.get('accountant_name', '')
|
|
|
|
# 1. OCR 姓名匹配 (+100)
|
|
if names_match(ocr_name, excel_acc1) or names_match(ocr_name, excel_acc2):
|
|
score += 100
|
|
|
|
# 2. 合理尺寸 (+20)
|
|
width = sig.get('width', 0) or 0
|
|
height = sig.get('height', 0) or 0
|
|
if 30 < width < 500 and 20 < height < 200:
|
|
score += 20
|
|
|
|
# 3. 頁面位置 - Y 座標越大分數越高 (最多 +15)
|
|
y = sig.get('y', 0) or 0
|
|
score += min(y / 100, 15)
|
|
|
|
# 4. 如果尺寸過大(可能是印章),扣分
|
|
if width > 300 or height > 150:
|
|
score -= 30
|
|
|
|
return score
|
|
|
|
|
|
def select_best_two(signatures, excel_acc1, excel_acc2):
|
|
"""選擇最佳的 2 個簽名"""
|
|
if len(signatures) <= 2:
|
|
return signatures
|
|
|
|
scored = []
|
|
for sig in signatures:
|
|
score = score_signature(sig, excel_acc1, excel_acc2)
|
|
scored.append((sig, score))
|
|
|
|
# 按分數排序
|
|
scored.sort(key=lambda x: -x[1])
|
|
|
|
# 取前 2 個
|
|
return [s[0] for s in scored[:2]]
|
|
|
|
|
|
def assign_to_accountant(sig1, sig2, excel_acc1, excel_acc2):
|
|
"""將簽名歸檔到會計師"""
|
|
ocr1 = sig1.get('accountant_name', '')
|
|
ocr2 = sig2.get('accountant_name', '')
|
|
|
|
# 方法 A: OCR 姓名匹配
|
|
if names_match(ocr1, excel_acc1):
|
|
return [(sig1, excel_acc1), (sig2, excel_acc2)]
|
|
elif names_match(ocr1, excel_acc2):
|
|
return [(sig1, excel_acc2), (sig2, excel_acc1)]
|
|
elif names_match(ocr2, excel_acc1):
|
|
return [(sig1, excel_acc2), (sig2, excel_acc1)]
|
|
elif names_match(ocr2, excel_acc2):
|
|
return [(sig1, excel_acc1), (sig2, excel_acc2)]
|
|
|
|
# 方法 B: 按 Y 座標(假設會計師1 在上)
|
|
y1 = sig1.get('y', 0) or 0
|
|
y2 = sig2.get('y', 0) or 0
|
|
|
|
if y1 <= y2:
|
|
return [(sig1, excel_acc1), (sig2, excel_acc2)]
|
|
else:
|
|
return [(sig1, excel_acc2), (sig2, excel_acc1)]
|
|
|
|
|
|
def process_all_pdfs(conn):
|
|
"""處理所有 PDF"""
|
|
print("正在載入簽名資料...")
|
|
pdf_sigs = get_pdf_signatures(conn)
|
|
print(f"共 {len(pdf_sigs)} 份 PDF")
|
|
|
|
cur = conn.cursor()
|
|
|
|
stats = {
|
|
'total_pdfs': len(pdf_sigs),
|
|
'sig_count_1': 0,
|
|
'sig_count_2': 0,
|
|
'sig_count_gt2': 0,
|
|
'valid_signatures': 0,
|
|
'invalid_signatures': 0,
|
|
'ocr_matched': 0,
|
|
'y_coordinate_assigned': 0,
|
|
'no_excel_data': 0,
|
|
}
|
|
|
|
assignments = [] # (signature_id, assigned_accountant, is_valid)
|
|
|
|
for pdf_name, sigs in pdf_sigs.items():
|
|
sig_count = len(sigs)
|
|
excel_acc1 = sigs[0].get('excel_accountant1') if sigs else None
|
|
excel_acc2 = sigs[0].get('excel_accountant2') if sigs else None
|
|
|
|
if not excel_acc1 and not excel_acc2:
|
|
# 無 Excel 資料
|
|
stats['no_excel_data'] += 1
|
|
for sig in sigs:
|
|
assignments.append((sig['signature_id'], None, 1))
|
|
continue
|
|
|
|
if sig_count == 1:
|
|
stats['sig_count_1'] += 1
|
|
# 只有 1 個簽名,保留但無法確定是哪位會計師
|
|
sig = sigs[0]
|
|
ocr_name = sig.get('accountant_name', '')
|
|
if names_match(ocr_name, excel_acc1):
|
|
assignments.append((sig['signature_id'], excel_acc1, 1))
|
|
stats['ocr_matched'] += 1
|
|
elif names_match(ocr_name, excel_acc2):
|
|
assignments.append((sig['signature_id'], excel_acc2, 1))
|
|
stats['ocr_matched'] += 1
|
|
else:
|
|
# 無法確定,暫時不指派
|
|
assignments.append((sig['signature_id'], None, 1))
|
|
stats['valid_signatures'] += 1
|
|
|
|
elif sig_count == 2:
|
|
stats['sig_count_2'] += 1
|
|
# 正常情況
|
|
sig1, sig2 = sigs[0], sigs[1]
|
|
pairs = assign_to_accountant(sig1, sig2, excel_acc1, excel_acc2)
|
|
|
|
for sig, acc in pairs:
|
|
assignments.append((sig['signature_id'], acc, 1))
|
|
stats['valid_signatures'] += 1
|
|
|
|
# 統計匹配方式
|
|
ocr_name = sig.get('accountant_name', '')
|
|
if names_match(ocr_name, acc):
|
|
stats['ocr_matched'] += 1
|
|
else:
|
|
stats['y_coordinate_assigned'] += 1
|
|
|
|
else:
|
|
stats['sig_count_gt2'] += 1
|
|
# 需要篩選
|
|
best_two = select_best_two(sigs, excel_acc1, excel_acc2)
|
|
|
|
# 標記有效/無效
|
|
valid_ids = {s['signature_id'] for s in best_two}
|
|
for sig in sigs:
|
|
if sig['signature_id'] in valid_ids:
|
|
is_valid = 1
|
|
stats['valid_signatures'] += 1
|
|
else:
|
|
is_valid = 0
|
|
stats['invalid_signatures'] += 1
|
|
assignments.append((sig['signature_id'], None, is_valid))
|
|
|
|
# 歸檔有效的 2 個
|
|
if len(best_two) == 2:
|
|
sig1, sig2 = best_two[0], best_two[1]
|
|
pairs = assign_to_accountant(sig1, sig2, excel_acc1, excel_acc2)
|
|
|
|
for sig, acc in pairs:
|
|
assignments.append((sig['signature_id'], acc, 1))
|
|
ocr_name = sig.get('accountant_name', '')
|
|
if names_match(ocr_name, acc):
|
|
stats['ocr_matched'] += 1
|
|
else:
|
|
stats['y_coordinate_assigned'] += 1
|
|
elif len(best_two) == 1:
|
|
sig = best_two[0]
|
|
ocr_name = sig.get('accountant_name', '')
|
|
if names_match(ocr_name, excel_acc1):
|
|
assignments.append((sig['signature_id'], excel_acc1, 1))
|
|
elif names_match(ocr_name, excel_acc2):
|
|
assignments.append((sig['signature_id'], excel_acc2, 1))
|
|
else:
|
|
assignments.append((sig['signature_id'], None, 1))
|
|
|
|
# 批量更新資料庫
|
|
print(f"正在更新 {len(assignments)} 筆簽名...")
|
|
for sig_id, acc, is_valid in assignments:
|
|
cur.execute("""
|
|
UPDATE signatures
|
|
SET assigned_accountant = ?, is_valid = ?
|
|
WHERE signature_id = ?
|
|
""", (acc, is_valid, sig_id))
|
|
|
|
conn.commit()
|
|
|
|
return stats
|
|
|
|
|
|
def build_accountants_table(conn):
|
|
"""建立會計師表"""
|
|
cur = conn.cursor()
|
|
|
|
# 清空現有資料
|
|
cur.execute("DELETE FROM accountants")
|
|
|
|
# 收集所有會計師姓名
|
|
cur.execute("""
|
|
SELECT assigned_accountant, excel_firm, COUNT(*) as cnt
|
|
FROM signatures
|
|
WHERE assigned_accountant IS NOT NULL AND is_valid = 1
|
|
GROUP BY assigned_accountant
|
|
""")
|
|
|
|
accountants = {}
|
|
for row in cur.fetchall():
|
|
name = row[0]
|
|
firm = row[1]
|
|
count = row[2]
|
|
|
|
if name not in accountants:
|
|
accountants[name] = {'count': 0, 'firms': defaultdict(int)}
|
|
accountants[name]['count'] += count
|
|
if firm:
|
|
accountants[name]['firms'][firm] += count
|
|
|
|
# 插入 accountants 表
|
|
for name, data in accountants.items():
|
|
# 找出最常見的事務所
|
|
main_firm = None
|
|
if data['firms']:
|
|
main_firm = max(data['firms'].items(), key=lambda x: x[1])[0]
|
|
|
|
cur.execute("""
|
|
INSERT INTO accountants (name, signature_count, firm)
|
|
VALUES (?, ?, ?)
|
|
""", (name, data['count'], main_firm))
|
|
|
|
conn.commit()
|
|
|
|
# 更新 signatures 的 accountant_id
|
|
cur.execute("""
|
|
UPDATE signatures
|
|
SET accountant_id = (
|
|
SELECT accountant_id FROM accountants
|
|
WHERE accountants.name = signatures.assigned_accountant
|
|
)
|
|
WHERE assigned_accountant IS NOT NULL
|
|
""")
|
|
conn.commit()
|
|
|
|
return len(accountants)
|
|
|
|
|
|
def generate_report(stats, accountant_count):
|
|
"""生成報告"""
|
|
report = {
|
|
'generated_at': datetime.now().isoformat(),
|
|
'summary': {
|
|
'total_pdfs': stats['total_pdfs'],
|
|
'pdfs_with_1_sig': stats['sig_count_1'],
|
|
'pdfs_with_2_sigs': stats['sig_count_2'],
|
|
'pdfs_with_gt2_sigs': stats['sig_count_gt2'],
|
|
'pdfs_without_excel': stats['no_excel_data'],
|
|
},
|
|
'signatures': {
|
|
'valid': stats['valid_signatures'],
|
|
'invalid': stats['invalid_signatures'],
|
|
'total': stats['valid_signatures'] + stats['invalid_signatures'],
|
|
},
|
|
'assignment_method': {
|
|
'ocr_matched': stats['ocr_matched'],
|
|
'y_coordinate': stats['y_coordinate_assigned'],
|
|
},
|
|
'accountants': {
|
|
'total_unique': accountant_count,
|
|
}
|
|
}
|
|
|
|
# 儲存 JSON
|
|
json_path = f"{REPORT_DIR}/signature_cleanup_report.json"
|
|
with open(json_path, 'w', encoding='utf-8') as f:
|
|
json.dump(report, f, ensure_ascii=False, indent=2)
|
|
|
|
# 儲存 Markdown
|
|
md_path = f"{REPORT_DIR}/signature_cleanup_report.md"
|
|
with open(md_path, 'w', encoding='utf-8') as f:
|
|
f.write("# 簽名清理與歸檔報告\n\n")
|
|
f.write(f"生成時間: {report['generated_at']}\n\n")
|
|
|
|
f.write("## PDF 分布\n\n")
|
|
f.write("| 類型 | 數量 |\n")
|
|
f.write("|------|------|\n")
|
|
f.write(f"| 總 PDF 數 | {stats['total_pdfs']} |\n")
|
|
f.write(f"| 1 個簽名 | {stats['sig_count_1']} |\n")
|
|
f.write(f"| 2 個簽名 (正常) | {stats['sig_count_2']} |\n")
|
|
f.write(f"| >2 個簽名 (需篩選) | {stats['sig_count_gt2']} |\n")
|
|
f.write(f"| 無 Excel 資料 | {stats['no_excel_data']} |\n")
|
|
|
|
f.write("\n## 簽名統計\n\n")
|
|
f.write("| 類型 | 數量 |\n")
|
|
f.write("|------|------|\n")
|
|
f.write(f"| 有效簽名 | {stats['valid_signatures']} |\n")
|
|
f.write(f"| 無效簽名 (誤判) | {stats['invalid_signatures']} |\n")
|
|
|
|
f.write("\n## 歸檔方式\n\n")
|
|
f.write("| 方式 | 數量 |\n")
|
|
f.write("|------|------|\n")
|
|
f.write(f"| OCR 姓名匹配 | {stats['ocr_matched']} |\n")
|
|
f.write(f"| Y 座標推斷 | {stats['y_coordinate_assigned']} |\n")
|
|
|
|
f.write(f"\n## 會計師\n\n")
|
|
f.write(f"唯一會計師數: **{accountant_count}**\n")
|
|
|
|
print(f"報告已儲存: {json_path}")
|
|
print(f"報告已儲存: {md_path}")
|
|
|
|
return report
|
|
|
|
|
|
def main():
|
|
print("=" * 60)
|
|
print("簽名清理與會計師歸檔")
|
|
print("=" * 60)
|
|
|
|
conn = get_connection()
|
|
|
|
# 1. 準備資料庫
|
|
print("\n[1/4] 準備資料庫...")
|
|
add_columns_if_needed(conn)
|
|
create_accountants_table(conn)
|
|
|
|
# 2. 處理所有 PDF
|
|
print("\n[2/4] 處理 PDF 簽名...")
|
|
stats = process_all_pdfs(conn)
|
|
|
|
# 3. 建立 accountants 表
|
|
print("\n[3/4] 建立會計師表...")
|
|
accountant_count = build_accountants_table(conn)
|
|
|
|
# 4. 生成報告
|
|
print("\n[4/4] 生成報告...")
|
|
report = generate_report(stats, accountant_count)
|
|
|
|
conn.close()
|
|
|
|
print("\n" + "=" * 60)
|
|
print("完成!")
|
|
print("=" * 60)
|
|
print(f"有效簽名: {stats['valid_signatures']}")
|
|
print(f"無效簽名: {stats['invalid_signatures']}")
|
|
print(f"唯一會計師: {accountant_count}")
|
|
|
|
|
|
if __name__ == '__main__':
|
|
main()
|