#!/usr/bin/env python3
"""
Deloitte (勤業眾信) Signature Similarity Distribution Analysis
==============================================================

Evaluate whether Firm A's max_similarity values follow a normal distribution
or contain subgroups (e.g., genuinely hand-signed vs digitally stamped).

Tests:
 1. Descriptive statistics & percentiles
 2. Normality tests (Shapiro-Wilk, D'Agostino-Pearson, Anderson-Darling, KS)
 3. Histogram + KDE + fitted normal overlay
 4. Q-Q plot
 5. Multimodality check (Hartigan's dip test approximation)
 6. Outlier identification (signatures with unusually low similarity)
 7. dHash distance distribution for Firm A

Output: figures + report to console
"""

import sqlite3
import numpy as np
import matplotlib
matplotlib.use('Agg')  # headless backend: must be set before pyplot import
import matplotlib.pyplot as plt
from scipy import stats
from pathlib import Path
from collections import Counter

DB_PATH = '/Volumes/NV2/PDF-Processing/signature-analysis/signature_analysis.db'
OUTPUT_DIR = Path('/Volumes/NV2/PDF-Processing/signature-analysis/reports/deloitte_distribution')
OUTPUT_DIR.mkdir(parents=True, exist_ok=True)

FIRM_A = '勤業眾信聯合'


def load_firm_a_data():
    """Load all Firm A signature similarity data.

    Returns a list of dicts with keys: sig_id, filename, accountant,
    cosine (max similarity to same accountant), phash (distance to closest).
    Rows with a NULL similarity are excluded by the query.
    """
    conn = sqlite3.connect(DB_PATH)
    try:
        cur = conn.cursor()
        cur.execute('''
            SELECT s.signature_id, s.image_filename, s.assigned_accountant,
                   s.max_similarity_to_same_accountant, s.phash_distance_to_closest
            FROM signatures s
            LEFT JOIN accountants a ON s.assigned_accountant = a.name
            WHERE a.firm = ?
              AND s.max_similarity_to_same_accountant IS NOT NULL
        ''', (FIRM_A,))
        rows = cur.fetchall()
    finally:
        conn.close()

    # NOTE(review): the DB column is named phash_* but downstream plots label
    # it "dHash" — confirm which hash the pipeline actually stores.
    return [
        {
            'sig_id': r[0],
            'filename': r[1],
            'accountant': r[2],
            'cosine': r[3],
            'phash': r[4],
        }
        for r in rows
    ]


def descriptive_stats(cosines, label="Firm A Cosine Similarity"):
    """Print comprehensive descriptive statistics for an array of similarities."""
    print(f"\n{'='*65}")
    print(f"  {label}")
    print(f"{'='*65}")
    print(f"  N          = {len(cosines):,}")
    print(f"  Mean       = {np.mean(cosines):.6f}")
    print(f"  Median     = {np.median(cosines):.6f}")
    print(f"  Std Dev    = {np.std(cosines):.6f}")
    print(f"  Variance   = {np.var(cosines):.8f}")
    print(f"  Min        = {np.min(cosines):.6f}")
    print(f"  Max        = {np.max(cosines):.6f}")
    print(f"  Range      = {np.ptp(cosines):.6f}")
    print(f"  Skewness   = {stats.skew(cosines):.4f}")
    print(f"  Kurtosis   = {stats.kurtosis(cosines):.4f} (excess)")
    print(f"  IQR        = {np.percentile(cosines, 75) - np.percentile(cosines, 25):.6f}")
    print()
    print(f"  Percentiles:")
    for p in [1, 5, 10, 25, 50, 75, 90, 95, 99]:
        print(f"    P{p:<3d} = {np.percentile(cosines, p):.6f}")


