#!/usr/bin/env python3
"""
Script 34: Big-4-Only Pooled Calibration
==========================================

Pool Firm A + KPMG + PwC + EY (drop all mid/small firms) and re-run the
three-method framework + 2D GMM K=2/K=3 + bootstrap stability on the
resulting accountant-level (cos_mean, dh_mean) plane.

Why this variant:
    Paper A's published "natural threshold" (cos=0.945, dh=8.10) was derived
    from a 3-comp 2D GMM on the FULL dataset (Big-4 + ~250 mid/small-firm
    CPAs). The mid/small-firm tail adds extra noise and is itself
    heterogeneous (many firms, few CPAs each). Restricting to Big-4 only
    gives a cleaner four-firm contrast and may produce a tighter, more
    reproducible crossing.

Comparison table (the deliverable):

    | Source                          | cos crossing | dh crossing |
    | Paper A published (full 3-comp) | 0.945        | 8.10        |
    | Firm A alone (Script 32)        | ~0.977       | ~4.6        |
    | Non-Firm-A alone (Script 32)    | ~0.938       | ~7.5        |
    | Big-4 only pooled (this script) | ???          | ???         |
    | + bootstrap 95% CI              | [..,..]      | [..,..]     |

Verdict (descriptive):
    TIGHTER     bootstrap 95% CI half-width <= 0.005 (cos) AND <= 0.5 (dh)
                AND point estimate within 0.01 (cos) / 1.0 (dh) of 0.945/8.10
    COMPARABLE  CI overlaps Paper A point estimate, half-width <= 0.01 / 1.0
    WIDER       CI half-width > 0.01 (cos) OR > 1.0 (dh)

Output:
    reports/big4_only_pooled/
        big4_only_pooled_results.json
        big4_only_pooled_report.md
        panel_big4_only_<measure>.png   (one per measure: cos_mean, dh_mean)
"""
import sqlite3
import json
import numpy as np
import matplotlib
matplotlib.use('Agg')
import matplotlib.pyplot as plt
from pathlib import Path
from datetime import datetime
from scipy import stats
from scipy.signal import find_peaks
from scipy.optimize import brentq
from sklearn.mixture import GaussianMixture
import diptest

DB = '/Volumes/NV2/PDF-Processing/signature-analysis/signature_analysis.db'
OUT = Path('/Volumes/NV2/PDF-Processing/signature-analysis/reports/'
           'big4_only_pooled')
OUT.mkdir(parents=True, exist_ok=True)

EPS = 1e-6              # clip margin before logit transform
Z_CRIT = 1.96           # two-sided 5% critical value for BD/McCrary z-scores
MIN_SIGS = 10           # minimum signatures per CPA to enter the sample
N_BOOTSTRAP = 500
BOOT_SEED = 42
# Big-4 firm names as stored in accountants.firm (Deloitte, KPMG, PwC, EY).
BIG4 = ('勤業眾信聯合', '安侯建業聯合', '資誠聯合', '安永聯合')
# Paper A published baseline crossing (full-dataset 3-comp 2D GMM).
PAPER_A_COS = 0.945
PAPER_A_DH = 8.10


def load_big4_pooled():
    """Return accountant-level rows for Big-4 CPAs only.

    Each row is (accountant_name, firm, cos_mean, dh_mean, n_signatures),
    restricted to CPAs with at least MIN_SIGS usable signatures.
    """
    conn = sqlite3.connect(DB)
    cur = conn.cursor()
    cur.execute('''
        SELECT s.assigned_accountant,
               a.firm,
               AVG(s.max_similarity_to_same_accountant) AS cos_mean,
               AVG(CAST(s.min_dhash_independent AS REAL)) AS dh_mean,
               COUNT(*) AS n
        FROM signatures s
        JOIN accountants a ON s.assigned_accountant = a.name
        WHERE s.assigned_accountant IS NOT NULL
          AND s.max_similarity_to_same_accountant IS NOT NULL
          AND s.min_dhash_independent IS NOT NULL
          AND a.firm IN (?, ?, ?, ?)
        GROUP BY s.assigned_accountant
        HAVING n >= ?
    ''', BIG4 + (MIN_SIGS,))
    rows = cur.fetchall()
    conn.close()
    return rows


