#!/usr/bin/env python3
"""
Compute independent min dHash for all signatures.
===================================================

Currently phash_distance_to_closest is conditional on cosine-nearest pair.
This script computes an INDEPENDENT min dHash: for each signature, find the
pair within the same accountant that has the smallest dHash distance,
regardless of cosine similarity.

Three metrics after this script:
1. max_similarity_to_same_accountant (max cosine) — primary classifier
2. min_dhash_independent (independent min) — independent 2nd classifier
3. phash_distance_to_closest (conditional) — diagnostic tool

Phase 1: Compute dHash vector for each image, store as BLOB in DB
Phase 2: All-pairs hamming distance within same accountant, store min
"""
import sqlite3
import numpy as np
import cv2
import os
import sys
import time
from multiprocessing import Pool, cpu_count
from pathlib import Path

DB_PATH = '/Volumes/NV2/PDF-Processing/signature-analysis/signature_analysis.db'
IMAGE_DIR = '/Volumes/NV2/PDF-Processing/yolo-signatures/images'
NUM_WORKERS = max(1, cpu_count() - 2)
BATCH_SIZE = 5000
HASH_SIZE = 8  # 9x8 -> 8x8 = 64-bit hash


# ── Phase 1: Compute dHash per image ─────────────────────────────────
def compute_dhash_for_file(args):
    """Compute dHash for a single image file.

    Args:
        args: (sig_id, filename) tuple — filename is relative to IMAGE_DIR.

    Returns:
        (sig_id, hash_bytes) where hash_bytes is the 8-byte packed 64-bit
        dHash, or (sig_id, None) if the image cannot be read/hashed.
    """
    sig_id, filename = args
    path = os.path.join(IMAGE_DIR, filename)
    try:
        img = cv2.imread(path, cv2.IMREAD_GRAYSCALE)
        if img is None:
            return (sig_id, None)
        # Classic dHash: resize to (HASH_SIZE+1) x HASH_SIZE, then compare
        # each pixel to its horizontal neighbour -> HASH_SIZE^2 bits.
        resized = cv2.resize(img, (HASH_SIZE + 1, HASH_SIZE))
        diff = resized[:, 1:] > resized[:, :-1]  # 8x8 = 64 bits
        return (sig_id, np.packbits(diff.flatten()).tobytes())
    except Exception:
        # Best-effort per-image: a corrupt file must not kill the worker.
        return (sig_id, None)


def phase1_compute_hashes():
    """Compute and store dHash for all signatures.

    Idempotent: adds the needed columns if missing and only processes rows
    whose dhash_vector is still NULL, so the script can be re-run safely.
    """
    conn = sqlite3.connect(DB_PATH)
    cur = conn.cursor()

    # Add columns if not exist (ALTER TABLE raises OperationalError if the
    # column is already present — that is the expected "already done" path).
    for col in ['dhash_vector BLOB', 'min_dhash_independent INTEGER',
                'min_dhash_independent_match TEXT']:
        try:
            cur.execute(f'ALTER TABLE signatures ADD COLUMN {col}')
        except sqlite3.OperationalError:
            pass
    conn.commit()

    # Check which signatures already have dhash_vector
    cur.execute('''
        SELECT signature_id, image_filename FROM signatures
        WHERE feature_vector IS NOT NULL
          AND assigned_accountant IS NOT NULL
          AND dhash_vector IS NULL
    ''')
    todo = cur.fetchall()

    if not todo:
        # Check total with dhash
        cur.execute('SELECT COUNT(*) FROM signatures WHERE dhash_vector IS NOT NULL')
        n_done = cur.fetchone()[0]
        print(f"  Phase 1 already complete ({n_done:,} hashes in DB)")
        conn.close()
        return

    print(f"  Computing dHash for {len(todo):,} images ({NUM_WORKERS} workers)...")
    t0 = time.time()
    processed = 0

    # Create the worker pool ONCE; spawning a fresh pool per batch (the old
    # behaviour) pays process-startup cost on every 5000-image batch.
    with Pool(NUM_WORKERS) as pool:
        for batch_start in range(0, len(todo), BATCH_SIZE):
            batch = todo[batch_start:batch_start + BATCH_SIZE]
            results = pool.map(compute_dhash_for_file, batch)
            updates = [(dhash, sid) for sid, dhash in results if dhash is not None]
            cur.executemany(
                'UPDATE signatures SET dhash_vector = ? WHERE signature_id = ?',
                updates)
            conn.commit()
            processed += len(batch)
            elapsed = time.time() - t0
            # Guard both divisions: elapsed can be ~0 on a very fast batch.
            rate = processed / elapsed if elapsed > 0 else 0
            eta = (len(todo) - processed) / rate if rate > 0 else 0
            print(f"    {processed:,}/{len(todo):,} ({rate:.0f}/s, ETA {eta:.0f}s)")

    conn.close()
    elapsed = time.time() - t0
    print(f"  Phase 1 done: {processed:,} hashes in {elapsed:.1f}s")