def normality_tests(cosines):
    """Run multiple normality tests and return the fitted (mu, sigma).

    Note: the KS p-value is optimistic because mu/sigma are estimated from
    the same data (Lilliefors problem); treat it as a rough indicator only.
    """
    print(f"\n{'='*65}")
    print(f"  NORMALITY TESTS")
    print(f"{'='*65}")

    # Shapiro-Wilk (scipy limits reliable use to ~5000 samples).
    # Seeded subsample so the reported statistic is reproducible across runs.
    if len(cosines) > 5000:
        rng = np.random.default_rng(0)
        sample = rng.choice(cosines, 5000, replace=False)
        stat, p = stats.shapiro(sample)
        print(f"\n  Shapiro-Wilk (n=5000 subsample):")
    else:
        stat, p = stats.shapiro(cosines)
        print(f"\n  Shapiro-Wilk (n={len(cosines)}):")
    print(f"    W = {stat:.6f}, p = {p:.2e}")
    print(f"    → {'Normal' if p > 0.05 else 'NOT normal'} at α=0.05")

    # D'Agostino-Pearson requires n >= 20.
    if len(cosines) >= 20:
        stat, p = stats.normaltest(cosines)
        print(f"\n  D'Agostino-Pearson:")
        print(f"    K² = {stat:.4f}, p = {p:.2e}")
        print(f"    → {'Normal' if p > 0.05 else 'NOT normal'} at α=0.05")

    # Anderson-Darling: compare statistic to critical values per significance level.
    result = stats.anderson(cosines, dist='norm')
    print(f"\n  Anderson-Darling:")
    print(f"    A² = {result.statistic:.4f}")
    for sl, cv in zip(result.significance_level, result.critical_values):
        reject = "REJECT" if result.statistic > cv else "accept"
        print(f"    {sl}%: critical={cv:.4f} → {reject}")

    # Kolmogorov-Smirnov against the normal fitted to this sample.
    mu, sigma = np.mean(cosines), np.std(cosines)
    stat, p = stats.kstest(cosines, 'norm', args=(mu, sigma))
    print(f"\n  Kolmogorov-Smirnov (vs fitted normal):")
    print(f"    D = {stat:.6f}, p = {p:.2e}")
    print(f"    → {'Normal' if p > 0.05 else 'NOT normal'} at α=0.05")

    return mu, sigma


def test_alternative_distributions(cosines):
    """Fit alternative distributions via MLE and rank them by AIC.

    Returns the list of (name, aic, bic, params, log_likelihood) tuples,
    sorted best-first by AIC.
    """
    print(f"\n{'='*65}")
    print(f"  DISTRIBUTION FITTING (AIC comparison)")
    print(f"{'='*65}")

    distributions = {
        'norm': stats.norm,
        'skewnorm': stats.skewnorm,
        'beta': stats.beta,
        'lognorm': stats.lognorm,
        'gamma': stats.gamma,
    }

    results = []
    for name, dist in distributions.items():
        try:
            params = dist.fit(cosines)
            log_likelihood = np.sum(dist.logpdf(cosines, *params))
            k = len(params)
            aic = 2 * k - 2 * log_likelihood
            bic = k * np.log(len(cosines)) - 2 * log_likelihood
            results.append((name, aic, bic, params, log_likelihood))
        except Exception as e:
            # A fit can fail (e.g. data outside the support of the family);
            # report and keep going with the remaining candidates.
            print(f"    {name}: fit failed ({e})")

    results.sort(key=lambda x: x[1])  # sort by AIC (lower = better)
    print(f"\n  {'Distribution':<15} {'AIC':>12} {'BIC':>12} {'LogLik':>12}")
    print(f"  {'-'*51}")
    best_name = results[0][0] if results else None
    for name, aic, bic, params, ll in results:
        marker = "  ←best" if name == best_name else ""
        print(f"  {name:<15} {aic:>12.1f} {bic:>12.1f} {ll:>12.1f}{marker}")

    return results


