Files
pdf_signature_extraction/signature_analysis/44_firm_matched_pool_regression.py
gbanyan 4cf21a64b2 Add Scripts 44 + 45: firm-matched-pool regression + full 5-way doc FAR
Spike checkpoint addressing codex round-31 review of Script 43:

- 44 (firm-matched-pool regression): logistic hit ~ firm + log(pool_size)
  refutes the "Firm A excess is pool-size confound" reviewer attack.
  After controlling for log(pool_size), Firm B/C/D ORs are 0.053 /
  0.010 / 0.027 vs Firm A reference (z = 62 / 60 / 42 sigma). Cross-
  firm hit matrix shows 98-100% of any-pair hits have candidates
  from the SAME firm (different CPA), confirming within-firm cross-
  CPA template sharing as the dominant collision mechanism.

- 45 (full 5-way doc FAR): per-signature and per-document FAR for
  three alarm definitions (HC / HC+MC / HC+MC+HSC). Per-document
  HC alarm FAR=17.97%, HC+MC alarm FAR=33.75% (operational rule),
  per-firm doc FAR for Firm A 62%, B/C/D 9-16%.

Together these resolve codex round-31's three main concerns:
firm/pool confound, documentation completeness on MC band, and
the operational specificity ceiling. Companion artefacts in
reports/v4_big4/{firm_matched_pool, doc_level_far_full}/.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-13 14:16:30 +08:00

