#!/usr/bin/env python3
"""
Step 3: similarity distribution exploration.

1. Randomly sample 100,000 signature pairs
2. Compute their cosine similarity
3. Plot the histogram of the distribution
4. Find high-similarity pairs (> 0.95)
5. Analyse where the high-similarity pairs come from
"""

import json
import random
from collections import defaultdict
from pathlib import Path

import matplotlib.pyplot as plt
import numpy as np
import seaborn as sns
from tqdm import tqdm

# Path configuration
OUTPUT_DIR = Path("/Volumes/NV2/PDF-Processing/signature-analysis")
FEATURES_PATH = OUTPUT_DIR / "features" / "signature_features.npy"
FILENAMES_PATH = OUTPUT_DIR / "features" / "signature_filenames.txt"
REPORTS_PATH = OUTPUT_DIR / "reports"

# Analysis configuration
NUM_RANDOM_PAIRS = 100000
HIGH_SIMILARITY_THRESHOLD = 0.95
VERY_HIGH_SIMILARITY_THRESHOLD = 0.99


def load_data():
    """Load the feature matrix and the list of signature filenames.

    Returns:
        A ``(features, filenames)`` tuple: the array stored at
        ``FEATURES_PATH`` and the stripped, non-empty lines of
        ``FILENAMES_PATH``.

    Raises:
        ValueError: if the number of feature rows differs from the number
            of filenames (every later step indexes both with the same
            integer, so a mismatch would mislabel or crash).
    """
    print("載入特徵向量...")
    features = np.load(FEATURES_PATH)
    print(f"特徵矩陣形狀: {features.shape}")

    print("載入檔名列表...")
    with open(FILENAMES_PATH, 'r', encoding='utf-8') as f:
        # Skip blank lines so a stray empty line does not become a
        # phantom filename entry.
        filenames = [name for name in (line.strip() for line in f) if name]
    print(f"檔名數量: {len(filenames)}")

    if len(features) != len(filenames):
        raise ValueError(
            f"feature rows ({len(features)}) and filenames "
            f"({len(filenames)}) are out of sync"
        )

    return features, filenames


def parse_filename(filename: str) -> dict:
    """Split a signature filename into its underscore-separated fields.

    Example: ``201301_2458_AI1_page4_sig1.png``.

    Returns:
        A dict with ``year_month``, ``serial``, ``doc_type``, ``page`` and
        ``sig_index`` when the name has at least five ``_``-separated
        parts; otherwise ``{'raw': filename}``.
    """
    parts = filename.replace('.png', '').split('_')
    if len(parts) >= 5:
        return {
            'year_month': parts[0],
            'serial': parts[1],
            'doc_type': parts[2],
            'page': parts[3].replace('page', ''),
            'sig_index': parts[4].replace('sig', ''),
        }
    return {'raw': filename}


def cosine_similarity(v1, v2):
    """Return the dot product of ``v1`` and ``v2``.

    This equals the cosine similarity only if both vectors are already
    L2-normalised, which the original author states is the case for the
    stored features.
    """
    return np.dot(v1, v2)


def _make_pair_record(idx1, idx2, similarity, filenames):
    """Build the JSON-serialisable record describing one signature pair."""
    idx1, idx2 = int(idx1), int(idx2)
    return {
        'idx1': idx1,
        'idx2': idx2,
        'file1': filenames[idx1],
        'file2': filenames[idx2],
        'similarity': float(similarity),
        'parsed1': parse_filename(filenames[idx1]),
        'parsed2': parse_filename(filenames[idx2]),
    }


def random_sampling_analysis(features, filenames, n_pairs=100000):
    """Estimate the similarity distribution from random signature pairs.

    Args:
        features: feature matrix, one row per signature.
        filenames: filenames aligned with ``features``; only its length
            is used here.
        n_pairs: number of random pairs to draw.

    Returns:
        ``(similarities, pair_indices)`` where ``similarities`` is a numpy
        array of length ``n_pairs`` and ``pair_indices`` the matching list
        of ``(i, j)`` index tuples.
    """
    print(f"\n隨機抽樣 {n_pairs:,} 對簽名...")

    n = len(filenames)
    similarities = []
    pair_indices = []

    for _ in tqdm(range(n_pairs), desc="計算相似度"):
        # random.sample guarantees two distinct indices.
        i, j = random.sample(range(n), 2)
        similarities.append(cosine_similarity(features[i], features[j]))
        pair_indices.append((i, j))

    return np.array(similarities), pair_indices


