Files
pdf_signature_extraction/signature_analysis/37_v4_k3_loo_check.py
T
gbanyan 92f1db831a Add script 37: K=3 LOOO check (P2_PARTIAL — v4.0 is salvageable with K=3)
Follow-up to Script 36's K=2 UNSTABLE finding. Tests whether K=3's
C1 hand-leaning component (~14% weight, cos~0.946, dh~9.17 from
Script 35) is firm-mass driven or a real cross-firm sub-population.

Result: C1 component shape IS stable across LOOO folds.

  Fold       C1 cos    C1 dh    C1 weight
  baseline   0.9457    9.1715   0.143
  -FirmA     0.9425   10.1263   0.145
  -KPMG      0.9441    9.1591   0.127
  -PwC       0.9504    8.4068   0.126
  -EY        0.9439    9.2897   0.120

  Max drift vs baseline: cos 0.0047, dh 0.955, weight 0.023
  -- all within heuristic stability bars (0.01, 1.0, 0.10).

Held-out prediction divergence vs Script 35 baseline:

  Firm A     predicted  4.68%  vs baseline  0.0%   (+4.68 pp)
  KPMG       predicted  7.14%  vs baseline  8.9%   (-1.76 pp)
  PwC        predicted 36.27%  vs baseline 23.5%   (+12.77 pp)
  EY         predicted 17.31%  vs baseline 11.5%   (+5.81 pp)

Verdict: P2_PARTIAL.

Methodological insight: K=3 disentangles the firm-mass/mechanism
confound that broke K=2. C3 (cos~0.983, dh~2.4) absorbs Firm A's
templated mass; C1 (cos~0.946, dh~9.17) captures cross-firm
hand-leaning. Membership boundary shifts slightly (±5-13 pp)
across folds, reflecting honest calibration uncertainty rather
than collapse.

Implication: v4.0 can pivot to a "characterized cluster structure
with bounded reproducibility" framing instead of the original
"clean natural threshold" pitch. Honest, defensible, but a
different paper than v3.20.0 was building.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-12 14:57:40 +08:00

479 lines
19 KiB
Python

