pdf_signature_extraction/signature_analysis/39_v4_signature_level_convergence.py
gbanyan 39575cef49 Add script 39: signature-level convergence (SIG_CONVERGENCE_MODERATE)
Phase 1.7 follow-up to Script 38's per-CPA convergence. Tests
whether the convergence holds at signature granularity, preempting
"per-CPA aggregation washes out signal" reviewer attacks.

Three signature-level labels per Big-4 signature (n=150,442):
  L1 PaperA      non_hand iff cos > 0.95 AND dh <= 5
  L2 K=3 perCPA  hard assignment under per-CPA-fit components
  L3 K=3 perSig  hard assignment under fresh signature-level fit
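
As a minimal sketch of the L1 box rule listed above, the snippet below labels a few made-up (cos, dh) pairs in plain Python. Only the two cut-offs come from this commit; the helper name `paper_a_label` and the sample values are illustrative assumptions.

```python
# Cut-offs quoted in the commit message; everything else is illustrative.
COS_CUT = 0.95
DH_CUT = 5


def paper_a_label(cos, dh):
    """Return 'non_hand' when the signature falls inside the Paper A box."""
    return 'non_hand' if (cos > COS_CUT and dh <= DH_CUT) else 'hand_leaning'


# Hypothetical (cos, dh) pairs, not taken from the dataset.
samples = [(0.99, 1), (0.96, 5), (0.95, 2), (0.97, 6)]
labels = [paper_a_label(c, d) for c, d in samples]
print(labels)
```

Note that the cosine cut is strict (`>`) while the dHash cut is inclusive (`<=`), so a signature sitting exactly at cos = 0.95 stays on the hand-leaning side.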

Component comparison (per-CPA vs per-signature K=3):

  Component        Per-CPA cos/dh/wt     Per-Sig cos/dh/wt
  C1 hand-leaning  0.9457/9.17/0.143     0.9280/9.75/0.146
  C2 mixed         0.9558/6.66/0.536     0.9625/6.04/0.582
  C3 replicated    0.9826/2.41/0.321     0.9890/1.27/0.272

  Component drift modest: max |dcos| = 0.018, max |ddh| = 1.15.
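
The drift figures can be roughly re-derived from the rounded table above, as in this small sketch. The dictionary names are illustrative. Because the inputs are the rounded values printed in the table, the dh result comes out near 1.14 rather than the reported 1.15, which presumably was computed from unrounded means.

```python
# (cos, dh) component means as printed (rounded) in the table above.
per_cpa = {'C1': (0.9457, 9.17), 'C2': (0.9558, 6.66), 'C3': (0.9826, 2.41)}
per_sig = {'C1': (0.9280, 9.75), 'C2': (0.9625, 6.04), 'C3': (0.9890, 1.27)}

# Largest absolute shift across the three components, per dimension.
max_dcos = max(abs(per_cpa[c][0] - per_sig[c][0]) for c in per_cpa)
max_ddh = max(abs(per_cpa[c][1] - per_sig[c][1]) for c in per_cpa)
print(f'max |dcos| = {max_dcos:.4f}, max |ddh| = {max_ddh:.2f}')
```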

Cohen kappa (binary, 1 = replicated):

  PaperA vs K=3 perCPA       kappa = 0.6616  substantial
  PaperA vs K=3 perSig       kappa = 0.5586  moderate
  K=3 perCPA vs K=3 perSig   kappa = 0.8701  almost perfect
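
For readers unfamiliar with the statistic, here is a minimal pure-Python sketch of Cohen's kappa for two binary labelings. The function name and the ten toy labels are assumptions for illustration, not data from this study, and the degenerate case where chance agreement equals 1 is deliberately not handled.

```python
def cohen_kappa_binary(y1, y2):
    """Cohen's kappa for two equal-length 0/1 label lists."""
    n = len(y1)
    p_o = sum(a == b for a, b in zip(y1, y2)) / n   # observed agreement
    p1, p2 = sum(y1) / n, sum(y2) / n               # share of 1s per labeling
    p_e = p1 * p2 + (1 - p1) * (1 - p2)             # agreement expected by chance
    return (p_o - p_e) / (1 - p_e)


# Hypothetical labels (1 = replicated), not taken from the dataset.
rule_a = [1, 1, 1, 0, 0, 0, 0, 1, 0, 0]
rule_b = [1, 1, 0, 0, 0, 0, 1, 1, 0, 0]
kappa = cohen_kappa_binary(rule_a, rule_b)
print(round(kappa, 4))
```

In this toy case the two labelings agree on 8 of 10 items, but chance agreement is 0.52, so kappa lands in the "moderate" band even though raw agreement is 80%.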

Per-firm binary agreement PaperA vs K=3 perCPA:

  Firm A 86.13%, KPMG 77.46%, PwC 82.64%, EY 85.01%.

Verdict: SIG_CONVERGENCE_MODERATE (all kappas >= 0.40; per-CPA
aggregation captures most signature-level structure).
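
The verdict rule can be sketched as a small function over the three pairwise kappas. The thresholds (0.60 and 0.40) and class names follow the script; the function name `verdict` and the use of `min` (equivalent to requiring every kappa to clear the threshold) are illustrative choices.

```python
def verdict(kappas):
    """Map pairwise kappas to the verdict classes used by Script 39."""
    lowest = min(kappas)  # "all kappas >= t" is the same as "min kappa >= t"
    if lowest >= 0.60:
        return 'SIG_CONVERGENCE_STRONG'
    if lowest >= 0.40:
        return 'SIG_CONVERGENCE_MODERATE'
    return 'SIG_CONVERGENCE_WEAK'


# The three kappas reported in this commit message.
reported = [0.6616, 0.5586, 0.8701]
result = verdict(reported)
print(result)
```

The PaperA vs K=3 perSig kappa (0.5586) is the binding value here: it clears 0.40 but not 0.60, which is why the overall verdict is MODERATE rather than STRONG.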

Implication for v4.0: per-CPA K=3 is robust to aggregation level
(kappa = 0.87 vs per-signature fit). The modest disagreement
between K=3 and Paper A's box rule (kappa 0.56-0.66) reflects
different decision geometries -- K=3 posterior soft boundary vs
Paper A rectangle box -- not a fundamental signal disagreement.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-12 15:07:48 +08:00

392 lines
15 KiB
Python

#!/usr/bin/env python3
"""
Script 39: Signature-Level Convergence (preempts aggregation attack)
======================================================================
Phase 1.7 follow-up to Script 38's per-CPA convergence. Verifies
that the per-CPA K=3 + reverse-anchor + Paper A agreement holds at
the signature level (not just per-CPA mean), so a reviewer cannot
attack with "you washed out within-CPA heterogeneity by averaging".

Three labels per Big-4 signature:
  L1 PaperA_rule:  non_hand iff cos > 0.95 AND dh <= 5
  L2 K3_perCPA:    hard assignment under per-CPA K=3 components
                   fit on accountant means (Script 38 baseline)
  L3 K3_perSig:    hard assignment under a fresh K=3 fit on the
                   signature-level (cos, dh) cloud

Output:
  reports/v4_big4/signature_level_convergence/
    sig_level_results.json
    sig_level_report.md
    crosstab_paperA_vs_k3perCPA.csv
    crosstab_paperA_vs_k3perSig.csv
    crosstab_k3perCPA_vs_k3perSig.csv

Headline metrics:
  - Cohen's kappa for each pairwise label comparison
  - Per-firm marginal agreement
  - Component drift between per-CPA K=3 and per-signature K=3
"""
import sqlite3
import csv
import json
import numpy as np
import matplotlib
matplotlib.use('Agg')
import matplotlib.pyplot as plt
from pathlib import Path
from datetime import datetime
from sklearn.mixture import GaussianMixture

DB = '/Volumes/NV2/PDF-Processing/signature-analysis/signature_analysis.db'
OUT = Path('/Volumes/NV2/PDF-Processing/signature-analysis/reports/'
           'v4_big4/signature_level_convergence')
OUT.mkdir(parents=True, exist_ok=True)

SEED = 42
BIG4 = ('勤業眾信聯合', '安侯建業聯合', '資誠聯合', '安永聯合')
LABEL = {'勤業眾信聯合': 'Firm A (Deloitte)', '安侯建業聯合': 'KPMG',
         '資誠聯合': 'PwC', '安永聯合': 'EY'}
PAPER_A_COS_CUT = 0.95
PAPER_A_DH_CUT = 5
MIN_SIGS = 10  # for the per-CPA K=3 fit only


def load_big4_signatures():
    conn = sqlite3.connect(DB)
    cur = conn.cursor()
    cur.execute('''
        SELECT s.signature_id, s.assigned_accountant, a.firm,
               s.max_similarity_to_same_accountant,
               CAST(s.min_dhash_independent AS REAL)
        FROM signatures s
        JOIN accountants a ON s.assigned_accountant = a.name
        WHERE s.assigned_accountant IS NOT NULL
          AND s.max_similarity_to_same_accountant IS NOT NULL
          AND s.min_dhash_independent IS NOT NULL
          AND a.firm IN (?, ?, ?, ?)
    ''', BIG4)
    rows = cur.fetchall()
    conn.close()
    return rows


def load_per_cpa_means():
    """Returns (cpa_array, firm_array, X_2d) for the per-CPA fit."""
    conn = sqlite3.connect(DB)
    cur = conn.cursor()
    cur.execute('''
        SELECT s.assigned_accountant, a.firm,
               AVG(s.max_similarity_to_same_accountant) AS cos_mean,
               AVG(CAST(s.min_dhash_independent AS REAL)) AS dh_mean,
               COUNT(*) AS n
        FROM signatures s
        JOIN accountants a ON s.assigned_accountant = a.name
        WHERE s.assigned_accountant IS NOT NULL
          AND s.max_similarity_to_same_accountant IS NOT NULL
          AND s.min_dhash_independent IS NOT NULL
          AND a.firm IN (?, ?, ?, ?)
        GROUP BY s.assigned_accountant
        HAVING n >= ?
    ''', BIG4 + (MIN_SIGS,))
    rows = cur.fetchall()
    conn.close()
    cpas = [r[0] for r in rows]
    firms = [r[1] for r in rows]
    X = np.array([[float(r[2]), float(r[3])] for r in rows])
    return cpas, firms, X


def fit_k3(X, seed=SEED):
    return GaussianMixture(n_components=3, covariance_type='full',
                           random_state=seed, n_init=15, max_iter=500).fit(X)


def label_paperA(cos, dh):
    """Returns 0 = non_hand (replicated), 1 = hand_leaning."""
    return np.where((cos > PAPER_A_COS_CUT) & (dh <= PAPER_A_DH_CUT), 0, 1)


def label_k3(gmm, X, order):
    """Returns hard label in {0=C1, 1=C2, 2=C3} where C1 = lowest cos."""
    raw = gmm.predict(X)
    label_map = {old: new for new, old in enumerate(order)}
    return np.array([label_map[l] for l in raw])


def cohen_kappa(y1, y2):
    """Cohen's kappa for two label arrays."""
    n = len(y1)
    if n == 0:
        return 0.0
    classes = sorted(set(y1.tolist()) | set(y2.tolist()))
    k = len(classes)
    cm = np.zeros((k, k), dtype=float)
    for a, b in zip(y1, y2):
        cm[classes.index(int(a)), classes.index(int(b))] += 1
    p_o = np.sum(np.diag(cm)) / n
    row_marg = cm.sum(axis=1) / n
    col_marg = cm.sum(axis=0) / n
    p_e = float(np.sum(row_marg * col_marg))
    if p_e == 1.0:
        return 1.0 if p_o == 1.0 else 0.0
    return float((p_o - p_e) / (1 - p_e))


def crosstab(y1, y2, labels1, labels2):
    """Cross-tabulation as a dict-of-dicts."""
    out = {a: {b: 0 for b in labels2} for a in labels1}
    for a, b in zip(y1, y2):
        out[labels1[int(a)]][labels2[int(b)]] += 1
    return out


def write_crosstab_csv(ct, name, labels1, labels2):
    p = OUT / name
    with open(p, 'w', newline='', encoding='utf-8') as f:
        w = csv.writer(f)
        w.writerow([''] + labels2 + ['total'])
        for a in labels1:
            row = [a] + [ct[a][b] for b in labels2]
            row.append(sum(ct[a].values()))
            w.writerow(row)
        col_totals = [sum(ct[a][b] for a in labels1) for b in labels2]
        w.writerow(['total'] + col_totals + [sum(col_totals)])
    return p
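

def per_firm_agreement(firms_arr, y1, y2):
    """Per-firm share of signatures on which two binary labelings agree."""
    out = {}
    for f in BIG4:
        mask = (firms_arr == f)
        n = int(mask.sum())
        if n == 0:
            # Use the same keys as the populated case; callers index
            # 'agreement_rate' and must handle None for an empty firm.
            out[f] = {'n': 0, 'agree_count': 0, 'agreement_rate': None}
            continue
        agree_count = int(np.sum(y1[mask] == y2[mask]))
        out[f] = {
            'n': n,
            'agree_count': agree_count,
            'agreement_rate': float(agree_count / n),
        }
    return out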


def main():
    print('=' * 72)
    print('Script 39: Signature-Level Convergence')
    print('=' * 72)

    # 1. Per-CPA K=3 (Script 38 baseline reproduction)
    cpas, cpa_firms, X_cpa = load_per_cpa_means()
    print(f'\n[setup] N CPAs (n_sigs >= {MIN_SIGS}): {len(cpas)}')
    gmm_cpa = fit_k3(X_cpa)
    order_cpa = np.argsort(gmm_cpa.means_[:, 0])
    means_cpa = gmm_cpa.means_[order_cpa]
    weights_cpa = gmm_cpa.weights_[order_cpa]
    print(' Per-CPA K=3 components (sorted by cos):')
    for i, name in enumerate(['C1 hand-leaning', 'C2 mixed',
                              'C3 replicated']):
        print(f' {name}: cos={means_cpa[i,0]:.4f}, '
              f'dh={means_cpa[i,1]:.4f}, weight={weights_cpa[i]:.3f}')

    # 2. Load all Big-4 signatures
    rows = load_big4_signatures()
    n_sig = len(rows)
    sig_ids = np.array([r[0] for r in rows])
    sig_firms = np.array([r[2] for r in rows])
    cos = np.array([r[3] for r in rows], dtype=float)
    dh = np.array([r[4] for r in rows], dtype=float)
    X_sig = np.column_stack([cos, dh])
    print(f'\n[setup] N Big-4 signatures: {n_sig:,}')

    # 3. Three labels per signature
    L1 = label_paperA(cos, dh)
    L2 = label_k3(gmm_cpa, X_sig, order_cpa)
    print('\n[fit] Per-signature K=3 (fresh fit on signature cloud)')
    gmm_sig = fit_k3(X_sig)
    order_sig = np.argsort(gmm_sig.means_[:, 0])
    means_sig = gmm_sig.means_[order_sig]
    weights_sig = gmm_sig.weights_[order_sig]
    print(' Per-signature K=3 components (sorted by cos):')
    for i, name in enumerate(['C1 hand-leaning', 'C2 mixed',
                              'C3 replicated']):
        print(f' {name}: cos={means_sig[i,0]:.4f}, '
              f'dh={means_sig[i,1]:.4f}, weight={weights_sig[i]:.3f}')
    L3 = label_k3(gmm_sig, X_sig, order_sig)

    # 4. Cross-tabs
    paperA_labels = ['non_hand', 'hand_leaning']
    k3_labels = ['C1_handleaning', 'C2_mixed', 'C3_replicated']
    ct_p_vs_kcpa = crosstab(L1, L2, paperA_labels, k3_labels)
    ct_p_vs_ksig = crosstab(L1, L3, paperA_labels, k3_labels)
    ct_kcpa_vs_ksig = crosstab(L2, L3, k3_labels, k3_labels)
    write_crosstab_csv(ct_p_vs_kcpa, 'crosstab_paperA_vs_k3perCPA.csv',
                       paperA_labels, k3_labels)
    write_crosstab_csv(ct_p_vs_ksig, 'crosstab_paperA_vs_k3perSig.csv',
                       paperA_labels, k3_labels)
    write_crosstab_csv(ct_kcpa_vs_ksig, 'crosstab_k3perCPA_vs_k3perSig.csv',
                       k3_labels, k3_labels)

    # 5. Cohen's kappa (collapse K=3 -> binary {C1+C2 = hand-ish, C3 = replicated})
    L2_bin = (L2 == 2).astype(int)  # 1 = replicated (C3), 0 = otherwise
    L3_bin = (L3 == 2).astype(int)
    L1_bin = 1 - L1  # invert so 1 = non_hand (replicated), 0 = hand-leaning
    print('\n[kappa] Cohen kappa, binary collapse (1 = replicated)')
    kappa_p_kcpa = cohen_kappa(L1_bin, L2_bin)
    kappa_p_ksig = cohen_kappa(L1_bin, L3_bin)
    kappa_kcpa_ksig = cohen_kappa(L2_bin, L3_bin)
    print(f' PaperA vs K=3-perCPA : kappa = {kappa_p_kcpa:.4f}')
    print(f' PaperA vs K=3-perSig : kappa = {kappa_p_ksig:.4f}')
    print(f' K=3-CPA vs K=3-perSig : kappa = {kappa_kcpa_ksig:.4f}')

    # 6. Per-firm agreement
    print('\n[per-firm] Binary agreement (collapsed):')
    print(f' {"Firm":<22} {"n_sigs":>9} {"P_vs_K3CPA":>11} '
          f'{"P_vs_K3sig":>11} {"K3CPA_vs_K3sig":>15}')
    per_firm_p_kcpa = per_firm_agreement(sig_firms, L1_bin, L2_bin)
    per_firm_p_ksig = per_firm_agreement(sig_firms, L1_bin, L3_bin)
    per_firm_kcpa_ksig = per_firm_agreement(sig_firms, L2_bin, L3_bin)
    for f in BIG4:
        a1 = per_firm_p_kcpa[f]['agreement_rate']
        a2 = per_firm_p_ksig[f]['agreement_rate']
        a3 = per_firm_kcpa_ksig[f]['agreement_rate']
        print(f' {LABEL[f]:<22} {per_firm_p_kcpa[f]["n"]:>9,} '
              f'{a1*100:>10.2f}% {a2*100:>10.2f}% {a3*100:>14.2f}%')

    # 7. Component drift between per-CPA and per-signature K=3
    print('\n[drift] Per-CPA K=3 vs per-signature K=3 components:')
    drift = []
    for i, name in enumerate(['C1 hand-leaning', 'C2 mixed',
                              'C3 replicated']):
        d_cos = abs(means_cpa[i, 0] - means_sig[i, 0])
        d_dh = abs(means_cpa[i, 1] - means_sig[i, 1])
        d_w = abs(weights_cpa[i] - weights_sig[i])
        drift.append({'component': name, 'd_cos': float(d_cos),
                      'd_dh': float(d_dh), 'd_weight': float(d_w)})
        print(f' {name}: |dcos|={d_cos:.4f}, |ddh|={d_dh:.3f}, '
              f'|dweight|={d_w:.3f}')

    # Verdict
    if (kappa_p_kcpa >= 0.6 and kappa_p_ksig >= 0.6
            and kappa_kcpa_ksig >= 0.6):
        verdict = 'SIG_CONVERGENCE_STRONG'
        msg = ('All three pairwise Cohen kappas >= 0.60 (substantial '
               'agreement at signature level); per-CPA aggregation does '
               'not wash out signal.')
    elif (kappa_p_kcpa >= 0.4 and kappa_p_ksig >= 0.4
            and kappa_kcpa_ksig >= 0.4):
        verdict = 'SIG_CONVERGENCE_MODERATE'
        msg = ('All three pairwise Cohen kappas >= 0.40 (moderate '
               'agreement); per-CPA aggregation captures most of the '
               'signature-level structure.')
    else:
        verdict = 'SIG_CONVERGENCE_WEAK'
        msg = ('At least one pairwise Cohen kappa < 0.40; per-CPA '
               'aggregation hides meaningful signature-level disagreement '
               'between methods.')
    print(f'\n[verdict] {verdict}')
    print(f' {msg}')

    payload = {
        'generated_at': datetime.now().isoformat(),
        'n_signatures_big4': int(n_sig),
        'n_cpas_for_per_cpa_fit': int(len(cpas)),
        'paper_a_cuts': {'cos': PAPER_A_COS_CUT, 'dh': PAPER_A_DH_CUT},
        'per_cpa_k3': {
            'means': means_cpa.tolist(),
            'weights': weights_cpa.tolist(),
        },
        'per_signature_k3': {
            'means': means_sig.tolist(),
            'weights': weights_sig.tolist(),
        },
        'component_drift_per_CPA_vs_per_sig': drift,
        'cohen_kappa_binary_collapse': {
            'paperA_vs_k3perCPA': float(kappa_p_kcpa),
            'paperA_vs_k3perSig': float(kappa_p_ksig),
            'k3perCPA_vs_k3perSig': float(kappa_kcpa_ksig),
        },
        'crosstabs': {
            'paperA_vs_k3perCPA': ct_p_vs_kcpa,
            'paperA_vs_k3perSig': ct_p_vs_ksig,
            'k3perCPA_vs_k3perSig': ct_kcpa_vs_ksig,
        },
        'per_firm_agreement': {
            'paperA_vs_k3perCPA': {f: per_firm_p_kcpa[f] for f in BIG4},
            'paperA_vs_k3perSig': {f: per_firm_p_ksig[f] for f in BIG4},
            'k3perCPA_vs_k3perSig': {f: per_firm_kcpa_ksig[f] for f in BIG4},
        },
        'verdict': {'class': verdict, 'explanation': msg},
    }
    json_path = OUT / 'sig_level_results.json'
    json_path.write_text(json.dumps(payload, indent=2, ensure_ascii=False),
                         encoding='utf-8')
    print(f'\nJSON: {json_path}')

    # Markdown report
    md = [
        '# Signature-Level Convergence Check (Script 39)',
        f'Generated: {datetime.now().strftime("%Y-%m-%d %H:%M:%S")}',
        '',
        '## Goal',
        '',
        ('Verify that the per-CPA convergence found in Script 38 holds at '
         'signature granularity, so a reviewer cannot attack with '
         '"per-CPA aggregation washes out heterogeneity."'),
        '',
        '## Three signature-level labels',
        '',
        '- **PaperA**: non_hand iff cos > 0.95 AND dh <= 5',
        '- **K=3 perCPA**: hard assignment under K=3 components fit on '
        f'{len(cpas)} per-CPA means (Script 38 baseline)',
        '- **K=3 perSig**: hard assignment under K=3 components fit '
        f'directly on the {n_sig:,} signature-level (cos, dh) cloud',
        '',
        '## Component comparison',
        '',
        '| Component | Per-CPA cos | Per-CPA dh | Per-CPA wt | Per-Sig cos | Per-Sig dh | Per-Sig wt |',
        '|---|---|---|---|---|---|---|',
    ]
    for i, name in enumerate(['C1 hand-leaning', 'C2 mixed',
                              'C3 replicated']):
        md.append(f'| {name} | {means_cpa[i,0]:.4f} | {means_cpa[i,1]:.4f} | '
                  f'{weights_cpa[i]:.3f} | {means_sig[i,0]:.4f} | '
                  f'{means_sig[i,1]:.4f} | {weights_sig[i]:.3f} |')
    md += ['', '## Cohen kappa (binary: 1 = replicated, 0 = hand-leaning)',
           '',
           '| Pair | kappa |',
           '|---|---|',
           f'| PaperA vs K=3 perCPA | **{kappa_p_kcpa:.4f}** |',
           f'| PaperA vs K=3 perSig | **{kappa_p_ksig:.4f}** |',
           f'| K=3 perCPA vs K=3 perSig | **{kappa_kcpa_ksig:.4f}** |',
           '',
           ('Reference: kappa <= 0 = no agreement, 0.0-0.2 slight, '
            '0.2-0.4 fair, 0.4-0.6 moderate, 0.6-0.8 substantial, '
            '0.8-1.0 almost perfect (Landis & Koch 1977).'),
           '',
           '## Per-firm binary agreement', '',
           '| Firm | n_sigs | PaperA vs K3-perCPA | PaperA vs K3-perSig | K3-CPA vs K3-Sig |',
           '|---|---|---|---|---|',
           ]
    for f in BIG4:
        md.append(f'| {LABEL[f]} | {per_firm_p_kcpa[f]["n"]:,} | '
                  f'{per_firm_p_kcpa[f]["agreement_rate"]*100:.2f}% | '
                  f'{per_firm_p_ksig[f]["agreement_rate"]*100:.2f}% | '
                  f'{per_firm_kcpa_ksig[f]["agreement_rate"]*100:.2f}% |')
    md += ['', f'## Verdict: **{verdict}**',
           '', msg, '',
           '### Verdict legend',
           '- SIG_CONVERGENCE_STRONG: all 3 kappas >= 0.60 (substantial)',
           '- SIG_CONVERGENCE_MODERATE: all 3 kappas >= 0.40 (moderate)',
           '- SIG_CONVERGENCE_WEAK: at least one kappa < 0.40',
           ]
    md_path = OUT / 'sig_level_report.md'
    md_path.write_text('\n'.join(md), encoding='utf-8')
    print(f'Report: {md_path}')


if __name__ == '__main__':
    main()