e1d81e3732
Spike for the from-outside-of-firmA branch. Runs the three-method threshold framework (KDE+dip, BD/McCrary, Beta mixture / logit-GMM, 2D-GMM) on three subsets: Subset I big4_non_A KPMG+PwC+EY pooled (266 CPAs, 89.9k sigs) Subset II all_non_A every firm except Firm A (515 CPAs, 108k sigs) Subset III firm_A reference baseline (171 CPAs, 60.4k sigs) Plus pre_2018 / post_2020 time-stratified secondary on subsets I and II. Result: verdict C -- every subset is unimodal at the dip-test level (dip p > 0.76 across the board), including Firm A itself. Time stratification does not recover bimodality. Cross-subset Beta-2 cosine crossings: Firm A 0.977, big4_non_A 0.930, all_non_A 0.938; Paper A's published 0.945 sits between the two mass centers, indicating the published "natural threshold" is effectively a between-firm separator rather than a within-pool mechanism boundary. This finding motivates a follow-up reverse-anchor spike (script 33). Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
779 lines · 30 KiB · Python
#!/usr/bin/env python3
|
|
"""
|
|
Script 32: Non-Firm-A Calibration Spike
|
|
========================================
|
|
Research question (branch ``from-outside-of-firmA``):
|
|
If we throw away Firm A entirely, can we still derive meaningful
|
|
cosine / dHash thresholds at the accountant level?
|
|
|
|
Three subset analyses (per user's "1. 我們可以分開做" clarification):
|
|
|
|
Subset I — Big-4 minus Firm A: KPMG + PwC + EY pooled
|
|
Subset II — All non-Firm-A firms: every firm except 勤業眾信聯合
|
|
Subset III (baseline reference) — Firm A only
|
|
|
|
Each subset is run through Script 20's three-method framework
|
|
(KDE+dip, BD/McCrary, 2-component Beta mixture + logit-GMM) plus the
|
|
2D-GMM 2-comp marginal crossing from Script 18, on the
|
|
per-accountant means of:
|
|
* cos_mean = AVG(s.max_similarity_to_same_accountant)
|
|
* dh_mean = AVG(s.min_dhash_independent)
|
|
|
|
Time-stratified contingency analysis:
|
|
If Subset I/II fail to expose bimodality, we re-load each
|
|
accountant's signatures stratified into pre-2018 vs post-2020
|
|
sub-buckets (>=5 sigs per bucket required) and re-run the
|
|
three-method framework on the resulting bucket-level means.
|
|
This tests whether the time axis can substitute for the
|
|
firm-anchor axis.
|
|
|
|
Verdict (A/B/C):
|
|
A Bimodal structure emerges in Subset I or II without time
|
|
stratification, with crossings within +-0.02 (cos) / +-2.0 (dh)
|
|
of Paper A baselines (0.945, 8.10) and dip-test multimodal at
|
|
alpha=0.05. -> "outside-Firm-A calibration is viable"
|
|
B Bimodal structure only emerges after time stratification.
|
|
-> "time axis substitutes for firm anchor; v3.21 robustness or
|
|
Paper C with time-stratified design"
|
|
C No bimodality in either; crossings are unstable / outside
|
|
plausible range. -> "Firm A is required as anchor; this
|
|
strengthens Paper A's framing"
|
|
|
|
Output:
|
|
reports/non_firm_a_calibration/
|
|
non_firm_a_calibration_results.json
|
|
non_firm_a_calibration_report.md
|
|
panel_<subset>_<measure>.png
|
|
"""
|
|
|
|
import sqlite3
|
|
import json
|
|
import numpy as np
|
|
import matplotlib
|
|
matplotlib.use('Agg')
|
|
import matplotlib.pyplot as plt
|
|
from pathlib import Path
|
|
from datetime import datetime
|
|
from scipy import stats
|
|
from scipy.signal import find_peaks
|
|
from scipy.optimize import brentq
|
|
from sklearn.mixture import GaussianMixture
|
|
import diptest
|
|
|
|
# ---------- Configuration ----------
# SQLite database holding per-signature similarity metrics.
DB = '/Volumes/NV2/PDF-Processing/signature-analysis/signature_analysis.db'
# All artifacts (JSON results, markdown report, PNG panels) land here.
OUT = Path('/Volumes/NV2/PDF-Processing/signature-analysis/reports/'
           'non_firm_a_calibration')
OUT.mkdir(parents=True, exist_ok=True)

EPS = 1e-6  # clip margin keeping values strictly inside (0, 1) for Beta/logit fits
Z_CRIT = 1.96  # two-sided 5% critical value used by the BD/McCrary z-scan
MIN_SIGS = 10  # minimum signatures per accountant for the pooled subsets
MIN_SIGS_PER_BUCKET = 5  # minimum signatures per time bucket (stratified runs)

FIRM_A = '勤業眾信聯合'  # Deloitte
BIG4_NON_A = ('安侯建業聯合', '資誠聯合', '安永聯合')  # KPMG, PwC, EY

