Add Scripts 39b/c/d/e + 40b + 43: anchor-based FAR diagnostics

Spike checkpoint in response to codex rounds 28-30 review:

- 39b/c: signature-level dip test on Big-4 and non-Big-4 marginals
- 39d: dHash discrete-value robustness (raw vs jittered + histogram
  valleys + firm residualization); confirms within-firm dHash dip
  rejection is integer-mass-point artefact
- 39e: dHash firm-residualized + jittered 2x2 factorial decomposition;
  confirms Big-4 pooled dh "multimodality" is composition + integer
  artefact (centered + jittered p=0.35, 0/5 seeds reject)
- 40b: inter-CPA per-pair FAR sweep (cos + dh marginal + joint +
  conditional); replicates v3 cos>0.95 FAR=0.0006 and provides
  v4-new dh FAR curve
- 43: pool-normalized per-signature FAR (codex round-30 fix for
  per-pair vs per-signature conflation); per-sig FAR for deployed
  any-pair rule = 11.02%, per-firm structure shows Firm A 20% vs
  B/C/D <1%

These scripts replace the distributional path (K=3 mixture / dip /
antimode) with anchor-based threshold derivation. Companion
artefacts in reports/v4_big4/{signature_level_diptest,
midsmall_signature_diptest, dhash_discrete_robustness,
inter_cpa_far_sweep, pool_normalized_far}/.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-05-13 14:08:49 +08:00
parent 6db5d635f5
commit d4f370bd5e
6 changed files with 2086 additions and 0 deletions
@@ -0,0 +1,413 @@
#!/usr/bin/env python3
"""
Script 40b: Inter-CPA FAR Sweep for cos and dHash (joint + marginal)
=====================================================================
After codex round-29 destroyed the distributional path to thresholds
(K=3 mixture / dip / antimode shown composition-driven by Scripts
39b-39e), v4.0 pivots to an anchor-based threshold framework:
empirically derived from inter-CPA negative anchor specificity.
Inter-CPA pairs (different CPAs, all-firm) are the negative anchor:
they are by definition not same-CPA replications, and the user's
within-CPA mechanism-transition concern (a CPA might switch from
hand-sign to template mid-career) does not enter the inter-CPA
calibration because each sampled pair crosses CPA boundaries.
This script samples a large number of inter-CPA pairs and computes
both descriptors per pair (cosine via feature_vector dot product;
Hamming distance via dhash_vector XOR). It then sweeps:
1. FAR(cos > k) across k in [0.80, 0.99]
2. FAR(dHash <= k) across k in [0, 20]
3. Joint FAR(cos > 0.95 AND dHash <= k) for k in [0, 20]
4. Conditional FAR(dHash <= k | cos > 0.95) -- the v3 inherited
rule's marginal specificity contribution from dHash
Outputs:
reports/v4_big4/inter_cpa_far_sweep/
far_sweep_results.json
far_sweep_report.md
Sample size: 500,000 inter-CPA pairs (matches v3 Script 10
convention). Big-4-only and full-corpus variants both reported.
"""
import json
import sqlite3
import numpy as np
from pathlib import Path
from datetime import datetime
from collections import defaultdict
# Read-only SQLite database of per-signature descriptors plus the
# accountant -> firm attribution table.
DB = '/Volumes/NV2/PDF-Processing/signature-analysis/signature_analysis.db'
# Output directory for this script's JSON + markdown artefacts.
OUT = Path('/Volumes/NV2/PDF-Processing/signature-analysis/reports/'
           'v4_big4/inter_cpa_far_sweep')
OUT.mkdir(parents=True, exist_ok=True)
# Big-4 audit firms (registered Chinese names) and anonymized aliases.
# NOTE(review): ALIAS is not referenced in this script — presumably kept
# for consistency with sibling scripts; confirm before removing.
BIG4 = ('勤業眾信聯合', '安侯建業聯合', '資誠聯合', '安永聯合')
ALIAS = {'勤業眾信聯合': 'Firm A',
         '安侯建業聯合': 'Firm B',
         '資誠聯合': 'Firm C',
         '安永聯合': 'Firm D'}
