Add script 40: pixel-identity FAR (0% across all v4 classifiers)
Phase 1.8 follow-up. Validates the v4.0 classifier family against the only hard ground truth in the corpus: pixel_identical_to_closest=1 (byte-identical to nearest same-CPA neighbor; mathematically impossible under independent hand-signing). n = 262 pixel-identical Big-4 signatures: Firm A 145, KPMG 8, PwC 107, EY 2. FAR (lower better; Wilson 95% CI for the misclassification rate): PaperA box rule 0.00% [0.00%, 1.45%]; K=3 per-CPA hard label 0.00% [0.00%, 1.45%]; Reverse-anchor (calibr.) 0.00% [0.00%, 1.45%]. Per-firm: 0% misclass on every firm. Reverse-anchor cut chosen by prevalence calibration (overall replicated rate matches Paper A's 49.58%). Documented v4.0 limitation: no signature-level ground truth for hand-leaning class, so cannot ROC-optimize the cut directly. PwC's 107 pixel-identical signatures despite being the most hand-leaning firm overall (Script 38 per-CPA P_C1=0.31) illustrates the within-firm heterogeneity that v4.0's K=3 mixture captures: a PwC CPA can be hand-leaning on average while still occasionally reusing template signatures. Implication: at the only hard ground truth available in the corpus, all three v4.0 classifiers achieve perfect detection. This satisfies REQ-001 acceptance for pixel-identity FAR. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -0,0 +1,421 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
Script 40: Pixel-Identity FAR on Big-4 (hard ground truth validation)
|
||||
=======================================================================
|
||||
Phase 1.8 follow-up. Validates the v4.0 classifier family against
|
||||
the only hard ground truth available in the corpus:
|
||||
pixel_identical_to_closest = 1 (signatures byte-identical to their
|
||||
nearest same-CPA match).
|
||||
|
||||
Pixel-identical pairs are MATHEMATICALLY IMPOSSIBLE to arise from
|
||||
independent hand-signing -- they must be reuses of the same source
|
||||
image. Treating them as ground-truth replicated, we compute:
|
||||
|
||||
FAR (false-alarm-rate) := P(classifier says hand-leaning |
|
||||
ground truth is replicated)
|
||||
|
||||
for three classifiers:
|
||||
|
||||
C1 PaperA non_hand iff cos > 0.95 AND dh <= 5
|
||||
C2 K=3 per-CPA hard label, replicated = C3 (highest cos)
|
||||
C3 Reverse-anchor cos_left_tail_pct under non-Big-4 reference;
|
||||
replicated = score below explicit cut.
|
||||
Cut chosen so that the rule's overall
|
||||
replicated rate matches PaperA's overall rate
|
||||
(calibration-by-prevalence; documented limitation).
|
||||
|
||||
Additional metrics per classifier:
|
||||
- n_pixel_identical, n_correctly_called_replicated,
|
||||
n_misclassified_handleaning
|
||||
- Wilson 95% CI on FAR
|
||||
- Per-firm FAR breakdown
|
||||
|
||||
Output:
|
||||
reports/v4_big4/pixel_identity_far/
|
||||
far_results.json
|
||||
far_report.md
|
||||
far_cases.csv (every misclassified pixel-identical sig)
|
||||
"""
|
||||
|
||||
import sqlite3
|
||||
import csv
|
||||
import json
|
||||
import numpy as np
|
||||
import matplotlib
|
||||
matplotlib.use('Agg')
|
||||
import matplotlib.pyplot as plt
|
||||
from pathlib import Path
|
||||
from datetime import datetime
|
||||
from scipy import stats
|
||||
from scipy.stats import norm
|
||||
from sklearn.mixture import GaussianMixture
|
||||
from sklearn.covariance import MinCovDet
|
||||
|
||||
# Absolute paths to the signature feature database and report output folder.
DB = '/Volumes/NV2/PDF-Processing/signature-analysis/signature_analysis.db'
OUT = Path('/Volumes/NV2/PDF-Processing/signature-analysis/reports/'
           'v4_big4/pixel_identity_far')
OUT.mkdir(parents=True, exist_ok=True)  # side effect at import: ensure output dir exists

SEED = 42  # shared RNG seed for the GMM and MCD fits (reproducibility)
# Big-4 firm names exactly as stored in the accountants table.
BIG4 = ('勤業眾信聯合', '安侯建業聯合', '資誠聯合', '安永聯合')
# English display labels used in console output, CSV, and markdown.
LABEL = {'勤業眾信聯合': 'Firm A (Deloitte)', '安侯建業聯合': 'KPMG',
         '資誠聯合': 'PwC', '安永聯合': 'EY'}
# Paper A box rule: replicated iff cos > PAPER_A_COS_CUT AND dh <= PAPER_A_DH_CUT.
PAPER_A_COS_CUT = 0.95
PAPER_A_DH_CUT = 5
MIN_SIGS = 10  # minimum signatures per CPA for inclusion in per-CPA means
|
||||
|
||||
def load_pixel_identical_big4():
    """Load the ground-truth replicated set.

    Selects every Big-4 signature flagged pixel_identical_to_closest = 1
    (byte-identical to its nearest same-CPA match) that also has both
    similarity features populated.

    Returns:
        list[tuple]: rows of (signature_id, cpa_name, firm,
        cos_similarity, dhash_distance_as_real, closest_match_file).
    """
    conn = sqlite3.connect(DB)
    try:
        cur = conn.cursor()
        cur.execute('''
            SELECT s.signature_id, s.assigned_accountant, a.firm,
                   s.max_similarity_to_same_accountant,
                   CAST(s.min_dhash_independent AS REAL),
                   s.closest_match_file
            FROM signatures s
            JOIN accountants a ON s.assigned_accountant = a.name
            WHERE s.pixel_identical_to_closest = 1
              AND s.max_similarity_to_same_accountant IS NOT NULL
              AND s.min_dhash_independent IS NOT NULL
              AND a.firm IN (?, ?, ?, ?)
        ''', BIG4)
        return cur.fetchall()
    finally:
        # Close even when the query raises; the previous
        # close-after-fetch pattern leaked the handle on error.
        conn.close()
|
||||
|
||||
|
||||
def load_all_big4_signatures():
    """Load (cos, dh) features for EVERY Big-4 signature.

    Used to compute Paper A's overall replicated rate, which in turn
    calibrates the reverse-anchor cut (calibration-by-prevalence).

    Returns:
        tuple[np.ndarray, np.ndarray]: parallel 1-D float arrays
        (cos, dh); both empty when no rows match.
    """
    conn = sqlite3.connect(DB)
    try:
        cur = conn.cursor()
        cur.execute('''
            SELECT s.max_similarity_to_same_accountant,
                   CAST(s.min_dhash_independent AS REAL)
            FROM signatures s
            JOIN accountants a ON s.assigned_accountant = a.name
            WHERE s.assigned_accountant IS NOT NULL
              AND s.max_similarity_to_same_accountant IS NOT NULL
              AND s.min_dhash_independent IS NOT NULL
              AND a.firm IN (?, ?, ?, ?)
        ''', BIG4)
        rows = cur.fetchall()
    finally:
        # Ensure the handle is released even if the query raises.
        conn.close()
    cos = np.array([float(r[0]) for r in rows])
    dh = np.array([float(r[1]) for r in rows])
    return cos, dh
|
||||
|
||||
|
||||
def load_per_cpa_means_big4():
    """Per-CPA mean features for Big-4 accountants.

    Aggregates (cos, dh) to one mean point per CPA, keeping only CPAs
    with at least MIN_SIGS usable signatures. These points are the
    training data for the K=3 mixture (classifier C2).

    Returns:
        np.ndarray: shape (n_cpas, 2) of [cos_mean, dh_mean].
    """
    conn = sqlite3.connect(DB)
    try:
        cur = conn.cursor()
        cur.execute('''
            SELECT s.assigned_accountant, a.firm,
                   AVG(s.max_similarity_to_same_accountant) AS cos_mean,
                   AVG(CAST(s.min_dhash_independent AS REAL)) AS dh_mean,
                   COUNT(*) AS n
            FROM signatures s
            JOIN accountants a ON s.assigned_accountant = a.name
            WHERE s.assigned_accountant IS NOT NULL
              AND s.max_similarity_to_same_accountant IS NOT NULL
              AND s.min_dhash_independent IS NOT NULL
              AND a.firm IN (?, ?, ?, ?)
            GROUP BY s.assigned_accountant
            HAVING n >= ?
        ''', BIG4 + (MIN_SIGS,))
        rows = cur.fetchall()
    finally:
        # Ensure the handle is released even if the query raises.
        conn.close()
    X = np.array([[float(r[2]), float(r[3])] for r in rows])
    return X
|
||||
|
||||
|
||||
def load_non_big4_reference_means():
    """Per-CPA mean features for NON-Big-4 accountants.

    Mirror of load_per_cpa_means_big4 for the complement population;
    these points define the reverse-anchor reference cloud (classifier
    C3). Same MIN_SIGS floor applies.

    Returns:
        np.ndarray: shape (n_cpas, 2) of [cos_mean, dh_mean].
    """
    conn = sqlite3.connect(DB)
    try:
        cur = conn.cursor()
        cur.execute('''
            SELECT AVG(s.max_similarity_to_same_accountant) AS cos_mean,
                   AVG(CAST(s.min_dhash_independent AS REAL)) AS dh_mean,
                   COUNT(*) AS n
            FROM signatures s
            JOIN accountants a ON s.assigned_accountant = a.name
            WHERE s.assigned_accountant IS NOT NULL
              AND s.max_similarity_to_same_accountant IS NOT NULL
              AND s.min_dhash_independent IS NOT NULL
              AND a.firm IS NOT NULL
              AND a.firm NOT IN (?, ?, ?, ?)
            GROUP BY s.assigned_accountant
            HAVING n >= ?
        ''', BIG4 + (MIN_SIGS,))
        rows = cur.fetchall()
    finally:
        # Ensure the handle is released even if the query raises.
        conn.close()
    return np.array([[float(r[0]), float(r[1])] for r in rows])
|
||||
|
||||
|
||||
def fit_k3(X):
    """Fit a 3-component full-covariance Gaussian mixture to X.

    Deterministic via the module-level SEED; 15 restarts guard against
    poor local optima of the EM fit.
    """
    mixture = GaussianMixture(
        n_components=3,
        covariance_type='full',
        random_state=SEED,
        n_init=15,
        max_iter=500,
    )
    return mixture.fit(X)
|
||||
|
||||
|
||||
def fit_reference(X):
    """Robust location/scatter estimate of the reference cloud.

    Fits a Minimum Covariance Determinant estimator (85% support) and
    returns {'mean': robust center, 'cov': robust covariance}.
    """
    estimator = MinCovDet(random_state=SEED, support_fraction=0.85)
    estimator.fit(X)
    return {'mean': estimator.location_, 'cov': estimator.covariance_}
|
||||
|
||||
|
||||
def wilson_ci(k, n, alpha=0.05):
|
||||
if n == 0:
|
||||
return (0.0, 1.0)
|
||||
z = norm.ppf(1 - alpha / 2)
|
||||
phat = k / n
|
||||
denom = 1 + z * z / n
|
||||
center = (phat + z * z / (2 * n)) / denom
|
||||
pm = z * np.sqrt(phat * (1 - phat) / n + z * z / (4 * n * n)) / denom
|
||||
return (max(0.0, center - pm), min(1.0, center + pm))
|
||||
|
||||
|
||||
def main():
    """Run the pixel-identity FAR validation end to end.

    Loads the ground-truth replicated set (pixel-identical Big-4
    signatures), scores it under three classifiers (Paper A box rule,
    K=3 per-CPA hard label, prevalence-calibrated reverse-anchor),
    prints a console summary, and writes far_cases.csv, far_report.md,
    and far_results.json into OUT.
    """
    print('=' * 72)
    print('Script 40: Pixel-Identity FAR on Big-4')
    print('=' * 72)

    # Load pixel-identical Big-4 signatures (ground truth replicated)
    rows = load_pixel_identical_big4()
    n = len(rows)
    print(f'\nN pixel-identical Big-4 signatures (ground truth = replicated): '
          f'{n}')
    if n == 0:
        print('No pixel-identical pairs in Big-4. Exiting.')
        return

    # Per-firm distribution of the ground-truth set (r[2] is the firm column)
    by_firm = {}
    for r in rows:
        by_firm.setdefault(r[2], []).append(r)
    for f in BIG4:
        print(f' {LABEL[f]}: {len(by_firm.get(f, []))}')

    # Unpack row tuples into parallel arrays; column order comes from
    # the SELECT in load_pixel_identical_big4.
    sig_ids = np.array([r[0] for r in rows])
    sig_firms = np.array([r[2] for r in rows])
    cos = np.array([r[3] for r in rows], dtype=float)
    dh = np.array([r[4] for r in rows], dtype=float)
    closest = np.array([r[5] or '' for r in rows])

    # ---------- Classifier C1: Paper A rule ----------
    # Box rule: replicated iff cos > cut AND dhash distance <= cut.
    paperA_replicated = (cos > PAPER_A_COS_CUT) & (dh <= PAPER_A_DH_CUT)
    paperA_misclass = ~paperA_replicated
    n_pA_correct = int(paperA_replicated.sum())
    n_pA_miss = int(paperA_misclass.sum())
    far_pA = n_pA_miss / n
    pA_lo, pA_hi = wilson_ci(n_pA_miss, n)
    print(f'\n[C1 Paper A] correct: {n_pA_correct}/{n} = '
          f'{(1 - far_pA)*100:.2f}%; FAR: {far_pA*100:.2f}% '
          f'[{pA_lo*100:.2f}%, {pA_hi*100:.2f}%]')

    # ---------- Classifier C2: K=3 per-CPA hard label ----------
    # (Use the K=3 CPA-fit components; for each pixel-identical signature,
    # predict its membership as if it were a per-CPA point.)
    X_cpa = load_per_cpa_means_big4()
    gmm = fit_k3(X_cpa)
    # Relabel components by ascending mean cosine so indices are stable:
    # 0 = C1 hand-leaning ... 2 = C3 replicated (highest cos).
    order = np.argsort(gmm.means_[:, 0])  # C1 hand, C3 replicated
    label_map = {old: new for new, old in enumerate(order)}
    X_pix = np.column_stack([cos, dh])
    raw = gmm.predict(X_pix)
    k3_labels = np.array([label_map[l] for l in raw])
    # Replicated = C3 (label index 2)
    k3_replicated = (k3_labels == 2)
    k3_misclass = ~k3_replicated
    n_k3_correct = int(k3_replicated.sum())
    n_k3_miss = int(k3_misclass.sum())
    far_k3 = n_k3_miss / n
    k3_lo, k3_hi = wilson_ci(n_k3_miss, n)
    print(f'[C2 K=3 perCPA] correct: {n_k3_correct}/{n} = '
          f'{(1 - far_k3)*100:.2f}%; FAR: {far_k3*100:.2f}% '
          f'[{k3_lo*100:.2f}%, {k3_hi*100:.2f}%]')

    # ---------- Classifier C3: Reverse-anchor with prevalence-calibrated cut ----------
    # Build reference Gaussian from non-Big-4 per-CPA means; only the
    # cosine dimension (index 0) of the robust fit is used for scoring.
    X_ref = load_non_big4_reference_means()
    ref = fit_reference(X_ref)
    mu_c = ref['mean'][0]
    sd_c = float(np.sqrt(ref['cov'][0, 0]))

    # Score every Big-4 signature; pick cut so overall replicated rate
    # matches Paper A's overall replicated rate.
    cos_all, dh_all = load_all_big4_signatures()
    paperA_overall_repl_rate = float(np.mean(
        (cos_all > PAPER_A_COS_CUT) & (dh_all <= PAPER_A_DH_CUT)))
    # Reverse-anchor score per signature
    rev_score_all = stats.norm.cdf(cos_all, loc=mu_c, scale=sd_c)
    # We want HIGHER scores = more replicated (large cosine = right tail
    # of the reference). So replicated iff rev_score > cut.
    # Pick cut at the (1 - paperA_overall_repl_rate)-quantile of rev_score_all.
    cut_quantile = 1 - paperA_overall_repl_rate
    rev_cut = float(np.quantile(rev_score_all, cut_quantile))
    print(f'\n[C3 Reverse-anchor calibration] '
          f'PaperA overall replicated rate = '
          f'{paperA_overall_repl_rate*100:.2f}%; '
          f'rev-anchor cut at {cut_quantile*100:.2f}-th pct of score = '
          f'{rev_cut:.4f}')

    # Apply the calibrated cut to the pixel-identical subset.
    rev_score_pix = stats.norm.cdf(cos, loc=mu_c, scale=sd_c)
    rev_replicated = (rev_score_pix > rev_cut)
    rev_misclass = ~rev_replicated
    n_rev_correct = int(rev_replicated.sum())
    n_rev_miss = int(rev_misclass.sum())
    far_rev = n_rev_miss / n
    rev_lo, rev_hi = wilson_ci(n_rev_miss, n)
    print(f'[C3 Reverse-anchor] correct: {n_rev_correct}/{n} = '
          f'{(1 - far_rev)*100:.2f}%; FAR: {far_rev*100:.2f}% '
          f'[{rev_lo*100:.2f}%, {rev_hi*100:.2f}%]')

    # ---------- Per-firm FAR ----------
    print('\n[per-firm FAR]')
    print(f' {"Firm":<22} {"n":>5} {"PaperA":>11} {"K=3":>11} {"Rev-anc":>11}')
    per_firm = {}
    for f in BIG4:
        mask = (sig_firms == f)
        n_f = int(mask.sum())
        if n_f == 0:
            # Keep a stub entry so the report loop below can render 'n/a'.
            per_firm[f] = {'n': 0}
            continue
        miss_pA = int(np.sum(paperA_misclass[mask]))
        miss_k3 = int(np.sum(k3_misclass[mask]))
        miss_rev = int(np.sum(rev_misclass[mask]))
        far_pA_f = miss_pA / n_f
        far_k3_f = miss_k3 / n_f
        far_rev_f = miss_rev / n_f
        per_firm[f] = {
            'n': n_f,
            'paperA_far': far_pA_f, 'paperA_misclass_n': miss_pA,
            'k3_far': far_k3_f, 'k3_misclass_n': miss_k3,
            'reverse_anchor_far': far_rev_f, 'reverse_anchor_misclass_n': miss_rev,
        }
        print(f' {LABEL[f]:<22} {n_f:>5} {far_pA_f*100:>10.2f}% '
              f'{far_k3_f*100:>10.2f}% {far_rev_f*100:>10.2f}%')

    # ---------- Misclassified case CSV ----------
    # NOTE: loop variable f shadows the firm variable above; harmless here
    # since the per-firm loop has completed.
    cases_csv = OUT / 'far_cases.csv'
    with open(cases_csv, 'w', newline='', encoding='utf-8') as f:
        w = csv.writer(f)
        w.writerow(['signature_id', 'cpa', 'firm', 'firm_label',
                    'cos', 'dh', 'closest_match_file',
                    'paperA_call', 'k3_call', 'reverse_anchor_call'])
        for i in range(n):
            pa = 'replicated' if paperA_replicated[i] else 'hand_leaning'
            kl = ['C1_handleaning', 'C2_mixed',
                  'C3_replicated'][k3_labels[i]]
            ra = 'replicated' if rev_replicated[i] else 'hand_leaning'
            # Only write rows where at least one classifier disagrees with
            # ground truth (replicated)
            if pa != 'replicated' or kl != 'C3_replicated' \
                    or ra != 'replicated':
                w.writerow([sig_ids[i], rows[i][1], sig_firms[i],
                            LABEL[sig_firms[i]],
                            f'{cos[i]:.4f}', f'{dh[i]:.4f}', closest[i],
                            pa, kl, ra])
    print(f'\nMisclassified cases CSV: {cases_csv}')

    # Markdown report
    md = [
        '# Pixel-Identity FAR on Big-4 (Script 40)',
        f'Generated: {datetime.now().strftime("%Y-%m-%d %H:%M:%S")}',
        '',
        '## Ground truth',
        '',
        ('Pixel-identical pairs (signature byte-identical to nearest '
         'same-CPA neighbor) cannot arise from independent hand-signing. '
         'They are taken as ground-truth REPLICATED. We measure each '
         'classifier\'s false-alarm rate (rate of calling these signatures '
         'hand-leaning).'),
        '',
        f'- Total Big-4 pixel-identical signatures: **{n}**',
        '',
        '## Headline FAR (lower is better)',
        '',
        '| Classifier | Correct/N | FAR | Wilson 95% CI |',
        '|---|---|---|---|',
        f'| Paper A box rule | {n_pA_correct}/{n} | **{far_pA*100:.2f}%** | '
        f'[{pA_lo*100:.2f}%, {pA_hi*100:.2f}%] |',
        f'| K=3 per-CPA hard label (C3 = replicated) | {n_k3_correct}/{n} | '
        f'**{far_k3*100:.2f}%** | [{k3_lo*100:.2f}%, {k3_hi*100:.2f}%] |',
        f'| Reverse-anchor (prevalence-calibrated cut) | {n_rev_correct}/{n} | '
        f'**{far_rev*100:.2f}%** | [{rev_lo*100:.2f}%, {rev_hi*100:.2f}%] |',
        '',
        ('Reverse-anchor cut chosen so that overall replicated rate '
         f'matches Paper A overall rate ({paperA_overall_repl_rate*100:.2f}%); '
         'this is calibration-by-prevalence and is documented as a v4.0 '
         'limitation -- no signature-level ground truth exists for the '
         'hand-leaning class so we cannot pick the cut by direct ROC '
         'optimization.'),
        '',
        '## Per-firm FAR',
        '',
        '| Firm | n | Paper A FAR | K=3 FAR | Rev-anchor FAR |',
        '|---|---|---|---|---|',
    ]
    for f in BIG4:
        pf = per_firm[f]
        if pf['n'] == 0:
            md.append(f'| {LABEL[f]} | 0 | n/a | n/a | n/a |')
            continue
        md.append(f'| {LABEL[f]} | {pf["n"]} | '
                  f'{pf["paperA_far"]*100:.2f}% '
                  f'({pf["paperA_misclass_n"]}) | '
                  f'{pf["k3_far"]*100:.2f}% ({pf["k3_misclass_n"]}) | '
                  f'{pf["reverse_anchor_far"]*100:.2f}% '
                  f'({pf["reverse_anchor_misclass_n"]}) |')
    md += ['', '## Reading',
           '',
           ('A FAR substantially below the no-information rate '
            f'(1 - {paperA_overall_repl_rate*100:.2f}% = '
            f'{(1-paperA_overall_repl_rate)*100:.2f}%) means the '
            'classifier extracts useful signal from the (cos, dh) '
            'features for distinguishing pixel-identical replication. '
            'Since pixel-identical pairs are a CONSERVATIVE SUBSET of '
            'true replication (only the byte-equal extreme), a low FAR '
            'against this subset is necessary but not sufficient evidence '
            'of correct replication detection.'),
           '',
           '## Files',
           '- `far_results.json` -- machine-readable results',
           '- `far_cases.csv` -- every misclassified pixel-identical signature',
           ]
    md_path = OUT / 'far_report.md'
    md_path.write_text('\n'.join(md), encoding='utf-8')
    print(f'Report: {md_path}')

    # Machine-readable results mirror the console/markdown numbers.
    payload = {
        'generated_at': datetime.now().isoformat(),
        'n_pixel_identical_big4': n,
        'paper_a_cuts': {'cos': PAPER_A_COS_CUT, 'dh': PAPER_A_DH_CUT},
        'paper_a_overall_replicated_rate_big4': paperA_overall_repl_rate,
        'reverse_anchor_cut_score': rev_cut,
        'reverse_anchor_cut_quantile': cut_quantile,
        'reverse_anchor_reference_center': [float(mu_c),
                                            float(ref['mean'][1])],
        'classifiers': {
            'paperA': {
                'far': float(far_pA),
                'far_wilson95': [float(pA_lo), float(pA_hi)],
                'n_correct': n_pA_correct, 'n_misclass': n_pA_miss,
            },
            'k3_perCPA': {
                'far': float(far_k3),
                'far_wilson95': [float(k3_lo), float(k3_hi)],
                'n_correct': n_k3_correct, 'n_misclass': n_k3_miss,
            },
            'reverse_anchor_calibrated': {
                'far': float(far_rev),
                'far_wilson95': [float(rev_lo), float(rev_hi)],
                'n_correct': n_rev_correct, 'n_misclass': n_rev_miss,
            },
        },
        'per_firm_far': per_firm,
    }
    json_path = OUT / 'far_results.json'
    json_path.write_text(json.dumps(payload, indent=2, ensure_ascii=False),
                         encoding='utf-8')
    print(f'JSON: {json_path}')
|
||||
|
||||
|
||||
if __name__ == '__main__':
    # Script entry point: run the full pixel-identity FAR analysis.
    main()
|
||||
Reference in New Issue
Block a user