# Published Paper A thresholds the subset crossings are compared against.
PAPER_A_COS_BASELINE = 0.945
PAPER_A_DH_BASELINE = 8.10
|
|
|
|
|
|
# ---------- Loaders ----------
|
|
def _accountant_means_query(firm_filter_sql, params, time_filter_sql=''):
|
|
sql = f'''
|
|
SELECT s.assigned_accountant,
|
|
AVG(s.max_similarity_to_same_accountant) AS cos_mean,
|
|
AVG(CAST(s.min_dhash_independent AS REAL)) AS dh_mean,
|
|
COUNT(*) AS n
|
|
FROM signatures s
|
|
JOIN accountants a ON s.assigned_accountant = a.name
|
|
WHERE s.assigned_accountant IS NOT NULL
|
|
AND s.max_similarity_to_same_accountant IS NOT NULL
|
|
AND s.min_dhash_independent IS NOT NULL
|
|
{firm_filter_sql}
|
|
{time_filter_sql}
|
|
GROUP BY s.assigned_accountant
|
|
HAVING n >= ?
|
|
'''
|
|
return sql, params + [MIN_SIGS]
|
|
|
|
|
|
def load_subset(label):
    """Return (cos, dh, n_accountants, n_signatures) for one pooled subset.

    label: 'big4_non_A' (KPMG+PwC+EY pooled), 'all_non_A' (every firm
    except Firm A) or 'firm_A' (Deloitte reference baseline).
    Raises ValueError for any other label.
    """
    subset_filters = {
        'big4_non_A': ('AND a.firm IN (?, ?, ?)', list(BIG4_NON_A)),
        'all_non_A': ('AND a.firm IS NOT NULL AND a.firm != ?', [FIRM_A]),
        'firm_A': ('AND a.firm = ?', [FIRM_A]),
    }
    if label not in subset_filters:
        raise ValueError(label)
    firm_filter, firm_params = subset_filters[label]
    sql, bound = _accountant_means_query(firm_filter, firm_params)
    conn = sqlite3.connect(DB)
    rows = conn.execute(sql, bound).fetchall()
    conn.close()
    cos = np.array([row[1] for row in rows])
    dh = np.array([row[2] for row in rows])
    total_sigs = int(sum(row[3] for row in rows))
    return cos, dh, len(rows), total_sigs
|
|
|
|
|
|
def load_subset_time_stratified(label, period):
    """Per-accountant means computed only from `period` signatures.

    period: 'pre_2018' (year_month < 2018-01) or 'post_2020' (>= 2020-01).
    Accountant buckets with fewer than MIN_SIGS_PER_BUCKET signatures are
    dropped. Returns (cos, dh, n_accountants, n_signatures).
    """
    period_filters = {
        'pre_2018': "AND s.year_month < '2018-01'",
        'post_2020': "AND s.year_month >= '2020-01'",
    }
    subset_filters = {
        'big4_non_A': ('AND a.firm IN (?, ?, ?)', list(BIG4_NON_A)),
        'all_non_A': ('AND a.firm IS NOT NULL AND a.firm != ?', [FIRM_A]),
    }
    if period not in period_filters:
        raise ValueError(period)
    if label not in subset_filters:
        raise ValueError(label)
    firm_filter, firm_params = subset_filters[label]
    time_filter = period_filters[period]
    # NOTE(review): near-duplicate of _accountant_means_query, kept inline
    # because the HAVING floor here is MIN_SIGS_PER_BUCKET, not MIN_SIGS.
    sql = f'''
        SELECT s.assigned_accountant,
               AVG(s.max_similarity_to_same_accountant) AS cos_mean,
               AVG(CAST(s.min_dhash_independent AS REAL)) AS dh_mean,
               COUNT(*) AS n
        FROM signatures s
        JOIN accountants a ON s.assigned_accountant = a.name
        WHERE s.assigned_accountant IS NOT NULL
          AND s.max_similarity_to_same_accountant IS NOT NULL
          AND s.min_dhash_independent IS NOT NULL
          {firm_filter}
          {time_filter}
        GROUP BY s.assigned_accountant
        HAVING n >= ?
    '''
    conn = sqlite3.connect(DB)
    rows = conn.execute(sql, firm_params + [MIN_SIGS_PER_BUCKET]).fetchall()
    conn.close()
    cos = np.array([row[1] for row in rows])
    dh = np.array([row[2] for row in rows])
    return cos, dh, len(rows), int(sum(row[3] for row in rows))
|
|
|
|
|
|
# ---------- Methods (lifted from Script 20) ----------
|
|
def method_kde_antimode(values):
    """Method 1: Hartigan dip test plus Silverman-KDE mode/antimode scan.

    Returns a dict with the dip statistic and bootstrap p-value (2000
    bootstrap draws), KDE mode locations, antimodes (density minima between
    adjacent modes), and a table of mode counts across bandwidth scalings
    as a sensitivity check. Inputs with fewer than 8 finite points get a
    short-circuit {'n', 'note'} dict.
    """
    data = np.asarray(values, dtype=float)
    data = data[np.isfinite(data)]
    if len(data) < 8:
        return {'n': int(len(data)), 'note': 'too few points'}
    dip_stat, dip_p = diptest.diptest(data, boot_pval=True, n_boot=2000)
    kde = stats.gaussian_kde(data, bw_method='silverman')
    grid = np.linspace(data.min(), data.max(), 2000)
    dens = kde(grid)
    modes, _ = find_peaks(dens, prominence=dens.max() * 0.02)
    # An antimode is the density minimum between each consecutive mode pair.
    valleys = []
    for left, right in zip(modes[:-1], modes[1:]):
        segment = dens[left:right]
        if not len(segment):
            continue
        valleys.append(float(grid[left + int(np.argmin(segment))]))
    # Re-run the peak count at scaled bandwidths to gauge stability.
    sensitivity = {}
    for scale in [0.5, 0.75, 1.0, 1.5, 2.0]:
        dens_s = stats.gaussian_kde(data, bw_method=kde.factor * scale)(grid)
        modes_s, _ = find_peaks(dens_s, prominence=dens_s.max() * 0.02)
        sensitivity[f'bw_x{scale}'] = int(len(modes_s))
    return {
        'n': int(len(data)),
        'dip': float(dip_stat),
        'dip_pvalue': float(dip_p),
        'unimodal_alpha05': bool(dip_p > 0.05),
        'kde_bandwidth_silverman': float(kde.factor),
        'n_modes': int(len(modes)),
        'mode_locations': [float(grid[p]) for p in modes],
        'antimodes': valleys,
        'primary_antimode': (valleys[0] if valleys else None),
        'bandwidth_sensitivity_n_modes': sensitivity,
    }
