#!/usr/bin/env python3
"""Script 19: Pixel-Identity Validation (No Human Annotation Required)
===================================================================

Validates the cosine + dHash dual classifier using three naturally
occurring reference populations instead of manual labels:

Positive anchor 1: pixel_identical_to_closest = 1
    Two signature images byte-identical after crop/resize.
    Mathematically impossible to arise from independent hand-signing
    => absolute ground truth for replication.

Positive anchor 2: Firm A (Deloitte) signatures
    Interview evidence from multiple Firm A accountants confirms that
    MOST use replication (stamping / firm-level e-signing) but a
    MINORITY may still hand-sign. Firm A is therefore a
    "replication-dominated" population (not a pure one). We use it as
    a strong prior positive for the majority regime, while noting that
    ~7% of Firm A signatures fall below cosine 0.95 consistent with
    the minority hand-signers. This matches the long left tail
    observed in the dip test (Script 15) and the Firm A members who
    land in C2 (middle band) of the accountant-level GMM (Script 18).

Negative anchor: signatures with cosine <= low threshold
    Pairs with very low cosine similarity cannot plausibly be pixel
    duplicates, so they serve as absolute negatives.

Metrics reported:
- FAR/FRR/EER using the pixel-identity anchor as the gold positive
  and low-similarity pairs as the gold negative.
- Precision/Recall/F1 at cosine and dHash thresholds from Scripts
  15/16/17/18.
- Convergence with Firm A anchor (what fraction of Firm A signatures
  are correctly classified at each threshold).

Small visual sanity sample (30 pairs) is exported for spot-check, but
metrics are derived entirely from pixel and Firm A evidence.

Output:
    reports/pixel_validation/pixel_validation_report.md
    reports/pixel_validation/pixel_validation_results.json
    reports/pixel_validation/roc_cosine.png, roc_dhash.png
    reports/pixel_validation/sanity_sample.csv
"""
import csv
import json
import sqlite3
from contextlib import closing
from datetime import datetime
from pathlib import Path

import numpy as np
import matplotlib
matplotlib.use('Agg')  # headless backend; must be set before pyplot import
import matplotlib.pyplot as plt

DB = '/Volumes/NV2/PDF-Processing/signature-analysis/signature_analysis.db'
OUT = Path('/Volumes/NV2/PDF-Processing/signature-analysis/reports/'
           'pixel_validation')
# NOTE: OUT is created in main(), not at import time, so merely importing
# this module has no filesystem side effects.

FIRM_A = '勤業眾信聯合'  # Deloitte Taiwan
NEGATIVE_COSINE_UPPER = 0.70  # pairs with max-cosine < 0.70 assumed not replicated
SANITY_SAMPLE_SIZE = 30


def load_signatures():
    """Load per-signature similarity features joined with firm names.

    Returns a list of dicts — one per signature that has a non-NULL
    max-cosine similarity to another signature of the same accountant —
    with keys: sig_id, filename, accountant, firm, cosine, dhash_cond,
    dhash_indep, pixel_identical, closest_match.
    """
    query = '''
        SELECT s.signature_id, s.image_filename, s.assigned_accountant,
               a.firm, s.max_similarity_to_same_accountant,
               s.phash_distance_to_closest, s.min_dhash_independent,
               s.pixel_identical_to_closest, s.closest_match_file
        FROM signatures s
        LEFT JOIN accountants a ON s.assigned_accountant = a.name
        WHERE s.max_similarity_to_same_accountant IS NOT NULL
    '''
    # closing() guarantees the connection is released even if the query
    # raises (the original leaked it on error).
    with closing(sqlite3.connect(DB)) as conn:
        rows = conn.execute(query).fetchall()
    return [
        {
            'sig_id': r[0],
            'filename': r[1],
            'accountant': r[2],
            'firm': r[3] or '(unknown)',
            'cosine': float(r[4]),
            'dhash_cond': None if r[5] is None else int(r[5]),
            'dhash_indep': None if r[6] is None else int(r[6]),
            'pixel_identical': int(r[7] or 0),
            'closest_match': r[8],
        }
        for r in rows
    ]


def confusion(y_true, y_pred):
    """Return (tp, fp, fn, tn) counts for binary 0/1 arrays."""
    tp = int(np.sum((y_true == 1) & (y_pred == 1)))
    fp = int(np.sum((y_true == 0) & (y_pred == 1)))
    fn = int(np.sum((y_true == 1) & (y_pred == 0)))
    tn = int(np.sum((y_true == 0) & (y_pred == 0)))
    return tp, fp, fn, tn


