From 338737d9a1ceac81a18af4c8fa603c683f2d65aa Mon Sep 17 00:00:00 2001 From: gbanyan Date: Tue, 12 May 2026 15:10:03 +0800 Subject: [PATCH] Add script 40: pixel-identity FAR (0% across all v4 classifiers) Phase 1.8 follow-up. Validates the v4.0 classifier family against the only hard ground truth in the corpus: pixel_identical_to_closest=1 (byte-identical to nearest same-CPA neighbor; mathematically impossible under independent hand-signing). n = 262 pixel-identical Big-4 signatures. Firm A 145 KPMG 8 PwC 107 EY 2 FAR (lower better; Wilson 95% CI for the misclassification rate): PaperA box rule 0.00% [0.00%, 1.45%] K=3 per-CPA hard label 0.00% [0.00%, 1.45%] Reverse-anchor (calibr.) 0.00% [0.00%, 1.45%] Per-firm: 0% misclass on every firm. Reverse-anchor cut chosen by prevalence calibration (overall replicated rate matches Paper A's 49.58%). Documented v4.0 limitation: no signature-level ground truth for hand-leaning class, so cannot ROC-optimize the cut directly. PwC's 107 pixel-identical signatures despite being the most hand-leaning firm overall (Script 38 per-CPA P_C1=0.31) illustrates the within-firm heterogeneity that v4.0's K=3 mixture captures: a PwC CPA can be hand-leaning on average while still occasionally reusing template signatures. Implication: at the only hard ground truth available in the corpus, all three v4.0 classifiers achieve perfect detection. This satisfies REQ-001 acceptance for pixel-identity FAR. 
#!/usr/bin/env python3
"""
Script 40: Pixel-Identity FAR on Big-4 (hard ground truth validation)
=======================================================================
Phase 1.8 follow-up. Validates the v4.0 classifier family against
the only hard ground truth available in the corpus:
pixel_identical_to_closest = 1 (signatures byte-identical to their
nearest same-CPA match).

Pixel-identical pairs are MATHEMATICALLY IMPOSSIBLE to arise from
independent hand-signing -- they must be reuses of the same source
image. Treating them as ground-truth replicated, we compute:

    FAR (false-alarm-rate) := P(classifier says hand-leaning |
                                ground truth is replicated)

for three classifiers:

    C1  PaperA box rule: non_hand iff cos > 0.95 AND dh <= 5
    C2  K=3 per-CPA hard label; replicated = C3 (highest-cos component)
    C3  Reverse-anchor score (normal CDF of cos under the non-Big-4
        reference); replicated = score ABOVE an explicit cut.
        The cut is chosen so that the rule's overall replicated rate
        matches PaperA's overall rate (calibration-by-prevalence;
        documented limitation).

Additional metrics per classifier:
  - n_pixel_identical, n_correctly_called_replicated,
    n_misclassified_handleaning
  - Wilson 95% CI on FAR
  - Per-firm FAR breakdown

Output:
  reports/v4_big4/pixel_identity_far/
      far_results.json
      far_report.md
      far_cases.csv   (every misclassified pixel-identical sig)
"""

import csv
import json
import sqlite3
from contextlib import closing
from datetime import datetime
from pathlib import Path

import numpy as np
from scipy.stats import norm
from sklearn.covariance import MinCovDet
from sklearn.mixture import GaussianMixture

# NOTE(review): matplotlib was previously imported (with the Agg backend
# forced) but never used anywhere in this script; the dead dependency has
# been dropped.

DB = '/Volumes/NV2/PDF-Processing/signature-analysis/signature_analysis.db'
OUT = Path('/Volumes/NV2/PDF-Processing/signature-analysis/reports/'
           'v4_big4/pixel_identity_far')
OUT.mkdir(parents=True, exist_ok=True)

SEED = 42
BIG4 = ('勤業眾信聯合', '安侯建業聯合', '資誠聯合', '安永聯合')
LABEL = {'勤業眾信聯合': 'Firm A (Deloitte)', '安侯建業聯合': 'KPMG',
         '資誠聯合': 'PwC', '安永聯合': 'EY'}
PAPER_A_COS_CUT = 0.95   # Paper A box rule: replicated iff cos > 0.95 ...
PAPER_A_DH_CUT = 5       # ... AND dhash distance <= 5
MIN_SIGS = 10            # minimum signatures for a CPA to enter per-CPA fits


def load_pixel_identical_big4():
    """Load every pixel-identical Big-4 signature (ground truth = replicated).

    Returns:
        list of tuples (signature_id, cpa_name, firm, cos, dh,
        closest_match_file) for signatures flagged
        pixel_identical_to_closest = 1 with non-null features.
    """
    with closing(sqlite3.connect(DB)) as conn:
        cur = conn.cursor()
        cur.execute('''
            SELECT s.signature_id, s.assigned_accountant, a.firm,
                   s.max_similarity_to_same_accountant,
                   CAST(s.min_dhash_independent AS REAL),
                   s.closest_match_file
            FROM signatures s
            JOIN accountants a ON s.assigned_accountant = a.name
            WHERE s.pixel_identical_to_closest = 1
              AND s.max_similarity_to_same_accountant IS NOT NULL
              AND s.min_dhash_independent IS NOT NULL
              AND a.firm IN (?, ?, ?, ?)
        ''', BIG4)
        return cur.fetchall()


def load_all_big4_signatures():
    """Load (cos, dh) for ALL Big-4 signatures.

    Used to compute Paper A's overall replicated rate, which calibrates
    the reverse-anchor cut by prevalence.

    Returns:
        (cos, dh): two parallel float ndarrays.
    """
    with closing(sqlite3.connect(DB)) as conn:
        cur = conn.cursor()
        cur.execute('''
            SELECT s.max_similarity_to_same_accountant,
                   CAST(s.min_dhash_independent AS REAL)
            FROM signatures s
            JOIN accountants a ON s.assigned_accountant = a.name
            WHERE s.assigned_accountant IS NOT NULL
              AND s.max_similarity_to_same_accountant IS NOT NULL
              AND s.min_dhash_independent IS NOT NULL
              AND a.firm IN (?, ?, ?, ?)
        ''', BIG4)
        rows = cur.fetchall()
    cos = np.array([float(r[0]) for r in rows])
    dh = np.array([float(r[1]) for r in rows])
    return cos, dh


def load_per_cpa_means_big4():
    """Per-CPA (mean cos, mean dh) for Big-4 CPAs with >= MIN_SIGS signatures.

    Returns:
        (n_cpa, 2) ndarray of [cos_mean, dh_mean] rows, the input space
        in which the K=3 mixture is fit.
    """
    with closing(sqlite3.connect(DB)) as conn:
        cur = conn.cursor()
        cur.execute('''
            SELECT s.assigned_accountant, a.firm,
                   AVG(s.max_similarity_to_same_accountant) AS cos_mean,
                   AVG(CAST(s.min_dhash_independent AS REAL)) AS dh_mean,
                   COUNT(*) AS n
            FROM signatures s
            JOIN accountants a ON s.assigned_accountant = a.name
            WHERE s.assigned_accountant IS NOT NULL
              AND s.max_similarity_to_same_accountant IS NOT NULL
              AND s.min_dhash_independent IS NOT NULL
              AND a.firm IN (?, ?, ?, ?)
            GROUP BY s.assigned_accountant
            HAVING n >= ?
        ''', BIG4 + (MIN_SIGS,))
        rows = cur.fetchall()
    return np.array([[float(r[2]), float(r[3])] for r in rows])


def load_non_big4_reference_means():
    """Per-CPA (mean cos, mean dh) for non-Big-4 CPAs (reference population).

    Returns:
        (n_cpa, 2) ndarray used to fit the robust reference Gaussian for
        the reverse-anchor classifier.
    """
    with closing(sqlite3.connect(DB)) as conn:
        cur = conn.cursor()
        cur.execute('''
            SELECT AVG(s.max_similarity_to_same_accountant) AS cos_mean,
                   AVG(CAST(s.min_dhash_independent AS REAL)) AS dh_mean,
                   COUNT(*) AS n
            FROM signatures s
            JOIN accountants a ON s.assigned_accountant = a.name
            WHERE s.assigned_accountant IS NOT NULL
              AND s.max_similarity_to_same_accountant IS NOT NULL
              AND s.min_dhash_independent IS NOT NULL
              AND a.firm IS NOT NULL
              AND a.firm NOT IN (?, ?, ?, ?)
            GROUP BY s.assigned_accountant
            HAVING n >= ?
        ''', BIG4 + (MIN_SIGS,))
        rows = cur.fetchall()
    return np.array([[float(r[0]), float(r[1])] for r in rows])


def fit_k3(X):
    """Fit the seeded K=3 full-covariance Gaussian mixture on per-CPA means."""
    return GaussianMixture(n_components=3, covariance_type='full',
                           random_state=SEED, n_init=15, max_iter=500).fit(X)


def fit_reference(X):
    """Robust (MCD) location/covariance of the non-Big-4 reference cloud."""
    mcd = MinCovDet(random_state=SEED, support_fraction=0.85).fit(X)
    return {'mean': mcd.location_, 'cov': mcd.covariance_}


def wilson_ci(k, n, alpha=0.05):
    """Wilson score interval for a binomial proportion k/n.

    Args:
        k: number of successes (here: misclassifications).
        n: number of trials.
        alpha: 1 - confidence level (default 0.05 -> 95% CI).

    Returns:
        (lo, hi) clamped to [0, 1]; (0.0, 1.0) when n == 0 (no information).
    """
    if n == 0:
        return (0.0, 1.0)
    z = norm.ppf(1 - alpha / 2)
    phat = k / n
    denom = 1 + z * z / n
    center = (phat + z * z / (2 * n)) / denom
    pm = z * np.sqrt(phat * (1 - phat) / n + z * z / (4 * n * n)) / denom
    return (max(0.0, center - pm), min(1.0, center + pm))


def main():
    print('=' * 72)
    print('Script 40: Pixel-Identity FAR on Big-4')
    print('=' * 72)

    # Load pixel-identical Big-4 signatures (ground truth replicated)
    rows = load_pixel_identical_big4()
    n = len(rows)
    print(f'\nN pixel-identical Big-4 signatures (ground truth = replicated): '
          f'{n}')
    if n == 0:
        print('No pixel-identical pairs in Big-4. Exiting.')
        return

    # Per-firm distribution
    by_firm = {}
    for r in rows:
        by_firm.setdefault(r[2], []).append(r)
    for f in BIG4:
        print(f'  {LABEL[f]}: {len(by_firm.get(f, []))}')

    sig_ids = np.array([r[0] for r in rows])
    sig_cpas = np.array([r[1] for r in rows])
    sig_firms = np.array([r[2] for r in rows])
    cos = np.array([r[3] for r in rows], dtype=float)
    dh = np.array([r[4] for r in rows], dtype=float)
    closest = np.array([r[5] or '' for r in rows])

    # ---------- Classifier C1: Paper A rule ----------
    paperA_replicated = (cos > PAPER_A_COS_CUT) & (dh <= PAPER_A_DH_CUT)
    paperA_misclass = ~paperA_replicated
    n_pA_correct = int(paperA_replicated.sum())
    n_pA_miss = int(paperA_misclass.sum())
    far_pA = n_pA_miss / n
    pA_lo, pA_hi = wilson_ci(n_pA_miss, n)
    print(f'\n[C1 Paper A] correct: {n_pA_correct}/{n} = '
          f'{(1 - far_pA)*100:.2f}%; FAR: {far_pA*100:.2f}% '
          f'[{pA_lo*100:.2f}%, {pA_hi*100:.2f}%]')

    # ---------- Classifier C2: K=3 per-CPA hard label ----------
    # (Use the K=3 CPA-fit components; for each pixel-identical signature,
    # predict its membership as if it were a per-CPA point.)
    X_cpa = load_per_cpa_means_big4()
    if len(X_cpa) < 3:
        # GaussianMixture(n_components=3) needs at least 3 points; fail loudly
        # with a diagnosable message instead of an opaque sklearn error.
        raise RuntimeError(f'Too few Big-4 CPAs with >= {MIN_SIGS} signatures '
                           f'to fit a K=3 mixture (got {len(X_cpa)})')
    gmm = fit_k3(X_cpa)
    # Components sorted by mean cosine (ascending): C1 hand ... C3 replicated.
    order = np.argsort(gmm.means_[:, 0])
    label_map = {old: new for new, old in enumerate(order)}
    X_pix = np.column_stack([cos, dh])
    raw = gmm.predict(X_pix)
    k3_labels = np.array([label_map[lab] for lab in raw])
    # Replicated = C3 (label index 2)
    k3_replicated = (k3_labels == 2)
    k3_misclass = ~k3_replicated
    n_k3_correct = int(k3_replicated.sum())
    n_k3_miss = int(k3_misclass.sum())
    far_k3 = n_k3_miss / n
    k3_lo, k3_hi = wilson_ci(n_k3_miss, n)
    print(f'[C2 K=3 perCPA] correct: {n_k3_correct}/{n} = '
          f'{(1 - far_k3)*100:.2f}%; FAR: {far_k3*100:.2f}% '
          f'[{k3_lo*100:.2f}%, {k3_hi*100:.2f}%]')

    # ---------- Classifier C3: Reverse-anchor with prevalence-calibrated cut ----------
    # Build reference Gaussian from non-Big-4
    X_ref = load_non_big4_reference_means()
    if len(X_ref) == 0:
        raise RuntimeError('No non-Big-4 reference CPAs available for the '
                           'reverse-anchor classifier')
    ref = fit_reference(X_ref)
    mu_c = ref['mean'][0]
    sd_c = float(np.sqrt(ref['cov'][0, 0]))

    # Score every Big-4 signature; pick cut so overall replicated rate
    # matches Paper A's overall replicated rate.
    cos_all, dh_all = load_all_big4_signatures()
    paperA_overall_repl_rate = float(np.mean(
        (cos_all > PAPER_A_COS_CUT) & (dh_all <= PAPER_A_DH_CUT)))
    # Reverse-anchor score per signature
    rev_score_all = norm.cdf(cos_all, loc=mu_c, scale=sd_c)
    # We want HIGHER scores = more replicated (large cosine = right tail
    # of the reference). So replicated iff rev_score > cut.
    # Pick cut at the (1 - paperA_overall_repl_rate)-quantile of rev_score_all.
    cut_quantile = 1 - paperA_overall_repl_rate
    rev_cut = float(np.quantile(rev_score_all, cut_quantile))
    print(f'\n[C3 Reverse-anchor calibration] '
          f'PaperA overall replicated rate = '
          f'{paperA_overall_repl_rate*100:.2f}%; '
          f'rev-anchor cut at {cut_quantile*100:.2f}-th pct of score = '
          f'{rev_cut:.4f}')

    rev_score_pix = norm.cdf(cos, loc=mu_c, scale=sd_c)
    rev_replicated = (rev_score_pix > rev_cut)
    rev_misclass = ~rev_replicated
    n_rev_correct = int(rev_replicated.sum())
    n_rev_miss = int(rev_misclass.sum())
    far_rev = n_rev_miss / n
    rev_lo, rev_hi = wilson_ci(n_rev_miss, n)
    print(f'[C3 Reverse-anchor] correct: {n_rev_correct}/{n} = '
          f'{(1 - far_rev)*100:.2f}%; FAR: {far_rev*100:.2f}% '
          f'[{rev_lo*100:.2f}%, {rev_hi*100:.2f}%]')

    # ---------- Per-firm FAR ----------
    print('\n[per-firm FAR]')
    print(f'  {"Firm":<22} {"n":>5} {"PaperA":>11} {"K=3":>11} {"Rev-anc":>11}')
    per_firm = {}
    for f in BIG4:
        mask = (sig_firms == f)
        n_f = int(mask.sum())
        if n_f == 0:
            per_firm[f] = {'n': 0}
            continue
        miss_pA = int(np.sum(paperA_misclass[mask]))
        miss_k3 = int(np.sum(k3_misclass[mask]))
        miss_rev = int(np.sum(rev_misclass[mask]))
        far_pA_f = miss_pA / n_f
        far_k3_f = miss_k3 / n_f
        far_rev_f = miss_rev / n_f
        per_firm[f] = {
            'n': n_f,
            'paperA_far': far_pA_f, 'paperA_misclass_n': miss_pA,
            'k3_far': far_k3_f, 'k3_misclass_n': miss_k3,
            'reverse_anchor_far': far_rev_f, 'reverse_anchor_misclass_n': miss_rev,
        }
        print(f'  {LABEL[f]:<22} {n_f:>5} {far_pA_f*100:>10.2f}% '
              f'{far_k3_f*100:>10.2f}% {far_rev_f*100:>10.2f}%')

    # ---------- Misclassified case CSV ----------
    cases_csv = OUT / 'far_cases.csv'
    # `fh` (not `f`) so the file handle does not shadow the firm loop variable.
    with open(cases_csv, 'w', newline='', encoding='utf-8') as fh:
        w = csv.writer(fh)
        w.writerow(['signature_id', 'cpa', 'firm', 'firm_label',
                    'cos', 'dh', 'closest_match_file',
                    'paperA_call', 'k3_call', 'reverse_anchor_call'])
        for i in range(n):
            pa = 'replicated' if paperA_replicated[i] else 'hand_leaning'
            kl = ['C1_handleaning', 'C2_mixed',
                  'C3_replicated'][k3_labels[i]]
            ra = 'replicated' if rev_replicated[i] else 'hand_leaning'
            # Only write rows where at least one classifier disagrees with
            # ground truth (replicated)
            if pa != 'replicated' or kl != 'C3_replicated' \
                    or ra != 'replicated':
                w.writerow([sig_ids[i], sig_cpas[i], sig_firms[i],
                            LABEL[sig_firms[i]],
                            f'{cos[i]:.4f}', f'{dh[i]:.4f}', closest[i],
                            pa, kl, ra])
    print(f'\nMisclassified cases CSV: {cases_csv}')

    # Markdown report
    md = [
        '# Pixel-Identity FAR on Big-4 (Script 40)',
        f'Generated: {datetime.now().strftime("%Y-%m-%d %H:%M:%S")}',
        '',
        '## Ground truth',
        '',
        ('Pixel-identical pairs (signature byte-identical to nearest '
         'same-CPA neighbor) cannot arise from independent hand-signing. '
         'They are taken as ground-truth REPLICATED. We measure each '
         'classifier\'s false-alarm rate (rate of calling these signatures '
         'hand-leaning).'),
        '',
        f'- Total Big-4 pixel-identical signatures: **{n}**',
        '',
        '## Headline FAR (lower is better)',
        '',
        '| Classifier | Correct/N | FAR | Wilson 95% CI |',
        '|---|---|---|---|',
        f'| Paper A box rule | {n_pA_correct}/{n} | **{far_pA*100:.2f}%** | '
        f'[{pA_lo*100:.2f}%, {pA_hi*100:.2f}%] |',
        f'| K=3 per-CPA hard label (C3 = replicated) | {n_k3_correct}/{n} | '
        f'**{far_k3*100:.2f}%** | [{k3_lo*100:.2f}%, {k3_hi*100:.2f}%] |',
        f'| Reverse-anchor (prevalence-calibrated cut) | {n_rev_correct}/{n} | '
        f'**{far_rev*100:.2f}%** | [{rev_lo*100:.2f}%, {rev_hi*100:.2f}%] |',
        '',
        ('Reverse-anchor cut chosen so that overall replicated rate '
         f'matches Paper A overall rate ({paperA_overall_repl_rate*100:.2f}%); '
         'this is calibration-by-prevalence and is documented as a v4.0 '
         'limitation -- no signature-level ground truth exists for the '
         'hand-leaning class so we cannot pick the cut by direct ROC '
         'optimization.'),
        '',
        '## Per-firm FAR',
        '',
        '| Firm | n | Paper A FAR | K=3 FAR | Rev-anchor FAR |',
        '|---|---|---|---|---|',
    ]
    for f in BIG4:
        pf = per_firm[f]
        if pf['n'] == 0:
            md.append(f'| {LABEL[f]} | 0 | n/a | n/a | n/a |')
            continue
        md.append(f'| {LABEL[f]} | {pf["n"]} | '
                  f'{pf["paperA_far"]*100:.2f}% '
                  f'({pf["paperA_misclass_n"]}) | '
                  f'{pf["k3_far"]*100:.2f}% ({pf["k3_misclass_n"]}) | '
                  f'{pf["reverse_anchor_far"]*100:.2f}% '
                  f'({pf["reverse_anchor_misclass_n"]}) |')
    md += ['', '## Reading',
           '',
           ('A FAR substantially below the no-information rate '
            f'(1 - {paperA_overall_repl_rate*100:.2f}% = '
            f'{(1-paperA_overall_repl_rate)*100:.2f}%) means the '
            'classifier extracts useful signal from the (cos, dh) '
            'features for distinguishing pixel-identical replication. '
            'Since pixel-identical pairs are a CONSERVATIVE SUBSET of '
            'true replication (only the byte-equal extreme), a low FAR '
            'against this subset is necessary but not sufficient evidence '
            'of correct replication detection.'),
           '',
           '## Files',
           '- `far_results.json` -- machine-readable results',
           '- `far_cases.csv` -- every misclassified pixel-identical signature',
           ]
    md_path = OUT / 'far_report.md'
    md_path.write_text('\n'.join(md), encoding='utf-8')
    print(f'Report: {md_path}')

    payload = {
        'generated_at': datetime.now().isoformat(),
        'n_pixel_identical_big4': n,
        'paper_a_cuts': {'cos': PAPER_A_COS_CUT, 'dh': PAPER_A_DH_CUT},
        'paper_a_overall_replicated_rate_big4': paperA_overall_repl_rate,
        'reverse_anchor_cut_score': rev_cut,
        'reverse_anchor_cut_quantile': cut_quantile,
        'reverse_anchor_reference_center': [float(mu_c),
                                            float(ref['mean'][1])],
        'classifiers': {
            'paperA': {
                'far': float(far_pA),
                'far_wilson95': [float(pA_lo), float(pA_hi)],
                'n_correct': n_pA_correct, 'n_misclass': n_pA_miss,
            },
            'k3_perCPA': {
                'far': float(far_k3),
                'far_wilson95': [float(k3_lo), float(k3_hi)],
                'n_correct': n_k3_correct, 'n_misclass': n_k3_miss,
            },
            'reverse_anchor_calibrated': {
                'far': float(far_rev),
                'far_wilson95': [float(rev_lo), float(rev_hi)],
                'n_correct': n_rev_correct, 'n_misclass': n_rev_miss,
            },
        },
        'per_firm_far': per_firm,
    }
    json_path = OUT / 'far_results.json'
    json_path.write_text(json.dumps(payload, indent=2, ensure_ascii=False),
                         encoding='utf-8')
    print(f'JSON: {json_path}')


if __name__ == '__main__':
    main()