|
|
|
|
|
|
def method_bd_mccrary(values, bin_width, direction):
    """Method 2: histogram discontinuity scan (BD/McCrary-style).

    Bins the data at `bin_width`, scores every interior bin's count against
    the mean of its two neighbours as a z-statistic, then reports adjacent
    bins whose z flips across +-Z_CRIT in the requested `direction`
    ('neg_to_pos' or 'pos_to_neg') as candidate thresholds. The single
    'threshold' is the transition with the largest combined |z|.
    """
    data = np.asarray(values, dtype=float)
    data = data[np.isfinite(data)]
    if len(data) < 8:
        return {'n': int(len(data)), 'note': 'too few points'}
    low = float(np.floor(data.min() / bin_width) * bin_width)
    high = float(np.ceil(data.max() / bin_width) * bin_width)
    edges = np.arange(low, high + bin_width, bin_width)
    counts, _ = np.histogram(data, bins=edges)
    centers = (edges[:-1] + edges[1:]) / 2.0
    total = counts.sum()
    probs = counts / total if total else counts.astype(float)
    zscores = np.full(len(counts), np.nan)
    for i in range(1, len(counts) - 1):
        p_left = probs[i - 1]
        p_right = probs[i + 1]
        expected = 0.5 * (counts[i - 1] + counts[i + 1])
        # Binomial-style variance of (count_i - neighbour average).
        variance = (total * probs[i] * (1 - probs[i])
                    + 0.25 * total * (p_left + p_right) * (1 - p_left - p_right))
        if variance <= 0:
            continue
        zscores[i] = (counts[i] - expected) / np.sqrt(variance)
    transitions = []
    for i in range(1, len(zscores)):
        z_prev, z_curr = zscores[i - 1], zscores[i]
        if np.isnan(z_prev) or np.isnan(z_curr):
            continue
        if direction == 'neg_to_pos':
            hit = z_prev < -Z_CRIT and z_curr > Z_CRIT
        else:
            hit = (direction == 'pos_to_neg'
                   and z_prev > Z_CRIT and z_curr < -Z_CRIT)
        if hit:
            transitions.append({
                'threshold_between': float((centers[i - 1] + centers[i]) / 2.0),
                'z_before': float(z_prev),
                'z_after': float(z_curr),
            })
    best = None
    if transitions:
        best = max(transitions,
                   key=lambda t: abs(t['z_before']) + abs(t['z_after']))
    return {
        'n': int(len(data)),
        'bin_width': float(bin_width),
        'direction': direction,
        'n_transitions': len(transitions),
        'transitions': transitions,
        'threshold': (best['threshold_between'] if best else None),
    }
|
|
|
|
|
|
def fit_beta_mixture_em(x, K=2, max_iter=300, tol=1e-6, seed=42):
    """Fit a K-component Beta mixture to data in (0, 1) via EM.

    The M-step is moment-matched: component means/variances from the
    responsibilities are converted to Beta (alpha, beta) parameters.
    Components are sorted by mean before returning, so index 0 is always
    the low-mean component. Returns a dict of parameters plus final
    log-likelihood, AIC/BIC, and iteration count.

    NOTE(review): `seed` is currently unused — initialization is a
    deterministic quantile split, so the fit is reproducible regardless.
    """
    # Clip into the open interval so Beta log-pdfs stay finite.
    x = np.clip(np.asarray(x, dtype=float), EPS, 1 - EPS)
    n = len(x)
    # Hard initialization: assign each point to a quantile band.
    q = np.linspace(0, 1, K + 1)
    thresh = np.quantile(x, q[1:-1])
    labels = np.digitize(x, thresh)
    resp = np.zeros((n, K))
    resp[np.arange(n), labels] = 1.0
    ll_hist = []
    for it in range(max_iter):
        # --- M-step: weighted moments -> Beta(alpha, beta) ---
        nk = resp.sum(axis=0) + 1e-12
        weights = nk / nk.sum()
        mus = (resp * x[:, None]).sum(axis=0) / nk
        var_num = (resp * (x[:, None] - mus) ** 2).sum(axis=0)
        vars_ = var_num / nk
        # A Beta variance must be strictly below mu*(1-mu); clamp both ways
        # to keep the moment inversion well-defined.
        upper = mus * (1 - mus) - 1e-9
        vars_ = np.minimum(vars_, upper)
        vars_ = np.maximum(vars_, 1e-9)
        factor = np.maximum(mus * (1 - mus) / vars_ - 1, 1e-6)
        alphas = mus * factor
        betas = (1 - mus) * factor
        # --- E-step: log-responsibilities via log-sum-exp ---
        log_pdfs = np.column_stack([
            stats.beta.logpdf(x, alphas[k], betas[k]) + np.log(weights[k])
            for k in range(K)
        ])
        m = log_pdfs.max(axis=1, keepdims=True)
        ll = (m.ravel() + np.log(np.exp(log_pdfs - m).sum(axis=1))).sum()
        ll_hist.append(float(ll))
        new_resp = np.exp(log_pdfs - m)
        new_resp = new_resp / new_resp.sum(axis=1, keepdims=True)
        # Converged when the log-likelihood improvement drops below tol.
        if it > 0 and abs(ll_hist[-1] - ll_hist[-2]) < tol:
            resp = new_resp
            break
        resp = new_resp
    # Canonical ordering: components sorted by mean, ascending.
    order = np.argsort(mus)
    alphas = alphas[order]
    betas = betas[order]
    weights = weights[order]
    mus = mus[order]
    # Free parameters: 2K shape params + (K-1) independent weights.
    k_params = 3 * K - 1
    ll_final = ll_hist[-1]
    return {
        'K': K,
        'alphas': [float(a) for a in alphas],
        'betas': [float(b) for b in betas],
        'weights': [float(w) for w in weights],
        'mus': [float(m) for m in mus],
        'log_likelihood': ll_final,
        'aic': float(2 * k_params - 2 * ll_final),
        'bic': float(k_params * np.log(n) - 2 * ll_final),
        'n_iter': it + 1,
    }