# ── Phase 2: All-pairs min dHash within same accountant ──────────────
def hamming_distance(h1_bytes, h2_bytes):
    """Hamming distance between two packed dHash byte strings.

    Args:
        h1_bytes, h2_bytes: equal-length packed-bit byte strings (8 bytes
            for the 64-bit dHash produced by phase 1).

    Returns:
        int — number of differing bits.
    """
    a = np.frombuffer(h1_bytes, dtype=np.uint8)
    b = np.frombuffer(h2_bytes, dtype=np.uint8)
    # unpackbits + sum popcounts in C instead of a Python bin()/count loop.
    return int(np.unpackbits(np.bitwise_xor(a, b)).sum())


def phase2_compute_min_dhash():
    """For each accountant group, find the min dHash pair per signature.

    Loads every hashed signature, groups by assigned_accountant, computes
    the full pairwise Hamming matrix per group (vectorized), and stores each
    signature's minimum distance and the matching filename.
    """
    conn = sqlite3.connect(DB_PATH)
    cur = conn.cursor()

    # Load all signatures with dhash
    cur.execute('''
        SELECT s.signature_id, s.assigned_accountant, s.dhash_vector, s.image_filename
        FROM signatures s
        WHERE s.dhash_vector IS NOT NULL
          AND s.assigned_accountant IS NOT NULL
    ''')
    rows = cur.fetchall()
    print(f"  Loaded {len(rows):,} signatures with dHash")

    # Group by accountant
    acct_groups = {}
    for sig_id, acct, dhash, filename in rows:
        acct_groups.setdefault(acct, []).append((sig_id, dhash, filename))

    # Filter out singletons: a lone signature has no pair to compare with.
    acct_groups = {k: v for k, v in acct_groups.items() if len(v) >= 2}
    total_sigs = sum(len(v) for v in acct_groups.values())
    total_pairs = sum(len(v) * (len(v) - 1) // 2 for v in acct_groups.values())
    print(f"  {len(acct_groups)} accountants, {total_sigs:,} signatures, {total_pairs:,} pairs")

    t0 = time.time()
    updates = []
    accts_done = 0

    for acct, sigs in acct_groups.items():
        n = len(sigs)
        sig_ids = [s[0] for s in sigs]
        hashes = [s[1] for s in sigs]
        filenames = [s[2] for s in sigs]

        # Unpack all hashes to bit arrays for vectorized hamming
        bits = np.array([np.unpackbits(np.frombuffer(h, dtype=np.uint8))
                         for h in hashes], dtype=np.uint8)  # shape: (n, 64)

        # Pairwise hamming via XOR + sum.
        # For groups up to ~2000, direct matrix computation is fine
        # (n*n*64 bytes of temporary). hamming_matrix[i,j] = differing bits.
        xor_matrix = bits[:, None, :] ^ bits[None, :, :]  # (n, n, 64)
        hamming_matrix = xor_matrix.sum(axis=2)  # (n, n)
        np.fill_diagonal(hamming_matrix, 999)  # exclude self (999 > max 64)

        # For each signature, find min
        min_indices = np.argmin(hamming_matrix, axis=1)
        min_distances = hamming_matrix[np.arange(n), min_indices]

        for i in range(n):
            updates.append((
                int(min_distances[i]),
                filenames[min_indices[i]],
                sig_ids[i]
            ))

        accts_done += 1
        if accts_done % 100 == 0:
            elapsed = time.time() - t0
            print(f"    {accts_done}/{len(acct_groups)} accountants ({elapsed:.0f}s)")

    # Write to DB
    print(f"  Writing {len(updates):,} results to DB...")
    cur.executemany('''
        UPDATE signatures
        SET min_dhash_independent = ?, min_dhash_independent_match = ?
        WHERE signature_id = ?
    ''', updates)
    conn.commit()
    conn.close()

    elapsed = time.time() - t0
    print(f"  Phase 2 done: {len(updates):,} signatures in {elapsed:.1f}s")


# ── Phase 3: Summary statistics ──────────────────────────────────────
def print_summary():
    """Print summary comparing conditional vs independent dHash."""
    conn = sqlite3.connect(DB_PATH)
    cur = conn.cursor()

    # Overall stats (only rows where BOTH metrics exist are comparable)
    cur.execute('''
        SELECT COUNT(*) as n,
               AVG(phash_distance_to_closest) as cond_mean,
               AVG(min_dhash_independent) as indep_mean
        FROM signatures
        WHERE min_dhash_independent IS NOT NULL
          AND phash_distance_to_closest IS NOT NULL
    ''')
    n, cond_mean, indep_mean = cur.fetchone()

    print(f"\n{'='*65}")
    print(f"  COMPARISON: Conditional vs Independent dHash")
    print(f"{'='*65}")
    print(f"  N = {n:,}")
    print(f"  Conditional dHash (cosine-nearest pair): mean = {cond_mean:.2f}")
    print(f"  Independent dHash (all-pairs min):       mean = {indep_mean:.2f}")

    # Percentiles
    cur.execute('''
        SELECT phash_distance_to_closest, min_dhash_independent
        FROM signatures
        WHERE min_dhash_independent IS NOT NULL
          AND phash_distance_to_closest IS NOT NULL
    ''')
    rows = cur.fetchall()
    cond = np.array([r[0] for r in rows])
    indep = np.array([r[1] for r in rows])

    print(f"\n  {'Percentile':<12} {'Conditional':>12} {'Independent':>12} {'Diff':>8}")
    print(f"  {'-'*44}")
    for p in [1, 5, 10, 25, 50, 75, 90, 95, 99]:
        cv = np.percentile(cond, p)
        iv = np.percentile(indep, p)
        print(f"  P{p:<10d} {cv:>12.1f} {iv:>12.1f} {iv-cv:>+8.1f}")

    # Agreement analysis: how often both metrics land on the same side of θ.
    print(f"\n  Agreement analysis (both ≤ threshold):")
    for t in [5, 10, 15, 21]:
        both = np.sum((cond <= t) & (indep <= t))
        cond_only = np.sum((cond <= t) & (indep > t))
        indep_only = np.sum((cond > t) & (indep <= t))
        neither = np.sum((cond > t) & (indep > t))
        agree_pct = (both + neither) / len(cond) * 100
        print(f"    θ={t:>2d}: both={both:,}, cond_only={cond_only:,}, "
              f"indep_only={indep_only:,}, neither={neither:,} (agree={agree_pct:.1f}%)")

    # Firm A specific
    cur.execute('''
        SELECT s.phash_distance_to_closest, s.min_dhash_independent
        FROM signatures s
        LEFT JOIN accountants a ON s.assigned_accountant = a.name
        WHERE a.firm = '勤業眾信聯合'
          AND s.min_dhash_independent IS NOT NULL
          AND s.phash_distance_to_closest IS NOT NULL
    ''')
    rows = cur.fetchall()
    if rows:
        cond_a = np.array([r[0] for r in rows])
        indep_a = np.array([r[1] for r in rows])
        print(f"\n  Firm A (勤業眾信) — N={len(rows):,}:")
        print(f"  {'Percentile':<12} {'Conditional':>12} {'Independent':>12}")
        print(f"  {'-'*36}")
        for p in [50, 75, 90, 95, 99]:
            print(f"  P{p:<10d} {np.percentile(cond_a, p):>12.1f} {np.percentile(indep_a, p):>12.1f}")

    conn.close()


def main():
    """Run all three phases in order and report total wall time."""
    t_start = time.time()
    print("=" * 65)
    print("  Independent Min dHash Computation")
    print("=" * 65)

    print(f"\n[Phase 1] Computing dHash vectors...")
    phase1_compute_hashes()

    print(f"\n[Phase 2] Computing all-pairs min dHash per accountant...")
    phase2_compute_min_dhash()

    print(f"\n[Phase 3] Summary...")
    print_summary()

    elapsed = time.time() - t_start
    print(f"\nTotal time: {elapsed:.0f}s ({elapsed/60:.1f} min)")


if __name__ == "__main__":
    main()