#!/usr/bin/env python3
"""
Script 38: v4.0 Convergence — K=3 cluster + Reverse-Anchor + Paper A rule
==========================================================================
Phase 1.6 (G2) script. Tests whether three INDEPENDENT statistical
approaches converge on the same Big-4 CPA ranking:

  Approach 1: K=3 GMM cluster posterior P_C1 (hand-leaning)
      -- from Script 37 baseline fit on full Big-4 (n=437).
         Higher P_C1 -> more hand-leaning.

  Approach 2: Reverse-anchor directional score
      -- non-Big-4 (n=249, mid/small firms) as the fully-replicated
         reference distribution.
      -- For each Big-4 CPA: cosine left-tail percentile under the
         reference 2D Gaussian (MCD).
      -- Score = -percentile (so higher = more deviated in the
         hand-leaning direction).

  Approach 3: Paper A v3.x operational hand_frac
      -- Per-CPA fraction of signatures that fail
         (cos > 0.95 AND dh <= 5).

Convergence claim: if all three rank Big-4 CPAs the same way (Spearman
rho >= 0.7 for every pair), then the v4.0 methodology paper has
**three independent lines of evidence** for the same population
structure -- a much harder thing for a reviewer to dismiss than any
single approach.

Per-firm breakdown shows the Script 35 finding (Firm A 0% C1, PwC
23.5% C1) holds across all three lenses.

Methodology choice: non-Big-4 as the reverse-anchor reference (rather
than non-Firm-A as in Script 33) maintains strict train/target
separation -- the v4.0 target population is Big-4, the reference is
strictly outside Big-4.

Output:
    reports/v4_big4/convergence_k3_reverse_anchor/
        convergence_results.json
        convergence_report.md
        scatter_pairwise.png    1x3 scatter of approach pairs
        per_firm_summary.csv    per-firm aggregates
"""

import sqlite3
import csv
import json
import numpy as np
import matplotlib
matplotlib.use('Agg')
import matplotlib.pyplot as plt
from pathlib import Path
from datetime import datetime
from scipy import stats
from sklearn.mixture import GaussianMixture
from sklearn.covariance import MinCovDet

DB = '/Volumes/NV2/PDF-Processing/signature-analysis/signature_analysis.db'
OUT = Path('/Volumes/NV2/PDF-Processing/signature-analysis/reports/'
           'v4_big4/convergence_k3_reverse_anchor')
OUT.mkdir(parents=True, exist_ok=True)

MIN_SIGS = 10
SEED = 42
BIG4 = ('勤業眾信聯合', '安侯建業聯合', '資誠聯合', '安永聯合')
LABEL = {'勤業眾信聯合': 'Firm A (Deloitte)', '安侯建業聯合': 'KPMG',
         '資誠聯合': 'PwC', '安永聯合': 'EY'}

# Paper A v3.x operational replication rule: a signature "passes" when
# cos > 0.95 AND dh <= 5; hand_frac is the per-CPA failure fraction.
PAPER_A_COS_CUT = 0.95
PAPER_A_DH_CUT = 5

# Convergence thresholds (heuristic)
RHO_STRONG = 0.70
RHO_PARTIAL = 0.40