|
|
|
|
|
|
def beta_crossing(fit):
    """Density crossing between the two components of a K=2 Beta fit.

    Scans the weighted component-density difference over (EPS, 1-EPS) for
    sign changes, refines each with brentq, and returns the root closest
    to the midpoint of the two component means. Returns None when the fit
    is not 2-component or no crossing can be bracketed.
    """
    if fit['K'] != 2:
        return None
    a_lo, a_hi = fit['alphas']
    b_lo, b_hi = fit['betas']
    w_lo, w_hi = fit['weights']

    def delta(t):
        return (w_hi * stats.beta.pdf(t, a_hi, b_hi)
                - w_lo * stats.beta.pdf(t, a_lo, b_lo))

    grid = np.linspace(EPS, 1 - EPS, 2000)
    flips = np.where(np.diff(np.sign(delta(grid))) != 0)[0]
    if not len(flips):
        return None
    center = 0.5 * (fit['mus'][0] + fit['mus'][1])
    roots = []
    for idx in flips:
        try:
            roots.append(brentq(delta, grid[idx], grid[idx + 1]))
        except ValueError:
            continue
    if not roots:
        return None
    return float(min(roots, key=lambda r: abs(r - center)))
|
|
|
|
|
|
def fit_logit_gmm(x, K=2, seed=42):
    """Gaussian mixture in logit space (robustness check on the Beta fit).

    Clips x into (EPS, 1-EPS), applies the logit transform, fits a
    K-component 1-D GMM, and — for K=2 only — reports the between-mean
    density crossing mapped back to the original (0, 1) scale.
    """
    clipped = np.clip(np.asarray(x, dtype=float), EPS, 1 - EPS)
    z = np.log(clipped / (1 - clipped)).reshape(-1, 1)
    gmm = GaussianMixture(n_components=K, random_state=seed,
                          max_iter=500).fit(z)
    rank = np.argsort(gmm.means_.ravel())
    means = gmm.means_.ravel()[rank]
    stds = np.sqrt(gmm.covariances_.ravel())[rank]
    weights = gmm.weights_[rank]
    crossing = None
    if K == 2:
        m_lo, m_hi = means[0], means[1]
        s_lo, s_hi = stds[0], stds[1]
        w_lo, w_hi = weights[0], weights[1]

        def delta(t):
            return (w_hi * stats.norm.pdf(t, m_hi, s_hi)
                    - w_lo * stats.norm.pdf(t, m_lo, s_lo))

        grid = np.linspace(min(m_lo, m_hi) - 2, max(m_lo, m_hi) + 2, 2000)
        flips = np.where(np.diff(np.sign(delta(grid))) != 0)[0]
        if len(flips):
            try:
                root = brentq(delta, grid[flips[0]], grid[flips[0] + 1])
                crossing = float(1 / (1 + np.exp(-root)))
            except ValueError:
                pass
    return {
        'K': K,
        'means_logit': [float(v) for v in means],
        'stds_logit': [float(v) for v in stds],
        'weights': [float(v) for v in weights],
        'means_original_scale': [float(1 / (1 + np.exp(-v))) for v in means],
        'aic': float(gmm.aic(z)),
        'bic': float(gmm.bic(z)),
        'crossing_original': crossing,
    }