#!/usr/bin/env python3
"""
Script 37: K=3 Leave-One-Firm-Out Check (Path P2 viability test)
=================================================================
Follow-up to Script 36's UNSTABLE K=2 LOOO finding. Tests whether the
K=3 mixture's C1 component (lowest-cosine "hand-leaning" cluster,
~14% weight per Script 35) is a real cross-firm sub-population or
is also firm-mass driven.
Reference: Script 35 (full Big-4 K=3) reported C1 cluster membership:
Firm A 0/171 = 0.0%
KPMG 10/112 = 8.9%
PwC 24/102 = 23.5%
EY 6/52 = 11.5%
The hypothesis: if C1 is a true cross-firm hand-leaning sub-population,
then:
- Across 4 LOOO folds, the C1 component should sit at roughly the
same (cos, dh) coordinates with similar weight.
- When the held-out firm's CPAs are assigned via the fold's K=3
posterior, the fraction in C1 should approximate the Script 35
full-data percentages.
If C1 collapses, shifts dramatically, or fails to predict held-out
membership, then K=3 is also firm-mass driven and Path P2 fails.
Output:
reports/v4_big4/k3_loo_check/
k3_loo_results.json
k3_loo_report.md
panel_k3_loo_<firm>.png
"""
import sqlite3
import json
import numpy as np
import matplotlib
matplotlib.use('Agg')
import matplotlib.pyplot as plt
from pathlib import Path
from datetime import datetime
from sklearn.mixture import GaussianMixture
from scipy.stats import norm
DB = '/Volumes/NV2/PDF-Processing/signature-analysis/signature_analysis.db'
OUT = Path('/Volumes/NV2/PDF-Processing/signature-analysis/reports/'
'v4_big4/k3_loo_check')
OUT.mkdir(parents=True, exist_ok=True)
MIN_SIGS = 10
SEED = 42
BIG4 = ('勤業眾信聯合', '安侯建業聯合', '資誠聯合', '安永聯合')
LABEL = {'勤業眾信聯合': 'Firm A (Deloitte)', '安侯建業聯合': 'KPMG',
'資誠聯合': 'PwC', '安永聯合': 'EY'}
SLUG = {'勤業眾信聯合': 'FirmA', '安侯建業聯合': 'KPMG',
'資誠聯合': 'PwC', '安永聯合': 'EY'}
# Script 35 full-Big-4 K=3 baseline (informational; reproduce here as expected)
SCRIPT35_C1_PCT = {'勤業眾信聯合': 0.0, '安侯建業聯合': 8.9,
'資誠聯合': 23.5, '安永聯合': 11.5}
def load_big4_accountants():
conn = sqlite3.connect(DB)
cur = conn.cursor()
cur.execute('''
SELECT s.assigned_accountant,
a.firm,
AVG(s.max_similarity_to_same_accountant) AS cos_mean,
AVG(CAST(s.min_dhash_independent AS REAL)) AS dh_mean,
COUNT(*) AS n
FROM signatures s
JOIN accountants a ON s.assigned_accountant = a.name
WHERE s.assigned_accountant IS NOT NULL
AND s.max_similarity_to_same_accountant IS NOT NULL
AND s.min_dhash_independent IS NOT NULL
AND a.firm IN (?, ?, ?, ?)
GROUP BY s.assigned_accountant
HAVING n >= ?
''', BIG4 + (MIN_SIGS,))
rows = cur.fetchall()
conn.close()
return [{'cpa': r[0], 'firm': r[1],
'cos_mean': float(r[2]), 'dh_mean': float(r[3]),
'n_sigs': int(r[4])} for r in rows]
def fit_k3(X):
return GaussianMixture(n_components=3, covariance_type='full',
random_state=SEED, n_init=15, max_iter=500).fit(X)
def sort_components_by_cos(gmm):
"""Return ordering such that comp[0] has lowest cosine mean."""
return np.argsort(gmm.means_[:, 0])
def wilson_ci(k, n, alpha=0.05):
if n == 0:
return (0.0, 1.0)
z = norm.ppf(1 - alpha / 2)
phat = k / n
denom = 1 + z * z / n
center = (phat + z * z / (2 * n)) / denom
pm = z * np.sqrt(phat * (1 - phat) / n + z * z / (4 * n * n)) / denom
return (max(0.0, center - pm), min(1.0, center + pm))
def run_full_baseline(cpas):
print('\n[A] Full-Big-4 K=3 baseline (replicates Script 35)')
X = np.column_stack([
[c['cos_mean'] for c in cpas],
[c['dh_mean'] for c in cpas],
])
gmm = fit_k3(X)
order = sort_components_by_cos(gmm)
means = gmm.means_[order]
weights = gmm.weights_[order]
raw_labels = gmm.predict(X)
label_map = {old: new for new, old in enumerate(order)}
labels = np.array([label_map[l] for l in raw_labels])
by_firm_c1 = {f: 0 for f in BIG4}
by_firm_total = {f: 0 for f in BIG4}
for c, lab in zip(cpas, labels):
by_firm_total[c['firm']] += 1
if lab == 0:
by_firm_c1[c['firm']] += 1
print(f' C1 (hand-leaning) center: cos={means[0,0]:.4f}, '
f'dh={means[0,1]:.4f}, weight={weights[0]:.3f}')
print(f' C2 (mixed) center: cos={means[1,0]:.4f}, '
f'dh={means[1,1]:.4f}, weight={weights[1]:.3f}')
print(f' C3 (replicated) center: cos={means[2,0]:.4f}, '
f'dh={means[2,1]:.4f}, weight={weights[2]:.3f}')
print(' C1 membership by firm:')
for f in BIG4:
n = by_firm_total[f]
k = by_firm_c1[f]
pct = 100 * k / n if n else 0.0
print(f' {LABEL[f]:<22} {k:>3}/{n:>3} = {pct:5.2f}% '
f'(Script 35 expected: {SCRIPT35_C1_PCT[f]}%)')
return {
'means_sorted': means.tolist(),
'weights_sorted': weights.tolist(),
'c1_membership_by_firm': {
f: {'k': int(by_firm_c1[f]), 'n': int(by_firm_total[f]),
'pct': float(100 * by_firm_c1[f] / by_firm_total[f])
if by_firm_total[f] else 0.0}
for f in BIG4
},
}
def run_loo(cpas):
by_firm = {}
for c in cpas:
by_firm.setdefault(c['firm'], []).append(c)
fold_results = {}
for held_firm in BIG4:
train = [c for c in cpas if c['firm'] != held_firm]
held = by_firm[held_firm]
X_train = np.column_stack([
[c['cos_mean'] for c in train],
[c['dh_mean'] for c in train],
])
X_held = np.column_stack([
[c['cos_mean'] for c in held],
[c['dh_mean'] for c in held],
])
gmm = fit_k3(X_train)
order = sort_components_by_cos(gmm)
means = gmm.means_[order]
weights = gmm.weights_[order]
# Posterior on held-out
raw_post = gmm.predict_proba(X_held)
post = raw_post[:, order]
held_labels = np.argmax(post, axis=1)
n_c1 = int(np.sum(held_labels == 0))
n_c2 = int(np.sum(held_labels == 1))
n_c3 = int(np.sum(held_labels == 2))
n_held = len(held)
c1_rate = n_c1 / n_held if n_held else 0.0
wlo, whi = wilson_ci(n_c1, n_held)
# Train-side weights for stability check
print(f'\n[B] LOOO fold: held = {LABEL[held_firm]}')
print(f' train K=3 components (sorted by cos):')
for i in range(3):
print(f' C{i+1}: cos={means[i,0]:.4f}, dh={means[i,1]:.4f}, '
f'weight={weights[i]:.3f}')
print(f' held-out assignments: C1={n_c1}/{n_held} = '
f'{c1_rate*100:.2f}% [Wilson 95%: '
f'{wlo*100:.2f}%, {whi*100:.2f}%]')
print(f' C2={n_c2}/{n_held} = '
f'{n_c2/n_held*100:.2f}%')
print(f' C3={n_c3}/{n_held} = '
f'{n_c3/n_held*100:.2f}%')
print(f' Script 35 expected C1 for {LABEL[held_firm]}: '
f'{SCRIPT35_C1_PCT[held_firm]}%')
fold_results[held_firm] = {
'n_train': len(train),
'n_held': n_held,
'k3_components_sorted_by_cos': {
'means': means.tolist(),
'weights': weights.tolist(),
},
'held_out_assignments': {
'n_c1_handleaning': n_c1,
'n_c2_mixed': n_c2,
'n_c3_replicated': n_c3,
'c1_rate': float(c1_rate),
'c1_wilson95': [float(wlo), float(whi)],
},
'script35_expected_c1_pct': SCRIPT35_C1_PCT[held_firm],
}
return fold_results
def stability_summary(fold_results, baseline):
"""Aggregate C1 component drift across folds."""
c1_means_cos = [fold_results[f]['k3_components_sorted_by_cos']['means'][0][0]
for f in BIG4]
c1_means_dh = [fold_results[f]['k3_components_sorted_by_cos']['means'][0][1]
for f in BIG4]
c1_weights = [fold_results[f]['k3_components_sorted_by_cos']['weights'][0]
for f in BIG4]
base_c1_cos = baseline['means_sorted'][0][0]
base_c1_dh = baseline['means_sorted'][0][1]
base_c1_w = baseline['weights_sorted'][0]
summary = {
'fold_c1_cos_means': c1_means_cos,
'fold_c1_dh_means': c1_means_dh,
'fold_c1_weights': c1_weights,
'baseline_c1': {'cos': base_c1_cos, 'dh': base_c1_dh,
'weight': base_c1_w},
'max_c1_cos_dev_from_baseline': float(
max(abs(np.array(c1_means_cos) - base_c1_cos))),
'max_c1_dh_dev_from_baseline': float(
max(abs(np.array(c1_means_dh) - base_c1_dh))),
'max_c1_weight_dev_from_baseline': float(
max(abs(np.array(c1_weights) - base_c1_w))),
}
# Heuristic stability bars (these are exploratory, not formal test):
cos_stable = summary['max_c1_cos_dev_from_baseline'] <= 0.01
dh_stable = summary['max_c1_dh_dev_from_baseline'] <= 1.0
weight_stable = summary['max_c1_weight_dev_from_baseline'] <= 0.10
summary['cos_stable'] = bool(cos_stable)
summary['dh_stable'] = bool(dh_stable)
summary['weight_stable'] = bool(weight_stable)
summary['c1_component_stable'] = bool(cos_stable and dh_stable
and weight_stable)
# Held-out C1 prediction agreement with Script 35 expectation
pred_v_expected = []
for f in BIG4:
actual = fold_results[f]['held_out_assignments']['c1_rate'] * 100
expected = SCRIPT35_C1_PCT[f]
pred_v_expected.append({
'firm': LABEL[f],
'predicted_c1_pct': actual,
'expected_c1_pct': expected,
'abs_diff': abs(actual - expected),
})
summary['held_out_prediction_check'] = pred_v_expected
summary['max_abs_pct_diff'] = float(max(p['abs_diff']
for p in pred_v_expected))
# Verdict
if (summary['c1_component_stable']
and summary['max_abs_pct_diff'] <= 5.0):
verdict = 'P2_STRONG'
msg = ('K=3 C1 component is stable across LOOO folds (cos drift '
'<= 0.01, dh drift <= 1.0, weight drift <= 0.10); held-out '
'C1 predictions agree with Script 35 baseline within 5pp. '
'Path P2 is viable: K=3 captures a real cross-firm '
'hand-leaning cluster.')
elif summary['c1_component_stable']:
verdict = 'P2_PARTIAL'
msg = ('K=3 C1 component is stable but held-out C1 prediction '
f'diverges from Script 35 baseline (max abs diff '
f'{summary["max_abs_pct_diff"]:.1f}pp). Cluster exists but '
'membership is not well-predicted by held-out fit.')
else:
verdict = 'P2_WEAK'
msg = ('K=3 C1 component is NOT stable across LOOO folds (cos drift '
f'{summary["max_c1_cos_dev_from_baseline"]:.4f}, dh drift '
f'{summary["max_c1_dh_dev_from_baseline"]:.3f}, weight drift '
f'{summary["max_c1_weight_dev_from_baseline"]:.3f}). '
'K=3 is also firm-mass driven; Path P2 fails.')
summary['verdict'] = verdict
summary['verdict_message'] = msg
return summary
def render_panels(cpas, fold_results):
by_firm = {}
for c in cpas:
by_firm.setdefault(c['firm'], []).append(c)
for held_firm in BIG4:
held = by_firm[held_firm]
train = [c for c in cpas if c['firm'] != held_firm]
fr = fold_results[held_firm]
means = np.array(fr['k3_components_sorted_by_cos']['means'])
weights = fr['k3_components_sorted_by_cos']['weights']
rate = fr['held_out_assignments']['c1_rate']
n_c1 = fr['held_out_assignments']['n_c1_handleaning']
n_h = fr['n_held']
wlo, whi = fr['held_out_assignments']['c1_wilson95']
fig, ax = plt.subplots(figsize=(9, 7))
ax.scatter([c['cos_mean'] for c in train],
[c['dh_mean'] for c in train], s=18, alpha=0.4,
color='lightgray',
label=f'Train (other Big-3, n={len(train)})')
ax.scatter([c['cos_mean'] for c in held],
[c['dh_mean'] for c in held], s=42, alpha=0.85,
color='crimson', edgecolor='white',
label=f'Held-out: {LABEL[held_firm]} (n={n_h})')
markers = ['v', 's', '^']
comp_colors = ['darkred', 'goldenrod', 'navy']
comp_labels = ['C1 hand-leaning', 'C2 mixed', 'C3 replicated']
for i in range(3):
ax.scatter([means[i, 0]], [means[i, 1]], s=200,
marker=markers[i], color=comp_colors[i],
edgecolor='black', linewidth=1.5,
label=f'{comp_labels[i]}: ({means[i,0]:.3f}, '
f'{means[i,1]:.2f}), w={weights[i]:.2f}')
ax.set_xlabel('Accountant cos_mean')
ax.set_ylabel('Accountant dh_mean')
ax.set_title(
f'K=3 LOOO held-out {LABEL[held_firm]}: C1 = {n_c1}/{n_h} = '
f'{rate*100:.1f}% [Wilson 95%: {wlo*100:.1f}%, '
f'{whi*100:.1f}%]\n(Script 35 baseline expected: '
f'{SCRIPT35_C1_PCT[held_firm]}%)')
ax.legend(fontsize=8, loc='upper right')
ax.grid(alpha=0.3)
fig.tight_layout()
fig.savefig(OUT / f'panel_k3_loo_{SLUG[held_firm]}.png', dpi=150)
plt.close(fig)
def render_md(baseline, fold_results, summary):
md = [
'# Phase 1.5: K=3 LOOO Check (Path P2 viability)',
f'Generated: {datetime.now().strftime("%Y-%m-%d %H:%M:%S")}',
'',
'## A. Full-Big-4 K=3 baseline (replicates Script 35)',
'',
'| Component | mean cos | mean dh | weight |',
'|---|---|---|---|',
]
for i, (m, w) in enumerate(zip(baseline['means_sorted'],
baseline['weights_sorted'])):
name = ['C1 hand-leaning', 'C2 mixed',
'C3 replicated'][i]
md.append(f'| {name} | {m[0]:.4f} | {m[1]:.4f} | {w:.3f} |')
md += ['',
'### Baseline C1 membership by firm',
'',
'| Firm | Baseline C1 / total | % | Script 35 expected |',
'|---|---|---|---|']
for f in BIG4:
b = baseline['c1_membership_by_firm'][f]
md.append(f'| {LABEL[f]} | {b["k"]}/{b["n"]} | {b["pct"]:.2f}% | '
f'{SCRIPT35_C1_PCT[f]}% |')
md += ['', '## B. Leave-One-Firm-Out K=3 fits', '']
for f in BIG4:
fr = fold_results[f]
means = fr['k3_components_sorted_by_cos']['means']
weights = fr['k3_components_sorted_by_cos']['weights']
ass = fr['held_out_assignments']
md += [f'### Held-out: {LABEL[f]}',
'',
f'- n_train = {fr["n_train"]}, n_held = {fr["n_held"]}',
f'- Held-out assignments: '
f'C1={ass["n_c1_handleaning"]}/{fr["n_held"]} = '
f'{ass["c1_rate"]*100:.2f}% '
f'[Wilson 95%: {ass["c1_wilson95"][0]*100:.2f}%, '
f'{ass["c1_wilson95"][1]*100:.2f}%]; '
f'C2={ass["n_c2_mixed"]}; C3={ass["n_c3_replicated"]}',
f'- Script 35 baseline expected C1: '
f'{SCRIPT35_C1_PCT[f]}%',
'',
'| Train K=3 component | mean cos | mean dh | weight |',
'|---|---|---|---|']
for i, (m, w) in enumerate(zip(means, weights)):
name = ['C1 hand-leaning', 'C2 mixed',
'C3 replicated'][i]
md.append(f'| {name} | {m[0]:.4f} | {m[1]:.4f} | {w:.3f} |')
md.append('')
md += ['## C. Cross-fold C1 stability summary', '',
f'- Baseline C1 (full Big-4): cos = '
f'{summary["baseline_c1"]["cos"]:.4f}, dh = '
f'{summary["baseline_c1"]["dh"]:.4f}, weight = '
f'{summary["baseline_c1"]["weight"]:.3f}',
f'- Fold C1 cos means: {summary["fold_c1_cos_means"]}',
f'- Fold C1 dh means: {summary["fold_c1_dh_means"]}',
f'- Fold C1 weights : {summary["fold_c1_weights"]}',
f'- Max |C1 cos dev| vs baseline: '
f'{summary["max_c1_cos_dev_from_baseline"]:.4f} '
f'(stable bar: 0.01, {"OK" if summary["cos_stable"] else "FAIL"})',
f'- Max |C1 dh dev| vs baseline: '
f'{summary["max_c1_dh_dev_from_baseline"]:.3f} '
f'(stable bar: 1.0, {"OK" if summary["dh_stable"] else "FAIL"})',
f'- Max |C1 weight dev| vs baseline: '
f'{summary["max_c1_weight_dev_from_baseline"]:.3f} '
f'(stable bar: 0.10, {"OK" if summary["weight_stable"] else "FAIL"})',
'',
'### Held-out prediction vs Script 35 baseline',
'',
'| Firm | Predicted C1% | Expected C1% | |diff| pp |',
'|---|---|---|---|']
for entry in summary['held_out_prediction_check']:
md.append(f'| {entry["firm"]} | {entry["predicted_c1_pct"]:.2f}% | '
f'{entry["expected_c1_pct"]}% | '
f'{entry["abs_diff"]:.2f} |')
md += ['',
f'- Max |%diff| across folds: {summary["max_abs_pct_diff"]:.2f}pp '
f'(viable bar: <= 5.0 pp)',
'',
f'## Verdict: **{summary["verdict"]}**',
'',
summary['verdict_message'],
'',
'### Verdict legend',
'- **P2_STRONG**: C1 cluster reproducible across folds AND '
'held-out predictions match Script 35 baseline within 5 pp. '
'K=3 captures a real cross-firm hand-leaning sub-population; '
'Paper A v4.0 can use K=3 hard assignment as the operational '
'classifier.',
'- **P2_PARTIAL**: C1 cluster shape reproducible but membership '
'predictions diverge. Cluster exists conceptually but is not '
'predictively useful as an operational classifier.',
'- **P2_WEAK**: C1 cluster shifts substantially across folds. '
'K=3 is also firm-mass driven; v4.0 needs a different strategy '
'(P1 firm-templatedness reframe, P3 rollback, or P4 '
'reverse-anchor).',
]
return '\n'.join(md)
def main():
print('=' * 72)
print('Script 37: K=3 LOOO Check (Path P2 viability)')
print('=' * 72)
cpas = load_big4_accountants()
print(f'\nN Big-4 CPAs: {len(cpas)}')
baseline = run_full_baseline(cpas)
fold_results = run_loo(cpas)
summary = stability_summary(fold_results, baseline)
print(f'\n[C] Verdict: {summary["verdict"]}')
print(f' {summary["verdict_message"]}')
render_panels(cpas, fold_results)
payload = {
'generated_at': datetime.now().isoformat(),
'min_sigs_per_accountant': MIN_SIGS,
'random_seed': SEED,
'n_cpas_total': len(cpas),
'baseline': baseline,
'loo_folds': fold_results,
'stability_summary': summary,
'script35_c1_baseline_pct': SCRIPT35_C1_PCT,
}
json_path = OUT / 'k3_loo_results.json'
json_path.write_text(json.dumps(payload, indent=2, ensure_ascii=False),
encoding='utf-8')
print(f'JSON: {json_path}')
md = render_md(baseline, fold_results, summary)
md_path = OUT / 'k3_loo_report.md'
md_path.write_text(md, encoding='utf-8')
print(f'Report: {md_path}')
if __name__ == '__main__':
main()