def load_accountants(firm_filter_sql, params, with_handfrac=False):
    """Load per-CPA (cos, dh) aggregates from the signatures table.

    Args:
        firm_filter_sql: extra SQL appended to the WHERE clause
            (e.g. 'AND a.firm IN (?, ?, ?, ?)').
        params: bind values for the placeholders in firm_filter_sql.
        with_handfrac: when True, also compute the Paper A hand_frac
            (fraction of signatures FAILING the cos/dh replication rule).

    Returns:
        list of dicts with keys cpa/firm/cos_mean/dh_mean/n_sigs
        (+ hand_frac when requested); only CPAs with >= MIN_SIGS
        signatures are included.
    """
    # Optional SELECT fragment; its two placeholders precede the firm
    # filter params in the bind list because SELECT binds before WHERE.
    hand_frac_select = ''
    bind = list(params)
    if with_handfrac:
        hand_frac_select = '''AVG(CASE
                       WHEN s.max_similarity_to_same_accountant > ?
                        AND s.min_dhash_independent <= ?
                       THEN 0.0 ELSE 1.0
                   END) AS hand_frac,'''
        bind = [PAPER_A_COS_CUT, PAPER_A_DH_CUT] + bind

    sql = f'''
        SELECT s.assigned_accountant,
               a.firm,
               AVG(s.max_similarity_to_same_accountant) AS cos_mean,
               AVG(CAST(s.min_dhash_independent AS REAL)) AS dh_mean,
               {hand_frac_select}
               COUNT(*) AS n
        FROM signatures s
        JOIN accountants a ON s.assigned_accountant = a.name
        WHERE s.assigned_accountant IS NOT NULL
          AND s.max_similarity_to_same_accountant IS NOT NULL
          AND s.min_dhash_independent IS NOT NULL
          {firm_filter_sql}
        GROUP BY s.assigned_accountant
        HAVING n >= ?
    '''
    conn = sqlite3.connect(DB)
    try:
        cur = conn.cursor()
        cur.execute(sql, bind + [MIN_SIGS])
        rows = cur.fetchall()
    finally:
        conn.close()

    out = []
    for r in rows:
        rec = {'cpa': r[0], 'firm': r[1],
               'cos_mean': float(r[2]), 'dh_mean': float(r[3]),
               'n_sigs': int(r[-1])}
        if with_handfrac:
            rec['hand_frac'] = float(r[4])
        out.append(rec)
    return out


def load_big4():
    """Big-4 target population, with Paper A hand_frac."""
    return load_accountants('AND a.firm IN (?, ?, ?, ?)',
                            list(BIG4), with_handfrac=True)


def load_non_big4_reference():
    """Non-Big-4 reference population (mid/small firms only)."""
    return load_accountants(
        'AND a.firm IS NOT NULL AND a.firm NOT IN (?, ?, ?, ?)',
        list(BIG4), with_handfrac=False)


def fit_reference_gaussian(points):
    """Fit a robust (MCD) 2D Gaussian to the reference (cos, dh) cloud."""
    X = np.asarray(points, dtype=float)
    mcd = MinCovDet(random_state=SEED, support_fraction=0.85).fit(X)
    return {
        'mean': mcd.location_,
        'cov': mcd.covariance_,
        'cov_inv': np.linalg.inv(mcd.covariance_),
        'support_fraction': 0.85,
        'n_reference': int(len(X)),
    }


def reverse_anchor_directional_score(cpa, ref):
    """Returns -cos_left_tail_pct under the reference marginal cos
    Gaussian. Higher (less negative) = more deviated in the hand-
    leaning direction (left tail of reference cosine distribution).
    """
    mu_c = ref['mean'][0]
    sd_c = float(np.sqrt(ref['cov'][0, 0]))
    tail = float(stats.norm.cdf(cpa['cos_mean'], loc=mu_c, scale=sd_c))
    return -tail


def fit_k3_big4(big4_cpas):
    """Fit a 3-component full-covariance GMM on Big-4 per-CPA (cos, dh).

    Returns (gmm, order) where order sorts components by ascending mean
    cos: order[0] = C1 (hand-leaning), order[2] = C3 (replicated).
    """
    X = np.column_stack([
        [c['cos_mean'] for c in big4_cpas],
        [c['dh_mean'] for c in big4_cpas],
    ])
    gmm = GaussianMixture(n_components=3, covariance_type='full',
                          random_state=SEED, n_init=15, max_iter=500).fit(X)
    order = np.argsort(gmm.means_[:, 0])  # C1 = lowest cos = hand-leaning
    return gmm, order


def compute_p_c1(cpa, gmm, order):
    """Posterior probability that this CPA belongs to C1 (hand-leaning)."""
    X = np.array([[cpa['cos_mean'], cpa['dh_mean']]])
    raw_post = gmm.predict_proba(X)[0]
    return float(raw_post[order[0]])


def compute_correlations(big4_data):
    """Pairwise Spearman + Pearson correlations between the three scores."""
    p_c1 = np.array([d['p_c1'] for d in big4_data])
    rev_anchor = np.array([d['reverse_anchor_score'] for d in big4_data])
    hand_frac = np.array([d['paperA_hand_frac'] for d in big4_data])
    pairs = [
        ('p_c1_vs_paperA_hand_frac', p_c1, hand_frac),
        ('reverse_anchor_vs_paperA_hand_frac', rev_anchor, hand_frac),
        ('p_c1_vs_reverse_anchor', p_c1, rev_anchor),
    ]
    out = {}
    for name, a, b in pairs:
        rho, p = stats.spearmanr(a, b)
        r, p_pearson = stats.pearsonr(a, b)
        out[name] = {
            'spearman_rho': float(rho),
            'spearman_p': float(p),
            'pearson_r': float(r),
            'pearson_p': float(p_pearson),
        }
    return out


