Files
pdf_signature_extraction/signature_analysis/52_canonical_bcd_locked.py
gbanyan 3c7fcc010f Paper A v4.1: BCD-baseline reframe + screening positioning + trim
- Re-anchor inter-CPA coincidence-rate (ICCR) calibration on a normative
  non-Firm-A baseline (Firms B/C/D); Firm A held out as an out-of-sample
  target. Locked canonical numbers (codex-audited; Scripts 46/52/53):
  per-comparison HC 0.00014->0.000018, per-signature HC 0.0116, per-document
  HC+MC 0.34->0.1905; KDE crossover 0.837 retained corpus-wide.
- Reposition as an operator-tunable, semi-automated screening/triage framework
  (title -> "Automated Screening..."): HC = high-specificity operating point;
  MC band demoted to low-specificity advisory; Firm A = demonstration that the
  screening surfaces a templated end, audit-quality implications deferred.
- Apply codex prose-review fixes: triage-neutral five-way labels, soften
  mechanism/specificity wording, supersede MC claim-strength, update stale
  Appendix script references (40b/43/45 -> 46/52/53).
- Trim pass: compress Sec. V discussion + Sec. III echoes (27.7k -> 26.8k
  words); no substantive content removed.
- Add analysis scripts 45-53 (firm-year trends; BCD-only ICCR recompute;
  canonical-sampler locked numbers; Firm-A out-of-sample; BCD regression +
  cross-firm hit matrix).

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-04 19:35:10 +08:00

170 lines
7.1 KiB
Python

