#!/usr/bin/env python3
"""
Script 21: Expanded Validation with Larger Negative Anchor + Held-out Firm A
============================================================================

Addresses three weaknesses of Script 19's pixel-identity validation:

(a) The negative anchor of n=35 (cosine < 0.70) is too small to yield
    meaningful FAR confidence intervals.
(b) The pixel-identical positive anchor is a CONSERVATIVE SUBSET of the true
    non-hand-signed class, not a representative sample of the broader
    positive class. Recall against this subset is therefore a lower-bound
    calibration check, not a generalizable recall estimate.
(c) Firm A serves as both the calibration anchor and a validation anchor
    (circular). The 70/30 fold split makes within-Firm-A sampling variance
    visible without claiming external validation.

This script:

1. Constructs a large inter-CPA negative anchor (~50,000 pairs) by randomly
   sampling pairs of signatures from different CPAs. High similarity between
   different CPAs' signatures is highly unlikely to arise from legitimate
   signing.
2. Splits Firm A CPAs 70/30 into CALIBRATION and HELDOUT folds. Re-derives
   signature-level thresholds from the calibration fold only, then reports
   capture rates on the heldout fold.
3. Computes 95% Wilson confidence intervals for FAR at canonical thresholds
   (Table X in the manuscript).

Legacy / diagnostic-only metrics:

Helper functions for EER, Precision, Recall, F1, and FRR remain in this
script for backward compatibility. The manuscript intentionally OMITS these
metrics from Table X because the byte-identical positive anchor has cosine
~= 1 by construction (so FRR / EER are arithmetic tautologies), and because
the positive and negative anchors are constructed from different sampling
units, prevalence is arbitrary (so Precision and F1 have no meaningful
population interpretation). Only FAR against the large inter-CPA negative
anchor is reported as a biometric metric in the paper.

Output:
    reports/expanded_validation/expanded_validation_report.md
    reports/expanded_validation/expanded_validation_results.json
"""

import json
import sqlite3
from datetime import datetime
from pathlib import Path

import numpy as np
from scipy.stats import norm

DB = '/Volumes/NV2/PDF-Processing/signature-analysis/signature_analysis.db'
OUT = Path('/Volumes/NV2/PDF-Processing/signature-analysis/reports/'
           'expanded_validation')
OUT.mkdir(parents=True, exist_ok=True)

FIRM_A = '勤業眾信聯合'
N_INTER_PAIRS = 50_000
SEED = 42


def wilson_ci(k, n, alpha=0.05):
    """Wilson score interval for a binomial proportion k/n."""
    if n == 0:
        return (0.0, 1.0)
    z = norm.ppf(1 - alpha / 2)
    phat = k / n
    denom = 1 + z * z / n
    center = (phat + z * z / (2 * n)) / denom
    pm = z * np.sqrt(phat * (1 - phat) / n + z * z / (4 * n * n)) / denom
    return (max(0.0, center - pm), min(1.0, center + pm))
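

# Added note (worked arithmetic, not used by the pipeline): the value of the
# large negative anchor is visible directly in the Wilson formula above. With
# k=0 false accepts out of n=50,000 inter-CPA pairs, the upper bound reduces
# to z^2 / (n + z^2) = 3.8416 / 50,003.84 ~= 7.7e-5, so FAR is bounded below
# ~0.008% even when no false accepts are observed. With Script 19's n=35
# negative anchor, the same k=0 bound is 3.8416 / 38.84 ~= 9.9%.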


def load_signatures():
    conn = sqlite3.connect(DB)
    cur = conn.cursor()
    cur.execute('''
        SELECT s.signature_id, s.assigned_accountant, a.firm,
               s.max_similarity_to_same_accountant,
               s.min_dhash_independent,
               s.pixel_identical_to_closest
        FROM signatures s
        LEFT JOIN accountants a ON s.assigned_accountant = a.name
        WHERE s.max_similarity_to_same_accountant IS NOT NULL
    ''')
    rows = cur.fetchall()
    conn.close()
    return rows


def load_feature_vectors_sample(n=2000):
    """Load feature vectors for inter-CPA negative-anchor sampling."""
    conn = sqlite3.connect(DB)
    cur = conn.cursor()
    cur.execute('''
        SELECT signature_id, assigned_accountant, feature_vector
        FROM signatures
        WHERE feature_vector IS NOT NULL
          AND assigned_accountant IS NOT NULL
        ORDER BY RANDOM()
        LIMIT ?
    ''', (n,))
    rows = cur.fetchall()
    conn.close()
    out = []
    for r in rows:
        vec = np.frombuffer(r[2], dtype=np.float32)
        out.append({'sig_id': r[0], 'accountant': r[1], 'feature': vec})
    return out


def build_inter_cpa_negative(sample, n_pairs=N_INTER_PAIRS, seed=SEED):
    """Sample random cross-CPA pairs; return their cosine similarities."""
    rng = np.random.default_rng(seed)
    n = len(sample)
    feats = np.stack([s['feature'] for s in sample])
    # A plain dot product equals cosine similarity only for unit-length
    # vectors; normalize defensively (a no-op if the stored vectors are
    # already L2-normalized).
    feats = feats / np.maximum(np.linalg.norm(feats, axis=1, keepdims=True),
                               1e-12)
    accts = np.array([s['accountant'] for s in sample])
    sims = []
    tries = 0
    # Rejection-sample index pairs, skipping self-pairs and same-CPA pairs;
    # the tries cap guards against an infinite loop on degenerate samples.
    while len(sims) < n_pairs and tries < n_pairs * 10:
        i = rng.integers(n)
        j = rng.integers(n)
        if i == j or accts[i] == accts[j]:
            tries += 1
            continue
        sims.append(float(feats[i] @ feats[j]))
        tries += 1
    return np.array(sims)
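

# Optional vectorized variant (a hedged sketch, not called by main()): draws
# candidate index pairs in bulk and filters self / same-CPA pairs, which is
# typically much faster than the one-pair-at-a-time loop above. It differs
# from build_inter_cpa_negative only in which random pairs happen to be
# drawn, and assumes the sample spans multiple CPAs.
def build_inter_cpa_negative_vectorized(sample, n_pairs=N_INTER_PAIRS,
                                        seed=SEED):
    rng = np.random.default_rng(seed)
    feats = np.stack([s['feature'] for s in sample])
    feats = feats / np.maximum(np.linalg.norm(feats, axis=1, keepdims=True),
                               1e-12)
    accts = np.array([s['accountant'] for s in sample])
    n = len(sample)
    chunks, total = [], 0
    while total < n_pairs:
        i = rng.integers(n, size=n_pairs)
        j = rng.integers(n, size=n_pairs)
        keep = (i != j) & (accts[i] != accts[j])
        # Row-wise dot products of the surviving pairs.
        sims = np.einsum('ij,ij->i', feats[i[keep]], feats[j[keep]])
        chunks.append(sims)
        total += len(sims)
    return np.concatenate(chunks)[:n_pairs].astype(float)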


def classification_metrics(y_true, y_pred):
    y_true = np.asarray(y_true).astype(int)
    y_pred = np.asarray(y_pred).astype(int)
    tp = int(np.sum((y_true == 1) & (y_pred == 1)))
    fp = int(np.sum((y_true == 0) & (y_pred == 1)))
    fn = int(np.sum((y_true == 1) & (y_pred == 0)))
    tn = int(np.sum((y_true == 0) & (y_pred == 0)))
    # Guard denominators against empty classes.
    p_den = max(tp + fp, 1)
    r_den = max(tp + fn, 1)
    far_den = max(fp + tn, 1)   # FAR = FP / (FP + TN)
    frr_den = max(fn + tp, 1)   # FRR = FN / (FN + TP)
    precision = tp / p_den
    recall = tp / r_den
    f1 = (2 * precision * recall / (precision + recall)
          if (precision + recall) > 0 else 0.0)
    far = fp / far_den
    frr = fn / frr_den
    far_ci = wilson_ci(fp, far_den)
    frr_ci = wilson_ci(fn, frr_den)
    return {
        'tp': tp, 'fp': fp, 'fn': fn, 'tn': tn,
        'precision': float(precision), 'recall': float(recall),
        'f1': float(f1), 'far': float(far), 'frr': float(frr),
        'far_ci95': [float(x) for x in far_ci],
        'frr_ci95': [float(x) for x in frr_ci],
        'n_pos': int(tp + fn), 'n_neg': int(tn + fp),
    }


def sweep_threshold(scores, y, direction, thresholds):
    out = []
    for t in thresholds:
        if direction == 'above':
            y_pred = (scores > t).astype(int)
        else:
            y_pred = (scores < t).astype(int)
        m = classification_metrics(y, y_pred)
        m['threshold'] = float(t)
        out.append(m)
    return out


def find_eer(sweep):
    """Locate the equal-error-rate point on a threshold sweep.

    Finds the first sign change of FAR - FRR and linearly interpolates
    between the two bracketing thresholds; falls back to the threshold of
    minimum |FAR - FRR| if the curves never cross.
    """
    thr = np.array([s['threshold'] for s in sweep])
    far = np.array([s['far'] for s in sweep])
    frr = np.array([s['frr'] for s in sweep])
    diff = far - frr
    signs = np.sign(diff)
    changes = np.where(np.diff(signs) != 0)[0]
    if len(changes) == 0:
        idx = int(np.argmin(np.abs(diff)))
        return {'threshold': float(thr[idx]), 'far': float(far[idx]),
                'frr': float(frr[idx]),
                'eer': float(0.5 * (far[idx] + frr[idx]))}
    i = int(changes[0])
    w = abs(diff[i]) / (abs(diff[i]) + abs(diff[i + 1]) + 1e-12)
    thr_i = (1 - w) * thr[i] + w * thr[i + 1]
    far_i = (1 - w) * far[i] + w * far[i + 1]
    frr_i = (1 - w) * frr[i] + w * frr[i + 1]
    return {'threshold': float(thr_i), 'far': float(far_i),
            'frr': float(frr_i), 'eer': float(0.5 * (far_i + frr_i))}
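

# Worked example for the interpolation in find_eer (added note): with the
# 0.005-step threshold grid used in main(), if FAR - FRR is +0.02 at
# threshold 0.800 and -0.06 at 0.805, the sign change gives
# w = 0.02 / (0.02 + 0.06) = 0.25, so the EER threshold is estimated at
# 0.800 + 0.25 * 0.005 = 0.80125.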


def main():
    print('=' * 70)
    print('Script 21: Expanded Validation')
    print('=' * 70)
    rows = load_signatures()
    print(f'\nLoaded {len(rows):,} signatures')
    sig_ids = [r[0] for r in rows]
    accts = [r[1] for r in rows]
    firms = [r[2] or '(unknown)' for r in rows]
    cos = np.array([r[3] for r in rows], dtype=float)
    # -1 marks signatures with no independent dHash value.
    dh = np.array([-1 if r[4] is None else r[4] for r in rows], dtype=float)
    pix = np.array([r[5] or 0 for r in rows], dtype=int)
    firm_a_mask = np.array([f == FIRM_A for f in firms])
    print(f'Firm A signatures: {int(firm_a_mask.sum()):,}')

    # --- (1) INTER-CPA NEGATIVE ANCHOR ---
    print(f'\n[1] Building inter-CPA negative anchor '
          f'({N_INTER_PAIRS} pairs)...')
    sample = load_feature_vectors_sample(n=3000)
    inter_cos = build_inter_cpa_negative(sample, n_pairs=N_INTER_PAIRS)
    print(f'    inter-CPA cos: mean={inter_cos.mean():.4f}, '
          f'p95={np.percentile(inter_cos, 95):.4f}, '
          f'p99={np.percentile(inter_cos, 99):.4f}, '
          f'max={inter_cos.max():.4f}')

    # --- (2) POSITIVES ---
    # Pixel-identical (gold) + optional Firm A extension
    pos_pix_mask = pix == 1
    n_pix = int(pos_pix_mask.sum())
    print('\n[2] Positive anchors:')
    print(f'    pixel-identical signatures: {n_pix}')

    # Combined labeled set: positives = pixel-identical signatures' max
    # same-CPA cosine; negatives = inter-CPA random pair cosines.
    # NB: the two distributions are not drawn from the same random variable
    # (one is an intra-CPA max, the other an inter-CPA random pair), so the
    # inter-CPA distribution serves as a negative reference for the
    # threshold sweep, not as a matched sample.
    pos_scores = cos[pos_pix_mask]
    neg_scores = inter_cos
    y = np.concatenate([np.ones(len(pos_scores)), np.zeros(len(neg_scores))])
    scores = np.concatenate([pos_scores, neg_scores])

    # Sweep thresholds (0.30 to 1.00 in steps of 0.005)
    thr = np.linspace(0.30, 1.00, 141)
    sweep = sweep_threshold(scores, y, 'above', thr)
    eer = find_eer(sweep)
    print(f'\n[3] Cosine EER (pos=pixel-identical, neg=inter-CPA '
          f'n={len(inter_cos)}):')
    print(f"    threshold={eer['threshold']:.4f}, EER={eer['eer']:.4f}")

    # Canonical threshold evaluations with Wilson CIs
    canonical = {}
    for tt in [0.70, 0.80, 0.837, 0.90, 0.945, 0.95, 0.973, 0.979]:
        y_pred = (scores > tt).astype(int)
        m = classification_metrics(y, y_pred)
        m['threshold'] = float(tt)
        canonical[f'cos>{tt:.3f}'] = m
        print(f"    @ {tt:.3f}: P={m['precision']:.3f}, R={m['recall']:.3f}, "
              f"FAR={m['far']:.4f} (CI95={m['far_ci95'][0]:.4f}-"
              f"{m['far_ci95'][1]:.4f}), FRR={m['frr']:.4f}")

    # --- (3) HELD-OUT FIRM A ---
    print('\n[4] Held-out Firm A 70/30 split:')
    rng = np.random.default_rng(SEED)
    # Split by CPA, not by signature, so that one CPA's signatures never
    # appear in both folds.
    firm_a_accts = sorted(set(a for a, f in zip(accts, firms) if f == FIRM_A))
    rng.shuffle(firm_a_accts)
    n_calib = int(0.7 * len(firm_a_accts))
    calib_accts = set(firm_a_accts[:n_calib])
    heldout_accts = set(firm_a_accts[n_calib:])
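    # Sanity check (added): the folds must be disjoint by construction; any
    # overlap would reintroduce the circularity described in the docstring.
    assert not (calib_accts & heldout_accts), \
        'calibration and heldout CPA folds overlap'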
    print(f'    Calibration fold CPAs: {len(calib_accts)}, '
          f'heldout fold CPAs: {len(heldout_accts)}')
    calib_mask = np.array([a in calib_accts for a in accts])
    heldout_mask = np.array([a in heldout_accts for a in accts])
    print(f'    Calibration sigs: {int(calib_mask.sum())}, '
          f'heldout sigs: {int(heldout_mask.sum())}')

    # Derive per-signature thresholds from the calibration fold only:
    #   - Firm A cosine: median, 1st and 5th percentiles
    #   - Firm A dHash: median, 95th percentile
    calib_cos = cos[calib_mask]
    calib_dh = dh[calib_mask]
    calib_dh = calib_dh[calib_dh >= 0]
    cal_cos_med = float(np.median(calib_cos))
    cal_cos_p1 = float(np.percentile(calib_cos, 1))
    cal_cos_p5 = float(np.percentile(calib_cos, 5))
    cal_dh_med = float(np.median(calib_dh))
    cal_dh_p95 = float(np.percentile(calib_dh, 95))
    print(f'    Calib Firm A cos: median={cal_cos_med:.4f}, '
          f'P1={cal_cos_p1:.4f}, P5={cal_cos_p5:.4f}')
    print(f'    Calib Firm A dHash: median={cal_dh_med:.2f}, '
          f'P95={cal_dh_p95:.2f}')

    # Apply canonical rules to the heldout fold
    held_cos = cos[heldout_mask]
    held_dh = dh[heldout_mask]
    held_dh_valid = held_dh >= 0
    held_rates = {}
    for tt in [0.837, 0.945, 0.95, cal_cos_p5]:
        rate = float(np.mean(held_cos > tt))
        k = int(np.sum(held_cos > tt))
        lo, hi = wilson_ci(k, len(held_cos))
        held_rates[f'cos>{tt:.4f}'] = {
            'rate': rate, 'k': k, 'n': int(len(held_cos)),
            'wilson95': [float(lo), float(hi)],
        }
    for tt in [5, 8, 15, cal_dh_p95]:
        rate = float(np.mean(held_dh[held_dh_valid] <= tt))
        k = int(np.sum(held_dh[held_dh_valid] <= tt))
        lo, hi = wilson_ci(k, int(held_dh_valid.sum()))
        held_rates[f'dh_indep<={tt:.2f}'] = {
            'rate': rate, 'k': k, 'n': int(held_dh_valid.sum()),
            'wilson95': [float(lo), float(hi)],
        }

    # Dual rule. NB: the denominator is all heldout signatures, including
    # those with no valid dHash (which can never satisfy the rule), so this
    # rate is conservative relative to the dHash-only rates above.
    dual_mask = (held_cos > 0.95) & (held_dh >= 0) & (held_dh <= 8)
    rate = float(np.mean(dual_mask))
    k = int(dual_mask.sum())
    lo, hi = wilson_ci(k, len(dual_mask))
    held_rates['cos>0.95 AND dh<=8'] = {
        'rate': rate, 'k': k, 'n': int(len(dual_mask)),
        'wilson95': [float(lo), float(hi)],
    }
    print('    Heldout Firm A rates:')
    for rule, v in held_rates.items():
        print(f'      {rule}: {v["rate"]*100:.2f}% '
              f'[{v["wilson95"][0]*100:.2f}, {v["wilson95"][1]*100:.2f}]')

    # --- Save ---
    summary = {
        'generated_at': datetime.now().isoformat(),
        'n_signatures': len(rows),
        'n_firm_a': int(firm_a_mask.sum()),
        'n_pixel_identical': n_pix,
        'n_inter_cpa_negatives': len(inter_cos),
        'inter_cpa_cos_stats': {
            'mean': float(inter_cos.mean()),
            'p95': float(np.percentile(inter_cos, 95)),
            'p99': float(np.percentile(inter_cos, 99)),
            'max': float(inter_cos.max()),
        },
        'cosine_eer': eer,
        'canonical_thresholds': canonical,
        'held_out_firm_a': {
            'calibration_cpas': len(calib_accts),
            'heldout_cpas': len(heldout_accts),
            'calibration_sig_count': int(calib_mask.sum()),
            'heldout_sig_count': int(heldout_mask.sum()),
            'calib_cos_median': cal_cos_med,
            'calib_cos_p1': cal_cos_p1,
            'calib_cos_p5': cal_cos_p5,
            'calib_dh_median': cal_dh_med,
            'calib_dh_p95': cal_dh_p95,
            'heldout_rates': held_rates,
        },
    }
    with open(OUT / 'expanded_validation_results.json', 'w') as f:
        json.dump(summary, f, indent=2, ensure_ascii=False)
    print(f'\nJSON: {OUT / "expanded_validation_results.json"}')

    # Markdown report
    md = [
        '# Expanded Validation Report',
        f"Generated: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}",
        '',
        '## 1. Inter-CPA Negative Anchor',
        '',
        f'* N random cross-CPA pairs sampled: {len(inter_cos):,}',
        f'* Inter-CPA cosine: mean={inter_cos.mean():.4f}, '
        f'P95={np.percentile(inter_cos, 95):.4f}, '
        f'P99={np.percentile(inter_cos, 99):.4f}, max={inter_cos.max():.4f}',
        '',
        'This anchor is a meaningful negative set because inter-CPA pairs',
        "cannot arise from legitimate reuse of a single signer's image.",
        '',
        '## 2. Cosine Threshold Sweep (pos=pixel-identical, neg=inter-CPA)',
        '',
        f"EER threshold: {eer['threshold']:.4f}, EER: {eer['eer']:.4f}",
        '',
        '| Threshold | Precision | Recall | F1 | FAR | FAR 95% CI | FRR |',
        '|-----------|-----------|--------|----|-----|------------|-----|',
    ]
    for m in canonical.values():
        md.append(
            f"| {m['threshold']:.3f} | {m['precision']:.3f} | "
            f"{m['recall']:.3f} | {m['f1']:.3f} | {m['far']:.4f} | "
            f"[{m['far_ci95'][0]:.4f}, {m['far_ci95'][1]:.4f}] | "
            f"{m['frr']:.4f} |"
        )
    md += [
        '',
        '## 3. Held-out Firm A 70/30 Validation',
        '',
        '* Firm A CPAs randomly split by CPA (not by signature) into',
        f'  calibration (n={len(calib_accts)}) and heldout '
        f'(n={len(heldout_accts)}).',
        f'* Calibration Firm A signatures: {int(calib_mask.sum()):,}. '
        f'Heldout signatures: {int(heldout_mask.sum()):,}.',
        '',
        '### Calibration-fold anchor statistics (for thresholds)',
        '',
        f'* Firm A cosine: median = {cal_cos_med:.4f}, '
        f'P1 = {cal_cos_p1:.4f}, P5 = {cal_cos_p5:.4f}',
        f'* Firm A dHash (independent min): median = {cal_dh_med:.2f}, '
        f'P95 = {cal_dh_p95:.2f}',
        '',
        '### Heldout-fold capture rates (with Wilson 95% CIs)',
        '',
        '| Rule | Heldout rate | Wilson 95% CI | k / n |',
        '|------|--------------|---------------|-------|',
    ]
    for rule, v in held_rates.items():
        md.append(
            f"| {rule} | {v['rate']*100:.2f}% | "
            f"[{v['wilson95'][0]*100:.2f}%, {v['wilson95'][1]*100:.2f}%] | "
            f"{v['k']}/{v['n']} |"
        )
    md += [
        '',
        '## Interpretation',
        '',
        'The inter-CPA negative anchor (N ~50,000) gives tight confidence',
        'intervals on FAR at each threshold, addressing the small-negative',
        'anchor limitation of Script 19 (n=35).',
        '',
        'The 70/30 Firm A split addresses the circular-validation concern of',
        'using the same calibration anchor for threshold derivation and',
        'validation. Calibration-fold percentiles derive the thresholds;',
        'heldout-fold rates with Wilson 95% CIs show how those thresholds',
        'generalize to Firm A CPAs that did not contribute to calibration.',
    ]
    (OUT / 'expanded_validation_report.md').write_text('\n'.join(md),
                                                       encoding='utf-8')
    print(f'Report: {OUT / "expanded_validation_report.md"}')


if __name__ == '__main__':
    main()