#!/usr/bin/env python3
"""Script 55: PRIMARY calibration on the clean pre-e-signature baseline
BCD 2013-2019 (Firms B/C/D, fiscal years 2013-2019).

Rationale: co-author interviews confirm B/C/D progressively adopted
e-signature systems after 2020 (staggered timing), so 2013-2019 BCD is the
construct-clean hand-signing baseline. Canonical retry-loop sampler (matches
Scripts 43/45/52), any-pair. Reports the floor + Firm A (all years) scored
out-of-sample against it, and BCD 2020+ scored against the same threshold.
Read-only.
"""
import sqlite3
from collections import Counter, defaultdict
from contextlib import closing

import numpy as np

DB = '/Volumes/NV2/PDF-Processing/signature-analysis/signature_analysis.db'
FIRM_A = '勤業眾信聯合'
BIG4 = ('勤業眾信聯合', '安侯建業聯合', '資誠聯合', '安永聯合')
ALIAS = {'勤業眾信聯合': 'A', '安侯建業聯合': 'B', '資誠聯合': 'C', '安永聯合': 'D'}
SEED = 42
N_BOOT = 1000

# Decision thresholds used throughout the script (values unchanged).
COS_MIN = 0.95       # cosine similarity must be strictly greater than this
DHASH_HC_MAX = 5     # max dHash Hamming distance for an "HC" hit
DHASH_MC_MAX = 15    # max dHash Hamming distance for an "HC+MC" hit

# Bit-count lookup table: POP[b] is the number of set bits in byte value b.
POP = np.array([bin(i).count('1') for i in range(256)], dtype=np.uint8)

# Row layout produced by QUERY (indices used as r[0]..r[7] below):
#   0 assigned_accountant, 1 firm, 2 year (int, first 4 chars of year_month),
#   3 source_pdf, 4 feature_vector (blob), 5 dhash_vector (blob),
#   6 max_similarity_to_same_accountant, 7 min_dhash_independent
QUERY = """SELECT s.assigned_accountant,a.firm,CAST(substr(s.year_month,1,4) AS INT),
       s.source_pdf,s.feature_vector,s.dhash_vector,
       s.max_similarity_to_same_accountant,s.min_dhash_independent
FROM signatures s JOIN accountants a ON s.assigned_accountant=a.name
WHERE a.firm IN (?,?,?,?)
  AND s.year_month IS NOT NULL AND s.feature_vector IS NOT NULL
  AND s.dhash_vector IS NOT NULL"""


def wilson(k, n, z=1.96):
    """Return the Wilson score interval ``(lo, hi)`` for k successes in n trials.

    Returns ``(None, None)`` when ``n == 0``. Both bounds are clipped to [0, 1].
    """
    if n == 0:
        return (None, None)
    p = k / n
    d = 1 + z * z / n
    c = (p + z * z / (2 * n)) / d
    h = z * np.sqrt(p * (1 - p) / n + z * z / (4 * n * n)) / d
    return (max(0.0, c - h), min(1.0, c + h))


def canon_sampler(rng, n, npool, same, all_idx):
    """Draw ``npool`` indices from ``range(n)`` that are not in ``same``.

    Up to 10 rounds of rejection sampling (with replacement, drawing twice the
    number still needed each round). If that is not enough, the remainder is
    drawn without replacement from the indices outside ``same``.

    The sequence of ``rng`` calls is deliberately left untouched so results
    stay comparable with the other scripts that use this sampler.

    Args:
        rng: NumPy ``Generator``.
        n: Population size.
        npool: Number of indices to return.
        same: Index array to exclude.
        all_idx: ``np.arange(n)``, passed in so it is not rebuilt per call.

    Returns:
        int64 array of ``npool`` indices.

    Raises:
        ValueError: from ``rng.choice`` in the fallback when fewer than the
            still-needed number of indices lie outside ``same``.
    """
    need = npool
    cand = []
    att = 0
    while need > 0 and att < 10:
        draw = rng.choice(n, size=need * 2, replace=True)
        ok = draw[~np.isin(draw, same)]
        took = ok[:need]
        cand.extend(took.tolist())
        need -= len(took)
        att += 1
    if need > 0:
        pm = np.ones(n, bool)
        pm[same] = False
        cand.extend(rng.choice(all_idx[pm], size=need, replace=False).tolist())
    return np.array(cand[:npool], dtype=np.int64)


