#!/usr/bin/env python3 """ Script 37: K=3 Leave-One-Firm-Out Check (Path P2 viability test) ================================================================= Follow-up to Script 36's UNSTABLE K=2 LOOO finding. Tests whether the K=3 mixture's C1 component (lowest-cosine "hand-leaning" cluster, ~14% weight per Script 35) is a real cross-firm sub-population or is also firm-mass driven. Reference: Script 35 (full Big-4 K=3) reported C1 cluster membership: Firm A 0/171 = 0.0% KPMG 10/112 = 8.9% PwC 24/102 = 23.5% EY 6/52 = 11.5% The hypothesis: if C1 is a true cross-firm hand-leaning sub-population, then: - Across 4 LOOO folds, the C1 component should sit at roughly the same (cos, dh) coordinates with similar weight. - When the held-out firm's CPAs are assigned via the fold's K=3 posterior, the fraction in C1 should approximate the Script 35 full-data percentages. If C1 collapses, shifts dramatically, or fails to predict held-out membership, then K=3 is also firm-mass driven and Path P2 fails. Output: reports/v4_big4/k3_loo_check/ k3_loo_results.json k3_loo_report.md panel_k3_loo_.png """ import sqlite3 import json import numpy as np import matplotlib matplotlib.use('Agg') import matplotlib.pyplot as plt from pathlib import Path from datetime import datetime from sklearn.mixture import GaussianMixture from scipy.stats import norm DB = '/Volumes/NV2/PDF-Processing/signature-analysis/signature_analysis.db' OUT = Path('/Volumes/NV2/PDF-Processing/signature-analysis/reports/' 'v4_big4/k3_loo_check') OUT.mkdir(parents=True, exist_ok=True) MIN_SIGS = 10 SEED = 42 BIG4 = ('勤業眾信聯合', '安侯建業聯合', '資誠聯合', '安永聯合') LABEL = {'勤業眾信聯合': 'Firm A (Deloitte)', '安侯建業聯合': 'KPMG', '資誠聯合': 'PwC', '安永聯合': 'EY'} SLUG = {'勤業眾信聯合': 'FirmA', '安侯建業聯合': 'KPMG', '資誠聯合': 'PwC', '安永聯合': 'EY'} # Script 35 full-Big-4 K=3 baseline (informational; reproduce here as expected) SCRIPT35_C1_PCT = {'勤業眾信聯合': 0.0, '安侯建業聯合': 8.9, '資誠聯合': 23.5, '安永聯合': 11.5} def load_big4_accountants(): conn = sqlite3.connect(DB) cur = conn.cursor() cur.execute(''' SELECT s.assigned_accountant, a.firm, AVG(s.max_similarity_to_same_accountant) AS cos_mean, AVG(CAST(s.min_dhash_independent AS REAL)) AS dh_mean, COUNT(*) AS n FROM signatures s JOIN accountants a ON s.assigned_accountant = a.name WHERE s.assigned_accountant IS NOT NULL AND s.max_similarity_to_same_accountant IS NOT NULL AND s.min_dhash_independent IS NOT NULL AND a.firm IN (?, ?, ?, ?) GROUP BY s.assigned_accountant HAVING n >= ? ''', BIG4 + (MIN_SIGS,)) rows = cur.fetchall() conn.close() return [{'cpa': r[0], 'firm': r[1], 'cos_mean': float(r[2]), 'dh_mean': float(r[3]), 'n_sigs': int(r[4])} for r in rows] def fit_k3(X): return GaussianMixture(n_components=3, covariance_type='full', random_state=SEED, n_init=15, max_iter=500).fit(X) def sort_components_by_cos(gmm): """Return ordering such that comp[0] has lowest cosine mean.""" return np.argsort(gmm.means_[:, 0]) def wilson_ci(k, n, alpha=0.05): if n == 0: return (0.0, 1.0) z = norm.ppf(1 - alpha / 2) phat = k / n denom = 1 + z * z / n center = (phat + z * z / (2 * n)) / denom pm = z * np.sqrt(phat * (1 - phat) / n + z * z / (4 * n * n)) / denom return (max(0.0, center - pm), min(1.0, center + pm)) def run_full_baseline(cpas): print('\n[A] Full-Big-4 K=3 baseline (replicates Script 35)') X = np.column_stack([ [c['cos_mean'] for c in cpas], [c['dh_mean'] for c in cpas], ]) gmm = fit_k3(X) order = sort_components_by_cos(gmm) means = gmm.means_[order] weights = gmm.weights_[order] raw_labels = gmm.predict(X) label_map = {old: new for new, old in enumerate(order)} labels = np.array([label_map[l] for l in raw_labels]) by_firm_c1 = {f: 0 for f in BIG4} by_firm_total = {f: 0 for f in BIG4} for c, lab in zip(cpas, labels): by_firm_total[c['firm']] += 1 if lab == 0: by_firm_c1[c['firm']] += 1 print(f' C1 (hand-leaning) center: cos={means[0,0]:.4f}, ' f'dh={means[0,1]:.4f}, weight={weights[0]:.3f}') print(f' C2 (mixed) center: cos={means[1,0]:.4f}, ' f'dh={means[1,1]:.4f}, weight={weights[1]:.3f}') print(f' C3 (replicated) center: cos={means[2,0]:.4f}, ' f'dh={means[2,1]:.4f}, weight={weights[2]:.3f}') print(' C1 membership by firm:') for f in BIG4: n = by_firm_total[f] k = by_firm_c1[f] pct = 100 * k / n if n else 0.0 print(f' {LABEL[f]:<22} {k:>3}/{n:>3} = {pct:5.2f}% ' f'(Script 35 expected: {SCRIPT35_C1_PCT[f]}%)') return { 'means_sorted': means.tolist(), 'weights_sorted': weights.tolist(), 'c1_membership_by_firm': { f: {'k': int(by_firm_c1[f]), 'n': int(by_firm_total[f]), 'pct': float(100 * by_firm_c1[f] / by_firm_total[f]) if by_firm_total[f] else 0.0} for f in BIG4 }, } def run_loo(cpas): by_firm = {} for c in cpas: by_firm.setdefault(c['firm'], []).append(c) fold_results = {} for held_firm in BIG4: train = [c for c in cpas if c['firm'] != held_firm] held = by_firm[held_firm] X_train = np.column_stack([ [c['cos_mean'] for c in train], [c['dh_mean'] for c in train], ]) X_held = np.column_stack([ [c['cos_mean'] for c in held], [c['dh_mean'] for c in held], ]) gmm = fit_k3(X_train) order = sort_components_by_cos(gmm) means = gmm.means_[order] weights = gmm.weights_[order] # Posterior on held-out raw_post = gmm.predict_proba(X_held) post = raw_post[:, order] held_labels = np.argmax(post, axis=1) n_c1 = int(np.sum(held_labels == 0)) n_c2 = int(np.sum(held_labels == 1)) n_c3 = int(np.sum(held_labels == 2)) n_held = len(held) c1_rate = n_c1 / n_held if n_held else 0.0 wlo, whi = wilson_ci(n_c1, n_held) # Train-side weights for stability check print(f'\n[B] LOOO fold: held = {LABEL[held_firm]}') print(f' train K=3 components (sorted by cos):') for i in range(3): print(f' C{i+1}: cos={means[i,0]:.4f}, dh={means[i,1]:.4f}, ' f'weight={weights[i]:.3f}') print(f' held-out assignments: C1={n_c1}/{n_held} = ' f'{c1_rate*100:.2f}% [Wilson 95%: ' f'{wlo*100:.2f}%, {whi*100:.2f}%]') print(f' C2={n_c2}/{n_held} = ' f'{n_c2/n_held*100:.2f}%') print(f' C3={n_c3}/{n_held} = ' f'{n_c3/n_held*100:.2f}%') print(f' Script 35 expected C1 for {LABEL[held_firm]}: ' f'{SCRIPT35_C1_PCT[held_firm]}%') fold_results[held_firm] = { 'n_train': len(train), 'n_held': n_held, 'k3_components_sorted_by_cos': { 'means': means.tolist(), 'weights': weights.tolist(), }, 'held_out_assignments': { 'n_c1_handleaning': n_c1, 'n_c2_mixed': n_c2, 'n_c3_replicated': n_c3, 'c1_rate': float(c1_rate), 'c1_wilson95': [float(wlo), float(whi)], }, 'script35_expected_c1_pct': SCRIPT35_C1_PCT[held_firm], } return fold_results def stability_summary(fold_results, baseline): """Aggregate C1 component drift across folds.""" c1_means_cos = [fold_results[f]['k3_components_sorted_by_cos']['means'][0][0] for f in BIG4] c1_means_dh = [fold_results[f]['k3_components_sorted_by_cos']['means'][0][1] for f in BIG4] c1_weights = [fold_results[f]['k3_components_sorted_by_cos']['weights'][0] for f in BIG4] base_c1_cos = baseline['means_sorted'][0][0] base_c1_dh = baseline['means_sorted'][0][1] base_c1_w = baseline['weights_sorted'][0] summary = { 'fold_c1_cos_means': c1_means_cos, 'fold_c1_dh_means': c1_means_dh, 'fold_c1_weights': c1_weights, 'baseline_c1': {'cos': base_c1_cos, 'dh': base_c1_dh, 'weight': base_c1_w}, 'max_c1_cos_dev_from_baseline': float( max(abs(np.array(c1_means_cos) - base_c1_cos))), 'max_c1_dh_dev_from_baseline': float( max(abs(np.array(c1_means_dh) - base_c1_dh))), 'max_c1_weight_dev_from_baseline': float( max(abs(np.array(c1_weights) - base_c1_w))), } # Heuristic stability bars (these are exploratory, not formal test): cos_stable = summary['max_c1_cos_dev_from_baseline'] <= 0.01 dh_stable = summary['max_c1_dh_dev_from_baseline'] <= 1.0 weight_stable = summary['max_c1_weight_dev_from_baseline'] <= 0.10 summary['cos_stable'] = bool(cos_stable) summary['dh_stable'] = bool(dh_stable) summary['weight_stable'] = bool(weight_stable) summary['c1_component_stable'] = bool(cos_stable and dh_stable and weight_stable) # Held-out C1 prediction agreement with Script 35 expectation pred_v_expected = [] for f in BIG4: actual = fold_results[f]['held_out_assignments']['c1_rate'] * 100 expected = SCRIPT35_C1_PCT[f] pred_v_expected.append({ 'firm': LABEL[f], 'predicted_c1_pct': actual, 'expected_c1_pct': expected, 'abs_diff': abs(actual - expected), }) summary['held_out_prediction_check'] = pred_v_expected summary['max_abs_pct_diff'] = float(max(p['abs_diff'] for p in pred_v_expected)) # Verdict if (summary['c1_component_stable'] and summary['max_abs_pct_diff'] <= 5.0): verdict = 'P2_STRONG' msg = ('K=3 C1 component is stable across LOOO folds (cos drift ' '<= 0.01, dh drift <= 1.0, weight drift <= 0.10); held-out ' 'C1 predictions agree with Script 35 baseline within 5pp. ' 'Path P2 is viable: K=3 captures a real cross-firm ' 'hand-leaning cluster.') elif summary['c1_component_stable']: verdict = 'P2_PARTIAL' msg = ('K=3 C1 component is stable but held-out C1 prediction ' f'diverges from Script 35 baseline (max abs diff ' f'{summary["max_abs_pct_diff"]:.1f}pp). Cluster exists but ' 'membership is not well-predicted by held-out fit.') else: verdict = 'P2_WEAK' msg = ('K=3 C1 component is NOT stable across LOOO folds (cos drift ' f'{summary["max_c1_cos_dev_from_baseline"]:.4f}, dh drift ' f'{summary["max_c1_dh_dev_from_baseline"]:.3f}, weight drift ' f'{summary["max_c1_weight_dev_from_baseline"]:.3f}). ' 'K=3 is also firm-mass driven; Path P2 fails.') summary['verdict'] = verdict summary['verdict_message'] = msg return summary def render_panels(cpas, fold_results): by_firm = {} for c in cpas: by_firm.setdefault(c['firm'], []).append(c) for held_firm in BIG4: held = by_firm[held_firm] train = [c for c in cpas if c['firm'] != held_firm] fr = fold_results[held_firm] means = np.array(fr['k3_components_sorted_by_cos']['means']) weights = fr['k3_components_sorted_by_cos']['weights'] rate = fr['held_out_assignments']['c1_rate'] n_c1 = fr['held_out_assignments']['n_c1_handleaning'] n_h = fr['n_held'] wlo, whi = fr['held_out_assignments']['c1_wilson95'] fig, ax = plt.subplots(figsize=(9, 7)) ax.scatter([c['cos_mean'] for c in train], [c['dh_mean'] for c in train], s=18, alpha=0.4, color='lightgray', label=f'Train (other Big-3, n={len(train)})') ax.scatter([c['cos_mean'] for c in held], [c['dh_mean'] for c in held], s=42, alpha=0.85, color='crimson', edgecolor='white', label=f'Held-out: {LABEL[held_firm]} (n={n_h})') markers = ['v', 's', '^'] comp_colors = ['darkred', 'goldenrod', 'navy'] comp_labels = ['C1 hand-leaning', 'C2 mixed', 'C3 replicated'] for i in range(3): ax.scatter([means[i, 0]], [means[i, 1]], s=200, marker=markers[i], color=comp_colors[i], edgecolor='black', linewidth=1.5, label=f'{comp_labels[i]}: ({means[i,0]:.3f}, ' f'{means[i,1]:.2f}), w={weights[i]:.2f}') ax.set_xlabel('Accountant cos_mean') ax.set_ylabel('Accountant dh_mean') ax.set_title( f'K=3 LOOO held-out {LABEL[held_firm]}: C1 = {n_c1}/{n_h} = ' f'{rate*100:.1f}% [Wilson 95%: {wlo*100:.1f}%, ' f'{whi*100:.1f}%]\n(Script 35 baseline expected: ' f'{SCRIPT35_C1_PCT[held_firm]}%)') ax.legend(fontsize=8, loc='upper right') ax.grid(alpha=0.3) fig.tight_layout() fig.savefig(OUT / f'panel_k3_loo_{SLUG[held_firm]}.png', dpi=150) plt.close(fig) def render_md(baseline, fold_results, summary): md = [ '# Phase 1.5: K=3 LOOO Check (Path P2 viability)', f'Generated: {datetime.now().strftime("%Y-%m-%d %H:%M:%S")}', '', '## A. Full-Big-4 K=3 baseline (replicates Script 35)', '', '| Component | mean cos | mean dh | weight |', '|---|---|---|---|', ] for i, (m, w) in enumerate(zip(baseline['means_sorted'], baseline['weights_sorted'])): name = ['C1 hand-leaning', 'C2 mixed', 'C3 replicated'][i] md.append(f'| {name} | {m[0]:.4f} | {m[1]:.4f} | {w:.3f} |') md += ['', '### Baseline C1 membership by firm', '', '| Firm | Baseline C1 / total | % | Script 35 expected |', '|---|---|---|---|'] for f in BIG4: b = baseline['c1_membership_by_firm'][f] md.append(f'| {LABEL[f]} | {b["k"]}/{b["n"]} | {b["pct"]:.2f}% | ' f'{SCRIPT35_C1_PCT[f]}% |') md += ['', '## B. Leave-One-Firm-Out K=3 fits', ''] for f in BIG4: fr = fold_results[f] means = fr['k3_components_sorted_by_cos']['means'] weights = fr['k3_components_sorted_by_cos']['weights'] ass = fr['held_out_assignments'] md += [f'### Held-out: {LABEL[f]}', '', f'- n_train = {fr["n_train"]}, n_held = {fr["n_held"]}', f'- Held-out assignments: ' f'C1={ass["n_c1_handleaning"]}/{fr["n_held"]} = ' f'{ass["c1_rate"]*100:.2f}% ' f'[Wilson 95%: {ass["c1_wilson95"][0]*100:.2f}%, ' f'{ass["c1_wilson95"][1]*100:.2f}%]; ' f'C2={ass["n_c2_mixed"]}; C3={ass["n_c3_replicated"]}', f'- Script 35 baseline expected C1: ' f'{SCRIPT35_C1_PCT[f]}%', '', '| Train K=3 component | mean cos | mean dh | weight |', '|---|---|---|---|'] for i, (m, w) in enumerate(zip(means, weights)): name = ['C1 hand-leaning', 'C2 mixed', 'C3 replicated'][i] md.append(f'| {name} | {m[0]:.4f} | {m[1]:.4f} | {w:.3f} |') md.append('') md += ['## C. Cross-fold C1 stability summary', '', f'- Baseline C1 (full Big-4): cos = ' f'{summary["baseline_c1"]["cos"]:.4f}, dh = ' f'{summary["baseline_c1"]["dh"]:.4f}, weight = ' f'{summary["baseline_c1"]["weight"]:.3f}', f'- Fold C1 cos means: {summary["fold_c1_cos_means"]}', f'- Fold C1 dh means: {summary["fold_c1_dh_means"]}', f'- Fold C1 weights : {summary["fold_c1_weights"]}', f'- Max |C1 cos dev| vs baseline: ' f'{summary["max_c1_cos_dev_from_baseline"]:.4f} ' f'(stable bar: 0.01, {"OK" if summary["cos_stable"] else "FAIL"})', f'- Max |C1 dh dev| vs baseline: ' f'{summary["max_c1_dh_dev_from_baseline"]:.3f} ' f'(stable bar: 1.0, {"OK" if summary["dh_stable"] else "FAIL"})', f'- Max |C1 weight dev| vs baseline: ' f'{summary["max_c1_weight_dev_from_baseline"]:.3f} ' f'(stable bar: 0.10, {"OK" if summary["weight_stable"] else "FAIL"})', '', '### Held-out prediction vs Script 35 baseline', '', '| Firm | Predicted C1% | Expected C1% | |diff| pp |', '|---|---|---|---|'] for entry in summary['held_out_prediction_check']: md.append(f'| {entry["firm"]} | {entry["predicted_c1_pct"]:.2f}% | ' f'{entry["expected_c1_pct"]}% | ' f'{entry["abs_diff"]:.2f} |') md += ['', f'- Max |%diff| across folds: {summary["max_abs_pct_diff"]:.2f}pp ' f'(viable bar: <= 5.0 pp)', '', f'## Verdict: **{summary["verdict"]}**', '', summary['verdict_message'], '', '### Verdict legend', '- **P2_STRONG**: C1 cluster reproducible across folds AND ' 'held-out predictions match Script 35 baseline within 5 pp. ' 'K=3 captures a real cross-firm hand-leaning sub-population; ' 'Paper A v4.0 can use K=3 hard assignment as the operational ' 'classifier.', '- **P2_PARTIAL**: C1 cluster shape reproducible but membership ' 'predictions diverge. Cluster exists conceptually but is not ' 'predictively useful as an operational classifier.', '- **P2_WEAK**: C1 cluster shifts substantially across folds. ' 'K=3 is also firm-mass driven; v4.0 needs a different strategy ' '(P1 firm-templatedness reframe, P3 rollback, or P4 ' 'reverse-anchor).', ] return '\n'.join(md) def main(): print('=' * 72) print('Script 37: K=3 LOOO Check (Path P2 viability)') print('=' * 72) cpas = load_big4_accountants() print(f'\nN Big-4 CPAs: {len(cpas)}') baseline = run_full_baseline(cpas) fold_results = run_loo(cpas) summary = stability_summary(fold_results, baseline) print(f'\n[C] Verdict: {summary["verdict"]}') print(f' {summary["verdict_message"]}') render_panels(cpas, fold_results) payload = { 'generated_at': datetime.now().isoformat(), 'min_sigs_per_accountant': MIN_SIGS, 'random_seed': SEED, 'n_cpas_total': len(cpas), 'baseline': baseline, 'loo_folds': fold_results, 'stability_summary': summary, 'script35_c1_baseline_pct': SCRIPT35_C1_PCT, } json_path = OUT / 'k3_loo_results.json' json_path.write_text(json.dumps(payload, indent=2, ensure_ascii=False), encoding='utf-8') print(f'JSON: {json_path}') md = render_md(baseline, fold_results, summary) md_path = OUT / 'k3_loo_report.md' md_path.write_text(md, encoding='utf-8') print(f'Report: {md_path}') if __name__ == '__main__': main()