#!/usr/bin/env python3
"""
Script 28: Byte-Identity Decomposition + Cross-Firm Dual-Descriptor Convergence
================================================================================

Produces two reproducible artifacts cited in the manuscript that previously
lacked dedicated provenance (codex review v3.18.1 items #7 and #8):

  (#7) Byte-identical Firm A signature decomposition:
       - Total Firm A signatures with pixel_identical_to_closest = 1
       - Number of distinct Firm A partners they span
       - Number of partners in the registry (denominator)
       - Number of byte-identical pairs that span DIFFERENT fiscal years

  (#8) Cross-firm dual-descriptor convergence:
       - Among signatures with cosine > 0.95 (per-signature best-match),
         the fraction with min_dhash_independent <= 5, broken out by
         Firm A vs Non-Firm-A.

Output:
  /Volumes/NV2/PDF-Processing/signature-analysis/reports/byte_identity_decomp/
    byte_identity_decomposition.json
    byte_identity_decomposition.md

These figures are intended to be cited from the paper (Section IV-F.1 for #7;
Section IV-H.2 for #8) so that every quantitative claim in the manuscript
traces to a specific JSON field.
"""

import json
import sqlite3
from datetime import datetime
from pathlib import Path

DB = '/Volumes/NV2/PDF-Processing/signature-analysis/signature_analysis.db'
# The output directory is created in main() (and by write_markdown for its
# own target) rather than at import time, so importing this module never
# touches the filesystem.
OUT = Path('/Volumes/NV2/PDF-Processing/signature-analysis/reports/'
           'byte_identity_decomp')

FIRM_A = '勤業眾信聯合'

# Fixed row order of the cross-firm table in the Markdown report.
_FIRM_GROUPS = ('Firm A', 'Non-Firm-A')


def byte_identity_decomposition(conn):
    """Codex item #7: 145 / 50 / 180 / 35 decomposition.

    Runs two queries against *conn*: one counting distinct ``name`` values
    in ``accountants`` for ``FIRM_A``, and one aggregating the Firm A rows of
    ``signatures`` that have ``pixel_identical_to_closest = 1``, joined to
    the row their ``closest_match_file`` points at.

    Args:
        conn: An open ``sqlite3`` connection exposing the ``accountants``
            and ``signatures`` tables.

    Returns:
        A JSON-serialisable dict with the counts. ``partner_coverage_share``
        is ``None`` when the registry holds no Firm A partner (the share is
        undefined), and ``n_cross_year_byte_identical_pairs`` is ``0`` (not
        ``None``) when no pair qualifies.
    """
    cur = conn.cursor()

    cur.execute("""
        SELECT COUNT(DISTINCT name)
        FROM accountants
        WHERE firm = ?
    """, (FIRM_A,))
    n_registered_partners = cur.fetchone()[0]

    # NOTE(review): COUNT(*) counts joined rows. It equals the number of
    # signatures only if image_filename is unique and every
    # closest_match_file resolves to a row -- confirm against the schema.
    # COALESCE is needed because SUM() over zero rows yields NULL.
    cur.execute("""
        WITH byte_pairs AS (
            SELECT s1.signature_id AS sig_a,
                   s1.assigned_accountant AS partner,
                   s1.year_month AS ym_a,
                   s2.year_month AS ym_b
            FROM signatures s1
            JOIN signatures s2
              ON s1.closest_match_file = s2.image_filename
            WHERE s1.pixel_identical_to_closest = 1
              AND s1.excel_firm = ?
        )
        SELECT COUNT(*) AS total_pixel_identical_firm_a,
               COUNT(DISTINCT partner) AS partners_with_pixel_identical,
               COALESCE(SUM(CASE WHEN substr(ym_a,1,4) <> substr(ym_b,1,4)
                                 THEN 1 ELSE 0 END), 0) AS cross_year_pairs
        FROM byte_pairs
    """, (FIRM_A,))
    n_total, n_partners, n_cross_year = cur.fetchone()

    # An empty registry makes the share undefined; report None instead of
    # raising ZeroDivisionError.
    if n_registered_partners:
        coverage_share = round(n_partners / n_registered_partners, 4)
    else:
        coverage_share = None

    return {
        'definition': (
            'Among Firm A signatures whose nearest same-CPA match is '
            'byte-identical after crop and normalization '
            '(pixel_identical_to_closest = 1), this section reports the '
            'count, the distinct-partner spread, the registry denominator, '
            'and the subset whose byte-identical match is in a different '
            'fiscal year.'
        ),
        'firm_label': 'Firm A',
        'n_pixel_identical_firm_a_signatures': n_total,
        'n_distinct_partners_with_pixel_identical': n_partners,
        'n_registered_partners_in_firm_a': n_registered_partners,
        'partner_coverage_share': coverage_share,
        'n_cross_year_byte_identical_pairs': n_cross_year,
    }