|
|
|
|
|
|
def method_beta_mixture(values, is_cosine=True):
    """Method 3: Beta-mixture fits (K=2, K=3) plus logit-GMM checks.

    dHash inputs are mapped onto (0, 1) by dividing by 64 before fitting;
    crossings are scaled back by 64 on the way out. Inputs with fewer than
    8 finite points get a short-circuit {'n', 'note'} dict.
    """
    data = np.asarray(values, dtype=float)
    data = data[np.isfinite(data)]
    if len(data) < 8:
        return {'n': int(len(data)), 'note': 'too few points'}
    scaled = data if is_cosine else data / 64.0
    fit2 = fit_beta_mixture_em(scaled, K=2)
    fit3 = fit_beta_mixture_em(scaled, K=3)
    crossing2 = beta_crossing(fit2)
    if crossing2 is not None and not is_cosine:
        crossing2 = crossing2 * 64.0
    lg2 = fit_logit_gmm(scaled, K=2)
    lg3 = fit_logit_gmm(scaled, K=3)
    if not is_cosine and lg2.get('crossing_original') is not None:
        lg2['crossing_original'] = lg2['crossing_original'] * 64.0
    return {
        'n': int(len(scaled)),
        'scale_transform': ('identity' if is_cosine else 'dhash/64'),
        'beta_2': fit2,
        'beta_3': fit3,
        'bic_preferred_K': (2 if fit2['bic'] < fit3['bic'] else 3),
        'beta_2_crossing_original': crossing2,
        'logit_gmm_2': lg2,
        'logit_gmm_3': lg3,
    }
|
|
|
|
|
|
def gmm_2d_marginal_crossing(cos, dh, dim):
    """2-comp 2D GMM, then marginal crossing on the requested dim.

    Fits a full-covariance two-component GMM on (cos, dh) pairs and finds
    where the two weighted marginal Gaussians along axis `dim` (0=cos,
    1=dh) intersect, preferring the root nearest the midpoint of the two
    marginal means. Returns None for < 8 points or when no crossing exists.
    """
    pts = np.column_stack([cos, dh])
    if len(pts) < 8:
        return None
    gmm = GaussianMixture(n_components=2, covariance_type='full',
                          random_state=42, n_init=15, max_iter=500).fit(pts)
    mu_a = gmm.means_[0][dim]
    mu_b = gmm.means_[1][dim]
    sd_a = np.sqrt(gmm.covariances_[0][dim, dim])
    sd_b = np.sqrt(gmm.covariances_[1][dim, dim])
    w_a, w_b = gmm.weights_[0], gmm.weights_[1]

    def delta(t):
        return (w_b * stats.norm.pdf(t, mu_b, sd_b)
                - w_a * stats.norm.pdf(t, mu_a, sd_a))

    grid = np.linspace(float(pts[:, dim].min()), float(pts[:, dim].max()), 2000)
    flips = np.where(np.diff(np.sign(delta(grid))) != 0)[0]
    if not len(flips):
        return None
    center = 0.5 * (mu_a + mu_b)
    roots = []
    for idx in flips:
        try:
            roots.append(brentq(delta, grid[idx], grid[idx + 1]))
        except ValueError:
            continue
    if not roots:
        return None
    return float(min(roots, key=lambda r: abs(r - center)))
|
|
|
|
|
|
def gmm_2d_3comp_summary(cos, dh):
    """K=3 2D GMM for completeness; report component means + weights.

    Components are sorted by their cosine coordinate, ascending.
    Returns None when fewer than 12 points are available.
    """
    pts = np.column_stack([cos, dh])
    if len(pts) < 12:
        return None
    gmm = GaussianMixture(n_components=3, covariance_type='full',
                          random_state=42, n_init=15, max_iter=500).fit(pts)
    rank = np.argsort(gmm.means_[:, 0])  # sort by cosine ascending
    return {
        'means': [[float(mu[0]), float(mu[1])] for mu in gmm.means_[rank]],
        'weights': [float(w) for w in gmm.weights_[rank]],
        'bic': float(gmm.bic(pts)),
        'aic': float(gmm.aic(pts)),
    }
|
|
|
|
|
|
# ---------- Driver ----------
|
|
def run_three_method(cos, dh, label):
    """Run Methods 1-3 plus the 2D-GMM crossings on both measures.

    `label` is accepted for interface parity with callers but not used
    internally. Returns a dict keyed by 'cos_mean' / 'dh_mean' plus a
    top-level 'gmm_2d_3comp' summary.
    """
    specs = (
        ('cos_mean', cos, 0.002, 'neg_to_pos', True),
        ('dh_mean', dh, 0.2, 'pos_to_neg', False),
    )
    out = {}
    for measure, data, width, direction, cosine_scale in specs:
        out[measure] = {
            'method_1_kde_antimode': method_kde_antimode(data),
            'method_2_bd_mccrary': method_bd_mccrary(data, width, direction),
            'method_3_beta_mixture': method_beta_mixture(
                data, is_cosine=cosine_scale),
            'gmm_2d_2comp_marginal_crossing': gmm_2d_marginal_crossing(
                cos, dh, dim=(0 if measure == 'cos_mean' else 1)),
        }
    out['gmm_2d_3comp'] = gmm_2d_3comp_summary(cos, dh)
    return out