def classification_metrics(y_true, y_pred):
    """Precision/recall/F1 plus FAR/FRR for binary predictions.

    Denominators are clamped to >= 1 so an empty class yields 0.0
    instead of raising ZeroDivisionError.
    """
    tp, fp, fn, tn = confusion(y_true, y_pred)
    precision = tp / max(tp + fp, 1)
    recall = tp / max(tp + fn, 1)
    f1 = (2 * precision * recall / (precision + recall)
          if precision + recall > 0 else 0.0)
    far = fp / max(fp + tn, 1)  # false acceptance rate (over negatives)
    frr = fn / max(fn + tp, 1)  # false rejection rate (over positives)
    return {
        'tp': tp, 'fp': fp, 'fn': fn, 'tn': tn,
        'precision': float(precision), 'recall': float(recall),
        'f1': float(f1), 'far': float(far), 'frr': float(frr),
    }


def sweep_threshold(scores, y, directions, thresholds):
    """Compute classification metrics at every threshold.

    For direction 'above' a prediction is positive if score > threshold;
    for 'below' it is positive if score < threshold.  Returns one metric
    dict (with 'threshold' added) per threshold.
    """
    out = []
    for t in thresholds:
        if directions == 'above':
            y_pred = (scores > t).astype(int)
        else:
            y_pred = (scores < t).astype(int)
        m = classification_metrics(y, y_pred)
        m['threshold'] = float(t)
        out.append(m)
    return out


def find_eer(sweep):
    """EER = point where FAR ≈ FRR; interpolated from nearest pair.

    Linearly interpolates between the two thresholds where FAR - FRR
    changes sign; if it never crosses, returns the closest point.
    """
    thr = np.array([s['threshold'] for s in sweep])
    far = np.array([s['far'] for s in sweep])
    frr = np.array([s['frr'] for s in sweep])
    diff = far - frr
    signs = np.sign(diff)
    changes = np.where(np.diff(signs) != 0)[0]
    if len(changes) == 0:
        # No sign change: report the threshold minimizing |FAR - FRR|.
        idx = int(np.argmin(np.abs(diff)))
        return {'threshold': float(thr[idx]), 'far': float(far[idx]),
                'frr': float(frr[idx]),
                'eer': float(0.5 * (far[idx] + frr[idx]))}
    i = int(changes[0])
    # Interpolation weight; epsilon guards against a 0/0 at an exact tie.
    w = abs(diff[i]) / (abs(diff[i]) + abs(diff[i + 1]) + 1e-12)
    thr_i = (1 - w) * thr[i] + w * thr[i + 1]
    far_i = (1 - w) * far[i] + w * far[i + 1]
    frr_i = (1 - w) * frr[i] + w * frr[i + 1]
    return {'threshold': float(thr_i), 'far': float(far_i),
            'frr': float(frr_i), 'eer': float(0.5 * (far_i + frr_i))}


def plot_roc(sweep, title, out_path):
    """Save a two-panel figure: ROC curve and FAR/FRR vs threshold."""
    far = np.array([s['far'] for s in sweep])
    frr = np.array([s['frr'] for s in sweep])
    thr = np.array([s['threshold'] for s in sweep])
    fig, axes = plt.subplots(1, 2, figsize=(13, 5))

    ax = axes[0]
    ax.plot(far, 1 - frr, 'b-', lw=2)
    ax.plot([0, 1], [0, 1], 'k--', alpha=0.4)  # chance diagonal
    ax.set_xlabel('FAR')
    ax.set_ylabel('1 - FRR (True Positive Rate)')
    ax.set_title(f'{title} - ROC')
    ax.set_xlim(0, 1)
    ax.set_ylim(0, 1)
    ax.grid(alpha=0.3)

    ax = axes[1]
    ax.plot(thr, far, 'r-', lw=2, label='FAR')
    ax.plot(thr, frr, 'b-', lw=2, label='FRR')
    ax.set_xlabel('Threshold')
    ax.set_ylabel('Error rate')
    ax.set_title(f'{title} - FAR / FRR vs threshold')
    ax.legend()
    ax.grid(alpha=0.3)

    plt.tight_layout()
    fig.savefig(out_path, dpi=150)
    plt.close(fig)  # close this specific figure (the original closed "current")


