Files
pdf_signature_extraction/signature_analysis/40_v4_pixel_identity_far.py
T
gbanyan 338737d9a1 Add script 40: pixel-identity FAR (0% across all v4 classifiers)
Phase 1.8 follow-up. Validates the v4.0 classifier family against
the only hard ground truth in the corpus: pixel_identical_to_closest=1
(byte-identical to nearest same-CPA neighbor; mathematically impossible
under independent hand-signing).

n = 262 pixel-identical Big-4 signatures.

  Firm A   145
  KPMG       8
  PwC      107
  EY         2

FAR (lower is better; Wilson 95% CI for the misclassification rate):

  PaperA box rule           0.00%  [0.00%, 1.45%]
  K=3 per-CPA hard label    0.00%  [0.00%, 1.45%]
  Reverse-anchor (calibr.)  0.00%  [0.00%, 1.45%]

Per-firm: 0% misclass on every firm.

Reverse-anchor cut chosen by prevalence calibration (overall
replicated rate matches Paper A's 49.58%). Documented v4.0
limitation: no signature-level ground truth for hand-leaning
class, so cannot ROC-optimize the cut directly.

PwC's 107 pixel-identical signatures despite being the most
hand-leaning firm overall (Script 38 per-CPA P_C1=0.31)
illustrates the within-firm heterogeneity that v4.0's K=3
mixture captures: a PwC CPA can be hand-leaning on average
while still occasionally reusing template signatures.

Implication: at the only hard ground truth available in the
corpus, all three v4.0 classifiers achieve perfect detection.
This satisfies REQ-001 acceptance for pixel-identity FAR.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-12 15:10:03 +08:00

422 lines
16 KiB
Python

#!/usr/bin/env python3
"""
Script 40: Pixel-Identity FAR on Big-4 (hard ground truth validation)
=======================================================================
Phase 1.8 follow-up. Validates the v4.0 classifier family against
the only hard ground truth available in the corpus:
pixel_identical_to_closest = 1 (signatures byte-identical to their
nearest same-CPA match).
Pixel-identical pairs are MATHEMATICALLY IMPOSSIBLE to arise from
independent hand-signing -- they must be reuses of the same source
image. Treating them as ground-truth replicated, we compute:
FAR (false-alarm-rate) := P(classifier says hand-leaning |
ground truth is replicated)
for three classifiers:
C1 PaperA non_hand iff cos > 0.95 AND dh <= 5
C2 K=3 per-CPA hard label, replicated = C3 (highest cos)
C3 Reverse-anchor cos_left_tail_pct under non-Big-4 reference;
replicated = score below explicit cut.
Cut chosen so that the rule's overall
replicated rate matches PaperA's overall rate
(calibration-by-prevalence; documented limitation).
Additional metrics per classifier:
- n_pixel_identical, n_correctly_called_replicated,
n_misclassified_handleaning
- Wilson 95% CI on FAR
- Per-firm FAR breakdown
Output:
reports/v4_big4/pixel_identity_far/
far_results.json
far_report.md
far_cases.csv (every misclassified pixel-identical sig)
"""
import sqlite3
import csv
import json
import numpy as np
import matplotlib
matplotlib.use('Agg')  # headless backend (no display on the processing host)
# NOTE(review): matplotlib / plt appear unused in this script — confirm before removing.
import matplotlib.pyplot as plt
from pathlib import Path
from datetime import datetime
from scipy import stats
from scipy.stats import norm
from sklearn.mixture import GaussianMixture
from sklearn.covariance import MinCovDet
# Source signature database and output directory for this script's reports.
DB = '/Volumes/NV2/PDF-Processing/signature-analysis/signature_analysis.db'
OUT = Path('/Volumes/NV2/PDF-Processing/signature-analysis/reports/'
           'v4_big4/pixel_identity_far')
OUT.mkdir(parents=True, exist_ok=True)  # side effect at import time: ensure output dir exists
SEED = 42  # shared RNG seed so GMM / MCD fits are reproducible
# Big-4 audit firms keyed by their registry names, with English display labels.
BIG4 = ('勤業眾信聯合', '安侯建業聯合', '資誠聯合', '安永聯合')
LABEL = {'勤業眾信聯合': 'Firm A (Deloitte)', '安侯建業聯合': 'KPMG',
         '資誠聯合': 'PwC', '安永聯合': 'EY'}
# Paper A box rule thresholds: replicated iff cos > 0.95 AND dhash <= 5.
PAPER_A_COS_CUT = 0.95
PAPER_A_DH_CUT = 5
MIN_SIGS = 10  # minimum usable signatures per CPA for per-CPA aggregates
def load_pixel_identical_big4():
    """Fetch the hard ground-truth replicated set: every Big-4 signature
    that is byte-identical to its nearest same-CPA match.

    Returns a list of rows:
    (signature_id, cpa_name, firm, cos, dhash, closest_match_file).
    """
    sql = '''
SELECT s.signature_id, s.assigned_accountant, a.firm,
s.max_similarity_to_same_accountant,
CAST(s.min_dhash_independent AS REAL),
s.closest_match_file
FROM signatures s
JOIN accountants a ON s.assigned_accountant = a.name
WHERE s.pixel_identical_to_closest = 1
AND s.max_similarity_to_same_accountant IS NOT NULL
AND s.min_dhash_independent IS NOT NULL
AND a.firm IN (?, ?, ?, ?)
'''
    conn = sqlite3.connect(DB)
    result = conn.execute(sql, BIG4).fetchall()
    conn.close()
    return result
def load_all_big4_signatures():
    """Return (cos, dh) float arrays over ALL usable Big-4 signatures.

    Used to measure Paper A's overall replicated rate for the
    prevalence calibration of the reverse-anchor cut.
    """
    sql = '''
SELECT s.max_similarity_to_same_accountant,
CAST(s.min_dhash_independent AS REAL)
FROM signatures s
JOIN accountants a ON s.assigned_accountant = a.name
WHERE s.assigned_accountant IS NOT NULL
AND s.max_similarity_to_same_accountant IS NOT NULL
AND s.min_dhash_independent IS NOT NULL
AND a.firm IN (?, ?, ?, ?)
'''
    conn = sqlite3.connect(DB)
    records = conn.execute(sql, BIG4).fetchall()
    conn.close()
    cos_vals = np.array([float(row[0]) for row in records])
    dh_vals = np.array([float(row[1]) for row in records])
    return cos_vals, dh_vals
def load_per_cpa_means_big4():
    """Return an (n_cpa, 2) array of per-CPA (mean cos, mean dhash) for
    Big-4 CPAs with at least MIN_SIGS usable signatures (K=3 GMM input)."""
    sql = '''
SELECT s.assigned_accountant, a.firm,
AVG(s.max_similarity_to_same_accountant) AS cos_mean,
AVG(CAST(s.min_dhash_independent AS REAL)) AS dh_mean,
COUNT(*) AS n
FROM signatures s
JOIN accountants a ON s.assigned_accountant = a.name
WHERE s.assigned_accountant IS NOT NULL
AND s.max_similarity_to_same_accountant IS NOT NULL
AND s.min_dhash_independent IS NOT NULL
AND a.firm IN (?, ?, ?, ?)
GROUP BY s.assigned_accountant
HAVING n >= ?
'''
    conn = sqlite3.connect(DB)
    records = conn.execute(sql, BIG4 + (MIN_SIGS,)).fetchall()
    conn.close()
    return np.array([[float(row[2]), float(row[3])] for row in records])
def load_non_big4_reference_means():
    """Return per-CPA (mean cos, mean dhash) pairs for non-Big-4 firms
    (the reverse-anchor reference population), CPAs with >= MIN_SIGS sigs."""
    sql = '''
SELECT AVG(s.max_similarity_to_same_accountant) AS cos_mean,
AVG(CAST(s.min_dhash_independent AS REAL)) AS dh_mean,
COUNT(*) AS n
FROM signatures s
JOIN accountants a ON s.assigned_accountant = a.name
WHERE s.assigned_accountant IS NOT NULL
AND s.max_similarity_to_same_accountant IS NOT NULL
AND s.min_dhash_independent IS NOT NULL
AND a.firm IS NOT NULL
AND a.firm NOT IN (?, ?, ?, ?)
GROUP BY s.assigned_accountant
HAVING n >= ?
'''
    conn = sqlite3.connect(DB)
    records = conn.execute(sql, BIG4 + (MIN_SIGS,)).fetchall()
    conn.close()
    return np.array([[float(row[0]), float(row[1])] for row in records])
def fit_k3(X):
    """Fit a 3-component full-covariance Gaussian mixture to X.

    Deterministic via SEED; n_init=15 restarts guard against bad local optima.
    """
    model = GaussianMixture(
        n_components=3,
        covariance_type='full',
        random_state=SEED,
        n_init=15,
        max_iter=500,
    )
    return model.fit(X)
def fit_reference(X):
    """Robustly estimate the reference center/covariance of X via MCD.

    Returns {'mean': location vector, 'cov': covariance matrix}.
    """
    estimator = MinCovDet(random_state=SEED, support_fraction=0.85)
    estimator.fit(X)
    return {'mean': estimator.location_, 'cov': estimator.covariance_}
def wilson_ci(k, n, alpha=0.05):
    """Wilson score confidence interval for a binomial proportion k/n.

    Returns (lo, hi) clipped to [0, 1]; the degenerate n == 0 case yields
    the uninformative interval (0.0, 1.0).
    """
    if n == 0:
        return (0.0, 1.0)
    z = norm.ppf(1 - alpha / 2)
    z2 = z * z
    p = k / n
    adj = 1 + z2 / n
    mid = (p + z2 / (2 * n)) / adj
    half = z * np.sqrt(p * (1 - p) / n + z2 / (4 * n * n)) / adj
    lo = mid - half
    hi = mid + half
    return (max(0.0, lo), min(1.0, hi))
def main():
    """Compute the pixel-identity FAR of the three v4.0 classifiers on the
    Big-4 ground-truth replicated set and write far_results.json,
    far_report.md and far_cases.csv under OUT."""
    print('=' * 72)
    print('Script 40: Pixel-Identity FAR on Big-4')
    print('=' * 72)
    # Load pixel-identical Big-4 signatures (ground truth replicated)
    rows = load_pixel_identical_big4()
    n = len(rows)
    print(f'\nN pixel-identical Big-4 signatures (ground truth = replicated): '
          f'{n}')
    if n == 0:
        print('No pixel-identical pairs in Big-4. Exiting.')
        return
    # Per-firm distribution
    by_firm = {}
    for r in rows:
        by_firm.setdefault(r[2], []).append(r)
    for f in BIG4:
        print(f' {LABEL[f]}: {len(by_firm.get(f, []))}')
    # Unpack row columns into parallel arrays (one entry per signature).
    sig_ids = np.array([r[0] for r in rows])
    sig_firms = np.array([r[2] for r in rows])
    cos = np.array([r[3] for r in rows], dtype=float)
    dh = np.array([r[4] for r in rows], dtype=float)
    closest = np.array([r[5] or '' for r in rows])
    # ---------- Classifier C1: Paper A rule ----------
    # FAR = fraction of ground-truth replicated signatures the rule calls
    # hand-leaning (i.e. NOT replicated).
    paperA_replicated = (cos > PAPER_A_COS_CUT) & (dh <= PAPER_A_DH_CUT)
    paperA_misclass = ~paperA_replicated
    n_pA_correct = int(paperA_replicated.sum())
    n_pA_miss = int(paperA_misclass.sum())
    far_pA = n_pA_miss / n
    pA_lo, pA_hi = wilson_ci(n_pA_miss, n)
    print(f'\n[C1 Paper A] correct: {n_pA_correct}/{n} = '
          f'{(1 - far_pA)*100:.2f}%; FAR: {far_pA*100:.2f}% '
          f'[{pA_lo*100:.2f}%, {pA_hi*100:.2f}%]')
    # ---------- Classifier C2: K=3 per-CPA hard label ----------
    # (Use the K=3 CPA-fit components; for each pixel-identical signature,
    # predict its membership as if it were a per-CPA point.)
    X_cpa = load_per_cpa_means_big4()
    gmm = fit_k3(X_cpa)
    # Relabel GMM components by ascending mean cosine so label indices are
    # stable across fits.
    order = np.argsort(gmm.means_[:, 0])  # C1 hand, C3 replicated
    label_map = {old: new for new, old in enumerate(order)}
    X_pix = np.column_stack([cos, dh])
    raw = gmm.predict(X_pix)
    k3_labels = np.array([label_map[l] for l in raw])
    # Replicated = C3 (label index 2)
    k3_replicated = (k3_labels == 2)
    k3_misclass = ~k3_replicated
    n_k3_correct = int(k3_replicated.sum())
    n_k3_miss = int(k3_misclass.sum())
    far_k3 = n_k3_miss / n
    k3_lo, k3_hi = wilson_ci(n_k3_miss, n)
    print(f'[C2 K=3 perCPA] correct: {n_k3_correct}/{n} = '
          f'{(1 - far_k3)*100:.2f}%; FAR: {far_k3*100:.2f}% '
          f'[{k3_lo*100:.2f}%, {k3_hi*100:.2f}%]')
    # ---------- Classifier C3: Reverse-anchor with prevalence-calibrated cut ----------
    # Build reference Gaussian from non-Big-4
    X_ref = load_non_big4_reference_means()
    ref = fit_reference(X_ref)
    # Only the cosine dimension of the reference is used for scoring.
    mu_c = ref['mean'][0]
    sd_c = float(np.sqrt(ref['cov'][0, 0]))
    # Score every Big-4 signature; pick cut so overall replicated rate
    # matches Paper A's overall replicated rate.
    cos_all, dh_all = load_all_big4_signatures()
    paperA_overall_repl_rate = float(np.mean(
        (cos_all > PAPER_A_COS_CUT) & (dh_all <= PAPER_A_DH_CUT)))
    # Reverse-anchor score per signature
    rev_score_all = stats.norm.cdf(cos_all, loc=mu_c, scale=sd_c)
    # We want HIGHER scores = more replicated (large cosine = right tail
    # of the reference). So replicated iff rev_score > cut.
    # Pick cut at the (1 - paperA_overall_repl_rate)-quantile of rev_score_all.
    cut_quantile = 1 - paperA_overall_repl_rate
    rev_cut = float(np.quantile(rev_score_all, cut_quantile))
    print(f'\n[C3 Reverse-anchor calibration] '
          f'PaperA overall replicated rate = '
          f'{paperA_overall_repl_rate*100:.2f}%; '
          f'rev-anchor cut at {cut_quantile*100:.2f}-th pct of score = '
          f'{rev_cut:.4f}')
    rev_score_pix = stats.norm.cdf(cos, loc=mu_c, scale=sd_c)
    rev_replicated = (rev_score_pix > rev_cut)
    rev_misclass = ~rev_replicated
    n_rev_correct = int(rev_replicated.sum())
    n_rev_miss = int(rev_misclass.sum())
    far_rev = n_rev_miss / n
    rev_lo, rev_hi = wilson_ci(n_rev_miss, n)
    print(f'[C3 Reverse-anchor] correct: {n_rev_correct}/{n} = '
          f'{(1 - far_rev)*100:.2f}%; FAR: {far_rev*100:.2f}% '
          f'[{rev_lo*100:.2f}%, {rev_hi*100:.2f}%]')
    # ---------- Per-firm FAR ----------
    print('\n[per-firm FAR]')
    print(f' {"Firm":<22} {"n":>5} {"PaperA":>11} {"K=3":>11} {"Rev-anc":>11}')
    per_firm = {}
    for f in BIG4:
        mask = (sig_firms == f)
        n_f = int(mask.sum())
        if n_f == 0:
            # Firm has no pixel-identical signatures; record a stub entry.
            per_firm[f] = {'n': 0}
            continue
        miss_pA = int(np.sum(paperA_misclass[mask]))
        miss_k3 = int(np.sum(k3_misclass[mask]))
        miss_rev = int(np.sum(rev_misclass[mask]))
        far_pA_f = miss_pA / n_f
        far_k3_f = miss_k3 / n_f
        far_rev_f = miss_rev / n_f
        per_firm[f] = {
            'n': n_f,
            'paperA_far': far_pA_f, 'paperA_misclass_n': miss_pA,
            'k3_far': far_k3_f, 'k3_misclass_n': miss_k3,
            'reverse_anchor_far': far_rev_f, 'reverse_anchor_misclass_n': miss_rev,
        }
        print(f' {LABEL[f]:<22} {n_f:>5} {far_pA_f*100:>10.2f}% '
              f'{far_k3_f*100:>10.2f}% {far_rev_f*100:>10.2f}%')
    # ---------- Misclassified case CSV ----------
    cases_csv = OUT / 'far_cases.csv'
    with open(cases_csv, 'w', newline='', encoding='utf-8') as f:
        w = csv.writer(f)
        w.writerow(['signature_id', 'cpa', 'firm', 'firm_label',
                    'cos', 'dh', 'closest_match_file',
                    'paperA_call', 'k3_call', 'reverse_anchor_call'])
        for i in range(n):
            pa = 'replicated' if paperA_replicated[i] else 'hand_leaning'
            kl = ['C1_handleaning', 'C2_mixed',
                  'C3_replicated'][k3_labels[i]]
            ra = 'replicated' if rev_replicated[i] else 'hand_leaning'
            # Only write rows where at least one classifier disagrees with
            # ground truth (replicated)
            if pa != 'replicated' or kl != 'C3_replicated' \
                    or ra != 'replicated':
                w.writerow([sig_ids[i], rows[i][1], sig_firms[i],
                            LABEL[sig_firms[i]],
                            f'{cos[i]:.4f}', f'{dh[i]:.4f}', closest[i],
                            pa, kl, ra])
    print(f'\nMisclassified cases CSV: {cases_csv}')
    # Markdown report
    md = [
        '# Pixel-Identity FAR on Big-4 (Script 40)',
        f'Generated: {datetime.now().strftime("%Y-%m-%d %H:%M:%S")}',
        '',
        '## Ground truth',
        '',
        ('Pixel-identical pairs (signature byte-identical to nearest '
         'same-CPA neighbor) cannot arise from independent hand-signing. '
         'They are taken as ground-truth REPLICATED. We measure each '
         'classifier\'s false-alarm rate (rate of calling these signatures '
         'hand-leaning).'),
        '',
        f'- Total Big-4 pixel-identical signatures: **{n}**',
        '',
        '## Headline FAR (lower is better)',
        '',
        '| Classifier | Correct/N | FAR | Wilson 95% CI |',
        '|---|---|---|---|',
        f'| Paper A box rule | {n_pA_correct}/{n} | **{far_pA*100:.2f}%** | '
        f'[{pA_lo*100:.2f}%, {pA_hi*100:.2f}%] |',
        f'| K=3 per-CPA hard label (C3 = replicated) | {n_k3_correct}/{n} | '
        f'**{far_k3*100:.2f}%** | [{k3_lo*100:.2f}%, {k3_hi*100:.2f}%] |',
        f'| Reverse-anchor (prevalence-calibrated cut) | {n_rev_correct}/{n} | '
        f'**{far_rev*100:.2f}%** | [{rev_lo*100:.2f}%, {rev_hi*100:.2f}%] |',
        '',
        ('Reverse-anchor cut chosen so that overall replicated rate '
         f'matches Paper A overall rate ({paperA_overall_repl_rate*100:.2f}%); '
         'this is calibration-by-prevalence and is documented as a v4.0 '
         'limitation -- no signature-level ground truth exists for the '
         'hand-leaning class so we cannot pick the cut by direct ROC '
         'optimization.'),
        '',
        '## Per-firm FAR',
        '',
        '| Firm | n | Paper A FAR | K=3 FAR | Rev-anchor FAR |',
        '|---|---|---|---|---|',
    ]
    for f in BIG4:
        pf = per_firm[f]
        if pf['n'] == 0:
            md.append(f'| {LABEL[f]} | 0 | n/a | n/a | n/a |')
            continue
        md.append(f'| {LABEL[f]} | {pf["n"]} | '
                  f'{pf["paperA_far"]*100:.2f}% '
                  f'({pf["paperA_misclass_n"]}) | '
                  f'{pf["k3_far"]*100:.2f}% ({pf["k3_misclass_n"]}) | '
                  f'{pf["reverse_anchor_far"]*100:.2f}% '
                  f'({pf["reverse_anchor_misclass_n"]}) |')
    md += ['', '## Reading',
           '',
           ('A FAR substantially below the no-information rate '
            f'(1 - {paperA_overall_repl_rate*100:.2f}% = '
            f'{(1-paperA_overall_repl_rate)*100:.2f}%) means the '
            'classifier extracts useful signal from the (cos, dh) '
            'features for distinguishing pixel-identical replication. '
            'Since pixel-identical pairs are a CONSERVATIVE SUBSET of '
            'true replication (only the byte-equal extreme), a low FAR '
            'against this subset is necessary but not sufficient evidence '
            'of correct replication detection.'),
           '',
           '## Files',
           '- `far_results.json` -- machine-readable results',
           '- `far_cases.csv` -- every misclassified pixel-identical signature',
           ]
    md_path = OUT / 'far_report.md'
    md_path.write_text('\n'.join(md), encoding='utf-8')
    print(f'Report: {md_path}')
    # Machine-readable payload mirroring the markdown report.
    payload = {
        'generated_at': datetime.now().isoformat(),
        'n_pixel_identical_big4': n,
        'paper_a_cuts': {'cos': PAPER_A_COS_CUT, 'dh': PAPER_A_DH_CUT},
        'paper_a_overall_replicated_rate_big4': paperA_overall_repl_rate,
        'reverse_anchor_cut_score': rev_cut,
        'reverse_anchor_cut_quantile': cut_quantile,
        'reverse_anchor_reference_center': [float(mu_c),
                                            float(ref['mean'][1])],
        'classifiers': {
            'paperA': {
                'far': float(far_pA),
                'far_wilson95': [float(pA_lo), float(pA_hi)],
                'n_correct': n_pA_correct, 'n_misclass': n_pA_miss,
            },
            'k3_perCPA': {
                'far': float(far_k3),
                'far_wilson95': [float(k3_lo), float(k3_hi)],
                'n_correct': n_k3_correct, 'n_misclass': n_k3_miss,
            },
            'reverse_anchor_calibrated': {
                'far': float(far_rev),
                'far_wilson95': [float(rev_lo), float(rev_hi)],
                'n_correct': n_rev_correct, 'n_misclass': n_rev_miss,
            },
        },
        'per_firm_far': per_firm,
    }
    json_path = OUT / 'far_results.json'
    json_path.write_text(json.dumps(payload, indent=2, ensure_ascii=False),
                         encoding='utf-8')
    print(f'JSON: {json_path}')
# Script entry point.
if __name__ == '__main__':
    main()