|
|
|
|
|
|
def plot_panel(values, methods, title, out_path, bin_width=None):
    """Render a two-row diagnostic panel for one measure and save it as PNG.

    Top row: density histogram with a Silverman KDE overlay and one
    vertical line per method threshold. Bottom row: the same thresholds as
    a dot-per-method strip for quick cross-method comparison.

    Parameters
    ----------
    values : array-like
        Per-accountant means being plotted.
    methods : dict[str, tuple[float | None, str]]
        key -> (threshold or None, legend label); the key selects the color
        and 'baseline' gets a dotted line style.
    title : str
        Used as both the x-axis label and the panel title.
    out_path : path-like
        PNG destination.
    bin_width : float, optional
        When given, histogram bins are aligned to multiples of this width;
        otherwise 40 equal-width bins are used.
    """
    arr = np.asarray(values, dtype=float)
    fig, axes = plt.subplots(2, 1, figsize=(11, 7),
                             gridspec_kw={'height_ratios': [3, 1]})
    ax = axes[0]
    if bin_width is None:
        bins = 40
    else:
        lo = float(np.floor(arr.min() / bin_width) * bin_width)
        hi = float(np.ceil(arr.max() / bin_width) * bin_width)
        bins = np.arange(lo, hi + bin_width, bin_width)
    ax.hist(arr, bins=bins, density=True, alpha=0.55, color='steelblue',
            edgecolor='white')
    kde = stats.gaussian_kde(arr, bw_method='silverman')
    xs = np.linspace(arr.min(), arr.max(), 500)
    ax.plot(xs, kde(xs), 'g-', lw=1.5, label='KDE (Silverman)')
    colors = {'kde': 'green', 'bd': 'red', 'beta': 'purple',
              'gmm2': 'orange', 'baseline': 'black'}
    for key, (val, lbl) in methods.items():
        if val is None:
            continue  # this method produced no threshold for this measure
        ls = ':' if key == 'baseline' else '--'
        ax.axvline(val, color=colors.get(key, 'gray'), lw=2, ls=ls,
                   label=f'{lbl} = {val:.4f}')
    ax.set_xlabel(title)
    ax.set_ylabel('Density')
    ax.set_title(title)
    ax.legend(fontsize=8)

    ax2 = axes[1]
    ax2.set_title('Thresholds across methods')
    ax2.set_xlim(ax.get_xlim())
    for i, (key, (val, lbl)) in enumerate(methods.items()):
        if val is None:
            continue
        ax2.scatter([val], [i], color=colors.get(key, 'gray'), s=100, zorder=3)
        ax2.annotate(f' {lbl}: {val:.4f}', (val, i), fontsize=8, va='center')
    ax2.set_yticks(range(len(methods)))
    ax2.set_yticklabels(list(methods.keys()))
    ax2.grid(alpha=0.3)
    plt.tight_layout()
    fig.savefig(out_path, dpi=150)
    # Close this specific figure: bare plt.close() closes whichever figure
    # is "current", which can close the wrong one (and leak this one) if
    # the caller ever has other figures open.
    plt.close(fig)
|
|
|
|
|
|
def emit_panel(subset_label, results):
    """Build the threshold-line spec for the first plottable measure.

    NOTE(review): apparently vestigial — `main()` builds an equivalent
    dict inline instead of calling this. The original contained dead
    placeholder code (an unused empty array and comments deferring to a
    caller that never existed); that has been removed while preserving the
    return contract.

    Parameters
    ----------
    subset_label : str
        Unused; kept for interface compatibility.
    results : dict
        Output of run_three_method() for one subset.

    Returns
    -------
    dict | None
        key -> (threshold value or None, label) for the first measure
        ('cos_mean' before 'dh_mean') whose KDE method produced results,
        or None when both measures were short-circuited.
    """
    for desc in ('cos_mean', 'dh_mean'):
        if 'note' in results[desc]['method_1_kde_antimode']:
            continue  # measure had too few points; nothing to plot
        baseline = (PAPER_A_COS_BASELINE if desc == 'cos_mean'
                    else PAPER_A_DH_BASELINE)
        return {
            'kde': (results[desc]['method_1_kde_antimode'].get('primary_antimode'),
                    'KDE antimode'),
            'bd': (results[desc]['method_2_bd_mccrary'].get('threshold'),
                   'BD/McCrary'),
            'beta': (results[desc]['method_3_beta_mixture'].get(
                'beta_2_crossing_original'), 'Beta-2 crossing'),
            'gmm2': (results[desc]['gmm_2d_2comp_marginal_crossing'],
                     '2D GMM 2-comp'),
            'baseline': (baseline, 'Paper A baseline'),
        }
    return None
|
|
|
|
|
|
def classify_verdict(results_by_subset):
    """Return ('A'|'B'|'C', explanation).

    A: a pooled non-Firm-A subset passes both measures; B: only a
    time-stratified subset passes; C: nothing passes. "Passing" means the
    Beta-2 crossing sits within tolerance of the Paper A baseline AND the
    dip test rejects unimodality at alpha=0.05, on both measures.
    """
    def _measure_ok(res, measure, base, tol):
        crossing = res[measure]['method_3_beta_mixture'].get(
            'beta_2_crossing_original')
        dip_p = res[measure]['method_1_kde_antimode'].get('dip_pvalue')
        return (crossing is not None
                and abs(crossing - base) <= tol
                and dip_p is not None and dip_p <= 0.05)

    def _both_ok(res):
        return (_measure_ok(res, 'cos_mean', PAPER_A_COS_BASELINE, 0.02),
                _measure_ok(res, 'dh_mean', PAPER_A_DH_BASELINE, 2.0))

    pooled = ('big4_non_A', 'all_non_A')
    for name in pooled:
        res = results_by_subset.get(name)
        if not res:
            continue
        cos_ok, dh_ok = _both_ok(res)
        if cos_ok and dh_ok:
            return 'A', (f"Subset '{name}' shows bimodal cos+dh with "
                         f"crossings within tolerance of Paper A baselines.")
    # B: time-stratified rescues it?
    stratified = [f'{name}_{period}' for name in pooled
                  for period in ('pre_2018', 'post_2020')]
    for name in stratified:
        res = results_by_subset.get(name)
        if not res:
            continue
        cos_ok, dh_ok = _both_ok(res)
        if cos_ok and dh_ok:
            return 'B', (f"Time-stratified subset '{name}' recovers "
                         f"separable bimodality.")
    return 'C', ("Neither pooled nor time-stratified non-Firm-A calibration "
                 "produces a baseline-consistent bimodal threshold.")