def classify_convergence(corrs):
    """Map the three pairwise |rho| values to a verdict (class, text).

    Uses |rho| (not rho) because the reverse-anchor score is signed
    opposite to hand_frac by construction.
    """
    rhos = [corrs['p_c1_vs_paperA_hand_frac']['spearman_rho'],
            corrs['reverse_anchor_vs_paperA_hand_frac']['spearman_rho'],
            corrs['p_c1_vs_reverse_anchor']['spearman_rho']]
    abs_rhos = [abs(r) for r in rhos]
    min_abs_rho = float(min(abs_rhos))
    all_strong = all(r >= RHO_STRONG for r in abs_rhos)
    all_partial = all(r >= RHO_PARTIAL for r in abs_rhos)
    if all_strong:
        return 'CONVERGENCE_STRONG', (
            f'All three pairwise Spearman |rho| >= {RHO_STRONG}; '
            f'min |rho| = {min_abs_rho:.3f}. Three independent statistical '
            f'lenses agree on the Big-4 CPA hand-leaning ranking.')
    if all_partial:
        return 'CONVERGENCE_PARTIAL', (
            f'All three pairwise Spearman |rho| >= {RHO_PARTIAL} but at '
            f'least one falls below {RHO_STRONG}; min |rho| = '
            f'{min_abs_rho:.3f}. Methods agree on direction but not '
            f'tightness; v4.0 can present them as complementary lenses.')
    return 'CONVERGENCE_WEAK', (
        f'At least one pair has |rho| < {RHO_PARTIAL}; min |rho| = '
        f'{min_abs_rho:.3f}. Methods disagree -- they may be measuring '
        f'different constructs.')


def per_firm_aggregate(big4_data):
    """Aggregate scored per-CPA rows into per-firm summary rows."""
    by_firm = {}
    for d in big4_data:
        by_firm.setdefault(d['firm'], []).append(d)
    rows = []
    for f in BIG4:
        items = by_firm.get(f, [])
        n = len(items)
        if n == 0:
            continue
        c1_count = sum(1 for d in items if d['hard_label'] == 'C1')
        c2_count = sum(1 for d in items if d['hard_label'] == 'C2')
        c3_count = sum(1 for d in items if d['hard_label'] == 'C3')
        mean_p_c1 = float(np.mean([d['p_c1'] for d in items]))
        mean_rev = float(np.mean([d['reverse_anchor_score'] for d in items]))
        mean_hand = float(np.mean([d['paperA_hand_frac'] for d in items]))
        rows.append({
            'firm': f,
            'firm_label': LABEL[f],
            'n_cpas': n,
            'k3_C1_count': c1_count,
            'k3_C2_count': c2_count,
            'k3_C3_count': c3_count,
            'k3_C1_pct': float(100 * c1_count / n),
            'k3_C3_pct': float(100 * c3_count / n),
            'mean_p_c1': mean_p_c1,
            'mean_reverse_anchor': mean_rev,
            'mean_paperA_hand_frac': mean_hand,
        })
    return rows