def find_high_similarity_pairs(features, filenames, threshold=0.95, sample_size=100000):
    """Find high-similarity pairs by random sampling.

    The full pairwise computation is too slow (the original author quotes
    n^2 = 33 billion pairs), so only ``sample_size`` random pairs are
    examined.

    Returns:
        A list of pair records (see ``_make_pair_record``) whose similarity
        is strictly greater than ``threshold``.
    """
    print(f"\n搜尋相似度 > {threshold} 的簽名對...")

    n = len(filenames)
    high_sim_pairs = []

    for _ in tqdm(range(sample_size), desc="搜尋高相似度"):
        i, j = random.sample(range(n), 2)
        sim = cosine_similarity(features[i], features[j])
        if sim > threshold:
            high_sim_pairs.append(_make_pair_record(i, j, sim, filenames))

    return high_sim_pairs


def systematic_high_similarity_search(features, filenames, threshold=0.95,
                                      batch_size=1000, max_queries=5000):
    """Search more systematically for high-similarity pairs.

    A random subset of signatures is used as queries; each query is
    compared against every signature with one matrix-vector product.

    Args:
        features: feature matrix, one row per signature.
        filenames: filenames aligned with ``features``.
        threshold: a pair is reported when its similarity is strictly
            greater than this value.
        batch_size: unused; kept so existing callers keep working.
        max_queries: upper bound on the number of query signatures.

    Returns:
        A list of pair records, each unordered pair reported at most once.
    """
    print(f"\n系統化搜尋高相似度對 (threshold={threshold})...")
    print("這會對每個簽名找出最相似的候選...")

    n = len(filenames)
    high_sim_pairs = []
    seen_pairs = set()

    # Query only a random subset of the signatures.
    sample_indices = random.sample(range(n), min(max_queries, n))

    for idx in tqdm(sample_indices, desc="搜尋"):
        # Similarity of this signature to all signatures in one product.
        sims = features @ features[idx]

        for j in np.flatnonzero(sims > threshold):
            j = int(j)
            if j == idx:
                continue  # a signature always matches itself
            pair_key = (idx, j) if idx < j else (j, idx)
            if pair_key in seen_pairs:
                continue
            seen_pairs.add(pair_key)
            high_sim_pairs.append(
                _make_pair_record(idx, j, sims[j], filenames)
            )

    return high_sim_pairs


def analyze_high_similarity_sources(high_sim_pairs):
    """Classify each high-similarity pair by how its two sources relate.

    Every pair lands in exactly one bucket, tested in this order:
    ``unparsed`` (at least one filename could not be parsed),
    ``same_pdf`` (year-month, serial and doc type all match),
    ``same_year_month``, ``same_doc_type``, ``different_everything``.

    Returns:
        A dict of bucket counts plus ``total``.
    """
    print("\n分析高相似度對的來源...")

    stats = {
        'same_pdf': 0,
        'same_year_month': 0,
        'same_doc_type': 0,
        'different_everything': 0,
        'unparsed': 0,
        'total': len(high_sim_pairs),
    }
    required = ('year_month', 'serial', 'doc_type')

    for pair in high_sim_pairs:
        p1, p2 = pair.get('parsed1', {}), pair.get('parsed2', {})

        # Without this guard two unparseable names compare None == None
        # and would be miscounted as coming from the same PDF.
        if any(key not in p1 or key not in p2 for key in required):
            stats['unparsed'] += 1
        elif all(p1[key] == p2[key] for key in required):
            stats['same_pdf'] += 1
        elif p1['year_month'] == p2['year_month']:
            stats['same_year_month'] += 1
        elif p1['doc_type'] == p2['doc_type']:
            stats['same_doc_type'] += 1
        else:
            stats['different_everything'] += 1

    return stats


def _draw_threshold_lines(ax):
    """Mark the two similarity thresholds on ``ax`` with dashed lines."""
    ax.axvline(x=HIGH_SIMILARITY_THRESHOLD, color='red', linestyle='--',
               label=f'{HIGH_SIMILARITY_THRESHOLD} threshold')
    ax.axvline(x=VERY_HIGH_SIMILARITY_THRESHOLD, color='darkred', linestyle='--',
               label=f'{VERY_HIGH_SIMILARITY_THRESHOLD} threshold')


