#!/usr/bin/env python3 """Script 52: canonical-correct locked publication numbers (supersedes 48/50, fixes the per-firm assignment and A-out-of-sample any-pair issues codex flagged). Uses the EXACT canonical candidate sampler of Scripts 43/45 (retry-loop to collect exactly n_pool non-same-CPA candidates, rng.choice, default_rng(42)), any-pair max-cos/min-dHash five-way classification, and dominant-firm document assignment. Scopes: ABCD / BCD / BCD+nonBig4, plus Firm-A out-of-sample vs a clean BCD candidate pool. Read-only. """ import sqlite3 from collections import defaultdict, Counter import numpy as np DB = '/Volumes/NV2/PDF-Processing/signature-analysis/signature_analysis.db' FIRM_A = '勤業眾信聯合' BIG4 = ('勤業眾信聯合', '安侯建業聯合', '資誠聯合', '安永聯合') ALIAS = {'勤業眾信聯合': 'A', '安侯建業聯合': 'B', '資誠聯合': 'C', '安永聯合': 'D'} SEED = 42 N_BOOT = 1000 POP = np.array([bin(i).count('1') for i in range(256)], dtype=np.uint8) def wilson(k, n, z=1.96): if n == 0: return (None, None) p = k/n; d = 1+z*z/n c = (p+z*z/(2*n))/d h = z*np.sqrt(p*(1-p)/n+z*z/(4*n*n))/d return (max(0.0, c-h), min(1.0, c+h)) def load(): conn = sqlite3.connect(f'file:{DB}?mode=ro', uri=True) cur = conn.cursor() cur.execute(""" SELECT s.assigned_accountant, a.firm, s.source_pdf, s.feature_vector, s.dhash_vector FROM signatures s JOIN accountants a ON s.assigned_accountant=a.name WHERE s.assigned_accountant IS NOT NULL AND a.firm IS NOT NULL AND s.feature_vector IS NOT NULL AND s.dhash_vector IS NOT NULL""") rows = cur.fetchall() conn.close() return rows def canonical_sampler(rng, n, n_pool, same_cpa, all_idx): """EXACT Scripts 43/45 sampler: retry-loop to exactly n_pool non-same.""" need = n_pool cand = [] attempts = 0 while need > 0 and attempts < 10: draw = rng.choice(n, size=need * 2, replace=True) ok = draw[~np.isin(draw, same_cpa)] cand.extend(ok[:need].tolist()) need -= len(ok[:need]) attempts += 1 if need > 0: pool_mask = np.ones(n, dtype=bool) pool_mask[same_cpa] = False fb = rng.choice(all_idx[pool_mask], size=need, replace=False) cand.extend(fb.tolist()) return np.array(cand[:n_pool], dtype=np.int64) def simulate(keep): n = len(keep) feats = np.stack([np.frombuffer(r[3], np.float32) for r in keep]).astype(np.float32) norms = np.linalg.norm(feats, axis=1, keepdims=True); norms[norms == 0] = 1.0 feats = feats / norms dh = np.stack([np.frombuffer(r[4], np.uint8) for r in keep]) cpas = np.array([r[0] for r in keep]) firms = np.array([ALIAS.get(r[1], 'NonB4') for r in keep]) docs = np.array([r[2] for r in keep]) cpa_idx = defaultdict(list) for i, c in enumerate(cpas): cpa_idx[c].append(i) cpa_idx = {c: np.array(v) for c, v in cpa_idx.items()} pool_size = {c: len(v)-1 for c, v in cpa_idx.items()} all_idx = np.arange(n) rng = np.random.default_rng(SEED) max_cos = np.zeros(n, np.float32); min_dh = np.full(n, 64, np.int32) for si in range(n): np_ = pool_size[cpas[si]] if np_ <= 0: continue cand = canonical_sampler(rng, n, np_, cpa_idx[cpas[si]], all_idx) cosv = feats[cand] @ feats[si] dist = POP[dh[cand] ^ dh[si]].sum(axis=1) max_cos[si] = cosv.max(); min_dh[si] = int(dist.min()) return max_cos, min_dh, cpas, firms, docs, cpa_idx def report(keep, label): max_cos, min_dh, cpas, firms, docs, cpa_idx = simulate(keep) n = len(cpas) hc = (max_cos > 0.95) & (min_dh <= 5) d2 = (max_cos > 0.95) & (min_dh <= 15) print(f'\n===== {label} (n_sig={n:,}) =====') for nm, a in [('per-sig HC', hc), ('per-sig HC+MC', d2)]: k = int(a.sum()); lo, hi = wilson(k, n) print(f' {nm}: {k/n:.6f} ({k}/{n}) [{lo:.4f},{hi:.4f}]') # CPA-block bootstrap on per-sig HC rng = np.random.default_rng(SEED + 1) cl = list(cpa_idx.keys()) bs = np.empty(N_BOOT) for b in range(N_BOOT): cs = rng.choice(len(cl), len(cl), replace=True) idx = np.concatenate([cpa_idx[cl[i]] for i in cs]) bs[b] = hc[idx].mean() print(f' per-sig HC CPA-block boot95% [{np.percentile(bs,2.5):.4f},{np.percentile(bs,97.5):.4f}]') # per-doc, dominant-firm assignment (canonical) doc_sigs = defaultdict(list) for i in range(n): doc_sigs[docs[i]].append(i) dl = list(doc_sigs.keys()); nd = len(dl) doc_d1 = np.array([hc[doc_sigs[d]].any() for d in dl]) doc_d2 = np.array([d2[doc_sigs[d]].any() for d in dl]) doc_firm = np.array([Counter(firms[doc_sigs[d]]).most_common(1)[0][0] for d in dl]) print(f' per-doc HC: {doc_d1.mean():.6f} ({int(doc_d1.sum())}/{nd})') print(f' per-doc HC+MC: {doc_d2.mean():.6f} ({int(doc_d2.sum())}/{nd})') for f in sorted(set(doc_firm)): m = doc_firm == f print(f' Firm {f} per-doc D2: {doc_d2[m].mean():.4f} ({int(doc_d2[m].sum())}/{int(m.sum())})') def a_out_of_sample(rows): """Firm A source vs clean BCD candidate pool, any-pair, pool=count-1.""" A = [r for r in rows if r[1] == FIRM_A] BCD = [r for r in rows if r[1] in BIG4 and r[1] != FIRM_A] bf = np.stack([np.frombuffer(r[3], np.float32) for r in BCD]).astype(np.float32) nb = bf.shape[0] bn = np.linalg.norm(bf, axis=1, keepdims=True); bn[bn == 0] = 1.0; bf = bf/bn bdh = np.stack([np.frombuffer(r[4], np.uint8) for r in BCD]) a_cpa = defaultdict(list) for i, r in enumerate(A): a_cpa[r[0]].append(i) pool_size = {c: len(v)-1 for c, v in a_cpa.items()} rng = np.random.default_rng(SEED) hc = np.zeros(len(A), bool); d2 = np.zeros(len(A), bool) docs = np.array([r[2] for r in A]) for i, r in enumerate(A): np_ = pool_size[r[0]] if np_ <= 0: # singleton CPA: no same-CPA pool, skip (canonical) continue cand = rng.choice(nb, size=np_, replace=True) # A not in BCD pool sf = np.frombuffer(r[3], np.float32).astype(np.float32) sf = sf/max(np.linalg.norm(sf), 1e-9) cosv = bf[cand] @ sf dist = POP[bdh[cand] ^ np.frombuffer(r[4], np.uint8)].sum(axis=1) mc, md = cosv.max(), int(dist.min()) hc[i] = (mc > 0.95) and (md <= 5) d2[i] = (mc > 0.95) and (md <= 15) k = int(hc.sum()); n = len(A); lo, hi = wilson(k, n) print(f'\n===== Firm A out-of-sample vs clean BCD pool (any-pair) =====') print(f' per-sig HC: {k/n:.6f} ({k}/{n}) [{lo:.5f},{hi:.5f}]') ds = defaultdict(list) for i in range(n): ds[docs[i]].append(i) dl = list(ds.keys()) dd2 = np.array([d2[ds[d]].any() for d in dl]) print(f' per-doc HC+MC: {dd2.mean():.6f} ({int(dd2.sum())}/{len(dl)})') rows = load() report([r for r in rows if r[1] in BIG4], 'ABCD (verify: per-sig HC~0.1102 / per-doc D2~0.3375)') report([r for r in rows if r[1] in BIG4 and r[1] != FIRM_A], 'BCD-only (verify codex: HC~0.0116 / doc HC~0.0226 / doc D2~0.1905)') report([r for r in rows if r[1] != FIRM_A], 'BCD + non-Big4') a_out_of_sample(rows)