Files
pdf_signature_extraction/paper/recalibrate_classification.py
T
gbanyan 939a348da4 Add Paper A (IEEE TAI) complete draft with Firm A-calibrated dual-method classification
Paper draft includes all sections (Abstract through Conclusion), 36 references,
and supporting scripts. Key methodology: Cosine similarity + dHash dual-method
verification with thresholds calibrated against known-replication firm (Firm A).

Includes:
- 8 section markdown files (paper_a_*.md)
- Ablation study script (ResNet-50 vs VGG-16 vs EfficientNet-B0)
- Recalibrated classification script (84,386 PDFs, 5-tier system)
- Figure generation and Word export scripts
- Citation renumbering script ([1]-[36])
- Signature analysis pipeline (12 steps)
- YOLO extraction scripts

Three rounds of AI review completed (GPT-5.4, Claude Opus 4.6, Gemini 3 Pro).

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-06 23:05:33 +08:00

306 lines
11 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""
Recalibrate classification using Firm A as ground truth.
Dual-method only: Cosine + dHash (drops SSIM and pixel-identical).
Approach:
1. Load per-signature best-match cosine + pHash from DB
2. Use Firm A (勤業眾信聯合) as known-positive calibration set
3. Analyze 2D distribution (cosine × pHash) for Firm A vs others
4. Determine calibrated thresholds
5. Reclassify all PDFs
6. Output new Table VII
"""
import sqlite3
import numpy as np
from collections import defaultdict
from pathlib import Path
import json
# SQLite database produced by the upstream signature-analysis pipeline.
DB_PATH = '/Volumes/NV2/PDF-Processing/signature-analysis/signature_analysis.db'
# All recalibrated outputs (e.g. recalibrated_results.json) are written here.
OUTPUT_DIR = Path('/Volumes/NV2/PDF-Processing/signature-analysis/recalibrated')
OUTPUT_DIR.mkdir(parents=True, exist_ok=True)  # NOTE: side effect at import time
# Known-replication firm used as the positive calibration set ("Firm A").
FIRM_A = '勤業眾信聯合'
# Cosine-similarity crossover of the intra- vs inter-accountant KDE curves;
# used as the lower bound of the "uncertain" tier.
KDE_CROSSOVER = 0.837 # from intra/inter analysis
def load_data():
    """Load per-signature best-match data (cosine + pHash) from the DB.

    Returns:
        List of dicts with keys 'sig_id', 'filename', 'accountant',
        'cosine' (float), 'phash' (int or None -- not every signature has
        a pHash distance) and 'firm' (str or None when the accountant has
        no matching row in `accountants`).
    """
    conn = sqlite3.connect(DB_PATH)
    try:
        cur = conn.cursor()
        cur.execute('''
        SELECT s.signature_id, s.image_filename, s.assigned_accountant,
        s.max_similarity_to_same_accountant,
        s.phash_distance_to_closest,
        a.firm
        FROM signatures s
        LEFT JOIN accountants a ON s.assigned_accountant = a.name
        WHERE s.assigned_accountant IS NOT NULL
        AND s.max_similarity_to_same_accountant IS NOT NULL
        ''')
        rows = cur.fetchall()
    finally:
        # Close even when the query raises; the original leaked the handle.
        conn.close()
    keys = ('sig_id', 'filename', 'accountant', 'cosine', 'phash', 'firm')
    data = [dict(zip(keys, row)) for row in rows]
    print(f"Loaded {len(data):,} signatures")
    return data
def analyze_firm_a(data):
    """Analyze Firm A's dual-method distribution to calibrate thresholds.

    Firm A is the known-replication (positive) calibration set; all other
    firms form the contrast group. Prints cosine and pHash summary stats
    for both groups, then a 2D cross-tabulation over candidate
    (cosine, pHash) threshold pairs.

    Args:
        data: list of per-signature dicts from load_data(); uses the
            'cosine' (float), 'phash' (int or None) and 'firm' keys.

    Returns:
        (fa_phash, ot_phash): lists of pHash distances for Firm A / other
        signatures where a pHash is available. Both empty when either
        calibration group has no rows.
    """
    firm_a = [d for d in data if d['firm'] == FIRM_A]
    others = [d for d in data if d['firm'] != FIRM_A]
    print(f"\n{'='*60}")
    print(f"FIRM A CALIBRATION ANALYSIS")
    print(f"{'='*60}")
    print(f"Firm A signatures: {len(firm_a):,}")
    print(f"Other signatures: {len(others):,}")
    # Guard: np.percentile raises on an empty array (and .mean() warns),
    # and a one-sided calibration is meaningless anyway.
    if not firm_a or not others:
        print("WARNING: empty calibration group -- skipping analysis")
        return [], []

    # --- Cosine similarity, per group ---
    fa_cosine = np.array([d['cosine'] for d in firm_a])
    ot_cosine = np.array([d['cosine'] for d in others])
    print(f"\n--- Cosine Similarity ---")
    print(f"Firm A: mean={fa_cosine.mean():.4f}, std={fa_cosine.std():.4f}, "
          f"p1={np.percentile(fa_cosine,1):.4f}, p5={np.percentile(fa_cosine,5):.4f}")
    print(f"Others: mean={ot_cosine.mean():.4f}, std={ot_cosine.std():.4f}")

    # --- pHash distance, restricted to rows where one was computed ---
    fa_phash = [d['phash'] for d in firm_a if d['phash'] is not None]
    ot_phash = [d['phash'] for d in others if d['phash'] is not None]
    print(f"\n--- pHash (dHash) Distance ---")
    print(f"Firm A with pHash: {len(fa_phash):,}")
    print(f"Others with pHash: {len(ot_phash):,}")
    if fa_phash:
        fa_ph = np.array(fa_phash)
        print(f"Firm A: mean={fa_ph.mean():.2f}, median={np.median(fa_ph):.0f}, "
              f"p95={np.percentile(fa_ph,95):.0f}")
        print(f" pHash=0: {(fa_ph==0).sum():,} ({100*(fa_ph==0).mean():.1f}%)")
        print(f" pHash<=2: {(fa_ph<=2).sum():,} ({100*(fa_ph<=2).mean():.1f}%)")
        print(f" pHash<=5: {(fa_ph<=5).sum():,} ({100*(fa_ph<=5).mean():.1f}%)")
        print(f" pHash<=10:{(fa_ph<=10).sum():,} ({100*(fa_ph<=10).mean():.1f}%)")
        print(f" pHash<=15:{(fa_ph<=15).sum():,} ({100*(fa_ph<=15).mean():.1f}%)")
        print(f" pHash>15: {(fa_ph>15).sum():,} ({100*(fa_ph>15).mean():.1f}%)")
    if ot_phash:
        ot_ph = np.array(ot_phash)
        print(f"\nOthers: mean={ot_ph.mean():.2f}, median={np.median(ot_ph):.0f}")
        print(f" pHash=0: {(ot_ph==0).sum():,} ({100*(ot_ph==0).mean():.1f}%)")
        print(f" pHash<=5: {(ot_ph<=5).sum():,} ({100*(ot_ph<=5).mean():.1f}%)")
        print(f" pHash<=10:{(ot_ph<=10).sum():,} ({100*(ot_ph<=10).mean():.1f}%)")
        print(f" pHash>15: {(ot_ph>15).sum():,} ({100*(ot_ph>15).mean():.1f}%)")

    # --- 2D cross-tab: what fraction of Firm A passes each threshold pair ---
    print(f"\n--- 2D Analysis: Cosine × pHash (Firm A) ---")
    fa_both = [(d['cosine'], d['phash']) for d in firm_a if d['phash'] is not None]
    if fa_both:
        cosines, phashes = zip(*fa_both)
        cosines = np.array(cosines)
        phashes = np.array(phashes)
        for cos_thresh in [0.95, 0.90, KDE_CROSSOVER]:
            for ph_thresh in [5, 10, 15]:
                match = ((cosines > cos_thresh) & (phashes <= ph_thresh)).sum()
                total = len(cosines)
                print(f" Cosine>{cos_thresh:.3f} AND pHash<={ph_thresh}: "
                      f"{match:,}/{total:,} ({100*match/total:.1f}%)")

    # Same cross-tab for the contrast group, restricted to the high-cosine
    # subset (the only region where false positives are possible).
    print(f"\n--- 2D Analysis: Cosine × pHash (Others, cosine > 0.95 only) ---")
    ot_both_high = [(d['cosine'], d['phash']) for d in others
                    if d['phash'] is not None and d['cosine'] > 0.95]
    if ot_both_high:
        cosines_o, phashes_o = zip(*ot_both_high)
        phashes_o = np.array(phashes_o)
        print(f" N (others with cosine>0.95 and pHash): {len(ot_both_high):,}")
        for ph_thresh in [5, 10, 15]:
            match = (phashes_o <= ph_thresh).sum()
            print(f" pHash<={ph_thresh}: {match:,}/{len(phashes_o):,} ({100*match/len(phashes_o):.1f}%)")
    return fa_phash, ot_phash
def _pdf_key(filename):
    """Derive the source-PDF identifier from a signature image filename.

    Filenames look like '{pdfname}_page{N}_sig{M}.png': strip the trailing
    '_sig{M}' (or, if absent, the file extension), then any '_page{N}' part.
    """
    stem, sep, _ = filename.rpartition('_sig')
    key = stem if sep else filename.rsplit('.', 1)[0]
    page_stem, sep, _ = key.rpartition('_page')
    return page_stem if sep else key


def _classify(cosine, min_phash):
    """Map a PDF's best cosine and minimum pHash onto one of the 5 tiers."""
    if cosine > 0.95:
        if min_phash is not None and min_phash <= 5:
            return 'high_confidence_replication'
        if min_phash is not None and min_phash <= 15:
            return 'moderate_confidence_replication'
        return 'high_style_consistency'
    if cosine > KDE_CROSSOVER:
        return 'uncertain'
    return 'likely_genuine'