# Inter-CPA pairs sampled per scope (matches the v3 Script 10 convention
# per the module docstring) and the fixed RNG seed for reproducibility.
N_PAIRS = 500_000
SEED = 42
# Threshold grids swept for the cosine and dHash FAR curves.
COS_GRID = [0.80, 0.83, 0.85, 0.87, 0.89, 0.90, 0.91, 0.92, 0.93, 0.94,
            0.945, 0.95, 0.955, 0.96, 0.965, 0.97, 0.975, 0.98, 0.985,
            0.99]
DH_GRID = list(range(0, 21))  # Hamming distances 0..20 (of 64 bits)
def hamming_64bit(a_bytes, b_bytes):
"""Hamming distance between two 8-byte (64-bit) dHash byte strings."""
a = int.from_bytes(a_bytes, 'big')
b = int.from_bytes(b_bytes, 'big')
return (a ^ b).bit_count()
def load_signatures():
    """Load every attributed signature that has both descriptors and a firm.

    Returns a list of (signature_id, assigned_accountant, firm,
    feature_vector blob, dhash_vector blob) tuples.
    """
    # Read-only URI connection; the DB is never written by this script.
    conn = sqlite3.connect(f'file:{DB}?mode=ro', uri=True)
    try:
        cur = conn.cursor()
        cur.execute('''
        SELECT s.signature_id, s.assigned_accountant, a.firm,
               s.feature_vector, s.dhash_vector
        FROM signatures s
        JOIN accountants a ON s.assigned_accountant = a.name
        WHERE s.assigned_accountant IS NOT NULL
          AND s.feature_vector IS NOT NULL
          AND s.dhash_vector IS NOT NULL
          AND a.firm IS NOT NULL
        ''')
        return cur.fetchall()
    finally:
        conn.close()
def sample_inter_cpa_pairs(rows, n_pairs, seed, restrict_to_big4=False):
    """Draw random cross-CPA pairs; return (scope, cos array, dh array).

    Each draw picks two distinct accountants, then one signature from
    each, and records the pair's cosine (feature-vector dot product —
    assumes the stored vectors are pre-normalized; TODO confirm) and
    64-bit dHash Hamming distance.
    """
    rng = np.random.default_rng(seed)
    if restrict_to_big4:
        rows = [r for r in rows if r[2] in BIG4]
        scope = 'big4_only'
    else:
        scope = 'all_firms'
    print(f' [{scope}] {len(rows):,} signatures available')
    # Group signatures by accountant so we can sample within each CPA.
    by_acct = defaultdict(list)
    for row in rows:
        by_acct[row[1]].append(row)
    accountants = list(by_acct)
    n_acct = len(accountants)
    print(f' [{scope}] {n_acct} accountants')
    # Pre-decode the float32 feature blobs into one matrix per CPA and
    # keep the raw dHash blobs alongside, indexed identically.
    feats = {acct: np.stack(
        [np.frombuffer(r[3], dtype=np.float32) for r in by_acct[acct]]
    ) for acct in accountants}
    hashes = {acct: [r[4] for r in by_acct[acct]] for acct in accountants}
    cos_vals = np.empty(n_pairs, dtype=np.float32)
    dh_vals = np.empty(n_pairs, dtype=np.int32)
    for idx in range(n_pairs):
        # Two distinct CPAs, then one signature index within each.
        i, j = rng.choice(n_acct, 2, replace=False)
        acct_a, acct_b = accountants[i], accountants[j]
        pick_a = int(rng.integers(0, len(by_acct[acct_a])))
        pick_b = int(rng.integers(0, len(by_acct[acct_b])))
        vec_a = feats[acct_a][pick_a]
        vec_b = feats[acct_b][pick_b]
        cos_vals[idx] = float(vec_a @ vec_b)
        dh_vals[idx] = hamming_64bit(hashes[acct_a][pick_a],
                                     hashes[acct_b][pick_b])
    return scope, cos_vals, dh_vals
