939a348da4
Paper draft includes all sections (Abstract through Conclusion), 36 references, and supporting scripts. Key methodology: Cosine similarity + dHash dual-method verification with thresholds calibrated against known-replication firm (Firm A). Includes: - 8 section markdown files (paper_a_*.md) - Ablation study script (ResNet-50 vs VGG-16 vs EfficientNet-B0) - Recalibrated classification script (84,386 PDFs, 5-tier system) - Figure generation and Word export scripts - Citation renumbering script ([1]-[36]) - Signature analysis pipeline (12 steps) - YOLO extraction scripts Three rounds of AI review completed (GPT-5.4, Claude Opus 4.6, Gemini 3 Pro). Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
369 lines
13 KiB
Python
369 lines
13 KiB
Python
#!/usr/bin/env python3
"""
Step 3: Similarity distribution exploration.

1. Randomly sample 100,000 signature pairs
2. Compute cosine similarity
3. Plot the histogram of the distribution
4. Identify high-similarity pairs (> 0.95)
5. Analyze the sources of the high-similarity pairs
"""
import numpy as np
|
|
import matplotlib.pyplot as plt
|
|
import seaborn as sns
|
|
from pathlib import Path
|
|
from tqdm import tqdm
|
|
import random
|
|
from collections import defaultdict
|
|
import json
|
|
|
|
# Path configuration
OUTPUT_DIR = Path("/Volumes/NV2/PDF-Processing/signature-analysis")
FEATURES_PATH = OUTPUT_DIR / "features" / "signature_features.npy"    # feature matrix, one row per signature
FILENAMES_PATH = OUTPUT_DIR / "features" / "signature_filenames.txt"  # filenames aligned with the matrix rows
REPORTS_PATH = OUTPUT_DIR / "reports"

# Analysis configuration
NUM_RANDOM_PAIRS = 100000               # pairs drawn for the baseline distribution
HIGH_SIMILARITY_THRESHOLD = 0.95        # cosine similarity considered "high"
VERY_HIGH_SIMILARITY_THRESHOLD = 0.99   # near-duplicate threshold
def load_data():
    """Load the feature matrix and the parallel list of signature filenames.

    Returns:
        tuple: ``(features, filenames)`` where ``features`` is the array
        loaded from FEATURES_PATH and ``filenames[i]`` names the signature
        behind ``features[i]``.
    """
    print("載入特徵向量...")
    features = np.load(FEATURES_PATH)
    print(f"特徵矩陣形狀: {features.shape}")

    print("載入檔名列表...")
    # Explicit encoding: the platform default may not be UTF-8 and the
    # filename list can contain non-ASCII characters.
    with open(FILENAMES_PATH, 'r', encoding='utf-8') as f:
        filenames = [line.strip() for line in f]
    print(f"檔名數量: {len(filenames)}")

    return features, filenames
def parse_filename(filename: str) -> dict:
    """Extract metadata fields from a signature image filename.

    Expected pattern: ``<yearmonth>_<serial>_<doctype>_page<N>_sig<M>.png``,
    e.g. ``201301_2458_AI1_page4_sig1.png``.

    Returns:
        dict: keys ``year_month``/``serial``/``doc_type``/``page``/``sig_index``
        on a match, or ``{'raw': filename}`` when the name does not follow
        the expected pattern.
    """
    # Strip only a trailing '.png'; str.replace would also mangle a name
    # that happens to contain '.png' in the middle.
    stem = filename[:-4] if filename.endswith('.png') else filename
    parts = stem.split('_')
    if len(parts) >= 5:
        return {
            'year_month': parts[0],
            'serial': parts[1],
            'doc_type': parts[2],
            'page': parts[3].replace('page', ''),
            'sig_index': parts[4].replace('sig', '')
        }
    return {'raw': filename}
def cosine_similarity(v1, v2):
    """Return the cosine similarity of two L2-normalized vectors.

    Because both inputs are unit-length, the cosine reduces to the plain
    inner product; no norm division is required here.
    """
    similarity = np.dot(v1, v2)
    return similarity
