Files
pdf_signature_extraction/signature_analysis/39c_v4_midsmall_signature_diptest.py
gbanyan d4f370bd5e Add Scripts 39b/c/d/e + 40b + 43: anchor-based FAR diagnostics
Spike checkpoint in response to codex rounds 28-30 review:

- 39b/c: signature-level dip test on Big-4 and non-Big-4 marginals
- 39d: dHash discrete-value robustness (raw vs jittered + histogram
  valleys + firm residualization); confirms within-firm dHash dip
  rejection is integer-mass-point artefact
- 39e: dHash firm-residualized + jittered 2x2 factorial decomposition;
  confirms Big-4 pooled dh "multimodality" is composition + integer
  artefact (centered + jittered p=0.35, 0/5 seeds reject)
- 40b: inter-CPA per-pair FAR sweep (cos + dh marginal + joint +
  conditional); replicates v3 cos>0.95 FAR=0.0006 and provides
  v4-new dh FAR curve
- 43: pool-normalized per-signature FAR (codex round-30 fix for
  per-pair vs per-signature conflation); per-sig FAR for deployed
  any-pair rule = 11.02%, per-firm structure shows Firm A 20% vs
  B/C/D <1%

These scripts replace the distributional path (K=3 mixture / dip /
antimode) with anchor-based threshold derivation. Companion
artefacts in reports/v4_big4/{signature_level_diptest,
midsmall_signature_diptest, dhash_discrete_robustness,
inter_cpa_far_sweep, pool_normalized_far}/.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-13 14:08:49 +08:00

215 lines
8.0 KiB
Python