def wilson_ci(k, n, z=1.96):
if n == 0:
return (None, None)
phat = k / n
denom = 1 + z * z / n
centre = (phat + z * z / (2 * n)) / denom
half = z * np.sqrt(phat * (1 - phat) / n + z * z / (4 * n * n)) / denom
return (max(0.0, centre - half), min(1.0, centre + half))
def far_at_cos(cos_vals, k):
    """Pair-level FAR of the rule ``cos > k``, with a Wilson 95% CI."""
    n = len(cos_vals)
    hits = int(np.count_nonzero(cos_vals > k))
    ci_lo, ci_hi = wilson_ci(hits, n)
    return {'k': float(k), 'n': n, 'hits': hits,
            'far': hits / n, 'ci95_lo': ci_lo, 'ci95_hi': ci_hi}
def far_at_dh_le(dh_vals, k):
    """Pair-level FAR of the rule ``dh <= k``, with a Wilson 95% CI."""
    n = len(dh_vals)
    hits = int(np.count_nonzero(dh_vals <= k))
    ci_lo, ci_hi = wilson_ci(hits, n)
    return {'k': int(k), 'n': n, 'hits': hits,
            'far': hits / n, 'ci95_lo': ci_lo, 'ci95_hi': ci_hi}
def joint_far(cos_vals, dh_vals, cos_k, dh_k):
    """FAR of the conjunctive rule ``cos > cos_k AND dh <= dh_k``."""
    n = len(cos_vals)
    both = (cos_vals > cos_k) & (dh_vals <= dh_k)
    hits = int(both.sum())
    ci_lo, ci_hi = wilson_ci(hits, n)
    return {'cos_k': float(cos_k), 'dh_k': int(dh_k),
            'n': n, 'hits': hits,
            'far': hits / n, 'ci95_lo': ci_lo, 'ci95_hi': ci_hi}
def cond_far(cos_vals, dh_vals, cos_k, dh_k):
    """Conditional FAR of ``dh <= dh_k`` among pairs with ``cos > cos_k``.

    When no pair passes the cosine gate the conditional rate is
    undefined, so cond_far and the CI bounds come back as None.
    """
    gate = cos_vals > cos_k
    n_cond = int(gate.sum())
    if n_cond == 0:
        return {'cos_k': float(cos_k), 'dh_k': int(dh_k),
                'n_cond': 0, 'hits': 0,
                'cond_far': None, 'ci95_lo': None, 'ci95_hi': None}
    hits = int((gate & (dh_vals <= dh_k)).sum())
    ci_lo, ci_hi = wilson_ci(hits, n_cond)
    return {'cos_k': float(cos_k), 'dh_k': int(dh_k),
            'n_cond': n_cond, 'hits': hits,
            'cond_far': hits / n_cond, 'ci95_lo': ci_lo, 'ci95_hi': ci_hi}
def invert_far_target(curve_entries, target, key='far'):
    """Return the entry with the largest ``key`` value still <= target.

    Entries are scanned in ascending ``key`` (FAR) order, so the result
    is the loosest operating point on the curve whose FAR does not
    exceed the target.  Returns None when the curve is empty or no
    entry satisfies the target.

    (Fix: the old docstring claimed this returned "entries bracketing
    the target"; it has always returned a single entry.  The old loop
    also left a flag variable conditionally unbound, relying on a
    guard at the return site.)
    """
    eligible = [e for e in sorted(curve_entries, key=lambda e: e[key])
                if e[key] <= target]
    # Last element == largest FAR <= target (ties resolve as before:
    # the stable sort keeps original order among equal-FAR entries).
    return eligible[-1] if eligible else None