|
|
|
|
|
|
def render_report(results_by_subset, sample_sizes, verdict):
    """Render the full markdown report and return it as one string.

    results_by_subset: label -> run_three_method() output.
    sample_sizes: label -> (n_accountants, n_signatures).
    verdict: (class_letter, explanation) from classify_verdict().
    """
    # Header, research question and the sample-size table.
    md = [
        '# Non-Firm-A Calibration Spike (Script 32)',
        f"Generated: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}",
        '',
        '## Research Question',
        '',
        ('If we exclude Firm A (Deloitte) from calibration, can the '
         'three-method framework still recover a meaningful '
         'cosine / dHash threshold at the accountant level?'),
        '',
        '## Sample Sizes',
        '',
        '| Subset | N accountants (>=10 sigs) | N signatures |',
        '|--------|---------------------------|--------------|',
    ]
    for label, (n_acc, n_sig) in sample_sizes.items():
        md.append(f'| `{label}` | {n_acc} | {n_sig} |')
    md += ['',
           '## Paper A Baselines (for comparison)',
           '',
           f'- Accountant-level 2D GMM 2-comp marginal crossings: '
           f'cos = **{PAPER_A_COS_BASELINE}**, dHash = **{PAPER_A_DH_BASELINE}**',
           '']

    # One section per subset, one method table per measure.
    for label, results in results_by_subset.items():
        md += [f'## Subset: `{label}`', '']
        for measure, baseline in [('cos_mean', PAPER_A_COS_BASELINE),
                                  ('dh_mean', PAPER_A_DH_BASELINE)]:
            r = results[measure]
            md += [f'### {measure}', '',
                   '| Method | Threshold | Supporting statistic |',
                   '|--------|-----------|----------------------|']
            kde = r['method_1_kde_antimode']
            if 'note' in kde:
                # Measure was short-circuited (too few points).
                md.append(f'| Method 1: KDE+dip | n/a | {kde["note"]} |')
            else:
                tag = 'unimodal' if kde['unimodal_alpha05'] else 'multimodal'
                md.append(
                    f'| Method 1: KDE antimode (dip test) | '
                    f'{kde["primary_antimode"]} | '
                    f'dip={kde["dip"]:.4f}, p={kde["dip_pvalue"]:.4f} '
                    f'({tag}); n_modes={kde["n_modes"]} |')
            bd = r['method_2_bd_mccrary']
            md.append(
                f'| Method 2: BD/McCrary | {bd.get("threshold")} | '
                f'{bd.get("n_transitions", 0)} transition(s) |')
            beta = r['method_3_beta_mixture']
            if 'note' in beta:
                md.append(f'| Method 3: Beta mixture | n/a | {beta["note"]} |')
            else:
                md.append(
                    f'| Method 3: 2-comp Beta mixture | '
                    f'{beta["beta_2_crossing_original"]} | '
                    f'Beta-2 BIC={beta["beta_2"]["bic"]:.2f}, '
                    f'Beta-3 BIC={beta["beta_3"]["bic"]:.2f} '
                    f'(K*={beta["bic_preferred_K"]}) |')
                md.append(
                    f'| Method 3\': LogGMM-2 | '
                    f'{beta["logit_gmm_2"].get("crossing_original")} | '
                    f'logit-Gaussian robustness check |')
            md.append(
                f'| 2D GMM 2-comp marginal crossing | '
                f'{r["gmm_2d_2comp_marginal_crossing"]} | '
                f'paired with Paper A baseline = {baseline} |')
            md.append('')
        # Optional K=3 2D-GMM component summary for this subset.
        if results.get('gmm_2d_3comp'):
            g3 = results['gmm_2d_3comp']
            md += ['### 2D GMM K=3 components (for completeness)',
                   '',
                   '| Component | mean cos | mean dh | weight |',
                   '|-----------|----------|---------|--------|']
            for i, (m, w) in enumerate(zip(g3['means'], g3['weights'])):
                md.append(f'| C{i + 1} | {m[0]:.4f} | {m[1]:.4f} | {w:.3f} |')
            md.append('')
            md.append(f'BIC(K=3 2D)={g3["bic"]:.2f}, AIC={g3["aic"]:.2f}')
            md.append('')

    # Verdict block and its legend.
    md += ['## Verdict',
           '',
           f'**{verdict[0]}** — {verdict[1]}',
           '',
           '### Verdict legend',
           '- **A**: outside-Firm-A calibration is viable in pooled form '
           '(crossings within +-0.02 cos / +-2.0 dh of Paper A baselines '
           'AND dip-test multimodal at alpha=0.05).',
           '- **B**: time-stratified subset recovers separable bimodality.',
           '- **C**: neither rescue works; Firm A remains required as anchor.',
           '']
    return '\n'.join(md)