def load_rows(db_path=DB):
    """Fetch all Big-4 signature rows from the database, opened read-only.

    ``closing`` guarantees the connection is released even if the query
    raises (a bare ``with connection`` only manages transactions).
    """
    with closing(sqlite3.connect(f'file:{db_path}?mode=ro', uri=True)) as conn:
        return conn.execute(QUERY, BIG4).fetchall()


def prep(rec):
    """Decode the blobs of ``rec`` into arrays.

    Returns:
        ``(feats, dh)``: ``feats`` holds the float32 feature vectors scaled to
        unit L2 norm (all-zero vectors are left as zeros); ``dh`` holds the
        dHash bytes as uint8 rows.

    Raises:
        ValueError: from ``np.stack`` when ``rec`` is empty.
    """
    feats = np.stack([np.frombuffer(r[4], np.float32) for r in rec]).astype(np.float32)
    norms = np.linalg.norm(feats, axis=1, keepdims=True)
    norms[norms == 0] = 1.0  # avoid dividing zero vectors by zero
    feats /= norms
    dh = np.stack([np.frombuffer(r[5], np.uint8) for r in rec])
    return feats, dh


def floor_on(baseline_rec, label):
    """Canonical per-sig/per-doc HC floor on a baseline population.

    For each signature, a pool of signatures from *other* accountants is
    sampled, with the same size as the signature's own-accountant pool
    (its accountant's signature count minus one). The signature is an HC hit
    when its best cosine in that pool exceeds COS_MIN and its smallest dHash
    Hamming distance in that pool is <= DHASH_HC_MAX (<= DHASH_MC_MAX for
    HC+MC). Prints per-signature, per-document and per-firm summaries plus a
    bootstrap interval resampled over accountants.

    Args:
        baseline_rec: Rows in the QUERY layout.
        label: Tag printed with the results.

    Returns:
        The per-signature HC rate, or NaN when ``baseline_rec`` is empty.
    """
    n = len(baseline_rec)
    if n == 0:
        # Previously this crashed inside np.stack; report and carry on.
        print(f'\n [{label}] n_sig=0: no signatures, floor undefined')
        return float('nan')

    feats, dh = prep(baseline_rec)
    cpas = np.array([r[0] for r in baseline_rec])
    firms = np.array([ALIAS[r[1]] for r in baseline_rec])
    docs = np.array([r[3] for r in baseline_rec])

    by_cpa = defaultdict(list)
    for i, c in enumerate(cpas):
        by_cpa[c].append(i)
    cidx = {c: np.array(v) for c, v in by_cpa.items()}
    psize = {c: len(v) - 1 for c, v in cidx.items()}
    all_idx = np.arange(n)

    rng = np.random.default_rng(SEED)
    # Defaults (cos 0, distance 64) mean a signature with no pool is never a hit.
    mx = np.zeros(n, np.float32)
    mn = np.full(n, 64, np.int32)
    for si in range(n):
        pool_size = psize[cpas[si]]
        if pool_size <= 0:
            continue
        cand = canon_sampler(rng, n, pool_size, cidx[cpas[si]], all_idx)
        cosv = feats[cand] @ feats[si]
        mx[si] = cosv.max()
        # XOR then bit-count per byte, summed per row = Hamming distance.
        mn[si] = int(POP[dh[cand] ^ dh[si]].sum(axis=1).min())

    hc = (mx > COS_MIN) & (mn <= DHASH_HC_MAX)
    d2 = (mx > COS_MIN) & (mn <= DHASH_MC_MAX)
    k = int(hc.sum())

    # Cluster bootstrap: resample accountants with replacement.
    rng2 = np.random.default_rng(SEED + 1)
    cl = list(cidx.keys())
    bs = np.array([
        hc[np.concatenate([cidx[cl[i]] for i in rng2.choice(len(cl), len(cl), True)])].mean()
        for _ in range(N_BOOT)
    ])
    print(f'\n [{label}] n_sig={n:,}, CPAs={len(cidx)}')
    print(f' per-sig HC floor = {k/n:.4f} ({k}/{n}) CPA-boot95% [{np.percentile(bs,2.5):.4f},{np.percentile(bs,97.5):.4f}]')

    # A document is flagged when any of its signatures is flagged.
    doc_hc = {}
    doc_mc = {}
    doc_firms = {}
    for d, firm, is_hc, is_mc in zip(docs, firms, hc, d2):
        doc_hc[d] = doc_hc.get(d, False) or bool(is_hc)
        doc_mc[d] = doc_mc.get(d, False) or bool(is_mc)
        doc_firms.setdefault(d, []).append(firm)
    dl = list(doc_hc)
    nd = len(dl)
    n_doc_hc = sum(doc_hc[d] for d in dl)
    n_doc_mc = sum(doc_mc[d] for d in dl)
    print(f' per-doc HC = {n_doc_hc/nd:.4f}; per-doc HC+MC = {n_doc_mc/nd:.4f} (n_doc={nd:,})')

    # Attribute each document to the firm most common among its signatures.
    dom = {d: Counter(doc_firms[d]).most_common(1)[0][0] for d in dl}
    for f in ['B', 'C', 'D']:
        ds = [d for d in dl if dom[d] == f]
        if ds:
            hits = sum(doc_mc[d] for d in ds)
            print(f' Firm {f} per-doc HC+MC: {hits/len(ds):.4f} ({hits}/{len(ds)})')
    return k / n


