#!/usr/bin/env python3 """Script 51: publication polish. Part A: CPA-block bootstrap (1000 reps) on per-signature HC any-pair rate, and document-level bootstrap on per-document HC+MC, for ABCD & BCD. Part B: corpus-wide KDE crossover (pair-weighted intra, reproduce 0.837) plus BCD-only and BCD+nonBig4 variants. Read-only. """ import sqlite3 from collections import defaultdict import numpy as np from scipy.stats import gaussian_kde DB = '/Volumes/NV2/PDF-Processing/signature-analysis/signature_analysis.db' FIRM_A = '勤業眾信聯合' BIG4 = ('勤業眾信聯合', '安侯建業聯合', '資誠聯合', '安永聯合') ALIAS = {'勤業眾信聯合': 'A', '安侯建業聯合': 'B', '資誠聯合': 'C', '安永聯合': 'D'} SEED = 42 N_BOOT = 1000 POP = np.array([bin(i).count('1') for i in range(256)], dtype=np.uint8) def load(): conn = sqlite3.connect(f'file:{DB}?mode=ro', uri=True) cur = conn.cursor() cur.execute(""" SELECT s.assigned_accountant, a.firm, s.source_pdf, s.feature_vector, s.dhash_vector FROM signatures s JOIN accountants a ON s.assigned_accountant=a.name WHERE s.assigned_accountant IS NOT NULL AND a.firm IS NOT NULL AND s.feature_vector IS NOT NULL AND s.dhash_vector IS NOT NULL""") rows = cur.fetchall() conn.close() return rows # ============ Part A: bootstrap on anchor rates ============ def simulate(keep): n = len(keep) feats = np.stack([np.frombuffer(r[3], np.float32) for r in keep]).astype(np.float32) feats /= np.clip(np.linalg.norm(feats, axis=1, keepdims=True), 1e-9, None) dh = np.stack([np.frombuffer(r[4], np.uint8) for r in keep]) cpas = np.array([r[0] for r in keep]) docs = np.array([r[2] for r in keep]) cpa_idx = defaultdict(list) for i, c in enumerate(cpas): cpa_idx[c].append(i) cpa_idx = {c: np.array(v) for c, v in cpa_idx.items()} pool_size = {c: len(v)-1 for c, v in cpa_idx.items()} rng = np.random.default_rng(SEED) max_cos = np.zeros(n, np.float32); min_dh = np.full(n, 64, np.int32) for si in range(n): c = cpas[si]; npool = pool_size[c] if npool <= 0: continue same = cpa_idx[c] draw = rng.integers(0, n, size=npool + same.size + 20) cand = draw[~np.isin(draw, same)][:npool] cosv = feats[cand] @ feats[si] dist = POP[dh[cand] ^ dh[si]].sum(axis=1) max_cos[si] = cosv.max(); min_dh[si] = int(dist.min()) hc = (max_cos > 0.95) & (min_dh <= 5) d2 = (max_cos > 0.95) & (min_dh <= 15) return hc, d2, cpa_idx, docs def boot_part(keep, label): hc, d2, cpa_idx, docs = simulate(keep) n = len(hc) rng = np.random.default_rng(SEED + 1) cpa_list = list(cpa_idx.keys()) # CPA-block bootstrap on per-signature HC bs = np.empty(N_BOOT) for b in range(N_BOOT): cs = rng.choice(len(cpa_list), len(cpa_list), replace=True) idx = np.concatenate([cpa_idx[cpa_list[i]] for i in cs]) bs[b] = hc[idx].mean() # document-level bootstrap on per-doc D2 doc_d2 = defaultdict(bool) for i in range(n): doc_d2[docs[i]] = doc_d2[docs[i]] or bool(d2[i]) dl = np.array(list(doc_d2.keys())); dvals = np.array([doc_d2[d] for d in dl]) nd = len(dl); bd = np.empty(N_BOOT) for b in range(N_BOOT): s = rng.integers(0, nd, nd) bd[b] = dvals[s].mean() print(f'\n [{label}] per-sig HC point={hc.mean():.4f} ' f'CPA-block boot95% [{np.percentile(bs,2.5):.4f}, {np.percentile(bs,97.5):.4f}]') print(f' [{label}] per-doc HC+MC point={dvals.mean():.4f} ' f'doc boot95% [{np.percentile(bd,2.5):.4f}, {np.percentile(bd,97.5):.4f}]') # ============ Part B: pair-weighted KDE crossover ============ def crossover(keep, label): feats = np.stack([np.frombuffer(r[3], np.float32) for r in keep]).astype(np.float32) feats /= np.clip(np.linalg.norm(feats, axis=1, keepdims=True), 1e-9, None) cpas = np.array([r[0] for r in keep]) by = defaultdict(list) for i, c in enumerate(cpas): by[c].append(i) by = {c: np.array(v) for c, v in by.items() if len(v) >= 3} accts = list(by.keys()) pair_w = np.array([len(by[c])*(len(by[c])-1)/2 for c in accts], float) pair_w /= pair_w.sum() rng = np.random.default_rng(SEED) M = 100_000 # intra: CPA sampled proportional to pair count (= uniform over all intra pairs) intra = np.empty(M, np.float32) ci = rng.choice(len(accts), M, p=pair_w) for t in range(M): a, b = rng.choice(by[accts[ci[t]]], 2, replace=False) intra[t] = feats[a] @ feats[b] inter = np.empty(M, np.float32) for t in range(M): i, j = rng.choice(len(accts), 2, replace=False) inter[t] = feats[rng.choice(by[accts[i]])] @ feats[rng.choice(by[accts[j]])] xs = np.linspace(0.3, 1.0, 10000) diff = gaussian_kde(intra)(xs) - gaussian_kde(inter)(xs) cr = [float(x) for x in xs[np.where(np.diff(np.sign(diff)))[0]] if 0.6 < x < 0.99] print(f' [{label}] crossover {[f"{x:.4f}" for x in cr]} ' f'(intra {intra.mean():.4f}/{np.median(intra):.4f} inter {inter.mean():.4f}/{np.median(inter):.4f})') rows = load() abcd = [r for r in rows if r[1] in BIG4] bcd = [r for r in rows if r[1] in BIG4 and r[1] != FIRM_A] print('=== Part A: bootstrap CIs on anchor rates ===') boot_part(abcd, 'ABCD (verify ~0.109 / ~0.338)') boot_part(bcd, 'BCD-only') print('\n=== Part B: KDE crossover (pair-weighted intra, corpus-wide reproduces 0.837) ===') crossover(rows, 'corpus-wide (all firms)') crossover(bcd, 'BCD-only') crossover([r for r in rows if r[1] != FIRM_A], 'BCD + non-Big4')