|
|
|
|
|
|
def main():
    """Run the full spike: pooled subsets, time-stratified secondary runs,
    plots, verdict classification, and JSON/markdown persistence."""
    print('=' * 72)
    print('Script 32: Non-Firm-A Calibration Spike')
    print('=' * 72)

    sample_sizes = {}       # label -> (n_accountants, n_signatures)
    results_by_subset = {}  # label -> run_three_method() output
    arrays_by_subset = {}   # label -> (cos, dh) arrays, kept for plotting

    # --- Pooled subsets ---
    for label in ('big4_non_A', 'all_non_A', 'firm_A'):
        cos, dh, n_acc, n_sig = load_subset(label)
        sample_sizes[label] = (n_acc, n_sig)
        arrays_by_subset[label] = (cos, dh)
        print(f'\n[{label}] N accountants={n_acc}, N sigs={n_sig}')
        results_by_subset[label] = run_three_method(cos, dh, label)
        # Console summary of the headline statistics per measure.
        for desc in ('cos_mean', 'dh_mean'):
            r = results_by_subset[label][desc]
            kde = r['method_1_kde_antimode']
            beta = r['method_3_beta_mixture']
            print(f' {desc}: dip p={kde.get("dip_pvalue")} '
                  f'(n_modes={kde.get("n_modes")}); '
                  f'Beta-2 cross={beta.get("beta_2_crossing_original")}; '
                  f'2D-GMM marginal={r["gmm_2d_2comp_marginal_crossing"]}')

    # --- Time-stratified secondary (run unconditionally; verdict logic decides) ---
    for label in ('big4_non_A', 'all_non_A'):
        for period in ('pre_2018', 'post_2020'):
            cos, dh, n_acc, n_sig = load_subset_time_stratified(label, period)
            key = f'{label}_{period}'
            sample_sizes[key] = (n_acc, n_sig)
            arrays_by_subset[key] = (cos, dh)
            print(f'\n[{key}] N accountants={n_acc}, N sigs={n_sig}')
            # Mirrors the <8-point guard inside the method functions.
            if n_acc < 8:
                print(f' (skipped: too few accountants for analysis)')
                continue
            results_by_subset[key] = run_three_method(cos, dh, key)
            for desc in ('cos_mean', 'dh_mean'):
                r = results_by_subset[key][desc]
                kde = r['method_1_kde_antimode']
                beta = r['method_3_beta_mixture']
                print(f' {desc}: dip p={kde.get("dip_pvalue")} '
                      f'(n_modes={kde.get("n_modes")}); '
                      f'Beta-2 cross={beta.get("beta_2_crossing_original")}; '
                      f'2D-GMM marginal={r["gmm_2d_2comp_marginal_crossing"]}')

    # --- Plots ---
    # One panel PNG per (subset, measure) that produced usable results.
    for label, results in results_by_subset.items():
        cos, dh = arrays_by_subset[label]
        for desc, arr, bin_width, baseline in [
            ('cos_mean', cos, 0.002, PAPER_A_COS_BASELINE),
            ('dh_mean', dh, 0.2, PAPER_A_DH_BASELINE),
        ]:
            r = results[desc]
            if 'note' in r['method_1_kde_antimode']:
                continue  # measure was short-circuited; nothing to draw
            methods_for_plot = {
                'kde': (r['method_1_kde_antimode'].get('primary_antimode'),
                        'KDE antimode'),
                'bd': (r['method_2_bd_mccrary'].get('threshold'),
                       'BD/McCrary'),
                'beta': (r['method_3_beta_mixture'].get(
                    'beta_2_crossing_original'), 'Beta-2 crossing'),
                'gmm2': (r['gmm_2d_2comp_marginal_crossing'],
                         '2D GMM 2-comp'),
                'baseline': (baseline, 'Paper A baseline'),
            }
            png = OUT / f'panel_{label}_{desc}.png'
            plot_panel(arr, methods_for_plot,
                       f'{label} -- accountant-level {desc}',
                       png, bin_width=bin_width)
            print(f' plot: {png}')

    # --- Verdict ---
    verdict = classify_verdict(results_by_subset)
    print(f'\nVerdict: {verdict[0]} -- {verdict[1]}')

    # --- Persist ---
    # Machine-readable JSON first, then the human-readable markdown report.
    payload = {
        'generated_at': datetime.now().isoformat(),
        'min_sigs_per_accountant': MIN_SIGS,
        'min_sigs_per_bucket_time_stratified': MIN_SIGS_PER_BUCKET,
        'paper_a_baselines': {
            'cos': PAPER_A_COS_BASELINE,
            'dh': PAPER_A_DH_BASELINE,
        },
        'sample_sizes': {k: {'n_accountants': v[0], 'n_signatures': v[1]}
                         for k, v in sample_sizes.items()},
        'results': results_by_subset,
        'verdict': {'class': verdict[0], 'explanation': verdict[1]},
    }
    (OUT / 'non_firm_a_calibration_results.json').write_text(
        json.dumps(payload, indent=2, ensure_ascii=False), encoding='utf-8')
    print(f'\nJSON: {OUT / "non_firm_a_calibration_results.json"}')

    md = render_report(results_by_subset, sample_sizes, verdict)
    (OUT / 'non_firm_a_calibration_report.md').write_text(md, encoding='utf-8')
    print(f'Report: {OUT / "non_firm_a_calibration_report.md"}')
|
|
|
|
|
|
# Script entry point: run the full spike when executed directly.
if __name__ == '__main__':
    main()
|