def per_accountant_analysis(data):
    """Analyze per-accountant similarity distributions within Firm A.

    Prints a per-accountant table plus one-way ANOVA and Levene's test
    across accountants with >= 5 signatures. Returns per-accountant stats.
    """
    print(f"\n{'='*65}")
    print(f"  PER-ACCOUNTANT ANALYSIS (within Firm A)")
    print(f"{'='*65}")

    by_acct = {}
    for d in data:
        by_acct.setdefault(d['accountant'], []).append(d['cosine'])

    print(f"\n  {'Accountant':<20} {'N':>6} {'Mean':>8} {'Std':>8} {'Min':>8} {'P5':>8} {'P50':>8}")
    print(f"  {'-'*66}")
    acct_stats = []
    # Sorted ascending by mean so the weakest accountants surface first.
    for acct, vals in sorted(by_acct.items(), key=lambda x: np.mean(x[1])):
        v = np.array(vals)
        print(f"  {acct:<20} {len(v):>6} {v.mean():>8.4f} {v.std():>8.4f} "
              f"{v.min():>8.4f} {np.percentile(v, 5):>8.4f} {np.median(v):>8.4f}")
        acct_stats.append({
            'accountant': acct,
            'n': len(v),
            'mean': float(v.mean()),
            'std': float(v.std()),
            'min': float(v.min()),
            'values': v,
        })

    # Check whether per-accountant means/variances are homogeneous.
    if len(by_acct) >= 2:
        groups = [np.array(v) for v in by_acct.values() if len(v) >= 5]
        if len(groups) >= 2:
            f_stat, p_val = stats.f_oneway(*groups)
            print(f"\n  One-way ANOVA across accountants:")
            print(f"    F = {f_stat:.4f}, p = {p_val:.2e}")
            print(f"    → {'Homogeneous' if p_val > 0.05 else 'Significantly different means'} at α=0.05")

            lev_stat, lev_p = stats.levene(*groups)
            print(f"\n  Levene's test (variance homogeneity):")
            print(f"    W = {lev_stat:.4f}, p = {lev_p:.2e}")
            print(f"    → {'Homogeneous variance' if lev_p > 0.05 else 'Heterogeneous variance'} at α=0.05")

    return acct_stats


def identify_outliers(data, cosines):
    """Identify Firm A signatures with unusually low similarity (IQR fences)."""
    print(f"\n{'='*65}")
    print(f"  OUTLIER ANALYSIS (low-similarity Firm A signatures)")
    print(f"{'='*65}")

    q1 = np.percentile(cosines, 25)
    q3 = np.percentile(cosines, 75)
    iqr = q3 - q1
    lower_fence = q1 - 1.5 * iqr    # Tukey mild-outlier fence
    lower_extreme = q1 - 3.0 * iqr  # Tukey extreme-outlier fence
    print(f"  IQR method: Q1={q1:.4f}, Q3={q3:.4f}, IQR={iqr:.4f}")
    print(f"  Lower fence (mild):    {lower_fence:.4f}")
    print(f"  Lower fence (extreme): {lower_extreme:.4f}")

    outliers = [d for d in data if d['cosine'] < lower_fence]
    extreme_outliers = [d for d in data if d['cosine'] < lower_extreme]
    print(f"\n  Mild outliers    (< {lower_fence:.4f}): {len(outliers)}")
    print(f"  Extreme outliers (< {lower_extreme:.4f}): {len(extreme_outliers)}")

    if outliers:
        print(f"\n  Bottom 20 by cosine similarity:")
        for d in sorted(outliers, key=lambda x: x['cosine'])[:20]:
            phash_str = f"pHash={d['phash']}" if d['phash'] is not None else "pHash=N/A"
            print(f"    cosine={d['cosine']:.4f}  {phash_str}  {d['accountant']}  {d['filename']}")

    # Also show count below various decision thresholds.
    print(f"\n  Signatures below key thresholds:")
    for thresh in [0.95, 0.90, 0.85, 0.837, 0.80]:
        n_below = sum(1 for c in cosines if c < thresh)
        print(f"    < {thresh:.3f}: {n_below:,} ({100*n_below/len(cosines):.2f}%)")


