#!/usr/bin/env python3 """ Script 36: Paper A v4.0 Calibration + Leave-One-Firm-Out Validation ===================================================================== Phase 1 foundation script for the v4.0 Big-4 reframe. Inputs (DB): /Volumes/NV2/PDF-Processing/signature-analysis/signature_analysis.db Output: /Volumes/NV2/PDF-Processing/signature-analysis/reports/v4_big4/ calibration_and_loo_validation/ calibration_loo_results.json calibration_loo_report.md panel_calibration.png panel_loo_.png Sections: A. Big-4 calibration recap - Pool Firm A + KPMG + PwC + EY accountant means (n=437 CPAs). - Fit 2D GMM K=2 (primary) and K=3 (secondary). - Bootstrap 500 resamples for marginal crossings (cos and dh). - Derive operational classifier rule: R_v4 := cos > c_cut AND dh <= d_cut where (c_cut, d_cut) = (Big-4 2D-GMM K=2 marginal crossings). B. Leave-one-firm-out (LOOO) cross-validation - For each of 4 Big-4 firms F: * Refit K=2 on the other 3 firms only. * Bootstrap 500 resamples for the held-out fit's marginal crossings. * Predict the held-out F CPAs' cluster assignments using the held-out-derived rule. * Compute: - n_F, n_F_classified_replicated (cluster C_high_cos), n_F_classified_handleaning (cluster C_low_cos) - Wilson 95% CI on the replicated rate for F - Compare derived rule (c_cut, d_cut) across folds: is it stable? C. Cross-fold stability table - For each fold, report (c_cut, d_cut), and the replicated rate the held-out firm receives. - Verdict (printed and saved): STABLE max |c_cut - mean| <= 0.005 AND max |d_cut - mean| <= 0.5 across the 4 folds UNSTABLE otherwise Methodology decisions (flag for partner / reviewer feedback): * Held-out unit = firm (not 30% of accountants within firm). Rationale: v4.0 makes a methodology-paper claim that the pipeline reproduces across firms. Within-firm 70/30 only tests sampling variance within one firm; LOOO tests cross-firm generalization, which is the stronger and more honest claim. * Bootstrap n=500 (vs Script 34's 500 — kept consistent). * GMM hyperparameters (n_init=15, max_iter=500, random_state=42) kept consistent with Scripts 32/34/35. """ import sqlite3 import json import numpy as np import matplotlib matplotlib.use('Agg') import matplotlib.pyplot as plt from pathlib import Path from datetime import datetime from scipy import stats from scipy.optimize import brentq from scipy.stats import norm from sklearn.mixture import GaussianMixture import diptest DB = '/Volumes/NV2/PDF-Processing/signature-analysis/signature_analysis.db' OUT = Path('/Volumes/NV2/PDF-Processing/signature-analysis/reports/' 'v4_big4/calibration_and_loo_validation') OUT.mkdir(parents=True, exist_ok=True) MIN_SIGS = 10 N_BOOT = 500 SEED = 42 BIG4 = ('勤業眾信聯合', '安侯建業聯合', '資誠聯合', '安永聯合') FIRM_A_LABEL = '勤業眾信聯合' # Deloitte def load_big4_accountants(): """Return list of dicts: {cpa, firm, cos_mean, dh_mean, n_sigs}.""" conn = sqlite3.connect(DB) cur = conn.cursor() cur.execute(''' SELECT s.assigned_accountant, a.firm, AVG(s.max_similarity_to_same_accountant) AS cos_mean, AVG(CAST(s.min_dhash_independent AS REAL)) AS dh_mean, COUNT(*) AS n FROM signatures s JOIN accountants a ON s.assigned_accountant = a.name WHERE s.assigned_accountant IS NOT NULL AND s.max_similarity_to_same_accountant IS NOT NULL AND s.min_dhash_independent IS NOT NULL AND a.firm IN (?, ?, ?, ?) GROUP BY s.assigned_accountant HAVING n >= ? ''', BIG4 + (MIN_SIGS,)) rows = cur.fetchall() conn.close() return [{'cpa': r[0], 'firm': r[1], 'cos_mean': float(r[2]), 'dh_mean': float(r[3]), 'n_sigs': int(r[4])} for r in rows] def fit_gmm_2d(X, K, seed=SEED): return GaussianMixture(n_components=K, covariance_type='full', random_state=seed, n_init=15, max_iter=500).fit(X) def marginal_crossing(gmm, X, dim): """2-comp 2D GMM -> crossing on the specified marginal dim.""" means = gmm.means_ covs = gmm.covariances_ weights = gmm.weights_ if gmm.n_components != 2: raise ValueError('marginal_crossing requires K=2') m1, m2 = means[0][dim], means[1][dim] s1 = np.sqrt(covs[0][dim, dim]) s2 = np.sqrt(covs[1][dim, dim]) w1, w2 = weights[0], weights[1] def diff(x): return (w2 * stats.norm.pdf(x, m2, s2) - w1 * stats.norm.pdf(x, m1, s1)) xs = np.linspace(float(X[:, dim].min()), float(X[:, dim].max()), 2000) ys = diff(xs) ch = np.where(np.diff(np.sign(ys)) != 0)[0] if not len(ch): return None mid = 0.5 * (m1 + m2) crossings = [] for i in ch: try: crossings.append(brentq(diff, xs[i], xs[i + 1])) except ValueError: continue if not crossings: return None return float(min(crossings, key=lambda c: abs(c - mid))) def bootstrap_crossings(X, n_boot=N_BOOT, seed=SEED): rng = np.random.default_rng(seed) n = len(X) cos_cs, dh_cs = [], [] for _ in range(n_boot): idx = rng.integers(0, n, size=n) Xb = X[idx] gmm = fit_gmm_2d(Xb, 2) c = marginal_crossing(gmm, Xb, 0) d = marginal_crossing(gmm, Xb, 1) if c is not None: cos_cs.append(c) if d is not None: dh_cs.append(d) cos_cs = np.asarray(cos_cs) dh_cs = np.asarray(dh_cs) def summarize(arr): if len(arr) < n_boot * 0.5: return None return { 'n_successful': int(len(arr)), 'mean': float(np.mean(arr)), 'median': float(np.median(arr)), 'std': float(np.std(arr, ddof=1)), 'ci95': [float(np.quantile(arr, 0.025)), float(np.quantile(arr, 0.975))], 'ci_halfwidth': float(0.5 * (np.quantile(arr, 0.975) - np.quantile(arr, 0.025))), } return summarize(cos_cs), summarize(dh_cs) def derive_rule(c_cut, d_cut): """Operational classifier rule: a signature is replicated iff cos > c_cut AND dh <= d_cut.""" return { 'cos_threshold': float(c_cut) if c_cut is not None else None, 'dh_threshold': float(d_cut) if d_cut is not None else None, 'rule': (f'replicated iff cos > {c_cut:.4f} AND dh <= {d_cut:.4f}' if c_cut is not None and d_cut is not None else 'rule undefined'), } def wilson_ci(k, n, alpha=0.05): if n == 0: return (0.0, 1.0) z = norm.ppf(1 - alpha / 2) phat = k / n denom = 1 + z * z / n center = (phat + z * z / (2 * n)) / denom pm = z * np.sqrt(phat * (1 - phat) / n + z * z / (4 * n * n)) / denom return (max(0.0, center - pm), min(1.0, center + pm)) def classify_cpa(cos_mean, dh_mean, c_cut, d_cut): """At the accountant level, a CPA is 'replicated' if their MEAN coordinates satisfy the rule. (Note: this is a CPA-level summarisation; a per-signature classifier would apply the same rule signature-by-signature.)""" if c_cut is None or d_cut is None: return 'undefined' if cos_mean > c_cut and dh_mean <= d_cut: return 'replicated' return 'hand_leaning' def kde_dip(values): arr = np.asarray(values, dtype=float) arr = arr[np.isfinite(arr)] if len(arr) < 8: return None dip, pval = diptest.diptest(arr, boot_pval=True, n_boot=2000) return {'dip': float(dip), 'dip_pvalue': float(pval), 'unimodal_alpha05': bool(pval > 0.05), 'n': int(len(arr))} def run_calibration(cpas): cos = np.array([c['cos_mean'] for c in cpas]) dh = np.array([c['dh_mean'] for c in cpas]) X = np.column_stack([cos, dh]) print(f'\n[A] Calibration on {len(cpas)} Big-4 CPAs') dip_cos = kde_dip(cos) dip_dh = kde_dip(dh) print(f' dip-test (cos): p={dip_cos["dip_pvalue"]:.4g}') print(f' dip-test (dh) : p={dip_dh["dip_pvalue"]:.4g}') gmm2 = fit_gmm_2d(X, 2) gmm3 = fit_gmm_2d(X, 3) c_cut = marginal_crossing(gmm2, X, 0) d_cut = marginal_crossing(gmm2, X, 1) print(f' K=2 marginal crossings: cos={c_cut:.4f}, dh={d_cut:.4f}') print(f' K=2 BIC={gmm2.bic(X):.2f}; K=3 BIC={gmm3.bic(X):.2f}') boot_cos, boot_dh = bootstrap_crossings(X) if boot_cos: print(f' bootstrap (cos): median={boot_cos["median"]:.4f}, ' f'95% CI=[{boot_cos["ci95"][0]:.4f}, {boot_cos["ci95"][1]:.4f}]') if boot_dh: print(f' bootstrap (dh) : median={boot_dh["median"]:.4f}, ' f'95% CI=[{boot_dh["ci95"][0]:.4f}, {boot_dh["ci95"][1]:.4f}]') rule = derive_rule(c_cut, d_cut) print(f' Derived rule: {rule["rule"]}') return { 'n_cpas': len(cpas), 'dip_test_cos': dip_cos, 'dip_test_dh': dip_dh, 'k2_crossings': {'cos': c_cut, 'dh': d_cut}, 'k2_bic': float(gmm2.bic(X)), 'k3_bic': float(gmm3.bic(X)), 'k2_components': { 'means': gmm2.means_.tolist(), 'weights': gmm2.weights_.tolist(), }, 'bootstrap_cos': boot_cos, 'bootstrap_dh': boot_dh, 'rule': rule, } def run_loo(cpas): """Leave-one-firm-out cross-validation.""" by_firm = {} for c in cpas: by_firm.setdefault(c['firm'], []).append(c) fold_results = {} for held_firm in BIG4: train_cpas = [c for c in cpas if c['firm'] != held_firm] held_cpas = by_firm.get(held_firm, []) n_train = len(train_cpas) n_held = len(held_cpas) print(f'\n[B] LOOO fold: held-out = {held_firm} ' f'(n_train={n_train}, n_held={n_held})') X_train = np.column_stack([ [c['cos_mean'] for c in train_cpas], [c['dh_mean'] for c in train_cpas], ]) gmm = fit_gmm_2d(X_train, 2) c_cut = marginal_crossing(gmm, X_train, 0) d_cut = marginal_crossing(gmm, X_train, 1) boot_cos, boot_dh = bootstrap_crossings(X_train) # Apply derived rule to held-out firm replicated = 0 hand_leaning = 0 for c in held_cpas: cls = classify_cpa(c['cos_mean'], c['dh_mean'], c_cut, d_cut) if cls == 'replicated': replicated += 1 else: hand_leaning += 1 rep_rate = replicated / n_held if n_held else 0.0 wlo, whi = wilson_ci(replicated, n_held) print(f' fold rule: cos>{c_cut:.4f} AND dh<={d_cut:.4f}') print(f' held-out replicated: {replicated}/{n_held} = ' f'{rep_rate*100:.2f}% [{wlo*100:.2f}%, {whi*100:.2f}%]') fold_results[held_firm] = { 'n_train': n_train, 'n_held': n_held, 'fold_rule': derive_rule(c_cut, d_cut), 'fold_crossings': {'cos': c_cut, 'dh': d_cut}, 'bootstrap_cos': boot_cos, 'bootstrap_dh': boot_dh, 'held_out_classification': { 'n_replicated': replicated, 'n_hand_leaning': hand_leaning, 'replicated_rate': rep_rate, 'wilson95': [float(wlo), float(whi)], }, } return fold_results def cross_fold_stability(fold_results, full_calib): cs = [fold_results[f]['fold_crossings']['cos'] for f in BIG4 if fold_results[f]['fold_crossings']['cos'] is not None] ds = [fold_results[f]['fold_crossings']['dh'] for f in BIG4 if fold_results[f]['fold_crossings']['dh'] is not None] full_c = full_calib['k2_crossings']['cos'] full_d = full_calib['k2_crossings']['dh'] summary = { 'fold_cos_crossings': cs, 'fold_dh_crossings': ds, 'mean_cos': float(np.mean(cs)) if cs else None, 'mean_dh': float(np.mean(ds)) if ds else None, 'max_dev_cos_from_mean': (float(max(abs(np.array(cs) - np.mean(cs)))) if cs else None), 'max_dev_dh_from_mean': (float(max(abs(np.array(ds) - np.mean(ds)))) if ds else None), 'max_dev_cos_from_full': (float(max(abs(np.array(cs) - full_c))) if cs and full_c else None), 'max_dev_dh_from_full': (float(max(abs(np.array(ds) - full_d))) if ds and full_d else None), } cos_stable = (summary['max_dev_cos_from_mean'] is not None and summary['max_dev_cos_from_mean'] <= 0.005) dh_stable = (summary['max_dev_dh_from_mean'] is not None and summary['max_dev_dh_from_mean'] <= 0.5) summary['verdict'] = ('STABLE' if (cos_stable and dh_stable) else 'UNSTABLE') return summary def render_panels(cpas, full_calib, fold_results): by_firm = {} for c in cpas: by_firm.setdefault(c['firm'], []).append(c) # Calibration panel fig, ax = plt.subplots(figsize=(9, 7)) colors = {'勤業眾信聯合': 'crimson', '安侯建業聯合': 'royalblue', '資誠聯合': 'forestgreen', '安永聯合': 'darkorange'} labels = {'勤業眾信聯合': 'Firm A (Deloitte)', '安侯建業聯合': 'KPMG', '資誠聯合': 'PwC', '安永聯合': 'EY'} for firm in BIG4: pts = by_firm[firm] ax.scatter([p['cos_mean'] for p in pts], [p['dh_mean'] for p in pts], s=30, alpha=0.6, color=colors[firm], label=f'{labels[firm]} (n={len(pts)})') c_cut = full_calib['k2_crossings']['cos'] d_cut = full_calib['k2_crossings']['dh'] ax.axvline(c_cut, color='black', ls='--', lw=1.5, label=f'cos cut = {c_cut:.4f}') ax.axhline(d_cut, color='black', ls=':', lw=1.5, label=f'dh cut = {d_cut:.4f}') ax.set_xlabel('Accountant cos_mean') ax.set_ylabel('Accountant dh_mean') ax.set_title('Big-4 calibration: 437 CPAs + K=2 marginal crossings') ax.legend(fontsize=9) ax.grid(alpha=0.3) fig.tight_layout() fig.savefig(OUT / 'panel_calibration.png', dpi=150) plt.close(fig) # LOOO panels for held_firm in BIG4: held = by_firm[held_firm] train_pts = [c for c in cpas if c['firm'] != held_firm] fr = fold_results[held_firm] c_cut_f = fr['fold_crossings']['cos'] d_cut_f = fr['fold_crossings']['dh'] fig, ax = plt.subplots(figsize=(9, 7)) ax.scatter([p['cos_mean'] for p in train_pts], [p['dh_mean'] for p in train_pts], s=20, alpha=0.4, color='lightgray', label=f'Train (other Big-3, n={len(train_pts)})') ax.scatter([p['cos_mean'] for p in held], [p['dh_mean'] for p in held], s=40, alpha=0.85, color=colors[held_firm], edgecolor='white', label=f'Held-out: {labels[held_firm]} (n={len(held)})') if c_cut_f is not None: ax.axvline(c_cut_f, color='black', ls='--', lw=1.5, label=f'fold cos cut = {c_cut_f:.4f}') if d_cut_f is not None: ax.axhline(d_cut_f, color='black', ls=':', lw=1.5, label=f'fold dh cut = {d_cut_f:.4f}') rep = fr['held_out_classification']['n_replicated'] nh = fr['n_held'] rate = fr['held_out_classification']['replicated_rate'] wlo, whi = fr['held_out_classification']['wilson95'] ax.set_title( f'LOOO: held-out {labels[held_firm]} ({rep}/{nh} = ' f'{rate*100:.1f}% replicated, Wilson 95% ' f'[{wlo*100:.1f}%, {whi*100:.1f}%])') ax.set_xlabel('Accountant cos_mean') ax.set_ylabel('Accountant dh_mean') ax.legend(fontsize=9) ax.grid(alpha=0.3) fig.tight_layout() firm_slug = ('FirmA' if held_firm == FIRM_A_LABEL else {'安侯建業聯合': 'KPMG', '資誠聯合': 'PwC', '安永聯合': 'EY'}.get(held_firm, held_firm)) fig.savefig(OUT / f'panel_loo_{firm_slug}.png', dpi=150) plt.close(fig) def render_md(full_calib, fold_results, stability, sample_sizes): md = [ '# Paper A v4.0 Phase 1 — Calibration + LOOO Validation', f'Generated: {datetime.now().strftime("%Y-%m-%d %H:%M:%S")}', '', '## A. Big-4 Calibration', '', f'- N CPAs: {full_calib["n_cpas"]}', f'- dip-test cos: p = {full_calib["dip_test_cos"]["dip_pvalue"]:.4g} ' f'({"unimodal" if full_calib["dip_test_cos"]["unimodal_alpha05"] else "multimodal"})', f'- dip-test dh : p = {full_calib["dip_test_dh"]["dip_pvalue"]:.4g} ' f'({"unimodal" if full_calib["dip_test_dh"]["unimodal_alpha05"] else "multimodal"})', f'- 2D GMM K=2 BIC = {full_calib["k2_bic"]:.2f}', f'- 2D GMM K=3 BIC = {full_calib["k3_bic"]:.2f}', '', '### Marginal crossings (point + bootstrap 95% CI, n=500)', '', '| Axis | Point | Bootstrap median | 95% CI | CI half-width |', '|---|---|---|---|---|', ] for axis_label, key in [('cos', 'bootstrap_cos'), ('dh', 'bootstrap_dh')]: b = full_calib[key] point = full_calib['k2_crossings'][axis_label] if b is None: md.append(f'| {axis_label} | {point} | n/a | n/a | n/a |') else: md.append(f'| {axis_label} | {point:.4f} | {b["median"]:.4f} | ' f'[{b["ci95"][0]:.4f}, {b["ci95"][1]:.4f}] | ' f'{b["ci_halfwidth"]:.4f} |') md += ['', f'### Operational classifier rule', '', f'> {full_calib["rule"]["rule"]}', '', '### K=2 components', '', '| Component | mean cos | mean dh | weight |', '|---|---|---|---|'] for i, (m, w) in enumerate(zip(full_calib['k2_components']['means'], full_calib['k2_components']['weights'])): md.append(f'| C{i+1} | {m[0]:.4f} | {m[1]:.4f} | {w:.3f} |') md += ['', '## B. Leave-One-Firm-Out Validation', '', '| Held-out firm | n_train | n_held | Fold cos cut | Fold dh cut | ' 'Replicated rate | Wilson 95% |', '|---|---|---|---|---|---|---|'] label_map = {'勤業眾信聯合': 'Firm A (Deloitte)', '安侯建業聯合': 'KPMG', '資誠聯合': 'PwC', '安永聯合': 'EY'} for f in BIG4: fr = fold_results[f] c = fr['fold_crossings']['cos'] d = fr['fold_crossings']['dh'] rep = fr['held_out_classification'] c_str = f'{c:.4f}' if c is not None else 'n/a' d_str = f'{d:.4f}' if d is not None else 'n/a' md.append(f'| {label_map[f]} | {fr["n_train"]} | {fr["n_held"]} | ' f'{c_str} | {d_str} | {rep["replicated_rate"]*100:.2f}% | ' f'[{rep["wilson95"][0]*100:.2f}%, ' f'{rep["wilson95"][1]*100:.2f}%] |') md += ['', '## C. Cross-fold stability', '', f'- Mean fold cos crossing: ' f'{stability["mean_cos"]:.4f}' if stability["mean_cos"] is not None else '- Mean fold cos crossing: n/a', f'- Mean fold dh crossing : ' f'{stability["mean_dh"]:.4f}' if stability["mean_dh"] is not None else '- Mean fold dh crossing: n/a', f'- Max |dev_cos| across folds: ' f'{stability["max_dev_cos_from_mean"]:.4f}' if stability["max_dev_cos_from_mean"] is not None else '- Max |dev_cos|: n/a', f'- Max |dev_dh| across folds : ' f'{stability["max_dev_dh_from_mean"]:.4f}' if stability["max_dev_dh_from_mean"] is not None else '- Max |dev_dh|: n/a', f'- Max |dev_cos| vs full-calib: ' f'{stability["max_dev_cos_from_full"]:.4f}' if stability["max_dev_cos_from_full"] is not None else '- Max |dev_cos| vs full: n/a', f'- Max |dev_dh| vs full-calib : ' f'{stability["max_dev_dh_from_full"]:.4f}' if stability["max_dev_dh_from_full"] is not None else '- Max |dev_dh| vs full: n/a', '', f'**Verdict: {stability["verdict"]}**', '', '### Verdict legend', '- **STABLE**: max |dev_cos| <= 0.005 AND max |dev_dh| <= 0.5 ' 'across the 4 LOOO folds; the threshold is reproducible across ' 'firms.', '- **UNSTABLE**: at least one fold deviates beyond the tolerance; ' 'the threshold is sensitive to which firm is held out, which ' 'would invite reviewer questions about generalizability.', '', '## Methodology notes', '', '- Held-out unit is the firm (not within-firm 70/30) -- this ' 'tests the v4.0 methodology-paper claim that the pipeline ' 'reproduces across firms, not just within a calibration sample.', '- Bootstrap n=500 (consistent with Script 34); ' 'GMM hyperparameters n_init=15, max_iter=500, random_state=42 ' '(consistent with Scripts 32/34/35).', '- CPA-level classification uses the rule applied to the ' 'accountant\'s mean (cos_mean, dh_mean). A per-signature ' 'classifier would apply the same rule signature-by-signature ' '(deferred to Script 38 for sensitivity analysis).', '', '## Files', '- `panel_calibration.png` -- 437 Big-4 CPAs + K=2 cuts', '- `panel_loo_.png` -- LOOO fold panels (4 firms)', '- `calibration_loo_results.json` -- machine-readable full output', ] return '\n'.join(md) def main(): print('=' * 72) print('Script 36: v4.0 Calibration + Leave-One-Firm-Out Validation') print('=' * 72) cpas = load_big4_accountants() sample_sizes = {} for c in cpas: sample_sizes.setdefault(c['firm'], 0) sample_sizes[c['firm']] += 1 print(f'\nTotal Big-4 CPAs (n_sigs >= {MIN_SIGS}): {len(cpas)}') for f in BIG4: print(f' {f}: {sample_sizes.get(f, 0)}') full_calib = run_calibration(cpas) fold_results = run_loo(cpas) stability = cross_fold_stability(fold_results, full_calib) print(f'\n[C] Cross-fold stability verdict: {stability["verdict"]}') print(f' Max |dev_cos| from mean = ' f'{stability["max_dev_cos_from_mean"]}; ' f'from full-calib = {stability["max_dev_cos_from_full"]}') print(f' Max |dev_dh| from mean = ' f'{stability["max_dev_dh_from_mean"]}; ' f'from full-calib = {stability["max_dev_dh_from_full"]}') render_panels(cpas, full_calib, fold_results) print(f'\nPanels: {OUT}/panel_calibration.png + 4 LOOO panels') payload = { 'generated_at': datetime.now().isoformat(), 'min_sigs_per_accountant': MIN_SIGS, 'n_bootstrap': N_BOOT, 'random_seed': SEED, 'sample_sizes': sample_sizes, 'big4_calibration': full_calib, 'loo_folds': fold_results, 'cross_fold_stability': stability, } json_path = OUT / 'calibration_loo_results.json' json_path.write_text(json.dumps(payload, indent=2, ensure_ascii=False), encoding='utf-8') print(f'JSON: {json_path}') md = render_md(full_calib, fold_results, stability, sample_sizes) md_path = OUT / 'calibration_loo_report.md' md_path.write_text(md, encoding='utf-8') print(f'Report: {md_path}') if __name__ == '__main__': main()