def random_sampling_analysis(features, filenames, n_pairs=100000):
    """Estimate the similarity distribution from random signature pairs.

    Draws ``n_pairs`` random pairs of distinct signatures and scores them in
    one vectorized pass (features are assumed L2-normalized, so a row-wise
    dot product gives the cosine similarity) instead of ``n_pairs``
    Python-level dot-product calls.

    Args:
        features: feature matrix, rows aligned with ``filenames``.
        filenames: signature filenames; only its length is used here.
        n_pairs: number of random pairs to sample.

    Returns:
        tuple: ``(similarities, pair_indices)`` — a float array of length
        ``n_pairs`` and the matching list of ``(i, j)`` index tuples.
    """
    print(f"\n隨機抽樣 {n_pairs:,} 對簽名...")

    n = len(filenames)
    rng = np.random.default_rng()

    # Draw both endpoints at once, then redraw any self-pairs until every
    # pair consists of two distinct indices.
    idx1 = rng.integers(0, n, size=n_pairs)
    idx2 = rng.integers(0, n, size=n_pairs)
    clash = idx1 == idx2
    while clash.any():
        idx2[clash] = rng.integers(0, n, size=int(clash.sum()))
        clash = idx1 == idx2

    # Row-wise dot products: a single C-level pass over all sampled pairs.
    similarities = np.einsum('ij,ij->i', features[idx1], features[idx2])
    pair_indices = list(zip(idx1.tolist(), idx2.tolist()))

    return similarities, pair_indices
def find_high_similarity_pairs(features, filenames, threshold=0.95, sample_size=100000):
    """Find randomly sampled signature pairs whose similarity exceeds ``threshold``.

    An exhaustive scan is infeasible (n^2 is on the order of 33 billion
    pairs), so ``sample_size`` random distinct pairs are drawn and scored in
    one vectorized pass; only pairs above the threshold are kept.

    Returns:
        list[dict]: one record per hit with indices, filenames, parsed
        filename metadata, and the similarity value.
    """
    print(f"\n搜尋相似度 > {threshold} 的簽名對...")

    n = len(filenames)

    # Vectorized sampling: draw every candidate pair up front, redraw the
    # few self-pairs, then score all pairs with a single row-wise product.
    rng = np.random.default_rng()
    idx1 = rng.integers(0, n, size=sample_size)
    idx2 = rng.integers(0, n, size=sample_size)
    clash = idx1 == idx2
    while clash.any():
        idx2[clash] = rng.integers(0, n, size=int(clash.sum()))
        clash = idx1 == idx2

    sims = np.einsum('ij,ij->i', features[idx1], features[idx2])

    high_sim_pairs = []
    for i, j, sim in zip(idx1.tolist(), idx2.tolist(), sims.tolist()):
        if sim > threshold:
            high_sim_pairs.append({
                'idx1': i,
                'idx2': j,
                'file1': filenames[i],
                'file2': filenames[j],
                'similarity': float(sim),
                'parsed1': parse_filename(filenames[i]),
                'parsed2': parse_filename(filenames[j])
            })

    return high_sim_pairs
def systematic_high_similarity_search(features, filenames, threshold=0.95, batch_size=1000):
    """
    More systematic high-similarity search: for a random subset of query
    signatures, score each query against the whole collection and record
    every pair above ``threshold``, deduplicated across queries.
    """
    print(f"\n系統化搜尋高相似度對 (threshold={threshold})...")
    print("這會對每個簽名找出最相似的候選...")

    total = len(filenames)
    found = []
    visited = set()

    # Query only a random subset — scoring every signature against all
    # others would be far too slow.
    queries = random.sample(range(total), min(5000, total))

    for query_idx in tqdm(queries, desc="搜尋"):
        # One matrix-vector product scores this query against everything.
        scores = features @ features[query_idx]

        for candidate in np.where(scores > threshold)[0]:
            if candidate == query_idx:
                continue  # skip the trivial self-match
        # Canonical (low, high) key so (a, b) and (b, a) dedupe together.
            key = tuple(sorted([query_idx, int(candidate)]))
            if key in visited:
                continue
            visited.add(key)
            found.append({
                'idx1': int(query_idx),
                'idx2': int(candidate),
                'file1': filenames[query_idx],
                'file2': filenames[int(candidate)],
                'similarity': float(scores[candidate]),
                'parsed1': parse_filename(filenames[query_idx]),
                'parsed2': parse_filename(filenames[int(candidate)])
            })

    return found
