#!/usr/bin/env python3 """ Script 39: Signature-Level Convergence (preempts aggregation attack) ====================================================================== Phase 1.7 follow-up to Script 38's per-CPA convergence. Verifies that the per-CPA K=3 + reverse-anchor + Paper A agreement holds at the signature level (not just per-CPA mean), so a reviewer cannot attack with "you washed out within-CPA heterogeneity by averaging". Three labels per Big-4 signature: L1 PaperA_rule: non_hand iff cos > 0.95 AND dh <= 5 L2 K3_perCPA: hard assignment under per-CPA K=3 components fit on accountant means (Script 38 baseline) L3 K3_perSig: hard assignment under a fresh K=3 fit on the signature-level (cos, dh) cloud Output: reports/v4_big4/signature_level_convergence/ sig_level_results.json sig_level_report.md crosstab_paperA_vs_k3perCPA.csv crosstab_paperA_vs_k3perSig.csv crosstab_k3perCPA_vs_k3perSig.csv Headline metrics: - Cohen's kappa for each pairwise label comparison - Per-firm marginal agreement - Component drift between per-CPA K=3 and per-signature K=3 """ import sqlite3 import csv import json import numpy as np import matplotlib matplotlib.use('Agg') import matplotlib.pyplot as plt from pathlib import Path from datetime import datetime from sklearn.mixture import GaussianMixture DB = '/Volumes/NV2/PDF-Processing/signature-analysis/signature_analysis.db' OUT = Path('/Volumes/NV2/PDF-Processing/signature-analysis/reports/' 'v4_big4/signature_level_convergence') OUT.mkdir(parents=True, exist_ok=True) SEED = 42 BIG4 = ('勤業眾信聯合', '安侯建業聯合', '資誠聯合', '安永聯合') LABEL = {'勤業眾信聯合': 'Firm A (Deloitte)', '安侯建業聯合': 'KPMG', '資誠聯合': 'PwC', '安永聯合': 'EY'} PAPER_A_COS_CUT = 0.95 PAPER_A_DH_CUT = 5 MIN_SIGS = 10 # for the per-CPA K=3 fit only def load_big4_signatures(): conn = sqlite3.connect(DB) cur = conn.cursor() cur.execute(''' SELECT s.signature_id, s.assigned_accountant, a.firm, s.max_similarity_to_same_accountant, CAST(s.min_dhash_independent AS REAL) FROM signatures s JOIN accountants a ON s.assigned_accountant = a.name WHERE s.assigned_accountant IS NOT NULL AND s.max_similarity_to_same_accountant IS NOT NULL AND s.min_dhash_independent IS NOT NULL AND a.firm IN (?, ?, ?, ?) ''', BIG4) rows = cur.fetchall() conn.close() return rows def load_per_cpa_means(): """Returns (cpa_array, firm_array, X_2d) for the per-CPA fit.""" conn = sqlite3.connect(DB) cur = conn.cursor() cur.execute(''' SELECT s.assigned_accountant, a.firm, AVG(s.max_similarity_to_same_accountant) AS cos_mean, AVG(CAST(s.min_dhash_independent AS REAL)) AS dh_mean, COUNT(*) AS n FROM signatures s JOIN accountants a ON s.assigned_accountant = a.name WHERE s.assigned_accountant IS NOT NULL AND s.max_similarity_to_same_accountant IS NOT NULL AND s.min_dhash_independent IS NOT NULL AND a.firm IN (?, ?, ?, ?) GROUP BY s.assigned_accountant HAVING n >= ? ''', BIG4 + (MIN_SIGS,)) rows = cur.fetchall() conn.close() cpas = [r[0] for r in rows] firms = [r[1] for r in rows] X = np.array([[float(r[2]), float(r[3])] for r in rows]) return cpas, firms, X def fit_k3(X, seed=SEED): return GaussianMixture(n_components=3, covariance_type='full', random_state=seed, n_init=15, max_iter=500).fit(X) def label_paperA(cos, dh): """Returns 0 = non_hand (replicated), 1 = hand_leaning.""" return np.where((cos > PAPER_A_COS_CUT) & (dh <= PAPER_A_DH_CUT), 0, 1) def label_k3(gmm, X, order): """Returns hard label in {0=C1, 1=C2, 2=C3} where C1 = lowest cos.""" raw = gmm.predict(X) label_map = {old: new for new, old in enumerate(order)} return np.array([label_map[l] for l in raw]) def cohen_kappa(y1, y2): """Cohen's kappa for two label arrays.""" n = len(y1) if n == 0: return 0.0 classes = sorted(set(y1.tolist()) | set(y2.tolist())) k = len(classes) cm = np.zeros((k, k), dtype=float) for a, b in zip(y1, y2): cm[classes.index(int(a)), classes.index(int(b))] += 1 p_o = np.sum(np.diag(cm)) / n row_marg = cm.sum(axis=1) / n col_marg = cm.sum(axis=0) / n p_e = float(np.sum(row_marg * col_marg)) if p_e == 1.0: return 1.0 if p_o == 1.0 else 0.0 return float((p_o - p_e) / (1 - p_e)) def crosstab(y1, y2, labels1, labels2): """Cross-tabulation as a dict-of-dicts.""" out = {a: {b: 0 for b in labels2} for a in labels1} for a, b in zip(y1, y2): out[labels1[int(a)]][labels2[int(b)]] += 1 return out def write_crosstab_csv(ct, name, labels1, labels2): p = OUT / name with open(p, 'w', newline='', encoding='utf-8') as f: w = csv.writer(f) w.writerow([''] + labels2 + ['total']) for a in labels1: row = [a] + [ct[a][b] for b in labels2] row.append(sum(ct[a].values())) w.writerow(row) col_totals = [sum(ct[a][b] for a in labels1) for b in labels2] w.writerow(['total'] + col_totals + [sum(col_totals)]) return p def per_firm_agreement(firms_arr, y1, y2): out = {} for f in BIG4: mask = (firms_arr == f) n = int(mask.sum()) if n == 0: out[f] = {'n': 0, 'agreement': None} continue agree_count = int(np.sum(y1[mask] == y2[mask])) out[f] = { 'n': n, 'agree_count': agree_count, 'agreement_rate': float(agree_count / n), } return out def main(): print('=' * 72) print('Script 39: Signature-Level Convergence') print('=' * 72) # 1. Per-CPA K=3 (Script 38 baseline reproduction) cpas, cpa_firms, X_cpa = load_per_cpa_means() print(f'\n[setup] N CPAs (n_sigs >= {MIN_SIGS}): {len(cpas)}') gmm_cpa = fit_k3(X_cpa) order_cpa = np.argsort(gmm_cpa.means_[:, 0]) means_cpa = gmm_cpa.means_[order_cpa] weights_cpa = gmm_cpa.weights_[order_cpa] print(' Per-CPA K=3 components (sorted by cos):') for i, name in enumerate(['C1 hand-leaning', 'C2 mixed', 'C3 replicated']): print(f' {name}: cos={means_cpa[i,0]:.4f}, ' f'dh={means_cpa[i,1]:.4f}, weight={weights_cpa[i]:.3f}') # 2. Load all Big-4 signatures rows = load_big4_signatures() n_sig = len(rows) sig_ids = np.array([r[0] for r in rows]) sig_firms = np.array([r[2] for r in rows]) cos = np.array([r[3] for r in rows], dtype=float) dh = np.array([r[4] for r in rows], dtype=float) X_sig = np.column_stack([cos, dh]) print(f'\n[setup] N Big-4 signatures: {n_sig:,}') # 3. Three labels per signature L1 = label_paperA(cos, dh) L2 = label_k3(gmm_cpa, X_sig, order_cpa) print('\n[fit] Per-signature K=3 (fresh fit on signature cloud)') gmm_sig = fit_k3(X_sig) order_sig = np.argsort(gmm_sig.means_[:, 0]) means_sig = gmm_sig.means_[order_sig] weights_sig = gmm_sig.weights_[order_sig] print(' Per-signature K=3 components (sorted by cos):') for i, name in enumerate(['C1 hand-leaning', 'C2 mixed', 'C3 replicated']): print(f' {name}: cos={means_sig[i,0]:.4f}, ' f'dh={means_sig[i,1]:.4f}, weight={weights_sig[i]:.3f}') L3 = label_k3(gmm_sig, X_sig, order_sig) # 4. Cross-tabs paperA_labels = ['non_hand', 'hand_leaning'] k3_labels = ['C1_handleaning', 'C2_mixed', 'C3_replicated'] ct_p_vs_kcpa = crosstab(L1, L2, paperA_labels, k3_labels) ct_p_vs_ksig = crosstab(L1, L3, paperA_labels, k3_labels) ct_kcpa_vs_ksig = crosstab(L2, L3, k3_labels, k3_labels) write_crosstab_csv(ct_p_vs_kcpa, 'crosstab_paperA_vs_k3perCPA.csv', paperA_labels, k3_labels) write_crosstab_csv(ct_p_vs_ksig, 'crosstab_paperA_vs_k3perSig.csv', paperA_labels, k3_labels) write_crosstab_csv(ct_kcpa_vs_ksig, 'crosstab_k3perCPA_vs_k3perSig.csv', k3_labels, k3_labels) # 5. Cohen's kappa (collapse K=3 -> binary {C1+C2 = hand-ish, C3 = replicated}) L2_bin = (L2 == 2).astype(int) # 1 = replicated (C3), 0 = otherwise L3_bin = (L3 == 2).astype(int) L1_bin = 1 - L1 # invert so 1 = non_hand (replicated), 0 = hand-leaning print('\n[kappa] Cohen kappa, binary collapse (1 = replicated)') kappa_p_kcpa = cohen_kappa(L1_bin, L2_bin) kappa_p_ksig = cohen_kappa(L1_bin, L3_bin) kappa_kcpa_ksig = cohen_kappa(L2_bin, L3_bin) print(f' PaperA vs K=3-perCPA : kappa = {kappa_p_kcpa:.4f}') print(f' PaperA vs K=3-perSig : kappa = {kappa_p_ksig:.4f}') print(f' K=3-CPA vs K=3-perSig : kappa = {kappa_kcpa_ksig:.4f}') # 6. Per-firm agreement print('\n[per-firm] Binary agreement (collapsed):') print(f' {"Firm":<22} {"n_sigs":>9} {"P_vs_K3CPA":>11} ' f'{"P_vs_K3sig":>11} {"K3CPA_vs_K3sig":>15}') per_firm_p_kcpa = per_firm_agreement(sig_firms, L1_bin, L2_bin) per_firm_p_ksig = per_firm_agreement(sig_firms, L1_bin, L3_bin) per_firm_kcpa_ksig = per_firm_agreement(sig_firms, L2_bin, L3_bin) for f in BIG4: a1 = per_firm_p_kcpa[f]['agreement_rate'] a2 = per_firm_p_ksig[f]['agreement_rate'] a3 = per_firm_kcpa_ksig[f]['agreement_rate'] print(f' {LABEL[f]:<22} {per_firm_p_kcpa[f]["n"]:>9,} ' f'{a1*100:>10.2f}% {a2*100:>10.2f}% {a3*100:>14.2f}%') # 7. Component drift between per-CPA and per-signature K=3 print('\n[drift] Per-CPA K=3 vs per-signature K=3 components:') drift = [] for i, name in enumerate(['C1 hand-leaning', 'C2 mixed', 'C3 replicated']): d_cos = abs(means_cpa[i, 0] - means_sig[i, 0]) d_dh = abs(means_cpa[i, 1] - means_sig[i, 1]) d_w = abs(weights_cpa[i] - weights_sig[i]) drift.append({'component': name, 'd_cos': float(d_cos), 'd_dh': float(d_dh), 'd_weight': float(d_w)}) print(f' {name}: |dcos|={d_cos:.4f}, |ddh|={d_dh:.3f}, ' f'|dweight|={d_w:.3f}') # Verdict if (kappa_p_kcpa >= 0.6 and kappa_p_ksig >= 0.6 and kappa_kcpa_ksig >= 0.6): verdict = 'SIG_CONVERGENCE_STRONG' msg = ('All three pairwise Cohen kappas >= 0.60 (substantial ' 'agreement at signature level); per-CPA aggregation does ' 'not wash out signal.') elif (kappa_p_kcpa >= 0.4 and kappa_p_ksig >= 0.4 and kappa_kcpa_ksig >= 0.4): verdict = 'SIG_CONVERGENCE_MODERATE' msg = ('All three pairwise Cohen kappas >= 0.40 (moderate ' 'agreement); per-CPA aggregation captures most of the ' 'signature-level structure.') else: verdict = 'SIG_CONVERGENCE_WEAK' msg = ('At least one pairwise Cohen kappa < 0.40; per-CPA ' 'aggregation hides meaningful signature-level disagreement ' 'between methods.') print(f'\n[verdict] {verdict}') print(f' {msg}') payload = { 'generated_at': datetime.now().isoformat(), 'n_signatures_big4': int(n_sig), 'n_cpas_for_per_cpa_fit': int(len(cpas)), 'paper_a_cuts': {'cos': PAPER_A_COS_CUT, 'dh': PAPER_A_DH_CUT}, 'per_cpa_k3': { 'means': means_cpa.tolist(), 'weights': weights_cpa.tolist(), }, 'per_signature_k3': { 'means': means_sig.tolist(), 'weights': weights_sig.tolist(), }, 'component_drift_per_CPA_vs_per_sig': drift, 'cohen_kappa_binary_collapse': { 'paperA_vs_k3perCPA': float(kappa_p_kcpa), 'paperA_vs_k3perSig': float(kappa_p_ksig), 'k3perCPA_vs_k3perSig': float(kappa_kcpa_ksig), }, 'crosstabs': { 'paperA_vs_k3perCPA': ct_p_vs_kcpa, 'paperA_vs_k3perSig': ct_p_vs_ksig, 'k3perCPA_vs_k3perSig': ct_kcpa_vs_ksig, }, 'per_firm_agreement': { 'paperA_vs_k3perCPA': {f: per_firm_p_kcpa[f] for f in BIG4}, 'paperA_vs_k3perSig': {f: per_firm_p_ksig[f] for f in BIG4}, 'k3perCPA_vs_k3perSig': {f: per_firm_kcpa_ksig[f] for f in BIG4}, }, 'verdict': {'class': verdict, 'explanation': msg}, } json_path = OUT / 'sig_level_results.json' json_path.write_text(json.dumps(payload, indent=2, ensure_ascii=False), encoding='utf-8') print(f'\nJSON: {json_path}') # Markdown report md = [ '# Signature-Level Convergence Check (Script 39)', f'Generated: {datetime.now().strftime("%Y-%m-%d %H:%M:%S")}', '', '## Goal', '', ('Verify that the per-CPA convergence found in Script 38 holds at ' 'signature granularity, so a reviewer cannot attack with ' '"per-CPA aggregation washes out heterogeneity."'), '', '## Three signature-level labels', '', '- **PaperA**: non_hand iff cos > 0.95 AND dh <= 5', '- **K=3 perCPA**: hard assignment under K=3 components fit on ' f'{len(cpas)} per-CPA means (Script 38 baseline)', '- **K=3 perSig**: hard assignment under K=3 components fit ' f'directly on the {n_sig:,} signature-level (cos, dh) cloud', '', '## Component comparison', '', '| Component | Per-CPA cos | Per-CPA dh | Per-CPA wt | Per-Sig cos | Per-Sig dh | Per-Sig wt |', '|---|---|---|---|---|---|---|', ] for i, name in enumerate(['C1 hand-leaning', 'C2 mixed', 'C3 replicated']): md.append(f'| {name} | {means_cpa[i,0]:.4f} | {means_cpa[i,1]:.4f} | ' f'{weights_cpa[i]:.3f} | {means_sig[i,0]:.4f} | ' f'{means_sig[i,1]:.4f} | {weights_sig[i]:.3f} |') md += ['', '## Cohen kappa (binary: 1 = replicated, 0 = hand-leaning)', '', '| Pair | kappa |', '|---|---|', f'| PaperA vs K=3 perCPA | **{kappa_p_kcpa:.4f}** |', f'| PaperA vs K=3 perSig | **{kappa_p_ksig:.4f}** |', f'| K=3 perCPA vs K=3 perSig | **{kappa_kcpa_ksig:.4f}** |', '', ('Reference: kappa <= 0 = no agreement, 0.0-0.2 slight, ' '0.2-0.4 fair, 0.4-0.6 moderate, 0.6-0.8 substantial, ' '0.8-1.0 almost perfect (Landis & Koch 1977).'), '', '## Per-firm binary agreement', '', '| Firm | n_sigs | PaperA vs K3-perCPA | PaperA vs K3-perSig | K3-CPA vs K3-Sig |', '|---|---|---|---|---|', ] for f in BIG4: md.append(f'| {LABEL[f]} | {per_firm_p_kcpa[f]["n"]:,} | ' f'{per_firm_p_kcpa[f]["agreement_rate"]*100:.2f}% | ' f'{per_firm_p_ksig[f]["agreement_rate"]*100:.2f}% | ' f'{per_firm_kcpa_ksig[f]["agreement_rate"]*100:.2f}% |') md += ['', f'## Verdict: **{verdict}**', '', msg, '', '### Verdict legend', '- SIG_CONVERGENCE_STRONG: all 3 kappas >= 0.60 (substantial)', '- SIG_CONVERGENCE_MODERATE: all 3 kappas >= 0.40 (moderate)', '- SIG_CONVERGENCE_WEAK: at least one kappa < 0.40', ] md_path = OUT / 'sig_level_report.md' md_path.write_text('\n'.join(md), encoding='utf-8') print(f'Report: {md_path}') if __name__ == '__main__': main()