def _fmt(x, fmt='.5f'):
return 'None' if x is None else format(x, fmt)
def run_scope(rows, scope_name, restrict_to_big4):
    """Run the full FAR sweep for one scope and return a result dict.

    Samples N_PAIRS inter-CPA pairs, prints the four FAR curves
    (cos marginal, dHash marginal, joint at cos>0.95, conditional at
    cos>0.95), inverts FAR targets into thresholds, and packages
    everything into a JSON-serializable dict.
    """
    print(f'\n== Scope: {scope_name} ==')
    # Negative-anchor sample: every pair crosses CPA boundaries.
    scope_label, cos_vals, dh_vals = sample_inter_cpa_pairs(
        rows, N_PAIRS, SEED, restrict_to_big4=restrict_to_big4)
    print(f' Sampled {len(cos_vals):,} inter-CPA pairs')
    print(f' cos: mean={cos_vals.mean():.4f}, '
          f'median={np.median(cos_vals):.4f}, '
          f'std={cos_vals.std():.4f}')
    print(f' dh : mean={dh_vals.mean():.4f}, '
          f'median={np.median(dh_vals):.4f}, '
          f'std={dh_vals.std():.4f}')
    # The four FAR curves; joint/conditional variants are anchored at
    # the v3-inherited cos>0.95 gate.
    cos_curve = [far_at_cos(cos_vals, k) for k in COS_GRID]
    dh_curve = [far_at_dh_le(dh_vals, k) for k in DH_GRID]
    joint_curve_95 = [joint_far(cos_vals, dh_vals, 0.95, k) for k in DH_GRID]
    cond_curve_95 = [cond_far(cos_vals, dh_vals, 0.95, k) for k in DH_GRID]
    print('\n [Cos FAR sweep]')
    for e in cos_curve:
        print(f' cos > {e["k"]:.3f}: FAR={_fmt(e["far"])}, '
              f'CI=[{_fmt(e["ci95_lo"])}, {_fmt(e["ci95_hi"])}], '
              f'hits={e["hits"]}/{e["n"]}')
    print('\n [dHash FAR sweep]')
    for e in dh_curve:
        print(f' dh <= {e["k"]:2d}: FAR={_fmt(e["far"])}, '
              f'CI=[{_fmt(e["ci95_lo"])}, {_fmt(e["ci95_hi"])}], '
              f'hits={e["hits"]}/{e["n"]}')
    print('\n [Joint FAR (cos > 0.95 AND dh <= k)]')
    for e in joint_curve_95:
        print(f' dh <= {e["dh_k"]:2d}: FAR={_fmt(e["far"])}, '
              f'hits={e["hits"]}/{e["n"]}')
    print('\n [Conditional FAR(dh <= k | cos > 0.95)]')
    for e in cond_curve_95:
        cf = e['cond_far']
        print(f' dh <= {e["dh_k"]:2d}: P(dh<=k | cos>0.95)='
              f'{_fmt(cf) if cf is not None else "n/a"}, '
              f'hits={e["hits"]}/{e["n_cond"]}')
    # Invert each FAR target into the loosest threshold achieving it.
    targets = [0.005, 0.001, 0.0005, 0.0001]
    inv = {}
    for t in targets:
        inv[f'cos_far_<=_{t}'] = invert_far_target(cos_curve, t, 'far')
        inv[f'dh_far_<=_{t}'] = invert_far_target(dh_curve, t, 'far')
        inv[f'joint_at_cos95_far_<=_{t}'] = invert_far_target(
            joint_curve_95, t, 'far')
    print('\n [Threshold inversion]')
    for tgt in targets:
        e = inv[f'cos_far_<=_{tgt}']
        if e is not None:
            print(f' FAR <= {tgt}: max cos threshold with FAR<=tgt is '
                  f'cos > {e["k"]:.3f} (FAR={e["far"]:.5f})')
        e = inv[f'dh_far_<=_{tgt}']
        if e is not None:
            print(f' FAR <= {tgt}: max dh threshold with FAR<=tgt is '
                  f'dh <= {e["k"]} (FAR={e["far"]:.5f})')
        e = inv[f'joint_at_cos95_far_<=_{tgt}']
        if e is not None:
            print(f' FAR <= {tgt}: under cos>0.95, max dh threshold '
                  f'with joint FAR<=tgt is dh <= {e["dh_k"]} '
                  f'(joint FAR={e["far"]:.5f})')
    # Everything below is coerced to plain float/int so json.dumps in
    # main() can serialize it without a custom encoder.
    return {
        'scope': scope_label,
        'n_pairs': int(len(cos_vals)),
        'cos_summary': {
            'mean': float(cos_vals.mean()),
            'median': float(np.median(cos_vals)),
            'std': float(cos_vals.std()),
            'p99': float(np.percentile(cos_vals, 99)),
            'p999': float(np.percentile(cos_vals, 99.9)),
            'max': float(cos_vals.max()),
        },
        'dh_summary': {
            'mean': float(dh_vals.mean()),
            'median': float(np.median(dh_vals)),
            'std': float(dh_vals.std()),
            'p01': float(np.percentile(dh_vals, 1)),
            'p001': float(np.percentile(dh_vals, 0.1)),
            'min': int(dh_vals.min()),
        },
        'cos_far_curve': cos_curve,
        'dh_far_curve': dh_curve,
        'joint_far_at_cos95_curve': joint_curve_95,
        'cond_far_at_cos95_curve': cond_curve_95,
        'threshold_inversions': inv,
    }
