338737d9a1
Phase 1.8 follow-up. Validates the v4.0 classifier family against the only hard ground truth in the corpus: pixel_identical_to_closest=1 (byte-identical to nearest same-CPA neighbor; mathematically impossible under independent hand-signing). n = 262 pixel-identical Big-4 signatures. Firm A 145 KPMG 8 PwC 107 EY 2 FAR (lower better; Wilson 95% CI for the misclassification rate): PaperA box rule 0.00% [0.00%, 1.45%] K=3 per-CPA hard label 0.00% [0.00%, 1.45%] Reverse-anchor (calibr.) 0.00% [0.00%, 1.45%] Per-firm: 0% misclass on every firm. Reverse-anchor cut chosen by prevalence calibration (overall replicated rate matches Paper A's 49.58%). Documented v4.0 limitation: no signature-level ground truth for hand-leaning class, so cannot ROC-optimize the cut directly. PwC's 107 pixel-identical signatures despite being the most hand-leaning firm overall (Script 38 per-CPA P_C1=0.31) illustrates the within-firm heterogeneity that v4.0's K=3 mixture captures: a PwC CPA can be hand-leaning on average while still occasionally reusing template signatures. Implication: at the only hard ground truth available in the corpus, all three v4.0 classifiers achieve perfect detection. This satisfies REQ-001 acceptance for pixel-identity FAR. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
422 lines
16 KiB
Python
422 lines
16 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Script 40: Pixel-Identity FAR on Big-4 (hard ground truth validation)
|
|
=======================================================================
|
|
Phase 1.8 follow-up. Validates the v4.0 classifier family against
|
|
the only hard ground truth available in the corpus:
|
|
pixel_identical_to_closest = 1 (signatures byte-identical to their
|
|
nearest same-CPA match).
|
|
|
|
Pixel-identical pairs are MATHEMATICALLY IMPOSSIBLE to arise from
|
|
independent hand-signing -- they must be reuses of the same source
|
|
image. Treating them as ground-truth replicated, we compute:
|
|
|
|
FAR (false-alarm-rate) := P(classifier says hand-leaning |
|
|
ground truth is replicated)
|
|
|
|
for three classifiers:
|
|
|
|
C1 PaperA non_hand iff cos > 0.95 AND dh <= 5
|
|
C2 K=3 per-CPA hard label, replicated = C3 (highest cos)
|
|
C3 Reverse-anchor cos_left_tail_pct under non-Big-4 reference;
|
|
replicated = score above explicit cut.
|
|
Cut chosen so that the rule's overall
|
|
replicated rate matches PaperA's overall rate
|
|
(calibration-by-prevalence; documented limitation).
|
|
|
|
Additional metrics per classifier:
|
|
- n_pixel_identical, n_correctly_called_replicated,
|
|
n_misclassified_handleaning
|
|
- Wilson 95% CI on FAR
|
|
- Per-firm FAR breakdown
|
|
|
|
Output:
|
|
reports/v4_big4/pixel_identity_far/
|
|
far_results.json
|
|
far_report.md
|
|
far_cases.csv (every misclassified pixel-identical sig)
|
|
"""
|
|
|
|
import sqlite3
|
|
import csv
|
|
import json
|
|
import numpy as np
|
|
import matplotlib
|
|
matplotlib.use('Agg')
|
|
import matplotlib.pyplot as plt
|
|
from pathlib import Path
|
|
from datetime import datetime
|
|
from scipy import stats
|
|
from scipy.stats import norm
|
|
from sklearn.mixture import GaussianMixture
|
|
from sklearn.covariance import MinCovDet
|
|
|
|
# SQLite database file opened by every load_* helper in this script.
DB = '/Volumes/NV2/PDF-Processing/signature-analysis/signature_analysis.db'
# Directory that receives far_cases.csv, far_report.md and far_results.json.
OUT = Path('/Volumes/NV2/PDF-Processing/signature-analysis/reports/'
           'v4_big4/pixel_identity_far')
# NOTE: this runs at import time, so importing the module creates the directory.
OUT.mkdir(parents=True, exist_ok=True)

# Passed as random_state to the GaussianMixture and MinCovDet fits.
SEED = 42
# Firm names, as matched against accountants.firm, that are treated as Big-4.
BIG4 = ('勤業眾信聯合', '安侯建業聯合', '資誠聯合', '安永聯合')
# Display label for each Big-4 firm name (console, CSV and Markdown output).
LABEL = {'勤業眾信聯合': 'Firm A (Deloitte)', '安侯建業聯合': 'KPMG',
         '資誠聯合': 'PwC', '安永聯合': 'EY'}
# Paper A box rule: replicated iff cos > PAPER_A_COS_CUT and dh <= PAPER_A_DH_CUT.
PAPER_A_COS_CUT = 0.95
PAPER_A_DH_CUT = 5
# Minimum number of signatures an accountant needs to be kept by the
# per-accountant mean queries (the HAVING clause).
MIN_SIGS = 10
|
|
|
|
|
|
def load_pixel_identical_big4():
    """Fetch every Big-4 signature flagged as pixel-identical to its closest match.

    Rows are restricted to signatures with ``pixel_identical_to_closest = 1``,
    non-NULL ``max_similarity_to_same_accountant`` and
    ``min_dhash_independent``, and an accountant whose firm is in ``BIG4``.

    Returns:
        list[tuple]: one tuple per signature, in the column order
        ``(signature_id, assigned_accountant, firm,
        max_similarity_to_same_accountant, min_dhash_independent (as REAL),
        closest_match_file)``.

    Raises:
        sqlite3.Error: if the database cannot be opened or queried.
    """
    conn = sqlite3.connect(DB)
    try:
        cur = conn.cursor()
        cur.execute('''
            SELECT s.signature_id, s.assigned_accountant, a.firm,
                   s.max_similarity_to_same_accountant,
                   CAST(s.min_dhash_independent AS REAL),
                   s.closest_match_file
            FROM signatures s
            JOIN accountants a ON s.assigned_accountant = a.name
            WHERE s.pixel_identical_to_closest = 1
              AND s.max_similarity_to_same_accountant IS NOT NULL
              AND s.min_dhash_independent IS NOT NULL
              AND a.firm IN (?, ?, ?, ?)
        ''', BIG4)
        return cur.fetchall()
    finally:
        # Close the handle even when the query raises, so a failed run does
        # not leave the database file open.
        conn.close()
|
|
|
|
|
|
def load_all_big4_signatures():
    """Load the similarity and dHash columns for every Big-4 signature.

    Used for computing the calibration-by-prevalence rate of PaperA.
    Rows are restricted to signatures with a non-NULL assigned accountant,
    non-NULL ``max_similarity_to_same_accountant`` and
    ``min_dhash_independent``, and an accountant whose firm is in ``BIG4``.

    Returns:
        tuple[np.ndarray, np.ndarray]: ``(cos, dh)`` -- two equal-length 1-D
        float arrays holding ``max_similarity_to_same_accountant`` and
        ``min_dhash_independent`` respectively, one entry per signature.

    Raises:
        sqlite3.Error: if the database cannot be opened or queried.
    """
    conn = sqlite3.connect(DB)
    try:
        cur = conn.cursor()
        cur.execute('''
            SELECT s.max_similarity_to_same_accountant,
                   CAST(s.min_dhash_independent AS REAL)
            FROM signatures s
            JOIN accountants a ON s.assigned_accountant = a.name
            WHERE s.assigned_accountant IS NOT NULL
              AND s.max_similarity_to_same_accountant IS NOT NULL
              AND s.min_dhash_independent IS NOT NULL
              AND a.firm IN (?, ?, ?, ?)
        ''', BIG4)
        rows = cur.fetchall()
    finally:
        # Close the handle even when the query raises.
        conn.close()
    cos = np.array([float(r[0]) for r in rows])
    dh = np.array([float(r[1]) for r in rows])
    return cos, dh
|
|
|
|
|
|
def load_per_cpa_means_big4():
    """Load per-accountant mean (similarity, dHash) points for Big-4 firms.

    Signatures are grouped by ``assigned_accountant``; only accountants whose
    firm is in ``BIG4`` and who have at least ``MIN_SIGS`` qualifying
    signatures (non-NULL similarity and dHash) are kept.

    Returns:
        np.ndarray: float array of shape ``(n_accountants, 2)``; column 0 is
        the mean of ``max_similarity_to_same_accountant`` and column 1 the
        mean of ``min_dhash_independent``. An empty result has shape
        ``(0, 2)`` rather than ``(0,)`` so it stays 2-D for callers.

    Raises:
        sqlite3.Error: if the database cannot be opened or queried.
    """
    conn = sqlite3.connect(DB)
    try:
        cur = conn.cursor()
        cur.execute('''
            SELECT s.assigned_accountant, a.firm,
                   AVG(s.max_similarity_to_same_accountant) AS cos_mean,
                   AVG(CAST(s.min_dhash_independent AS REAL)) AS dh_mean,
                   COUNT(*) AS n
            FROM signatures s
            JOIN accountants a ON s.assigned_accountant = a.name
            WHERE s.assigned_accountant IS NOT NULL
              AND s.max_similarity_to_same_accountant IS NOT NULL
              AND s.min_dhash_independent IS NOT NULL
              AND a.firm IN (?, ?, ?, ?)
            GROUP BY s.assigned_accountant
            HAVING n >= ?
        ''', BIG4 + (MIN_SIGS,))
        rows = cur.fetchall()
    finally:
        # Close the handle even when the query raises.
        conn.close()
    # Columns 2 and 3 of each row are cos_mean and dh_mean. The reshape is a
    # no-op for non-empty results and keeps an empty result two-dimensional.
    X = np.array([[float(r[2]), float(r[3])] for r in rows],
                 dtype=float).reshape(-1, 2)
    return X
|
|
|
|
|
|
def load_non_big4_reference_means():
    """Load per-accountant mean (similarity, dHash) points for non-Big-4 firms.

    Signatures are grouped by ``assigned_accountant``; only accountants with a
    non-NULL firm that is not in ``BIG4`` and with at least ``MIN_SIGS``
    qualifying signatures (non-NULL similarity and dHash) are kept.

    Returns:
        np.ndarray: float array of shape ``(n_accountants, 2)``; column 0 is
        the mean of ``max_similarity_to_same_accountant`` and column 1 the
        mean of ``min_dhash_independent``. An empty result has shape
        ``(0, 2)`` rather than ``(0,)`` so it stays 2-D for callers.

    Raises:
        sqlite3.Error: if the database cannot be opened or queried.
    """
    conn = sqlite3.connect(DB)
    try:
        cur = conn.cursor()
        cur.execute('''
            SELECT AVG(s.max_similarity_to_same_accountant) AS cos_mean,
                   AVG(CAST(s.min_dhash_independent AS REAL)) AS dh_mean,
                   COUNT(*) AS n
            FROM signatures s
            JOIN accountants a ON s.assigned_accountant = a.name
            WHERE s.assigned_accountant IS NOT NULL
              AND s.max_similarity_to_same_accountant IS NOT NULL
              AND s.min_dhash_independent IS NOT NULL
              AND a.firm IS NOT NULL
              AND a.firm NOT IN (?, ?, ?, ?)
            GROUP BY s.assigned_accountant
            HAVING n >= ?
        ''', BIG4 + (MIN_SIGS,))
        rows = cur.fetchall()
    finally:
        # Close the handle even when the query raises.
        conn.close()
    # The reshape is a no-op for non-empty results and keeps an empty result
    # two-dimensional.
    return np.array([[float(r[0]), float(r[1])] for r in rows],
                    dtype=float).reshape(-1, 2)
|
|
|
|
|
|
def fit_k3(X):
    """Fit and return a three-component, full-covariance Gaussian mixture on X.

    The fit uses 15 initialisations and at most 500 EM iterations, with
    ``SEED`` as the random state.
    """
    mixture = GaussianMixture(
        n_components=3,
        covariance_type='full',
        random_state=SEED,
        n_init=15,
        max_iter=500,
    )
    # fit() returns the estimator itself, so fitting in place and returning
    # the same object is equivalent to returning the result of fit().
    mixture.fit(X)
    return mixture
|
|
|
|
|
|
def fit_reference(X):
    """Fit a Minimum Covariance Determinant estimator on X.

    Returns:
        dict: ``{'mean': location estimate, 'cov': covariance estimate}``
        taken from the fitted ``MinCovDet`` (``support_fraction=0.85``,
        ``random_state=SEED``).
    """
    estimator = MinCovDet(random_state=SEED, support_fraction=0.85)
    estimator.fit(X)
    summary = {}
    summary['mean'] = estimator.location_
    summary['cov'] = estimator.covariance_
    return summary
|
|
|
|
|
|
def wilson_ci(k, n, alpha=0.05):
    """Return the Wilson score interval for a binomial proportion k / n.

    Args:
        k: number of observed events.
        n: number of trials.
        alpha: two-sided significance level (0.05 gives a 95% interval).

    Returns:
        tuple: ``(lower, upper)`` clipped to ``[0, 1]``. With ``n == 0`` the
        uninformative interval ``(0.0, 1.0)`` is returned.
    """
    if n == 0:
        return (0.0, 1.0)

    crit = norm.ppf(1 - alpha / 2)
    p_obs = k / n
    scale = 1 + crit * crit / n
    midpoint = (p_obs + crit * crit / (2 * n)) / scale
    spread = p_obs * (1 - p_obs) / n + crit * crit / (4 * n * n)
    half_width = crit * np.sqrt(spread) / scale

    lower = max(0.0, midpoint - half_width)
    upper = min(1.0, midpoint + half_width)
    return (lower, upper)
|
|
|
|
|
|
def main():
    """Compute and report the pixel-identity FAR of three classifiers on Big-4.

    Steps, in order:
      1. Load Big-4 signatures flagged pixel-identical (treated as
         ground-truth replicated); exit early if there are none.
      2. Score them with C1 (Paper A box rule), C2 (K=3 Gaussian mixture fit
         on per-accountant means) and C3 (reverse-anchor score with a cut
         calibrated to Paper A's overall replicated rate).
      3. Print headline and per-firm FAR, then write ``far_cases.csv``,
         ``far_report.md`` and ``far_results.json`` under ``OUT``.

    Returns None; all results go to stdout and to the three files.
    """
    print('=' * 72)
    print('Script 40: Pixel-Identity FAR on Big-4')
    print('=' * 72)

    # Load pixel-identical Big-4 signatures (ground truth replicated)
    rows = load_pixel_identical_big4()
    n = len(rows)
    print(f'\nN pixel-identical Big-4 signatures (ground truth = replicated): '
          f'{n}')
    if n == 0:
        # Nothing to evaluate; every FAR below divides by n.
        print('No pixel-identical pairs in Big-4. Exiting.')
        return

    # Per-firm distribution
    by_firm = {}
    for r in rows:
        by_firm.setdefault(r[2], []).append(r)
    for f in BIG4:
        print(f' {LABEL[f]}: {len(by_firm.get(f, []))}')

    # Column-wise views of the loaded rows; index i refers to the same
    # signature in every array below.
    sig_ids = np.array([r[0] for r in rows])
    sig_firms = np.array([r[2] for r in rows])
    cos = np.array([r[3] for r in rows], dtype=float)
    dh = np.array([r[4] for r in rows], dtype=float)
    # A NULL closest_match_file is stored as the empty string.
    closest = np.array([r[5] or '' for r in rows])

    # ---------- Classifier C1: Paper A rule ----------
    paperA_replicated = (cos > PAPER_A_COS_CUT) & (dh <= PAPER_A_DH_CUT)
    # Every row is ground-truth replicated, so any non-replicated call is a miss.
    paperA_misclass = ~paperA_replicated
    n_pA_correct = int(paperA_replicated.sum())
    n_pA_miss = int(paperA_misclass.sum())
    far_pA = n_pA_miss / n
    pA_lo, pA_hi = wilson_ci(n_pA_miss, n)
    print(f'\n[C1 Paper A] correct: {n_pA_correct}/{n} = '
          f'{(1 - far_pA)*100:.2f}%; FAR: {far_pA*100:.2f}% '
          f'[{pA_lo*100:.2f}%, {pA_hi*100:.2f}%]')

    # ---------- Classifier C2: K=3 per-CPA hard label ----------
    # (Use the K=3 CPA-fit components; for each pixel-identical signature,
    # predict its membership as if it were a per-CPA point.)
    X_cpa = load_per_cpa_means_big4()
    gmm = fit_k3(X_cpa)
    # Rank components by ascending mean of the first feature (cosine) so the
    # relabelled indices are 0 = lowest-cosine ... 2 = highest-cosine.
    order = np.argsort(gmm.means_[:, 0])  # C1 hand, C3 replicated
    label_map = {old: new for new, old in enumerate(order)}
    X_pix = np.column_stack([cos, dh])
    raw = gmm.predict(X_pix)
    k3_labels = np.array([label_map[l] for l in raw])
    # Replicated = C3 (label index 2)
    k3_replicated = (k3_labels == 2)
    k3_misclass = ~k3_replicated
    n_k3_correct = int(k3_replicated.sum())
    n_k3_miss = int(k3_misclass.sum())
    far_k3 = n_k3_miss / n
    k3_lo, k3_hi = wilson_ci(n_k3_miss, n)
    print(f'[C2 K=3 perCPA] correct: {n_k3_correct}/{n} = '
          f'{(1 - far_k3)*100:.2f}%; FAR: {far_k3*100:.2f}% '
          f'[{k3_lo*100:.2f}%, {k3_hi*100:.2f}%]')

    # ---------- Classifier C3: Reverse-anchor with prevalence-calibrated cut ----------
    # Build reference Gaussian from non-Big-4
    X_ref = load_non_big4_reference_means()
    ref = fit_reference(X_ref)
    # Only the cosine marginal of the reference is used for scoring: its
    # location and the square root of its variance entry.
    mu_c = ref['mean'][0]
    sd_c = float(np.sqrt(ref['cov'][0, 0]))

    # Score every Big-4 signature; pick cut so overall replicated rate
    # matches Paper A's overall replicated rate.
    cos_all, dh_all = load_all_big4_signatures()
    paperA_overall_repl_rate = float(np.mean(
        (cos_all > PAPER_A_COS_CUT) & (dh_all <= PAPER_A_DH_CUT)))
    # Reverse-anchor score per signature
    rev_score_all = stats.norm.cdf(cos_all, loc=mu_c, scale=sd_c)
    # We want HIGHER scores = more replicated (large cosine = right tail
    # of the reference). So replicated iff rev_score > cut.
    # Pick cut at the (1 - paperA_overall_repl_rate)-quantile of rev_score_all.
    cut_quantile = 1 - paperA_overall_repl_rate
    rev_cut = float(np.quantile(rev_score_all, cut_quantile))
    print(f'\n[C3 Reverse-anchor calibration] '
          f'PaperA overall replicated rate = '
          f'{paperA_overall_repl_rate*100:.2f}%; '
          f'rev-anchor cut at {cut_quantile*100:.2f}-th pct of score = '
          f'{rev_cut:.4f}')

    rev_score_pix = stats.norm.cdf(cos, loc=mu_c, scale=sd_c)
    # NOTE(review): the comparison is strict, so scores exactly equal to the
    # cut (e.g. many tied scores) are counted as hand-leaning -- confirm that
    # this is the intended tie-breaking.
    rev_replicated = (rev_score_pix > rev_cut)
    rev_misclass = ~rev_replicated
    n_rev_correct = int(rev_replicated.sum())
    n_rev_miss = int(rev_misclass.sum())
    far_rev = n_rev_miss / n
    rev_lo, rev_hi = wilson_ci(n_rev_miss, n)
    print(f'[C3 Reverse-anchor] correct: {n_rev_correct}/{n} = '
          f'{(1 - far_rev)*100:.2f}%; FAR: {far_rev*100:.2f}% '
          f'[{rev_lo*100:.2f}%, {rev_hi*100:.2f}%]')

    # ---------- Per-firm FAR ----------
    print('\n[per-firm FAR]')
    print(f' {"Firm":<22} {"n":>5} {"PaperA":>11} {"K=3":>11} {"Rev-anc":>11}')
    per_firm = {}
    for f in BIG4:
        mask = (sig_firms == f)
        n_f = int(mask.sum())
        if n_f == 0:
            # Record the firm with n = 0 only; the report prints n/a for it.
            per_firm[f] = {'n': 0}
            continue
        miss_pA = int(np.sum(paperA_misclass[mask]))
        miss_k3 = int(np.sum(k3_misclass[mask]))
        miss_rev = int(np.sum(rev_misclass[mask]))
        far_pA_f = miss_pA / n_f
        far_k3_f = miss_k3 / n_f
        far_rev_f = miss_rev / n_f
        per_firm[f] = {
            'n': n_f,
            'paperA_far': far_pA_f, 'paperA_misclass_n': miss_pA,
            'k3_far': far_k3_f, 'k3_misclass_n': miss_k3,
            'reverse_anchor_far': far_rev_f, 'reverse_anchor_misclass_n': miss_rev,
        }
        print(f' {LABEL[f]:<22} {n_f:>5} {far_pA_f*100:>10.2f}% '
              f'{far_k3_f*100:>10.2f}% {far_rev_f*100:>10.2f}%')

    # ---------- Misclassified case CSV ----------
    cases_csv = OUT / 'far_cases.csv'
    with open(cases_csv, 'w', newline='', encoding='utf-8') as f:
        w = csv.writer(f)
        # The header is always written, so a run with no misclassifications
        # yields a header-only file.
        w.writerow(['signature_id', 'cpa', 'firm', 'firm_label',
                    'cos', 'dh', 'closest_match_file',
                    'paperA_call', 'k3_call', 'reverse_anchor_call'])
        for i in range(n):
            pa = 'replicated' if paperA_replicated[i] else 'hand_leaning'
            kl = ['C1_handleaning', 'C2_mixed',
                  'C3_replicated'][k3_labels[i]]
            ra = 'replicated' if rev_replicated[i] else 'hand_leaning'
            # Only write rows where at least one classifier disagrees with
            # ground truth (replicated)
            if pa != 'replicated' or kl != 'C3_replicated' \
                    or ra != 'replicated':
                w.writerow([sig_ids[i], rows[i][1], sig_firms[i],
                            LABEL[sig_firms[i]],
                            f'{cos[i]:.4f}', f'{dh[i]:.4f}', closest[i],
                            pa, kl, ra])
    print(f'\nMisclassified cases CSV: {cases_csv}')

    # Markdown report
    # Each list element becomes one line of far_report.md; parenthesised
    # groups of adjacent literals are single long paragraphs.
    md = [
        '# Pixel-Identity FAR on Big-4 (Script 40)',
        f'Generated: {datetime.now().strftime("%Y-%m-%d %H:%M:%S")}',
        '',
        '## Ground truth',
        '',
        ('Pixel-identical pairs (signature byte-identical to nearest '
         'same-CPA neighbor) cannot arise from independent hand-signing. '
         'They are taken as ground-truth REPLICATED. We measure each '
         'classifier\'s false-alarm rate (rate of calling these signatures '
         'hand-leaning).'),
        '',
        f'- Total Big-4 pixel-identical signatures: **{n}**',
        '',
        '## Headline FAR (lower is better)',
        '',
        '| Classifier | Correct/N | FAR | Wilson 95% CI |',
        '|---|---|---|---|',
        f'| Paper A box rule | {n_pA_correct}/{n} | **{far_pA*100:.2f}%** | '
        f'[{pA_lo*100:.2f}%, {pA_hi*100:.2f}%] |',
        f'| K=3 per-CPA hard label (C3 = replicated) | {n_k3_correct}/{n} | '
        f'**{far_k3*100:.2f}%** | [{k3_lo*100:.2f}%, {k3_hi*100:.2f}%] |',
        f'| Reverse-anchor (prevalence-calibrated cut) | {n_rev_correct}/{n} | '
        f'**{far_rev*100:.2f}%** | [{rev_lo*100:.2f}%, {rev_hi*100:.2f}%] |',
        '',
        ('Reverse-anchor cut chosen so that overall replicated rate '
         f'matches Paper A overall rate ({paperA_overall_repl_rate*100:.2f}%); '
         'this is calibration-by-prevalence and is documented as a v4.0 '
         'limitation -- no signature-level ground truth exists for the '
         'hand-leaning class so we cannot pick the cut by direct ROC '
         'optimization.'),
        '',
        '## Per-firm FAR',
        '',
        '| Firm | n | Paper A FAR | K=3 FAR | Rev-anchor FAR |',
        '|---|---|---|---|---|',
    ]
    # One table row per Big-4 firm, in BIG4 order; firms without any
    # pixel-identical signature get an n/a row.
    for f in BIG4:
        pf = per_firm[f]
        if pf['n'] == 0:
            md.append(f'| {LABEL[f]} | 0 | n/a | n/a | n/a |')
            continue
        md.append(f'| {LABEL[f]} | {pf["n"]} | '
                  f'{pf["paperA_far"]*100:.2f}% '
                  f'({pf["paperA_misclass_n"]}) | '
                  f'{pf["k3_far"]*100:.2f}% ({pf["k3_misclass_n"]}) | '
                  f'{pf["reverse_anchor_far"]*100:.2f}% '
                  f'({pf["reverse_anchor_misclass_n"]}) |')
    md += ['', '## Reading',
           '',
           ('A FAR substantially below the no-information rate '
            f'(1 - {paperA_overall_repl_rate*100:.2f}% = '
            f'{(1-paperA_overall_repl_rate)*100:.2f}%) means the '
            'classifier extracts useful signal from the (cos, dh) '
            'features for distinguishing pixel-identical replication. '
            'Since pixel-identical pairs are a CONSERVATIVE SUBSET of '
            'true replication (only the byte-equal extreme), a low FAR '
            'against this subset is necessary but not sufficient evidence '
            'of correct replication detection.'),
           '',
           '## Files',
           '- `far_results.json` -- machine-readable results',
           '- `far_cases.csv` -- every misclassified pixel-identical signature',
           ]
    md_path = OUT / 'far_report.md'
    md_path.write_text('\n'.join(md), encoding='utf-8')
    print(f'Report: {md_path}')

    # Machine-readable mirror of the report. The timestamp is taken again
    # here, so it can differ slightly from the one in the Markdown file.
    payload = {
        'generated_at': datetime.now().isoformat(),
        'n_pixel_identical_big4': n,
        'paper_a_cuts': {'cos': PAPER_A_COS_CUT, 'dh': PAPER_A_DH_CUT},
        'paper_a_overall_replicated_rate_big4': paperA_overall_repl_rate,
        'reverse_anchor_cut_score': rev_cut,
        'reverse_anchor_cut_quantile': cut_quantile,
        'reverse_anchor_reference_center': [float(mu_c),
                                            float(ref['mean'][1])],
        'classifiers': {
            'paperA': {
                'far': float(far_pA),
                'far_wilson95': [float(pA_lo), float(pA_hi)],
                'n_correct': n_pA_correct, 'n_misclass': n_pA_miss,
            },
            'k3_perCPA': {
                'far': float(far_k3),
                'far_wilson95': [float(k3_lo), float(k3_hi)],
                'n_correct': n_k3_correct, 'n_misclass': n_k3_miss,
            },
            'reverse_anchor_calibrated': {
                'far': float(far_rev),
                'far_wilson95': [float(rev_lo), float(rev_hi)],
                'n_correct': n_rev_correct, 'n_misclass': n_rev_miss,
            },
        },
        # Keyed by the raw firm name from BIG4, not by the display label.
        'per_firm_far': per_firm,
    }
    json_path = OUT / 'far_results.json'
    # ensure_ascii=False keeps the non-ASCII firm-name keys readable.
    json_path.write_text(json.dumps(payload, indent=2, ensure_ascii=False),
                         encoding='utf-8')
    print(f'JSON: {json_path}')
|
|
|
|
|
|
# Run the analysis only when this file is executed as a script, not on import.
if __name__ == '__main__':
    main()
|