def a_vs_baseline(baseline_rec, a_rec, label):
    """Score ``a_rec`` signatures against random pools from ``baseline_rec``.

    Each signature gets a pool drawn uniformly with replacement from the
    baseline, sized like its own-accountant pool within ``a_rec`` (count minus
    one). It is an HC hit when some pool member has cosine > COS_MIN and dHash
    Hamming distance <= DHASH_HC_MAX at the same time. Prints the hit rate
    with a Wilson interval; returns None.

    If either input is empty, a notice is printed and nothing is scored
    (previously this raised).
    """
    n = len(a_rec)
    nb = len(baseline_rec)
    if n == 0 or nb == 0:
        print(f' [{label}] Firm A (all yrs) vs BCD-2013-2019 pool: nothing to score (n_A={n}, n_baseline={nb})')
        return

    bf, bdh = prep(baseline_rec)
    a_cpa = defaultdict(list)
    for i, r in enumerate(a_rec):
        a_cpa[r[0]].append(i)
    psize = {c: len(v) - 1 for c, v in a_cpa.items()}

    rng = np.random.default_rng(SEED)
    hc = np.zeros(n, bool)
    for i, r in enumerate(a_rec):
        pool_size = psize[r[0]]
        if pool_size <= 0:
            continue
        cand = rng.integers(0, nb, size=pool_size)
        sf = np.frombuffer(r[4], np.float32).astype(np.float32)
        sf /= max(np.linalg.norm(sf), 1e-9)
        cosv = bf[cand] @ sf
        close = cosv > COS_MIN
        if close.any():
            # Only pay for the Hamming distances when a cosine already passes.
            dist = POP[bdh[cand] ^ np.frombuffer(r[5], np.uint8)].sum(axis=1)
            hc[i] = bool((close & (dist <= DHASH_HC_MAX)).any())

    k = int(hc.sum())
    lo, hi = wilson(k, n)
    print(f' [{label}] Firm A (all yrs) vs BCD-2013-2019 pool: per-sig HC = {k/n:.4f} ({k}/{n}) [{lo:.5f},{hi:.5f}]')


def main():
    """Run the calibration and print the report."""
    rows = load_rows()
    bcd_pre = [r for r in rows if r[1] != FIRM_A and 2013 <= r[2] <= 2019]
    bcd_post = [r for r in rows if r[1] != FIRM_A and r[2] >= 2020]
    a_all = [r for r in rows if r[1] == FIRM_A]

    print('=== PRIMARY floor: BCD 2013-2019 ===')
    fl = floor_on(bcd_pre, 'BCD 2013-2019 (PRIMARY)')

    print('\n=== Firm A scored against the BCD-2013-2019 threshold ===')
    a_vs_baseline(bcd_pre, a_all, 'A out-of-sample')

    # Firm A's stored metrics (columns 6 and 7), compared with the floor.
    a_obs = [r for r in a_all if r[6] is not None and r[7] is not None]
    ak = sum(1 for r in a_obs if r[6] > COS_MIN and r[7] <= DHASH_HC_MAX)
    if not a_obs:
        print(' Firm A observed (all yrs, own pools): no rows with stored metrics')
    elif fl > 0:  # False for both a zero floor and NaN
        rate = ak / len(a_obs)
        print(f' Firm A observed (all yrs, own pools): per-sig HC = {rate:.4f} -> {rate/fl:.0f}x the BCD-2013-2019 floor')
    else:
        rate = ak / len(a_obs)
        print(f' Firm A observed (all yrs, own pools): per-sig HC = {rate:.4f} -> ratio undefined (BCD-2013-2019 floor = {fl})')

    print('\n=== (optional) BCD 2020+ floor, same method (may be inflated by e-signing) ===')
    floor_on(bcd_post, 'BCD 2020-2023 (post e-signing)')


if __name__ == '__main__':
    main()