#!/usr/bin/env python3
"""
Script 39c: Mid/Small-Firm Signature-Level Dip Test
====================================================
Companion to Script 39b. 39b showed every Big-4 firm rejects
unimodality on the dHash signature marginal (p < 5e-4 in each
of A/B/C/D) while every Big-4 firm fails to reject unimodality
on the cosine marginal. This script asks the same questions of
the mid/small-firm population (non-Big-4):
1. Does the pooled mid/small-firm signature cloud show the same
dHash multimodality?
2. Within individual mid/small firms (those with enough
signatures to support the test), does the dHash multimodality
hold firm-internally as it does in Big-4?
If yes, the dHash signature-level multimodality is corpus-universal
and the Big-4 scope restriction of v4.0 is not necessary on dHash
grounds (cf §III-G item 2 which currently rests on Big-4-level
multimodality). The cosine axis is reported alongside for
completeness, but no v4.0 claim turns on cosine multimodality
outside Big-4.
Outputs:
reports/v4_big4/midsmall_signature_diptest/
midsmall_diptest_results.json
midsmall_diptest_report.md
"""
import json
import sqlite3
import numpy as np
import diptest
from pathlib import Path
from datetime import datetime
from scipy import stats
from scipy.signal import find_peaks
DB = '/Volumes/NV2/PDF-Processing/signature-analysis/signature_analysis.db'
OUT = Path('/Volumes/NV2/PDF-Processing/signature-analysis/reports/'
'v4_big4/midsmall_signature_diptest')
OUT.mkdir(parents=True, exist_ok=True)
BIG4 = ('勤業眾信聯合', '安侯建業聯合', '資誠聯合', '安永聯合')
N_BOOT = 2000
SINGLE_FIRM_MIN_SIG = 500 # minimum signature count to run a per-firm dip test
def load_non_big4_signatures():
conn = sqlite3.connect(DB)
cur = conn.cursor()
cur.execute('''
SELECT a.firm,
s.max_similarity_to_same_accountant,
CAST(s.min_dhash_independent AS REAL)
FROM signatures s
JOIN accountants a ON s.assigned_accountant = a.name
WHERE s.assigned_accountant IS NOT NULL
AND s.max_similarity_to_same_accountant IS NOT NULL
AND s.min_dhash_independent IS NOT NULL
AND a.firm IS NOT NULL
AND a.firm NOT IN (?, ?, ?, ?)
''', BIG4)
rows = cur.fetchall()
conn.close()
return rows
def kde_dip(values, n_boot=N_BOOT):
arr = np.asarray(values, dtype=float)
arr = arr[np.isfinite(arr)]
if len(arr) < 10:
return {'n': int(len(arr)), 'skipped': 'too few points'}
dip, pval = diptest.diptest(arr, boot_pval=True, n_boot=n_boot)
kde = stats.gaussian_kde(arr, bw_method='silverman')
xs = np.linspace(arr.min(), arr.max(), 2000)
density = kde(xs)
peaks, _ = find_peaks(density, prominence=density.max() * 0.02)
antimodes = []
for i in range(len(peaks) - 1):
seg = density[peaks[i]:peaks[i + 1]]
if not len(seg):
continue
local = peaks[i] + int(np.argmin(seg))
antimodes.append(float(xs[local]))
return {
'n': int(len(arr)),
'dip': float(dip),
'dip_pvalue': float(pval),
'unimodal_alpha05': bool(pval > 0.05),
'n_modes': int(len(peaks)),
'mode_locations': [float(xs[p]) for p in peaks],
'antimodes': antimodes,
'n_boot': int(n_boot),
}
def _fmt_p(p):
if p == 0.0:
return '< 5e-4'
return f'{p:.4g}'
def main():
print('=' * 72)
print('Script 39c: Mid/Small-Firm Signature-Level Dip Test')
print('=' * 72)
rows = load_non_big4_signatures()
cos_all = np.array([r[1] for r in rows], dtype=float)
dh_all = np.array([r[2] for r in rows], dtype=float)
firms = np.array([r[0] for r in rows])
n_total = len(rows)
print(f'\nLoaded {n_total:,} non-Big-4 signatures across '
f'{len(set(firms))} firms')
# Firm size table
firm_counts = {}
for f in firms:
firm_counts[f] = firm_counts.get(f, 0) + 1
top = sorted(firm_counts.items(), key=lambda x: -x[1])
print('\nTop firms by signature count:')
for f, n in top[:10]:
print(f' {f}: {n:,}')
results = {
'meta': {
'script': '39c',
'timestamp': datetime.now().isoformat(timespec='seconds'),
'n_total': int(n_total),
'n_firms': int(len(firm_counts)),
'n_boot': N_BOOT,
'single_firm_min_sig': SINGLE_FIRM_MIN_SIG,
},
'pooled': {},
'per_firm_eligible': {},
'firm_counts': dict(firm_counts),
}
# A. Pooled non-Big-4
print('\n[A] Pooled non-Big-4')
for desc, arr in [('cos', cos_all), ('dh_indep', dh_all)]:
r = kde_dip(arr)
results['pooled'][desc] = r
print(f' {desc}: n={r["n"]:,}, dip={r["dip"]:.5f}, '
f'p={_fmt_p(r["dip_pvalue"])}, n_modes={r["n_modes"]}')
# B. Per-firm (only firms with >= SINGLE_FIRM_MIN_SIG signatures)
eligible = [f for f, n in firm_counts.items() if n >= SINGLE_FIRM_MIN_SIG]
print(f'\n[B] Per-firm dip test '
f'(firms with >= {SINGLE_FIRM_MIN_SIG} signatures: {len(eligible)})')
for f in sorted(eligible, key=lambda x: -firm_counts[x]):
mask = firms == f
results['per_firm_eligible'][f] = {'n': int(mask.sum())}
for desc, arr in [('cos', cos_all[mask]), ('dh_indep', dh_all[mask])]:
r = kde_dip(arr)
results['per_firm_eligible'][f][desc] = r
print(f' {f[:20]:<22s} {desc}: n={r["n"]:,}, dip={r["dip"]:.5f}, '
f'p={_fmt_p(r["dip_pvalue"])}, n_modes={r["n_modes"]}')
json_path = OUT / 'midsmall_diptest_results.json'
json_path.write_text(json.dumps(results, indent=2, ensure_ascii=False),
encoding='utf-8')
print(f'\n[json] {json_path}')
md = ['# Mid/Small-Firm Signature-Level Dip Test (Script 39c)',
'',
f'Generated: {results["meta"]["timestamp"]}',
f'Bootstrap replicates: {N_BOOT}',
'',
'## A. Pooled non-Big-4 signature cloud',
'',
f'n = {n_total:,} signatures across '
f'{results["meta"]["n_firms"]} firms',
'',
'| Marginal | dip | p (boot) | n_modes | unimodal @0.05 |',
'|---|---|---|---|---|']
for desc in ['cos', 'dh_indep']:
r = results['pooled'][desc]
md.append(f'| {desc} | {r["dip"]:.5f} | {_fmt_p(r["dip_pvalue"])} | '
f'{r["n_modes"]} | {r["unimodal_alpha05"]} |')
md += ['', f'## B. Single mid/small firms (>= {SINGLE_FIRM_MIN_SIG} '
f'signatures), {len(eligible)} qualify', '',
'| Firm | Marginal | n | dip | p (boot) | n_modes | unimodal @0.05 |',
'|---|---|---|---|---|---|---|']
for f in sorted(eligible, key=lambda x: -firm_counts[x]):
for desc in ['cos', 'dh_indep']:
r = results['per_firm_eligible'][f][desc]
md.append(f'| {f[:20]} | {desc} | {r["n"]:,} | {r["dip"]:.5f} | '
f'{_fmt_p(r["dip_pvalue"])} | {r["n_modes"]} | '
f'{r["unimodal_alpha05"]} |')
md += ['',
'## Reading guide',
'',
('If the pooled-non-Big-4 dHash marginal rejects unimodality '
'AND the qualifying individual mid/small firms also reject, '
'the dHash within-firm replication regime structure is '
'corpus-universal and not Big-4-specific. In that case the '
'Big-4 scope of v4.0 is justified on cosine-axis grounds '
'(Firm-A composition; §III-G item 1) and accountant-level '
'LOOO reproducibility (§III-G item 3), but not on dHash '
'multimodality grounds (§III-G item 2 should be re-scoped or '
'qualified). If the per-firm dHash tests instead fail to '
'reject inside mid/small firms, the dHash multimodality is '
'Big-4-specific and §III-G item 2 holds as stated.'),
'']
md_path = OUT / 'midsmall_diptest_report.md'
md_path.write_text('\n'.join(md), encoding='utf-8')
print(f'[md ] {md_path}')
if __name__ == '__main__':
main()