Files
pdf_signature_extraction/signature_analysis/32_non_firm_a_calibration.py
gbanyan e1d81e3732 Add script 32: non-Firm-A calibration spike (verdict C with twist)
Spike for the from-outside-of-firmA branch. Runs the three-method
threshold framework (KDE+dip, BD/McCrary, Beta mixture / logit-GMM)
plus the 2D-GMM marginal crossing on three subsets:

  Subset I  big4_non_A   KPMG+PwC+EY pooled (266 CPAs, 89.9k sigs)
  Subset II all_non_A    every firm except Firm A (515 CPAs, 108k sigs)
  Subset III firm_A      reference baseline (171 CPAs, 60.4k sigs)

Plus a pre_2018 / post_2020 time-stratified secondary analysis on
subsets I and II.

Result: verdict C -- every subset is unimodal at the dip-test level
(dip p > 0.76 across the board), including Firm A itself.  Time
stratification does not recover bimodality.

Cross-subset Beta-2 cosine crossings: Firm A 0.977, big4_non_A 0.930,
all_non_A 0.938; Paper A's published 0.945 sits between the two mass
centers, indicating the published "natural threshold" is effectively
a between-firm separator rather than a within-pool mechanism boundary.
This finding motivates a follow-up reverse-anchor spike (script 33).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-12 12:05:18 +08:00

779 lines
30 KiB
Python

#!/usr/bin/env python3
"""
Script 32: Non-Firm-A Calibration Spike
========================================
Research question (branch ``from-outside-of-firmA``):
If we throw away Firm A entirely, can we still derive meaningful
cosine / dHash thresholds at the accountant level?
Three subset analyses (per user's "1. 我們可以分開做" clarification):
Subset I — Big-4 minus Firm A: KPMG + PwC + EY pooled
Subset II — All non-Firm-A firms: every firm except 勤業眾信聯合
Subset III (baseline reference) — Firm A only
Each subset is run through Script 20's three-method framework
(KDE+dip, BD/McCrary, 2-component Beta mixture + logit-GMM) plus the
2D-GMM 2-comp marginal crossing from Script 18, on the
per-accountant means of:
* cos_mean = AVG(s.max_similarity_to_same_accountant)
* dh_mean = AVG(s.min_dhash_independent)
Time-stratified contingency analysis:
If Subset I/II fail to expose bimodality, we re-load each
accountant's signatures stratified into pre-2018 vs post-2020
sub-buckets (>=5 sigs per bucket required) and re-run the
three-method framework on the resulting bucket-level means.
This tests whether the time axis can substitute for the
firm-anchor axis.
Verdict (A/B/C):
A Bimodal structure emerges in Subset I or II without time
stratification, with crossings within +-0.02 (cos) / +-2.0 (dh)
of Paper A baselines (0.945, 8.10) and dip-test multimodal at
alpha=0.05. -> "outside-Firm-A calibration is viable"
B Bimodal structure only emerges after time stratification.
-> "time axis substitutes for firm anchor; v3.21 robustness or
Paper C with time-stratified design"
C No bimodality in either; crossings are unstable / outside
plausible range. -> "Firm A is required as anchor; this
strengthens Paper A's framing"
Output:
reports/non_firm_a_calibration/
non_firm_a_calibration_results.json
non_firm_a_calibration_report.md
panel_<subset>_<measure>.png
"""
import sqlite3
import json
import numpy as np
import matplotlib
matplotlib.use('Agg')
import matplotlib.pyplot as plt
from pathlib import Path
from datetime import datetime
from scipy import stats
from scipy.signal import find_peaks
from scipy.optimize import brentq
from sklearn.mixture import GaussianMixture
import diptest
DB = '/Volumes/NV2/PDF-Processing/signature-analysis/signature_analysis.db'
OUT = Path('/Volumes/NV2/PDF-Processing/signature-analysis/reports/'
           'non_firm_a_calibration')
OUT.mkdir(parents=True, exist_ok=True)
EPS = 1e-6
Z_CRIT = 1.96
MIN_SIGS = 10
MIN_SIGS_PER_BUCKET = 5
FIRM_A = '勤業眾信聯合' # Deloitte
BIG4_NON_A = ('安侯建業聯合', '資誠聯合', '安永聯合') # KPMG, PwC, EY
PAPER_A_COS_BASELINE = 0.945
PAPER_A_DH_BASELINE = 8.10
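# Scale notes: cos_mean lives on [0, 1] (cosine similarity); dh_mean is a
# Hamming distance on a 64-bit dHash, so it lives on [0, 64] and is mapped
# to [0, 1] via /64 before Beta-mixture fitting (see method_beta_mixture).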
# ---------- Loaders ----------
def _accountant_means_query(firm_filter_sql, params, time_filter_sql='',
                            min_sigs=MIN_SIGS):
    sql = f'''
        SELECT s.assigned_accountant,
               AVG(s.max_similarity_to_same_accountant) AS cos_mean,
               AVG(CAST(s.min_dhash_independent AS REAL)) AS dh_mean,
               COUNT(*) AS n
        FROM signatures s
        JOIN accountants a ON s.assigned_accountant = a.name
        WHERE s.assigned_accountant IS NOT NULL
          AND s.max_similarity_to_same_accountant IS NOT NULL
          AND s.min_dhash_independent IS NOT NULL
          {firm_filter_sql}
          {time_filter_sql}
        GROUP BY s.assigned_accountant
        HAVING n >= ?
    '''
    return sql, params + [min_sigs]
def load_subset(label):
    """Return (cos, dh, n_accountants, n_signatures)."""
    conn = sqlite3.connect(DB)
    cur = conn.cursor()
    if label == 'big4_non_A':
        firm_filter = 'AND a.firm IN (?, ?, ?)'
        params = list(BIG4_NON_A)
    elif label == 'all_non_A':
        firm_filter = 'AND a.firm IS NOT NULL AND a.firm != ?'
        params = [FIRM_A]
    elif label == 'firm_A':
        firm_filter = 'AND a.firm = ?'
        params = [FIRM_A]
    else:
        raise ValueError(label)
    sql, p = _accountant_means_query(firm_filter, params)
    cur.execute(sql, p)
    rows = cur.fetchall()
    conn.close()
    cos = np.array([r[1] for r in rows])
    dh = np.array([r[2] for r in rows])
    n_sigs = int(sum(r[3] for r in rows))
    return cos, dh, len(rows), n_sigs
def load_subset_time_stratified(label, period):
    """Per-accountant means computed only from `period` signatures.

    period: 'pre_2018' (year_month < 2018-01) or 'post_2020' (>= 2020-01).
    """
    conn = sqlite3.connect(DB)
    cur = conn.cursor()
    if period == 'pre_2018':
        time_filter = "AND s.year_month < '2018-01'"
    elif period == 'post_2020':
        time_filter = "AND s.year_month >= '2020-01'"
    else:
        raise ValueError(period)
    if label == 'big4_non_A':
        firm_filter = 'AND a.firm IN (?, ?, ?)'
        params = list(BIG4_NON_A)
    elif label == 'all_non_A':
        firm_filter = 'AND a.firm IS NOT NULL AND a.firm != ?'
        params = [FIRM_A]
    else:
        raise ValueError(label)
    sql, p = _accountant_means_query(firm_filter, params, time_filter,
                                     min_sigs=MIN_SIGS_PER_BUCKET)
    cur.execute(sql, p)
    rows = cur.fetchall()
    conn.close()
    cos = np.array([r[1] for r in rows])
    dh = np.array([r[2] for r in rows])
    return cos, dh, len(rows), int(sum(r[3] for r in rows))
# ---------- Methods (lifted from Script 20) ----------
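# Method 1: Hartigan's dip test for unimodality (bootstrapped p-value) plus
# a Silverman-bandwidth KDE.  Candidate thresholds are the antimodes, i.e.
# the density minima between consecutive KDE peaks; bandwidth sensitivity is
# reported as the mode count at 0.5x-2x the Silverman factor.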
def method_kde_antimode(values):
    arr = np.asarray(values, dtype=float)
    arr = arr[np.isfinite(arr)]
    if len(arr) < 8:
        return {'n': int(len(arr)), 'note': 'too few points'}
    dip, pval = diptest.diptest(arr, boot_pval=True, n_boot=2000)
    kde = stats.gaussian_kde(arr, bw_method='silverman')
    xs = np.linspace(arr.min(), arr.max(), 2000)
    density = kde(xs)
    peaks, _ = find_peaks(density, prominence=density.max() * 0.02)
    antimodes = []
    for i in range(len(peaks) - 1):
        seg = density[peaks[i]:peaks[i + 1]]
        if not len(seg):
            continue
        local = peaks[i] + int(np.argmin(seg))
        antimodes.append(float(xs[local]))
    sens = {}
    for bwf in [0.5, 0.75, 1.0, 1.5, 2.0]:
        kde_s = stats.gaussian_kde(arr, bw_method=kde.factor * bwf)
        d_s = kde_s(xs)
        p_s, _ = find_peaks(d_s, prominence=d_s.max() * 0.02)
        sens[f'bw_x{bwf}'] = int(len(p_s))
    return {
        'n': int(len(arr)),
        'dip': float(dip),
        'dip_pvalue': float(pval),
        'unimodal_alpha05': bool(pval > 0.05),
        'kde_bandwidth_silverman': float(kde.factor),
        'n_modes': int(len(peaks)),
        'mode_locations': [float(xs[p]) for p in peaks],
        'antimodes': antimodes,
        'primary_antimode': (antimodes[0] if antimodes else None),
        'bandwidth_sensitivity_n_modes': sens,
    }
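# Method 2: a binned discontinuity scan in the spirit of BD/McCrary density
# tests.  For each interior bin i the observed count O_i is compared with the
# neighbour average E_i = (O_{i-1} + O_{i+1}) / 2, standardised as
#   z_i = (O_i - E_i) / sqrt(N p_i (1 - p_i)
#                            + N (p_lo + p_hi)(1 - p_lo - p_hi) / 4)
# using multinomial bin variances and ignoring the (negative) covariance
# between adjacent bins.  A threshold is flagged where z flips sign across
# +-Z_CRIT in the requested direction.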
def method_bd_mccrary(values, bin_width, direction):
    arr = np.asarray(values, dtype=float)
    arr = arr[np.isfinite(arr)]
    if len(arr) < 8:
        return {'n': int(len(arr)), 'note': 'too few points'}
    lo = float(np.floor(arr.min() / bin_width) * bin_width)
    hi = float(np.ceil(arr.max() / bin_width) * bin_width)
    edges = np.arange(lo, hi + bin_width, bin_width)
    counts, _ = np.histogram(arr, bins=edges)
    centers = (edges[:-1] + edges[1:]) / 2.0
    N = counts.sum()
    p = counts / N if N else counts.astype(float)
    n_bins = len(counts)
    z = np.full(n_bins, np.nan)
    for i in range(1, n_bins - 1):
        p_lo = p[i - 1]
        p_hi = p[i + 1]
        exp_i = 0.5 * (counts[i - 1] + counts[i + 1])
        var_i = (N * p[i] * (1 - p[i])
                 + 0.25 * N * (p_lo + p_hi) * (1 - p_lo - p_hi))
        if var_i <= 0:
            continue
        z[i] = (counts[i] - exp_i) / np.sqrt(var_i)
    transitions = []
    for i in range(1, len(z)):
        if np.isnan(z[i - 1]) or np.isnan(z[i]):
            continue
        ok = ((direction == 'neg_to_pos' and z[i - 1] < -Z_CRIT and z[i] > Z_CRIT)
              or (direction == 'pos_to_neg' and z[i - 1] > Z_CRIT and z[i] < -Z_CRIT))
        if ok:
            transitions.append({
                'threshold_between': float((centers[i - 1] + centers[i]) / 2.0),
                'z_before': float(z[i - 1]),
                'z_after': float(z[i]),
            })
    best = (max(transitions,
                key=lambda t: abs(t['z_before']) + abs(t['z_after']))
            if transitions else None)
    return {
        'n': int(len(arr)),
        'bin_width': float(bin_width),
        'direction': direction,
        'n_transitions': len(transitions),
        'transitions': transitions,
        'threshold': (best['threshold_between'] if best else None),
    }
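# Method 3: K-component Beta mixture fitted by EM with method-of-moments
# M-steps.  Each component's (alpha, beta) is recovered from its weighted
# mean mu and variance v via the standard reparameterisation
#   nu = mu (1 - mu) / v - 1,  alpha = mu * nu,  beta = (1 - mu) * nu,
# with v clipped into (0, mu(1-mu)) so nu stays positive.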
def fit_beta_mixture_em(x, K=2, max_iter=300, tol=1e-6, seed=42):
    x = np.clip(np.asarray(x, dtype=float), EPS, 1 - EPS)
    n = len(x)
    # Initialise responsibilities from quantile-based hard labels.
    q = np.linspace(0, 1, K + 1)
    thresh = np.quantile(x, q[1:-1])
    labels = np.digitize(x, thresh)
    resp = np.zeros((n, K))
    resp[np.arange(n), labels] = 1.0
    ll_hist = []
    for it in range(max_iter):
        nk = resp.sum(axis=0) + 1e-12
        weights = nk / nk.sum()
        mus = (resp * x[:, None]).sum(axis=0) / nk
        var_num = (resp * (x[:, None] - mus) ** 2).sum(axis=0)
        vars_ = var_num / nk
        upper = mus * (1 - mus) - 1e-9
        vars_ = np.minimum(vars_, upper)
        vars_ = np.maximum(vars_, 1e-9)
        factor = np.maximum(mus * (1 - mus) / vars_ - 1, 1e-6)
        alphas = mus * factor
        betas = (1 - mus) * factor
        log_pdfs = np.column_stack([
            stats.beta.logpdf(x, alphas[k], betas[k]) + np.log(weights[k])
            for k in range(K)
        ])
        m = log_pdfs.max(axis=1, keepdims=True)
        ll = (m.ravel() + np.log(np.exp(log_pdfs - m).sum(axis=1))).sum()
        ll_hist.append(float(ll))
        new_resp = np.exp(log_pdfs - m)
        new_resp = new_resp / new_resp.sum(axis=1, keepdims=True)
        if it > 0 and abs(ll_hist[-1] - ll_hist[-2]) < tol:
            resp = new_resp
            break
        resp = new_resp
    order = np.argsort(mus)
    alphas = alphas[order]
    betas = betas[order]
    weights = weights[order]
    mus = mus[order]
    k_params = 3 * K - 1
    ll_final = ll_hist[-1]
    return {
        'K': K,
        'alphas': [float(a) for a in alphas],
        'betas': [float(b) for b in betas],
        'weights': [float(w) for w in weights],
        'mus': [float(m) for m in mus],
        'log_likelihood': ll_final,
        'aic': float(2 * k_params - 2 * ll_final),
        'bic': float(k_params * np.log(n) - 2 * ll_final),
        'n_iter': it + 1,
    }
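# The decision threshold for a 2-component mixture is the density crossing,
# i.e. a root of w2*f2(x) - w1*f1(x); when several sign changes exist, the
# root closest to the midpoint of the component means is kept.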
def beta_crossing(fit):
    if fit['K'] != 2:
        return None
    a1, b1, w1 = fit['alphas'][0], fit['betas'][0], fit['weights'][0]
    a2, b2, w2 = fit['alphas'][1], fit['betas'][1], fit['weights'][1]

    def diff(x):
        return (w2 * stats.beta.pdf(x, a2, b2)
                - w1 * stats.beta.pdf(x, a1, b1))

    xs = np.linspace(EPS, 1 - EPS, 2000)
    ys = diff(xs)
    changes = np.where(np.diff(np.sign(ys)) != 0)[0]
    if not len(changes):
        return None
    mid = 0.5 * (fit['mus'][0] + fit['mus'][1])
    crossings = []
    for i in changes:
        try:
            crossings.append(brentq(diff, xs[i], xs[i + 1]))
        except ValueError:
            continue
    if not crossings:
        return None
    return float(min(crossings, key=lambda c: abs(c - mid)))
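# Robustness check for Method 3: map x in (0, 1) to the logit scale
# z = log(x / (1 - x)), fit a 1-D Gaussian mixture there, and map the
# component crossing back through the sigmoid 1 / (1 + exp(-z)).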
def fit_logit_gmm(x, K=2, seed=42):
    x = np.clip(np.asarray(x, dtype=float), EPS, 1 - EPS)
    z = np.log(x / (1 - x)).reshape(-1, 1)
    gmm = GaussianMixture(n_components=K, random_state=seed,
                          max_iter=500).fit(z)
    order = np.argsort(gmm.means_.ravel())
    means = gmm.means_.ravel()[order]
    stds = np.sqrt(gmm.covariances_.ravel())[order]
    weights = gmm.weights_[order]
    crossing = None
    if K == 2:
        m1, s1, w1 = means[0], stds[0], weights[0]
        m2, s2, w2 = means[1], stds[1], weights[1]

        def diff(z0):
            return (w2 * stats.norm.pdf(z0, m2, s2)
                    - w1 * stats.norm.pdf(z0, m1, s1))

        zs = np.linspace(min(m1, m2) - 2, max(m1, m2) + 2, 2000)
        ys = diff(zs)
        ch = np.where(np.diff(np.sign(ys)) != 0)[0]
        if len(ch):
            try:
                z_cross = brentq(diff, zs[ch[0]], zs[ch[0] + 1])
                crossing = float(1 / (1 + np.exp(-z_cross)))
            except ValueError:
                pass
    return {
        'K': K,
        'means_logit': [float(m) for m in means],
        'stds_logit': [float(s) for s in stds],
        'weights': [float(w) for w in weights],
        'means_original_scale': [float(1 / (1 + np.exp(-m))) for m in means],
        'aic': float(gmm.aic(z)),
        'bic': float(gmm.bic(z)),
        'crossing_original': crossing,
    }
def method_beta_mixture(values, is_cosine=True):
    arr = np.asarray(values, dtype=float)
    arr = arr[np.isfinite(arr)]
    if len(arr) < 8:
        return {'n': int(len(arr)), 'note': 'too few points'}
    x = arr if is_cosine else arr / 64.0
    beta2 = fit_beta_mixture_em(x, K=2)
    beta3 = fit_beta_mixture_em(x, K=3)
    cross_beta2 = beta_crossing(beta2)
    if not is_cosine and cross_beta2 is not None:
        cross_beta2 = cross_beta2 * 64.0
    gmm2 = fit_logit_gmm(x, K=2)
    gmm3 = fit_logit_gmm(x, K=3)
    if not is_cosine and gmm2.get('crossing_original') is not None:
        gmm2['crossing_original'] = gmm2['crossing_original'] * 64.0
    return {
        'n': int(len(x)),
        'scale_transform': ('identity' if is_cosine else 'dhash/64'),
        'beta_2': beta2,
        'beta_3': beta3,
        'bic_preferred_K': (2 if beta2['bic'] < beta3['bic'] else 3),
        'beta_2_crossing_original': cross_beta2,
        'logit_gmm_2': gmm2,
        'logit_gmm_3': gmm3,
    }
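# Script 18 style 2-D GMM: fit a 2-component full-covariance Gaussian mixture
# on (cos_mean, dh_mean) jointly, then intersect the two components' 1-D
# marginals (the requested coordinate of each mean with the matching diagonal
# covariance entry) to get a threshold on that axis.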
def gmm_2d_marginal_crossing(cos, dh, dim):
    """2-comp 2D GMM, then marginal crossing on the requested dim."""
    X = np.column_stack([cos, dh])
    if len(X) < 8:
        return None
    gmm = GaussianMixture(n_components=2, covariance_type='full',
                          random_state=42, n_init=15, max_iter=500).fit(X)
    means = gmm.means_
    covs = gmm.covariances_
    weights = gmm.weights_
    m1, m2 = means[0][dim], means[1][dim]
    s1 = np.sqrt(covs[0][dim, dim])
    s2 = np.sqrt(covs[1][dim, dim])
    w1, w2 = weights[0], weights[1]

    def diff(x):
        return (w2 * stats.norm.pdf(x, m2, s2)
                - w1 * stats.norm.pdf(x, m1, s1))

    xs = np.linspace(float(X[:, dim].min()), float(X[:, dim].max()), 2000)
    ys = diff(xs)
    ch = np.where(np.diff(np.sign(ys)) != 0)[0]
    if not len(ch):
        return None
    mid = 0.5 * (m1 + m2)
    crossings = []
    for i in ch:
        try:
            crossings.append(brentq(diff, xs[i], xs[i + 1]))
        except ValueError:
            continue
    if not crossings:
        return None
    return float(min(crossings, key=lambda c: abs(c - mid)))
def gmm_2d_3comp_summary(cos, dh):
    """K=3 2D GMM for completeness; report component means + weights."""
    X = np.column_stack([cos, dh])
    if len(X) < 12:
        return None
    gmm = GaussianMixture(n_components=3, covariance_type='full',
                          random_state=42, n_init=15, max_iter=500).fit(X)
    order = np.argsort(gmm.means_[:, 0])  # sort by cosine ascending
    return {
        'means': [[float(m[0]), float(m[1])] for m in gmm.means_[order]],
        'weights': [float(w) for w in gmm.weights_[order]],
        'bic': float(gmm.bic(X)),
        'aic': float(gmm.aic(X)),
    }
# ---------- Driver ----------
def run_three_method(cos, dh, label):
    results = {}
    for desc, arr, bin_width, direction, is_cos in [
        ('cos_mean', cos, 0.002, 'neg_to_pos', True),
        ('dh_mean', dh, 0.2, 'pos_to_neg', False),
    ]:
        m1 = method_kde_antimode(arr)
        m2 = method_bd_mccrary(arr, bin_width, direction)
        m3 = method_beta_mixture(arr, is_cosine=is_cos)
        gmm2_marginal = gmm_2d_marginal_crossing(
            cos, dh, dim=(0 if desc == 'cos_mean' else 1))
        results[desc] = {
            'method_1_kde_antimode': m1,
            'method_2_bd_mccrary': m2,
            'method_3_beta_mixture': m3,
            'gmm_2d_2comp_marginal_crossing': gmm2_marginal,
        }
    results['gmm_2d_3comp'] = gmm_2d_3comp_summary(cos, dh)
    return results
def plot_panel(values, methods, title, out_path, bin_width=None):
    arr = np.asarray(values, dtype=float)
    fig, axes = plt.subplots(2, 1, figsize=(11, 7),
                             gridspec_kw={'height_ratios': [3, 1]})
    ax = axes[0]
    if bin_width is None:
        bins = 40
    else:
        lo = float(np.floor(arr.min() / bin_width) * bin_width)
        hi = float(np.ceil(arr.max() / bin_width) * bin_width)
        bins = np.arange(lo, hi + bin_width, bin_width)
    ax.hist(arr, bins=bins, density=True, alpha=0.55, color='steelblue',
            edgecolor='white')
    kde = stats.gaussian_kde(arr, bw_method='silverman')
    xs = np.linspace(arr.min(), arr.max(), 500)
    ax.plot(xs, kde(xs), 'g-', lw=1.5, label='KDE (Silverman)')
    colors = {'kde': 'green', 'bd': 'red', 'beta': 'purple',
              'gmm2': 'orange', 'baseline': 'black'}
    for key, (val, lbl) in methods.items():
        if val is None:
            continue
        ls = ':' if key == 'baseline' else '--'
        ax.axvline(val, color=colors.get(key, 'gray'), lw=2, ls=ls,
                   label=f'{lbl} = {val:.4f}')
    ax.set_xlabel(title)
    ax.set_ylabel('Density')
    ax.set_title(title)
    ax.legend(fontsize=8)
    ax2 = axes[1]
    ax2.set_title('Thresholds across methods')
    ax2.set_xlim(ax.get_xlim())
    for i, (key, (val, lbl)) in enumerate(methods.items()):
        if val is None:
            continue
        ax2.scatter([val], [i], color=colors.get(key, 'gray'), s=100, zorder=3)
        ax2.annotate(f' {lbl}: {val:.4f}', (val, i), fontsize=8, va='center')
    ax2.set_yticks(range(len(methods)))
    ax2.set_yticklabels(list(methods.keys()))
    ax2.grid(alpha=0.3)
    plt.tight_layout()
    fig.savefig(out_path, dpi=150)
    plt.close(fig)
def emit_panel(subset_label, results, cos, dh):
    """Render the per-measure threshold panels for one subset."""
    for desc, arr, bin_width in [('cos_mean', cos, 0.002),
                                 ('dh_mean', dh, 0.2)]:
        r = results[desc]
        if 'note' in r['method_1_kde_antimode']:
            continue
        baseline = (PAPER_A_COS_BASELINE if desc == 'cos_mean'
                    else PAPER_A_DH_BASELINE)
        methods_for_plot = {
            'kde': (r['method_1_kde_antimode'].get('primary_antimode'),
                    'KDE antimode'),
            'bd': (r['method_2_bd_mccrary'].get('threshold'),
                   'BD/McCrary'),
            'beta': (r['method_3_beta_mixture'].get(
                'beta_2_crossing_original'), 'Beta-2 crossing'),
            'gmm2': (r['gmm_2d_2comp_marginal_crossing'],
                     '2D GMM 2-comp'),
            'baseline': (baseline, 'Paper A baseline'),
        }
        png = OUT / f'panel_{subset_label}_{desc}.png'
        plot_panel(arr, methods_for_plot,
                   f'{subset_label} -- accountant-level {desc}',
                   png, bin_width=bin_width)
        print(f'  plot: {png}')
def classify_verdict(results_by_subset):
    """Return ('A'|'B'|'C', explanation)."""
    def well_separated(res, baseline_cos, baseline_dh):
        cos_cross = res['cos_mean']['method_3_beta_mixture'].get(
            'beta_2_crossing_original')
        dh_cross = res['dh_mean']['method_3_beta_mixture'].get(
            'beta_2_crossing_original')
        cos_dip_p = res['cos_mean']['method_1_kde_antimode'].get('dip_pvalue')
        dh_dip_p = res['dh_mean']['method_1_kde_antimode'].get('dip_pvalue')
        cos_ok = (cos_cross is not None
                  and abs(cos_cross - baseline_cos) <= 0.02
                  and cos_dip_p is not None and cos_dip_p <= 0.05)
        dh_ok = (dh_cross is not None
                 and abs(dh_cross - baseline_dh) <= 2.0
                 and dh_dip_p is not None and dh_dip_p <= 0.05)
        return cos_ok, dh_ok

    for subset in ('big4_non_A', 'all_non_A'):
        res = results_by_subset.get(subset)
        if not res:
            continue
        cos_ok, dh_ok = well_separated(res, PAPER_A_COS_BASELINE,
                                       PAPER_A_DH_BASELINE)
        if cos_ok and dh_ok:
            return 'A', (f"Subset '{subset}' shows bimodal cos+dh with "
                         f"crossings within tolerance of Paper A baselines.")
    # B: time-stratified rescues it?
    for subset_period in ('big4_non_A_pre_2018',
                          'big4_non_A_post_2020',
                          'all_non_A_pre_2018',
                          'all_non_A_post_2020'):
        res = results_by_subset.get(subset_period)
        if not res:
            continue
        cos_ok, dh_ok = well_separated(res, PAPER_A_COS_BASELINE,
                                       PAPER_A_DH_BASELINE)
        if cos_ok and dh_ok:
            return 'B', (f"Time-stratified subset '{subset_period}' recovers "
                         f"separable bimodality.")
    return 'C', ("Neither pooled nor time-stratified non-Firm-A calibration "
                 "produces a baseline-consistent bimodal threshold.")
def render_report(results_by_subset, sample_sizes, verdict):
    md = [
        '# Non-Firm-A Calibration Spike (Script 32)',
        f"Generated: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}",
        '',
        '## Research Question',
        '',
        ('If we exclude Firm A (Deloitte) from calibration, can the '
         'three-method framework still recover a meaningful '
         'cosine / dHash threshold at the accountant level?'),
        '',
        '## Sample Sizes',
        '',
        '| Subset | N accountants (>=10 sigs) | N signatures |',
        '|--------|---------------------------|--------------|',
    ]
    for label, (n_acc, n_sig) in sample_sizes.items():
        md.append(f'| `{label}` | {n_acc} | {n_sig} |')
    md += ['',
           '## Paper A Baselines (for comparison)',
           '',
           f'- Accountant-level 2D GMM 2-comp marginal crossings: '
           f'cos = **{PAPER_A_COS_BASELINE}**, dHash = **{PAPER_A_DH_BASELINE}**',
           '']
    for label, results in results_by_subset.items():
        md += [f'## Subset: `{label}`', '']
        for measure, baseline in [('cos_mean', PAPER_A_COS_BASELINE),
                                  ('dh_mean', PAPER_A_DH_BASELINE)]:
            r = results[measure]
            md += [f'### {measure}', '',
                   '| Method | Threshold | Supporting statistic |',
                   '|--------|-----------|----------------------|']
            kde = r['method_1_kde_antimode']
            if 'note' in kde:
                md.append(f'| Method 1: KDE+dip | n/a | {kde["note"]} |')
            else:
                tag = 'unimodal' if kde['unimodal_alpha05'] else 'multimodal'
                md.append(
                    f'| Method 1: KDE antimode (dip test) | '
                    f'{kde["primary_antimode"]} | '
                    f'dip={kde["dip"]:.4f}, p={kde["dip_pvalue"]:.4f} '
                    f'({tag}); n_modes={kde["n_modes"]} |')
            bd = r['method_2_bd_mccrary']
            md.append(
                f'| Method 2: BD/McCrary | {bd.get("threshold")} | '
                f'{bd.get("n_transitions", 0)} transition(s) |')
            beta = r['method_3_beta_mixture']
            if 'note' in beta:
                md.append(f'| Method 3: Beta mixture | n/a | {beta["note"]} |')
            else:
                md.append(
                    f'| Method 3: 2-comp Beta mixture | '
                    f'{beta["beta_2_crossing_original"]} | '
                    f'Beta-2 BIC={beta["beta_2"]["bic"]:.2f}, '
                    f'Beta-3 BIC={beta["beta_3"]["bic"]:.2f} '
                    f'(K*={beta["bic_preferred_K"]}) |')
                md.append(
                    f'| Method 3\': LogGMM-2 | '
                    f'{beta["logit_gmm_2"].get("crossing_original")} | '
                    f'logit-Gaussian robustness check |')
            md.append(
                f'| 2D GMM 2-comp marginal crossing | '
                f'{r["gmm_2d_2comp_marginal_crossing"]} | '
                f'paired with Paper A baseline = {baseline} |')
            md.append('')
        if results.get('gmm_2d_3comp'):
            g3 = results['gmm_2d_3comp']
            md += ['### 2D GMM K=3 components (for completeness)',
                   '',
                   '| Component | mean cos | mean dh | weight |',
                   '|-----------|----------|---------|--------|']
            for i, (m, w) in enumerate(zip(g3['means'], g3['weights'])):
                md.append(f'| C{i + 1} | {m[0]:.4f} | {m[1]:.4f} | {w:.3f} |')
            md.append('')
            md.append(f'BIC(K=3 2D)={g3["bic"]:.2f}, AIC={g3["aic"]:.2f}')
            md.append('')
    md += ['## Verdict',
           '',
           f'**{verdict[0]}** — {verdict[1]}',
           '',
           '### Verdict legend',
           '- **A**: outside-Firm-A calibration is viable in pooled form '
           '(crossings within +-0.02 cos / +-2.0 dh of Paper A baselines '
           'AND dip-test multimodal at alpha=0.05).',
           '- **B**: time-stratified subset recovers separable bimodality.',
           '- **C**: neither rescue works; Firm A remains required as anchor.',
           '']
    return '\n'.join(md)
def main():
    print('=' * 72)
    print('Script 32: Non-Firm-A Calibration Spike')
    print('=' * 72)
    sample_sizes = {}
    results_by_subset = {}
    arrays_by_subset = {}
    # --- Pooled subsets ---
    for label in ('big4_non_A', 'all_non_A', 'firm_A'):
        cos, dh, n_acc, n_sig = load_subset(label)
        sample_sizes[label] = (n_acc, n_sig)
        arrays_by_subset[label] = (cos, dh)
        print(f'\n[{label}] N accountants={n_acc}, N sigs={n_sig}')
        results_by_subset[label] = run_three_method(cos, dh, label)
        for desc in ('cos_mean', 'dh_mean'):
            r = results_by_subset[label][desc]
            kde = r['method_1_kde_antimode']
            beta = r['method_3_beta_mixture']
            print(f'  {desc}: dip p={kde.get("dip_pvalue")} '
                  f'(n_modes={kde.get("n_modes")}); '
                  f'Beta-2 cross={beta.get("beta_2_crossing_original")}; '
                  f'2D-GMM marginal={r["gmm_2d_2comp_marginal_crossing"]}')
    # --- Time-stratified secondary (run unconditionally; verdict logic decides) ---
    for label in ('big4_non_A', 'all_non_A'):
        for period in ('pre_2018', 'post_2020'):
            cos, dh, n_acc, n_sig = load_subset_time_stratified(label, period)
            key = f'{label}_{period}'
            sample_sizes[key] = (n_acc, n_sig)
            arrays_by_subset[key] = (cos, dh)
            print(f'\n[{key}] N accountants={n_acc}, N sigs={n_sig}')
            if n_acc < 8:
                print('  (skipped: too few accountants for analysis)')
                continue
            results_by_subset[key] = run_three_method(cos, dh, key)
            for desc in ('cos_mean', 'dh_mean'):
                r = results_by_subset[key][desc]
                kde = r['method_1_kde_antimode']
                beta = r['method_3_beta_mixture']
                print(f'  {desc}: dip p={kde.get("dip_pvalue")} '
                      f'(n_modes={kde.get("n_modes")}); '
                      f'Beta-2 cross={beta.get("beta_2_crossing_original")}; '
                      f'2D-GMM marginal={r["gmm_2d_2comp_marginal_crossing"]}')
    # --- Plots ---
    for label, results in results_by_subset.items():
        cos, dh = arrays_by_subset[label]
        emit_panel(label, results, cos, dh)
    # --- Verdict ---
    verdict = classify_verdict(results_by_subset)
    print(f'\nVerdict: {verdict[0]} -- {verdict[1]}')
    # --- Persist ---
    payload = {
        'generated_at': datetime.now().isoformat(),
        'min_sigs_per_accountant': MIN_SIGS,
        'min_sigs_per_bucket_time_stratified': MIN_SIGS_PER_BUCKET,
        'paper_a_baselines': {
            'cos': PAPER_A_COS_BASELINE,
            'dh': PAPER_A_DH_BASELINE,
        },
        'sample_sizes': {k: {'n_accountants': v[0], 'n_signatures': v[1]}
                         for k, v in sample_sizes.items()},
        'results': results_by_subset,
        'verdict': {'class': verdict[0], 'explanation': verdict[1]},
    }
    (OUT / 'non_firm_a_calibration_results.json').write_text(
        json.dumps(payload, indent=2, ensure_ascii=False), encoding='utf-8')
    print(f'\nJSON: {OUT / "non_firm_a_calibration_results.json"}')
    md = render_report(results_by_subset, sample_sizes, verdict)
    (OUT / 'non_firm_a_calibration_report.md').write_text(md, encoding='utf-8')
    print(f'Report: {OUT / "non_firm_a_calibration_report.md"}')
if __name__ == '__main__':
    main()