def reclassify_pdfs(data):
    """Reclassify all PDFs using calibrated dual-method thresholds.

    Tiers (cosine + dHash only):
      1. High-confidence replication: cosine > 0.95 AND pHash <= 5
      2. Moderate-confidence replication: cosine > 0.95 AND pHash 6-15
      3. High style consistency: cosine > 0.95 AND (pHash > 15 or missing)
      4. Uncertain: KDE_CROSSOVER < cosine <= 0.95
      5. Likely genuine: cosine <= KDE_CROSSOVER

    Args:
        data: per-signature dicts from load_data(). (The original re-ran
            the identical SQL query here; we reuse `data` instead.)

    Returns:
        dict with per-tier counts (overall and Firm A), totals, and the
        thresholds used; also saved to OUTPUT_DIR/recalibrated_results.json.
    """
    # Group signatures by their source PDF, derived from the filename.
    pdf_sigs = defaultdict(list)
    for d in data:
        pdf_sigs[_pdf_key(d['filename'])].append(d)

    print(f"\n{'='*60}")
    print(f"RECLASSIFICATION (Dual-Method: Cosine + dHash)")
    print(f"{'='*60}")
    print(f"Total PDFs: {len(pdf_sigs):,}")

    # Classify each PDF from its signatures.
    verdicts = defaultdict(int)
    firm_a_verdicts = defaultdict(int)
    details = []
    for pdf_key, sigs in pdf_sigs.items():
        # The signature with the highest cosine is the representative.
        best_sig = max(sigs, key=lambda s: s['cosine'])
        cosine = best_sig['cosine']
        is_firm_a = best_sig['firm'] == FIRM_A
        # One replicated signature anywhere in the PDF counts, so take the
        # minimum pHash over every signature that has one.
        phashes = [s['phash'] for s in sigs if s['phash'] is not None]
        min_phash = min(phashes) if phashes else None
        verdict = _classify(cosine, min_phash)
        verdicts[verdict] += 1
        if is_firm_a:
            firm_a_verdicts[verdict] += 1
        details.append({
            'pdf': pdf_key,
            'cosine': cosine,
            'min_phash': min_phash,
            'verdict': verdict,
            'is_firm_a': is_firm_a,
        })

    total = sum(verdicts.values())
    firm_a_total = sum(firm_a_verdicts.values())

    # --- Print Table VII replacement ---
    print(f"\n--- New Classification Results ---")
    print(f"{'Verdict':<35} {'Count':>8} {'%':>7} | {'Firm A':>8} {'%':>7}")
    print("-" * 75)
    order = ['high_confidence_replication', 'moderate_confidence_replication',
             'high_style_consistency', 'uncertain', 'likely_genuine']
    labels = {
        'high_confidence_replication': 'High-conf. replication',
        'moderate_confidence_replication': 'Moderate-conf. replication',
        'high_style_consistency': 'High style consistency',
        'uncertain': 'Uncertain',
        'likely_genuine': 'Likely genuine',
    }
    for v in order:
        n = verdicts.get(v, 0)
        fa = firm_a_verdicts.get(v, 0)
        pct = 100 * n / total if total > 0 else 0
        fa_pct = 100 * fa / firm_a_total if firm_a_total > 0 else 0
        print(f" {labels.get(v, v):<33} {n:>8,} {pct:>6.1f}% | {fa:>8,} {fa_pct:>6.1f}%")
    print("-" * 75)
    print(f" {'Total':<33} {total:>8,} {'100.0%':>7} | {firm_a_total:>8,} {'100.0%':>7}")

    # --- Capture rate on the positive calibration set ---
    print(f"\n--- Firm A Capture Rate (Calibration Validation) ---")
    if firm_a_total > 0:
        fa_replication = firm_a_verdicts.get('high_confidence_replication', 0) + \
            firm_a_verdicts.get('moderate_confidence_replication', 0)
        print(f" Firm A classified as replication (high+moderate): {fa_replication:,}/{firm_a_total:,} "
              f"({100*fa_replication/firm_a_total:.1f}%)")
        fa_high = firm_a_verdicts.get('high_confidence_replication', 0)
        print(f" Firm A classified as high-confidence: {fa_high:,}/{firm_a_total:,} "
              f"({100*fa_high/firm_a_total:.1f}%)")
    else:
        # Guard: the original divided by zero when no Firm A PDFs were found.
        print(" (no Firm A PDFs found)")

    # --- Persist summary ---
    results = {
        'classification': {v: verdicts.get(v, 0) for v in order},
        'firm_a': {v: firm_a_verdicts.get(v, 0) for v in order},
        'total_pdfs': total,
        'firm_a_pdfs': firm_a_total,
        'thresholds': {
            'cosine_high': 0.95,
            'kde_crossover': KDE_CROSSOVER,
            'phash_high_confidence': 5,
            'phash_moderate_confidence': 15,
        },
    }
    with open(OUTPUT_DIR / 'recalibrated_results.json', 'w') as f:
        json.dump(results, f, indent=2)
    print(f"\nResults saved: {OUTPUT_DIR / 'recalibrated_results.json'}")
    return results
def main():
    """Run the full recalibration: load, calibrate on Firm A, reclassify."""
    data = load_data()
    analyze_firm_a(data)
    # reclassify_pdfs() writes its JSON summary to disk itself; the
    # original bound its return value to an unused `results` local.
    reclassify_pdfs(data)


if __name__ == "__main__":
    main()