Files
pdf_signature_extraction/signature_analysis/08_accountant_similarity_analysis.py
gbanyan 939a348da4 Add Paper A (IEEE TAI) complete draft with Firm A-calibrated dual-method classification
Paper draft includes all sections (Abstract through Conclusion), 36 references,
and supporting scripts. Key methodology: Cosine similarity + dHash dual-method
verification with thresholds calibrated against known-replication firm (Firm A).

Includes:
- 8 section markdown files (paper_a_*.md)
- Ablation study script (ResNet-50 vs VGG-16 vs EfficientNet-B0)
- Recalibrated classification script (84,386 PDFs, 5-tier system)
- Figure generation and Word export scripts
- Citation renumbering script ([1]-[36])
- Signature analysis pipeline (12 steps)
- YOLO extraction scripts

Three rounds of AI review completed (GPT-5.4, Claude Opus 4.6, Gemini 3 Pro).

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-06 23:05:33 +08:00

273 lines
8.7 KiB
Python

#!/usr/bin/env python3
"""
Stage 3: same-person signature clustering analysis.

Runs a cosine-similarity analysis over each accountant's signatures to
judge whether any "copy-paste" (replicated signature image) behaviour
exists.
"""
import sqlite3
import numpy as np
import json
from collections import defaultdict
from datetime import datetime
from tqdm import tqdm
# SQLite database holding signature records and accountant assignments.
DB_PATH = '/Volumes/NV2/PDF-Processing/signature-analysis/signature_analysis.db'
# Pre-extracted feature matrix (.npy); rows correspond to signatures ordered
# by ascending signature_id (see load_data, which rebuilds that ordering).
FEATURES_PATH = '/Volumes/NV2/PDF-Processing/signature-analysis/features/signature_features.npy'
# Output directory for the JSON and Markdown reports.
REPORT_DIR = '/Volumes/NV2/PDF-Processing/signature-analysis/reports'
def load_data():
    """Load the feature matrix and per-accountant signature assignments.

    Returns:
        tuple: ``(features, sig_id_to_idx, acc_signatures, acc_info)`` where
            features (np.ndarray): feature matrix, one row per signature.
            sig_id_to_idx (dict): signature_id -> row index in ``features``.
            acc_signatures (dict): accountant_id -> list of signature_ids.
            acc_info (dict): accountant_id -> ``{'name': ..., 'firm': ...}``.
    """
    print("載入特徵向量...")
    features = np.load(FEATURES_PATH)
    print(f"特徵矩陣形狀: {features.shape}")
    conn = sqlite3.connect(DB_PATH)
    try:
        cur = conn.cursor()
        # The .npy row order matches signature_id ascending, so rebuild that
        # ordering to map each signature_id onto its matrix row.
        cur.execute("SELECT signature_id FROM signatures ORDER BY signature_id")
        sig_id_to_idx = {row[0]: idx for idx, row in enumerate(cur.fetchall())}
        # Accountant assignments for valid, assigned signatures only.
        cur.execute("""
            SELECT s.signature_id, s.assigned_accountant, s.accountant_id, a.name, a.firm
            FROM signatures s
            LEFT JOIN accountants a ON s.accountant_id = a.accountant_id
            WHERE s.is_valid = 1 AND s.assigned_accountant IS NOT NULL
            ORDER BY s.signature_id
        """)
        acc_signatures = defaultdict(list)
        acc_info = {}
        for sig_id, _, acc_id, acc_name, firm in cur.fetchall():
            # Skip rows with no accountant id or no corresponding feature row.
            if acc_id and sig_id in sig_id_to_idx:
                acc_signatures[acc_id].append(sig_id)
                acc_info.setdefault(acc_id, {'name': acc_name, 'firm': firm})
    finally:
        # Previously the connection leaked if any query raised; always close it.
        conn.close()
    return features, sig_id_to_idx, acc_signatures, acc_info