def main():
    """Entry point: run both scopes and write the JSON + markdown artefacts."""
    print('=' * 72)
    print('Script 40b: Inter-CPA FAR Sweep (cos + dHash, joint + marginal)')
    print('=' * 72)
    rows = load_signatures()
    print(f'\nLoaded {len(rows):,} signatures (full corpus)')
    results = {
        'meta': {
            'script': '40b',
            'timestamp': datetime.now().isoformat(timespec='seconds'),
            'n_pairs_sampled': N_PAIRS,
            'seed': SEED,
            'note': ('Inter-CPA pair-level FAR sweep for cos and dHash. '
                     'Anchor-based threshold derivation; replaces '
                     'distributional path attacked in codex round-29.'),
        },
        'scopes': {},
    }
    # Both scopes reuse the same SEED, so each run is reproducible.
    results['scopes']['big4_only'] = run_scope(
        rows, 'Big-4 only', restrict_to_big4=True)
    results['scopes']['all_firms'] = run_scope(
        rows, 'All firms', restrict_to_big4=False)
    # Machine-readable artefact.
    json_path = OUT / 'far_sweep_results.json'
    json_path.write_text(json.dumps(results, indent=2, ensure_ascii=False),
                         encoding='utf-8')
    print(f'\n[json] {json_path}')
    # Human-readable markdown report, assembled as a list of lines and
    # joined once at the end.
    md = [
        '# Inter-CPA FAR Sweep (Script 40b)',
        '',
        f'Generated: {results["meta"]["timestamp"]}',
        f'Inter-CPA pair samples per scope: {N_PAIRS:,}; seed: {SEED}',
        '',
        ('Anchor-based threshold derivation. For each scope (Big-4 only '
         'or all firms), sample random inter-CPA pairs and compute '
         'cosine + Hamming distance per pair. Report False Acceptance '
         'Rates (FAR) at various thresholds; invert FAR target to '
         'derive thresholds with empirical specificity guarantees.'),
        '',
    ]
    for scope in ['big4_only', 'all_firms']:
        s = results['scopes'][scope]
        # Cosine marginal FAR table.
        md += [f'## Scope: {scope} ({s["n_pairs"]:,} pairs)', '',
               '### Cosine FAR curve', '',
               '| cos > k | FAR | 95% CI | hits / n |',
               '|---|---|---|---|']
        for e in s['cos_far_curve']:
            md.append(f'| {e["k"]:.3f} | {_fmt(e["far"])} | '
                      f'[{_fmt(e["ci95_lo"])}, {_fmt(e["ci95_hi"])}] | '
                      f'{e["hits"]:,} / {e["n"]:,} |')
        # dHash marginal FAR table.
        md += ['', '### dHash FAR curve', '',
               '| dh <= k | FAR | 95% CI | hits / n |',
               '|---|---|---|---|']
        for e in s['dh_far_curve']:
            md.append(f'| {e["k"]:2d} | {_fmt(e["far"])} | '
                      f'[{_fmt(e["ci95_lo"])}, {_fmt(e["ci95_hi"])}] | '
                      f'{e["hits"]:,} / {e["n"]:,} |')
        # Joint FAR table anchored at the inherited cos>0.95 gate.
        md += ['', '### Joint FAR (cos > 0.95 AND dh <= k)', '',
               '| dh <= k | Joint FAR | hits / n |',
               '|---|---|---|']
        for e in s['joint_far_at_cos95_curve']:
            md.append(f'| {e["dh_k"]:2d} | {_fmt(e["far"])} | '
                      f'{e["hits"]:,} / {e["n"]:,} |')
        # Conditional FAR table: dHash's marginal contribution.
        md += ['',
               '### Conditional FAR(dh <= k | cos > 0.95)',
               '',
               'Among inter-CPA pairs that already exceed cos > 0.95, '
               'what fraction also have dh <= k? This quantifies '
               "dHash's marginal specificity contribution given the cos "
               "gate is already applied.",
               '',
               '| dh <= k | Conditional FAR | hits / n_cond |',
               '|---|---|---|']
        for e in s['cond_far_at_cos95_curve']:
            cf = e['cond_far']
            md.append(f'| {e["dh_k"]:2d} | '
                      f'{_fmt(cf) if cf is not None else "n/a"} | '
                      f'{e["hits"]:,} / {e["n_cond"]:,} |')
        # Threshold-inversion table: FAR target -> operating point.
        md += ['', '### Threshold inversion', '',
               '| FAR target | cos thresh | dh thresh | joint dh thresh '
               '(under cos>0.95) |',
               '|---|---|---|---|']
        for tgt in [0.005, 0.001, 0.0005, 0.0001]:
            e_c = s['threshold_inversions'].get(f'cos_far_<=_{tgt}')
            e_d = s['threshold_inversions'].get(f'dh_far_<=_{tgt}')
            e_j = s['threshold_inversions'].get(
                f'joint_at_cos95_far_<=_{tgt}')
            # None from invert_far_target means the target is below the
            # smallest FAR on the curve -> 'unachievable'.
            c_str = (f'cos > {e_c["k"]:.3f} (FAR={e_c["far"]:.5f})'
                     if e_c else 'unachievable')
            d_str = (f'dh <= {e_d["k"]} (FAR={e_d["far"]:.5f})'
                     if e_d else 'unachievable')
            j_str = (f'dh <= {e_j["dh_k"]} (FAR={e_j["far"]:.5f})'
                     if e_j else 'unachievable')
            md.append(f'| {tgt} | {c_str} | {d_str} | {j_str} |')
        md.append('')
    # Fixed interpretation section (prose, not computed).
    md += [
        '## Interpretation',
        '',
        ('- The cosine FAR curve replicates and extends v3.x §IV-I '
         'Table X (which reported FAR=0.0005 at cos>0.95 from a '
         'similar but smaller-sample inter-CPA negative anchor).'),
        ('- The dHash FAR curve is the v4 contribution: prior v3.x '
         'work used dh<=5 by convention without an empirical '
         'specificity derivation. This script derives a specificity '
         "target → dh threshold mapping."),
        ('- The conditional FAR(dh<=k | cos>0.95) curve tells us '
         'whether dHash adds specificity given the cos gate. If the '
         "conditional FAR at dh<=5 is meaningfully lower than 1.0, "
         'dHash is providing additional specificity. If it is near '
         '1.0, dHash is largely redundant given cos>0.95 and the '
         'five-way rule should be simplified.'),
        ('- Thresholds derived by inverting FAR targets are '
         'specificity-anchored operating points, not distributional '
         'antimodes. They are robust to the integer-mass-point and '
         'between-firm-composition artefacts identified in Scripts '
         '39b39e.'),
        '',
    ]
    md_path = OUT / 'far_sweep_report.md'
    md_path.write_text('\n'.join(md), encoding='utf-8')
    print(f'[md ] {md_path}')


if __name__ == '__main__':
    main()