def plot_histogram_kde(cosines, mu, sigma):
    """Plot histogram with KDE and fitted normal overlay, plus a Q-Q plot."""
    fig, axes = plt.subplots(1, 2, figsize=(14, 5))

    # Left: full histogram with fitted normal and KDE overlays.
    ax = axes[0]
    ax.hist(cosines, bins=80, density=True, alpha=0.6, color='steelblue',
            edgecolor='white', linewidth=0.5, label='Observed')
    x = np.linspace(cosines.min() - 0.02, cosines.max() + 0.02, 300)
    ax.plot(x, stats.norm.pdf(x, mu, sigma), 'r-', lw=2,
            label=f'Normal fit (μ={mu:.4f}, σ={sigma:.4f})')
    kde = stats.gaussian_kde(cosines)
    ax.plot(x, kde(x), 'g--', lw=2, label='KDE')
    ax.set_xlabel('Max Cosine Similarity')
    ax.set_ylabel('Density')
    ax.set_title(f'Firm A (勤業眾信) Cosine Similarity Distribution (N={len(cosines):,})')
    # BUG FIX: the threshold lines must be drawn before legend() is called,
    # otherwise their labels never appear in the legend.
    ax.axvline(0.95, color='orange', ls=':', alpha=0.7, label='θ=0.95')
    ax.axvline(0.837, color='purple', ls=':', alpha=0.7, label='KDE crossover')
    ax.legend(fontsize=9)

    # Right: Q-Q plot against the normal distribution.
    ax2 = axes[1]
    stats.probplot(cosines, dist='norm', plot=ax2)
    ax2.set_title('Q-Q Plot (vs Normal)')
    ax2.get_lines()[0].set_markersize(2)

    plt.tight_layout()
    fig.savefig(OUTPUT_DIR / 'firm_a_cosine_distribution.png', dpi=150)
    print(f"\n  Saved: {OUTPUT_DIR / 'firm_a_cosine_distribution.png'}")
    plt.close()


def plot_per_accountant(acct_stats):
    """Horizontal box plot of similarity per accountant, sorted by mean."""
    acct_stats.sort(key=lambda x: x['mean'])

    fig, ax = plt.subplots(figsize=(12, max(5, len(acct_stats) * 0.4)))
    positions = range(len(acct_stats))
    labels = [f"{a['accountant']} (n={a['n']})" for a in acct_stats]
    box_data = [a['values'] for a in acct_stats]

    bp = ax.boxplot(box_data, positions=positions, vert=False, widths=0.6,
                    patch_artist=True, showfliers=True,
                    flierprops=dict(marker='.', markersize=3, alpha=0.5))
    for patch in bp['boxes']:
        patch.set_facecolor('lightsteelblue')

    ax.set_yticks(positions)
    ax.set_yticklabels(labels, fontsize=8)
    ax.set_xlabel('Max Cosine Similarity')
    ax.set_title('Per-Accountant Similarity Distribution (Firm A)')
    ax.axvline(0.95, color='orange', ls=':', alpha=0.7)
    ax.axvline(0.837, color='purple', ls=':', alpha=0.7)

    plt.tight_layout()
    fig.savefig(OUTPUT_DIR / 'firm_a_per_accountant_boxplot.png', dpi=150)
    print(f"  Saved: {OUTPUT_DIR / 'firm_a_per_accountant_boxplot.png'}")
    plt.close()


def plot_phash_distribution(data):
    """Plot dHash distance distribution for Firm A (skips rows without a value)."""
    phash_vals = [d['phash'] for d in data if d['phash'] is not None]
    if not phash_vals:
        print("  No pHash data available.")
        return

    phash_arr = np.array(phash_vals)
    fig, ax = plt.subplots(figsize=(10, 5))
    # Integer-centered bins; cap at 65 (64-bit hash => max distance 64).
    max_val = min(int(phash_arr.max()) + 2, 65)
    bins = np.arange(-0.5, max_val + 0.5, 1)
    ax.hist(phash_arr, bins=bins, alpha=0.7, color='coral', edgecolor='white')
    ax.set_xlabel('dHash Distance')
    ax.set_ylabel('Count')
    ax.set_title(f'Firm A dHash Distance Distribution (N={len(phash_vals):,})')
    ax.axvline(5, color='green', ls='--', label='θ=5 (high conf.)')
    ax.axvline(15, color='orange', ls='--', label='θ=15 (moderate)')
    ax.legend()

    plt.tight_layout()
    fig.savefig(OUTPUT_DIR / 'firm_a_dhash_distribution.png', dpi=150)
    print(f"  Saved: {OUTPUT_DIR / 'firm_a_dhash_distribution.png'}")
    plt.close()