def gmm_2d_marginal_crossing(X, dim, K=2, seed=42):
    """Fit a K-component full-covariance 2D GMM and locate the marginal
    density crossing along dimension `dim` (K=2 only).

    Returns (crossing, fitted_gmm). crossing is None when K != 2, when no
    sign change exists on the data range, or when brentq fails on every
    bracket; both are None when the sample is too small (< 8).
    """
    if len(X) < 8:
        return None, None
    gmm = GaussianMixture(n_components=K, covariance_type='full',
                          random_state=seed, n_init=15, max_iter=500).fit(X)
    means = gmm.means_
    covs = gmm.covariances_
    weights = gmm.weights_
    if K != 2:
        return None, gmm
    m1, m2 = means[0][dim], means[1][dim]
    s1 = np.sqrt(covs[0][dim, dim])
    s2 = np.sqrt(covs[1][dim, dim])
    w1, w2 = weights[0], weights[1]

    def diff(x):
        # Signed difference of the two weighted marginal component densities;
        # a root is a point where the components exchange dominance.
        return (w2 * stats.norm.pdf(x, m2, s2)
                - w1 * stats.norm.pdf(x, m1, s1))

    xs = np.linspace(float(X[:, dim].min()), float(X[:, dim].max()), 2000)
    ys = diff(xs)
    ch = np.where(np.diff(np.sign(ys)) != 0)[0]
    if not len(ch):
        return None, gmm
    mid = 0.5 * (m1 + m2)
    crossings = []
    for i in ch:
        try:
            crossings.append(brentq(diff, xs[i], xs[i + 1]))
        except ValueError:
            continue
    if not crossings:
        return None, gmm
    # Among all roots, keep the one nearest the midpoint of the two
    # component means (the "between the modes" crossing).
    return float(min(crossings, key=lambda c: abs(c - mid))), gmm


def gmm_3comp_summary(X, seed=42):
    """Fit a 3-component full-covariance 2D GMM and summarize it.

    Components are sorted by their cos-mean (dim 0). Returns None when the
    sample is too small (< 12) to support a 3-component fit.
    """
    if len(X) < 12:
        return None
    gmm = GaussianMixture(n_components=3, covariance_type='full',
                          random_state=seed, n_init=15, max_iter=500).fit(X)
    order = np.argsort(gmm.means_[:, 0])
    return {
        'means': [[float(m[0]), float(m[1])] for m in gmm.means_[order]],
        'weights': [float(w) for w in gmm.weights_[order]],
        'bic': float(gmm.bic(X)),
        'aic': float(gmm.aic(X)),
    }


def fit_logit_gmm(x, K=2, seed=42):
    """Fit a 1D GMM in logit space and (for K=2) find the density crossing.

    Input values must lie in (0, 1); they are clipped by EPS before the
    logit transform. The crossing, if found, is mapped back to the original
    scale via the inverse logit. Returns a dict with AIC/BIC and
    'crossing_original' (None when no crossing was located).
    """
    x = np.clip(np.asarray(x, dtype=float), EPS, 1 - EPS)
    z = np.log(x / (1 - x)).reshape(-1, 1)
    gmm = GaussianMixture(n_components=K, random_state=seed,
                          max_iter=500).fit(z)
    order = np.argsort(gmm.means_.ravel())
    means = gmm.means_.ravel()[order]
    stds = np.sqrt(gmm.covariances_.ravel())[order]
    weights = gmm.weights_[order]
    crossing = None
    if K == 2:
        m1, s1, w1 = means[0], stds[0], weights[0]
        m2, s2, w2 = means[1], stds[1], weights[1]

        def diff(z0):
            return (w2 * stats.norm.pdf(z0, m2, s2)
                    - w1 * stats.norm.pdf(z0, m1, s1))

        zs = np.linspace(min(m1, m2) - 2, max(m1, m2) + 2, 2000)
        ys = diff(zs)
        ch = np.where(np.diff(np.sign(ys)) != 0)[0]
        if len(ch):
            # NOTE(review): only the first sign-change bracket is tried
            # here, unlike gmm_2d_marginal_crossing which scans all brackets
            # and picks the one nearest the mean midpoint. Kept as-is to
            # preserve the published numbers.
            try:
                z_cross = brentq(diff, zs[ch[0]], zs[ch[0] + 1])
                crossing = float(1 / (1 + np.exp(-z_cross)))
            except ValueError:
                pass
    return {
        'K': K,
        'aic': float(gmm.aic(z)),
        'bic': float(gmm.bic(z)),
        'crossing_original': crossing,
    }


