939a348da4
Paper draft includes all sections (Abstract through Conclusion), 36 references, and supporting scripts. Key methodology: Cosine similarity + dHash dual-method verification with thresholds calibrated against known-replication firm (Firm A). Includes: - 8 section markdown files (paper_a_*.md) - Ablation study script (ResNet-50 vs VGG-16 vs EfficientNet-B0) - Recalibrated classification script (84,386 PDFs, 5-tier system) - Figure generation and Word export scripts - Citation renumbering script ([1]-[36]) - Signature analysis pipeline (12 steps) - YOLO extraction scripts Three rounds of AI review completed (GPT-5.4, Claude Opus 4.6, Gemini 3 Pro). Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
273 lines
8.7 KiB
Python
273 lines
8.7 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
第三階段:同人簽名聚類分析
|
|
|
|
對每位會計師的簽名進行相似度分析,判斷是否有「複製貼上」行為。
|
|
"""
|
|
|
|
import sqlite3
|
|
import numpy as np
|
|
import json
|
|
from collections import defaultdict
|
|
from datetime import datetime
|
|
from tqdm import tqdm
|
|
|
|
# Locations of the pipeline artefacts this script reads and writes.

# SQLite database queried for signatures/accountants and updated with risk data.
DB_PATH = '/Volumes/NV2/PDF-Processing/signature-analysis/signature_analysis.db'

# NumPy array file holding the signature feature matrix.
FEATURES_PATH = '/Volumes/NV2/PDF-Processing/signature-analysis/features/signature_features.npy'

# Directory that receives the generated JSON and Markdown reports.
REPORT_DIR = '/Volumes/NV2/PDF-Processing/signature-analysis/reports'
|
|
|
|
|
|
def load_data():
    """Load the feature matrix and the per-accountant signature assignments.

    Reads the array stored at ``FEATURES_PATH`` and queries the SQLite
    database at ``DB_PATH``. The database connection is always closed, even
    when one of the queries raises.

    Returns:
        tuple: ``(features, sig_id_to_idx, acc_signatures, acc_info)`` where

        * ``features`` is the array returned by ``np.load``;
        * ``sig_id_to_idx`` maps every ``signature_id`` to its position in
          the ``ORDER BY signature_id`` listing of the ``signatures`` table;
        * ``acc_signatures`` maps ``accountant_id`` to the list of its valid
          signature ids;
        * ``acc_info`` maps ``accountant_id`` to ``{'name': ..., 'firm': ...}``.

    NOTE(review): the code assumes row ``i`` of the feature matrix belongs
    to the ``i``-th signature in ``signature_id`` order. Nothing here
    verifies that -- confirm against the script that wrote the feature file.
    """
    print("載入特徵向量...")
    features = np.load(FEATURES_PATH)
    print(f"特徵矩陣形狀: {features.shape}")

    conn = sqlite3.connect(DB_PATH)
    try:
        cur = conn.cursor()

        # Every signature id, in the order presumed to match the feature rows.
        cur.execute("SELECT signature_id FROM signatures ORDER BY signature_id")
        sig_id_to_idx = {
            row[0]: idx for idx, row in enumerate(cur.fetchall())
        }

        # Valid signatures that have been assigned to an accountant.
        cur.execute("""
            SELECT s.signature_id, s.assigned_accountant, s.accountant_id, a.name, a.firm
            FROM signatures s
            LEFT JOIN accountants a ON s.accountant_id = a.accountant_id
            WHERE s.is_valid = 1 AND s.assigned_accountant IS NOT NULL
            ORDER BY s.signature_id
        """)
        assigned_rows = cur.fetchall()
    finally:
        # Release the connection even if a query above failed.
        conn.close()

    acc_signatures = defaultdict(list)
    acc_info = {}

    for sig_id, _assigned, acc_id, acc_name, firm in assigned_rows:
        # Skip rows without a (truthy) accountant id or without a feature row.
        if not acc_id or sig_id not in sig_id_to_idx:
            continue
        acc_signatures[acc_id].append(sig_id)
        # LEFT JOIN: name/firm may be None when no accountants row matched.
        if acc_id not in acc_info:
            acc_info[acc_id] = {'name': acc_name, 'firm': firm}

    return features, sig_id_to_idx, acc_signatures, acc_info
|
|
|
|
|
|
def compute_similarity_stats(features, sig_ids, sig_id_to_idx):
    """Summarise the pairwise cosine similarity of one group of signatures.

    Args:
        features: array indexed by row; each selected row is one signature's
            feature vector.
        sig_ids: signature ids belonging to the group.
        sig_id_to_idx: mapping from signature id to its row in ``features``.

    Returns:
        ``None`` when fewer than two signatures are given (no pairs exist);
        otherwise a dict with the pair count, min/max/mean/std of the
        pairwise similarities, and the count and ratio of pairs whose
        similarity exceeds 0.90, 0.95 and 0.99.

    Raises:
        KeyError: if an id in ``sig_ids`` is missing from ``sig_id_to_idx``.
    """
    if len(sig_ids) < 2:
        return None

    # Gather this group's feature vectors.
    rows = [sig_id_to_idx[sig_id] for sig_id in sig_ids]
    vectors = features[rows]

    # Scale to unit length; all-zero vectors are left as zeros (divide by 1).
    lengths = np.linalg.norm(vectors, axis=1, keepdims=True)
    lengths[lengths == 0] = 1
    unit_vectors = vectors / lengths

    # Dot products of unit vectors are the cosine similarities.
    cosine = np.dot(unit_vectors, unit_vectors.T)

    # Keep each unordered pair once: strict upper triangle, no diagonal.
    pair_sims = cosine[np.triu_indices(len(cosine), k=1)]
    n_pairs = len(pair_sims)
    if n_pairs == 0:
        return None

    def count_above(cutoff):
        return int((pair_sims > cutoff).sum())

    above_90 = count_above(0.90)
    above_95 = count_above(0.95)
    above_99 = count_above(0.99)

    return {
        'total_pairs': n_pairs,
        'min_sim': float(pair_sims.min()),
        'max_sim': float(pair_sims.max()),
        'mean_sim': float(pair_sims.mean()),
        'std_sim': float(pair_sims.std()),
        'pairs_gt_90': above_90,
        'pairs_gt_95': above_95,
        'pairs_gt_99': above_99,
        'ratio_gt_90': above_90 / n_pairs,
        'ratio_gt_95': above_95 / n_pairs,
        'ratio_gt_99': above_99 / n_pairs,
    }
|
|
|
|
|
|
def analyze_all_accountants(features, sig_id_to_idx, acc_signatures, acc_info):
    """Compute similarity statistics for every accountant.

    Args:
        features: feature array passed through to ``compute_similarity_stats``.
        sig_id_to_idx: mapping from signature id to feature row.
        acc_signatures: mapping from accountant id to its signature ids.
        acc_info: mapping from accountant id to a dict with 'name' and 'firm'.

    Returns:
        A list with one dict per accountant for which statistics could be
        computed: identifying fields, the signature count, and every key
        produced by ``compute_similarity_stats``. Accountants whose
        statistics come back empty/``None`` are left out.
    """
    report_rows = []

    for acc_id, sig_ids in tqdm(acc_signatures.items(), desc="分析會計師"):
        stats = compute_similarity_stats(features, sig_ids, sig_id_to_idx)
        if not stats:
            # Nothing to report for this accountant.
            continue

        meta = acc_info.get(acc_id, {})
        row = {
            'accountant_id': acc_id,
            'name': meta.get('name', ''),
            'firm': meta.get('firm', ''),
            'signature_count': len(sig_ids),
        }
        row.update(stats)
        report_rows.append(row)

    return report_rows
|
|
|
|
|
|
def classify_risk(result):
    """Map one accountant's similarity statistics to a risk label.

    Args:
        result: dict that may carry 'ratio_gt_95', 'ratio_gt_99' and
            'mean_sim'; any missing key is treated as 0.

    Returns:
        'high' when more than 5% of pairs exceed 0.99 or more than 30%
        exceed 0.95; 'medium' when more than 10% exceed 0.95 or the mean
        similarity is above 0.85; 'low' otherwise.
    """
    share_above_95 = result.get('ratio_gt_95', 0)
    share_above_99 = result.get('ratio_gt_99', 0)
    average_sim = result.get('mean_sim', 0)

    # High risk: a large share of near-identical pairs.
    if share_above_99 > 0.05 or share_above_95 > 0.3:
        return 'high'

    # Medium risk: a noticeable share of very similar pairs or a high mean.
    if share_above_95 > 0.1 or average_sim > 0.85:
        return 'medium'

    return 'low'
|
|
|
|
|
|
def save_results(results, acc_signatures):
    """Classify every accountant and write the JSON and Markdown reports.

    Each dict in ``results`` gains a ``'risk_level'`` key (set in place).
    Both reports are written into ``REPORT_DIR``.

    Args:
        results: list of per-accountant dicts carrying at least 'name',
            'firm', 'signature_count', 'mean_sim', 'ratio_gt_95' and
            'ratio_gt_99'.
        acc_signatures: not used by this function; kept so existing callers
            keep working.

    Returns:
        tuple: ``(summary, results_sorted)`` -- the summary dict that is also
        stored in the JSON report, and ``results`` ordered by descending
        ``ratio_gt_95`` and then descending ``mean_sim``.
    """
    # Label every accountant and tally the labels.
    risk_counts = defaultdict(int)
    for r in results:
        r['risk_level'] = classify_risk(r)
        risk_counts[r['risk_level']] += 1

    # 'risk_distribution' is copied before the explicit lookups below, so it
    # lists only the levels that actually occurred.
    summary = {
        'generated_at': datetime.now().isoformat(),
        'total_accountants': len(results),
        'risk_distribution': dict(risk_counts),
        'high_risk_count': risk_counts['high'],
        'medium_risk_count': risk_counts['medium'],
        'low_risk_count': risk_counts['low'],
    }

    # Most suspicious accountants first.
    results_sorted = sorted(
        results,
        key=lambda x: (-x.get('ratio_gt_95', 0), -x.get('mean_sim', 0)),
    )

    # JSON report
    output = {
        'summary': summary,
        'accountants': results_sorted,
    }
    json_path = f"{REPORT_DIR}/accountant_similarity_analysis.json"
    with open(json_path, 'w', encoding='utf-8') as f:
        json.dump(output, f, ensure_ascii=False, indent=2)
    print(f"已儲存: {json_path}")

    # Markdown report: build all lines first, then write them in one call.
    lines = [
        "# 會計師簽名相似度分析報告\n\n",
        f"生成時間: {summary['generated_at']}\n\n",
        "## 摘要\n\n",
        "| 指標 | 數值 |\n",
        "|------|------|\n",
        f"| 總會計師數 | {summary['total_accountants']} |\n",
        f"| 高風險 | {risk_counts['high']} |\n",
        f"| 中風險 | {risk_counts['medium']} |\n",
        f"| 低風險 | {risk_counts['low']} |\n",
        "\n## 風險分類標準\n\n",
        "- **高風險**: >5% 的簽名對相似度 >0.99,或 >30% 的簽名對相似度 >0.95\n",
        "- **中風險**: >10% 的簽名對相似度 >0.95,或平均相似度 >0.85\n",
        "- **低風險**: 其他情況\n",
        "\n## 高風險會計師 (Top 30)\n\n",
        "| 排名 | 姓名 | 事務所 | 簽名數 | 平均相似度 | >0.95比例 | >0.99比例 |\n",
        "|------|------|--------|--------|------------|-----------|----------|\n",
    ]

    high_risk = [r for r in results_sorted if r['risk_level'] == 'high']
    for i, r in enumerate(high_risk[:30], 1):
        # name/firm can be None (no matching accountants row); show '-'
        # instead of the literal text "None".
        lines.append(
            f"| {i} | {r['name'] or '-'} | {r['firm'] or '-'} | {r['signature_count']} | "
            f"{r['mean_sim']:.3f} | {r['ratio_gt_95']*100:.1f}% | {r['ratio_gt_99']*100:.1f}% |\n"
        )

    lines.append("\n## 所有會計師統計分布\n\n")
    lines.append("### 平均相似度分布\n\n")

    mean_sims = [r['mean_sim'] for r in results]
    if mean_sims:
        lines.extend([
            f"- 最小: {min(mean_sims):.3f}\n",
            f"- 最大: {max(mean_sims):.3f}\n",
            f"- 平均: {np.mean(mean_sims):.3f}\n",
            f"- 中位數: {np.median(mean_sims):.3f}\n",
        ])
    else:
        # min()/max() raise ValueError on an empty list; say so explicitly
        # instead of crashing after the JSON report was already written.
        lines.append("- 無資料\n")

    md_path = f"{REPORT_DIR}/accountant_similarity_analysis.md"
    with open(md_path, 'w', encoding='utf-8') as f:
        f.write("".join(lines))
    print(f"已儲存: {md_path}")

    return summary, results_sorted
|
|
|
|
|
|
def update_database(results):
    """Store each accountant's risk metrics in the ``accountants`` table.

    Adds the ``risk_level``, ``mean_similarity`` and ``ratio_gt_95`` columns
    when they are missing, then updates one row per entry in ``results``.
    Each column is checked on its own, so a database that already has only
    some of them is still brought fully up to date. The connection is closed
    even if a statement fails, and nothing is committed in that case.

    Args:
        results: iterable of dicts with the keys 'risk_level', 'mean_sim',
            'ratio_gt_95' and 'accountant_id'.

    Raises:
        sqlite3.Error: if the schema change or an update fails (for example
            when the ``accountants`` table does not exist).
    """
    wanted_columns = (
        ('risk_level', 'TEXT'),
        ('mean_similarity', 'REAL'),
        ('ratio_gt_95', 'REAL'),
    )

    conn = sqlite3.connect(DB_PATH)
    try:
        cur = conn.cursor()

        # table_info yields one row per column; index 1 is the column name.
        cur.execute("PRAGMA table_info(accountants)")
        existing = {row[1] for row in cur.fetchall()}

        # Add only what is missing instead of swallowing every error.
        for column, sql_type in wanted_columns:
            if column not in existing:
                cur.execute(
                    f"ALTER TABLE accountants ADD COLUMN {column} {sql_type}"
                )

        cur.executemany(
            """
            UPDATE accountants
            SET risk_level = ?, mean_similarity = ?, ratio_gt_95 = ?
            WHERE accountant_id = ?
            """,
            [
                (r['risk_level'], r['mean_sim'], r['ratio_gt_95'], r['accountant_id'])
                for r in results
            ],
        )

        conn.commit()
    finally:
        conn.close()

    print("資料庫已更新")
|
|
|
|
|
|
def main():
    """Run the whole stage: load, analyse, write reports, update the database."""
    banner = "=" * 60

    print(banner)
    print("第三階段:同人簽名聚類分析")
    print(banner)

    # Load the feature matrix and accountant assignments.
    loaded = load_data()
    features, sig_id_to_idx, acc_signatures, acc_info = loaded
    print(f"會計師數: {len(acc_signatures)}")

    # Per-accountant similarity statistics.
    print("\n開始分析...")
    results = analyze_all_accountants(
        features, sig_id_to_idx, acc_signatures, acc_info
    )

    # Write the JSON and Markdown reports.
    print("\n儲存結果...")
    summary, ranked = save_results(results, acc_signatures)

    # Persist the risk classification.
    update_database(ranked)

    print("\n" + banner)
    print("完成!")
    print(banner)
    print(f"總會計師: {summary['total_accountants']}")
    print(f"高風險: {summary['high_risk_count']}")
    print(f"中風險: {summary['medium_risk_count']}")
    print(f"低風險: {summary['low_risk_count']}")
|
|
|
|
|
|
# Run the pipeline only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()
|