def cross_firm_dual_convergence(conn):
    """Codex item #8: per-signature dual-descriptor convergence by firm.

    Args:
        conn: An open ``sqlite3`` connection exposing the ``signatures``
            table.

    Returns:
        A JSON-serialisable dict whose ``by_firm_group`` entry maps
        ``'Firm A'`` / ``'Non-Firm-A'`` to the number of qualifying
        signatures, how many of them have ``min_dhash_independent <= 5``,
        and that share as a percentage rounded to two decimals. A group
        with no qualifying signature is absent from the mapping.
    """
    cur = conn.cursor()
    cur.execute("""
        SELECT
            CASE WHEN excel_firm = ? THEN 'Firm A'
                 ELSE 'Non-Firm-A' END AS firm_group,
            COUNT(*) AS n_signatures_above_095,
            SUM(CASE WHEN min_dhash_independent <= 5
                     THEN 1 ELSE 0 END) AS n_dhash_le_5
        FROM signatures
        WHERE max_similarity_to_same_accountant > 0.95
          AND assigned_accountant IS NOT NULL
          AND min_dhash_independent IS NOT NULL
        GROUP BY firm_group
        ORDER BY firm_group
    """, (FIRM_A,))

    # Every group returned by GROUP BY has at least one row, so n_above is
    # never zero here and the division is safe.
    by_group = {
        firm_group: {
            'n_signatures_above_cosine_095': n_above,
            'n_dhash_indep_le_5': n_dhash,
            'pct_dhash_indep_le_5': round(100.0 * n_dhash / n_above, 2),
        }
        for firm_group, n_above, n_dhash in cur.fetchall()
    }

    return {
        'definition': (
            'Per-signature best-match cosine > 0.95 AND assigned_accountant '
            'IS NOT NULL AND min_dhash_independent IS NOT NULL. The reported '
            'percentage is the share of these signatures whose independent '
            'min dHash to any same-CPA signature is <= 5.'
        ),
        'unit_of_observation': 'signature',
        'cosine_threshold': 0.95,
        'dhash_indep_threshold': 5,
        'by_firm_group': by_group,
    }


def write_markdown(payload, path):
    """Render *payload* as a Markdown report and write it to *path*.

    Args:
        payload: Dict with the keys ``generated_at``,
            ``byte_identity_decomposition`` and
            ``cross_firm_dual_convergence``, shaped as built by ``main()``.
        path: ``pathlib.Path`` of the Markdown file to write (UTF-8). Its
            parent directory is created if it does not exist.

    An undefined partner coverage share (``None``) is rendered as ``n/a``.
    A firm group missing from ``by_firm_group`` is rendered as a row with
    zero counts and an ``n/a`` percentage rather than raising ``KeyError``.
    """
    bid = payload['byte_identity_decomposition']
    cf = payload['cross_firm_dual_convergence']

    share = bid['partner_coverage_share']
    share_text = 'n/a' if share is None else f"{share:.3f}"

    lines = [
        '# Byte-Identity Decomposition + Cross-Firm Dual-Descriptor '
        'Convergence',
        '',
        f"Generated at: {payload['generated_at']}",
        '',
        '## 1. Byte-Identity Decomposition (Firm A)',
        '',
        bid['definition'],
        '',
        '| Quantity | Value |',
        '|----------|-------|',
        f"| Pixel-identical Firm A signatures | "
        f"{bid['n_pixel_identical_firm_a_signatures']} |",
        f"| Distinct Firm A partners with at least one such pair | "
        f"{bid['n_distinct_partners_with_pixel_identical']} |",
        f"| Registered Firm A partners | "
        f"{bid['n_registered_partners_in_firm_a']} |",
        f"| Partner coverage share | "
        f"{share_text} |",
        f"| Pairs whose byte-identical match spans different fiscal "
        f"years | {bid['n_cross_year_byte_identical_pairs']} |",
        '',
        '## 2. Cross-Firm Dual-Descriptor Convergence',
        '',
        cf['definition'],
        '',
        '| Firm group | N signatures with cosine > 0.95 | '
        'N with dHash_indep <= 5 | % with dHash_indep <= 5 |',
        '|------------|--------------------------------:|'
        '------------------------:|------------------------:|',
    ]

    for grp in _FIRM_GROUPS:
        g = cf['by_firm_group'].get(grp)
        if g is None:
            # The SQL GROUP BY omits groups with no qualifying signatures.
            lines.append(f"| {grp} | 0 | 0 | n/a |")
            continue
        lines.append(f"| {grp} | "
                     f"{g['n_signatures_above_cosine_095']:,} | "
                     f"{g['n_dhash_indep_le_5']:,} | "
                     f"{g['pct_dhash_indep_le_5']:.2f}% |")

    path.parent.mkdir(parents=True, exist_ok=True)
    path.write_text('\n'.join(lines) + '\n', encoding='utf-8')


def main():
    """Compute both artifacts from ``DB`` and write the JSON and Markdown
    reports into ``OUT``, creating that directory first if needed."""
    OUT.mkdir(parents=True, exist_ok=True)

    conn = sqlite3.connect(DB)
    try:
        payload = {
            'generated_at': datetime.now().isoformat(timespec='seconds'),
            'database_path': DB,
            'firm_a_label': FIRM_A,
            'byte_identity_decomposition': byte_identity_decomposition(conn),
            'cross_firm_dual_convergence': cross_firm_dual_convergence(conn),
        }
    finally:
        # Release the database handle even if a query fails.
        conn.close()

    json_path = OUT / 'byte_identity_decomposition.json'
    json_path.write_text(json.dumps(payload, indent=2, ensure_ascii=False),
                         encoding='utf-8')
    print(f'Wrote {json_path}')

    md_path = OUT / 'byte_identity_decomposition.md'
    write_markdown(payload, md_path)
    print(f'Wrote {md_path}')


if __name__ == '__main__':
    main()