def _evaluate_canonical(cos, dh_indep, pos_mask, neg_mask):
    """Evaluate canonical thresholds from Scripts 15-18 against the anchors.

    Prints one summary line per rule and returns the metric dicts.
    """
    canonical = [
        ('cosine', 0.837, 'above', cos, pos_mask, neg_mask),
        ('cosine', 0.941, 'above', cos, pos_mask, neg_mask),
        ('cosine', 0.95, 'above', cos, pos_mask, neg_mask),
        ('dhash_indep', 5, 'below', dh_indep, pos_mask,
         neg_mask & (dh_indep >= 0)),
        ('dhash_indep', 8, 'below', dh_indep, pos_mask,
         neg_mask & (dh_indep >= 0)),
        ('dhash_indep', 15, 'below', dh_indep, pos_mask,
         neg_mask & (dh_indep >= 0)),
    ]
    results = []
    for name, thr, direction, scores, p_mask, n_mask in canonical:
        labelled = p_mask | n_mask
        # dHash uses -1 as the "missing" sentinel; drop those rows.
        valid = labelled & (scores >= 0 if 'dhash' in name
                            else np.ones_like(labelled, dtype=bool))
        y_local = p_mask[valid].astype(int)
        s = scores[valid]
        if direction == 'above':
            y_pred = (s > thr).astype(int)
        else:
            y_pred = (s < thr).astype(int)
        m = classification_metrics(y_local, y_pred)
        m.update({'indicator': name, 'threshold': float(thr),
                  'direction': direction})
        results.append(m)
        print(f" {name} @ {thr:>5} ({direction}): "
              f"P={m['precision']:.3f}, R={m['recall']:.3f}, "
              f"F1={m['f1']:.3f}, FAR={m['far']:.4f}, FRR={m['frr']:.4f}")
    return results


def _firm_a_rates(firm_a_cos, firm_a_dh):
    """Fraction of Firm A signatures flagged by each canonical rule.

    NOTE(review): the dual-threshold rate divides by *all* Firm A
    signatures (including those without a valid dHash), whereas the
    single-dHash rates divide by valid-dHash signatures only.  Kept
    as-is to preserve the reported numbers — confirm this is intended.
    """
    rates = {}
    for thr in [0.837, 0.941, 0.95]:
        rates[f'cosine>{thr}'] = float(np.mean(firm_a_cos > thr))
    valid = firm_a_dh >= 0  # hoisted: invariant across thresholds
    for thr in [5, 8, 15]:
        rates[f'dhash_indep<={thr}'] = float(
            np.mean(firm_a_dh[valid] <= thr))
    # Dual thresholds
    rates['cosine>0.95 AND dhash_indep<=8'] = float(
        np.mean((firm_a_cos > 0.95) & (firm_a_dh >= 0) & (firm_a_dh <= 8)))
    return rates


def _export_sanity_sample(data, cos, dh_indep, pix):
    """Export a stratified spot-check sample to sanity_sample.csv.

    Uses csv.DictWriter so fields containing commas (filenames,
    accountant names) are quoted correctly; the original hand-rolled
    ','.join produced malformed rows for such fields.  Returns the
    sampled rows.
    """
    rng = np.random.default_rng(42)  # fixed seed: reproducible sample
    strata = [
        ('pixel_identical', pix == 1),
        ('high_cos_low_dh', (cos > 0.95) & (dh_indep >= 0) &
         (dh_indep <= 5) & (pix == 0)),
        ('borderline', (cos > 0.837) & (cos < 0.95) &
         (dh_indep >= 0) & (dh_indep <= 15)),
        ('style_consistency_only', (cos > 0.95) & (dh_indep >= 0) &
         (dh_indep > 15)),
        ('likely_genuine', cos < NEGATIVE_COSINE_UPPER),
    ]
    sample = []
    per_stratum = SANITY_SAMPLE_SIZE // len(strata)
    for stratum_name, m in strata:
        idx = np.where(m)[0]
        # min() caps the draw when a stratum is smaller than its quota.
        pick = rng.choice(idx, size=min(per_stratum, len(idx)),
                          replace=False)
        for i in pick:
            d = data[i]
            sample.append({
                'stratum': stratum_name,
                'sig_id': d['sig_id'],
                'filename': d['filename'],
                'accountant': d['accountant'],
                'firm': d['firm'],
                'cosine': d['cosine'],
                'dhash_indep': d['dhash_indep'],
                'pixel_identical': d['pixel_identical'],
                'closest_match': d['closest_match'],
            })
    csv_path = OUT / 'sanity_sample.csv'
    keys = ['stratum', 'sig_id', 'filename', 'accountant', 'firm',
            'cosine', 'dhash_indep', 'pixel_identical', 'closest_match']
    # newline='' is required by the csv module; None values emit ''.
    with open(csv_path, 'w', encoding='utf-8', newline='') as f:
        writer = csv.DictWriter(f, fieldnames=keys)
        writer.writeheader()
        writer.writerows(sample)
    print(f'\nSanity sample CSV: {csv_path}')
    return sample


