diff --git a/signature_analysis/37_v4_k3_loo_check.py b/signature_analysis/37_v4_k3_loo_check.py new file mode 100644 index 0000000..6a31d11 --- /dev/null +++ b/signature_analysis/37_v4_k3_loo_check.py @@ -0,0 +1,478 @@ +#!/usr/bin/env python3 +""" +Script 37: K=3 Leave-One-Firm-Out Check (Path P2 viability test) +================================================================= +Follow-up to Script 36's UNSTABLE K=2 LOOO finding. Tests whether the +K=3 mixture's C1 component (lowest-cosine "hand-leaning" cluster, +~14% weight per Script 35) is a real cross-firm sub-population or +is also firm-mass driven. + +Reference: Script 35 (full Big-4 K=3) reported C1 cluster membership: + Firm A 0/171 = 0.0% + KPMG 10/112 = 8.9% + PwC 24/102 = 23.5% + EY 6/52 = 11.5% + +The hypothesis: if C1 is a true cross-firm hand-leaning sub-population, +then: + - Across 4 LOOO folds, the C1 component should sit at roughly the + same (cos, dh) coordinates with similar weight. + - When the held-out firm's CPAs are assigned via the fold's K=3 + posterior, the fraction in C1 should approximate the Script 35 + full-data percentages. + +If C1 collapses, shifts dramatically, or fails to predict held-out +membership, then K=3 is also firm-mass driven and Path P2 fails. + +Output: + reports/v4_big4/k3_loo_check/ + k3_loo_results.json + k3_loo_report.md + panel_k3_loo_.png +""" + +import sqlite3 +import json +import numpy as np +import matplotlib +matplotlib.use('Agg') +import matplotlib.pyplot as plt +from pathlib import Path +from datetime import datetime +from sklearn.mixture import GaussianMixture +from scipy.stats import norm + +DB = '/Volumes/NV2/PDF-Processing/signature-analysis/signature_analysis.db' +OUT = Path('/Volumes/NV2/PDF-Processing/signature-analysis/reports/' + 'v4_big4/k3_loo_check') +OUT.mkdir(parents=True, exist_ok=True) + +MIN_SIGS = 10 +SEED = 42 +BIG4 = ('勤業眾信聯合', '安侯建業聯合', '資誠聯合', '安永聯合') +LABEL = {'勤業眾信聯合': 'Firm A (Deloitte)', '安侯建業聯合': 'KPMG', + '資誠聯合': 'PwC', '安永聯合': 'EY'} +SLUG = {'勤業眾信聯合': 'FirmA', '安侯建業聯合': 'KPMG', + '資誠聯合': 'PwC', '安永聯合': 'EY'} + +# Script 35 full-Big-4 K=3 baseline (informational; reproduce here as expected) +SCRIPT35_C1_PCT = {'勤業眾信聯合': 0.0, '安侯建業聯合': 8.9, + '資誠聯合': 23.5, '安永聯合': 11.5} + + +def load_big4_accountants(): + conn = sqlite3.connect(DB) + cur = conn.cursor() + cur.execute(''' + SELECT s.assigned_accountant, + a.firm, + AVG(s.max_similarity_to_same_accountant) AS cos_mean, + AVG(CAST(s.min_dhash_independent AS REAL)) AS dh_mean, + COUNT(*) AS n + FROM signatures s + JOIN accountants a ON s.assigned_accountant = a.name + WHERE s.assigned_accountant IS NOT NULL + AND s.max_similarity_to_same_accountant IS NOT NULL + AND s.min_dhash_independent IS NOT NULL + AND a.firm IN (?, ?, ?, ?) + GROUP BY s.assigned_accountant + HAVING n >= ? + ''', BIG4 + (MIN_SIGS,)) + rows = cur.fetchall() + conn.close() + return [{'cpa': r[0], 'firm': r[1], + 'cos_mean': float(r[2]), 'dh_mean': float(r[3]), + 'n_sigs': int(r[4])} for r in rows] + + +def fit_k3(X): + return GaussianMixture(n_components=3, covariance_type='full', + random_state=SEED, n_init=15, max_iter=500).fit(X) + + +def sort_components_by_cos(gmm): + """Return ordering such that comp[0] has lowest cosine mean.""" + return np.argsort(gmm.means_[:, 0]) + + +def wilson_ci(k, n, alpha=0.05): + if n == 0: + return (0.0, 1.0) + z = norm.ppf(1 - alpha / 2) + phat = k / n + denom = 1 + z * z / n + center = (phat + z * z / (2 * n)) / denom + pm = z * np.sqrt(phat * (1 - phat) / n + z * z / (4 * n * n)) / denom + return (max(0.0, center - pm), min(1.0, center + pm)) + + +def run_full_baseline(cpas): + print('\n[A] Full-Big-4 K=3 baseline (replicates Script 35)') + X = np.column_stack([ + [c['cos_mean'] for c in cpas], + [c['dh_mean'] for c in cpas], + ]) + gmm = fit_k3(X) + order = sort_components_by_cos(gmm) + means = gmm.means_[order] + weights = gmm.weights_[order] + raw_labels = gmm.predict(X) + label_map = {old: new for new, old in enumerate(order)} + labels = np.array([label_map[l] for l in raw_labels]) + + by_firm_c1 = {f: 0 for f in BIG4} + by_firm_total = {f: 0 for f in BIG4} + for c, lab in zip(cpas, labels): + by_firm_total[c['firm']] += 1 + if lab == 0: + by_firm_c1[c['firm']] += 1 + print(f' C1 (hand-leaning) center: cos={means[0,0]:.4f}, ' + f'dh={means[0,1]:.4f}, weight={weights[0]:.3f}') + print(f' C2 (mixed) center: cos={means[1,0]:.4f}, ' + f'dh={means[1,1]:.4f}, weight={weights[1]:.3f}') + print(f' C3 (replicated) center: cos={means[2,0]:.4f}, ' + f'dh={means[2,1]:.4f}, weight={weights[2]:.3f}') + print(' C1 membership by firm:') + for f in BIG4: + n = by_firm_total[f] + k = by_firm_c1[f] + pct = 100 * k / n if n else 0.0 + print(f' {LABEL[f]:<22} {k:>3}/{n:>3} = {pct:5.2f}% ' + f'(Script 35 expected: {SCRIPT35_C1_PCT[f]}%)') + return { + 'means_sorted': means.tolist(), + 'weights_sorted': weights.tolist(), + 'c1_membership_by_firm': { + f: {'k': int(by_firm_c1[f]), 'n': int(by_firm_total[f]), + 'pct': float(100 * by_firm_c1[f] / by_firm_total[f]) + if by_firm_total[f] else 0.0} + for f in BIG4 + }, + } + + +def run_loo(cpas): + by_firm = {} + for c in cpas: + by_firm.setdefault(c['firm'], []).append(c) + fold_results = {} + for held_firm in BIG4: + train = [c for c in cpas if c['firm'] != held_firm] + held = by_firm[held_firm] + X_train = np.column_stack([ + [c['cos_mean'] for c in train], + [c['dh_mean'] for c in train], + ]) + X_held = np.column_stack([ + [c['cos_mean'] for c in held], + [c['dh_mean'] for c in held], + ]) + gmm = fit_k3(X_train) + order = sort_components_by_cos(gmm) + means = gmm.means_[order] + weights = gmm.weights_[order] + # Posterior on held-out + raw_post = gmm.predict_proba(X_held) + post = raw_post[:, order] + held_labels = np.argmax(post, axis=1) + n_c1 = int(np.sum(held_labels == 0)) + n_c2 = int(np.sum(held_labels == 1)) + n_c3 = int(np.sum(held_labels == 2)) + n_held = len(held) + c1_rate = n_c1 / n_held if n_held else 0.0 + wlo, whi = wilson_ci(n_c1, n_held) + # Train-side weights for stability check + print(f'\n[B] LOOO fold: held = {LABEL[held_firm]}') + print(f' train K=3 components (sorted by cos):') + for i in range(3): + print(f' C{i+1}: cos={means[i,0]:.4f}, dh={means[i,1]:.4f}, ' + f'weight={weights[i]:.3f}') + print(f' held-out assignments: C1={n_c1}/{n_held} = ' + f'{c1_rate*100:.2f}% [Wilson 95%: ' + f'{wlo*100:.2f}%, {whi*100:.2f}%]') + print(f' C2={n_c2}/{n_held} = ' + f'{n_c2/n_held*100:.2f}%') + print(f' C3={n_c3}/{n_held} = ' + f'{n_c3/n_held*100:.2f}%') + print(f' Script 35 expected C1 for {LABEL[held_firm]}: ' + f'{SCRIPT35_C1_PCT[held_firm]}%') + fold_results[held_firm] = { + 'n_train': len(train), + 'n_held': n_held, + 'k3_components_sorted_by_cos': { + 'means': means.tolist(), + 'weights': weights.tolist(), + }, + 'held_out_assignments': { + 'n_c1_handleaning': n_c1, + 'n_c2_mixed': n_c2, + 'n_c3_replicated': n_c3, + 'c1_rate': float(c1_rate), + 'c1_wilson95': [float(wlo), float(whi)], + }, + 'script35_expected_c1_pct': SCRIPT35_C1_PCT[held_firm], + } + return fold_results + + +def stability_summary(fold_results, baseline): + """Aggregate C1 component drift across folds.""" + c1_means_cos = [fold_results[f]['k3_components_sorted_by_cos']['means'][0][0] + for f in BIG4] + c1_means_dh = [fold_results[f]['k3_components_sorted_by_cos']['means'][0][1] + for f in BIG4] + c1_weights = [fold_results[f]['k3_components_sorted_by_cos']['weights'][0] + for f in BIG4] + base_c1_cos = baseline['means_sorted'][0][0] + base_c1_dh = baseline['means_sorted'][0][1] + base_c1_w = baseline['weights_sorted'][0] + summary = { + 'fold_c1_cos_means': c1_means_cos, + 'fold_c1_dh_means': c1_means_dh, + 'fold_c1_weights': c1_weights, + 'baseline_c1': {'cos': base_c1_cos, 'dh': base_c1_dh, + 'weight': base_c1_w}, + 'max_c1_cos_dev_from_baseline': float( + max(abs(np.array(c1_means_cos) - base_c1_cos))), + 'max_c1_dh_dev_from_baseline': float( + max(abs(np.array(c1_means_dh) - base_c1_dh))), + 'max_c1_weight_dev_from_baseline': float( + max(abs(np.array(c1_weights) - base_c1_w))), + } + # Heuristic stability bars (these are exploratory, not formal test): + cos_stable = summary['max_c1_cos_dev_from_baseline'] <= 0.01 + dh_stable = summary['max_c1_dh_dev_from_baseline'] <= 1.0 + weight_stable = summary['max_c1_weight_dev_from_baseline'] <= 0.10 + summary['cos_stable'] = bool(cos_stable) + summary['dh_stable'] = bool(dh_stable) + summary['weight_stable'] = bool(weight_stable) + summary['c1_component_stable'] = bool(cos_stable and dh_stable + and weight_stable) + + # Held-out C1 prediction agreement with Script 35 expectation + pred_v_expected = [] + for f in BIG4: + actual = fold_results[f]['held_out_assignments']['c1_rate'] * 100 + expected = SCRIPT35_C1_PCT[f] + pred_v_expected.append({ + 'firm': LABEL[f], + 'predicted_c1_pct': actual, + 'expected_c1_pct': expected, + 'abs_diff': abs(actual - expected), + }) + summary['held_out_prediction_check'] = pred_v_expected + summary['max_abs_pct_diff'] = float(max(p['abs_diff'] + for p in pred_v_expected)) + + # Verdict + if (summary['c1_component_stable'] + and summary['max_abs_pct_diff'] <= 5.0): + verdict = 'P2_STRONG' + msg = ('K=3 C1 component is stable across LOOO folds (cos drift ' + '<= 0.01, dh drift <= 1.0, weight drift <= 0.10); held-out ' + 'C1 predictions agree with Script 35 baseline within 5pp. ' + 'Path P2 is viable: K=3 captures a real cross-firm ' + 'hand-leaning cluster.') + elif summary['c1_component_stable']: + verdict = 'P2_PARTIAL' + msg = ('K=3 C1 component is stable but held-out C1 prediction ' + f'diverges from Script 35 baseline (max abs diff ' + f'{summary["max_abs_pct_diff"]:.1f}pp). Cluster exists but ' + 'membership is not well-predicted by held-out fit.') + else: + verdict = 'P2_WEAK' + msg = ('K=3 C1 component is NOT stable across LOOO folds (cos drift ' + f'{summary["max_c1_cos_dev_from_baseline"]:.4f}, dh drift ' + f'{summary["max_c1_dh_dev_from_baseline"]:.3f}, weight drift ' + f'{summary["max_c1_weight_dev_from_baseline"]:.3f}). ' + 'K=3 is also firm-mass driven; Path P2 fails.') + summary['verdict'] = verdict + summary['verdict_message'] = msg + return summary + + +def render_panels(cpas, fold_results): + by_firm = {} + for c in cpas: + by_firm.setdefault(c['firm'], []).append(c) + for held_firm in BIG4: + held = by_firm[held_firm] + train = [c for c in cpas if c['firm'] != held_firm] + fr = fold_results[held_firm] + means = np.array(fr['k3_components_sorted_by_cos']['means']) + weights = fr['k3_components_sorted_by_cos']['weights'] + rate = fr['held_out_assignments']['c1_rate'] + n_c1 = fr['held_out_assignments']['n_c1_handleaning'] + n_h = fr['n_held'] + wlo, whi = fr['held_out_assignments']['c1_wilson95'] + fig, ax = plt.subplots(figsize=(9, 7)) + ax.scatter([c['cos_mean'] for c in train], + [c['dh_mean'] for c in train], s=18, alpha=0.4, + color='lightgray', + label=f'Train (other Big-3, n={len(train)})') + ax.scatter([c['cos_mean'] for c in held], + [c['dh_mean'] for c in held], s=42, alpha=0.85, + color='crimson', edgecolor='white', + label=f'Held-out: {LABEL[held_firm]} (n={n_h})') + markers = ['v', 's', '^'] + comp_colors = ['darkred', 'goldenrod', 'navy'] + comp_labels = ['C1 hand-leaning', 'C2 mixed', 'C3 replicated'] + for i in range(3): + ax.scatter([means[i, 0]], [means[i, 1]], s=200, + marker=markers[i], color=comp_colors[i], + edgecolor='black', linewidth=1.5, + label=f'{comp_labels[i]}: ({means[i,0]:.3f}, ' + f'{means[i,1]:.2f}), w={weights[i]:.2f}') + ax.set_xlabel('Accountant cos_mean') + ax.set_ylabel('Accountant dh_mean') + ax.set_title( + f'K=3 LOOO held-out {LABEL[held_firm]}: C1 = {n_c1}/{n_h} = ' + f'{rate*100:.1f}% [Wilson 95%: {wlo*100:.1f}%, ' + f'{whi*100:.1f}%]\n(Script 35 baseline expected: ' + f'{SCRIPT35_C1_PCT[held_firm]}%)') + ax.legend(fontsize=8, loc='upper right') + ax.grid(alpha=0.3) + fig.tight_layout() + fig.savefig(OUT / f'panel_k3_loo_{SLUG[held_firm]}.png', dpi=150) + plt.close(fig) + + +def render_md(baseline, fold_results, summary): + md = [ + '# Phase 1.5: K=3 LOOO Check (Path P2 viability)', + f'Generated: {datetime.now().strftime("%Y-%m-%d %H:%M:%S")}', + '', + '## A. Full-Big-4 K=3 baseline (replicates Script 35)', + '', + '| Component | mean cos | mean dh | weight |', + '|---|---|---|---|', + ] + for i, (m, w) in enumerate(zip(baseline['means_sorted'], + baseline['weights_sorted'])): + name = ['C1 hand-leaning', 'C2 mixed', + 'C3 replicated'][i] + md.append(f'| {name} | {m[0]:.4f} | {m[1]:.4f} | {w:.3f} |') + md += ['', + '### Baseline C1 membership by firm', + '', + '| Firm | Baseline C1 / total | % | Script 35 expected |', + '|---|---|---|---|'] + for f in BIG4: + b = baseline['c1_membership_by_firm'][f] + md.append(f'| {LABEL[f]} | {b["k"]}/{b["n"]} | {b["pct"]:.2f}% | ' + f'{SCRIPT35_C1_PCT[f]}% |') + + md += ['', '## B. Leave-One-Firm-Out K=3 fits', ''] + for f in BIG4: + fr = fold_results[f] + means = fr['k3_components_sorted_by_cos']['means'] + weights = fr['k3_components_sorted_by_cos']['weights'] + ass = fr['held_out_assignments'] + md += [f'### Held-out: {LABEL[f]}', + '', + f'- n_train = {fr["n_train"]}, n_held = {fr["n_held"]}', + f'- Held-out assignments: ' + f'C1={ass["n_c1_handleaning"]}/{fr["n_held"]} = ' + f'{ass["c1_rate"]*100:.2f}% ' + f'[Wilson 95%: {ass["c1_wilson95"][0]*100:.2f}%, ' + f'{ass["c1_wilson95"][1]*100:.2f}%]; ' + f'C2={ass["n_c2_mixed"]}; C3={ass["n_c3_replicated"]}', + f'- Script 35 baseline expected C1: ' + f'{SCRIPT35_C1_PCT[f]}%', + '', + '| Train K=3 component | mean cos | mean dh | weight |', + '|---|---|---|---|'] + for i, (m, w) in enumerate(zip(means, weights)): + name = ['C1 hand-leaning', 'C2 mixed', + 'C3 replicated'][i] + md.append(f'| {name} | {m[0]:.4f} | {m[1]:.4f} | {w:.3f} |') + md.append('') + + md += ['## C. Cross-fold C1 stability summary', '', + f'- Baseline C1 (full Big-4): cos = ' + f'{summary["baseline_c1"]["cos"]:.4f}, dh = ' + f'{summary["baseline_c1"]["dh"]:.4f}, weight = ' + f'{summary["baseline_c1"]["weight"]:.3f}', + f'- Fold C1 cos means: {summary["fold_c1_cos_means"]}', + f'- Fold C1 dh means: {summary["fold_c1_dh_means"]}', + f'- Fold C1 weights : {summary["fold_c1_weights"]}', + f'- Max |C1 cos dev| vs baseline: ' + f'{summary["max_c1_cos_dev_from_baseline"]:.4f} ' + f'(stable bar: 0.01, {"OK" if summary["cos_stable"] else "FAIL"})', + f'- Max |C1 dh dev| vs baseline: ' + f'{summary["max_c1_dh_dev_from_baseline"]:.3f} ' + f'(stable bar: 1.0, {"OK" if summary["dh_stable"] else "FAIL"})', + f'- Max |C1 weight dev| vs baseline: ' + f'{summary["max_c1_weight_dev_from_baseline"]:.3f} ' + f'(stable bar: 0.10, {"OK" if summary["weight_stable"] else "FAIL"})', + '', + '### Held-out prediction vs Script 35 baseline', + '', + '| Firm | Predicted C1% | Expected C1% | |diff| pp |', + '|---|---|---|---|'] + for entry in summary['held_out_prediction_check']: + md.append(f'| {entry["firm"]} | {entry["predicted_c1_pct"]:.2f}% | ' + f'{entry["expected_c1_pct"]}% | ' + f'{entry["abs_diff"]:.2f} |') + md += ['', + f'- Max |%diff| across folds: {summary["max_abs_pct_diff"]:.2f}pp ' + f'(viable bar: <= 5.0 pp)', + '', + f'## Verdict: **{summary["verdict"]}**', + '', + summary['verdict_message'], + '', + '### Verdict legend', + '- **P2_STRONG**: C1 cluster reproducible across folds AND ' + 'held-out predictions match Script 35 baseline within 5 pp. ' + 'K=3 captures a real cross-firm hand-leaning sub-population; ' + 'Paper A v4.0 can use K=3 hard assignment as the operational ' + 'classifier.', + '- **P2_PARTIAL**: C1 cluster shape reproducible but membership ' + 'predictions diverge. Cluster exists conceptually but is not ' + 'predictively useful as an operational classifier.', + '- **P2_WEAK**: C1 cluster shifts substantially across folds. ' + 'K=3 is also firm-mass driven; v4.0 needs a different strategy ' + '(P1 firm-templatedness reframe, P3 rollback, or P4 ' + 'reverse-anchor).', + ] + return '\n'.join(md) + + +def main(): + print('=' * 72) + print('Script 37: K=3 LOOO Check (Path P2 viability)') + print('=' * 72) + cpas = load_big4_accountants() + print(f'\nN Big-4 CPAs: {len(cpas)}') + baseline = run_full_baseline(cpas) + fold_results = run_loo(cpas) + summary = stability_summary(fold_results, baseline) + print(f'\n[C] Verdict: {summary["verdict"]}') + print(f' {summary["verdict_message"]}') + + render_panels(cpas, fold_results) + + payload = { + 'generated_at': datetime.now().isoformat(), + 'min_sigs_per_accountant': MIN_SIGS, + 'random_seed': SEED, + 'n_cpas_total': len(cpas), + 'baseline': baseline, + 'loo_folds': fold_results, + 'stability_summary': summary, + 'script35_c1_baseline_pct': SCRIPT35_C1_PCT, + } + json_path = OUT / 'k3_loo_results.json' + json_path.write_text(json.dumps(payload, indent=2, ensure_ascii=False), + encoding='utf-8') + print(f'JSON: {json_path}') + + md = render_md(baseline, fold_results, summary) + md_path = OUT / 'k3_loo_report.md' + md_path.write_text(md, encoding='utf-8') + print(f'Report: {md_path}') + + +if __name__ == '__main__': + main()