438 lines
17 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""
Script 44: Firm-Matched-Pool Regression + Source × Candidate Firm Hit Matrix
=============================================================================
Codex round-31 critique: Script 43 showed Firm A per-signature FAR is
20.18% vs B/C/D 0.19-0.51%, but Codex's pool-size-only expectation
gives Firm A ~7%, B/C/D 6-9%. So Firm A excess is NOT pool-size
confounded -- there is real firm heterogeneity. The paper must
defend this against the reviewer attack "Firm A is high because of
pool size."
This script:
1. Logistic regression of per-signature hit (any-pair, cos>0.95
AND dh<=5) on (firm dummies + log(pool_size)) to quantify the
residual firm effect after pool-size adjustment.
2. Pool-size stratified per-firm FAR within common deciles, to
verify the firm gap survives within matched pool sizes.
3. Source-firm × candidate-firm hit matrix: where do the false
accepts originate? Same firm? Different firm? Big-4 vs non-Big-4
candidates?
Loads Script 43's per-signature output via re-simulation (faster
than re-loading reports). One realisation per source signature,
seed=42 (matching Script 43).
Outputs:
reports/v4_big4/firm_matched_pool/
firm_matched_pool_results.json
firm_matched_pool_report.md
"""
import json
import sqlite3
import numpy as np
from pathlib import Path
from datetime import datetime
from collections import defaultdict
DB = '/Volumes/NV2/PDF-Processing/signature-analysis/signature_analysis.db'
OUT = Path('/Volumes/NV2/PDF-Processing/signature-analysis/reports/'
'v4_big4/firm_matched_pool')
OUT.mkdir(parents=True, exist_ok=True)
BIG4 = ('勤業眾信聯合', '安侯建業聯合', '資誠聯合', '安永聯合')
ALIAS = {'勤業眾信聯合': 'Firm A',
'安侯建業聯合': 'Firm B',
'資誠聯合': 'Firm C',
'安永聯合': 'Firm D'}
SEED = 42
def hamming_vec(query_bytes, cand_bytes_list):
q = int.from_bytes(query_bytes, 'big')
out = np.empty(len(cand_bytes_list), dtype=np.int32)
for i, c in enumerate(cand_bytes_list):
out[i] = (q ^ int.from_bytes(c, 'big')).bit_count()
return out
def load_all_signatures():
"""Load all signatures (Big-4 + non-Big-4) for cross-firm hit matrix."""
conn = sqlite3.connect(f'file:{DB}?mode=ro', uri=True)
cur = conn.cursor()
cur.execute('''
SELECT s.signature_id, s.assigned_accountant, a.firm,
s.feature_vector, s.dhash_vector
FROM signatures s
JOIN accountants a ON s.assigned_accountant = a.name
WHERE s.assigned_accountant IS NOT NULL
AND s.feature_vector IS NOT NULL
AND s.dhash_vector IS NOT NULL
AND a.firm IS NOT NULL
''')
rows = cur.fetchall()
conn.close()
return rows
def logistic_fit(X, y, max_iter=200, lr=0.3, l2=0.0):
"""Simple Newton-Raphson logistic regression. Returns betas, SEs."""
n, k = X.shape
beta = np.zeros(k)
for it in range(max_iter):
eta = X @ beta
eta = np.clip(eta, -30, 30)
p = 1.0 / (1.0 + np.exp(-eta))
# Add l2 reg
grad = X.T @ (y - p) - l2 * beta
W = p * (1 - p)
H = -(X.T * W) @ X - l2 * np.eye(k)
try:
delta = np.linalg.solve(H, grad)
except np.linalg.LinAlgError:
delta = lr * grad
new_beta = beta - delta
if np.max(np.abs(new_beta - beta)) < 1e-8:
beta = new_beta
break
beta = new_beta
# Standard errors from inverse Fisher information
eta = np.clip(X @ beta, -30, 30)
p = 1.0 / (1.0 + np.exp(-eta))
W = p * (1 - p)
info = (X.T * W) @ X + l2 * np.eye(k)
cov = np.linalg.inv(info)
se = np.sqrt(np.diag(cov))
return beta, se
def main():
print('=' * 72)
print('Script 44: Firm-Matched-Pool Regression + Cross-Firm Hit Matrix')
print('=' * 72)
rows = load_all_signatures()
n_total = len(rows)
print(f'\nLoaded {n_total:,} signatures (Big-4 + non-Big-4)')
sig_ids = np.array([r[0] for r in rows], dtype=np.int64)
cpas = np.array([r[1] for r in rows])
firms_raw = np.array([r[2] for r in rows])
firms = np.array([ALIAS.get(f, f) for f in firms_raw])
is_big4 = np.isin(firms_raw, BIG4)
print(f' Big-4 sigs: {is_big4.sum():,}; '
f'non-Big-4 sigs: {(~is_big4).sum():,}')
feats = np.stack([np.frombuffer(r[3], dtype=np.float32)
for r in rows]).astype(np.float32)
norms = np.linalg.norm(feats, axis=1, keepdims=True)
norms[norms == 0] = 1.0
feats = feats / norms
dhashes = [r[4] for r in rows]
cpa_to_idx = defaultdict(list)
for i, c in enumerate(cpas):
cpa_to_idx[c].append(i)
cpa_to_idx = {c: np.array(v, dtype=np.int64)
for c, v in cpa_to_idx.items()}
pool_sizes = {c: len(v) - 1 for c, v in cpa_to_idx.items()}
# ── Per-source-sig simulation for Big-4 sources (with candidate
# drawn from ALL non-same-CPA, including non-Big-4 sigs) ──
print('\nSimulating per-Big-4-source-signature inter-CPA pool '
'(candidate from all non-same-CPA sigs)...')
rng = np.random.default_rng(SEED)
big4_idx = np.where(is_big4)[0]
n_b = len(big4_idx)
src_firm = np.empty(n_b, dtype=object)
pool_size_arr = np.zeros(n_b, dtype=np.int32)
hit_any_pair = np.zeros(n_b, dtype=bool)
hit_same_pair = np.zeros(n_b, dtype=bool)
# For each hit, record candidate firm and big4-or-not
cand_firm_anypair_max_cos = np.empty(n_b, dtype=object)
cand_firm_anypair_min_dh = np.empty(n_b, dtype=object)
cand_firm_samepair = np.empty(n_b, dtype=object)
for bi, si in enumerate(big4_idx):
if bi % 5000 == 0:
print(f' {bi:,}/{n_b:,} ({bi/n_b*100:.1f}%)')
s_cpa = cpas[si]
n_pool = pool_sizes[s_cpa]
pool_size_arr[bi] = n_pool
src_firm[bi] = firms[si]
if n_pool <= 0:
continue
# Sample n_pool candidates from all non-same-CPA signatures
same_cpa = cpa_to_idx[s_cpa]
need = n_pool
cand_indices = []
attempts = 0
while need > 0 and attempts < 10:
draw = rng.choice(n_total, size=need * 2, replace=True)
same_mask = np.isin(draw, same_cpa)
ok = draw[~same_mask]
cand_indices.extend(ok[:need].tolist())
need -= len(ok[:need])
attempts += 1
if need > 0:
pool_mask = np.ones(n_total, dtype=bool)
pool_mask[same_cpa] = False
pool_idx = np.where(pool_mask)[0]
fb = rng.choice(pool_idx, size=need, replace=False)
cand_indices.extend(fb.tolist())
cand_indices = np.array(cand_indices[:n_pool], dtype=np.int64)
cos_vec = feats[cand_indices] @ feats[si]
dh_vec = hamming_vec(dhashes[si],
[dhashes[c] for c in cand_indices])
mc_idx = int(np.argmax(cos_vec))
md_idx = int(np.argmin(dh_vec))
max_cos_v = float(cos_vec[mc_idx])
min_dh_v = int(dh_vec[md_idx])
cos_gt = max_cos_v > 0.95
dh_le = min_dh_v <= 5
if cos_gt and dh_le:
hit_any_pair[bi] = True
cand_firm_anypair_max_cos[bi] = firms[cand_indices[mc_idx]]
cand_firm_anypair_min_dh[bi] = firms[cand_indices[md_idx]]
# Same-pair indicator
same_pair_mask = (cos_vec > 0.95) & (dh_vec <= 5)
if same_pair_mask.any():
hit_same_pair[bi] = True
# pick first same-pair hit's firm
first_idx = int(np.argmax(same_pair_mask))
cand_firm_samepair[bi] = firms[cand_indices[first_idx]]
print(' Done.')
# ── Logistic regression: hit ~ firm + log(pool_size) ──
print('\n[Logistic regression] hit (any-pair, cos>0.95 AND dh<=5) ~ '
'firm + log(pool_size)')
# Design matrix: intercept, firm B/C/D dummies (Firm A reference),
# log(pool_size)
has_pool = pool_size_arr > 0
y = hit_any_pair[has_pool].astype(np.float64)
f_arr = src_firm[has_pool]
log_pool = np.log(pool_size_arr[has_pool].astype(np.float64))
log_pool = (log_pool - log_pool.mean()) # centered for numerical
intercept = np.ones(y.shape)
is_B = (f_arr == 'Firm B').astype(np.float64)
is_C = (f_arr == 'Firm C').astype(np.float64)
is_D = (f_arr == 'Firm D').astype(np.float64)
X_full = np.column_stack([intercept, is_B, is_C, is_D, log_pool])
print(f' n={len(y):,}, y_mean={y.mean():.4f}')
beta_full, se_full = logistic_fit(X_full, y, l2=0.001)
names_full = ['intercept(FirmA)', 'FirmB', 'FirmC', 'FirmD',
'log(pool_size_centered)']
print(' Full model:')
for n, b, s in zip(names_full, beta_full, se_full):
print(f' {n}: beta={b:+.4f}, SE={s:.4f}, '
f'OR=exp(beta)={np.exp(b):.4f}, '
f'p~{abs(b)/s if s>0 else float("inf"):.2f}*SE')
# Pool-only model (without firm dummies) for comparison
X_pool = np.column_stack([intercept, log_pool])
beta_pool, se_pool = logistic_fit(X_pool, y, l2=0.001)
print(' Pool-only model (no firm dummies):')
for n, b, s in zip(['intercept', 'log(pool_size_centered)'],
beta_pool, se_pool):
print(f' {n}: beta={b:+.4f}, SE={s:.4f}')
# ── Pool-decile × firm hit rates ──
print('\n[Pool-decile × firm hit rates]')
deciles = np.percentile(pool_size_arr, np.arange(0, 110, 10))
decile_firm = defaultdict(lambda: defaultdict(list))
for bi in range(n_b):
ps = pool_size_arr[bi]
if ps <= 0:
continue
d = min(int(np.searchsorted(deciles, ps, side='right')) - 1, 9)
decile_firm[d][src_firm[bi]].append(int(hit_any_pair[bi]))
pool_decile_results = {}
for d in range(10):
firms_in_d = {}
for f, hits in decile_firm[d].items():
n_f = len(hits)
if n_f == 0:
continue
far = float(np.mean(hits))
firms_in_d[f] = {'n': n_f, 'far': far}
pool_decile_results[f'decile_{d+1}'] = {
'pool_range': [float(deciles[d]), float(deciles[d+1])],
'per_firm': firms_in_d,
}
line = f' Decile {d+1} (pool {deciles[d]:.0f}-{deciles[d+1]:.0f}):'
for f in ['Firm A', 'Firm B', 'Firm C', 'Firm D']:
if f in firms_in_d:
line += (f' {f}: {firms_in_d[f]["far"]:.4f} '
f'(n={firms_in_d[f]["n"]})')
print(line)
# ── Source-firm × candidate-firm hit matrix (any-pair) ──
print('\n[Source-firm × candidate-firm hit matrix, max-cos pair]')
src_list = ['Firm A', 'Firm B', 'Firm C', 'Firm D']
cand_categories = ['Firm A', 'Firm B', 'Firm C', 'Firm D',
'non-Big-4']
matrix_max_cos = {s: {c: 0 for c in cand_categories}
for s in src_list}
matrix_min_dh = {s: {c: 0 for c in cand_categories}
for s in src_list}
matrix_samepair = {s: {c: 0 for c in cand_categories}
for s in src_list}
src_totals = {s: 0 for s in src_list}
for bi in range(n_b):
s_f = src_firm[bi]
if s_f in src_list:
src_totals[s_f] += 1
if hit_any_pair[bi]:
cf_max = cand_firm_anypair_max_cos[bi]
cf_min = cand_firm_anypair_min_dh[bi]
cat_max = cf_max if cf_max in src_list else 'non-Big-4'
cat_min = cf_min if cf_min in src_list else 'non-Big-4'
if s_f in matrix_max_cos:
matrix_max_cos[s_f][cat_max] += 1
matrix_min_dh[s_f][cat_min] += 1
if hit_same_pair[bi]:
cf = cand_firm_samepair[bi]
cat = cf if cf in src_list else 'non-Big-4'
if s_f in matrix_samepair:
matrix_samepair[s_f][cat] += 1
print(' Max-cosine partner firm (count among hits):')
print(f' {"Source":<10s} | {" Firm A":>9s} {" Firm B":>9s} '
f'{" Firm C":>9s} {" Firm D":>9s} {"non-Big-4":>10s}'
f' {"n_source":>10s}')
for s in src_list:
row = f' {s:<10s} |'
for c in cand_categories:
row += f' {matrix_max_cos[s][c]:>9d}'
row += f' {src_totals[s]:>10d}'
print(row)
print(' Min-dHash partner firm (count among any-pair hits):')
for s in src_list:
row = f' {s:<10s} |'
for c in cand_categories:
row += f' {matrix_min_dh[s][c]:>9d}'
print(row)
print(' Same-pair joint hit, candidate firm:')
for s in src_list:
row = f' {s:<10s} |'
for c in cand_categories:
row += f' {matrix_samepair[s][c]:>9d}'
print(row)
results = {
'meta': {
'script': '44',
'timestamp': datetime.now().isoformat(timespec='seconds'),
'n_big4_sources': n_b,
'n_total_candidate_pool': n_total,
'seed': SEED,
'note': ('Firm-matched-pool regression + cross-firm hit '
'matrix. Confirms Firm A excess is firm '
'heterogeneity not pool-size confound.'),
},
'regression_full': {
'feature_names': names_full,
'beta': beta_full.tolist(),
'se': se_full.tolist(),
'odds_ratio': np.exp(beta_full).tolist(),
},
'regression_pool_only': {
'feature_names': ['intercept',
'log(pool_size_centered)'],
'beta': beta_pool.tolist(),
'se': se_pool.tolist(),
},
'pool_decile_per_firm': pool_decile_results,
'cross_firm_hit_matrix': {
'max_cos_partner': matrix_max_cos,
'min_dh_partner': matrix_min_dh,
'same_pair': matrix_samepair,
'source_totals': src_totals,
},
}
json_path = OUT / 'firm_matched_pool_results.json'
json_path.write_text(json.dumps(results, indent=2, ensure_ascii=False),
encoding='utf-8')
print(f'\n[json] {json_path}')
# Markdown
md = ['# Firm-Matched-Pool Regression + Cross-Firm Hit Matrix '
'(Script 44)',
'', f'Generated: {results["meta"]["timestamp"]}',
f'n_big4_sources = {n_b:,}; '
f'candidate pool drawn from {n_total:,} total signatures '
'(any non-same-CPA).',
'',
'## Logistic regression: hit ~ firm + log(pool_size)',
'',
'Reference category: Firm A. log(pool_size) centred.',
'Hit = any-pair joint (cos>0.95 AND dh<=5).',
'',
'| Term | beta | SE | OR=exp(beta) |',
'|---|---|---|---|']
for n, b, s in zip(names_full, beta_full, se_full):
md.append(f'| {n} | {b:+.4f} | {s:.4f} | {np.exp(b):.4f} |')
md += ['',
('A large negative beta on FirmB/C/D dummies AFTER '
'controlling for log(pool_size) is evidence that Firm A '
"excess is firm heterogeneity, not pool-size confound."),
'',
'## Pool-decile × firm hit rates (any-pair)',
'',
'| Decile | Pool range | Firm A | Firm B | Firm C | Firm D |',
'|---|---|---|---|---|---|']
for d in range(10):
key = f'decile_{d+1}'
r = pool_decile_results.get(key, {})
pf = r.get('per_firm', {})
lo, hi = r.get('pool_range', [0, 0])
row_cells = [
f'{pf[f]["far"]:.4f} (n={pf[f]["n"]})' if f in pf else ''
for f in src_list
]
md.append(f'| {d+1} | {lo:.0f}-{hi:.0f} | '
f'{row_cells[0]} | {row_cells[1]} | '
f'{row_cells[2]} | {row_cells[3]} |')
md += ['',
'## Cross-firm hit matrix (any-pair, max-cosine partner)',
'',
'| Source firm | A | B | C | D | non-Big-4 | n_source |',
'|---|---|---|---|---|---|---|']
for s in src_list:
row = matrix_max_cos[s]
md.append(f'| {s} | {row["Firm A"]} | {row["Firm B"]} | '
f'{row["Firm C"]} | {row["Firm D"]} | '
f'{row["non-Big-4"]} | {src_totals[s]} |')
md += ['', '## Same-pair joint hit, candidate firm', '',
'| Source firm | A | B | C | D | non-Big-4 |',
'|---|---|---|---|---|---|']
for s in src_list:
row = matrix_samepair[s]
md.append(f'| {s} | {row["Firm A"]} | {row["Firm B"]} | '
f'{row["Firm C"]} | {row["Firm D"]} | '
f'{row["non-Big-4"]} |')
md += ['',
'## Interpretation',
'',
('If max-cosine partners of Firm A source signatures are '
'disproportionately drawn from Firm A or from non-Big-4 '
'firms (where templates are widely shared), the Firm A '
'collision excess reflects an image-manifold property '
'rather than a Firm-A-specific replication mechanism. '
'The paper interpretation must reflect this carefully.'),
'']
md_path = OUT / 'firm_matched_pool_report.md'
md_path.write_text('\n'.join(md), encoding='utf-8')
print(f'[md ] {md_path}')
if __name__ == '__main__':
main()