#!/usr/bin/env python3
"""Script 52: canonical-correct locked publication numbers (supersedes 48/50,
fixes the per-firm assignment and A-out-of-sample any-pair issues codex flagged).
Uses the EXACT canonical candidate sampler of Scripts 43/45 (retry-loop to
collect exactly n_pool non-same-CPA candidates, rng.choice, default_rng(42)),
any-pair max-cos/min-dHash five-way classification, and dominant-firm document
assignment. Scopes: ABCD / BCD / BCD+nonBig4, plus Firm-A out-of-sample vs a
clean BCD candidate pool. Read-only.
"""
import sqlite3
from collections import defaultdict, Counter
import numpy as np
DB = '/Volumes/NV2/PDF-Processing/signature-analysis/signature_analysis.db'
FIRM_A = '勤業眾信聯合'
BIG4 = ('勤業眾信聯合', '安侯建業聯合', '資誠聯合', '安永聯合')
ALIAS = {'勤業眾信聯合': 'A', '安侯建業聯合': 'B', '資誠聯合': 'C', '安永聯合': 'D'}
SEED = 42
N_BOOT = 1000
POP = np.array([bin(i).count('1') for i in range(256)], dtype=np.uint8)
def wilson(k, n, z=1.96):
if n == 0:
return (None, None)
p = k/n; d = 1+z*z/n
c = (p+z*z/(2*n))/d
h = z*np.sqrt(p*(1-p)/n+z*z/(4*n*n))/d
return (max(0.0, c-h), min(1.0, c+h))
def load():
conn = sqlite3.connect(f'file:{DB}?mode=ro', uri=True)
cur = conn.cursor()
cur.execute("""
SELECT s.assigned_accountant, a.firm, s.source_pdf,
s.feature_vector, s.dhash_vector
FROM signatures s JOIN accountants a ON s.assigned_accountant=a.name
WHERE s.assigned_accountant IS NOT NULL AND a.firm IS NOT NULL
AND s.feature_vector IS NOT NULL AND s.dhash_vector IS NOT NULL""")
rows = cur.fetchall()
conn.close()
return rows
def canonical_sampler(rng, n, n_pool, same_cpa, all_idx):
"""EXACT Scripts 43/45 sampler: retry-loop to exactly n_pool non-same."""
need = n_pool
cand = []
attempts = 0
while need > 0 and attempts < 10:
draw = rng.choice(n, size=need * 2, replace=True)
ok = draw[~np.isin(draw, same_cpa)]
cand.extend(ok[:need].tolist())
need -= len(ok[:need])
attempts += 1
if need > 0:
pool_mask = np.ones(n, dtype=bool)
pool_mask[same_cpa] = False
fb = rng.choice(all_idx[pool_mask], size=need, replace=False)
cand.extend(fb.tolist())
return np.array(cand[:n_pool], dtype=np.int64)
def simulate(keep):
n = len(keep)
feats = np.stack([np.frombuffer(r[3], np.float32) for r in keep]).astype(np.float32)
norms = np.linalg.norm(feats, axis=1, keepdims=True); norms[norms == 0] = 1.0
feats = feats / norms
dh = np.stack([np.frombuffer(r[4], np.uint8) for r in keep])
cpas = np.array([r[0] for r in keep])
firms = np.array([ALIAS.get(r[1], 'NonB4') for r in keep])
docs = np.array([r[2] for r in keep])
cpa_idx = defaultdict(list)
for i, c in enumerate(cpas):
cpa_idx[c].append(i)
cpa_idx = {c: np.array(v) for c, v in cpa_idx.items()}
pool_size = {c: len(v)-1 for c, v in cpa_idx.items()}
all_idx = np.arange(n)
rng = np.random.default_rng(SEED)
max_cos = np.zeros(n, np.float32); min_dh = np.full(n, 64, np.int32)
for si in range(n):
np_ = pool_size[cpas[si]]
if np_ <= 0:
continue
cand = canonical_sampler(rng, n, np_, cpa_idx[cpas[si]], all_idx)
cosv = feats[cand] @ feats[si]
dist = POP[dh[cand] ^ dh[si]].sum(axis=1)
max_cos[si] = cosv.max(); min_dh[si] = int(dist.min())
return max_cos, min_dh, cpas, firms, docs, cpa_idx
def report(keep, label):
max_cos, min_dh, cpas, firms, docs, cpa_idx = simulate(keep)
n = len(cpas)
hc = (max_cos > 0.95) & (min_dh <= 5)
d2 = (max_cos > 0.95) & (min_dh <= 15)
print(f'\n===== {label} (n_sig={n:,}) =====')
for nm, a in [('per-sig HC', hc), ('per-sig HC+MC', d2)]:
k = int(a.sum()); lo, hi = wilson(k, n)
print(f' {nm}: {k/n:.6f} ({k}/{n}) [{lo:.4f},{hi:.4f}]')
# CPA-block bootstrap on per-sig HC
rng = np.random.default_rng(SEED + 1)
cl = list(cpa_idx.keys())
bs = np.empty(N_BOOT)
for b in range(N_BOOT):
cs = rng.choice(len(cl), len(cl), replace=True)
idx = np.concatenate([cpa_idx[cl[i]] for i in cs])
bs[b] = hc[idx].mean()
print(f' per-sig HC CPA-block boot95% [{np.percentile(bs,2.5):.4f},{np.percentile(bs,97.5):.4f}]')
# per-doc, dominant-firm assignment (canonical)
doc_sigs = defaultdict(list)
for i in range(n):
doc_sigs[docs[i]].append(i)
dl = list(doc_sigs.keys()); nd = len(dl)
doc_d1 = np.array([hc[doc_sigs[d]].any() for d in dl])
doc_d2 = np.array([d2[doc_sigs[d]].any() for d in dl])
doc_firm = np.array([Counter(firms[doc_sigs[d]]).most_common(1)[0][0] for d in dl])
print(f' per-doc HC: {doc_d1.mean():.6f} ({int(doc_d1.sum())}/{nd})')
print(f' per-doc HC+MC: {doc_d2.mean():.6f} ({int(doc_d2.sum())}/{nd})')
for f in sorted(set(doc_firm)):
m = doc_firm == f
print(f' Firm {f} per-doc D2: {doc_d2[m].mean():.4f} ({int(doc_d2[m].sum())}/{int(m.sum())})')
def a_out_of_sample(rows):
"""Firm A source vs clean BCD candidate pool, any-pair, pool=count-1."""
A = [r for r in rows if r[1] == FIRM_A]
BCD = [r for r in rows if r[1] in BIG4 and r[1] != FIRM_A]
bf = np.stack([np.frombuffer(r[3], np.float32) for r in BCD]).astype(np.float32)
nb = bf.shape[0]
bn = np.linalg.norm(bf, axis=1, keepdims=True); bn[bn == 0] = 1.0; bf = bf/bn
bdh = np.stack([np.frombuffer(r[4], np.uint8) for r in BCD])
a_cpa = defaultdict(list)
for i, r in enumerate(A):
a_cpa[r[0]].append(i)
pool_size = {c: len(v)-1 for c, v in a_cpa.items()}
rng = np.random.default_rng(SEED)
hc = np.zeros(len(A), bool); d2 = np.zeros(len(A), bool)
docs = np.array([r[2] for r in A])
for i, r in enumerate(A):
np_ = pool_size[r[0]]
if np_ <= 0: # singleton CPA: no same-CPA pool, skip (canonical)
continue
cand = rng.choice(nb, size=np_, replace=True) # A not in BCD pool
sf = np.frombuffer(r[3], np.float32).astype(np.float32)
sf = sf/max(np.linalg.norm(sf), 1e-9)
cosv = bf[cand] @ sf
dist = POP[bdh[cand] ^ np.frombuffer(r[4], np.uint8)].sum(axis=1)
mc, md = cosv.max(), int(dist.min())
hc[i] = (mc > 0.95) and (md <= 5)
d2[i] = (mc > 0.95) and (md <= 15)
k = int(hc.sum()); n = len(A); lo, hi = wilson(k, n)
print(f'\n===== Firm A out-of-sample vs clean BCD pool (any-pair) =====')
print(f' per-sig HC: {k/n:.6f} ({k}/{n}) [{lo:.5f},{hi:.5f}]')
ds = defaultdict(list)
for i in range(n):
ds[docs[i]].append(i)
dl = list(ds.keys())
dd2 = np.array([d2[ds[d]].any() for d in dl])
print(f' per-doc HC+MC: {dd2.mean():.6f} ({int(dd2.sum())}/{len(dl)})')
rows = load()
report([r for r in rows if r[1] in BIG4], 'ABCD (verify: per-sig HC~0.1102 / per-doc D2~0.3375)')
report([r for r in rows if r[1] in BIG4 and r[1] != FIRM_A], 'BCD-only (verify codex: HC~0.0116 / doc HC~0.0226 / doc D2~0.1905)')
report([r for r in rows if r[1] != FIRM_A], 'BCD + non-Big4')
a_out_of_sample(rows)