def plot_similarity_distribution(similarities, output_path):
    """Plot the similarity histogram and a zoomed view of the high end.

    Plotting is best-effort: any failure is reported on stdout and the
    function returns normally so the rest of the analysis can continue.

    Args:
        similarities: numpy array of similarity values.
        output_path: where the PNG is written.
    """
    print("\n繪製分布圖...")

    fig = None
    try:
        # Plain Python lists are passed to matplotlib; the original author
        # did this deliberately to sidestep numpy-related plotting issues.
        sim_list = similarities.tolist()

        fig, axes = plt.subplots(1, 2, figsize=(14, 5))

        # Left panel: the full distribution with explicit bin edges.
        ax1 = axes[0]
        ax1.hist(sim_list,
                 bins=np.linspace(min(sim_list), max(sim_list), 101).tolist(),
                 density=True, alpha=0.7, color='steelblue', edgecolor='white')
        _draw_threshold_lines(ax1)
        ax1.set_xlabel('Cosine Similarity', fontsize=12)
        ax1.set_ylabel('Density', fontsize=12)
        ax1.set_title('Signature Similarity Distribution (Random Sampling)', fontsize=14)
        ax1.legend()

        # Summary statistics box.
        mean_sim = float(np.mean(similarities))
        std_sim = float(np.std(similarities))
        ax1.annotate(f'Mean: {mean_sim:.4f}\nStd: {std_sim:.4f}',
                     xy=(0.02, 0.95), xycoords='axes fraction',
                     fontsize=10, verticalalignment='top',
                     bbox=dict(boxstyle='round', facecolor='wheat', alpha=0.5))

        # Right panel: zoom on the high-similarity region.
        ax2 = axes[1]
        high_sim_list = [x for x in sim_list if x > 0.8]
        if high_sim_list:
            ax2.hist(high_sim_list,
                     bins=np.linspace(0.8, max(high_sim_list), 51).tolist(),
                     density=True, alpha=0.7, color='coral', edgecolor='white')
        _draw_threshold_lines(ax2)
        ax2.set_xlabel('Cosine Similarity', fontsize=12)
        ax2.set_ylabel('Density', fontsize=12)
        ax2.set_title('High Similarity Region (> 0.8)', fontsize=14)
        ax2.legend()

        # Share of sampled pairs above each threshold.
        total = len(similarities)
        pct_95 = int((similarities > HIGH_SIMILARITY_THRESHOLD).sum()) / total * 100
        pct_99 = int((similarities > VERY_HIGH_SIMILARITY_THRESHOLD).sum()) / total * 100
        ax2.annotate(f'> {HIGH_SIMILARITY_THRESHOLD}: {pct_95:.4f}%\n'
                     f'> {VERY_HIGH_SIMILARITY_THRESHOLD}: {pct_99:.4f}%',
                     xy=(0.98, 0.95), xycoords='axes fraction',
                     fontsize=10, verticalalignment='top',
                     horizontalalignment='right',
                     bbox=dict(boxstyle='round', facecolor='wheat', alpha=0.5))

        fig.tight_layout()
        fig.savefig(output_path, dpi=150, bbox_inches='tight')
        print(f"分布圖已儲存: {output_path}")

    except Exception as e:
        print(f"繪圖失敗: {e}")
        print("跳過繪圖,繼續其他分析...")
    finally:
        # Release the figure even when plotting failed part-way.
        if fig is not None:
            plt.close(fig)


