pdf_signature_extraction/signature_analysis/40b_inter_cpa_far_sweep.py
gbanyan d4f370bd5e Add Scripts 39b/c/d/e + 40b + 43: anchor-based FAR diagnostics
Spike checkpoint in response to codex rounds 28-30 review:

- 39b/c: signature-level dip test on Big-4 and non-Big-4 marginals
- 39d: dHash discrete-value robustness (raw vs jittered + histogram
  valleys + firm residualization); confirms within-firm dHash dip
  rejection is integer-mass-point artefact
- 39e: dHash firm-residualized + jittered 2x2 factorial decomposition;
  confirms Big-4 pooled dh "multimodality" is composition + integer
  artefact (centered + jittered p=0.35, 0/5 seeds reject)
- 40b: inter-CPA per-pair FAR sweep (cos + dh marginal + joint +
  conditional); replicates v3 cos>0.95 FAR=0.0006 and provides
  v4-new dh FAR curve
- 43: pool-normalized per-signature FAR (codex round-30 fix for
  per-pair vs per-signature conflation); per-sig FAR for deployed
  any-pair rule = 11.02%, per-firm structure shows Firm A 20% vs
  B/C/D <1%

These scripts replace the distributional path (K=3 mixture / dip /
antimode) with anchor-based threshold derivation. Companion
artefacts in reports/v4_big4/{signature_level_diptest,
midsmall_signature_diptest, dhash_discrete_robustness,
inter_cpa_far_sweep, pool_normalized_far}/.
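The anchor-based derivation reduces to inverting an empirical FAR curve: sweep candidate thresholds over negative-anchor (inter-CPA) pairs, then pick the loosest threshold whose measured FAR stays at or below the target. A minimal sketch, with made-up curve values rather than the measured ones:

```python
def invert_far(curve, target):
    """curve: (threshold, far) pairs from a negative-anchor sweep.
    Return the loosest admissible operating point -- the entry with
    the largest empirical FAR still <= target -- or None if the
    target is unachievable on this curve."""
    admissible = [e for e in curve if e[1] <= target]
    return max(admissible, key=lambda e: e[1]) if admissible else None

# Hypothetical cosine FAR curve (stricter threshold -> lower FAR):
cos_curve = [(0.90, 0.0100), (0.93, 0.0030), (0.95, 0.0006), (0.97, 0.0001)]
print(invert_far(cos_curve, 0.001))    # -> (0.95, 0.0006)
print(invert_far(cos_curve, 0.00005))  # -> None
```

The resulting thresholds are specificity-anchored operating points rather than distributional antimodes, which is the whole point of the pivot.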

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-13 14:08:49 +08:00

414 lines
16 KiB
Python
#!/usr/bin/env python3
"""
Script 40b: Inter-CPA FAR Sweep for cos and dHash (joint + marginal)
=====================================================================
After codex round-29 destroyed the distributional path to thresholds
(K=3 mixture / dip / antimode shown composition-driven by Scripts
39b-39e), v4.0 pivots to an anchor-based threshold framework:
empirically derived from inter-CPA negative anchor specificity.
Inter-CPA pairs (different CPAs, all-firm) are the negative anchor:
they are by definition not same-CPA replications, and the user's
within-CPA mechanism-transition concern (a CPA might switch from
hand-sign to template mid-career) does not enter the inter-CPA
calibration because each sampled pair crosses CPA boundaries.
This script samples a large number of inter-CPA pairs and computes
both descriptors per pair (cosine via feature_vector dot product;
Hamming distance via dhash_vector XOR). It then sweeps:
1. FAR(cos > k) across k in [0.80, 0.99]
2. FAR(dHash <= k) across k in [0, 20]
3. Joint FAR(cos > 0.95 AND dHash <= k) for k in [0, 20]
4. Conditional FAR(dHash <= k | cos > 0.95) -- the v3 inherited
   rule's marginal specificity contribution from dHash

Outputs:
    reports/v4_big4/inter_cpa_far_sweep/
        far_sweep_results.json
        far_sweep_report.md
Sample size: 500,000 inter-CPA pairs (matches v3 Script 10
convention). Big-4-only and full-corpus variants both reported.
"""
import json
import sqlite3
import numpy as np
from pathlib import Path
from datetime import datetime
from collections import defaultdict
DB = '/Volumes/NV2/PDF-Processing/signature-analysis/signature_analysis.db'
OUT = Path('/Volumes/NV2/PDF-Processing/signature-analysis/reports/'
           'v4_big4/inter_cpa_far_sweep')
OUT.mkdir(parents=True, exist_ok=True)

BIG4 = ('勤業眾信聯合', '安侯建業聯合', '資誠聯合', '安永聯合')
ALIAS = {'勤業眾信聯合': 'Firm A',
         '安侯建業聯合': 'Firm B',
         '資誠聯合': 'Firm C',
         '安永聯合': 'Firm D'}

N_PAIRS = 500_000
SEED = 42
COS_GRID = [0.80, 0.83, 0.85, 0.87, 0.89, 0.90, 0.91, 0.92, 0.93, 0.94,
            0.945, 0.95, 0.955, 0.96, 0.965, 0.97, 0.975, 0.98, 0.985,
            0.99]
DH_GRID = list(range(0, 21))


def hamming_64bit(a_bytes, b_bytes):
    """Hamming distance between two 8-byte (64-bit) dHash byte strings."""
    a = int.from_bytes(a_bytes, 'big')
    b = int.from_bytes(b_bytes, 'big')
    return (a ^ b).bit_count()


def load_signatures():
    conn = sqlite3.connect(f'file:{DB}?mode=ro', uri=True)
    cur = conn.cursor()
    cur.execute('''
        SELECT s.signature_id, s.assigned_accountant, a.firm,
               s.feature_vector, s.dhash_vector
        FROM signatures s
        JOIN accountants a ON s.assigned_accountant = a.name
        WHERE s.assigned_accountant IS NOT NULL
          AND s.feature_vector IS NOT NULL
          AND s.dhash_vector IS NOT NULL
          AND a.firm IS NOT NULL
    ''')
    rows = cur.fetchall()
    conn.close()
    return rows


def sample_inter_cpa_pairs(rows, n_pairs, seed, restrict_to_big4=False):
    """Sample inter-CPA pairs and compute (cos, dh) for each."""
    rng = np.random.default_rng(seed)
    if restrict_to_big4:
        rows = [r for r in rows if r[2] in BIG4]
        scope = 'big4_only'
    else:
        scope = 'all_firms'
    print(f' [{scope}] {len(rows):,} signatures available')
    by_acct = defaultdict(list)
    for r in rows:
        by_acct[r[1]].append(r)
    accountants = list(by_acct.keys())
    n_acct = len(accountants)
    print(f' [{scope}] {n_acct} accountants')
    # Pre-stack per-accountant feature matrices for fast row indexing.
    features = {a: np.stack(
        [np.frombuffer(r[3], dtype=np.float32) for r in by_acct[a]]
    ) for a in accountants}
    dhashes = {a: [r[4] for r in by_acct[a]] for a in accountants}
    cos_vals = np.empty(n_pairs, dtype=np.float32)
    dh_vals = np.empty(n_pairs, dtype=np.int32)
    for idx in range(n_pairs):
        i, j = rng.choice(n_acct, 2, replace=False)
        a1, a2 = accountants[i], accountants[j]
        k1 = int(rng.integers(0, len(by_acct[a1])))
        k2 = int(rng.integers(0, len(by_acct[a2])))
        f1 = features[a1][k1]
        f2 = features[a2][k2]
        cos_vals[idx] = float(f1 @ f2)
        dh_vals[idx] = hamming_64bit(dhashes[a1][k1], dhashes[a2][k2])
    return scope, cos_vals, dh_vals


def wilson_ci(k, n, z=1.96):
    """95% Wilson score interval for a binomial proportion k/n."""
    if n == 0:
        return (None, None)
    phat = k / n
    denom = 1 + z * z / n
    centre = (phat + z * z / (2 * n)) / denom
    half = z * np.sqrt(phat * (1 - phat) / n + z * z / (4 * n * n)) / denom
    return (max(0.0, centre - half), min(1.0, centre + half))


def far_at_cos(cos_vals, k):
    n = len(cos_vals)
    hits = int((cos_vals > k).sum())
    lo, hi = wilson_ci(hits, n)
    return {'k': float(k), 'n': n, 'hits': hits,
            'far': hits / n, 'ci95_lo': lo, 'ci95_hi': hi}


def far_at_dh_le(dh_vals, k):
    n = len(dh_vals)
    hits = int((dh_vals <= k).sum())
    lo, hi = wilson_ci(hits, n)
    return {'k': int(k), 'n': n, 'hits': hits,
            'far': hits / n, 'ci95_lo': lo, 'ci95_hi': hi}


def joint_far(cos_vals, dh_vals, cos_k, dh_k):
    n = len(cos_vals)
    hits = int(((cos_vals > cos_k) & (dh_vals <= dh_k)).sum())
    lo, hi = wilson_ci(hits, n)
    return {'cos_k': float(cos_k), 'dh_k': int(dh_k),
            'n': n, 'hits': hits,
            'far': hits / n, 'ci95_lo': lo, 'ci95_hi': hi}


def cond_far(cos_vals, dh_vals, cos_k, dh_k):
    """FAR(dh<=k | cos>cos_k)"""
    cos_mask = cos_vals > cos_k
    n_cond = int(cos_mask.sum())
    if n_cond == 0:
        return {'cos_k': float(cos_k), 'dh_k': int(dh_k),
                'n_cond': 0, 'hits': 0,
                'cond_far': None, 'ci95_lo': None, 'ci95_hi': None}
    hits = int(((dh_vals <= dh_k) & cos_mask).sum())
    lo, hi = wilson_ci(hits, n_cond)
    return {'cos_k': float(cos_k), 'dh_k': int(dh_k),
            'n_cond': n_cond, 'hits': hits,
            'cond_far': hits / n_cond, 'ci95_lo': lo, 'ci95_hi': hi}


def invert_far_target(curve_entries, target, key='far'):
    """Return the loosest admissible entry: the one with the largest
    FAR still <= target. Returns None if no entry meets the target."""
    admissible = [e for e in curve_entries if e[key] <= target]
    if not admissible:
        return None
    return max(admissible, key=lambda e: e[key])


def _fmt(x, fmt='.5f'):
    return 'None' if x is None else format(x, fmt)


def run_scope(rows, scope_name, restrict_to_big4):
    print(f'\n== Scope: {scope_name} ==')
    scope_label, cos_vals, dh_vals = sample_inter_cpa_pairs(
        rows, N_PAIRS, SEED, restrict_to_big4=restrict_to_big4)
    print(f' Sampled {len(cos_vals):,} inter-CPA pairs')
    print(f' cos: mean={cos_vals.mean():.4f}, '
          f'median={np.median(cos_vals):.4f}, '
          f'std={cos_vals.std():.4f}')
    print(f' dh : mean={dh_vals.mean():.4f}, '
          f'median={np.median(dh_vals):.4f}, '
          f'std={dh_vals.std():.4f}')
    cos_curve = [far_at_cos(cos_vals, k) for k in COS_GRID]
    dh_curve = [far_at_dh_le(dh_vals, k) for k in DH_GRID]
    joint_curve_95 = [joint_far(cos_vals, dh_vals, 0.95, k) for k in DH_GRID]
    cond_curve_95 = [cond_far(cos_vals, dh_vals, 0.95, k) for k in DH_GRID]
    print('\n [Cos FAR sweep]')
    for e in cos_curve:
        print(f' cos > {e["k"]:.3f}: FAR={_fmt(e["far"])}, '
              f'CI=[{_fmt(e["ci95_lo"])}, {_fmt(e["ci95_hi"])}], '
              f'hits={e["hits"]}/{e["n"]}')
    print('\n [dHash FAR sweep]')
    for e in dh_curve:
        print(f' dh <= {e["k"]:2d}: FAR={_fmt(e["far"])}, '
              f'CI=[{_fmt(e["ci95_lo"])}, {_fmt(e["ci95_hi"])}], '
              f'hits={e["hits"]}/{e["n"]}')
    print('\n [Joint FAR (cos > 0.95 AND dh <= k)]')
    for e in joint_curve_95:
        print(f' dh <= {e["dh_k"]:2d}: FAR={_fmt(e["far"])}, '
              f'hits={e["hits"]}/{e["n"]}')
    print('\n [Conditional FAR(dh <= k | cos > 0.95)]')
    for e in cond_curve_95:
        cf = e['cond_far']
        print(f' dh <= {e["dh_k"]:2d}: P(dh<=k | cos>0.95)='
              f'{_fmt(cf) if cf is not None else "n/a"}, '
              f'hits={e["hits"]}/{e["n_cond"]}')
    targets = [0.005, 0.001, 0.0005, 0.0001]
    inv = {}
    for t in targets:
        inv[f'cos_far_<=_{t}'] = invert_far_target(cos_curve, t, 'far')
        inv[f'dh_far_<=_{t}'] = invert_far_target(dh_curve, t, 'far')
        inv[f'joint_at_cos95_far_<=_{t}'] = invert_far_target(
            joint_curve_95, t, 'far')
    print('\n [Threshold inversion]')
    for tgt in targets:
        e = inv[f'cos_far_<=_{tgt}']
        if e is not None:
            print(f' FAR <= {tgt}: min cos threshold with FAR<=tgt is '
                  f'cos > {e["k"]:.3f} (FAR={e["far"]:.5f})')
        e = inv[f'dh_far_<=_{tgt}']
        if e is not None:
            print(f' FAR <= {tgt}: max dh threshold with FAR<=tgt is '
                  f'dh <= {e["k"]} (FAR={e["far"]:.5f})')
        e = inv[f'joint_at_cos95_far_<=_{tgt}']
        if e is not None:
            print(f' FAR <= {tgt}: under cos>0.95, max dh threshold '
                  f'with joint FAR<=tgt is dh <= {e["dh_k"]} '
                  f'(joint FAR={e["far"]:.5f})')
    return {
        'scope': scope_label,
        'n_pairs': int(len(cos_vals)),
        'cos_summary': {
            'mean': float(cos_vals.mean()),
            'median': float(np.median(cos_vals)),
            'std': float(cos_vals.std()),
            'p99': float(np.percentile(cos_vals, 99)),
            'p999': float(np.percentile(cos_vals, 99.9)),
            'max': float(cos_vals.max()),
        },
        'dh_summary': {
            'mean': float(dh_vals.mean()),
            'median': float(np.median(dh_vals)),
            'std': float(dh_vals.std()),
            'p01': float(np.percentile(dh_vals, 1)),
            'p001': float(np.percentile(dh_vals, 0.1)),
            'min': int(dh_vals.min()),
        },
        'cos_far_curve': cos_curve,
        'dh_far_curve': dh_curve,
        'joint_far_at_cos95_curve': joint_curve_95,
        'cond_far_at_cos95_curve': cond_curve_95,
        'threshold_inversions': inv,
    }


def main():
    print('=' * 72)
    print('Script 40b: Inter-CPA FAR Sweep (cos + dHash, joint + marginal)')
    print('=' * 72)
    rows = load_signatures()
    print(f'\nLoaded {len(rows):,} signatures (full corpus)')
    results = {
        'meta': {
            'script': '40b',
            'timestamp': datetime.now().isoformat(timespec='seconds'),
            'n_pairs_sampled': N_PAIRS,
            'seed': SEED,
            'note': ('Inter-CPA pair-level FAR sweep for cos and dHash. '
                     'Anchor-based threshold derivation; replaces '
                     'distributional path attacked in codex round-29.'),
        },
        'scopes': {},
    }
    results['scopes']['big4_only'] = run_scope(
        rows, 'Big-4 only', restrict_to_big4=True)
    results['scopes']['all_firms'] = run_scope(
        rows, 'All firms', restrict_to_big4=False)
    json_path = OUT / 'far_sweep_results.json'
    json_path.write_text(json.dumps(results, indent=2, ensure_ascii=False),
                         encoding='utf-8')
    print(f'\n[json] {json_path}')
    md = [
        '# Inter-CPA FAR Sweep (Script 40b)',
        '',
        f'Generated: {results["meta"]["timestamp"]}',
        f'Inter-CPA pair samples per scope: {N_PAIRS:,}; seed: {SEED}',
        '',
        ('Anchor-based threshold derivation. For each scope (Big-4 only '
         'or all firms), sample random inter-CPA pairs and compute '
         'cosine + Hamming distance per pair. Report False Acceptance '
         'Rates (FAR) at various thresholds; invert FAR targets to '
         'derive thresholds with empirical specificity guarantees.'),
        '',
    ]
    for scope in ['big4_only', 'all_firms']:
        s = results['scopes'][scope]
        md += [f'## Scope: {scope} ({s["n_pairs"]:,} pairs)', '',
               '### Cosine FAR curve', '',
               '| cos > k | FAR | 95% CI | hits / n |',
               '|---|---|---|---|']
        for e in s['cos_far_curve']:
            md.append(f'| {e["k"]:.3f} | {_fmt(e["far"])} | '
                      f'[{_fmt(e["ci95_lo"])}, {_fmt(e["ci95_hi"])}] | '
                      f'{e["hits"]:,} / {e["n"]:,} |')
        md += ['', '### dHash FAR curve', '',
               '| dh <= k | FAR | 95% CI | hits / n |',
               '|---|---|---|---|']
        for e in s['dh_far_curve']:
            md.append(f'| {e["k"]:2d} | {_fmt(e["far"])} | '
                      f'[{_fmt(e["ci95_lo"])}, {_fmt(e["ci95_hi"])}] | '
                      f'{e["hits"]:,} / {e["n"]:,} |')
        md += ['', '### Joint FAR (cos > 0.95 AND dh <= k)', '',
               '| dh <= k | Joint FAR | hits / n |',
               '|---|---|---|']
        for e in s['joint_far_at_cos95_curve']:
            md.append(f'| {e["dh_k"]:2d} | {_fmt(e["far"])} | '
                      f'{e["hits"]:,} / {e["n"]:,} |')
        md += ['',
               '### Conditional FAR(dh <= k | cos > 0.95)',
               '',
               'Among inter-CPA pairs that already exceed cos > 0.95, '
               'what fraction also have dh <= k? This quantifies '
               "dHash's marginal specificity contribution given the cos "
               'gate is already applied.',
               '',
               '| dh <= k | Conditional FAR | hits / n_cond |',
               '|---|---|---|']
        for e in s['cond_far_at_cos95_curve']:
            cf = e['cond_far']
            md.append(f'| {e["dh_k"]:2d} | '
                      f'{_fmt(cf) if cf is not None else "n/a"} | '
                      f'{e["hits"]:,} / {e["n_cond"]:,} |')
        md += ['', '### Threshold inversion', '',
               '| FAR target | cos thresh | dh thresh | joint dh thresh '
               '(under cos>0.95) |',
               '|---|---|---|---|']
        for tgt in [0.005, 0.001, 0.0005, 0.0001]:
            e_c = s['threshold_inversions'].get(f'cos_far_<=_{tgt}')
            e_d = s['threshold_inversions'].get(f'dh_far_<=_{tgt}')
            e_j = s['threshold_inversions'].get(
                f'joint_at_cos95_far_<=_{tgt}')
            c_str = (f'cos > {e_c["k"]:.3f} (FAR={e_c["far"]:.5f})'
                     if e_c else 'unachievable')
            d_str = (f'dh <= {e_d["k"]} (FAR={e_d["far"]:.5f})'
                     if e_d else 'unachievable')
            j_str = (f'dh <= {e_j["dh_k"]} (FAR={e_j["far"]:.5f})'
                     if e_j else 'unachievable')
            md.append(f'| {tgt} | {c_str} | {d_str} | {j_str} |')
        md.append('')
    md += [
        '## Interpretation',
        '',
        ('- The cosine FAR curve replicates and extends v3.x §IV-I '
         'Table X (which reported FAR=0.0005 at cos>0.95 from a '
         'similar but smaller-sample inter-CPA negative anchor).'),
        ('- The dHash FAR curve is the v4 contribution: prior v3.x '
         'work used dh<=5 by convention without an empirical '
         'specificity derivation. This script derives a specificity '
         'target → dh threshold mapping.'),
        ('- The conditional FAR(dh<=k | cos>0.95) curve tells us '
         'whether dHash adds specificity given the cos gate. If the '
         'conditional FAR at dh<=5 is meaningfully lower than 1.0, '
         'dHash is providing additional specificity. If it is near '
         '1.0, dHash is largely redundant given cos>0.95 and the '
         'five-way rule should be simplified.'),
        ('- Thresholds derived by inverting FAR targets are '
         'specificity-anchored operating points, not distributional '
         'antimodes. They are robust to the integer-mass-point and '
         'between-firm-composition artefacts identified in Scripts '
         '39b-39e.'),
        '',
    ]
    md_path = OUT / 'far_sweep_report.md'
    md_path.write_text('\n'.join(md), encoding='utf-8')
    print(f'[md ] {md_path}')


if __name__ == '__main__':
    main()