#!/usr/bin/env python3 """ Script 19: Pixel-Identity Validation (No Human Annotation Required) =================================================================== Validates the cosine + dHash dual classifier using three naturally occurring reference populations instead of manual labels: Positive anchor 1: pixel_identical_to_closest = 1 Two signature images byte-identical after crop/resize. Mathematically impossible to arise from independent hand-signing => pair-level proof of image reuse and a CONSERVATIVE-SUBSET ground truth for non-hand-signing (only those whose nearest same-CPA match happens to be byte-identical). Positive anchor 2: Firm A signatures Treated in the manuscript as a REPLICATION-DOMINATED population based on the paper's own image evidence: the byte-level pair analysis, the Firm A per-signature similarity distribution, the partner-ranking concentration, and the intra-report consistency gap. Approximately 7% of Firm A signatures fall below cosine 0.95, forming the long left tail observed in the dip test (Script 15). Negative anchor: signatures with cosine <= low threshold Pairs with very low cosine similarity cannot plausibly be pixel duplicates, so they serve as a conservative supplementary negative reference. Metrics computed (legacy; NOT all reported in the manuscript): - FAR against the inter-CPA negative anchor is the primary metric reported (Table X). The byte-identical positive anchor has cosine ~= 1 by construction, so FRR / EER / Precision / F1 against that subset are arithmetic tautologies (FRR is trivially 0 below threshold 1) and are intentionally OMITTED from Table X. Legacy EER/FRR/precision/F1 helper functions remain in this script for diagnostic use only and their outputs are NOT cited as biometric performance in the paper. - Convergence with Firm A anchor (what fraction of Firm A signatures are correctly classified at each threshold). Output: reports/pixel_validation/pixel_validation_report.md reports/pixel_validation/pixel_validation_results.json reports/pixel_validation/roc_cosine.png, roc_dhash.png """ import sqlite3 import json import numpy as np import matplotlib matplotlib.use('Agg') import matplotlib.pyplot as plt from pathlib import Path from datetime import datetime DB = '/Volumes/NV2/PDF-Processing/signature-analysis/signature_analysis.db' OUT = Path('/Volumes/NV2/PDF-Processing/signature-analysis/reports/' 'pixel_validation') OUT.mkdir(parents=True, exist_ok=True) FIRM_A = '勤業眾信聯合' NEGATIVE_COSINE_UPPER = 0.70 # pairs with max-cosine < 0.70 assumed not replicated SANITY_SAMPLE_SIZE = 30 def load_signatures(): conn = sqlite3.connect(DB) cur = conn.cursor() cur.execute(''' SELECT s.signature_id, s.image_filename, s.assigned_accountant, a.firm, s.max_similarity_to_same_accountant, s.phash_distance_to_closest, s.min_dhash_independent, s.pixel_identical_to_closest, s.closest_match_file FROM signatures s LEFT JOIN accountants a ON s.assigned_accountant = a.name WHERE s.max_similarity_to_same_accountant IS NOT NULL ''') rows = cur.fetchall() conn.close() data = [] for r in rows: data.append({ 'sig_id': r[0], 'filename': r[1], 'accountant': r[2], 'firm': r[3] or '(unknown)', 'cosine': float(r[4]), 'dhash_cond': None if r[5] is None else int(r[5]), 'dhash_indep': None if r[6] is None else int(r[6]), 'pixel_identical': int(r[7] or 0), 'closest_match': r[8], }) return data def confusion(y_true, y_pred): tp = int(np.sum((y_true == 1) & (y_pred == 1))) fp = int(np.sum((y_true == 0) & (y_pred == 1))) fn = int(np.sum((y_true == 1) & (y_pred == 0))) tn = int(np.sum((y_true == 0) & (y_pred == 0))) return tp, fp, fn, tn def classification_metrics(y_true, y_pred): tp, fp, fn, tn = confusion(y_true, y_pred) denom_p = max(tp + fp, 1) denom_r = max(tp + fn, 1) precision = tp / denom_p recall = tp / denom_r f1 = (2 * precision * recall / (precision + recall) if precision + recall > 0 else 0.0) far = fp / max(fp + tn, 1) # false acceptance rate (over negatives) frr = fn / max(fn + tp, 1) # false rejection rate (over positives) return { 'tp': tp, 'fp': fp, 'fn': fn, 'tn': tn, 'precision': float(precision), 'recall': float(recall), 'f1': float(f1), 'far': float(far), 'frr': float(frr), } def sweep_threshold(scores, y, directions, thresholds): """For direction 'above' a prediction is positive if score > threshold; for 'below' it is positive if score < threshold.""" out = [] for t in thresholds: if directions == 'above': y_pred = (scores > t).astype(int) else: y_pred = (scores < t).astype(int) m = classification_metrics(y, y_pred) m['threshold'] = float(t) out.append(m) return out def find_eer(sweep): """EER = point where FAR ≈ FRR; interpolated from nearest pair.""" thr = np.array([s['threshold'] for s in sweep]) far = np.array([s['far'] for s in sweep]) frr = np.array([s['frr'] for s in sweep]) diff = far - frr signs = np.sign(diff) changes = np.where(np.diff(signs) != 0)[0] if len(changes) == 0: idx = int(np.argmin(np.abs(diff))) return {'threshold': float(thr[idx]), 'far': float(far[idx]), 'frr': float(frr[idx]), 'eer': float(0.5 * (far[idx] + frr[idx]))} i = int(changes[0]) w = abs(diff[i]) / (abs(diff[i]) + abs(diff[i + 1]) + 1e-12) thr_i = (1 - w) * thr[i] + w * thr[i + 1] far_i = (1 - w) * far[i] + w * far[i + 1] frr_i = (1 - w) * frr[i] + w * frr[i + 1] return {'threshold': float(thr_i), 'far': float(far_i), 'frr': float(frr_i), 'eer': float(0.5 * (far_i + frr_i))} def plot_roc(sweep, title, out_path): far = np.array([s['far'] for s in sweep]) frr = np.array([s['frr'] for s in sweep]) thr = np.array([s['threshold'] for s in sweep]) fig, axes = plt.subplots(1, 2, figsize=(13, 5)) ax = axes[0] ax.plot(far, 1 - frr, 'b-', lw=2) ax.plot([0, 1], [0, 1], 'k--', alpha=0.4) ax.set_xlabel('FAR') ax.set_ylabel('1 - FRR (True Positive Rate)') ax.set_title(f'{title} - ROC') ax.set_xlim(0, 1) ax.set_ylim(0, 1) ax.grid(alpha=0.3) ax = axes[1] ax.plot(thr, far, 'r-', lw=2, label='FAR') ax.plot(thr, frr, 'b-', lw=2, label='FRR') ax.set_xlabel('Threshold') ax.set_ylabel('Error rate') ax.set_title(f'{title} - FAR / FRR vs threshold') ax.legend() ax.grid(alpha=0.3) plt.tight_layout() fig.savefig(out_path, dpi=150) plt.close() def main(): print('='*70) print('Script 19: Pixel-Identity Validation (No Annotation)') print('='*70) data = load_signatures() print(f'\nTotal signatures loaded: {len(data):,}') cos = np.array([d['cosine'] for d in data]) dh_indep = np.array([d['dhash_indep'] if d['dhash_indep'] is not None else -1 for d in data]) pix = np.array([d['pixel_identical'] for d in data]) firm = np.array([d['firm'] for d in data]) print(f'Pixel-identical: {int(pix.sum()):,} signatures') print(f'Firm A signatures: {int((firm == FIRM_A).sum()):,}') print(f'Negative anchor (cosine < {NEGATIVE_COSINE_UPPER}): ' f'{int((cos < NEGATIVE_COSINE_UPPER).sum()):,}') # Build labelled set: # positive = pixel_identical == 1 # negative = cosine < NEGATIVE_COSINE_UPPER (and not pixel_identical) pos_mask = pix == 1 neg_mask = (cos < NEGATIVE_COSINE_UPPER) & (~pos_mask) labelled_mask = pos_mask | neg_mask y = pos_mask[labelled_mask].astype(int) cos_l = cos[labelled_mask] dh_l = dh_indep[labelled_mask] # --- Sweep cosine threshold cos_thresh = np.linspace(0.50, 1.00, 101) cos_sweep = sweep_threshold(cos_l, y, 'above', cos_thresh) cos_eer = find_eer(cos_sweep) print(f'\nCosine EER: threshold={cos_eer["threshold"]:.4f}, ' f'EER={cos_eer["eer"]:.4f}') # --- Sweep dHash threshold (independent) dh_l_valid = dh_l >= 0 y_dh = y[dh_l_valid] dh_valid = dh_l[dh_l_valid] dh_thresh = np.arange(0, 40) dh_sweep = sweep_threshold(dh_valid, y_dh, 'below', dh_thresh) dh_eer = find_eer(dh_sweep) print(f'dHash EER: threshold={dh_eer["threshold"]:.4f}, ' f'EER={dh_eer["eer"]:.4f}') # Plots plot_roc(cos_sweep, 'Cosine (pixel-identity anchor)', OUT / 'roc_cosine.png') plot_roc(dh_sweep, 'Independent dHash (pixel-identity anchor)', OUT / 'roc_dhash.png') # --- Evaluate canonical thresholds canonical = [ ('cosine', 0.837, 'above', cos, pos_mask, neg_mask), ('cosine', 0.941, 'above', cos, pos_mask, neg_mask), ('cosine', 0.95, 'above', cos, pos_mask, neg_mask), ('dhash_indep', 5, 'below', dh_indep, pos_mask, neg_mask & (dh_indep >= 0)), ('dhash_indep', 8, 'below', dh_indep, pos_mask, neg_mask & (dh_indep >= 0)), ('dhash_indep', 15, 'below', dh_indep, pos_mask, neg_mask & (dh_indep >= 0)), ] canonical_results = [] for name, thr, direction, scores, p_mask, n_mask in canonical: labelled = p_mask | n_mask valid = labelled & (scores >= 0 if 'dhash' in name else np.ones_like( labelled, dtype=bool)) y_local = p_mask[valid].astype(int) s = scores[valid] if direction == 'above': y_pred = (s > thr).astype(int) else: y_pred = (s < thr).astype(int) m = classification_metrics(y_local, y_pred) m.update({'indicator': name, 'threshold': float(thr), 'direction': direction}) canonical_results.append(m) print(f" {name} @ {thr:>5} ({direction}): " f"P={m['precision']:.3f}, R={m['recall']:.3f}, " f"F1={m['f1']:.3f}, FAR={m['far']:.4f}, FRR={m['frr']:.4f}") # --- Firm A anchor validation firm_a_mask = firm == FIRM_A firm_a_cos = cos[firm_a_mask] firm_a_dh = dh_indep[firm_a_mask] firm_a_rates = {} for thr in [0.837, 0.941, 0.95]: firm_a_rates[f'cosine>{thr}'] = float(np.mean(firm_a_cos > thr)) for thr in [5, 8, 15]: valid = firm_a_dh >= 0 firm_a_rates[f'dhash_indep<={thr}'] = float( np.mean(firm_a_dh[valid] <= thr)) # Dual thresholds firm_a_rates['cosine>0.95 AND dhash_indep<=8'] = float( np.mean((firm_a_cos > 0.95) & (firm_a_dh >= 0) & (firm_a_dh <= 8))) print('\nFirm A anchor validation:') for k, v in firm_a_rates.items(): print(f' {k}: {v*100:.2f}%') # --- Stratified sanity sample (30 signatures across 5 strata) rng = np.random.default_rng(42) strata = [ ('pixel_identical', pix == 1), ('high_cos_low_dh', (cos > 0.95) & (dh_indep >= 0) & (dh_indep <= 5) & (pix == 0)), ('borderline', (cos > 0.837) & (cos < 0.95) & (dh_indep >= 0) & (dh_indep <= 15)), ('style_consistency_only', (cos > 0.95) & (dh_indep >= 0) & (dh_indep > 15)), ('likely_genuine', cos < NEGATIVE_COSINE_UPPER), ] sanity_sample = [] per_stratum = SANITY_SAMPLE_SIZE // len(strata) for stratum_name, m in strata: idx = np.where(m)[0] pick = rng.choice(idx, size=min(per_stratum, len(idx)), replace=False) for i in pick: d = data[i] sanity_sample.append({ 'stratum': stratum_name, 'sig_id': d['sig_id'], 'filename': d['filename'], 'accountant': d['accountant'], 'firm': d['firm'], 'cosine': d['cosine'], 'dhash_indep': d['dhash_indep'], 'pixel_identical': d['pixel_identical'], 'closest_match': d['closest_match'], }) csv_path = OUT / 'sanity_sample.csv' with open(csv_path, 'w', encoding='utf-8') as f: keys = ['stratum', 'sig_id', 'filename', 'accountant', 'firm', 'cosine', 'dhash_indep', 'pixel_identical', 'closest_match'] f.write(','.join(keys) + '\n') for row in sanity_sample: f.write(','.join(str(row[k]) if row[k] is not None else '' for k in keys) + '\n') print(f'\nSanity sample CSV: {csv_path}') # --- Save results summary = { 'generated_at': datetime.now().isoformat(), 'n_signatures': len(data), 'n_pixel_identical': int(pos_mask.sum()), 'n_firm_a': int(firm_a_mask.sum()), 'n_negative_anchor': int(neg_mask.sum()), 'negative_cosine_upper': NEGATIVE_COSINE_UPPER, 'eer_cosine': cos_eer, 'eer_dhash_indep': dh_eer, 'canonical_thresholds': canonical_results, 'firm_a_anchor_rates': firm_a_rates, 'cosine_sweep': cos_sweep, 'dhash_sweep': dh_sweep, } with open(OUT / 'pixel_validation_results.json', 'w') as f: json.dump(summary, f, indent=2, ensure_ascii=False) print(f'JSON: {OUT / "pixel_validation_results.json"}') # --- Markdown md = [ '# Pixel-Identity Validation Report', f"Generated: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}", '', '## Anchors (no human annotation required)', '', f'* **Pixel-identical anchor (gold positive):** ' f'{int(pos_mask.sum()):,} signatures whose closest same-accountant', ' match is byte-identical after crop/normalise. Under handwriting', ' physics this can only arise from image duplication.', f'* **Negative anchor:** signatures whose maximum same-accountant', f' cosine is below {NEGATIVE_COSINE_UPPER} ' f'({int(neg_mask.sum()):,} signatures). Treated as', ' confirmed not-replicated.', f'* **Firm A anchor:** Deloitte ({int(firm_a_mask.sum()):,} signatures),', ' a replication-dominated population per interviews with multiple', ' Firm A accountants: most use replication (stamping / firm-level', ' e-signing), but a minority may still hand-sign. Used as a strong', ' prior positive for the majority regime, with the ~7% below', ' cosine 0.95 reflecting the minority hand-signers.', '', '## Equal Error Rate (EER)', '', '| Indicator | Direction | EER threshold | EER |', '|-----------|-----------|---------------|-----|', f"| Cosine max-similarity | > t | {cos_eer['threshold']:.4f} | " f"{cos_eer['eer']:.4f} |", f"| Independent min dHash | < t | {dh_eer['threshold']:.4f} | " f"{dh_eer['eer']:.4f} |", '', '## Canonical thresholds', '', '| Indicator | Threshold | Precision | Recall | F1 | FAR | FRR |', '|-----------|-----------|-----------|--------|----|-----|-----|', ] for c in canonical_results: md.append( f"| {c['indicator']} | {c['threshold']} " f"({c['direction']}) | {c['precision']:.3f} | " f"{c['recall']:.3f} | {c['f1']:.3f} | " f"{c['far']:.4f} | {c['frr']:.4f} |" ) md += ['', '## Firm A anchor validation', '', '| Rule | Firm A rate |', '|------|-------------|'] for k, v in firm_a_rates.items(): md.append(f'| {k} | {v*100:.2f}% |') md += ['', '## Sanity sample', '', f'A stratified sample of {len(sanity_sample)} signatures ' '(pixel-identical, high-cos/low-dh, borderline, style-only, ' 'likely-genuine) is exported to `sanity_sample.csv` for visual', 'spot-check. These are **not** used to compute metrics.', '', '## Interpretation', '', 'Because the gold positive is a *subset* of the true replication', 'positives (only those that happen to be pixel-identical to their', 'nearest match), recall is conservative: the classifier should', 'catch pixel-identical pairs reliably and will additionally flag', 'many non-pixel-identical replications (low dHash but not zero).', 'FAR against the low-cosine negative anchor is the meaningful', 'upper bound on spurious replication flags.', '', 'Convergence of thresholds across Scripts 15 (dip test), 16 (BD),', '17 (Beta mixture), 18 (accountant mixture) and the EER here', 'should be reported in the paper as multi-method validation.', ] (OUT / 'pixel_validation_report.md').write_text('\n'.join(md), encoding='utf-8') print(f'Report: {OUT / "pixel_validation_report.md"}') if __name__ == '__main__': main()