From fbfab1fa6894cdf2e2ce412e9b35a9078334fa75 Mon Sep 17 00:00:00 2001 From: gbanyan Date: Mon, 20 Apr 2026 21:51:41 +0800 Subject: [PATCH] Add three-convergent-method threshold scripts + pixel-identity validation Implements Partner v3's statistical rigor requirements at the level of signature vs. accountant analysis units: - Script 15 (Hartigan dip test): formal unimodality test via `diptest`. Result: Firm A cosine UNIMODAL (p=0.17, pure non-hand-signed population); full-sample cosine MULTIMODAL (p<0.001, mix of two regimes); accountant-level aggregates MULTIMODAL on both cos and dHash. - Script 16 (Burgstahler-Dichev / McCrary): discretised Z-score transition detection. Firm A and full-sample cosine transitions at 0.985; dHash at 2.0. - Script 17 (Beta mixture EM + logit-GMM): 2/3-component Beta via EM with MoM M-step, plus parallel Gaussian mixture on logit transform as White (1982) robustness check. Beta-3 BIC < Beta-2 BIC at signature level confirms 2-component is a forced fit -- supporting the pivot to accountant-level mixture. - Script 18 (Accountant-level GMM): rebuilds the 2026-04-16 analysis that was done inline and not saved. BIC-best K=3 with components matching prior memory almost exactly: C1 (cos=0.983, dh=2.41, 20%, Deloitte 139/141), C2 (0.954, 6.99, 51%, KPMG/PwC/EY), C3 (0.928, 11.17, 28%, small firms). 2-component natural thresholds: cos=0.9450, dh=8.10. - Script 19 (Pixel-identity validation): no human annotation needed. Uses pixel_identical_to_closest (310 sigs) as gold positive and Firm A as anchor positive. Confirms Firm A cosine>0.95 = 92.51% (matches prior 2026-04-08 finding of 92.5%), dual rule cos>0.95 AND dhash_indep<=8 captures 89.95% of Firm A. Python deps added: diptest, scikit-learn (installed into venv). 
Co-Authored-By: Claude Opus 4.7 (1M context) --- signature_analysis/15_hartigan_dip_test.py | 227 ++++++++++ .../16_bd_mccrary_discontinuity.py | 318 ++++++++++++++ signature_analysis/17_beta_mixture_em.py | 406 +++++++++++++++++ signature_analysis/18_accountant_mixture.py | 396 +++++++++++++++++ .../19_pixel_identity_validation.py | 413 ++++++++++++++++++ 5 files changed, 1760 insertions(+) create mode 100644 signature_analysis/15_hartigan_dip_test.py create mode 100644 signature_analysis/16_bd_mccrary_discontinuity.py create mode 100644 signature_analysis/17_beta_mixture_em.py create mode 100644 signature_analysis/18_accountant_mixture.py create mode 100644 signature_analysis/19_pixel_identity_validation.py diff --git a/signature_analysis/15_hartigan_dip_test.py b/signature_analysis/15_hartigan_dip_test.py new file mode 100644 index 0000000..b86e55c --- /dev/null +++ b/signature_analysis/15_hartigan_dip_test.py @@ -0,0 +1,227 @@ +#!/usr/bin/env python3 +""" +Script 15: Hartigan Dip Test for Unimodality +============================================= +Runs the proper Hartigan & Hartigan (1985) dip test via the `diptest` package +on the empirical signature-similarity distributions. + +Purpose: + Confirm/refute bimodality assumption underpinning threshold-selection methods. + Prior finding (2026-04-16): signature-level distribution is unimodal long-tail; + the story is that bimodality only emerges at the accountant level. + +Tests: + 1. Firm A (Deloitte) cosine max-similarity -> expected UNIMODAL + 2. Firm A (Deloitte) independent min dHash -> expected UNIMODAL + 3. Full-sample cosine max-similarity -> test + 4. Full-sample independent min dHash -> test + 5. Accountant-level cosine mean (per-accountant) -> expected BIMODAL / MULTIMODAL + 6. 
Accountant-level dhash mean (per-accountant) -> expected BIMODAL / MULTIMODAL + +Output: + reports/dip_test/dip_test_report.md + reports/dip_test/dip_test_results.json +""" + +import sqlite3 +import json +import numpy as np +import diptest +from pathlib import Path +from datetime import datetime + +DB = '/Volumes/NV2/PDF-Processing/signature-analysis/signature_analysis.db' +OUT = Path('/Volumes/NV2/PDF-Processing/signature-analysis/reports/dip_test') +OUT.mkdir(parents=True, exist_ok=True) + +FIRM_A = '勤業眾信聯合' + + +def run_dip(values, label, n_boot=2000): + """Run Hartigan dip test and return structured result.""" + arr = np.asarray(values, dtype=float) + arr = arr[~np.isnan(arr)] + if len(arr) < 4: + return {'label': label, 'n': int(len(arr)), 'error': 'too few observations'} + + dip, pval = diptest.diptest(arr, boot_pval=True, n_boot=n_boot) + verdict = 'UNIMODAL (accept H0)' if pval > 0.05 else 'MULTIMODAL (reject H0)' + return { + 'label': label, + 'n': int(len(arr)), + 'mean': float(np.mean(arr)), + 'std': float(np.std(arr)), + 'min': float(np.min(arr)), + 'max': float(np.max(arr)), + 'dip': float(dip), + 'p_value': float(pval), + 'n_boot': int(n_boot), + 'verdict_alpha_05': verdict, + } + + +def fetch_firm_a(): + conn = sqlite3.connect(DB) + cur = conn.cursor() + cur.execute(''' + SELECT s.max_similarity_to_same_accountant, + s.min_dhash_independent + FROM signatures s + JOIN accountants a ON s.assigned_accountant = a.name + WHERE a.firm = ? 
+ AND s.max_similarity_to_same_accountant IS NOT NULL + ''', (FIRM_A,)) + rows = cur.fetchall() + conn.close() + cos = [r[0] for r in rows if r[0] is not None] + dh = [r[1] for r in rows if r[1] is not None] + return np.array(cos), np.array(dh) + + +def fetch_full_sample(): + conn = sqlite3.connect(DB) + cur = conn.cursor() + cur.execute(''' + SELECT max_similarity_to_same_accountant, min_dhash_independent + FROM signatures + WHERE max_similarity_to_same_accountant IS NOT NULL + ''') + rows = cur.fetchall() + conn.close() + cos = np.array([r[0] for r in rows if r[0] is not None]) + dh = np.array([r[1] for r in rows if r[1] is not None]) + return cos, dh + + +def fetch_accountant_aggregates(min_sigs=10): + """Per-accountant mean cosine and mean independent dHash.""" + conn = sqlite3.connect(DB) + cur = conn.cursor() + cur.execute(''' + SELECT s.assigned_accountant, + AVG(s.max_similarity_to_same_accountant) AS cos_mean, + AVG(CAST(s.min_dhash_independent AS REAL)) AS dh_mean, + COUNT(*) AS n + FROM signatures s + WHERE s.assigned_accountant IS NOT NULL + AND s.max_similarity_to_same_accountant IS NOT NULL + AND s.min_dhash_independent IS NOT NULL + GROUP BY s.assigned_accountant + HAVING n >= ? 
+ ''', (min_sigs,)) + rows = cur.fetchall() + conn.close() + cos_means = np.array([r[1] for r in rows]) + dh_means = np.array([r[2] for r in rows]) + return cos_means, dh_means, len(rows) + + +def main(): + print('='*70) + print('Script 15: Hartigan Dip Test for Unimodality') + print('='*70) + + results = {} + + # Firm A + print('\n[1/3] Firm A (Deloitte)...') + fa_cos, fa_dh = fetch_firm_a() + print(f' Firm A cosine N={len(fa_cos):,}, dHash N={len(fa_dh):,}') + results['firm_a_cosine'] = run_dip(fa_cos, 'Firm A cosine max-similarity') + results['firm_a_dhash'] = run_dip(fa_dh, 'Firm A independent min dHash') + + # Full sample + print('\n[2/3] Full sample...') + all_cos, all_dh = fetch_full_sample() + print(f' Full cosine N={len(all_cos):,}, dHash N={len(all_dh):,}') + # Dip test on >=10k obs can be slow with 2000 boot; use 500 for full sample + results['full_cosine'] = run_dip(all_cos, 'Full-sample cosine max-similarity', + n_boot=500) + results['full_dhash'] = run_dip(all_dh, 'Full-sample independent min dHash', + n_boot=500) + + # Accountant-level aggregates + print('\n[3/3] Accountant-level aggregates (min 10 sigs)...') + acct_cos, acct_dh, n_acct = fetch_accountant_aggregates(min_sigs=10) + print(f' Accountants analyzed: {n_acct}') + results['accountant_cos_mean'] = run_dip(acct_cos, + 'Per-accountant cosine mean') + results['accountant_dh_mean'] = run_dip(acct_dh, + 'Per-accountant dHash mean') + + # Print summary + print('\n' + '='*70) + print('RESULTS SUMMARY') + print('='*70) + print(f"{'Test':<40} {'N':>8} {'dip':>8} {'p':>10} Verdict") + print('-'*90) + for key, r in results.items(): + if 'error' in r: + continue + print(f"{r['label']:<40} {r['n']:>8,} {r['dip']:>8.4f} " + f"{r['p_value']:>10.4f} {r['verdict_alpha_05']}") + + # Write JSON + json_path = OUT / 'dip_test_results.json' + with open(json_path, 'w') as f: + json.dump({ + 'generated_at': datetime.now().isoformat(), + 'db': DB, + 'results': results, + }, f, indent=2, ensure_ascii=False) + 
print(f'\nJSON saved: {json_path}') + + # Write Markdown report + md = [ + '# Hartigan Dip Test Report', + f"Generated: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}", + '', + '## Method', + '', + 'Hartigan & Hartigan (1985) dip test via `diptest` Python package.', + 'H0: distribution is unimodal. H1: multimodal (two or more modes).', + 'p-value computed by bootstrap against a uniform null (2000 reps for', + 'Firm A/accountant-level, 500 reps for full-sample due to size).', + '', + '## Results', + '', + '| Test | N | dip | p-value | Verdict (α=0.05) |', + '|------|---|-----|---------|------------------|', + ] + for r in results.values(): + if 'error' in r: + md.append(f"| {r['label']} | {r['n']} | — | — | {r['error']} |") + continue + md.append( + f"| {r['label']} | {r['n']:,} | {r['dip']:.4f} | " + f"{r['p_value']:.4f} | {r['verdict_alpha_05']} |" + ) + md += [ + '', + '## Interpretation', + '', + '* **Signature level** (Firm A + full sample): the dip test indicates', + ' whether a single mode explains the max-cosine/min-dHash distribution.', + ' Prior finding (2026-04-16) suggested unimodal long-tail; this script', + ' provides the formal test.', + '', + '* **Accountant level** (per-accountant mean): if multimodal here but', + ' unimodal at the signature level, this confirms the interpretation', + " that signing-behaviour is discrete across accountants (replication", + ' vs hand-signing), while replication quality itself is a continuous', + ' spectrum.', + '', + '## Downstream implication', + '', + 'Methods that assume bimodality (KDE antimode, 2-component Beta mixture)', + 'should be applied at the level where dip test rejects H0. 
If the', + "signature-level dip test fails to reject, the paper should report this", + 'and shift the mixture analysis to the accountant level (see Script 18).', + ] + md_path = OUT / 'dip_test_report.md' + md_path.write_text('\n'.join(md), encoding='utf-8') + print(f'Report saved: {md_path}') + + +if __name__ == '__main__': + main() diff --git a/signature_analysis/16_bd_mccrary_discontinuity.py b/signature_analysis/16_bd_mccrary_discontinuity.py new file mode 100644 index 0000000..26cf530 --- /dev/null +++ b/signature_analysis/16_bd_mccrary_discontinuity.py @@ -0,0 +1,318 @@ +#!/usr/bin/env python3 +""" +Script 16: Burgstahler-Dichev / McCrary Discontinuity Test +========================================================== +Tests for a discontinuity in the empirical density of similarity scores, +following: + - Burgstahler & Dichev (1997) - earnings-management style smoothness test + - McCrary (2008) - rigorous density-discontinuity asymptotics + +Idea: + Discretize the distribution into equal-width bins. For each bin i compute + the standardized deviation Z_i between observed count and the smooth + expectation (average of neighbours). Under H0 (distributional smoothness), + Z_i ~ N(0,1). A threshold is identified at the transition where Z_{i-1} + is significantly negative (below expectation) next to Z_i significantly + positive (above expectation) -- marking the boundary between two + generative mechanisms (hand-signed vs non-hand-signed). 
+ +Inputs: + - Firm A cosine max-similarity and independent min dHash + - Full-sample cosine and dHash (for comparison) + +Output: + reports/bd_mccrary/bd_mccrary_report.md + reports/bd_mccrary/bd_mccrary_results.json + reports/bd_mccrary/bd_mccrary_<key>.png (overlay plots) + """ + +import sqlite3 +import json +import numpy as np +import matplotlib +matplotlib.use('Agg') +import matplotlib.pyplot as plt +from pathlib import Path +from datetime import datetime + +DB = '/Volumes/NV2/PDF-Processing/signature-analysis/signature_analysis.db' +OUT = Path('/Volumes/NV2/PDF-Processing/signature-analysis/reports/bd_mccrary') +OUT.mkdir(parents=True, exist_ok=True) + +FIRM_A = '勤業眾信聯合' + +# BD/McCrary critical values (two-sided, alpha=0.05) +Z_CRIT = 1.96 + + +def bd_mccrary(values, bin_width, lo=None, hi=None): + """ + Compute Burgstahler-Dichev standardized deviations per bin. + + For each bin i with count n_i: + expected = 0.5 * (n_{i-1} + n_{i+1}) + SE = sqrt(N*p_i*(1-p_i) + 0.25*N*(p_{i-1}+p_{i+1})*(1-p_{i-1}-p_{i+1})) + Z_i = (n_i - expected) / SE + + Returns arrays of (bin_centers, counts, z_scores, expected). 
+ """ + arr = np.asarray(values, dtype=float) + arr = arr[~np.isnan(arr)] + if lo is None: + lo = float(np.floor(arr.min() / bin_width) * bin_width) + if hi is None: + hi = float(np.ceil(arr.max() / bin_width) * bin_width) + edges = np.arange(lo, hi + bin_width, bin_width) + counts, _ = np.histogram(arr, bins=edges) + centers = (edges[:-1] + edges[1:]) / 2.0 + + N = counts.sum() + p = counts / N if N else counts.astype(float) + + n_bins = len(counts) + z = np.full(n_bins, np.nan) + expected = np.full(n_bins, np.nan) + + for i in range(1, n_bins - 1): + p_lo = p[i - 1] + p_hi = p[i + 1] + exp_i = 0.5 * (counts[i - 1] + counts[i + 1]) + var_i = (N * p[i] * (1 - p[i]) + + 0.25 * N * (p_lo + p_hi) * (1 - p_lo - p_hi)) + if var_i <= 0: + continue + se = np.sqrt(var_i) + z[i] = (counts[i] - exp_i) / se + expected[i] = exp_i + + return centers, counts, z, expected + + +def find_transition(centers, z, direction='neg_to_pos'): + """ + Find the first bin pair where Z_{i-1} significantly negative and + Z_i significantly positive (or vice versa). + + direction='neg_to_pos' -> threshold where hand-signed density drops + (below expectation) and non-hand-signed + density rises (above expectation). For + cosine similarity, this transition is + expected around the separation point, so + the threshold sits between centers[i-1] + and centers[i]. 
+ """ + transitions = [] + for i in range(1, len(z)): + if np.isnan(z[i - 1]) or np.isnan(z[i]): + continue + if direction == 'neg_to_pos': + if z[i - 1] < -Z_CRIT and z[i] > Z_CRIT: + transitions.append({ + 'idx': int(i), + 'threshold_between': float( + (centers[i - 1] + centers[i]) / 2.0), + 'z_below': float(z[i - 1]), + 'z_above': float(z[i]), + 'left_center': float(centers[i - 1]), + 'right_center': float(centers[i]), + }) + else: # pos_to_neg + if z[i - 1] > Z_CRIT and z[i] < -Z_CRIT: + transitions.append({ + 'idx': int(i), + 'threshold_between': float( + (centers[i - 1] + centers[i]) / 2.0), + 'z_above': float(z[i - 1]), + 'z_below': float(z[i]), + 'left_center': float(centers[i - 1]), + 'right_center': float(centers[i]), + }) + return transitions + + +def plot_bd(centers, counts, z, expected, title, out_path, threshold=None): + fig, axes = plt.subplots(2, 1, figsize=(11, 7), sharex=True) + + ax = axes[0] + ax.bar(centers, counts, width=(centers[1] - centers[0]) * 0.9, + color='steelblue', alpha=0.6, edgecolor='white', label='Observed') + mask = ~np.isnan(expected) + ax.plot(centers[mask], expected[mask], 'r-', lw=1.5, + label='Expected (smooth null)') + ax.set_ylabel('Count') + ax.set_title(title) + ax.legend() + if threshold is not None: + ax.axvline(threshold, color='green', ls='--', lw=2, + label=f'Threshold≈{threshold:.4f}') + + ax = axes[1] + ax.axhline(0, color='black', lw=0.5) + ax.axhline(Z_CRIT, color='red', ls=':', alpha=0.7, + label=f'±{Z_CRIT} critical') + ax.axhline(-Z_CRIT, color='red', ls=':', alpha=0.7) + colors = ['coral' if zi > Z_CRIT else 'steelblue' if zi < -Z_CRIT + else 'lightgray' for zi in z] + ax.bar(centers, z, width=(centers[1] - centers[0]) * 0.9, color=colors, + edgecolor='black', lw=0.3) + ax.set_xlabel('Value') + ax.set_ylabel('Z statistic') + ax.legend() + if threshold is not None: + ax.axvline(threshold, color='green', ls='--', lw=2) + + plt.tight_layout() + fig.savefig(out_path, dpi=150) + plt.close() + + +def fetch(label): 
+ conn = sqlite3.connect(DB) + cur = conn.cursor() + if label == 'firm_a_cosine': + cur.execute(''' + SELECT s.max_similarity_to_same_accountant + FROM signatures s + JOIN accountants a ON s.assigned_accountant = a.name + WHERE a.firm = ? AND s.max_similarity_to_same_accountant IS NOT NULL + ''', (FIRM_A,)) + elif label == 'firm_a_dhash': + cur.execute(''' + SELECT s.min_dhash_independent + FROM signatures s + JOIN accountants a ON s.assigned_accountant = a.name + WHERE a.firm = ? AND s.min_dhash_independent IS NOT NULL + ''', (FIRM_A,)) + elif label == 'full_cosine': + cur.execute(''' + SELECT max_similarity_to_same_accountant FROM signatures + WHERE max_similarity_to_same_accountant IS NOT NULL + ''') + elif label == 'full_dhash': + cur.execute(''' + SELECT min_dhash_independent FROM signatures + WHERE min_dhash_independent IS NOT NULL + ''') + else: + raise ValueError(label) + vals = [r[0] for r in cur.fetchall() if r[0] is not None] + conn.close() + return np.array(vals, dtype=float) + + +def main(): + print('='*70) + print('Script 16: Burgstahler-Dichev / McCrary Discontinuity Test') + print('='*70) + + cases = [ + ('firm_a_cosine', 0.005, 'Firm A cosine max-similarity', 'neg_to_pos'), + ('firm_a_dhash', 1.0, 'Firm A independent min dHash', 'pos_to_neg'), + ('full_cosine', 0.005, 'Full-sample cosine max-similarity', + 'neg_to_pos'), + ('full_dhash', 1.0, 'Full-sample independent min dHash', 'pos_to_neg'), + ] + + all_results = {} + for key, bw, label, direction in cases: + print(f'\n[{label}] bin width={bw}') + arr = fetch(key) + print(f' N = {len(arr):,}') + centers, counts, z, expected = bd_mccrary(arr, bw) + transitions = find_transition(centers, z, direction=direction) + + # Summarize + if transitions: + # Choose the most extreme (highest |z_above * z_below|) transition + best = max(transitions, + key=lambda t: abs(t.get('z_above', 0)) + + abs(t.get('z_below', 0))) + threshold = best['threshold_between'] + print(f' {len(transitions)} candidate 
transition(s); ' + f'best at {threshold:.4f}') + else: + best = None + threshold = None + print(' No significant transition detected (no Z^- next to Z^+)') + + # Plot + png = OUT / f'bd_mccrary_{key}.png' + plot_bd(centers, counts, z, expected, label, png, threshold=threshold) + print(f' plot: {png}') + + all_results[key] = { + 'label': label, + 'n': int(len(arr)), + 'bin_width': float(bw), + 'direction': direction, + 'n_bins': int(len(centers)), + 'bin_centers': [float(c) for c in centers], + 'counts': [int(c) for c in counts], + 'z_scores': [None if np.isnan(zi) else float(zi) for zi in z], + 'transitions': transitions, + 'best_transition': best, + 'threshold': threshold, + } + + # Write JSON + json_path = OUT / 'bd_mccrary_results.json' + with open(json_path, 'w') as f: + json.dump({ + 'generated_at': datetime.now().isoformat(), + 'z_critical': Z_CRIT, + 'results': all_results, + }, f, indent=2, ensure_ascii=False) + print(f'\nJSON: {json_path}') + + # Markdown + md = [ + '# Burgstahler-Dichev / McCrary Discontinuity Test Report', + f"Generated: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}", + '', + '## Method', + '', + 'For each bin i of width δ, under the null of distributional', + 'smoothness the expected count is the average of neighbours,', + 'and the standardized deviation', + '', + ' Z_i = (n_i - 0.5*(n_{i-1}+n_{i+1})) / SE', + '', + 'is approximately N(0,1). 
We flag a transition when Z_{i-1} < -1.96', + 'and Z_i > 1.96 (or reversed, depending on the scale direction).', + 'The threshold is taken at the midpoint of the two bin centres.', + '', + '## Results', + '', + '| Test | N | bin width | Transitions | Threshold |', + '|------|---|-----------|-------------|-----------|', + ] + for r in all_results.values(): + thr = (f"{r['threshold']:.4f}" if r['threshold'] is not None + else '—') + md.append( + f"| {r['label']} | {r['n']:,} | {r['bin_width']} | " + f"{len(r['transitions'])} | {thr} |" + ) + md += [ + '', + '## Notes', + '', + '* For cosine (direction `neg_to_pos`), the transition marks the', + " boundary below which hand-signed dominates and above which", + ' non-hand-signed replication dominates.', + '* For dHash (direction `pos_to_neg`), the transition marks the', + " boundary below which replication dominates (small distances)", + ' and above which hand-signed variation dominates.', + '* Multiple candidate transitions are ranked by total |Z| magnitude', + ' on both sides of the boundary; the strongest is reported.', + '* Absence of a significant transition is itself informative: it', + ' is consistent with a single generative mechanism (e.g. Firm A', + ' which is near-universally non-hand-signed).', + ] + md_path = OUT / 'bd_mccrary_report.md' + md_path.write_text('\n'.join(md), encoding='utf-8') + print(f'Report: {md_path}') + + +if __name__ == '__main__': + main() diff --git a/signature_analysis/17_beta_mixture_em.py b/signature_analysis/17_beta_mixture_em.py new file mode 100644 index 0000000..15efc63 --- /dev/null +++ b/signature_analysis/17_beta_mixture_em.py @@ -0,0 +1,406 @@ +#!/usr/bin/env python3 +""" +Script 17: Beta Mixture Model via EM + Gaussian Mixture on Logit Transform +========================================================================== +Fits a 2-component Beta mixture to cosine similarity, plus parallel +Gaussian mixture on logit-transformed data as robustness check. 
+ +Theory: + - Cosine similarity is bounded [0,1] so Beta is the natural parametric + family for the component distributions. + - EM algorithm (Dempster, Laird & Rubin 1977) provides ML estimates. + - If the mixture gives a crossing point, that is the Bayes-optimal + threshold under the fitted model. + - Robustness: logit(x) maps (0,1) to the real line, where Gaussian + mixture is standard; White (1982) quasi-MLE guarantees asymptotic + recovery of the best Beta-family approximation even under + mis-specification. + +Parametrization of Beta via method-of-moments inside the M-step: + alpha = mu * ((mu*(1-mu))/var - 1) + beta = (1-mu) * ((mu*(1-mu))/var - 1) + +Expected outcome (per memory 2026-04-16): + Signature-level Beta mixture FAILS to separate hand-signed vs + non-hand-signed because the distribution is unimodal long-tail. + Report this as a formal result -- it motivates the pivot to + accountant-level mixture (Script 18). + +Output: + reports/beta_mixture/beta_mixture_report.md + reports/beta_mixture/beta_mixture_results.json + reports/beta_mixture/beta_mixture_<key>.png +""" + +import sqlite3 +import json +import numpy as np +import matplotlib +matplotlib.use('Agg') +import matplotlib.pyplot as plt +from pathlib import Path +from datetime import datetime +from scipy import stats +from scipy.optimize import brentq +from sklearn.mixture import GaussianMixture + +DB = '/Volumes/NV2/PDF-Processing/signature-analysis/signature_analysis.db' +OUT = Path('/Volumes/NV2/PDF-Processing/signature-analysis/reports/beta_mixture') +OUT.mkdir(parents=True, exist_ok=True) + +FIRM_A = '勤業眾信聯合' +EPS = 1e-6 + + +def fit_beta_mixture_em(x, n_components=2, max_iter=300, tol=1e-6, seed=42): + """ + Fit a K-component Beta mixture via EM using MoM M-step estimates for + alpha/beta of each component. MoM works because Beta is fully determined + by its mean and variance under the moment equations. 
+ """ + rng = np.random.default_rng(seed) + x = np.clip(np.asarray(x, dtype=float), EPS, 1 - EPS) + n = len(x) + K = n_components + + # Initialise responsibilities by quantile-based split + q = np.linspace(0, 1, K + 1) + thresh = np.quantile(x, q[1:-1]) + labels = np.digitize(x, thresh) + resp = np.zeros((n, K)) + resp[np.arange(n), labels] = 1.0 + + params = [] # list of dicts with alpha, beta, weight + log_like_hist = [] + for it in range(max_iter): + # M-step + nk = resp.sum(axis=0) + 1e-12 + weights = nk / nk.sum() + mus = (resp * x[:, None]).sum(axis=0) / nk + var_num = (resp * (x[:, None] - mus) ** 2).sum(axis=0) + vars_ = var_num / nk + # Ensure validity for Beta: var < mu*(1-mu) + upper = mus * (1 - mus) - 1e-9 + vars_ = np.minimum(vars_, upper) + vars_ = np.maximum(vars_, 1e-9) + factor = mus * (1 - mus) / vars_ - 1 + factor = np.maximum(factor, 1e-6) + alphas = mus * factor + betas = (1 - mus) * factor + params = [{'alpha': float(alphas[k]), 'beta': float(betas[k]), + 'weight': float(weights[k]), 'mu': float(mus[k]), + 'var': float(vars_[k])} for k in range(K)] + + # E-step + log_pdfs = np.column_stack([ + stats.beta.logpdf(x, alphas[k], betas[k]) + np.log(weights[k]) + for k in range(K) + ]) + m = log_pdfs.max(axis=1, keepdims=True) + log_like = (m.ravel() + np.log(np.exp(log_pdfs - m).sum(axis=1))).sum() + log_like_hist.append(float(log_like)) + new_resp = np.exp(log_pdfs - m) + new_resp = new_resp / new_resp.sum(axis=1, keepdims=True) + + if it > 0 and abs(log_like_hist[-1] - log_like_hist[-2]) < tol: + resp = new_resp + break + resp = new_resp + + # Order components by mean ascending (so C1 = low mean, CK = high mean) + order = np.argsort([p['mu'] for p in params]) + params = [params[i] for i in order] + resp = resp[:, order] + + # AIC/BIC (k = 3K - 1 free parameters: alpha, beta, weight each component; + # weights sum to 1 removes one df) + k = 3 * K - 1 + aic = 2 * k - 2 * log_like_hist[-1] + bic = k * np.log(n) - 2 * log_like_hist[-1] + + return { 
+ 'components': params, + 'log_likelihood': log_like_hist[-1], + 'aic': float(aic), + 'bic': float(bic), + 'n_iter': it + 1, + 'responsibilities': resp, + } + + +def mixture_crossing(params, x_range): + """Find crossing point of two weighted component densities (K=2).""" + if len(params) != 2: + return None + a1, b1, w1 = params[0]['alpha'], params[0]['beta'], params[0]['weight'] + a2, b2, w2 = params[1]['alpha'], params[1]['beta'], params[1]['weight'] + + def diff(x): + return (w2 * stats.beta.pdf(x, a2, b2) + - w1 * stats.beta.pdf(x, a1, b1)) + + # Search for sign change inside the overlap region + xs = np.linspace(x_range[0] + 1e-4, x_range[1] - 1e-4, 2000) + ys = diff(xs) + sign_changes = np.where(np.diff(np.sign(ys)) != 0)[0] + if len(sign_changes) == 0: + return None + # Pick crossing closest to midpoint of component means + mid = 0.5 * (params[0]['mu'] + params[1]['mu']) + crossings = [] + for i in sign_changes: + try: + x0 = brentq(diff, xs[i], xs[i + 1]) + crossings.append(x0) + except ValueError: + continue + if not crossings: + return None + return min(crossings, key=lambda c: abs(c - mid)) + + +def logit(x): + x = np.clip(x, EPS, 1 - EPS) + return np.log(x / (1 - x)) + + +def invlogit(z): + return 1.0 / (1.0 + np.exp(-z)) + + +def fit_gmm_logit(x, n_components=2, seed=42): + """GMM on logit-transformed values. 
Returns crossing point in original scale.""" + z = logit(x).reshape(-1, 1) + gmm = GaussianMixture(n_components=n_components, random_state=seed, + max_iter=500).fit(z) + means = gmm.means_.ravel() + covs = gmm.covariances_.ravel() + weights = gmm.weights_ + order = np.argsort(means) + comps = [{ + 'mu_logit': float(means[i]), + 'sigma_logit': float(np.sqrt(covs[i])), + 'weight': float(weights[i]), + 'mu_original': float(invlogit(means[i])), + } for i in order] + + result = { + 'components': comps, + 'log_likelihood': float(gmm.score(z) * len(z)), + 'aic': float(gmm.aic(z)), + 'bic': float(gmm.bic(z)), + 'n_iter': int(gmm.n_iter_), + } + + if n_components == 2: + m1, s1, w1 = means[order[0]], np.sqrt(covs[order[0]]), weights[order[0]] + m2, s2, w2 = means[order[1]], np.sqrt(covs[order[1]]), weights[order[1]] + + def diff(z0): + return (w2 * stats.norm.pdf(z0, m2, s2) + - w1 * stats.norm.pdf(z0, m1, s1)) + zs = np.linspace(min(m1, m2) - 1, max(m1, m2) + 1, 2000) + ys = diff(zs) + changes = np.where(np.diff(np.sign(ys)) != 0)[0] + if len(changes): + try: + z_cross = brentq(diff, zs[changes[0]], zs[changes[0] + 1]) + result['crossing_logit'] = float(z_cross) + result['crossing_original'] = float(invlogit(z_cross)) + except ValueError: + pass + return result + + +def plot_mixture(x, beta_res, title, out_path, gmm_res=None): + x = np.asarray(x, dtype=float).ravel() + x = x[np.isfinite(x)] + fig, ax = plt.subplots(figsize=(10, 5)) + bin_edges = np.linspace(float(x.min()), float(x.max()), 81) + ax.hist(x, bins=bin_edges, density=True, alpha=0.45, color='steelblue', + edgecolor='white') + xs = np.linspace(max(0.0, x.min() - 0.01), min(1.0, x.max() + 0.01), 500) + total = np.zeros_like(xs) + for i, p in enumerate(beta_res['components']): + comp_pdf = p['weight'] * stats.beta.pdf(xs, p['alpha'], p['beta']) + total = total + comp_pdf + ax.plot(xs, comp_pdf, '--', lw=1.5, + label=f"C{i+1}: α={p['alpha']:.2f}, β={p['beta']:.2f}, " + f"w={p['weight']:.2f}") + ax.plot(xs, total, 
def fetch(label):
    """Return one similarity population from the signatures DB as floats.

    Parameters:
        label: 'firm_a_cosine' -> Firm A (Deloitte) cosine max-similarity;
               'full_cosine'   -> full-sample cosine max-similarity.

    Returns:
        np.ndarray of float with NULL rows removed.

    Raises:
        ValueError: if `label` is not one of the two known populations.
    """
    conn = sqlite3.connect(DB)
    try:
        cur = conn.cursor()
        if label == 'firm_a_cosine':
            cur.execute('''
                SELECT s.max_similarity_to_same_accountant
                FROM signatures s
                JOIN accountants a ON s.assigned_accountant = a.name
                WHERE a.firm = ? AND s.max_similarity_to_same_accountant IS NOT NULL
            ''', (FIRM_A,))
        elif label == 'full_cosine':
            cur.execute('''
                SELECT max_similarity_to_same_accountant FROM signatures
                WHERE max_similarity_to_same_accountant IS NOT NULL
            ''')
        else:
            raise ValueError(label)
        # Defensive second NULL filter; the WHERE clause already excludes NULLs.
        vals = [r[0] for r in cur.fetchall() if r[0] is not None]
    finally:
        # Fix: close the connection even if execute() raises (was leaked before).
        conn.close()
    return np.array(vals, dtype=float)


def main():
    """Fit 2/3-component Beta mixtures (EM) and logit-GMMs to the cosine
    populations, report BIC model selection and 2-component density
    crossings, and write plot / JSON / Markdown artifacts under OUT."""
    print('='*70)
    print('Script 17: Beta Mixture EM + Logit-GMM Robustness Check')
    print('='*70)

    cases = [
        ('firm_a_cosine', 'Firm A cosine max-similarity'),
        ('full_cosine', 'Full-sample cosine max-similarity'),
    ]

    summary = {}
    for key, label in cases:
        print(f'\n[{label}]')
        x = fetch(key)
        print(f'  N = {len(x):,}')

        # Subsample for full sample to keep EM tractable but still stable
        if len(x) > 200000:
            rng = np.random.default_rng(42)
            x_fit = rng.choice(x, 200000, replace=False)
            print(f'  Subsampled to {len(x_fit):,} for EM fitting')
        else:
            x_fit = x

        beta2 = fit_beta_mixture_em(x_fit, n_components=2)
        beta3 = fit_beta_mixture_em(x_fit, n_components=3)
        print(f'  Beta-2 AIC={beta2["aic"]:.1f}, BIC={beta2["bic"]:.1f}')
        print(f'  Beta-3 AIC={beta3["aic"]:.1f}, BIC={beta3["bic"]:.1f}')

        gmm2 = fit_gmm_logit(x_fit, n_components=2)
        gmm3 = fit_gmm_logit(x_fit, n_components=3)
        print(f'  LogGMM2 AIC={gmm2["aic"]:.1f}, BIC={gmm2["bic"]:.1f}')
        print(f'  LogGMM3 AIC={gmm3["aic"]:.1f}, BIC={gmm3["bic"]:.1f}')

        # Report crossings.  Fix: compare against None explicitly — a
        # crossing at exactly 0.0 is falsy and was previously reported
        # as missing ('—'); the rest of this script already uses
        # `is not None` for the same value.
        crossing_beta = mixture_crossing(beta2['components'], (x.min(), x.max()))
        print(f'  Beta-2 crossing: '
              f"{('%.4f' % crossing_beta) if crossing_beta is not None else '—'}")
        print(f'  LogGMM-2 crossing (original scale): '
              f"{gmm2.get('crossing_original', '—')}")

        # Plot
        png = OUT / f'beta_mixture_{key}.png'
        plot_mixture(x_fit, beta2, f'{label}: Beta mixture (2 comp)', png,
                     gmm_res=gmm2)
        print(f'  plot: {png}')

        # Strip responsibilities for JSON compactness
        beta2_out = {k: v for k, v in beta2.items() if k != 'responsibilities'}
        beta3_out = {k: v for k, v in beta3.items() if k != 'responsibilities'}

        summary[key] = {
            'label': label,
            'n': int(len(x)),
            'n_fit': int(len(x_fit)),
            'beta_2': beta2_out,
            'beta_3': beta3_out,
            'beta_2_crossing': (float(crossing_beta)
                                if crossing_beta is not None else None),
            'logit_gmm_2': gmm2,
            'logit_gmm_3': gmm3,
            'bic_best': ('beta_2' if beta2['bic'] < beta3['bic']
                         else 'beta_3'),
        }

    # Write JSON.  default=float coerces stray numpy scalars; anything
    # not float-convertible would raise, which is the desired behaviour.
    json_path = OUT / 'beta_mixture_results.json'
    with open(json_path, 'w') as f:
        json.dump({
            'generated_at': datetime.now().isoformat(),
            'results': summary,
        }, f, indent=2, ensure_ascii=False, default=float)
    print(f'\nJSON: {json_path}')

    # Markdown
    md = [
        '# Beta Mixture EM Report',
        f"Generated: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}",
        '',
        '## Method',
        '',
        '* 2- and 3-component Beta mixture fit by EM with method-of-moments',
        '  M-step (stable for bounded data).',
        '* Parallel 2/3-component Gaussian mixture on logit-transformed',
        '  values as robustness check (White 1982 quasi-MLE consistency).',
        '* Crossing point of the 2-component mixture densities is reported',
        '  as the Bayes-optimal threshold under equal misclassification cost.',
        '',
        '## Results',
        '',
        '| Dataset | N (fit) | Beta-2 BIC | Beta-3 BIC | LogGMM-2 BIC | LogGMM-3 BIC | BIC-best |',
        '|---------|---------|------------|------------|--------------|--------------|----------|',
    ]
    for r in summary.values():
        md.append(
            f"| {r['label']} | {r['n_fit']:,} | "
            f"{r['beta_2']['bic']:.1f} | {r['beta_3']['bic']:.1f} | "
            f"{r['logit_gmm_2']['bic']:.1f} | {r['logit_gmm_3']['bic']:.1f} | "
            f"{r['bic_best']} |"
        )

    md += ['', '## Threshold estimates (2-component)', '',
           '| Dataset | Beta-2 crossing | LogGMM-2 crossing (orig) |',
           '|---------|-----------------|--------------------------|']
    for r in summary.values():
        beta_str = (f"{r['beta_2_crossing']:.4f}"
                    if r['beta_2_crossing'] is not None else '—')
        gmm_str = (f"{r['logit_gmm_2']['crossing_original']:.4f}"
                   if 'crossing_original' in r['logit_gmm_2'] else '—')
        md.append(f"| {r['label']} | {beta_str} | {gmm_str} |")

    md += [
        '',
        '## Interpretation',
        '',
        'A successful 2-component fit with a clear crossing point would',
        'indicate two underlying generative mechanisms (hand-signed vs',
        'non-hand-signed) with a principled Bayes-optimal boundary.',
        '',
        'If Beta-3 BIC is meaningfully smaller than Beta-2, or if the',
        'components of Beta-2 largely overlap (similar means, wide spread),',
        'this is consistent with a unimodal distribution poorly approximated',
        'by two components. Prior finding (2026-04-16) suggested this is',
        'the case at signature level; the accountant-level mixture',
        '(Script 18) is where the bimodality emerges.',
    ]
    md_path = OUT / 'beta_mixture_report.md'
    md_path.write_text('\n'.join(md), encoding='utf-8')
    print(f'Report: {md_path}')


if __name__ == '__main__':
    main()
"""
Script 18: Accountant-Level 3-Component Gaussian Mixture
========================================================
At the accountant level (not signature level) the joint distribution of
(cosine_mean, dhash_mean) separates into signing-behaviour regimes.