def render_scatter(big4_data):
    """Write the 1x3 pairwise scatter figure, colored by firm."""
    p_c1 = np.array([d['p_c1'] for d in big4_data])
    rev = np.array([d['reverse_anchor_score'] for d in big4_data])
    hf = np.array([d['paperA_hand_frac'] for d in big4_data])
    firm_color = {
        '勤業眾信聯合': 'crimson', '安侯建業聯合': 'royalblue',
        '資誠聯合': 'forestgreen', '安永聯合': 'darkorange',
    }
    colors = [firm_color[d['firm']] for d in big4_data]

    fig, axes = plt.subplots(1, 3, figsize=(18, 5.5))
    pairs = [
        ('K=3 P(C1 hand-leaning)', p_c1,
         'Paper A hand_frac', hf,
         'p_c1_vs_paperA_hand_frac'),
        ('Reverse-anchor directional score', rev,
         'Paper A hand_frac', hf,
         'reverse_anchor_vs_paperA_hand_frac'),
        ('K=3 P(C1 hand-leaning)', p_c1,
         'Reverse-anchor directional score', rev,
         'p_c1_vs_reverse_anchor'),
    ]
    for ax, (xl, x, yl, y, _name) in zip(axes, pairs):
        ax.scatter(x, y, s=20, alpha=0.55, c=colors, edgecolor='white')
        rho, p = stats.spearmanr(x, y)
        ax.set_xlabel(xl)
        ax.set_ylabel(yl)
        ax.set_title(f'{xl}\nvs {yl}\nSpearman rho={rho:.3f} (p={p:.2e})')
        ax.grid(alpha=0.3)
    # Add legend for firm color
    handles = [plt.Line2D([0], [0], marker='o', linestyle='', color=c,
                          label=LABEL[f], markersize=8)
               for f, c in firm_color.items()]
    fig.legend(handles=handles, loc='lower center',
               ncol=4, bbox_to_anchor=(0.5, -0.02))
    fig.tight_layout()
    fig.savefig(OUT / 'scatter_pairwise.png', dpi=150,
                bbox_inches='tight')
    plt.close(fig)


def write_csv(per_firm_rows, big4_data):
    """Write per-firm and per-CPA CSVs; returns their two paths."""
    csv_per_firm = OUT / 'per_firm_summary.csv'
    with open(csv_per_firm, 'w', newline='', encoding='utf-8') as f:
        w = csv.writer(f)
        w.writerow(['firm', 'firm_label', 'n_cpas',
                    'k3_C1_count', 'k3_C2_count', 'k3_C3_count',
                    'k3_C1_pct', 'k3_C3_pct',
                    'mean_p_c1', 'mean_reverse_anchor',
                    'mean_paperA_hand_frac'])
        for r in per_firm_rows:
            w.writerow([r['firm'], r['firm_label'], r['n_cpas'],
                        r['k3_C1_count'], r['k3_C2_count'], r['k3_C3_count'],
                        f'{r["k3_C1_pct"]:.2f}', f'{r["k3_C3_pct"]:.2f}',
                        f'{r["mean_p_c1"]:.4f}',
                        f'{r["mean_reverse_anchor"]:.4f}',
                        f'{r["mean_paperA_hand_frac"]:.4f}'])
    csv_cpa = OUT / 'per_cpa_scores.csv'
    with open(csv_cpa, 'w', newline='', encoding='utf-8') as f:
        w = csv.writer(f)
        w.writerow(['cpa', 'firm', 'firm_label', 'n_sigs',
                    'cos_mean', 'dh_mean',
                    'p_c1', 'p_c2', 'p_c3', 'hard_label',
                    'reverse_anchor_score', 'paperA_hand_frac'])
        for d in big4_data:
            w.writerow([d['cpa'], d['firm'], LABEL[d['firm']], d['n_sigs'],
                        f'{d["cos_mean"]:.4f}', f'{d["dh_mean"]:.4f}',
                        f'{d["p_c1"]:.4f}', f'{d["p_c2"]:.4f}',
                        f'{d["p_c3"]:.4f}', d['hard_label'],
                        f'{d["reverse_anchor_score"]:.4f}',
                        f'{d["paperA_hand_frac"]:.4f}'])
    return csv_per_firm, csv_cpa


