#!/usr/bin/env python3
"""Script 50: publication-grade scoped inter-CPA anchor recompute.

Faithfully reproduces Script 45's any-pair five-way pool simulation
(max_cos & min_dh over a random same-size inter-CPA pool, excl. same-CPA),
then reports for scopes ABCD / BCD / BCD+nonBig4:
  - per-signature HC (D1) and HC+MC (D2) any-pair FAR
  - per-document HC (D1) and HC+MC (D2) any-pair FAR
  - per-firm per-document D2

ABCD is printed first to verify reproduction of published values
(per-sig HC~0.1102, per-doc D2~0.3375, Firm A~0.62). Read-only.
"""
import sqlite3
from collections import defaultdict

import numpy as np

DB = '/Volumes/NV2/PDF-Processing/signature-analysis/signature_analysis.db'
FIRM_A = '勤業眾信聯合'
BIG4 = ('勤業眾信聯合', '安侯建業聯合', '資誠聯合', '安永聯合')
ALIAS = {'勤業眾信聯合': 'A', '安侯建業聯合': 'B',
         '資誠聯合': 'C', '安永聯合': 'D'}
SEED = 42
# Popcount lookup table: POP[b] is the number of set bits in byte value b.
POP = np.array([bin(i).count('1') for i in range(256)], dtype=np.uint8)

# Classification thresholds used by run().
COS_THRESHOLD = 0.95   # max_cos must be strictly greater than this
HC_MAX_DH = 5          # HC: min_dh <= 5
MC_MAX_DH = 15         # MC: 5 < min_dh <= 15
# Extra draws per signature so that enough candidates usually survive the
# same-CPA filter.  Changing this changes the random stream.
DRAW_SLACK = 20
# Initial value of min_dh for signatures that get no pool.
# NOTE(review): presumably the dHash is 64 bits wide -- confirm.
DH_INIT = 64


def wilson(k, n, z=1.96):
    """Return the Wilson score interval ``(lo, hi)`` for ``k`` hits out of ``n``.

    ``z`` is the normal quantile (default 1.96).  Returns ``(None, None)``
    when ``n == 0``.  Bounds are clamped to ``[0.0, 1.0]``.
    """
    if n == 0:
        return (None, None)
    p = k / n
    d = 1 + z * z / n
    c = (p + z * z / (2 * n)) / d
    h = z * np.sqrt(p * (1 - p) / n + z * z / (4 * n * n)) / d
    return (max(0.0, c - h), min(1.0, c + h))


def load():
    """Fetch all usable signature rows from ``DB`` (opened read-only).

    Each row is ``(assigned_accountant, firm, source_pdf, feature_vector,
    dhash_vector)``; rows with a NULL accountant, firm, feature vector or
    dHash vector are excluded by the query.
    """
    conn = sqlite3.connect(f'file:{DB}?mode=ro', uri=True)
    try:
        cur = conn.cursor()
        cur.execute("""
            SELECT s.assigned_accountant, a.firm, s.source_pdf,
                   s.feature_vector, s.dhash_vector
            FROM signatures s
            JOIN accountants a ON s.assigned_accountant=a.name
            WHERE s.assigned_accountant IS NOT NULL
              AND a.firm IS NOT NULL
              AND s.feature_vector IS NOT NULL
              AND s.dhash_vector IS NOT NULL""")
        return cur.fetchall()
    finally:
        # Close even if the query fails, so the handle is never leaked.
        conn.close()


def _pool_extremes(feats, dh, cpas, rng):
    """Return ``(max_cos, min_dh)`` per signature over a random inter-CPA pool.

    For a signature whose CPA has ``m`` signatures in scope, the pool is up
    to ``m - 1`` indices drawn with replacement from all in-scope signatures,
    after dropping draws that belong to the same CPA.  Signatures with no
    pool keep the defaults ``max_cos = 0`` and ``min_dh = DH_INIT``.
    """
    n = len(cpas)
    groups = defaultdict(list)
    for i, c in enumerate(cpas):
        groups[c].append(i)
    cpa_idx = {c: np.array(v) for c, v in groups.items()}

    max_cos = np.zeros(n, np.float32)
    min_dh = np.full(n, DH_INIT, np.int32)
    for si in range(n):
        same = cpa_idx[cpas[si]]
        npool = same.size - 1
        if npool <= 0:
            continue
        # One integers() call per pooled signature, same size as before, so
        # the random stream matches the original simulation.
        draw = rng.integers(0, n, size=npool + same.size + DRAW_SLACK)
        cand = draw[~np.isin(draw, same)][:npool]
        if cand.size == 0:
            # Every draw hit the same CPA (e.g. single-CPA scope): there is
            # nothing to compare against, so keep the defaults.
            continue
        cosv = feats[cand] @ feats[si]
        dist = POP[dh[cand] ^ dh[si]].sum(axis=1)
        max_cos[si] = cosv.max()
        min_dh[si] = int(dist.min())
    return max_cos, min_dh