The script:
  1. Aggregates per-accountant means from the signature table.
  2. Fits 1..5-component 2D Gaussian mixtures and selects by BIC.
  3. Reports component parameters, cluster assignments, and per-firm
     breakdown.
  4. For the 2-component fit derives the natural threshold (crossing of
     marginal densities in cosine-mean and dhash-mean).

Output:
    reports/accountant_mixture/accountant_mixture_report.md
    reports/accountant_mixture/accountant_mixture_results.json
    reports/accountant_mixture/accountant_mixture_2d.png
    reports/accountant_mixture/accountant_mixture_marginals.png
"""

import csv
import sqlite3
import json
import numpy as np
from pathlib import Path
from datetime import datetime
from scipy import stats
from scipy.optimize import brentq

DB = '/Volumes/NV2/PDF-Processing/signature-analysis/signature_analysis.db'
OUT = Path('/Volumes/NV2/PDF-Processing/signature-analysis/reports/'
           'accountant_mixture')
# NOTE: OUT.mkdir() happens in main(), not at import time, so importing
# this module has no filesystem side effects.

# Accountants with fewer signatures than this are excluded from the fit.
MIN_SIGS = 10


def _pyplot():
    """Lazily import matplotlib with the headless Agg backend.

    Keeping matplotlib (and sklearn, below) out of module scope lets the
    statistical helpers such as marginal_crossing() be imported and
    tested without a plotting / ML stack installed.
    """
    import matplotlib
    matplotlib.use('Agg')
    import matplotlib.pyplot as plt
    return plt


def load_accountant_aggregates():
    """Load per-accountant (mean cosine, mean independent dHash) pairs.

    Only accountants with at least MIN_SIGS signatures and non-NULL
    similarity metrics are included.

    Returns:
        list of dicts with keys: accountant, firm, cos_mean, dh_mean, n.
    """
    conn = sqlite3.connect(DB)
    try:
        cur = conn.cursor()
        cur.execute('''
            SELECT s.assigned_accountant,
                   a.firm,
                   AVG(s.max_similarity_to_same_accountant) AS cos_mean,
                   AVG(CAST(s.min_dhash_independent AS REAL)) AS dh_mean,
                   COUNT(*) AS n
            FROM signatures s
            LEFT JOIN accountants a ON s.assigned_accountant = a.name
            WHERE s.assigned_accountant IS NOT NULL
              AND s.max_similarity_to_same_accountant IS NOT NULL
              AND s.min_dhash_independent IS NOT NULL
            GROUP BY s.assigned_accountant
            HAVING n >= ?
        ''', (MIN_SIGS,))
        rows = cur.fetchall()
    finally:
        conn.close()
    return [
        {'accountant': r[0], 'firm': r[1] or '(unknown)',
         'cos_mean': float(r[2]), 'dh_mean': float(r[3]), 'n': int(r[4])}
        for r in rows
    ]


def fit_gmm_range(X, ks=(1, 2, 3, 4, 5), seed=42, n_init=10):
    """Fit a full-covariance GMM for each K in `ks`.

    Returns:
        (results, best): per-K AIC/BIC/convergence dicts, and the fitted
        model with the lowest BIC.
    """
    from sklearn.mixture import GaussianMixture  # heavy import kept local
    results = []
    best_bic = np.inf
    best = None
    for k in ks:
        gmm = GaussianMixture(
            n_components=k, covariance_type='full',
            random_state=seed, n_init=n_init, max_iter=500,
        ).fit(X)
        bic = gmm.bic(X)
        aic = gmm.aic(X)
        results.append({
            'k': int(k), 'bic': float(bic), 'aic': float(aic),
            'converged': bool(gmm.converged_), 'n_iter': int(gmm.n_iter_),
        })
        if bic < best_bic:
            best_bic = bic
            best = gmm
    return results, best


def summarize_components(gmm, X, df):
    """Assign clusters, return per-component stats + per-firm breakdown.

    Components are relabelled 1..K in DESCENDING order of mean cosine so
    that C1 is the high-replication regime, matching the 2026-04-16
    analysis.  (Fix: removed dead ascending-order relabel code that was
    immediately overwritten.)
    """
    labels = gmm.predict(X)
    means = gmm.means_
    order = np.argsort(-means[:, 0])  # descending cos_mean: C1 = high
    relabel = {int(old): new + 1 for new, old in enumerate(order)}
    new_labels = np.array([relabel[int(l)] for l in labels])

    components = []
    for rank, old_idx in enumerate(order, start=1):
        mu = means[old_idx]
        cov = gmm.covariances_[old_idx]
        w = gmm.weights_[old_idx]
        mask = new_labels == rank
        firms = {}
        for row, in_cluster in zip(df, mask):
            if not in_cluster:
                continue
            firms[row['firm']] = firms.get(row['firm'], 0) + 1
        firms_sorted = sorted(firms.items(), key=lambda kv: -kv[1])
        components.append({
            'component': rank,
            'mu_cos': float(mu[0]),
            'mu_dh': float(mu[1]),
            'cov_00': float(cov[0, 0]),
            'cov_11': float(cov[1, 1]),
            'cov_01': float(cov[0, 1]),
            'corr': float(cov[0, 1] / np.sqrt(cov[0, 0] * cov[1, 1])),
            'weight': float(w),
            'n_accountants': int(mask.sum()),
            'top_firms': firms_sorted[:5],
        })
    return components, new_labels


def marginal_crossing(means, covs, weights, dim, search_lo, search_hi):
    """Find the crossing of two weighted marginal Gaussians along `dim`.

    Scans [search_lo, search_hi] for sign changes of the weighted density
    difference, refines each with brentq, and returns the crossing
    closest to the midpoint of the two means (None if no sign change).
    """
    m1, m2 = means[0][dim], means[1][dim]
    s1 = np.sqrt(covs[0][dim, dim])
    s2 = np.sqrt(covs[1][dim, dim])
    w1, w2 = weights[0], weights[1]

    def diff(x):
        return (w2 * stats.norm.pdf(x, m2, s2)
                - w1 * stats.norm.pdf(x, m1, s1))

    xs = np.linspace(search_lo, search_hi, 2000)
    ys = diff(xs)
    changes = np.where(np.diff(np.sign(ys)) != 0)[0]
    if not len(changes):
        return None
    mid = 0.5 * (m1 + m2)
    crossings = []
    for i in changes:
        try:
            crossings.append(brentq(diff, xs[i], xs[i + 1]))
        except ValueError:
            # Bracket did not straddle a root (e.g. grid point exactly 0).
            continue
    if not crossings:
        return None
    return float(min(crossings, key=lambda c: abs(c - mid)))


def plot_2d(df, labels, means, title, out_path):
    """2D scatter of accountant aggregates coloured by cluster label."""
    plt = _pyplot()
    colors = ['#d62728', '#1f77b4', '#2ca02c', '#9467bd', '#ff7f0e']
    fig, ax = plt.subplots(figsize=(9, 7))
    for k in sorted(set(labels)):
        mask = labels == k
        xs = [r['cos_mean'] for r, m in zip(df, mask) if m]
        ys = [r['dh_mean'] for r, m in zip(df, mask) if m]
        ax.scatter(xs, ys, s=20, alpha=0.55, color=colors[(k - 1) % 5],
                   label=f'C{k} (n={int(mask.sum())})')
    for i, mu in enumerate(means):
        ax.plot(mu[0], mu[1], 'k*', ms=18, mec='white', mew=1.5)
        ax.annotate(f' μ{i+1}', (mu[0], mu[1]), fontsize=10)
    ax.set_xlabel('Per-accountant mean cosine max-similarity')
    ax.set_ylabel('Per-accountant mean independent min dHash')
    ax.set_title(title)
    ax.legend()
    plt.tight_layout()
    fig.savefig(out_path, dpi=150)
    plt.close()


def plot_marginals(df, labels, gmm_2, out_path, cos_cross=None, dh_cross=None):
    """1D marginal histograms with 2-component Gaussian overlays.

    `labels` is accepted for signature symmetry with plot_2d but is not
    used here (the overlays come from the fitted gmm_2 directly).
    """
    plt = _pyplot()
    cos = np.array([r['cos_mean'] for r in df])
    dh = np.array([r['dh_mean'] for r in df])

    fig, axes = plt.subplots(1, 2, figsize=(13, 5))

    # Cosine marginal
    ax = axes[0]
    ax.hist(cos, bins=40, density=True, alpha=0.5, color='steelblue',
            edgecolor='white')
    xs = np.linspace(cos.min(), cos.max(), 400)
    means_2 = gmm_2.means_
    covs_2 = gmm_2.covariances_
    weights_2 = gmm_2.weights_
    order = np.argsort(-means_2[:, 0])  # same C1-high convention as above
    for rank, i in enumerate(order, start=1):
        ys = weights_2[i] * stats.norm.pdf(xs, means_2[i, 0],
                                           np.sqrt(covs_2[i, 0, 0]))
        ax.plot(xs, ys, '--', label=f'C{rank} μ={means_2[i,0]:.3f}')
    if cos_cross is not None:
        ax.axvline(cos_cross, color='green', lw=2,
                   label=f'Crossing = {cos_cross:.4f}')
    ax.set_xlabel('Per-accountant mean cosine')
    ax.set_ylabel('Density')
    ax.set_title('Cosine marginal (2-component fit)')
    ax.legend(fontsize=8)

    # dHash marginal
    ax = axes[1]
    ax.hist(dh, bins=40, density=True, alpha=0.5, color='coral',
            edgecolor='white')
    xs = np.linspace(dh.min(), dh.max(), 400)
    for rank, i in enumerate(order, start=1):
        ys = weights_2[i] * stats.norm.pdf(xs, means_2[i, 1],
                                           np.sqrt(covs_2[i, 1, 1]))
        ax.plot(xs, ys, '--', label=f'C{rank} μ={means_2[i,1]:.2f}')
    if dh_cross is not None:
        ax.axvline(dh_cross, color='green', lw=2,
                   label=f'Crossing = {dh_cross:.4f}')
    ax.set_xlabel('Per-accountant mean dHash')
    ax.set_ylabel('Density')
    ax.set_title('dHash marginal (2-component fit)')
    ax.legend(fontsize=8)

    plt.tight_layout()
    fig.savefig(out_path, dpi=150)
    plt.close()


def main():
    """Run the accountant-level GMM analysis end to end and write the
    CSV / JSON / Markdown / PNG artifacts under OUT."""
    print('='*70)
    print('Script 18: Accountant-Level Gaussian Mixture')
    print('='*70)

    # Moved here from import time so importing the module is side-effect free.
    OUT.mkdir(parents=True, exist_ok=True)
    from sklearn.mixture import GaussianMixture  # heavy import kept local

    df = load_accountant_aggregates()
    print(f'\nAccountants with >= {MIN_SIGS} signatures: {len(df)}')
    X = np.array([[r['cos_mean'], r['dh_mean']] for r in df])

    # Fit K=1..5
    print('\nFitting GMMs with K=1..5...')
    bic_results, _ = fit_gmm_range(X, ks=(1, 2, 3, 4, 5), seed=42, n_init=15)
    for r in bic_results:
        print(f" K={r['k']}: BIC={r['bic']:.2f} AIC={r['aic']:.2f} "
              f"converged={r['converged']}")
    best_k = min(bic_results, key=lambda r: r['bic'])['k']
    print(f'\nBIC-best K = {best_k}')

    # Fit 3-component specifically (target)
    gmm_3 = GaussianMixture(n_components=3, covariance_type='full',
                            random_state=42, n_init=15, max_iter=500).fit(X)
    comps_3, labels_3 = summarize_components(gmm_3, X, df)

    print('\n--- 3-component summary ---')
    for c in comps_3:
        tops = ', '.join(f"{f}({n})" for f, n in c['top_firms'])
        print(f" C{c['component']}: cos={c['mu_cos']:.3f}, "
              f"dh={c['mu_dh']:.2f}, w={c['weight']:.2f}, "
              f"n={c['n_accountants']} -> {tops}")

    # Fit 2-component for threshold derivation
    gmm_2 = GaussianMixture(n_components=2, covariance_type='full',
                            random_state=42, n_init=15, max_iter=500).fit(X)
    comps_2, labels_2 = summarize_components(gmm_2, X, df)

    # Crossings
    cos_cross = marginal_crossing(gmm_2.means_, gmm_2.covariances_,
                                  gmm_2.weights_, dim=0,
                                  search_lo=X[:, 0].min(),
                                  search_hi=X[:, 0].max())
    dh_cross = marginal_crossing(gmm_2.means_, gmm_2.covariances_,
                                 gmm_2.weights_, dim=1,
                                 search_lo=X[:, 1].min(),
                                 search_hi=X[:, 1].max())
    print(f'\n2-component crossings: cos={cos_cross}, dh={dh_cross}')

    # Plots
    plot_2d(df, labels_3, gmm_3.means_,
            '3-component accountant-level GMM',
            OUT / 'accountant_mixture_2d.png')
    plot_marginals(df, labels_2, gmm_2,
                   OUT / 'accountant_mixture_marginals.png',
                   cos_cross=cos_cross, dh_cross=dh_cross)

    # Per-accountant CSV (for downstream use).  Fix: use csv.writer so
    # accountant/firm names containing commas are quoted correctly.
    csv_path = OUT / 'accountant_clusters.csv'
    with open(csv_path, 'w', encoding='utf-8', newline='') as f:
        writer = csv.writer(f)
        writer.writerow(['accountant', 'firm', 'n_signatures', 'cos_mean',
                         'dh_mean', 'cluster_k3', 'cluster_k2'])
        for r, k3, k2 in zip(df, labels_3, labels_2):
            writer.writerow([r['accountant'], r['firm'], r['n'],
                             f"{r['cos_mean']:.6f}", f"{r['dh_mean']:.6f}",
                             int(k3), int(k2)])
    print(f'CSV: {csv_path}')

    # Summary JSON
    summary = {
        'generated_at': datetime.now().isoformat(),
        'n_accountants': len(df),
        'min_signatures': MIN_SIGS,
        'bic_model_selection': bic_results,
        'best_k_by_bic': best_k,
        'gmm_3': {
            'components': comps_3,
            'aic': float(gmm_3.aic(X)),
            'bic': float(gmm_3.bic(X)),
            # score() is the mean per-sample log-likelihood.
            'log_likelihood': float(gmm_3.score(X) * len(X)),
        },
        'gmm_2': {
            'components': comps_2,
            'aic': float(gmm_2.aic(X)),
            'bic': float(gmm_2.bic(X)),
            'log_likelihood': float(gmm_2.score(X) * len(X)),
            'cos_crossing': cos_cross,
            'dh_crossing': dh_cross,
        },
    }
    with open(OUT / 'accountant_mixture_results.json', 'w') as f:
        json.dump(summary, f, indent=2, ensure_ascii=False)
    print(f'JSON: {OUT / "accountant_mixture_results.json"}')

    # Markdown
    md = [
        '# Accountant-Level Gaussian Mixture Report',
        f"Generated: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}",
        '',
        '## Data',
        '',
        f'* Per-accountant aggregates: mean cosine max-similarity, '
        f'mean independent min dHash.',
        f"* Minimum signatures per accountant: {MIN_SIGS}.",
        f'* Accountants included: **{len(df)}**.',
        '',
        '## Model selection (BIC)',
        '',
        '| K | BIC | AIC | Converged |',
        '|---|-----|-----|-----------|',
    ]
    for r in bic_results:
        mark = ' ←best' if r['k'] == best_k else ''
        md.append(
            f"| {r['k']} | {r['bic']:.2f} | {r['aic']:.2f} | "
            f"{r['converged']}{mark} |"
        )

    md += ['', '## 3-component fit', '',
           '| Component | cos_mean | dh_mean | weight | n_accountants | top firms |',
           '|-----------|----------|---------|--------|----------------|-----------|']
    for c in comps_3:
        tops = ', '.join(f"{f}:{n}" for f, n in c['top_firms'])
        md.append(
            f"| C{c['component']} | {c['mu_cos']:.3f} | {c['mu_dh']:.2f} | "
            f"{c['weight']:.3f} | {c['n_accountants']} | {tops} |"
        )

    md += ['', '## 2-component fit (threshold derivation)', '',
           '| Component | cos_mean | dh_mean | weight | n_accountants |',
           '|-----------|----------|---------|--------|----------------|']
    for c in comps_2:
        md.append(
            f"| C{c['component']} | {c['mu_cos']:.3f} | {c['mu_dh']:.2f} | "
            f"{c['weight']:.3f} | {c['n_accountants']} |"
        )

    # Fix: explicit None checks — a crossing of exactly 0.0 is falsy and
    # would previously have been reported as "no crossing found".
    md += ['', '### Natural thresholds from 2-component crossings', '',
           (f'* Cosine: **{cos_cross:.4f}**' if cos_cross is not None
            else '* Cosine: no crossing found'),
           (f'* dHash: **{dh_cross:.4f}**' if dh_cross is not None
            else '* dHash: no crossing found'),
           '',
           '## Interpretation',
           '',
           'The accountant-level mixture separates signing-behaviour regimes,',
           'while the signature-level distribution is a continuous spectrum',
           '(see Scripts 15 and 17). The BIC-best model chooses how many',
           'discrete regimes the data supports. The 2-component crossings',
           'are the natural per-accountant thresholds for classifying a',
           "CPA's signing behaviour.",
           '',
           '## Artifacts',
           '',
           '* `accountant_mixture_2d.png` - 2D scatter with 3-component fit',
           '* `accountant_mixture_marginals.png` - 1D marginals with 2-component fit',
           '* `accountant_clusters.csv` - per-accountant cluster assignments',
           '* `accountant_mixture_results.json` - full numerical results',
           ]
    (OUT / 'accountant_mixture_report.md').write_text('\n'.join(md),
                                                      encoding='utf-8')
    print(f'Report: {OUT / "accountant_mixture_report.md"}')


if __name__ == '__main__':
    main()
"""
Script 19: Pixel-Identity Validation (No Human Annotation Required)
===================================================================
Validates the cosine + dHash dual classifier using naturally occurring
reference populations instead of manual labels:

  Positive anchor 1: pixel_identical_to_closest = 1 (byte-identical images).
  Positive anchor 2: Firm A (Deloitte) signatures (strong prior positive).
  Negative anchor:   signatures with very low max cosine similarity.