def analyze_high_similarity_sources(high_sim_pairs):
    """Categorize high-similarity pairs by how their source files relate.

    Each pair lands in exactly one bucket, checked in priority order:
    same PDF (year_month + serial + doc_type all equal), then same month,
    then same document type, otherwise "different everything".
    """
    print("\n分析高相似度對的來源...")

    stats = {
        'same_pdf': 0,
        'same_year_month': 0,
        'same_doc_type': 0,
        'different_everything': 0,
        'total': len(high_sim_pairs)
    }

    for record in high_sim_pairs:
        meta_a = record.get('parsed1', {})
        meta_b = record.get('parsed2', {})

        same_month = meta_a.get('year_month') == meta_b.get('year_month')
        same_serial = meta_a.get('serial') == meta_b.get('serial')
        same_type = meta_a.get('doc_type') == meta_b.get('doc_type')

        if same_month and same_serial and same_type:
            stats['same_pdf'] += 1
        elif same_month:
            stats['same_year_month'] += 1
        elif same_type:
            stats['same_doc_type'] += 1
        else:
            stats['different_everything'] += 1

    return stats
def plot_similarity_distribution(similarities, output_path):
    """Render the similarity histograms (full range plus zoomed tail > 0.8).

    Saves a two-panel figure to ``output_path``. Any plotting failure is
    caught and reported so the rest of the pipeline can keep running.
    """
    print("\n繪製分布圖...")

    try:
        # Work on a plain Python list to sidestep numpy/matplotlib quirks.
        values = similarities.tolist()

        fig, (full_ax, zoom_ax) = plt.subplots(1, 2, figsize=(14, 5))

        # Left panel: the full distribution with explicit bin edges.
        full_bins = np.linspace(min(values), max(values), 101).tolist()
        full_ax.hist(values, bins=full_bins,
                     density=True, alpha=0.7, color='steelblue', edgecolor='white')
        full_ax.axvline(x=0.95, color='red', linestyle='--', label='0.95 threshold')
        full_ax.axvline(x=0.99, color='darkred', linestyle='--', label='0.99 threshold')
        full_ax.set_xlabel('Cosine Similarity', fontsize=12)
        full_ax.set_ylabel('Density', fontsize=12)
        full_ax.set_title('Signature Similarity Distribution (Random Sampling)', fontsize=14)
        full_ax.legend()

        # Corner annotation with the first two moments.
        mean_sim = float(np.mean(similarities))
        std_sim = float(np.std(similarities))
        full_ax.annotate(f'Mean: {mean_sim:.4f}\nStd: {std_sim:.4f}',
                         xy=(0.02, 0.95), xycoords='axes fraction',
                         fontsize=10, verticalalignment='top',
                         bbox=dict(boxstyle='round', facecolor='wheat', alpha=0.5))

        # Right panel: zoom into the high-similarity tail (> 0.8).
        tail = [v for v in values if v > 0.8]
        if tail:
            zoom_bins = np.linspace(0.8, max(tail), 51).tolist()
            zoom_ax.hist(tail, bins=zoom_bins,
                         density=True, alpha=0.7, color='coral', edgecolor='white')
        zoom_ax.axvline(x=0.95, color='red', linestyle='--', label='0.95 threshold')
        zoom_ax.axvline(x=0.99, color='darkred', linestyle='--', label='0.99 threshold')
        zoom_ax.set_xlabel('Cosine Similarity', fontsize=12)
        zoom_ax.set_ylabel('Density', fontsize=12)
        zoom_ax.set_title('High Similarity Region (> 0.8)', fontsize=14)
        zoom_ax.legend()

        # Share of sampled pairs above each threshold.
        pct_95 = int((similarities > 0.95).sum()) / len(similarities) * 100
        pct_99 = int((similarities > 0.99).sum()) / len(similarities) * 100
        zoom_ax.annotate(f'> 0.95: {pct_95:.4f}%\n> 0.99: {pct_99:.4f}%',
                         xy=(0.98, 0.95), xycoords='axes fraction',
                         fontsize=10, verticalalignment='top', horizontalalignment='right',
                         bbox=dict(boxstyle='round', facecolor='wheat', alpha=0.5))

        plt.tight_layout()
        plt.savefig(output_path, dpi=150, bbox_inches='tight')
        plt.close()

        print(f"分布圖已儲存: {output_path}")
    except Exception as e:
        # Best-effort by design: report the failure and let callers proceed.
        print(f"繪圖失敗: {e}")
        print("跳過繪圖,繼續其他分析...")