def kde_dip(values):
    """Hartigan dip test + Silverman-KDE mode/antimode scan on a 1D sample.

    Returns n, dip statistic, bootstrap dip p-value (2000 resamples), number
    of KDE peaks (prominence >= 2% of max density), and the first antimode
    (density minimum between the first two peaks), or None if unimodal.
    """
    arr = np.asarray(values, dtype=float)
    arr = arr[np.isfinite(arr)]
    dip, pval = diptest.diptest(arr, boot_pval=True, n_boot=2000)
    kde = stats.gaussian_kde(arr, bw_method='silverman')
    xs = np.linspace(arr.min(), arr.max(), 2000)
    density = kde(xs)
    peaks, _ = find_peaks(density, prominence=density.max() * 0.02)
    antimodes = []
    for i in range(len(peaks) - 1):
        seg = density[peaks[i]:peaks[i + 1]]
        if not len(seg):
            continue
        local = peaks[i] + int(np.argmin(seg))
        antimodes.append(float(xs[local]))
    return {
        'n': int(len(arr)),
        'dip': float(dip),
        'dip_pvalue': float(pval),
        'unimodal_alpha05': bool(pval > 0.05),
        'n_modes': int(len(peaks)),
        'antimode': antimodes[0] if antimodes else None,
    }


def bd_mccrary(values, bin_width, direction):
    """Bin-to-bin discontinuity scan (BD/McCrary-style) on a histogram.

    For each interior bin, a z-score compares the observed count to the mean
    of its two neighbors. A "transition" is a z sign flip beyond +/-Z_CRIT in
    the requested direction ('neg_to_pos' or 'pos_to_neg'); the strongest
    transition (by |z_before| + |z_after|) supplies the reported threshold.
    """
    arr = np.asarray(values, dtype=float)
    arr = arr[np.isfinite(arr)]
    lo = float(np.floor(arr.min() / bin_width) * bin_width)
    hi = float(np.ceil(arr.max() / bin_width) * bin_width)
    edges = np.arange(lo, hi + bin_width, bin_width)
    counts, _ = np.histogram(arr, bins=edges)
    centers = (edges[:-1] + edges[1:]) / 2.0
    N = counts.sum()
    p = counts / N if N else counts.astype(float)
    n_bins = len(counts)
    z = np.full(n_bins, np.nan)
    for i in range(1, n_bins - 1):
        p_lo = p[i - 1]
        p_hi = p[i + 1]
        exp_i = 0.5 * (counts[i - 1] + counts[i + 1])
        # Variance of (count_i - mean of neighbors) under multinomial counts.
        var_i = (N * p[i] * (1 - p[i])
                 + 0.25 * N * (p_lo + p_hi) * (1 - p_lo - p_hi))
        if var_i <= 0:
            continue
        z[i] = (counts[i] - exp_i) / np.sqrt(var_i)
    transitions = []
    for i in range(1, len(z)):
        if np.isnan(z[i - 1]) or np.isnan(z[i]):
            continue
        ok = ((direction == 'neg_to_pos'
               and z[i - 1] < -Z_CRIT and z[i] > Z_CRIT)
              or (direction == 'pos_to_neg'
                  and z[i - 1] > Z_CRIT and z[i] < -Z_CRIT))
        if ok:
            transitions.append({
                'threshold_between': float((centers[i - 1] + centers[i]) / 2.0),
                'z_before': float(z[i - 1]),
                'z_after': float(z[i]),
            })
    best = (max(transitions,
                key=lambda t: abs(t['z_before']) + abs(t['z_after']))
            if transitions else None)
    return {
        'n_transitions': len(transitions),
        'threshold': (best['threshold_between'] if best else None),
    }