def render_md(big4_data, ref, k3_components, corrs, verdict, per_firm_rows):
    """Render the full markdown convergence report as a single string."""
    md = [
        '# v4.0 Convergence: K=3 + Reverse-Anchor + Paper A',
        f'Generated: {datetime.now().strftime("%Y-%m-%d %H:%M:%S")}',
        '',
        '## A. Three independent lenses on Big-4 CPAs',
        '',
        '### 1. K=3 GMM cluster posterior P_C1 (hand-leaning)',
        '',
        '| Component | mean cos | mean dh | weight | interpretation |',
        '|---|---|---|---|---|',
    ]
    for i, name in enumerate(['C1 hand-leaning', 'C2 mixed',
                              'C3 replicated']):
        m = k3_components['means'][i]
        w = k3_components['weights'][i]
        md.append(f'| {name} | {m[0]:.4f} | {m[1]:.4f} | {w:.3f} | '
                  f'higher P_C1 = more hand-leaning |')

    md += ['',
           '### 2. Reverse-anchor directional score',
           '',
           f'- Reference: non-Big-4 CPAs (n = {ref["n_reference"]}, '
           f'mid/small firms only -- strict separation from Big-4 target)',
           f'- Reference center (MCD, support 0.85): cos = '
           f'{ref["mean"][0]:.4f}, dh = {ref["mean"][1]:.4f}',
           f'- Score per Big-4 CPA: -cos_left_tail_percentile under the '
           f'reference marginal cos Gaussian. Higher = deeper into the '
           f'left tail = more hand-leaning relative to the reference.',
           '',
           '### 3. Paper A v3.x operational rule',
           '',
           f'- Per-CPA hand_frac = 1 - (fraction of signatures satisfying '
           f'cos > {PAPER_A_COS_CUT} AND dh <= {PAPER_A_DH_CUT})',
           '',
           '## B. Pairwise Spearman correlations',
           '',
           '| Pair | Spearman rho | p | Pearson r | p |',
           '|---|---|---|---|---|']
    for name, c in corrs.items():
        md.append(f'| {name} | **{c["spearman_rho"]:.4f}** | '
                  f'{c["spearman_p"]:.2e} | {c["pearson_r"]:.4f} | '
                  f'{c["pearson_p"]:.2e} |')

    md += ['', f'## C. Convergence verdict: **{verdict[0]}**',
           '', verdict[1], '',
           '### Verdict legend',
           f'- **CONVERGENCE_STRONG**: all 3 |rho| >= {RHO_STRONG}.',
           f'- **CONVERGENCE_PARTIAL**: all 3 |rho| >= {RHO_PARTIAL}.',
           f'- **CONVERGENCE_WEAK**: at least one |rho| < {RHO_PARTIAL}.',
           '',
           '## D. Per-firm summary',
           '',
           '| Firm | n CPAs | K=3 C1% | K=3 C3% | mean P_C1 | mean rev-anchor | mean hand_frac |',
           '|---|---|---|---|---|---|---|']
    for r in per_firm_rows:
        md.append(f'| {r["firm_label"]} | {r["n_cpas"]} | '
                  f'{r["k3_C1_pct"]:.2f}% | {r["k3_C3_pct"]:.2f}% | '
                  f'{r["mean_p_c1"]:.4f} | {r["mean_reverse_anchor"]:.4f} | '
                  f'{r["mean_paperA_hand_frac"]:.4f} |')

    md += ['',
           '## E. Files',
           '- `scatter_pairwise.png` -- 1x3 scatter of approach pairs',
           '- `per_firm_summary.csv` -- per-firm aggregates',
           '- `per_cpa_scores.csv` -- per-CPA all three scores + hard label',
           '- `convergence_results.json` -- full machine-readable output',
           '',
           '## F. Methodology notes',
           '',
           '- Reference population for reverse-anchor: non-Big-4 CPAs only '
           '(n=249), preserving strict train/target separation. This is '
           'tighter than Script 33 (which used non-Firm-A including other '
           'Big-4); using a population fully outside Big-4 means the '
           'reverse-anchor metric carries no within-Big-4 information.',
           '- K=3 fit on full Big-4 (not LOOO) -- Script 37 already showed '
           'C1 component shape is stable across LOOO folds; this script '
           'uses the canonical full-Big-4 fit for per-CPA posteriors.',
           '- All three approaches operate on the per-CPA mean (cos, dh) -- '
           'no signature-level scoring here. A signature-level convergence '
           'check is deferred (it would inflate sample size to ~90k '
           'without adding methodological signal).',
           ]
    return '\n'.join(md)