def run(rows, keep_fn, label):
    """Simulate and print any-pair rates for the rows selected by ``keep_fn``.

    Args:
        rows: sequence of ``(cpa, firm, doc, feature_bytes, dhash_bytes)``
            tuples as returned by ``load()``.  ``feature_bytes`` is read as
            float32 and ``dhash_bytes`` as uint8.
        keep_fn: predicate called with the firm name; rows for which it is
            truthy form the scope.
        label: heading printed for this scope.

    Returns:
        A dict with the counts behind the printed rates: ``n_sig``,
        ``n_doc``, ``per_sig_d1``, ``per_sig_d2``, ``per_doc_d1``,
        ``per_doc_d2`` and ``per_firm_doc_d2`` (firm alias ->
        ``(hits, docs)``).  A document counts as a hit if any of its
        signatures is a hit; its firm is that of its first signature.
    """
    keep = [r for r in rows if keep_fn(r[1])]
    n = len(keep)
    summary = {
        'label': label, 'n_sig': n, 'n_doc': 0,
        'per_sig_d1': 0, 'per_sig_d2': 0,
        'per_doc_d1': 0, 'per_doc_d2': 0,
        'per_firm_doc_d2': {},
    }
    if n == 0:
        # Nothing to stack and every rate would divide by zero.
        print(f'\n===== {label} (n_sig={n:,}) =====')
        print(' (no signatures in scope)')
        return summary

    feats = np.stack([np.frombuffer(r[3], np.float32) for r in keep])
    # L2-normalise so the dot product below is a cosine similarity; the clip
    # avoids dividing by zero for all-zero vectors.
    feats /= np.clip(np.linalg.norm(feats, axis=1, keepdims=True), 1e-9, None)
    dh = np.stack([np.frombuffer(r[4], np.uint8) for r in keep])
    cpas = [r[0] for r in keep]
    firms = [ALIAS.get(r[1], 'NonB4') for r in keep]
    docs = [r[2] for r in keep]

    rng = np.random.default_rng(SEED)
    max_cos, min_dh = _pool_extremes(feats, dh, cpas, rng)

    # any-pair classification
    similar = max_cos > COS_THRESHOLD
    hc = similar & (min_dh <= HC_MAX_DH)
    mc = similar & (min_dh > HC_MAX_DH) & (min_dh <= MC_MAX_DH)
    d1 = hc
    d2 = hc | mc

    print(f'\n===== {label} (n_sig={n:,}) =====')
    for key, nm, arr in [('per_sig_d1', 'per-sig HC (D1)', d1),
                         ('per_sig_d2', 'per-sig HC+MC (D2)', d2)]:
        k = int(arr.sum())
        lo, hi = wilson(k, n)
        summary[key] = k
        print(f' {nm}: {k/n:.4f} ({k}/{n}) [{lo:.4f},{hi:.4f}]')

    # per-document worst-case: a document is flagged if any signature is.
    doc_d1 = {}
    doc_d2 = {}
    doc_firm = {}
    for doc, firm, hit1, hit2 in zip(docs, firms, d1.tolist(), d2.tolist()):
        doc_d1[doc] = doc_d1.get(doc, False) or hit1
        doc_d2[doc] = doc_d2.get(doc, False) or hit2
        doc_firm.setdefault(doc, firm)

    nd = len(doc_d2)
    k1 = sum(doc_d1.values())
    k2 = sum(doc_d2.values())
    l1 = wilson(k1, nd)
    l2 = wilson(k2, nd)
    print(f' per-doc HC (D1): {k1/nd:.4f} ({k1}/{nd}) [{l1[0]:.4f},{l1[1]:.4f}]')
    print(f' per-doc HC+MC (D2):{k2/nd:.4f} ({k2}/{nd}) [{l2[0]:.4f},{l2[1]:.4f}]')
    summary['n_doc'] = nd
    summary['per_doc_d1'] = k1
    summary['per_doc_d2'] = k2

    firm_docs = defaultdict(int)
    firm_hits = defaultdict(int)
    for doc, firm in doc_firm.items():
        firm_docs[firm] += 1
        firm_hits[firm] += int(doc_d2[doc])
    for f in sorted(firm_docs):
        hits, total = firm_hits[f], firm_docs[f]
        summary['per_firm_doc_d2'][f] = (hits, total)
        print(f' Firm {f} per-doc D2: {hits/total:.4f} ({hits}/{total})')
    return summary


def main():
    """Load the database once and report the three scopes."""
    rows = load()
    run(rows, lambda fm: fm in BIG4,
        'ABCD (verify vs published: HC~0.110 / D2~0.338 / A~0.62)')
    run(rows, lambda fm: fm in BIG4 and fm != FIRM_A, 'BCD-only')
    run(rows, lambda fm: fm != FIRM_A, 'BCD + non-Big4')


if __name__ == '__main__':
    main()