def compute_similarity_stats(features, sig_ids, sig_id_to_idx):
    """Compute pairwise cosine-similarity statistics for one signature group.

    Args:
        features: full feature matrix (one row per signature).
        sig_ids: signature ids belonging to a single accountant.
        sig_id_to_idx: mapping from signature_id to row index in ``features``.

    Returns:
        dict of summary statistics over all unordered pairs, or ``None`` when
        fewer than two signatures (and therefore no pairs) are available.
    """
    if len(sig_ids) < 2:
        return None
    # Slice out the rows belonging to this group.
    rows = features[[sig_id_to_idx[sid] for sid in sig_ids]]
    # L2-normalise each row; zero-norm rows are divided by 1 and stay zero.
    lengths = np.linalg.norm(rows, axis=1, keepdims=True)
    lengths[lengths == 0] = 1
    unit = rows / lengths
    # Cosine similarity of every pair is the dot product of unit vectors.
    sims = unit @ unit.T
    # Strictly-upper triangle: each unordered pair exactly once, no diagonal.
    pair_sims = sims[np.triu_indices(len(sims), k=1)]
    if len(pair_sims) == 0:
        return None
    n_pairs = len(pair_sims)
    # Counts of pairs above each suspicion threshold.
    counts = {t: int((pair_sims > t).sum()) for t in (0.90, 0.95, 0.99)}
    return {
        'total_pairs': n_pairs,
        'min_sim': float(pair_sims.min()),
        'max_sim': float(pair_sims.max()),
        'mean_sim': float(pair_sims.mean()),
        'std_sim': float(pair_sims.std()),
        'pairs_gt_90': counts[0.90],
        'pairs_gt_95': counts[0.95],
        'pairs_gt_99': counts[0.99],
        'ratio_gt_90': counts[0.90] / n_pairs,
        'ratio_gt_95': counts[0.95] / n_pairs,
        'ratio_gt_99': counts[0.99] / n_pairs,
    }
def analyze_all_accountants(features, sig_id_to_idx, acc_signatures, acc_info):
    """Run the similarity analysis for every accountant.

    Returns a list of result dicts — one per accountant with at least two
    signatures — each combining identity fields with the similarity stats.
    """
    results = []
    for acc_id, sig_ids in tqdm(acc_signatures.items(), desc="分析會計師"):
        stats = compute_similarity_stats(features, sig_ids, sig_id_to_idx)
        # Accountants with fewer than two signatures yield no stats; skip them.
        if stats is None:
            continue
        meta = acc_info.get(acc_id, {})
        results.append({
            'accountant_id': acc_id,
            'name': meta.get('name', ''),
            'firm': meta.get('firm', ''),
            'signature_count': len(sig_ids),
            **stats,
        })
    return results
def classify_risk(result):
    """Map one accountant's similarity stats to a risk tier.

    Tiers, checked from most to least severe:
      'high'   -- >5% of pairs above 0.99, or >30% of pairs above 0.95
      'medium' -- >10% of pairs above 0.95, or mean similarity above 0.85
      'low'    -- everything else
    """
    near_identical = result.get('ratio_gt_99', 0)
    very_similar = result.get('ratio_gt_95', 0)
    average = result.get('mean_sim', 0)
    # High risk: a large share of (near-)identical signature pairs.
    if near_identical > 0.05 or very_similar > 0.3:
        return 'high'
    # Medium risk: elevated but not overwhelming similarity.
    if very_similar > 0.1 or average > 0.85:
        return 'medium'
    return 'low'
