pdf_signature_extraction/signature_analysis/19_pixel_identity_validation.py
gbanyan 68689c9f9b Correct Firm A framing: replication-dominated, not pure
Interview evidence from multiple Firm A accountants confirms that MOST
use replication (stamping / firm-level e-signing) but a MINORITY may
still hand-sign. Firm A is therefore a "replication-dominated" population,
not a "pure" one. This framing is consistent with:

- 92.5% of Firm A signatures exceed cosine 0.95 (majority replication)
- The long left tail (~7%) captures the minority hand-signers, not scan
  noise or preprocessing artifacts
- Hartigan dip test: Firm A cosine unimodal long-tail (p=0.17)
- Accountant-level GMM: of 180 Firm A accountants, 139 cluster in C1
  (high-replication) and 32 in C2 (middle band = minority hand-signers)

Updates docstrings and report text in Scripts 15, 16, 18, 19 to match.
Partner v3's "near-universal non-hand-signing" language corrected.

Script 19 regenerated with the updated text.
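
The shares quoted above (the fraction of signatures above cosine 0.95 and the size of the left tail) are plain threshold proportions. A minimal sketch of that calculation, using made-up toy scores rather than the Firm A data; `replication_share` is a hypothetical helper name, not a function from Scripts 15-19:

```python
import numpy as np


def replication_share(cosine, threshold=0.95):
    """Return (share strictly above threshold, share at or below it)."""
    cosine = np.asarray(cosine, dtype=float)
    above = float(np.mean(cosine > threshold))
    return above, 1.0 - above


# Toy scores for illustration only (assumption, not project data).
toy = np.array([0.99, 0.98, 0.97, 0.96, 0.999, 0.97, 0.98, 0.99, 0.90, 0.97])
above, tail = replication_share(toy)
print(f'above 0.95: {above:.1%}, left tail: {tail:.1%}')
```

A population would read as replication-dominated under this framing when the first share is large but clearly below 100%.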

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-20 21:57:16 +08:00


#!/usr/bin/env python3
"""
Script 19: Pixel-Identity Validation (No Human Annotation Required)
===================================================================
Validates the cosine + dHash dual classifier using three naturally
occurring reference populations instead of manual labels:

  Positive anchor 1: pixel_identical_to_closest = 1
    Two signature images byte-identical after crop/resize.
    Mathematically impossible to arise from independent hand-signing
    => absolute ground truth for replication.

  Positive anchor 2: Firm A (Deloitte) signatures
    Interview evidence from multiple Firm A accountants confirms that
    MOST use replication (stamping / firm-level e-signing) but a
    MINORITY may still hand-sign. Firm A is therefore a
    "replication-dominated" population (not a pure one). We use it as
    a strong prior positive for the majority regime, while noting that
    ~7% of Firm A signatures fall below cosine 0.95, consistent with
    the minority hand-signers. This matches the long left tail
    observed in the dip test (Script 15) and the Firm A members who
    land in C2 (middle band) of the accountant-level GMM (Script 18).

  Negative anchor: signatures with cosine < low threshold
    Signatures whose maximum same-accountant cosine is very low
    cannot plausibly be pixel duplicates, so they serve as absolute
    negatives.

Metrics reported:
  - FAR/FRR/EER using the pixel-identity anchor as the gold positive
    and low-similarity signatures as the gold negative.
  - Precision/Recall/F1 at cosine and dHash thresholds from Scripts
    15/16/17/18.
  - Convergence with Firm A anchor (what fraction of Firm A signatures
    are correctly classified at each threshold).

A small visual sanity sample (30 signatures) is exported for spot-check,
but metrics are derived entirely from pixel and Firm A evidence.

Output:
  reports/pixel_validation/pixel_validation_report.md
  reports/pixel_validation/pixel_validation_results.json
  reports/pixel_validation/roc_cosine.png, roc_dhash.png
  reports/pixel_validation/sanity_sample.csv
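
Example:
A minimal sketch of how the two anchors turn into FAR/FRR for a dHash
rule. The toy arrays and the helper name far_frr are illustrative
assumptions only; the script itself uses classification_metrics().

```python
import numpy as np

# Hypothetical toy values, not taken from the signature database.
cos = np.array([1.00, 1.00, 0.96, 0.65, 0.60, 0.98, 0.55])
dhash = np.array([0, 0, 3, 20, 4, 2, 30])
pix = np.array([1, 1, 0, 0, 0, 0, 0])

pos = pix == 1                 # gold positives: pixel-identical
neg = (cos < 0.70) & ~pos      # gold negatives: low-cosine anchor
labelled = pos | neg           # everything else stays unlabelled


def far_frr(pred, is_pos):
    # FAR = share of negatives flagged; FRR = share of positives missed.
    far = float(np.mean(pred[~is_pos]))
    frr = float(np.mean(~pred[is_pos]))
    return far, frr


pred = dhash[labelled] < 5     # rule "dHash < 5" on the labelled subset
far, frr = far_frr(pred, pos[labelled])
print(f'FAR={far:.3f}, FRR={frr:.3f}')
```

Signatures that are neither pixel-identical nor below the cosine cutoff
are excluded from both rates, which is why the metrics describe the
anchors rather than the full population.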
"""
import sqlite3
import json
import numpy as np
import matplotlib
matplotlib.use('Agg')
import matplotlib.pyplot as plt
from pathlib import Path
from datetime import datetime

DB = '/Volumes/NV2/PDF-Processing/signature-analysis/signature_analysis.db'
OUT = Path('/Volumes/NV2/PDF-Processing/signature-analysis/reports/'
           'pixel_validation')
OUT.mkdir(parents=True, exist_ok=True)

FIRM_A = '勤業眾信聯合'  # Firm A (Deloitte), as stored in accountants.firm
NEGATIVE_COSINE_UPPER = 0.70  # signatures with max-cosine < 0.70 assumed not replicated
SANITY_SAMPLE_SIZE = 30


def load_signatures():
    """Load every signature that has a same-accountant cosine score."""
    conn = sqlite3.connect(DB)
    cur = conn.cursor()
    cur.execute('''
        SELECT s.signature_id, s.image_filename, s.assigned_accountant,
               a.firm, s.max_similarity_to_same_accountant,
               s.phash_distance_to_closest, s.min_dhash_independent,
               s.pixel_identical_to_closest, s.closest_match_file
        FROM signatures s
        LEFT JOIN accountants a ON s.assigned_accountant = a.name
        WHERE s.max_similarity_to_same_accountant IS NOT NULL
    ''')
    rows = cur.fetchall()
    conn.close()
    data = []
    for r in rows:
        data.append({
            'sig_id': r[0], 'filename': r[1], 'accountant': r[2],
            'firm': r[3] or '(unknown)',
            'cosine': float(r[4]),
            'dhash_cond': None if r[5] is None else int(r[5]),
            'dhash_indep': None if r[6] is None else int(r[6]),
            'pixel_identical': int(r[7] or 0),
            'closest_match': r[8],
        })
    return data


def confusion(y_true, y_pred):
    """Return (tp, fp, fn, tn) for binary 0/1 label arrays."""
    tp = int(np.sum((y_true == 1) & (y_pred == 1)))
    fp = int(np.sum((y_true == 0) & (y_pred == 1)))
    fn = int(np.sum((y_true == 1) & (y_pred == 0)))
    tn = int(np.sum((y_true == 0) & (y_pred == 0)))
    return tp, fp, fn, tn


def classification_metrics(y_true, y_pred):
    """Precision, recall, F1, FAR and FRR; empty denominators yield 0."""
    tp, fp, fn, tn = confusion(y_true, y_pred)
    denom_p = max(tp + fp, 1)
    denom_r = max(tp + fn, 1)
    precision = tp / denom_p
    recall = tp / denom_r
    f1 = (2 * precision * recall / (precision + recall)
          if precision + recall > 0 else 0.0)
    far = fp / max(fp + tn, 1)  # false acceptance rate (over negatives)
    frr = fn / max(fn + tp, 1)  # false rejection rate (over positives)
    return {
        'tp': tp, 'fp': fp, 'fn': fn, 'tn': tn,
        'precision': float(precision),
        'recall': float(recall),
        'f1': float(f1),
        'far': float(far),
        'frr': float(frr),
    }


def sweep_threshold(scores, y, directions, thresholds):
    """For direction 'above' a prediction is positive if score > threshold;
    for 'below' it is positive if score < threshold."""
    out = []
    for t in thresholds:
        if directions == 'above':
            y_pred = (scores > t).astype(int)
        else:
            y_pred = (scores < t).astype(int)
        m = classification_metrics(y, y_pred)
        m['threshold'] = float(t)
        out.append(m)
    return out


def find_eer(sweep):
    """EER = point where FAR ≈ FRR; interpolated from nearest pair."""
    thr = np.array([s['threshold'] for s in sweep])
    far = np.array([s['far'] for s in sweep])
    frr = np.array([s['frr'] for s in sweep])
    diff = far - frr
    signs = np.sign(diff)
    changes = np.where(np.diff(signs) != 0)[0]
    if len(changes) == 0:
        # No crossing in the swept range: fall back to the closest point.
        idx = int(np.argmin(np.abs(diff)))
        return {'threshold': float(thr[idx]), 'far': float(far[idx]),
                'frr': float(frr[idx]), 'eer': float(0.5 * (far[idx] + frr[idx]))}
    # Linear interpolation between the two thresholds bracketing the crossing.
    i = int(changes[0])
    w = abs(diff[i]) / (abs(diff[i]) + abs(diff[i + 1]) + 1e-12)
    thr_i = (1 - w) * thr[i] + w * thr[i + 1]
    far_i = (1 - w) * far[i] + w * far[i + 1]
    frr_i = (1 - w) * frr[i] + w * frr[i + 1]
    return {'threshold': float(thr_i), 'far': float(far_i),
            'frr': float(frr_i), 'eer': float(0.5 * (far_i + frr_i))}


def plot_roc(sweep, title, out_path):
    """Save a two-panel figure: ROC curve and FAR/FRR against threshold."""
    far = np.array([s['far'] for s in sweep])
    frr = np.array([s['frr'] for s in sweep])
    thr = np.array([s['threshold'] for s in sweep])

    fig, axes = plt.subplots(1, 2, figsize=(13, 5))

    ax = axes[0]
    ax.plot(far, 1 - frr, 'b-', lw=2)
    ax.plot([0, 1], [0, 1], 'k--', alpha=0.4)
    ax.set_xlabel('FAR')
    ax.set_ylabel('1 - FRR (True Positive Rate)')
    ax.set_title(f'{title} - ROC')
    ax.set_xlim(0, 1)
    ax.set_ylim(0, 1)
    ax.grid(alpha=0.3)

    ax = axes[1]
    ax.plot(thr, far, 'r-', lw=2, label='FAR')
    ax.plot(thr, frr, 'b-', lw=2, label='FRR')
    ax.set_xlabel('Threshold')
    ax.set_ylabel('Error rate')
    ax.set_title(f'{title} - FAR / FRR vs threshold')
    ax.legend()
    ax.grid(alpha=0.3)

    plt.tight_layout()
    fig.savefig(out_path, dpi=150)
    plt.close(fig)


def main():
    print('='*70)
    print('Script 19: Pixel-Identity Validation (No Annotation)')
    print('='*70)

    data = load_signatures()
    print(f'\nTotal signatures loaded: {len(data):,}')
    cos = np.array([d['cosine'] for d in data])
    # Missing independent dHash values are encoded as -1 and filtered out
    # later with `>= 0` checks.
    dh_indep = np.array([d['dhash_indep'] if d['dhash_indep'] is not None
                         else -1 for d in data])
    pix = np.array([d['pixel_identical'] for d in data])
    firm = np.array([d['firm'] for d in data])

    print(f'Pixel-identical: {int(pix.sum()):,} signatures')
    print(f'Firm A signatures: {int((firm == FIRM_A).sum()):,}')
    print(f'Negative anchor (cosine < {NEGATIVE_COSINE_UPPER}): '
          f'{int((cos < NEGATIVE_COSINE_UPPER).sum()):,}')

    # Build labelled set:
    #   positive = pixel_identical == 1
    #   negative = cosine < NEGATIVE_COSINE_UPPER (and not pixel_identical)
    pos_mask = pix == 1
    neg_mask = (cos < NEGATIVE_COSINE_UPPER) & (~pos_mask)
    labelled_mask = pos_mask | neg_mask
    y = pos_mask[labelled_mask].astype(int)
    cos_l = cos[labelled_mask]
    dh_l = dh_indep[labelled_mask]

    # --- Sweep cosine threshold
    cos_thresh = np.linspace(0.50, 1.00, 101)
    cos_sweep = sweep_threshold(cos_l, y, 'above', cos_thresh)
    cos_eer = find_eer(cos_sweep)
    print(f'\nCosine EER: threshold={cos_eer["threshold"]:.4f}, '
          f'EER={cos_eer["eer"]:.4f}')

    # --- Sweep dHash threshold (independent)
    dh_l_valid = dh_l >= 0
    y_dh = y[dh_l_valid]
    dh_valid = dh_l[dh_l_valid]
    dh_thresh = np.arange(0, 40)
    dh_sweep = sweep_threshold(dh_valid, y_dh, 'below', dh_thresh)
    dh_eer = find_eer(dh_sweep)
    print(f'dHash EER: threshold={dh_eer["threshold"]:.4f}, '
          f'EER={dh_eer["eer"]:.4f}')

    # Plots
    plot_roc(cos_sweep, 'Cosine (pixel-identity anchor)',
             OUT / 'roc_cosine.png')
    plot_roc(dh_sweep, 'Independent dHash (pixel-identity anchor)',
             OUT / 'roc_dhash.png')

    # --- Evaluate canonical thresholds
    canonical = [
        ('cosine', 0.837, 'above', cos, pos_mask, neg_mask),
        ('cosine', 0.941, 'above', cos, pos_mask, neg_mask),
        ('cosine', 0.95, 'above', cos, pos_mask, neg_mask),
        ('dhash_indep', 5, 'below', dh_indep, pos_mask,
         neg_mask & (dh_indep >= 0)),
        ('dhash_indep', 8, 'below', dh_indep, pos_mask,
         neg_mask & (dh_indep >= 0)),
        ('dhash_indep', 15, 'below', dh_indep, pos_mask,
         neg_mask & (dh_indep >= 0)),
    ]
    canonical_results = []
    for name, thr, direction, scores, p_mask, n_mask in canonical:
        labelled = p_mask | n_mask
        valid = labelled & (scores >= 0 if 'dhash' in name else np.ones_like(
            labelled, dtype=bool))
        y_local = p_mask[valid].astype(int)
        s = scores[valid]
        if direction == 'above':
            y_pred = (s > thr).astype(int)
        else:
            y_pred = (s < thr).astype(int)
        m = classification_metrics(y_local, y_pred)
        m.update({'indicator': name, 'threshold': float(thr),
                  'direction': direction})
        canonical_results.append(m)
        print(f"  {name} @ {thr:>5} ({direction}): "
              f"P={m['precision']:.3f}, R={m['recall']:.3f}, "
              f"F1={m['f1']:.3f}, FAR={m['far']:.4f}, FRR={m['frr']:.4f}")

    # --- Firm A anchor validation
    # Note: the dHash rates here use an inclusive `<=`, whereas
    # sweep_threshold() and the canonical table use a strict `<`.
    firm_a_mask = firm == FIRM_A
    firm_a_cos = cos[firm_a_mask]
    firm_a_dh = dh_indep[firm_a_mask]

    firm_a_rates = {}
    for thr in [0.837, 0.941, 0.95]:
        firm_a_rates[f'cosine>{thr}'] = float(np.mean(firm_a_cos > thr))
    for thr in [5, 8, 15]:
        valid = firm_a_dh >= 0
        firm_a_rates[f'dhash_indep<={thr}'] = float(
            np.mean(firm_a_dh[valid] <= thr))
    # Dual thresholds (denominator is all Firm A signatures, including
    # those without an independent dHash value)
    firm_a_rates['cosine>0.95 AND dhash_indep<=8'] = float(
        np.mean((firm_a_cos > 0.95) &
                (firm_a_dh >= 0) & (firm_a_dh <= 8)))

    print('\nFirm A anchor validation:')
    for k, v in firm_a_rates.items():
        print(f'  {k}: {v*100:.2f}%')

    # --- Stratified sanity sample (30 signatures across 5 strata)
    rng = np.random.default_rng(42)
    strata = [
        ('pixel_identical', pix == 1),
        ('high_cos_low_dh',
         (cos > 0.95) & (dh_indep >= 0) & (dh_indep <= 5) & (pix == 0)),
        ('borderline',
         (cos > 0.837) & (cos < 0.95) & (dh_indep >= 0) & (dh_indep <= 15)),
        ('style_consistency_only',
         (cos > 0.95) & (dh_indep >= 0) & (dh_indep > 15)),
        ('likely_genuine', cos < NEGATIVE_COSINE_UPPER),
    ]
    sanity_sample = []
    per_stratum = SANITY_SAMPLE_SIZE // len(strata)
    for stratum_name, m in strata:
        idx = np.where(m)[0]
        pick = rng.choice(idx, size=min(per_stratum, len(idx)), replace=False)
        for i in pick:
            d = data[i]
            sanity_sample.append({
                'stratum': stratum_name, 'sig_id': d['sig_id'],
                'filename': d['filename'], 'accountant': d['accountant'],
                'firm': d['firm'], 'cosine': d['cosine'],
                'dhash_indep': d['dhash_indep'],
                'pixel_identical': d['pixel_identical'],
                'closest_match': d['closest_match'],
            })

    csv_path = OUT / 'sanity_sample.csv'
    with open(csv_path, 'w', encoding='utf-8') as f:
        keys = ['stratum', 'sig_id', 'filename', 'accountant', 'firm',
                'cosine', 'dhash_indep', 'pixel_identical', 'closest_match']
        f.write(','.join(keys) + '\n')
        for row in sanity_sample:
            f.write(','.join(str(row[k]) if row[k] is not None else ''
                             for k in keys) + '\n')
    print(f'\nSanity sample CSV: {csv_path}')

    # --- Save results
    summary = {
        'generated_at': datetime.now().isoformat(),
        'n_signatures': len(data),
        'n_pixel_identical': int(pos_mask.sum()),
        'n_firm_a': int(firm_a_mask.sum()),
        'n_negative_anchor': int(neg_mask.sum()),
        'negative_cosine_upper': NEGATIVE_COSINE_UPPER,
        'eer_cosine': cos_eer,
        'eer_dhash_indep': dh_eer,
        'canonical_thresholds': canonical_results,
        'firm_a_anchor_rates': firm_a_rates,
        'cosine_sweep': cos_sweep,
        'dhash_sweep': dh_sweep,
    }
    # ensure_ascii=False can emit non-ASCII text, so pin the file encoding.
    with open(OUT / 'pixel_validation_results.json', 'w',
              encoding='utf-8') as f:
        json.dump(summary, f, indent=2, ensure_ascii=False)
    print(f'JSON: {OUT / "pixel_validation_results.json"}')

    # --- Markdown
    md = [
        '# Pixel-Identity Validation Report',
        f"Generated: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}",
        '',
        '## Anchors (no human annotation required)',
        '',
        f'* **Pixel-identical anchor (gold positive):** '
        f'{int(pos_mask.sum()):,} signatures whose closest same-accountant',
        '  match is byte-identical after crop/normalise. Under handwriting',
        '  physics this can only arise from image duplication.',
        f'* **Negative anchor:** signatures whose maximum same-accountant',
        f'  cosine is below {NEGATIVE_COSINE_UPPER} '
        f'({int(neg_mask.sum()):,} signatures). Treated as',
        '  confirmed not-replicated.',
        f'* **Firm A anchor:** Deloitte ({int(firm_a_mask.sum()):,} signatures),',
        '  a replication-dominated population per interviews with multiple',
        '  Firm A accountants: most use replication (stamping / firm-level',
        '  e-signing), but a minority may still hand-sign. Used as a strong',
        '  prior positive for the majority regime, with the ~7% below',
        '  cosine 0.95 reflecting the minority hand-signers.',
        '',
        '## Equal Error Rate (EER)',
        '',
        '| Indicator | Direction | EER threshold | EER |',
        '|-----------|-----------|---------------|-----|',
        f"| Cosine max-similarity | > t | {cos_eer['threshold']:.4f} | "
        f"{cos_eer['eer']:.4f} |",
        f"| Independent min dHash | < t | {dh_eer['threshold']:.4f} | "
        f"{dh_eer['eer']:.4f} |",
        '',
        '## Canonical thresholds',
        '',
        '| Indicator | Threshold | Precision | Recall | F1 | FAR | FRR |',
        '|-----------|-----------|-----------|--------|----|-----|-----|',
    ]
    for c in canonical_results:
        md.append(
            f"| {c['indicator']} | {c['threshold']} "
            f"({c['direction']}) | {c['precision']:.3f} | "
            f"{c['recall']:.3f} | {c['f1']:.3f} | "
            f"{c['far']:.4f} | {c['frr']:.4f} |"
        )

    md += ['', '## Firm A anchor validation', '',
           '| Rule | Firm A rate |',
           '|------|-------------|']
    for k, v in firm_a_rates.items():
        md.append(f'| {k} | {v*100:.2f}% |')

    md += ['', '## Sanity sample', '',
           f'A stratified sample of {len(sanity_sample)} signatures '
           '(pixel-identical, high-cos/low-dh, borderline, style-only, '
           'likely-genuine) is exported to `sanity_sample.csv` for visual',
           'spot-check. These are **not** used to compute metrics.',
           '',
           '## Interpretation',
           '',
           'Because the gold positive is a *subset* of the true replication',
           'positives (only those that happen to be pixel-identical to their',
           'nearest match), recall is conservative: the classifier should',
           'catch pixel-identical pairs reliably and will additionally flag',
           'many non-pixel-identical replications (low dHash but not zero).',
           'FAR against the low-cosine negative anchor is the meaningful',
           'upper bound on spurious replication flags.',
           '',
           'Convergence of thresholds across Scripts 15 (dip test), 16 (BD),',
           '17 (Beta mixture), 18 (accountant mixture) and the EER here',
           'should be reported in the paper as multi-method validation.',
           ]
    (OUT / 'pixel_validation_report.md').write_text('\n'.join(md),
                                                    encoding='utf-8')
    print(f'Report: {OUT / "pixel_validation_report.md"}')


if __name__ == '__main__':
    main()