def generate_statistics_report(similarities, high_sim_pairs, source_stats, output_path):
    """Assemble the summary statistics and write them to a JSON file.

    Combines the random-sampling distribution statistics with the results of
    the systematic high-similarity search, writes the JSON report to
    ``output_path``, and returns the report dict.
    """
    percentile_levels = (25, 50, 75, 90, 95, 99)
    threshold_levels = (0.90, 0.95, 0.99)

    report = {
        'random_sampling': {
            'n_pairs': len(similarities),
            'mean': float(np.mean(similarities)),
            'std': float(np.std(similarities)),
            'min': float(np.min(similarities)),
            'max': float(np.max(similarities)),
            'percentiles': {
                f'{level}%': float(np.percentile(similarities, level))
                for level in percentile_levels
            },
            'above_thresholds': {
                f'>{level:.2f}': int((similarities > level).sum())
                for level in threshold_levels
            }
        },
        'high_similarity_search': {
            'threshold': HIGH_SIMILARITY_THRESHOLD,
            'pairs_found': len(high_sim_pairs),
            'source_analysis': source_stats,
            'top_10_pairs': sorted(high_sim_pairs,
                                   key=lambda p: p['similarity'],
                                   reverse=True)[:10]
        }
    }

    with open(output_path, 'w', encoding='utf-8') as f:
        json.dump(report, f, indent=2, ensure_ascii=False)

    print(f"統計報告已儲存: {output_path}")
    return report
def print_summary(report):
    """Pretty-print the analysis summary (as built by generate_statistics_report)."""
    separator = "=" * 70
    print("\n" + separator)
    print("相似度分布分析摘要")
    print(separator)

    sampling = report['random_sampling']
    print(f"\n隨機抽樣統計 ({sampling['n_pairs']:,} 對):")
    print(f" 平均相似度: {sampling['mean']:.4f}")
    print(f" 標準差: {sampling['std']:.4f}")
    print(f" 範圍: [{sampling['min']:.4f}, {sampling['max']:.4f}]")
    print(f"\n百分位數:")
    for label, value in sampling['percentiles'].items():
        print(f" {label}: {value:.4f}")

    print(f"\n高相似度對數量:")
    for label, count in sampling['above_thresholds'].items():
        share = count / sampling['n_pairs'] * 100
        print(f" {label}: {count:,} ({share:.4f}%)")

    search = report['high_similarity_search']
    print(f"\n系統化搜尋結果 (threshold={search['threshold']}):")
    print(f" 發現高相似度對: {search['pairs_found']:,}")

    sources = search['source_analysis']
    if sources['total'] > 0:
        print(f"\n來源分析:")
        print(f" 同一 PDF: {sources['same_pdf']} ({sources['same_pdf']/sources['total']*100:.1f}%)")
        print(f" 同月份: {sources['same_year_month']} ({sources['same_year_month']/sources['total']*100:.1f}%)")
        print(f" 同類型: {sources['same_doc_type']} ({sources['same_doc_type']/sources['total']*100:.1f}%)")
        print(f" 完全不同: {sources['different_everything']} ({sources['different_everything']/sources['total']*100:.1f}%)")

    if search['top_10_pairs']:
        print(f"\nTop 10 高相似度對:")
        for rank, pair in enumerate(search['top_10_pairs'], 1):
            print(f" {rank}. {pair['similarity']:.4f}")
            print(f" {pair['file1']}")
            print(f" {pair['file2']}")
def main():
    """Run the full Step-3 pipeline: sample, plot, search, analyze, report."""
    banner = "=" * 70
    print(banner)
    print("Step 3: 相似度分布探索")
    print(banner)

    # Make sure the reports directory exists before any file is written.
    REPORTS_PATH.mkdir(parents=True, exist_ok=True)

    features, filenames = load_data()

    # Baseline similarity distribution from random pairs.
    similarities, pair_indices = random_sampling_analysis(features, filenames, NUM_RANDOM_PAIRS)

    plot_similarity_distribution(similarities,
                                 REPORTS_PATH / "similarity_distribution.png")

    # Systematic search for near-duplicate signature pairs.
    high_sim_pairs = systematic_high_similarity_search(features, filenames,
                                                       threshold=HIGH_SIMILARITY_THRESHOLD)

    source_stats = analyze_high_similarity_sources(high_sim_pairs)

    report = generate_statistics_report(similarities, high_sim_pairs, source_stats,
                                        REPORTS_PATH / "similarity_statistics.json")

    # Persist the raw high-similarity pair list alongside the report.
    high_sim_output = REPORTS_PATH / "high_similarity_pairs.json"
    with open(high_sim_output, 'w', encoding='utf-8') as f:
        json.dump(high_sim_pairs, f, indent=2, ensure_ascii=False)
    print(f"高相似度對列表已儲存: {high_sim_output}")

    print_summary(report)


if __name__ == "__main__":
    main()