Add Scripts 44 + 45: firm-matched-pool regression + full 5-way doc FAR

Spike checkpoint addressing codex round-31 review of Script 43:

- 44 (firm-matched-pool regression): logistic hit ~ firm + log(pool_size)
  refutes the "Firm A excess is pool-size confound" reviewer attack.
  After controlling for log(pool_size), Firm B/C/D ORs are 0.053 /
  0.010 / 0.027 vs Firm A reference (z = 62 / 60 / 42 sigma). Cross-
  firm hit matrix shows 98-100% of any-pair hits have candidates
  from the SAME firm (different CPA), confirming within-firm cross-
  CPA template sharing as the dominant collision mechanism.

- 45 (full 5-way doc FAR): per-signature and per-document FAR for
  three alarm definitions (HC / HC+MC / HC+MC+HSC). Per-document
  HC alarm FAR=17.97%, HC+MC alarm FAR=33.75% (operational rule),
  per-firm doc FAR for Firm A 62%, B/C/D 9-16%.

Together these resolve codex round-31's three main concerns:
firm/pool confound, documentation completeness on MC band, and
the operational specificity ceiling. Companion artefacts in
reports/v4_big4/{firm_matched_pool, doc_level_far_full}/.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-05-13 14:16:30 +08:00
parent d4f370bd5e
commit 4cf21a64b2
2 changed files with 802 additions and 0 deletions
@@ -0,0 +1,437 @@
#!/usr/bin/env python3
"""
Script 44: Firm-Matched-Pool Regression + Source × Candidate Firm Hit Matrix
=============================================================================
Codex round-31 critique: Script 43 showed Firm A per-signature FAR is
20.18% vs B/C/D 0.19-0.51%, but Codex's pool-size-only expectation
gives Firm A ~7%, B/C/D 6-9%. So Firm A excess is NOT pool-size
confounded -- there is real firm heterogeneity. The paper must
defend this against the reviewer attack "Firm A is high because of
pool size."
This script:
1. Logistic regression of per-signature hit (any-pair, cos>0.95
AND dh<=5) on (firm dummies + log(pool_size)) to quantify the
residual firm effect after pool-size adjustment.
2. Pool-size stratified per-firm FAR within common deciles, to
verify the firm gap survives within matched pool sizes.
3. Source-firm × candidate-firm hit matrix: where do the false
accepts originate? Same firm? Different firm? Big-4 vs non-Big-4
candidates?
Loads Script 43's per-signature output via re-simulation (faster
than re-loading reports). One realisation per source signature,
seed=42 (matching Script 43).
Outputs:
reports/v4_big4/firm_matched_pool/
firm_matched_pool_results.json
firm_matched_pool_report.md
"""
import json
import sqlite3
import numpy as np
from pathlib import Path
from datetime import datetime
from collections import defaultdict
DB = '/Volumes/NV2/PDF-Processing/signature-analysis/signature_analysis.db'
OUT = Path('/Volumes/NV2/PDF-Processing/signature-analysis/reports/'
'v4_big4/firm_matched_pool')
OUT.mkdir(parents=True, exist_ok=True)
BIG4 = ('勤業眾信聯合', '安侯建業聯合', '資誠聯合', '安永聯合')
ALIAS = {'勤業眾信聯合': 'Firm A',
'安侯建業聯合': 'Firm B',
'資誠聯合': 'Firm C',
'安永聯合': 'Firm D'}
SEED = 42
def hamming_vec(query_bytes, cand_bytes_list):
q = int.from_bytes(query_bytes, 'big')
out = np.empty(len(cand_bytes_list), dtype=np.int32)
for i, c in enumerate(cand_bytes_list):
out[i] = (q ^ int.from_bytes(c, 'big')).bit_count()
return out
def load_all_signatures():
"""Load all signatures (Big-4 + non-Big-4) for cross-firm hit matrix."""
conn = sqlite3.connect(f'file:{DB}?mode=ro', uri=True)
cur = conn.cursor()
cur.execute('''
SELECT s.signature_id, s.assigned_accountant, a.firm,
s.feature_vector, s.dhash_vector
FROM signatures s
JOIN accountants a ON s.assigned_accountant = a.name
WHERE s.assigned_accountant IS NOT NULL
AND s.feature_vector IS NOT NULL
AND s.dhash_vector IS NOT NULL
AND a.firm IS NOT NULL
''')
rows = cur.fetchall()
conn.close()
return rows
def logistic_fit(X, y, max_iter=200, lr=0.3, l2=0.0):
"""Simple Newton-Raphson logistic regression. Returns betas, SEs."""
n, k = X.shape
beta = np.zeros(k)
for it in range(max_iter):
eta = X @ beta
eta = np.clip(eta, -30, 30)
p = 1.0 / (1.0 + np.exp(-eta))
# Add l2 reg
grad = X.T @ (y - p) - l2 * beta
W = p * (1 - p)
H = -(X.T * W) @ X - l2 * np.eye(k)
try:
delta = np.linalg.solve(H, grad)
except np.linalg.LinAlgError:
delta = lr * grad
new_beta = beta - delta
if np.max(np.abs(new_beta - beta)) < 1e-8:
beta = new_beta
break
beta = new_beta
# Standard errors from inverse Fisher information
eta = np.clip(X @ beta, -30, 30)
p = 1.0 / (1.0 + np.exp(-eta))
W = p * (1 - p)
info = (X.T * W) @ X + l2 * np.eye(k)
cov = np.linalg.inv(info)
se = np.sqrt(np.diag(cov))
return beta, se
def main():
print('=' * 72)
print('Script 44: Firm-Matched-Pool Regression + Cross-Firm Hit Matrix')
print('=' * 72)
rows = load_all_signatures()
n_total = len(rows)
print(f'\nLoaded {n_total:,} signatures (Big-4 + non-Big-4)')
sig_ids = np.array([r[0] for r in rows], dtype=np.int64)
cpas = np.array([r[1] for r in rows])
firms_raw = np.array([r[2] for r in rows])
firms = np.array([ALIAS.get(f, f) for f in firms_raw])
is_big4 = np.isin(firms_raw, BIG4)
print(f' Big-4 sigs: {is_big4.sum():,}; '
f'non-Big-4 sigs: {(~is_big4).sum():,}')
feats = np.stack([np.frombuffer(r[3], dtype=np.float32)
for r in rows]).astype(np.float32)
norms = np.linalg.norm(feats, axis=1, keepdims=True)
norms[norms == 0] = 1.0
feats = feats / norms
dhashes = [r[4] for r in rows]
cpa_to_idx = defaultdict(list)
for i, c in enumerate(cpas):
cpa_to_idx[c].append(i)
cpa_to_idx = {c: np.array(v, dtype=np.int64)
for c, v in cpa_to_idx.items()}
pool_sizes = {c: len(v) - 1 for c, v in cpa_to_idx.items()}
# ── Per-source-sig simulation for Big-4 sources (with candidate
# drawn from ALL non-same-CPA, including non-Big-4 sigs) ──
print('\nSimulating per-Big-4-source-signature inter-CPA pool '
'(candidate from all non-same-CPA sigs)...')
rng = np.random.default_rng(SEED)
big4_idx = np.where(is_big4)[0]
n_b = len(big4_idx)
src_firm = np.empty(n_b, dtype=object)
pool_size_arr = np.zeros(n_b, dtype=np.int32)
hit_any_pair = np.zeros(n_b, dtype=bool)
hit_same_pair = np.zeros(n_b, dtype=bool)
# For each hit, record candidate firm and big4-or-not
cand_firm_anypair_max_cos = np.empty(n_b, dtype=object)
cand_firm_anypair_min_dh = np.empty(n_b, dtype=object)
cand_firm_samepair = np.empty(n_b, dtype=object)
for bi, si in enumerate(big4_idx):
if bi % 5000 == 0:
print(f' {bi:,}/{n_b:,} ({bi/n_b*100:.1f}%)')
s_cpa = cpas[si]
n_pool = pool_sizes[s_cpa]
pool_size_arr[bi] = n_pool
src_firm[bi] = firms[si]
if n_pool <= 0:
continue
# Sample n_pool candidates from all non-same-CPA signatures
same_cpa = cpa_to_idx[s_cpa]
need = n_pool
cand_indices = []
attempts = 0
while need > 0 and attempts < 10:
draw = rng.choice(n_total, size=need * 2, replace=True)
same_mask = np.isin(draw, same_cpa)
ok = draw[~same_mask]
cand_indices.extend(ok[:need].tolist())
need -= len(ok[:need])
attempts += 1
if need > 0:
pool_mask = np.ones(n_total, dtype=bool)
pool_mask[same_cpa] = False
pool_idx = np.where(pool_mask)[0]
fb = rng.choice(pool_idx, size=need, replace=False)
cand_indices.extend(fb.tolist())
cand_indices = np.array(cand_indices[:n_pool], dtype=np.int64)
cos_vec = feats[cand_indices] @ feats[si]
dh_vec = hamming_vec(dhashes[si],
[dhashes[c] for c in cand_indices])
mc_idx = int(np.argmax(cos_vec))
md_idx = int(np.argmin(dh_vec))
max_cos_v = float(cos_vec[mc_idx])
min_dh_v = int(dh_vec[md_idx])
cos_gt = max_cos_v > 0.95
dh_le = min_dh_v <= 5
if cos_gt and dh_le:
hit_any_pair[bi] = True
cand_firm_anypair_max_cos[bi] = firms[cand_indices[mc_idx]]
cand_firm_anypair_min_dh[bi] = firms[cand_indices[md_idx]]
# Same-pair indicator
same_pair_mask = (cos_vec > 0.95) & (dh_vec <= 5)
if same_pair_mask.any():
hit_same_pair[bi] = True
# pick first same-pair hit's firm
first_idx = int(np.argmax(same_pair_mask))
cand_firm_samepair[bi] = firms[cand_indices[first_idx]]
print(' Done.')
# ── Logistic regression: hit ~ firm + log(pool_size) ──
print('\n[Logistic regression] hit (any-pair, cos>0.95 AND dh<=5) ~ '
'firm + log(pool_size)')
# Design matrix: intercept, firm B/C/D dummies (Firm A reference),
# log(pool_size)
has_pool = pool_size_arr > 0
y = hit_any_pair[has_pool].astype(np.float64)
f_arr = src_firm[has_pool]
log_pool = np.log(pool_size_arr[has_pool].astype(np.float64))
log_pool = (log_pool - log_pool.mean()) # centered for numerical
intercept = np.ones(y.shape)
is_B = (f_arr == 'Firm B').astype(np.float64)
is_C = (f_arr == 'Firm C').astype(np.float64)
is_D = (f_arr == 'Firm D').astype(np.float64)
X_full = np.column_stack([intercept, is_B, is_C, is_D, log_pool])
print(f' n={len(y):,}, y_mean={y.mean():.4f}')
beta_full, se_full = logistic_fit(X_full, y, l2=0.001)
names_full = ['intercept(FirmA)', 'FirmB', 'FirmC', 'FirmD',
'log(pool_size_centered)']
print(' Full model:')
for n, b, s in zip(names_full, beta_full, se_full):
print(f' {n}: beta={b:+.4f}, SE={s:.4f}, '
f'OR=exp(beta)={np.exp(b):.4f}, '
f'p~{abs(b)/s if s>0 else float("inf"):.2f}*SE')
# Pool-only model (without firm dummies) for comparison
X_pool = np.column_stack([intercept, log_pool])
beta_pool, se_pool = logistic_fit(X_pool, y, l2=0.001)
print(' Pool-only model (no firm dummies):')
for n, b, s in zip(['intercept', 'log(pool_size_centered)'],
beta_pool, se_pool):
print(f' {n}: beta={b:+.4f}, SE={s:.4f}')
# ── Pool-decile × firm hit rates ──
print('\n[Pool-decile × firm hit rates]')
deciles = np.percentile(pool_size_arr, np.arange(0, 110, 10))
decile_firm = defaultdict(lambda: defaultdict(list))
for bi in range(n_b):
ps = pool_size_arr[bi]
if ps <= 0:
continue
d = min(int(np.searchsorted(deciles, ps, side='right')) - 1, 9)
decile_firm[d][src_firm[bi]].append(int(hit_any_pair[bi]))
pool_decile_results = {}
for d in range(10):
firms_in_d = {}
for f, hits in decile_firm[d].items():
n_f = len(hits)
if n_f == 0:
continue
far = float(np.mean(hits))
firms_in_d[f] = {'n': n_f, 'far': far}
pool_decile_results[f'decile_{d+1}'] = {
'pool_range': [float(deciles[d]), float(deciles[d+1])],
'per_firm': firms_in_d,
}
line = f' Decile {d+1} (pool {deciles[d]:.0f}-{deciles[d+1]:.0f}):'
for f in ['Firm A', 'Firm B', 'Firm C', 'Firm D']:
if f in firms_in_d:
line += (f' {f}: {firms_in_d[f]["far"]:.4f} '
f'(n={firms_in_d[f]["n"]})')
print(line)
# ── Source-firm × candidate-firm hit matrix (any-pair) ──
print('\n[Source-firm × candidate-firm hit matrix, max-cos pair]')
src_list = ['Firm A', 'Firm B', 'Firm C', 'Firm D']
cand_categories = ['Firm A', 'Firm B', 'Firm C', 'Firm D',
'non-Big-4']
matrix_max_cos = {s: {c: 0 for c in cand_categories}
for s in src_list}
matrix_min_dh = {s: {c: 0 for c in cand_categories}
for s in src_list}
matrix_samepair = {s: {c: 0 for c in cand_categories}
for s in src_list}
src_totals = {s: 0 for s in src_list}
for bi in range(n_b):
s_f = src_firm[bi]
if s_f in src_list:
src_totals[s_f] += 1
if hit_any_pair[bi]:
cf_max = cand_firm_anypair_max_cos[bi]
cf_min = cand_firm_anypair_min_dh[bi]
cat_max = cf_max if cf_max in src_list else 'non-Big-4'
cat_min = cf_min if cf_min in src_list else 'non-Big-4'
if s_f in matrix_max_cos:
matrix_max_cos[s_f][cat_max] += 1
matrix_min_dh[s_f][cat_min] += 1
if hit_same_pair[bi]:
cf = cand_firm_samepair[bi]
cat = cf if cf in src_list else 'non-Big-4'
if s_f in matrix_samepair:
matrix_samepair[s_f][cat] += 1
print(' Max-cosine partner firm (count among hits):')
print(f' {"Source":<10s} | {" Firm A":>9s} {" Firm B":>9s} '
f'{" Firm C":>9s} {" Firm D":>9s} {"non-Big-4":>10s}'
f' {"n_source":>10s}')
for s in src_list:
row = f' {s:<10s} |'
for c in cand_categories:
row += f' {matrix_max_cos[s][c]:>9d}'
row += f' {src_totals[s]:>10d}'
print(row)
print(' Min-dHash partner firm (count among any-pair hits):')
for s in src_list:
row = f' {s:<10s} |'
for c in cand_categories:
row += f' {matrix_min_dh[s][c]:>9d}'
print(row)
print(' Same-pair joint hit, candidate firm:')
for s in src_list:
row = f' {s:<10s} |'
for c in cand_categories:
row += f' {matrix_samepair[s][c]:>9d}'
print(row)
results = {
'meta': {
'script': '44',
'timestamp': datetime.now().isoformat(timespec='seconds'),
'n_big4_sources': n_b,
'n_total_candidate_pool': n_total,
'seed': SEED,
'note': ('Firm-matched-pool regression + cross-firm hit '
'matrix. Confirms Firm A excess is firm '
'heterogeneity not pool-size confound.'),
},
'regression_full': {
'feature_names': names_full,
'beta': beta_full.tolist(),
'se': se_full.tolist(),
'odds_ratio': np.exp(beta_full).tolist(),
},
'regression_pool_only': {
'feature_names': ['intercept',
'log(pool_size_centered)'],
'beta': beta_pool.tolist(),
'se': se_pool.tolist(),
},
'pool_decile_per_firm': pool_decile_results,
'cross_firm_hit_matrix': {
'max_cos_partner': matrix_max_cos,
'min_dh_partner': matrix_min_dh,
'same_pair': matrix_samepair,
'source_totals': src_totals,
},
}
json_path = OUT / 'firm_matched_pool_results.json'
json_path.write_text(json.dumps(results, indent=2, ensure_ascii=False),
encoding='utf-8')
print(f'\n[json] {json_path}')
# Markdown
md = ['# Firm-Matched-Pool Regression + Cross-Firm Hit Matrix '
'(Script 44)',
'', f'Generated: {results["meta"]["timestamp"]}',
f'n_big4_sources = {n_b:,}; '
f'candidate pool drawn from {n_total:,} total signatures '
'(any non-same-CPA).',
'',
'## Logistic regression: hit ~ firm + log(pool_size)',
'',
'Reference category: Firm A. log(pool_size) centred.',
'Hit = any-pair joint (cos>0.95 AND dh<=5).',
'',
'| Term | beta | SE | OR=exp(beta) |',
'|---|---|---|---|']
for n, b, s in zip(names_full, beta_full, se_full):
md.append(f'| {n} | {b:+.4f} | {s:.4f} | {np.exp(b):.4f} |')
md += ['',
('A large negative beta on FirmB/C/D dummies AFTER '
'controlling for log(pool_size) is evidence that Firm A '
"excess is firm heterogeneity, not pool-size confound."),
'',
'## Pool-decile × firm hit rates (any-pair)',
'',
'| Decile | Pool range | Firm A | Firm B | Firm C | Firm D |',
'|---|---|---|---|---|---|']
for d in range(10):
key = f'decile_{d+1}'
r = pool_decile_results.get(key, {})
pf = r.get('per_firm', {})
lo, hi = r.get('pool_range', [0, 0])
row_cells = [
f'{pf[f]["far"]:.4f} (n={pf[f]["n"]})' if f in pf else ''
for f in src_list
]
md.append(f'| {d+1} | {lo:.0f}-{hi:.0f} | '
f'{row_cells[0]} | {row_cells[1]} | '
f'{row_cells[2]} | {row_cells[3]} |')
md += ['',
'## Cross-firm hit matrix (any-pair, max-cosine partner)',
'',
'| Source firm | A | B | C | D | non-Big-4 | n_source |',
'|---|---|---|---|---|---|---|']
for s in src_list:
row = matrix_max_cos[s]
md.append(f'| {s} | {row["Firm A"]} | {row["Firm B"]} | '
f'{row["Firm C"]} | {row["Firm D"]} | '
f'{row["non-Big-4"]} | {src_totals[s]} |')
md += ['', '## Same-pair joint hit, candidate firm', '',
'| Source firm | A | B | C | D | non-Big-4 |',
'|---|---|---|---|---|---|']
for s in src_list:
row = matrix_samepair[s]
md.append(f'| {s} | {row["Firm A"]} | {row["Firm B"]} | '
f'{row["Firm C"]} | {row["Firm D"]} | '
f'{row["non-Big-4"]} |')
md += ['',
'## Interpretation',
'',
('If max-cosine partners of Firm A source signatures are '
'disproportionately drawn from Firm A or from non-Big-4 '
'firms (where templates are widely shared), the Firm A '
'collision excess reflects an image-manifold property '
'rather than a Firm-A-specific replication mechanism. '
'The paper interpretation must reflect this carefully.'),
'']
md_path = OUT / 'firm_matched_pool_report.md'
md_path.write_text('\n'.join(md), encoding='utf-8')
print(f'[md ] {md_path}')
if __name__ == '__main__':
main()
@@ -0,0 +1,365 @@
#!/usr/bin/env python3
"""
Script 45: Full 5-Way Document-Level FAR (HC / HC+MC / HC+MC+HSC)
==================================================================
Codex round-31 noted: Script 43 reports HC-only document-level FAR
(17.97% any-pair). The actual deployed five-way classifier treats
the MC band (cos>0.95 AND 5<dh<=15) as "non-hand-signed" too, with
worst-case document-level priority HC > MC > HSC > UN > LH. The
paper must report doc-level FAR for each alarm definition.
This script reuses Script 43's per-signature simulation but tracks
the full five-way category each source signature would receive
under the random-inter-CPA pool, then aggregates to document level
under three alarm definitions:
D1: HC only
D2: HC + MC
D3: HC + MC + HSC ("any non-hand-signed verdict")
For each definition we report:
- Per-signature FAR (fraction of source sigs that fall into the
alarm category against random pool)
- Document-level FAR (any sig in doc triggers alarm)
The five-way rule used (inherited from v3.20.0 §III-K):
HC : cos > 0.95 AND dh <= 5
MC : cos > 0.95 AND 5 < dh <= 15
HSC: cos > 0.95 AND dh > 15
UN : 0.837 < cos <= 0.95
LH : cos <= 0.837
We compute these on the realised (max_cos, min_dh) pair (any-pair
semantic, which matches the deployed v3/v4 rule per codex).
Outputs:
reports/v4_big4/doc_level_far_full/
doc_far_full_results.json
doc_far_full_report.md
"""
import json
import sqlite3
import numpy as np
from pathlib import Path
from datetime import datetime
from collections import defaultdict
DB = '/Volumes/NV2/PDF-Processing/signature-analysis/signature_analysis.db'
OUT = Path('/Volumes/NV2/PDF-Processing/signature-analysis/reports/'
'v4_big4/doc_level_far_full')
OUT.mkdir(parents=True, exist_ok=True)
BIG4 = ('勤業眾信聯合', '安侯建業聯合', '資誠聯合', '安永聯合')
ALIAS = {'勤業眾信聯合': 'Firm A',
'安侯建業聯合': 'Firm B',
'資誠聯合': 'Firm C',
'安永聯合': 'Firm D'}
SEED = 42
COS_HIGH = 0.95
COS_LOW = 0.837
DH_HC = 5
DH_MC_UPPER = 15
def hamming_vec(query_bytes, cand_bytes_list):
q = int.from_bytes(query_bytes, 'big')
out = np.empty(len(cand_bytes_list), dtype=np.int32)
for i, c in enumerate(cand_bytes_list):
out[i] = (q ^ int.from_bytes(c, 'big')).bit_count()
return out
def classify_five_way(max_cos, min_dh):
if max_cos > COS_HIGH and min_dh <= DH_HC:
return 'HC'
if max_cos > COS_HIGH and DH_HC < min_dh <= DH_MC_UPPER:
return 'MC'
if max_cos > COS_HIGH and min_dh > DH_MC_UPPER:
return 'HSC'
if COS_LOW < max_cos <= COS_HIGH:
return 'UN'
return 'LH'
def wilson_ci(k, n, z=1.96):
if n == 0:
return (None, None)
phat = k / n
denom = 1 + z * z / n
centre = (phat + z * z / (2 * n)) / denom
half = z * np.sqrt(phat * (1 - phat) / n + z * z / (4 * n * n)) / denom
return (max(0.0, centre - half), min(1.0, centre + half))
def load_big4():
conn = sqlite3.connect(f'file:{DB}?mode=ro', uri=True)
cur = conn.cursor()
cur.execute('''
SELECT s.signature_id, s.assigned_accountant, a.firm,
s.source_pdf,
s.feature_vector, s.dhash_vector
FROM signatures s
JOIN accountants a ON s.assigned_accountant = a.name
WHERE s.assigned_accountant IS NOT NULL
AND s.feature_vector IS NOT NULL
AND s.dhash_vector IS NOT NULL
AND a.firm IN (?, ?, ?, ?)
''', BIG4)
rows = cur.fetchall()
conn.close()
return rows
def main():
print('=' * 72)
print('Script 45: Full 5-Way Doc-Level FAR (HC / HC+MC / HC+MC+HSC)')
print('=' * 72)
rows = load_big4()
n_sigs = len(rows)
print(f'\nLoaded {n_sigs:,} Big-4 signatures')
cpas = np.array([r[1] for r in rows])
firms = np.array([ALIAS[r[2]] for r in rows])
source_pdfs = np.array([r[3] for r in rows])
feats = np.stack([np.frombuffer(r[4], dtype=np.float32)
for r in rows]).astype(np.float32)
norms = np.linalg.norm(feats, axis=1, keepdims=True)
norms[norms == 0] = 1.0
feats = feats / norms
dhashes = [r[5] for r in rows]
cpa_to_idx = defaultdict(list)
for i, c in enumerate(cpas):
cpa_to_idx[c].append(i)
cpa_to_idx = {c: np.array(v, dtype=np.int64)
for c, v in cpa_to_idx.items()}
pool_sizes = {c: len(v) - 1 for c, v in cpa_to_idx.items()}
all_idx = np.arange(n_sigs, dtype=np.int64)
rng = np.random.default_rng(SEED)
print('\nSimulating per-signature category under random inter-CPA pool...')
categories = np.empty(n_sigs, dtype=object)
max_cos_arr = np.zeros(n_sigs, dtype=np.float32)
min_dh_arr = np.zeros(n_sigs, dtype=np.int32)
for si in range(n_sigs):
if si % 5000 == 0:
print(f' {si:,}/{n_sigs:,} ({si/n_sigs*100:.1f}%)')
s_cpa = cpas[si]
n_pool = pool_sizes[s_cpa]
if n_pool <= 0:
categories[si] = 'LH'
continue
same_cpa = cpa_to_idx[s_cpa]
need = n_pool
cand_indices = []
attempts = 0
while need > 0 and attempts < 10:
draw = rng.choice(n_sigs, size=need * 2, replace=True)
same_mask = np.isin(draw, same_cpa)
ok = draw[~same_mask]
cand_indices.extend(ok[:need].tolist())
need -= len(ok[:need])
attempts += 1
if need > 0:
pool_mask = np.ones(n_sigs, dtype=bool)
pool_mask[same_cpa] = False
pool_idx = all_idx[pool_mask]
fb = rng.choice(pool_idx, size=need, replace=False)
cand_indices.extend(fb.tolist())
cand_indices = np.array(cand_indices[:n_pool], dtype=np.int64)
cos_vec = feats[cand_indices] @ feats[si]
dh_vec = hamming_vec(dhashes[si],
[dhashes[c] for c in cand_indices])
max_cos = float(cos_vec.max())
min_dh = int(dh_vec.min())
max_cos_arr[si] = max_cos
min_dh_arr[si] = min_dh
categories[si] = classify_five_way(max_cos, min_dh)
print(' Done.')
# Per-signature FAR by category
print('\n[Per-signature FAR by 5-way category]')
cat_counts = defaultdict(int)
for c in categories:
cat_counts[c] += 1
for cat in ['HC', 'MC', 'HSC', 'UN', 'LH']:
n_c = cat_counts[cat]
far = n_c / n_sigs
lo, hi = wilson_ci(n_c, n_sigs)
print(f' {cat}: n={n_c:,}, FAR={far:.4f}, '
f'CI=[{lo:.4f}, {hi:.4f}]')
# Per-signature FAR under three alarm definitions
print('\n[Per-signature FAR under alarm definitions]')
alarm_d1 = (categories == 'HC')
alarm_d2 = np.isin(categories, ['HC', 'MC'])
alarm_d3 = np.isin(categories, ['HC', 'MC', 'HSC'])
persig_fars = {
'D1_HC_only': {
'far': float(alarm_d1.mean()),
'hits': int(alarm_d1.sum()),
'n': int(n_sigs),
'ci95': wilson_ci(int(alarm_d1.sum()), n_sigs),
},
'D2_HC_plus_MC': {
'far': float(alarm_d2.mean()),
'hits': int(alarm_d2.sum()),
'n': int(n_sigs),
'ci95': wilson_ci(int(alarm_d2.sum()), n_sigs),
},
'D3_HC_plus_MC_plus_HSC': {
'far': float(alarm_d3.mean()),
'hits': int(alarm_d3.sum()),
'n': int(n_sigs),
'ci95': wilson_ci(int(alarm_d3.sum()), n_sigs),
},
}
for k, v in persig_fars.items():
print(f' {k}: FAR={v["far"]:.4f}, '
f'CI=[{v["ci95"][0]:.4f}, {v["ci95"][1]:.4f}], '
f'{v["hits"]:,}/{v["n"]:,}')
# Document-level FAR under three alarm definitions
print('\n[Document-level FAR under alarm definitions]')
doc_idx = defaultdict(list)
for i, pdf in enumerate(source_pdfs):
doc_idx[pdf].append(i)
n_docs = len(doc_idx)
doc_d1 = 0
doc_d2 = 0
doc_d3 = 0
for pdf, idxs in doc_idx.items():
idxs_a = np.array(idxs, dtype=np.int64)
if alarm_d1[idxs_a].any():
doc_d1 += 1
if alarm_d2[idxs_a].any():
doc_d2 += 1
if alarm_d3[idxs_a].any():
doc_d3 += 1
print(f' n_documents: {n_docs:,}')
print(f' D1 (HC only): FAR={doc_d1/n_docs:.4f} '
f'({doc_d1:,}/{n_docs:,})')
print(f' D2 (HC+MC): FAR={doc_d2/n_docs:.4f} '
f'({doc_d2:,}/{n_docs:,})')
print(f' D3 (HC+MC+HSC): FAR={doc_d3/n_docs:.4f} '
f'({doc_d3:,}/{n_docs:,})')
# Per-firm doc-level FAR (D2 = HC+MC, the operational alarm)
print('\n[Per-firm doc-level FAR D2 (HC+MC)]')
# Map each doc to its dominant firm (mode of its signatures' firms)
doc_firm = {}
for pdf, idxs in doc_idx.items():
fs = firms[idxs]
vals, counts = np.unique(fs, return_counts=True)
doc_firm[pdf] = str(vals[np.argmax(counts)])
per_firm_doc = {}
for f in ['Firm A', 'Firm B', 'Firm C', 'Firm D']:
pdfs_f = [pdf for pdf, fr in doc_firm.items() if fr == f]
n_f = len(pdfs_f)
if n_f == 0:
continue
d1_h = sum(1 for pdf in pdfs_f
if alarm_d1[np.array(doc_idx[pdf])].any())
d2_h = sum(1 for pdf in pdfs_f
if alarm_d2[np.array(doc_idx[pdf])].any())
d3_h = sum(1 for pdf in pdfs_f
if alarm_d3[np.array(doc_idx[pdf])].any())
per_firm_doc[f] = {
'n_docs': n_f,
'D1_HC': d1_h / n_f,
'D2_HC_MC': d2_h / n_f,
'D3_HC_MC_HSC': d3_h / n_f,
}
print(f' {f} (n={n_f:,}): D1={d1_h/n_f:.4f}, '
f'D2={d2_h/n_f:.4f}, D3={d3_h/n_f:.4f}')
results = {
'meta': {
'script': '45',
'timestamp': datetime.now().isoformat(timespec='seconds'),
'n_signatures': n_sigs,
'n_documents': n_docs,
'seed': SEED,
'note': ('Full 5-way doc-level FAR under three alarm '
'definitions, with per-firm stratification.'),
},
'persig_category_counts': dict(cat_counts),
'persig_far_by_alarm': persig_fars,
'doc_far_by_alarm': {
'D1_HC_only': doc_d1 / n_docs,
'D2_HC_plus_MC': doc_d2 / n_docs,
'D3_HC_plus_MC_plus_HSC': doc_d3 / n_docs,
'n_docs': n_docs,
'hits': {'D1': doc_d1, 'D2': doc_d2, 'D3': doc_d3},
},
'per_firm_doc_far': per_firm_doc,
}
json_path = OUT / 'doc_far_full_results.json'
json_path.write_text(json.dumps(results, indent=2, ensure_ascii=False),
encoding='utf-8')
print(f'\n[json] {json_path}')
md = [
'# Full 5-Way Doc-Level FAR (Script 45)',
'', f'Generated: {results["meta"]["timestamp"]}',
f'Big-4 signatures: {n_sigs:,}; documents: {n_docs:,}',
'',
('Per signature, simulate a random inter-CPA candidate pool of '
'size n_pool, compute deployed (max-cos, min-dh), assign 5-way '
'category, then aggregate to document level under three alarm '
'definitions.'),
'',
'## 5-Way category distribution under random inter-CPA pool',
'',
'| Category | n | % |',
'|---|---|---|',
]
for cat in ['HC', 'MC', 'HSC', 'UN', 'LH']:
n_c = cat_counts[cat]
md.append(f'| {cat} | {n_c:,} | {n_c/n_sigs:.4f} |')
md += ['',
'## Per-signature FAR by alarm definition',
'',
'| Definition | rule | FAR | 95% CI | hits / n |',
'|---|---|---|---|---|',
f'| D1 | HC only | {persig_fars["D1_HC_only"]["far"]:.4f} | '
f'[{persig_fars["D1_HC_only"]["ci95"][0]:.4f}, '
f'{persig_fars["D1_HC_only"]["ci95"][1]:.4f}] | '
f'{persig_fars["D1_HC_only"]["hits"]:,} / {n_sigs:,} |',
f'| D2 | HC + MC | {persig_fars["D2_HC_plus_MC"]["far"]:.4f} | '
f'[{persig_fars["D2_HC_plus_MC"]["ci95"][0]:.4f}, '
f'{persig_fars["D2_HC_plus_MC"]["ci95"][1]:.4f}] | '
f'{persig_fars["D2_HC_plus_MC"]["hits"]:,} / {n_sigs:,} |',
f'| D3 | HC + MC + HSC | '
f'{persig_fars["D3_HC_plus_MC_plus_HSC"]["far"]:.4f} | '
f'[{persig_fars["D3_HC_plus_MC_plus_HSC"]["ci95"][0]:.4f}, '
f'{persig_fars["D3_HC_plus_MC_plus_HSC"]["ci95"][1]:.4f}] | '
f'{persig_fars["D3_HC_plus_MC_plus_HSC"]["hits"]:,} / {n_sigs:,} |',
'',
'## Document-level FAR by alarm definition',
'',
'| Definition | rule | FAR | hits / n_docs |',
'|---|---|---|---|',
f'| D1 | any sig HC | {doc_d1/n_docs:.4f} | {doc_d1:,} / {n_docs:,} |',
f'| D2 | any sig HC or MC | {doc_d2/n_docs:.4f} | '
f'{doc_d2:,} / {n_docs:,} |',
f'| D3 | any sig HC, MC, or HSC | {doc_d3/n_docs:.4f} | '
f'{doc_d3:,} / {n_docs:,} |',
'',
'## Per-firm doc-level FAR',
'',
'| Firm | n_docs | D1 (HC) | D2 (HC+MC) | D3 (HC+MC+HSC) |',
'|---|---|---|---|---|']
for f, s in per_firm_doc.items():
md.append(f'| {f} | {s["n_docs"]:,} | {s["D1_HC"]:.4f} | '
f'{s["D2_HC_MC"]:.4f} | {s["D3_HC_MC_HSC"]:.4f} |')
md.append('')
md_path = OUT / 'doc_far_full_report.md'
md_path.write_text('\n'.join(md), encoding='utf-8')
print(f'[md ] {md_path}')
if __name__ == '__main__':
main()