def generate_statistics_report(similarities, high_sim_pairs, source_stats, output_path):
    """Assemble the statistics report, write it as JSON and return it.

    Args:
        similarities: numpy array from the random sampling step.
        high_sim_pairs: pair records from the high-similarity search.
        source_stats: bucket counts from ``analyze_high_similarity_sources``.
        output_path: destination of the JSON report.

    Returns:
        The report dict that was written.
    """
    percentile_levels = (25, 50, 75, 90, 95, 99)
    percentile_values = np.percentile(similarities, percentile_levels)

    report = {
        'random_sampling': {
            'n_pairs': len(similarities),
            'mean': float(np.mean(similarities)),
            'std': float(np.std(similarities)),
            'min': float(np.min(similarities)),
            'max': float(np.max(similarities)),
            'percentiles': {
                f'{level}%': float(value)
                for level, value in zip(percentile_levels, percentile_values)
            },
            'above_thresholds': {
                f'>{cutoff:.2f}': int((similarities > cutoff).sum())
                for cutoff in (0.90, 0.95, 0.99)
            },
        },
        'high_similarity_search': {
            'threshold': HIGH_SIMILARITY_THRESHOLD,
            'pairs_found': len(high_sim_pairs),
            'source_analysis': source_stats,
            'top_10_pairs': sorted(high_sim_pairs,
                                   key=lambda x: x['similarity'],
                                   reverse=True)[:10],
        },
    }

    with open(output_path, 'w', encoding='utf-8') as f:
        json.dump(report, f, indent=2, ensure_ascii=False)

    print(f"統計報告已儲存: {output_path}")
    return report


def print_summary(report):
    """Print a human-readable summary of ``report`` to stdout."""
    print("\n" + "=" * 70)
    print("相似度分布分析摘要")
    print("=" * 70)

    rs = report['random_sampling']
    print(f"\n隨機抽樣統計 ({rs['n_pairs']:,} 對):")
    print(f" 平均相似度: {rs['mean']:.4f}")
    print(f" 標準差: {rs['std']:.4f}")
    print(f" 範圍: [{rs['min']:.4f}, {rs['max']:.4f}]")

    print("\n百分位數:")
    for k, v in rs['percentiles'].items():
        print(f" {k}: {v:.4f}")

    print("\n高相似度對數量:")
    for k, v in rs['above_thresholds'].items():
        pct = v / rs['n_pairs'] * 100
        print(f" {k}: {v:,} ({pct:.4f}%)")

    hs = report['high_similarity_search']
    print(f"\n系統化搜尋結果 (threshold={hs['threshold']}):")
    print(f" 發現高相似度對: {hs['pairs_found']:,}")

    sa = hs['source_analysis']
    if sa['total'] > 0:
        total = sa['total']
        print("\n來源分析:")
        buckets = (
            (" 同一 PDF", 'same_pdf'),
            (" 同月份", 'same_year_month'),
            (" 同類型", 'same_doc_type'),
            (" 完全不同", 'different_everything'),
        )
        for label, key in buckets:
            print(f"{label}: {sa[key]} ({sa[key]/total*100:.1f}%)")
        # Only present (and only shown) when some filenames failed to parse.
        unparsed = sa.get('unparsed', 0)
        if unparsed:
            print(f" 無法解析: {unparsed} ({unparsed/total*100:.1f}%)")

    if hs['top_10_pairs']:
        print("\nTop 10 高相似度對:")
        for i, pair in enumerate(hs['top_10_pairs'], 1):
            print(f" {i}. {pair['similarity']:.4f}")
            print(f" {pair['file1']}")
            print(f" {pair['file2']}")


def main():
    """Run the whole step: load, sample, plot, search, report, summarise."""
    print("=" * 70)
    print("Step 3: 相似度分布探索")
    print("=" * 70)

    # Make sure the output directory exists.
    REPORTS_PATH.mkdir(parents=True, exist_ok=True)

    features, filenames = load_data()

    # Random sampling; the sampled index pairs are not needed afterwards.
    similarities, _ = random_sampling_analysis(features, filenames, NUM_RANDOM_PAIRS)

    plot_similarity_distribution(
        similarities,
        REPORTS_PATH / "similarity_distribution.png"
    )

    high_sim_pairs = systematic_high_similarity_search(
        features, filenames,
        threshold=HIGH_SIMILARITY_THRESHOLD
    )

    source_stats = analyze_high_similarity_sources(high_sim_pairs)

    report = generate_statistics_report(
        similarities, high_sim_pairs, source_stats,
        REPORTS_PATH / "similarity_statistics.json"
    )

    # Persist the full list of high-similarity pairs.
    high_sim_output = REPORTS_PATH / "high_similarity_pairs.json"
    with open(high_sim_output, 'w', encoding='utf-8') as f:
        json.dump(high_sim_pairs, f, indent=2, ensure_ascii=False)
    print(f"高相似度對列表已儲存: {high_sim_output}")

    print_summary(report)


if __name__ == "__main__":
    main()