def main():
    print('=' * 72)
    print('Script 38: v4.0 Convergence -- K=3 + Reverse-Anchor + Paper A')
    print('=' * 72)
    big4 = load_big4()
    print(f'\nN Big-4 CPAs (n_sigs >= {MIN_SIGS}): {len(big4)}')
    by_firm_count = {}
    for d in big4:
        by_firm_count[d['firm']] = by_firm_count.get(d['firm'], 0) + 1
    for f in BIG4:
        print(f'  {LABEL[f]}: {by_firm_count.get(f, 0)}')

    ref_cpas = load_non_big4_reference()
    print(f'\nN non-Big-4 reference CPAs (n_sigs >= {MIN_SIGS}): '
          f'{len(ref_cpas)}')

    # Build reference Gaussian
    ref_points = np.array([[c['cos_mean'], c['dh_mean']] for c in ref_cpas])
    ref = fit_reference_gaussian(ref_points)
    print(f'  Reference center (MCD): cos={ref["mean"][0]:.4f}, '
          f'dh={ref["mean"][1]:.4f}')

    # K=3 fit
    gmm, order = fit_k3_big4(big4)
    means_sorted = gmm.means_[order]
    weights_sorted = gmm.weights_[order]
    print(f'\nFull-Big-4 K=3 components (sorted by cos):')
    for i, name in enumerate(['C1 hand-leaning', 'C2 mixed',
                              'C3 replicated']):
        print(f'  {name}: cos={means_sorted[i,0]:.4f}, '
              f'dh={means_sorted[i,1]:.4f}, weight={weights_sorted[i]:.3f}')

    # Score each Big-4 CPA under all three lenses
    for d in big4:
        X = np.array([[d['cos_mean'], d['dh_mean']]])
        raw_post = gmm.predict_proba(X)[0]
        d['p_c1'] = compute_p_c1(d, gmm, order)
        d['p_c2'] = float(raw_post[order[1]])
        d['p_c3'] = float(raw_post[order[2]])
        # Hard label = sorted rank of the argmax component (C1..C3).
        hard = int(np.argmax(raw_post))
        d['hard_label'] = ['C1', 'C2', 'C3'][list(order).index(hard)]
        d['reverse_anchor_score'] = reverse_anchor_directional_score(d, ref)
        d['paperA_hand_frac'] = d['hand_frac']

    # Correlations
    corrs = compute_correlations(big4)
    print('\nPairwise Spearman correlations:')
    for name, c in corrs.items():
        print(f'  {name}: rho={c["spearman_rho"]:+.4f} '
              f'(p={c["spearman_p"]:.2e})')

    # Verdict
    verdict = classify_convergence(corrs)
    print(f'\nVerdict: {verdict[0]}')
    print(f'  {verdict[1]}')

    # Per-firm aggregate
    per_firm_rows = per_firm_aggregate(big4)
    print('\nPer-firm summary:')
    print(f'  {"Firm":<22} {"n":>4} {"C1%":>7} {"C3%":>7} '
          f'{"E[P_C1]":>9} {"E[rev]":>9} {"E[hand]":>9}')
    for r in per_firm_rows:
        print(f'  {r["firm_label"]:<22} {r["n_cpas"]:>4} '
              f'{r["k3_C1_pct"]:>6.2f}% {r["k3_C3_pct"]:>6.2f}% '
              f'{r["mean_p_c1"]:>9.4f} {r["mean_reverse_anchor"]:>9.4f} '
              f'{r["mean_paperA_hand_frac"]:>9.4f}')

    # Plots, CSVs, JSON, MD
    render_scatter(big4)
    csv_pf, csv_cpa = write_csv(per_firm_rows, big4)
    print(f'\nCSV: {csv_pf}; {csv_cpa}')

    payload = {
        'generated_at': datetime.now().isoformat(),
        'min_sigs_per_accountant': MIN_SIGS,
        'paper_a_operational_cuts': {'cos': PAPER_A_COS_CUT,
                                     'dh': PAPER_A_DH_CUT},
        'reference_population': {
            'description': 'non-Big-4 CPAs (mid/small firms only)',
            'n_cpas': ref['n_reference'],
            'center_mcd': [float(x) for x in ref['mean']],
            'cov_mcd': [[float(x) for x in row] for row in ref['cov']],
        },
        'k3_components': {
            'means': means_sorted.tolist(),
            'weights': weights_sorted.tolist(),
        },
        'correlations': corrs,
        'verdict': {'class': verdict[0], 'explanation': verdict[1]},
        'per_firm_summary': per_firm_rows,
        'n_big4_cpas': len(big4),
    }
    json_path = OUT / 'convergence_results.json'
    json_path.write_text(json.dumps(payload, indent=2, ensure_ascii=False),
                         encoding='utf-8')
    print(f'JSON: {json_path}')

    md = render_md(big4, ref, {'means': means_sorted.tolist(),
                               'weights': weights_sorted.tolist()},
                   corrs, verdict, per_firm_rows)
    md_path = OUT / 'convergence_report.md'
    md_path.write_text(md, encoding='utf-8')
    print(f'Report: {md_path}')


if __name__ == '__main__':
    main()