def bootstrap_2d_gmm_crossing(X, dim, n_boot=N_BOOTSTRAP, seed=BOOT_SEED):
    """Bootstrap the 2D-GMM K=2 marginal crossing along `dim`.

    Resamples CPAs with replacement n_boot times and refits the GMM each
    time. Returns None when more than half the resamples fail to yield a
    crossing (crossing deemed unstable); otherwise summary stats + 95% CI.
    """
    rng = np.random.default_rng(seed)
    crossings = []
    n = len(X)
    for b in range(n_boot):
        idx = rng.integers(0, n, size=n)
        Xb = X[idx]
        c, _ = gmm_2d_marginal_crossing(Xb, dim, K=2, seed=42)
        if c is not None:
            crossings.append(c)
    crossings = np.asarray(crossings)
    if len(crossings) < n_boot * 0.5:
        return None
    return {
        'n_successful_boot': int(len(crossings)),
        'mean': float(np.mean(crossings)),
        'median': float(np.median(crossings)),
        'std': float(np.std(crossings, ddof=1)),
        'ci95': [float(np.quantile(crossings, 0.025)),
                 float(np.quantile(crossings, 0.975))],
        'ci_halfwidth': float(0.5 * (np.quantile(crossings, 0.975)
                                     - np.quantile(crossings, 0.025))),
    }


def classify_stability(boot_cos, boot_dh, point_cos, point_dh):
    """Map bootstrap CI widths + offsets from the Paper A baseline to a
    (verdict_class, explanation) pair per the legend in the module docstring.
    """
    if boot_cos is None or boot_dh is None:
        return 'WIDER', ('Bootstrap failed to converge in >50% of resamples; '
                         'crossing is unstable.')
    cos_hw = boot_cos['ci_halfwidth']
    dh_hw = boot_dh['ci_halfwidth']
    cos_offset = abs(point_cos - PAPER_A_COS) if point_cos is not None else None
    dh_offset = abs(point_dh - PAPER_A_DH) if point_dh is not None else None
    note = (f'CI half-width (cos) = {cos_hw:.4f}, (dh) = {dh_hw:.3f}; '
            f'offset from Paper A baseline (cos) = {cos_offset}, '
            f'(dh) = {dh_offset}.')
    if (cos_hw <= 0.005 and dh_hw <= 0.5
            and cos_offset is not None and cos_offset <= 0.01
            and dh_offset is not None and dh_offset <= 1.0):
        return 'TIGHTER', f'Big-4-only crossing is tighter and aligned. {note}'
    if cos_hw <= 0.01 and dh_hw <= 1.0:
        return 'COMPARABLE', (f'Big-4-only crossing is comparable to '
                              f'published baseline in stability. {note}')
    return 'WIDER', (f'Big-4-only crossing is wider than the published '
                     f'baseline -- restriction does not improve stability. {note}')