def multimodality_test(cosines):
    """Check for potential multimodality using KDE peak detection.

    Counts local maxima of a Silverman-bandwidth KDE, then re-runs the peak
    count at several bandwidth multiples as a sensitivity analysis.
    """
    print(f"\n{'='*65}")
    print(f"  MULTIMODALITY ANALYSIS")
    print(f"{'='*65}")

    kde = stats.gaussian_kde(cosines, bw_method='silverman')
    x = np.linspace(cosines.min(), cosines.max(), 1000)
    density = kde(x)

    from scipy.signal import find_peaks
    peaks, properties = find_peaks(density, prominence=0.01)
    peak_positions = x[peaks]
    peak_heights = density[peaks]

    print(f"  KDE bandwidth (Silverman): {kde.factor:.6f}")
    print(f"  Number of detected modes: {len(peaks)}")
    for i, (pos, h) in enumerate(zip(peak_positions, peak_heights)):
        print(f"    Mode {i+1}: position={pos:.4f}, density={h:.2f}")

    if len(peaks) == 1:
        print(f"\n  → Distribution appears UNIMODAL")
        print(f"    Single peak at {peak_positions[0]:.4f}")
    elif len(peaks) > 1:
        print(f"\n  → Distribution appears MULTIMODAL ({len(peaks)} modes)")
        print(f"    This suggests subgroups may exist within Firm A")
        # Report separation and valley depth between each adjacent mode pair.
        for i in range(len(peaks) - 1):
            sep = peak_positions[i + 1] - peak_positions[i]
            valley_region = density[peaks[i]:peaks[i + 1]]
            valley_depth = peak_heights[i:i + 2].min() - valley_region.min()
            print(f"    Separation {i+1}-{i+2}: Δ={sep:.4f}, valley depth={valley_depth:.2f}")

    # Mode count can be bandwidth-sensitive; vary it around the Silverman value.
    print(f"\n  Sensitivity analysis (bandwidth variation):")
    for bw_factor in [0.5, 0.75, 1.0, 1.5, 2.0]:
        bw = kde.factor * bw_factor  # scalar bw_method is a factor multiplier
        kde_test = stats.gaussian_kde(cosines, bw_method=bw)
        density_test = kde_test(x)
        peaks_test, _ = find_peaks(density_test, prominence=0.005)
        print(f"    bw={bw:.4f} (×{bw_factor:.1f}): {len(peaks_test)} mode(s)")


def main():
    """Run the full Firm A distribution analysis and write figures to OUTPUT_DIR."""
    print("Loading Firm A (勤業眾信) signature data...")
    data = load_firm_a_data()
    print(f"Total Firm A signatures: {len(data):,}")

    # Guard: percentile/ratio computations below would fail on an empty set.
    if not data:
        print("No Firm A signatures found — aborting analysis.")
        return

    cosines = np.array([d['cosine'] for d in data])

    # 1. Descriptive statistics
    descriptive_stats(cosines)

    # 2. Normality tests
    mu, sigma = normality_tests(cosines)

    # 3. Alternative distribution fitting
    test_alternative_distributions(cosines)

    # 4. Per-accountant analysis
    acct_stats = per_accountant_analysis(data)

    # 5. Outlier analysis
    identify_outliers(data, cosines)

    # 6. Multimodality test
    multimodality_test(cosines)

    # 7. Generate plots
    print(f"\n{'='*65}")
    print(f"  GENERATING FIGURES")
    print(f"{'='*65}")
    plot_histogram_kde(cosines, mu, sigma)
    plot_per_accountant(acct_stats)
    plot_phash_distribution(data)

    # Summary
    print(f"\n{'='*65}")
    print(f"  SUMMARY")
    print(f"{'='*65}")
    below_95 = sum(1 for c in cosines if c < 0.95)
    below_kde = sum(1 for c in cosines if c < 0.837)
    print(f"  Firm A signatures: {len(cosines):,}")
    print(f"  Below 0.95 threshold: {below_95:,} ({100*below_95/len(cosines):.1f}%)")
    print(f"  Below KDE crossover (0.837): {below_kde:,} ({100*below_kde/len(cosines):.1f}%)")
    print(f"  If distribution is NOT normal → subgroups may exist")
    print(f"  If multimodal → some signatures may be genuinely hand-signed")
    print(f"\n  Output directory: {OUTPUT_DIR}")


if __name__ == "__main__":
    main()