Metrics reported:
  - FAR/FRR/EER using the pixel-identity anchor as the gold positive
    and low-similarity pairs as the gold negative.
  - Precision/Recall/F1 at cosine and dHash thresholds from Scripts
    15/16/17/18.
  - Convergence with the Firm A anchor (fraction of Firm A signatures
    classified positive at each threshold).

A small stratified sanity sample (30 signatures) is exported for visual
spot-check; it is not used to compute metrics.

Output:
    reports/pixel_validation/pixel_validation_report.md
    reports/pixel_validation/pixel_validation_results.json
    reports/pixel_validation/roc_cosine.png, roc_dhash.png
    reports/pixel_validation/sanity_sample.csv
"""

import csv
import sqlite3
import json
import numpy as np
from pathlib import Path
from datetime import datetime

DB = '/Volumes/NV2/PDF-Processing/signature-analysis/signature_analysis.db'
OUT = Path('/Volumes/NV2/PDF-Processing/signature-analysis/reports/'
           'pixel_validation')
# NOTE: OUT.mkdir() happens in main(), not at import time, so importing
# this module has no filesystem side effects.

FIRM_A = '勤業眾信聯合'
NEGATIVE_COSINE_UPPER = 0.70  # pairs with max-cosine < 0.70 assumed not replicated
SANITY_SAMPLE_SIZE = 30


def _pyplot():
    """Lazily import matplotlib with the headless Agg backend so the
    metric helpers below can be imported/tested without a plotting stack."""
    import matplotlib
    matplotlib.use('Agg')
    import matplotlib.pyplot as plt
    return plt


def load_signatures():
    """Load every signature row with a non-NULL cosine max-similarity.

    Returns:
        list of dicts with keys: sig_id, filename, accountant, firm,
        cosine, dhash_cond, dhash_indep, pixel_identical, closest_match.
        Missing dHash values are kept as None here (converted to the -1
        sentinel in main()).
    """
    conn = sqlite3.connect(DB)
    try:
        cur = conn.cursor()
        cur.execute('''
            SELECT s.signature_id, s.image_filename, s.assigned_accountant,
                   a.firm, s.max_similarity_to_same_accountant,
                   s.phash_distance_to_closest, s.min_dhash_independent,
                   s.pixel_identical_to_closest, s.closest_match_file
            FROM signatures s
            LEFT JOIN accountants a ON s.assigned_accountant = a.name
            WHERE s.max_similarity_to_same_accountant IS NOT NULL
        ''')
        rows = cur.fetchall()
    finally:
        # Fix: close the connection even if execute() raises.
        conn.close()
    data = []
    for r in rows:
        data.append({
            'sig_id': r[0], 'filename': r[1], 'accountant': r[2],
            'firm': r[3] or '(unknown)',
            'cosine': float(r[4]),
            'dhash_cond': None if r[5] is None else int(r[5]),
            'dhash_indep': None if r[6] is None else int(r[6]),
            'pixel_identical': int(r[7] or 0),
            'closest_match': r[8],
        })
    return data


def confusion(y_true, y_pred):
    """Return (tp, fp, fn, tn) counts for binary arrays y_true/y_pred."""
    tp = int(np.sum((y_true == 1) & (y_pred == 1)))
    fp = int(np.sum((y_true == 0) & (y_pred == 1)))
    fn = int(np.sum((y_true == 1) & (y_pred == 0)))
    tn = int(np.sum((y_true == 0) & (y_pred == 0)))
    return tp, fp, fn, tn


def classification_metrics(y_true, y_pred):
    """Precision/recall/F1 plus FAR/FRR for one binary prediction.

    Zero denominators are clamped to 1 so empty classes yield 0.0 rather
    than raising; FAR is over the negatives, FRR over the positives.
    """
    tp, fp, fn, tn = confusion(y_true, y_pred)
    denom_p = max(tp + fp, 1)
    denom_r = max(tp + fn, 1)
    precision = tp / denom_p
    recall = tp / denom_r
    f1 = (2 * precision * recall / (precision + recall)
          if precision + recall > 0 else 0.0)
    far = fp / max(fp + tn, 1)  # false acceptance rate (over negatives)
    frr = fn / max(fn + tp, 1)  # false rejection rate (over positives)
    return {
        'tp': tp, 'fp': fp, 'fn': fn, 'tn': tn,
        'precision': float(precision),
        'recall': float(recall),
        'f1': float(f1),
        'far': float(far),
        'frr': float(frr),
    }


def sweep_threshold(scores, y, directions, thresholds):
    """Compute classification_metrics at every threshold.

    For direction 'above' a prediction is positive if score > threshold;
    for 'below' it is positive if score < threshold.  Returns a list of
    metric dicts, each tagged with its 'threshold'.
    """
    out = []
    for t in thresholds:
        if directions == 'above':
            y_pred = (scores > t).astype(int)
        else:
            y_pred = (scores < t).astype(int)
        m = classification_metrics(y, y_pred)
        m['threshold'] = float(t)
        out.append(m)
    return out


def find_eer(sweep):
    """EER = point where FAR ≈ FRR; interpolated at the first sign change
    of (FAR - FRR) along the sweep, else the nearest sweep point."""
    thr = np.array([s['threshold'] for s in sweep])
    far = np.array([s['far'] for s in sweep])
    frr = np.array([s['frr'] for s in sweep])
    diff = far - frr
    signs = np.sign(diff)
    changes = np.where(np.diff(signs) != 0)[0]
    if len(changes) == 0:
        # No crossing: report the sweep point with the smallest gap.
        idx = int(np.argmin(np.abs(diff)))
        return {'threshold': float(thr[idx]), 'far': float(far[idx]),
                'frr': float(frr[idx]),
                'eer': float(0.5 * (far[idx] + frr[idx]))}
    i = int(changes[0])
    # Linear interpolation between the two bracketing sweep points.
    w = abs(diff[i]) / (abs(diff[i]) + abs(diff[i + 1]) + 1e-12)
    thr_i = (1 - w) * thr[i] + w * thr[i + 1]
    far_i = (1 - w) * far[i] + w * far[i + 1]
    frr_i = (1 - w) * frr[i] + w * frr[i + 1]
    return {'threshold': float(thr_i), 'far': float(far_i),
            'frr': float(frr_i), 'eer': float(0.5 * (far_i + frr_i))}


def plot_roc(sweep, title, out_path):
    """ROC curve plus FAR/FRR-vs-threshold panel for one sweep."""
    plt = _pyplot()
    far = np.array([s['far'] for s in sweep])
    frr = np.array([s['frr'] for s in sweep])
    thr = np.array([s['threshold'] for s in sweep])
    fig, axes = plt.subplots(1, 2, figsize=(13, 5))

    ax = axes[0]
    ax.plot(far, 1 - frr, 'b-', lw=2)
    ax.plot([0, 1], [0, 1], 'k--', alpha=0.4)
    ax.set_xlabel('FAR')
    ax.set_ylabel('1 - FRR (True Positive Rate)')
    ax.set_title(f'{title} - ROC')
    ax.set_xlim(0, 1)
    ax.set_ylim(0, 1)
    ax.grid(alpha=0.3)

    ax = axes[1]
    ax.plot(thr, far, 'r-', lw=2, label='FAR')
    ax.plot(thr, frr, 'b-', lw=2, label='FRR')
    ax.set_xlabel('Threshold')
    ax.set_ylabel('Error rate')
    ax.set_title(f'{title} - FAR / FRR vs threshold')
    ax.legend()
    ax.grid(alpha=0.3)

    plt.tight_layout()
    fig.savefig(out_path, dpi=150)
    plt.close()


def main():
    """Run the anchor-based validation end to end and write the ROC
    plots, sanity-sample CSV, JSON and Markdown artifacts under OUT."""
    print('='*70)
    print('Script 19: Pixel-Identity Validation (No Annotation)')
    print('='*70)

    # Moved here from import time so importing the module is side-effect free.
    OUT.mkdir(parents=True, exist_ok=True)

    data = load_signatures()
    print(f'\nTotal signatures loaded: {len(data):,}')
    cos = np.array([d['cosine'] for d in data])
    # -1 is the "missing dHash" sentinel; filtered via `>= 0` below.
    dh_indep = np.array([d['dhash_indep'] if d['dhash_indep'] is not None
                         else -1 for d in data])
    pix = np.array([d['pixel_identical'] for d in data])
    firm = np.array([d['firm'] for d in data])

    print(f'Pixel-identical: {int(pix.sum()):,} signatures')
    print(f'Firm A signatures: {int((firm == FIRM_A).sum()):,}')
    print(f'Negative anchor (cosine < {NEGATIVE_COSINE_UPPER}): '
          f'{int((cos < NEGATIVE_COSINE_UPPER).sum()):,}')

    # Build labelled set:
    #   positive = pixel_identical == 1
    #   negative = cosine < NEGATIVE_COSINE_UPPER (and not pixel_identical)
    pos_mask = pix == 1
    neg_mask = (cos < NEGATIVE_COSINE_UPPER) & (~pos_mask)
    labelled_mask = pos_mask | neg_mask
    y = pos_mask[labelled_mask].astype(int)
    cos_l = cos[labelled_mask]
    dh_l = dh_indep[labelled_mask]

    # --- Sweep cosine threshold
    cos_thresh = np.linspace(0.50, 1.00, 101)
    cos_sweep = sweep_threshold(cos_l, y, 'above', cos_thresh)
    cos_eer = find_eer(cos_sweep)
    print(f'\nCosine EER: threshold={cos_eer["threshold"]:.4f}, '
          f'EER={cos_eer["eer"]:.4f}')

    # --- Sweep dHash threshold (independent); drop missing (-1) values
    dh_l_valid = dh_l >= 0
    y_dh = y[dh_l_valid]
    dh_valid = dh_l[dh_l_valid]
    dh_thresh = np.arange(0, 40)
    dh_sweep = sweep_threshold(dh_valid, y_dh, 'below', dh_thresh)
    dh_eer = find_eer(dh_sweep)
    print(f'dHash EER: threshold={dh_eer["threshold"]:.4f}, '
          f'EER={dh_eer["eer"]:.4f}')

    # Plots
    plot_roc(cos_sweep, 'Cosine (pixel-identity anchor)',
             OUT / 'roc_cosine.png')
    plot_roc(dh_sweep, 'Independent dHash (pixel-identity anchor)',
             OUT / 'roc_dhash.png')

    # --- Evaluate canonical thresholds from Scripts 15-18
    canonical = [
        ('cosine', 0.837, 'above', cos, pos_mask, neg_mask),
        ('cosine', 0.941, 'above', cos, pos_mask, neg_mask),
        ('cosine', 0.95, 'above', cos, pos_mask, neg_mask),
        ('dhash_indep', 5, 'below', dh_indep, pos_mask,
         neg_mask & (dh_indep >= 0)),
        ('dhash_indep', 8, 'below', dh_indep, pos_mask,
         neg_mask & (dh_indep >= 0)),
        ('dhash_indep', 15, 'below', dh_indep, pos_mask,
         neg_mask & (dh_indep >= 0)),
    ]
    canonical_results = []
    for name, thr, direction, scores, p_mask, n_mask in canonical:
        labelled = p_mask | n_mask
        # For dHash, also exclude the -1 "missing" sentinel rows.
        valid = labelled & (scores >= 0 if 'dhash' in name else np.ones_like(
            labelled, dtype=bool))
        y_local = p_mask[valid].astype(int)
        s = scores[valid]
        if direction == 'above':
            y_pred = (s > thr).astype(int)
        else:
            y_pred = (s < thr).astype(int)
        m = classification_metrics(y_local, y_pred)
        m.update({'indicator': name, 'threshold': float(thr),
                  'direction': direction})
        canonical_results.append(m)
        print(f" {name} @ {thr:>5} ({direction}): "
              f"P={m['precision']:.3f}, R={m['recall']:.3f}, "
              f"F1={m['f1']:.3f}, FAR={m['far']:.4f}, FRR={m['frr']:.4f}")

    # --- Firm A anchor validation
    firm_a_mask = firm == FIRM_A
    firm_a_cos = cos[firm_a_mask]
    firm_a_dh = dh_indep[firm_a_mask]

    firm_a_rates = {}
    for thr in [0.837, 0.941, 0.95]:
        firm_a_rates[f'cosine>{thr}'] = float(np.mean(firm_a_cos > thr))
    for thr in [5, 8, 15]:
        valid = firm_a_dh >= 0
        firm_a_rates[f'dhash_indep<={thr}'] = float(
            np.mean(firm_a_dh[valid] <= thr))
    # Dual thresholds.  NOTE(review): the denominator here is ALL Firm A
    # signatures, including those with missing dHash — intentionally
    # conservative; confirm against the 2026-04-08 89.95% figure.
    firm_a_rates['cosine>0.95 AND dhash_indep<=8'] = float(
        np.mean((firm_a_cos > 0.95) &
                (firm_a_dh >= 0) & (firm_a_dh <= 8)))

    print('\nFirm A anchor validation:')
    for k, v in firm_a_rates.items():
        print(f' {k}: {v*100:.2f}%')

    # --- Stratified sanity sample (30 signatures across 5 strata)
    rng = np.random.default_rng(42)
    strata = [
        ('pixel_identical', pix == 1),
        ('high_cos_low_dh',
         (cos > 0.95) & (dh_indep >= 0) & (dh_indep <= 5) & (pix == 0)),
        ('borderline',
         (cos > 0.837) & (cos < 0.95) & (dh_indep >= 0) & (dh_indep <= 15)),
        ('style_consistency_only',
         (cos > 0.95) & (dh_indep >= 0) & (dh_indep > 15)),
        ('likely_genuine', cos < NEGATIVE_COSINE_UPPER),
    ]
    sanity_sample = []
    per_stratum = SANITY_SAMPLE_SIZE // len(strata)
    for stratum_name, m in strata:
        idx = np.where(m)[0]
        pick = rng.choice(idx, size=min(per_stratum, len(idx)), replace=False)
        for i in pick:
            d = data[i]
            sanity_sample.append({
                'stratum': stratum_name, 'sig_id': d['sig_id'],
                'filename': d['filename'], 'accountant': d['accountant'],
                'firm': d['firm'], 'cosine': d['cosine'],
                'dhash_indep': d['dhash_indep'],
                'pixel_identical': d['pixel_identical'],
                'closest_match': d['closest_match'],
            })

    # Fix: use csv.writer so names/filenames containing commas are quoted.
    csv_path = OUT / 'sanity_sample.csv'
    keys = ['stratum', 'sig_id', 'filename', 'accountant', 'firm',
            'cosine', 'dhash_indep', 'pixel_identical', 'closest_match']
    with open(csv_path, 'w', encoding='utf-8', newline='') as f:
        writer = csv.writer(f)
        writer.writerow(keys)
        for row in sanity_sample:
            writer.writerow(['' if row[k] is None else row[k] for k in keys])
    print(f'\nSanity sample CSV: {csv_path}')

    # --- Save results
    summary = {
        'generated_at': datetime.now().isoformat(),
        'n_signatures': len(data),
        'n_pixel_identical': int(pos_mask.sum()),
        'n_firm_a': int(firm_a_mask.sum()),
        'n_negative_anchor': int(neg_mask.sum()),
        'negative_cosine_upper': NEGATIVE_COSINE_UPPER,
        'eer_cosine': cos_eer,
        'eer_dhash_indep': dh_eer,
        'canonical_thresholds': canonical_results,
        'firm_a_anchor_rates': firm_a_rates,
        'cosine_sweep': cos_sweep,
        'dhash_sweep': dh_sweep,
    }
    with open(OUT / 'pixel_validation_results.json', 'w') as f:
        json.dump(summary, f, indent=2, ensure_ascii=False)
    print(f'JSON: {OUT / "pixel_validation_results.json"}')

    # --- Markdown
    md = [
        '# Pixel-Identity Validation Report',
        f"Generated: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}",
        '',
        '## Anchors (no human annotation required)',
        '',
        f'* **Pixel-identical anchor (gold positive):** '
        f'{int(pos_mask.sum()):,} signatures whose closest same-accountant',
        ' match is byte-identical after crop/normalise. Under handwriting',
        ' physics this can only arise from image duplication.',
        f'* **Negative anchor:** signatures whose maximum same-accountant',
        f' cosine is below {NEGATIVE_COSINE_UPPER} '
        f'({int(neg_mask.sum()):,} signatures). Treated as',
        ' confirmed not-replicated.',
        f'* **Firm A anchor:** Deloitte ({int(firm_a_mask.sum()):,} signatures),',
        ' near-universally non-hand-signed per partner interviews.',
        '',
        '## Equal Error Rate (EER)',
        '',
        '| Indicator | Direction | EER threshold | EER |',
        '|-----------|-----------|---------------|-----|',
        f"| Cosine max-similarity | > t | {cos_eer['threshold']:.4f} | "
        f"{cos_eer['eer']:.4f} |",
        f"| Independent min dHash | < t | {dh_eer['threshold']:.4f} | "
        f"{dh_eer['eer']:.4f} |",
        '',
        '## Canonical thresholds',
        '',
        '| Indicator | Threshold | Precision | Recall | F1 | FAR | FRR |',
        '|-----------|-----------|-----------|--------|----|-----|-----|',
    ]
    for c in canonical_results:
        md.append(
            f"| {c['indicator']} | {c['threshold']} "
            f"({c['direction']}) | {c['precision']:.3f} | "
            f"{c['recall']:.3f} | {c['f1']:.3f} | "
            f"{c['far']:.4f} | {c['frr']:.4f} |"
        )

    md += ['', '## Firm A anchor validation', '',
           '| Rule | Firm A rate |',
           '|------|-------------|']
    for k, v in firm_a_rates.items():
        md.append(f'| {k} | {v*100:.2f}% |')

    md += ['', '## Sanity sample', '',
           f'A stratified sample of {len(sanity_sample)} signatures '
           '(pixel-identical, high-cos/low-dh, borderline, style-only, '
           'likely-genuine) is exported to `sanity_sample.csv` for visual',
           'spot-check. These are **not** used to compute metrics.',
           '',
           '## Interpretation',
           '',
           'Because the gold positive is a *subset* of the true replication',
           'positives (only those that happen to be pixel-identical to their',
           'nearest match), recall is conservative: the classifier should',
           'catch pixel-identical pairs reliably and will additionally flag',
           'many non-pixel-identical replications (low dHash but not zero).',
           'FAR against the low-cosine negative anchor is the meaningful',
           'upper bound on spurious replication flags.',
           '',
           'Convergence of thresholds across Scripts 15 (dip test), 16 (BD),',
           '17 (Beta mixture), 18 (accountant mixture) and the EER here',
           'should be reported in the paper as multi-method validation.',
           ]
    (OUT / 'pixel_validation_report.md').write_text('\n'.join(md),
                                                    encoding='utf-8')
    print(f'Report: {OUT / "pixel_validation_report.md"}')


if __name__ == '__main__':
    main()