939a348da4
Paper draft includes all sections (Abstract through Conclusion), 36 references, and supporting scripts. Key methodology: Cosine similarity + dHash dual-method verification with thresholds calibrated against known-replication firm (Firm A). Includes: - 8 section markdown files (paper_a_*.md) - Ablation study script (ResNet-50 vs VGG-16 vs EfficientNet-B0) - Recalibrated classification script (84,386 PDFs, 5-tier system) - Figure generation and Word export scripts - Citation renumbering script ([1]-[36]) - Signature analysis pipeline (12 steps) - YOLO extraction scripts Three rounds of AI review completed (GPT-5.4, Claude Opus 4.6, Gemini 3 Pro). Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
372 lines
12 KiB
Python
372 lines
12 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
第四階段:PDF 簽名真偽判定
|
|
|
|
對每份 PDF 的簽名判斷是「親簽」還是「複製貼上」
|
|
"""
|
|
|
|
import sqlite3
|
|
import numpy as np
|
|
import json
|
|
import csv
|
|
from collections import defaultdict
|
|
from datetime import datetime
|
|
from tqdm import tqdm
|
|
|
|
# Locations of the SQLite database, the feature matrix (.npy) and the
# directory that receives the generated reports.
DB_PATH = "/Volumes/NV2/PDF-Processing/signature-analysis/signature_analysis.db"
FEATURES_PATH = "/Volumes/NV2/PDF-Processing/signature-analysis/features/signature_features.npy"
REPORT_DIR = "/Volumes/NV2/PDF-Processing/signature-analysis/reports"

# Decision thresholds on the maximum similarity to the same accountant's
# other signatures:
#   >= THRESHOLD_COPY       -> "copy" (copy-pasted signature)
#   <= THRESHOLD_AUTHENTIC  -> "authentic" (hand-signed)
#   anything in between     -> "uncertain"
THRESHOLD_COPY = 0.95
THRESHOLD_AUTHENTIC = 0.85
|
|
|
|
|
|
def load_data(db_path=None, features_path=None):
    """Load row-normalized signature features and signature metadata.

    Args:
        db_path: Path of the SQLite database to read. Defaults to the
            module-level ``DB_PATH`` when ``None``.
        features_path: Path of the ``.npy`` feature matrix. Defaults to the
            module-level ``FEATURES_PATH`` when ``None``.

    Returns:
        A 6-tuple ``(features_norm, sig_data, pdf_signatures, acc_signatures,
        pdf_info, sig_id_to_idx)``:

        * ``features_norm`` - feature matrix with every row scaled to unit
          L2 norm (all-zero rows are left as zeros).
        * ``sig_data`` - ``{signature_id: {'pdf', 'accountant'}}`` for valid
          signatures that have an assigned accountant.
        * ``pdf_signatures`` - ``{pdf: [(signature_id, accountant), ...]}``.
        * ``acc_signatures`` - ``{accountant: [signature_id, ...]}``.
        * ``pdf_info`` - ``{pdf: {'accountant1', 'accountant2', 'firm'}}``
          taken from the first row seen for each PDF.
        * ``sig_id_to_idx`` - ``{signature_id: row index}`` built from ALL
          signatures ordered by ``signature_id``.
    """
    # Resolve the defaults lazily so callers can point at other files.
    if db_path is None:
        db_path = DB_PATH
    if features_path is None:
        features_path = FEATURES_PATH

    print("載入特徵向量...")
    features = np.load(features_path)

    # Scale each row to unit length so a dot product is a cosine similarity.
    # Zero-norm rows get a divisor of 1 to avoid division by zero.
    norms = np.linalg.norm(features, axis=1, keepdims=True)
    norms[norms == 0] = 1
    features_norm = features / norms

    sig_data = {}
    pdf_signatures = defaultdict(list)
    acc_signatures = defaultdict(list)
    pdf_info = {}

    conn = sqlite3.connect(db_path)
    try:
        cur = conn.cursor()

        # Only valid signatures with an assigned accountant are analysed.
        cur.execute("""
            SELECT s.signature_id, s.source_pdf, s.assigned_accountant,
                   s.excel_accountant1, s.excel_accountant2, s.excel_firm
            FROM signatures s
            WHERE s.is_valid = 1 AND s.assigned_accountant IS NOT NULL
            ORDER BY s.signature_id
        """)

        for sig_id, pdf, acc_name, acc1, acc2, firm in cur.fetchall():
            sig_data[sig_id] = {
                'pdf': pdf,
                'accountant': acc_name,
            }
            pdf_signatures[pdf].append((sig_id, acc_name))
            acc_signatures[acc_name].append(sig_id)

            # Keep the PDF-level metadata of the first signature encountered.
            if pdf not in pdf_info:
                pdf_info[pdf] = {
                    'accountant1': acc1,
                    'accountant2': acc2,
                    'firm': firm
                }

        # signature_id -> feature row index.
        # NOTE(review): this assumes row i of the feature file belongs to the
        # i-th signature_id in ascending order over the whole table - confirm
        # against the script that wrote the feature file.
        cur.execute("SELECT signature_id FROM signatures ORDER BY signature_id")
        sig_id_to_idx = {row[0]: idx for idx, row in enumerate(cur.fetchall())}
    finally:
        # Release the connection even when a query fails.
        conn.close()

    return features_norm, sig_data, pdf_signatures, acc_signatures, pdf_info, sig_id_to_idx
|
|
|
|
|
|
def get_max_similarity_to_others(sig_id, acc_name, acc_signatures, sig_id_to_idx, features_norm):
    """Find the most similar other signature of the same accountant.

    Compares the feature row of ``sig_id`` with the rows of every other
    signature listed under ``acc_name`` that has a known feature index.

    Returns:
        ``(similarity, signature_id)`` of the best match, where similarity is
        the dot product of the two feature rows, or ``(None, None)`` when the
        accountant has no other comparable signature.
    """
    candidates = []
    for other_id in acc_signatures[acc_name]:
        if other_id == sig_id or other_id not in sig_id_to_idx:
            continue
        candidates.append(other_id)

    if len(candidates) == 0:
        return None, None

    query_vec = features_norm[sig_id_to_idx[sig_id]]
    candidate_rows = [sig_id_to_idx[other_id] for other_id in candidates]

    # One dot product per candidate row; the first maximum wins on ties.
    scores = np.dot(features_norm[candidate_rows], query_vec)
    best = scores.argmax()

    return float(scores[best]), candidates[best]
|
|
|
|
|
|
def classify_signature(max_sim):
    """Map a maximum-similarity score to a signature verdict.

    Returns:
        ``'unknown'`` when ``max_sim`` is ``None`` (nothing to compare with),
        ``'copy'`` when it is at or above ``THRESHOLD_COPY``,
        ``'authentic'`` when it is at or below ``THRESHOLD_AUTHENTIC``,
        otherwise ``'uncertain'``.
    """
    # No other signature of the same accountant was available.
    if max_sim is None:
        return 'unknown'
    # Near-identical to another signature: treated as copy-pasted.
    if max_sim >= THRESHOLD_COPY:
        return 'copy'
    # Clearly different from every other signature: treated as hand-signed.
    if max_sim <= THRESHOLD_AUTHENTIC:
        return 'authentic'
    return 'uncertain'
|
|
|
|
|
|
def classify_pdf(verdicts):
    """Combine per-signature verdicts into one verdict for the whole PDF.

    Precedence: an empty input is ``'unknown'``; any ``'copy'`` makes the PDF
    ``'copy'``; if every verdict is ``'authentic'`` the PDF is
    ``'authentic'``; otherwise ``'uncertain'`` if present, else ``'unknown'``.
    """
    if not verdicts:
        return 'unknown'

    # A single copied signature is enough to flag the document.
    if 'copy' in verdicts:
        return 'copy'

    # Authentic only when no signature deviates from 'authentic'.
    if not any(v != 'authentic' for v in verdicts):
        return 'authentic'

    return 'uncertain' if 'uncertain' in verdicts else 'unknown'
|
|
|
|
|
|
def analyze_all_pdfs(features_norm, sig_data, pdf_signatures, acc_signatures, pdf_info, sig_id_to_idx):
    """Classify every signature of every PDF and derive a PDF-level verdict.

    Args:
        features_norm: Feature matrix indexed through ``sig_id_to_idx``.
        sig_data: Not used by this function; kept for interface stability.
        pdf_signatures: ``{pdf: [(signature_id, accountant), ...]}``.
        acc_signatures: ``{accountant: [signature_id, ...]}``.
        pdf_info: ``{pdf: {'accountant1', 'accountant2', 'firm'}}``.
        sig_id_to_idx: ``{signature_id: feature row index}``.

    Returns:
        A list with one dict per PDF holding its metadata, the per-signature
        entries (``signature_id``, ``accountant``, ``max_similarity``,
        ``verdict``) and the combined ``pdf_verdict``.
    """
    results = []

    for pdf, sigs in tqdm(pdf_signatures.items(), desc="分析 PDF"):
        meta = pdf_info.get(pdf, {})

        sig_entries = []
        for sig_id, acc_name in sigs:
            # Only the score is needed here; the matching id is discarded.
            max_sim, _best_match = get_max_similarity_to_others(
                sig_id, acc_name, acc_signatures, sig_id_to_idx, features_norm
            )
            sig_entries.append({
                'signature_id': sig_id,
                'accountant': acc_name,
                'max_similarity': max_sim,
                'verdict': classify_signature(max_sim),
            })

        results.append({
            'pdf': pdf,
            'accountant1': meta.get('accountant1', ''),
            'accountant2': meta.get('accountant2', ''),
            'firm': meta.get('firm', ''),
            'signatures': sig_entries,
            'pdf_verdict': classify_pdf([entry['verdict'] for entry in sig_entries]),
        })

    return results
|
|
|
|
|
|
def generate_statistics(results):
    """Count verdicts per PDF, per signature and per firm.

    Args:
        results: List of per-PDF dicts with ``'pdf_verdict'``, ``'firm'`` and
            ``'signatures'`` (each signature carrying a ``'verdict'``).

    Returns:
        A dict with ``'total_pdfs'``, ``'pdf_verdicts'``,
        ``'signature_verdicts'`` and ``'by_firm'`` (firm -> verdict -> count).
        PDFs without a firm are grouped under ``'未知'``.
    """
    pdf_counts = defaultdict(int)
    sig_counts = defaultdict(int)
    firm_counts = defaultdict(lambda: defaultdict(int))

    for entry in results:
        pdf_verdict = entry['pdf_verdict']
        pdf_counts[pdf_verdict] += 1

        # A missing/empty firm falls into a shared placeholder bucket.
        firm_counts[entry['firm'] or '未知'][pdf_verdict] += 1

        for sig in entry['signatures']:
            sig_counts[sig['verdict']] += 1

    return {
        'total_pdfs': len(results),
        'pdf_verdicts': pdf_counts,
        'signature_verdicts': sig_counts,
        'by_firm': firm_counts,
    }
|
|
|
|
|
|
def save_results(results, stats):
    """Write the verdicts as JSON, CSV and a Markdown report into REPORT_DIR.

    Args:
        results: Per-PDF result dicts (``pdf``, ``accountant1``,
            ``accountant2``, ``firm``, ``pdf_verdict``, ``signatures``).
        stats: Statistics dict with ``total_pdfs``, ``pdf_verdicts``,
            ``signature_verdicts`` and ``by_firm``.

    Returns:
        The ``stats`` argument, unchanged.

    Notes:
        * The report directory is created when it does not exist yet, so a
          missing folder no longer aborts the run after the analysis.
        * A similarity of exactly ``0.0`` is written to the CSV as ``0.000``;
          only a missing (``None``) similarity yields an empty cell.
    """
    import os  # local import: the file's import header does not provide it

    os.makedirs(REPORT_DIR, exist_ok=True)
    timestamp = datetime.now().isoformat()

    # Display order and labels shared by both Markdown verdict tables.
    verdict_order = ['copy', 'uncertain', 'authentic', 'unknown']
    verdict_labels = {
        'copy': '複製貼上',
        'authentic': '親簽',
        'uncertain': '不確定',
        'unknown': '無法判定'
    }

    def write_verdict_table(f, counts, total):
        """Write one "verdict | count | percentage" Markdown table."""
        f.write("| 判定 | 數量 | 百分比 |\n")
        f.write("|------|------|--------|\n")
        for verdict in verdict_order:
            count = counts.get(verdict, 0)
            pct = count / total * 100 if total > 0 else 0
            label = verdict_labels.get(verdict, verdict)
            f.write(f"| {label} | {count:,} | {pct:.1f}% |\n")

    # 1. Full JSON dump (thresholds, summary statistics, every result).
    json_path = f"{REPORT_DIR}/pdf_signature_verdicts.json"
    output = {
        'generated_at': timestamp,
        'thresholds': {
            'copy': THRESHOLD_COPY,
            'authentic': THRESHOLD_AUTHENTIC
        },
        'statistics': {
            'total_pdfs': stats['total_pdfs'],
            'pdf_verdicts': dict(stats['pdf_verdicts']),
            'signature_verdicts': dict(stats['signature_verdicts'])
        },
        'results': results
    }
    with open(json_path, 'w', encoding='utf-8') as f:
        json.dump(output, f, ensure_ascii=False, indent=2)
    print(f"已儲存: {json_path}")

    # 2. Compact CSV: one row per PDF with at most two signatures.
    csv_path = f"{REPORT_DIR}/pdf_signature_verdicts.csv"
    with open(csv_path, 'w', encoding='utf-8', newline='') as f:
        writer = csv.writer(f)
        writer.writerow(['PDF', '會計師1', '會計師2', '事務所', '判定結果',
                         '簽名1_會計師', '簽名1_相似度', '簽名1_判定',
                         '簽名2_會計師', '簽名2_相似度', '簽名2_判定'])

        for r in results:
            row = [
                r['pdf'],
                r['accountant1'],
                r['accountant2'],
                r['firm'] or '',
                r['pdf_verdict']
            ]

            for sig in r['signatures'][:2]:
                sim = sig['max_similarity']
                row.extend([
                    sig['accountant'],
                    # Test for None explicitly: 0.0 is a valid similarity.
                    f"{sim:.3f}" if sim is not None else '',
                    sig['verdict']
                ])

            # Pad to the 11 header columns when fewer than two signatures.
            row.extend([''] * (11 - len(row)))
            writer.writerow(row)
    print(f"已儲存: {csv_path}")

    # 3. Human-readable Markdown report.
    md_path = f"{REPORT_DIR}/pdf_signature_verdict_report.md"
    with open(md_path, 'w', encoding='utf-8') as f:
        f.write("# PDF 簽名真偽判定報告\n\n")
        f.write(f"生成時間: {timestamp}\n\n")

        f.write("## 判定標準\n\n")
        f.write(f"- **複製貼上 (copy)**: 與同一會計師其他簽名相似度 ≥ {THRESHOLD_COPY}\n")
        f.write(f"- **親簽 (authentic)**: 與同一會計師其他簽名相似度 ≤ {THRESHOLD_AUTHENTIC}\n")
        f.write(f"- **不確定 (uncertain)**: 相似度介於 {THRESHOLD_AUTHENTIC} ~ {THRESHOLD_COPY}\n")
        f.write("- **無法判定 (unknown)**: 該會計師只有此一份簽名,無法比對\n\n")

        f.write("## 整體統計\n\n")
        f.write("### PDF 判定結果\n\n")
        total = stats['total_pdfs']
        write_verdict_table(f, stats['pdf_verdicts'], total)
        f.write(f"\n**總計: {total:,} 份 PDF**\n")

        f.write("\n### 簽名判定結果\n\n")
        sig_total = sum(stats['signature_verdicts'].values())
        write_verdict_table(f, stats['signature_verdicts'], sig_total)
        f.write(f"\n**總計: {sig_total:,} 個簽名**\n")

        f.write("\n### 按事務所統計\n\n")
        f.write("| 事務所 | 複製貼上 | 不確定 | 親簽 | 無法判定 | 總計 |\n")
        f.write("|--------|----------|--------|------|----------|------|\n")

        # Largest firms first; only the top 20 are listed.
        firms_sorted = sorted(stats['by_firm'].items(),
                              key=lambda x: sum(x[1].values()), reverse=True)

        for firm, verdicts in firms_sorted[:20]:
            copy_n = verdicts.get('copy', 0)
            uncertain_n = verdicts.get('uncertain', 0)
            authentic_n = verdicts.get('authentic', 0)
            unknown_n = verdicts.get('unknown', 0)
            total_n = copy_n + uncertain_n + authentic_n + unknown_n
            f.write(f"| {firm} | {copy_n:,} | {uncertain_n:,} | {authentic_n:,} | {unknown_n:,} | {total_n:,} |\n")

    print(f"已儲存: {md_path}")

    return stats
|
|
|
|
|
|
def update_database(results, db_path=None):
    """Persist per-signature verdicts and similarities to the database.

    Adds the ``signature_verdict`` and ``max_similarity_to_same_accountant``
    columns to the ``signatures`` table when they are missing, then updates
    one row per signature found in ``results``.

    Args:
        results: Per-PDF result dicts whose ``'signatures'`` entries carry
            ``'signature_id'``, ``'verdict'`` and ``'max_similarity'``.
        db_path: SQLite database to update. Defaults to the module-level
            ``DB_PATH`` when ``None``.

    Raises:
        sqlite3.Error: If the schema change or the update fails. Previously
            every error during the schema change was silently ignored.
    """
    if db_path is None:
        db_path = DB_PATH

    new_columns = (
        ('signature_verdict', 'TEXT'),
        ('max_similarity_to_same_accountant', 'REAL'),
    )
    updates = [
        (sig['verdict'], sig['max_similarity'], sig['signature_id'])
        for r in results
        for sig in r['signatures']
    ]

    conn = sqlite3.connect(db_path)
    try:
        cur = conn.cursor()

        # Add each column independently. The old single try/except skipped
        # the second ALTER whenever the first column already existed.
        cur.execute("PRAGMA table_info(signatures)")
        existing = {row[1] for row in cur.fetchall()}  # row[1] = column name
        for name, sql_type in new_columns:
            if name not in existing:
                cur.execute(f"ALTER TABLE signatures ADD COLUMN {name} {sql_type}")

        cur.executemany("""
            UPDATE signatures
            SET signature_verdict = ?, max_similarity_to_same_accountant = ?
            WHERE signature_id = ?
        """, updates)

        conn.commit()
    finally:
        # Always release the connection, even when a statement fails.
        conn.close()

    print("資料庫已更新")
|
|
|
|
|
|
def main():
    """Run the pipeline: load, analyse, report, update the DB, summarise."""
    banner = "=" * 60
    print(banner)
    print("第四階段:PDF 簽名真偽判定")
    print(banner)

    # Load features and signature metadata.
    loaded = load_data()
    features_norm, sig_data, pdf_signatures, acc_signatures, pdf_info, sig_id_to_idx = loaded
    print(f"PDF 數: {len(pdf_signatures)}")
    print(f"有效簽名: {len(sig_data)}")

    # Classify every PDF.
    print("\n開始分析...")
    results = analyze_all_pdfs(
        features_norm, sig_data, pdf_signatures, acc_signatures, pdf_info, sig_id_to_idx
    )

    # Aggregate the verdict counts.
    stats = generate_statistics(results)

    # Write the report files, then store the verdicts in the database.
    print("\n儲存結果...")
    save_results(results, stats)
    update_database(results)

    print("\n" + banner)
    print("完成!")
    print(banner)

    # Final console summary of the PDF-level verdicts.
    print("\nPDF 判定結果:")
    pdf_counts = stats['pdf_verdicts']
    summary_rows = (
        (" 複製貼上", 'copy'),
        (" 不確定", 'uncertain'),
        (" 親簽", 'authentic'),
        (" 無法判定", 'unknown'),
    )
    for label, verdict in summary_rows:
        print(f"{label}: {pdf_counts.get(verdict, 0):,}")


if __name__ == '__main__':
    main()
|