def save_results(results, acc_signatures):
    """Classify risk levels, then write JSON and Markdown reports to REPORT_DIR.

    Args:
        results: per-accountant stat dicts from ``analyze_all_accountants``;
            mutated in place to add a ``'risk_level'`` key.
        acc_signatures: accountant_id -> signature ids (kept for interface
            compatibility; not used here).

    Returns:
        tuple: (summary dict, results sorted most-suspicious-first).
    """
    import os
    # Make sure the output directory exists before writing reports;
    # otherwise open() raises FileNotFoundError on a fresh volume.
    os.makedirs(REPORT_DIR, exist_ok=True)
    # Classify risk level for each accountant.
    for r in results:
        r['risk_level'] = classify_risk(r)
    # Tally the risk distribution.
    risk_counts = defaultdict(int)
    for r in results:
        risk_counts[r['risk_level']] += 1
    summary = {
        'generated_at': datetime.now().isoformat(),
        'total_accountants': len(results),
        'risk_distribution': dict(risk_counts),
        'high_risk_count': risk_counts['high'],
        'medium_risk_count': risk_counts['medium'],
        'low_risk_count': risk_counts['low'],
    }
    # Sort most-suspicious first: high-similarity ratio, then mean similarity.
    results_sorted = sorted(results, key=lambda x: (-x.get('ratio_gt_95', 0), -x.get('mean_sim', 0)))
    # JSON report.
    output = {
        'summary': summary,
        'accountants': results_sorted
    }
    json_path = f"{REPORT_DIR}/accountant_similarity_analysis.json"
    with open(json_path, 'w', encoding='utf-8') as f:
        json.dump(output, f, ensure_ascii=False, indent=2)
    print(f"已儲存: {json_path}")
    # Markdown report.
    md_path = f"{REPORT_DIR}/accountant_similarity_analysis.md"
    with open(md_path, 'w', encoding='utf-8') as f:
        f.write("# 會計師簽名相似度分析報告\n\n")
        f.write(f"生成時間: {summary['generated_at']}\n\n")
        f.write("## 摘要\n\n")
        f.write(f"| 指標 | 數值 |\n")
        f.write(f"|------|------|\n")
        f.write(f"| 總會計師數 | {summary['total_accountants']} |\n")
        f.write(f"| 高風險 | {risk_counts['high']} |\n")
        f.write(f"| 中風險 | {risk_counts['medium']} |\n")
        f.write(f"| 低風險 | {risk_counts['low']} |\n")
        f.write("\n## 風險分類標準\n\n")
        f.write("- **高風險**: >5% 的簽名對相似度 >0.99,或 >30% 的簽名對相似度 >0.95\n")
        f.write("- **中風險**: >10% 的簽名對相似度 >0.95,或平均相似度 >0.85\n")
        f.write("- **低風險**: 其他情況\n")
        f.write("\n## 高風險會計師 (Top 30)\n\n")
        f.write("| 排名 | 姓名 | 事務所 | 簽名數 | 平均相似度 | >0.95比例 | >0.99比例 |\n")
        f.write("|------|------|--------|--------|------------|-----------|----------|\n")
        high_risk = [r for r in results_sorted if r['risk_level'] == 'high']
        for i, r in enumerate(high_risk[:30], 1):
            f.write(f"| {i} | {r['name']} | {r['firm'] or '-'} | {r['signature_count']} | ")
            f.write(f"{r['mean_sim']:.3f} | {r['ratio_gt_95']*100:.1f}% | {r['ratio_gt_99']*100:.1f}% |\n")
        f.write("\n## 所有會計師統計分布\n\n")
        mean_sims = [r['mean_sim'] for r in results]
        f.write("### 平均相似度分布\n\n")
        # Guard against an empty result set: min()/max() raise ValueError on
        # empty sequences (previously this crashed when no accountant had >=2
        # signatures).
        if mean_sims:
            f.write(f"- 最小: {min(mean_sims):.3f}\n")
            f.write(f"- 最大: {max(mean_sims):.3f}\n")
            f.write(f"- 平均: {np.mean(mean_sims):.3f}\n")
            f.write(f"- 中位數: {np.median(mean_sims):.3f}\n")
    print(f"已儲存: {md_path}")
    return summary, results_sorted
def update_database(results):
    """Write risk level and key similarity metrics back to ``accountants``.

    Adds the result columns when missing, then updates one row per result.

    Args:
        results: result dicts carrying 'risk_level', 'mean_sim',
            'ratio_gt_95' and 'accountant_id'.
    """
    conn = sqlite3.connect(DB_PATH)
    try:
        cur = conn.cursor()
        # Add each column in its own try-block: with one shared block (as
        # before), a partially-migrated schema would skip the remaining
        # ALTERs as soon as the first duplicate column raised. Catch only
        # OperationalError instead of a bare except that hid real failures.
        for column_ddl in ("risk_level TEXT",
                           "mean_similarity REAL",
                           "ratio_gt_95 REAL"):
            try:
                cur.execute(f"ALTER TABLE accountants ADD COLUMN {column_ddl}")
            except sqlite3.OperationalError:
                pass  # column already exists
        # Batch all row updates in one executemany call.
        cur.executemany("""
            UPDATE accountants
            SET risk_level = ?, mean_similarity = ?, ratio_gt_95 = ?
            WHERE accountant_id = ?
        """, [(r['risk_level'], r['mean_sim'], r['ratio_gt_95'], r['accountant_id'])
              for r in results])
        conn.commit()
    finally:
        # Always release the connection, even if an UPDATE fails.
        conn.close()
    print("資料庫已更新")
def main():
    """Entry point: load data, analyse accountants, write reports, update DB."""
    banner = "=" * 60
    print(banner)
    print("第三階段:同人簽名聚類分析")
    print(banner)
    # Load the feature matrix and accountant assignments.
    features, sig_id_to_idx, acc_signatures, acc_info = load_data()
    print(f"會計師數: {len(acc_signatures)}")
    # Per-accountant similarity analysis.
    print("\n開始分析...")
    results = analyze_all_accountants(features, sig_id_to_idx, acc_signatures, acc_info)
    # Persist the JSON and Markdown reports.
    print("\n儲存結果...")
    summary, results_sorted = save_results(results, acc_signatures)
    # Push risk levels back into the database.
    update_database(results_sorted)
    print("\n" + banner)
    print("完成!")
    print(banner)
    print(f"總會計師: {summary['total_accountants']}")
    print(f"高風險: {summary['high_risk_count']}")
    print(f"中風險: {summary['medium_risk_count']}")
    print(f"低風險: {summary['low_risk_count']}")
if __name__ == '__main__':
    main()