def main():
    print('=' * 72)
    print('Script 34: Big-4-Only Pooled Calibration')
    print('=' * 72)

    rows = load_big4_pooled()
    by_firm = {}
    for r in rows:
        by_firm.setdefault(r[1], 0)
        by_firm[r[1]] += 1
    print(f'\nN Big-4 CPAs (n_signatures >= {MIN_SIGS}): {len(rows)}')
    for firm, n in sorted(by_firm.items(), key=lambda x: -x[1]):
        print(f'  {firm}: {n}')

    # FIX: with < 8 CPAs the 2D GMM helper returns (None, None) and the
    # later gmm2.bic(X) call would raise AttributeError. Fail loudly instead.
    if len(rows) < 8:
        raise SystemExit(
            f'Only {len(rows)} Big-4 CPAs passed the n>={MIN_SIGS} filter; '
            f'need at least 8 for the 2D GMM. Aborting.')

    cos = np.array([r[2] for r in rows])
    dh = np.array([r[3] for r in rows])
    X = np.column_stack([cos, dh])

    # Three-method framework on each margin
    out = {'sample_sizes': by_firm, 'n_total_cpas': int(len(rows))}
    for desc, arr, bin_width, direction in [
        ('cos_mean', cos, 0.002, 'neg_to_pos'),
        ('dh_mean', dh, 0.2, 'pos_to_neg'),
    ]:
        kde_r = kde_dip(arr)
        bd_r = bd_mccrary(arr, bin_width, direction)
        is_cos = (desc == 'cos_mean')
        # dh lives in [0, 64]; normalize into (0, 1) for the logit GMM and
        # map the crossing back afterwards.
        x_norm = arr if is_cos else arr / 64.0
        loggmm2 = fit_logit_gmm(x_norm, K=2)
        if not is_cos and loggmm2.get('crossing_original') is not None:
            loggmm2['crossing_original'] = loggmm2['crossing_original'] * 64.0
        out[desc] = {
            'kde_dip': kde_r,
            'bd_mccrary': bd_r,
            'logit_gmm_2': loggmm2,
        }
        print(f'\n[{desc}]')
        print(f'  KDE+dip: dip p={kde_r["dip_pvalue"]:.4f}, '
              f'n_modes={kde_r["n_modes"]}, antimode={kde_r["antimode"]}')
        print(f'  BD/McCrary: {bd_r["n_transitions"]} transitions, '
              f'threshold={bd_r["threshold"]}')
        print(f'  LogGMM-2 crossing: {loggmm2.get("crossing_original")}')

    # 2D GMM K=2 marginal crossings + bootstrap
    print('\n[2D GMM K=2]')
    cross_cos, gmm2 = gmm_2d_marginal_crossing(X, dim=0, K=2)
    cross_dh, _ = gmm_2d_marginal_crossing(X, dim=1, K=2)
    print(f'  cos crossing = {cross_cos}')
    print(f'  dh crossing = {cross_dh}')
    print(f'  K=2 BIC = {gmm2.bic(X):.2f}, AIC = {gmm2.aic(X):.2f}')
    print(f'  Component means: {gmm2.means_.tolist()}')
    print(f'  Component weights: {gmm2.weights_.tolist()}')

    print('\n[2D GMM K=3 (for completeness)]')
    g3 = gmm_3comp_summary(X)
    # FIX: g3 is None when n < 12; the original subscripted it unguarded.
    if g3 is not None:
        print(f'  Components (sorted by cos): {g3["means"]}')
        print(f'  Weights: {g3["weights"]}')
        print(f'  K=3 BIC = {g3["bic"]:.2f}, AIC = {g3["aic"]:.2f}')
    else:
        print('  skipped (n < 12)')

    print('\n[Bootstrap 95% CI on 2D GMM crossings]')
    boot_cos = bootstrap_2d_gmm_crossing(X, dim=0)
    boot_dh = bootstrap_2d_gmm_crossing(X, dim=1)
    if boot_cos:
        print(f'  cos: median={boot_cos["median"]:.4f}, '
              f'95% CI=[{boot_cos["ci95"][0]:.4f}, {boot_cos["ci95"][1]:.4f}], '
              f'half-width={boot_cos["ci_halfwidth"]:.4f} '
              f'({boot_cos["n_successful_boot"]}/{N_BOOTSTRAP} resamples)')
    if boot_dh:
        print(f'  dh: median={boot_dh["median"]:.4f}, '
              f'95% CI=[{boot_dh["ci95"][0]:.4f}, {boot_dh["ci95"][1]:.4f}], '
              f'half-width={boot_dh["ci_halfwidth"]:.4f} '
              f'({boot_dh["n_successful_boot"]}/{N_BOOTSTRAP} resamples)')

    out['gmm_2d_2comp'] = {
        'cos_crossing': cross_cos,
        'dh_crossing': cross_dh,
        'bic': float(gmm2.bic(X)),
        'aic': float(gmm2.aic(X)),
        'means': gmm2.means_.tolist(),
        'weights': gmm2.weights_.tolist(),
        'bootstrap_cos': boot_cos,
        'bootstrap_dh': boot_dh,
    }
    out['gmm_2d_3comp'] = g3
    out['paper_a_baseline'] = {'cos': PAPER_A_COS, 'dh': PAPER_A_DH}

    # Verdict
    verdict_class, verdict_msg = classify_stability(
        boot_cos, boot_dh, cross_cos, cross_dh)
    out['verdict'] = {'class': verdict_class, 'explanation': verdict_msg}
    print(f'\nVerdict: {verdict_class} -- {verdict_msg}')

    # Plots: histogram + crossings overlay, one panel per measure
    for desc, arr, bin_width, point in [
        ('cos_mean', cos, 0.002, cross_cos),
        ('dh_mean', dh, 0.2, cross_dh),
    ]:
        boot = boot_cos if desc == 'cos_mean' else boot_dh
        baseline = PAPER_A_COS if desc == 'cos_mean' else PAPER_A_DH
        fig, ax = plt.subplots(figsize=(10, 5))
        lo = float(np.floor(arr.min() / bin_width) * bin_width)
        hi = float(np.ceil(arr.max() / bin_width) * bin_width)
        bins = np.arange(lo, hi + bin_width, bin_width)
        ax.hist(arr, bins=bins, density=True, alpha=0.55,
                color='steelblue', edgecolor='white')
        kde = stats.gaussian_kde(arr, bw_method='silverman')
        xs = np.linspace(arr.min(), arr.max(), 500)
        ax.plot(xs, kde(xs), 'g-', lw=1.5, label='KDE (Silverman)')
        if point is not None:
            ax.axvline(point, color='orange', lw=2, ls='--',
                       label=f'2D-GMM K=2 crossing = {point:.4f}')
        ax.axvline(baseline, color='black', lw=2, ls=':',
                   label=f'Paper A baseline = {baseline}')
        if boot is not None:
            ax.axvspan(boot['ci95'][0], boot['ci95'][1], color='orange',
                       alpha=0.15,
                       label=f"95% bootstrap CI = "
                             f"[{boot['ci95'][0]:.4f}, {boot['ci95'][1]:.4f}]")
        ax.set_xlabel(desc)
        ax.set_ylabel('Density')
        ax.set_title(f'Big-4-only pooled accountant {desc} '
                     f'(n={len(arr)} CPAs)')
        ax.legend(fontsize=9)
        fig.tight_layout()
        png = OUT / f'panel_big4_only_{desc}.png'
        fig.savefig(png, dpi=150)
        plt.close(fig)
        print(f'  plot: {png}')

    out['generated_at'] = datetime.now().isoformat()
    (OUT / 'big4_only_pooled_results.json').write_text(
        json.dumps(out, indent=2, ensure_ascii=False), encoding='utf-8')
    print(f'\nJSON: {OUT / "big4_only_pooled_results.json"}')

    # Markdown report
    md = [
        '# Big-4-Only Pooled Calibration (Script 34)',
        f'Generated: {datetime.now().strftime("%Y-%m-%d %H:%M:%S")}',
        '',
        '## Sample',
        '',
        '- Population: Firm A + KPMG + PwC + EY (no mid/small firms)',
        f'- N CPAs (n_sigs >= {MIN_SIGS}): **{len(rows)}**',
        '',
        '| Firm | N CPAs |',
        '|---|---|',
    ]
    for firm, n in sorted(by_firm.items(), key=lambda x: -x[1]):
        md.append(f'| {firm} | {n} |')
    md += ['',
           '## Comparison table',
           '',
           '| Source | cos crossing | dh crossing |',
           '|---|---|---|',
           f'| Paper A published (full 3-comp) | {PAPER_A_COS} | {PAPER_A_DH} |',
           '| Firm A alone (Script 32) | ~0.977 | ~4.6 |',
           '| Non-Firm-A alone (Script 32) | ~0.938 | ~7.5 |',
           f'| **Big-4 only pooled (this script, K=2)** | '
           f'**{cross_cos}** | **{cross_dh}** |']
    # FIX: the bootstrap row needs BOTH results; the original only checked
    # boot_cos and crashed on boot_dh["ci95"] when the dh bootstrap failed.
    if boot_cos and boot_dh:
        md.append(f'| + bootstrap 95% CI (n={N_BOOTSTRAP}) | '
                  f'[{boot_cos["ci95"][0]:.4f}, {boot_cos["ci95"][1]:.4f}] | '
                  f'[{boot_dh["ci95"][0]:.4f}, {boot_dh["ci95"][1]:.4f}] |')
    md += ['',
           '## Three-method margin checks (Big-4-only)',
           '',
           '| Measure | dip p (KDE) | KDE antimode | BD/McCrary threshold | '
           'LogGMM-2 crossing |',
           '|---|---|---|---|---|',
           f'| cos_mean | {out["cos_mean"]["kde_dip"]["dip_pvalue"]:.4f} | '
           f'{out["cos_mean"]["kde_dip"]["antimode"]} | '
           f'{out["cos_mean"]["bd_mccrary"]["threshold"]} | '
           f'{out["cos_mean"]["logit_gmm_2"]["crossing_original"]} |',
           f'| dh_mean | {out["dh_mean"]["kde_dip"]["dip_pvalue"]:.4f} | '
           f'{out["dh_mean"]["kde_dip"]["antimode"]} | '
           f'{out["dh_mean"]["bd_mccrary"]["threshold"]} | '
           f'{out["dh_mean"]["logit_gmm_2"]["crossing_original"]} |',
           '',
           '## 2D GMM K=2 components',
           '',
           '| Component | mean cos | mean dh | weight |',
           '|---|---|---|---|']
    for i, (m, w) in enumerate(zip(gmm2.means_, gmm2.weights_)):
        md.append(f'| C{i+1} | {m[0]:.4f} | {m[1]:.4f} | {w:.3f} |')
    md.append('')
    md.append(f'BIC(K=2 2D)={gmm2.bic(X):.2f}, AIC={gmm2.aic(X):.2f}')
    # FIX: K=3 sections only when the 3-component fit was possible.
    if g3 is not None:
        md.append(f'BIC(K=3 2D)={g3["bic"]:.2f}, AIC={g3["aic"]:.2f}')
        md += ['',
               '## 2D GMM K=3 components',
               '',
               '| Component | mean cos | mean dh | weight |',
               '|---|---|---|---|']
        for i, (m, w) in enumerate(zip(g3['means'], g3['weights'])):
            md.append(f'| C{i+1} | {m[0]:.4f} | {m[1]:.4f} | {w:.3f} |')
    md += ['',
           '## Verdict',
           '',
           f'**{verdict_class}** -- {verdict_msg}',
           '',
           '### Verdict legend',
           '- **TIGHTER**: bootstrap CI half-width <= 0.005 (cos) AND <= 0.5 '
           '(dh) AND point estimate within 0.01 (cos) / 1.0 (dh) of Paper A '
           'baseline (0.945, 8.10). Big-4-only restriction strictly improves '
           'stability without shifting the threshold materially.',
           '- **COMPARABLE**: CI half-width <= 0.01 (cos) / <= 1.0 (dh). '
           'Big-4-only is within published precision.',
           '- **WIDER**: bootstrap unstable -- mid/small-firm tail was '
           'apparently informative, not just noise.',
           '']
    (OUT / 'big4_only_pooled_report.md').write_text('\n'.join(md),
                                                    encoding='utf-8')
    print(f'Report: {OUT / "big4_only_pooled_report.md"}')


if __name__ == '__main__':
    main()