def _write_markdown(cos_eer, dh_eer, canonical_results, firm_a_rates,
                    n_pos, n_neg, n_firm_a, n_sample):
    """Render and write pixel_validation_report.md."""
    md = [
        '# Pixel-Identity Validation Report',
        f"Generated: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}",
        '',
        '## Anchors (no human annotation required)',
        '',
        f'* **Pixel-identical anchor (gold positive):** '
        f'{n_pos:,} signatures whose closest same-accountant',
        ' match is byte-identical after crop/normalise. Under handwriting',
        ' physics this can only arise from image duplication.',
        '* **Negative anchor:** signatures whose maximum same-accountant',
        f' cosine is below {NEGATIVE_COSINE_UPPER} '
        f'({n_neg:,} signatures). Treated as',
        ' confirmed not-replicated.',
        f'* **Firm A anchor:** Deloitte ({n_firm_a:,} signatures),',
        ' a replication-dominated population per interviews with multiple',
        ' Firm A accountants: most use replication (stamping / firm-level',
        ' e-signing), but a minority may still hand-sign. Used as a strong',
        ' prior positive for the majority regime, with the ~7% below',
        ' cosine 0.95 reflecting the minority hand-signers.',
        '',
        '## Equal Error Rate (EER)',
        '',
        '| Indicator | Direction | EER threshold | EER |',
        '|-----------|-----------|---------------|-----|',
        f"| Cosine max-similarity | > t | {cos_eer['threshold']:.4f} | "
        f"{cos_eer['eer']:.4f} |",
        f"| Independent min dHash | < t | {dh_eer['threshold']:.4f} | "
        f"{dh_eer['eer']:.4f} |",
        '',
        '## Canonical thresholds',
        '',
        '| Indicator | Threshold | Precision | Recall | F1 | FAR | FRR |',
        '|-----------|-----------|-----------|--------|----|-----|-----|',
    ]
    for c in canonical_results:
        md.append(
            f"| {c['indicator']} | {c['threshold']} "
            f"({c['direction']}) | {c['precision']:.3f} | "
            f"{c['recall']:.3f} | {c['f1']:.3f} | "
            f"{c['far']:.4f} | {c['frr']:.4f} |"
        )
    md += ['', '## Firm A anchor validation', '',
           '| Rule | Firm A rate |', '|------|-------------|']
    for k, v in firm_a_rates.items():
        md.append(f'| {k} | {v*100:.2f}% |')
    md += [
        '', '## Sanity sample', '',
        f'A stratified sample of {n_sample} signatures '
        '(pixel-identical, high-cos/low-dh, borderline, style-only, '
        'likely-genuine) is exported to `sanity_sample.csv` for visual',
        'spot-check. These are **not** used to compute metrics.',
        '',
        '## Interpretation',
        '',
        'Because the gold positive is a *subset* of the true replication',
        'positives (only those that happen to be pixel-identical to their',
        'nearest match), recall is conservative: the classifier should',
        'catch pixel-identical pairs reliably and will additionally flag',
        'many non-pixel-identical replications (low dHash but not zero).',
        'FAR against the low-cosine negative anchor is the meaningful',
        'upper bound on spurious replication flags.',
        '',
        'Convergence of thresholds across Scripts 15 (dip test), 16 (BD),',
        '17 (Beta mixture), 18 (accountant mixture) and the EER here',
        'should be reported in the paper as multi-method validation.',
    ]
    report_path = OUT / 'pixel_validation_report.md'
    report_path.write_text('\n'.join(md), encoding='utf-8')
    print(f'Report: {report_path}')


def main():
    """Run the full pixel-identity validation pipeline and write outputs."""
    print('=' * 70)
    print('Script 19: Pixel-Identity Validation (No Annotation)')
    print('=' * 70)
    # Create the output directory here rather than at import time.
    OUT.mkdir(parents=True, exist_ok=True)

    data = load_signatures()
    print(f'\nTotal signatures loaded: {len(data):,}')
    cos = np.array([d['cosine'] for d in data])
    # -1 is the sentinel for "no independent dHash available".
    dh_indep = np.array([d['dhash_indep'] if d['dhash_indep'] is not None
                         else -1 for d in data])
    pix = np.array([d['pixel_identical'] for d in data])
    firm = np.array([d['firm'] for d in data])
    print(f'Pixel-identical: {int(pix.sum()):,} signatures')
    print(f'Firm A signatures: {int((firm == FIRM_A).sum()):,}')
    print(f'Negative anchor (cosine < {NEGATIVE_COSINE_UPPER}): '
          f'{int((cos < NEGATIVE_COSINE_UPPER).sum()):,}')

    # Build labelled set:
    #   positive = pixel_identical == 1
    #   negative = cosine < NEGATIVE_COSINE_UPPER (and not pixel_identical)
    pos_mask = pix == 1
    neg_mask = (cos < NEGATIVE_COSINE_UPPER) & (~pos_mask)
    labelled_mask = pos_mask | neg_mask
    y = pos_mask[labelled_mask].astype(int)
    cos_l = cos[labelled_mask]
    dh_l = dh_indep[labelled_mask]

    # --- Sweep cosine threshold
    cos_thresh = np.linspace(0.50, 1.00, 101)
    cos_sweep = sweep_threshold(cos_l, y, 'above', cos_thresh)
    cos_eer = find_eer(cos_sweep)
    print(f'\nCosine EER: threshold={cos_eer["threshold"]:.4f}, '
          f'EER={cos_eer["eer"]:.4f}')

    # --- Sweep dHash threshold (independent); sentinel rows excluded
    dh_has_value = dh_l >= 0
    dh_sweep = sweep_threshold(dh_l[dh_has_value], y[dh_has_value],
                               'below', np.arange(0, 40))
    dh_eer = find_eer(dh_sweep)
    print(f'dHash EER: threshold={dh_eer["threshold"]:.4f}, '
          f'EER={dh_eer["eer"]:.4f}')

    # Plots
    plot_roc(cos_sweep, 'Cosine (pixel-identity anchor)',
             OUT / 'roc_cosine.png')
    plot_roc(dh_sweep, 'Independent dHash (pixel-identity anchor)',
             OUT / 'roc_dhash.png')

    # --- Evaluate canonical thresholds
    canonical_results = _evaluate_canonical(cos, dh_indep, pos_mask, neg_mask)

    # --- Firm A anchor validation
    firm_a_mask = firm == FIRM_A
    firm_a_rates = _firm_a_rates(cos[firm_a_mask], dh_indep[firm_a_mask])
    print('\nFirm A anchor validation:')
    for k, v in firm_a_rates.items():
        print(f' {k}: {v*100:.2f}%')

    # --- Stratified sanity sample (visual spot-check only)
    sanity_sample = _export_sanity_sample(data, cos, dh_indep, pix)

    # --- Save results (explicit utf-8: the dump uses ensure_ascii=False,
    # so relying on the locale default encoding is not portable).
    summary = {
        'generated_at': datetime.now().isoformat(),
        'n_signatures': len(data),
        'n_pixel_identical': int(pos_mask.sum()),
        'n_firm_a': int(firm_a_mask.sum()),
        'n_negative_anchor': int(neg_mask.sum()),
        'negative_cosine_upper': NEGATIVE_COSINE_UPPER,
        'eer_cosine': cos_eer,
        'eer_dhash_indep': dh_eer,
        'canonical_thresholds': canonical_results,
        'firm_a_anchor_rates': firm_a_rates,
        'cosine_sweep': cos_sweep,
        'dhash_sweep': dh_sweep,
    }
    with open(OUT / 'pixel_validation_results.json', 'w',
              encoding='utf-8') as f:
        json.dump(summary, f, indent=2, ensure_ascii=False)
    print(f'JSON: {OUT / "pixel_validation_results.json"}')

    # --- Markdown report
    _write_markdown(cos_eer, dh_eer, canonical_results, firm_a_rates,
                    int(pos_mask.sum()), int(neg_mask.sum()),
                    int(firm_a_mask.sum()), len(sanity_sample))


if __name__ == '__main__':
    main()