#!/usr/bin/env python3
"""Script 39d: dHash Discrete-Value Robustness Diagnostics
==========================================================

Codex (gpt-5.5 xhigh) attack on Script 39b/39c findings revealed that the
within-firm dHash dip-test rejections are driven by integer mass points
(dHash takes integer values 0..64).  A uniform jitter of [-0.5, +0.5]
eliminates dip rejection in every firm tested.

This script consolidates that finding into a permanent diagnostic and adds:

1. Raw vs jittered dip with multi-seed robustness (5 seeds)
2. Integer-histogram valley analysis: locate local minima between adjacent
   peaks in the binned integer distribution; report whether any valley
   centers near dh = 5
3. Firm-residualized dip on dHash (analog of cosine firm-mean centering
   that confirmed the cosine reframe)
4. Pairwise pair-coincidence: does the same same-CPA pair achieve both
   max cosine and min dHash, or are the two descriptors attached to
   different pairs?  Foundation for "is (cos, dh) a joint signature regime
   descriptor or two parallel descriptors"

This script does not derive operational thresholds; it characterises
whether the v4.0 K=3 mixture and v3.x cos>0.95 AND dh<=5 rule are robustly
supported once integer-discreteness artifacts are removed.

Outputs:
    reports/v4_big4/dhash_discrete_robustness/
        dhash_discrete_results.json
        dhash_discrete_report.md
"""
import json
import sqlite3
from datetime import datetime
from pathlib import Path

import numpy as np

import diptest

# Read-only source database and report output directory.
DB = '/Volumes/NV2/PDF-Processing/signature-analysis/signature_analysis.db'
OUT = Path('/Volumes/NV2/PDF-Processing/signature-analysis/reports/'
           'v4_big4/dhash_discrete_robustness')
OUT.mkdir(parents=True, exist_ok=True)

# Big-4 firm names as stored in the DB, with anonymised report aliases.
BIG4 = ('勤業眾信聯合', '安侯建業聯合', '資誠聯合', '安永聯合')
ALIAS = {'勤業眾信聯合': 'Firm A',
         '安侯建業聯合': 'Firm B',
         '資誠聯合': 'Firm C',
         '安永聯合': 'Firm D'}

N_BOOT = 2000                       # bootstrap replicates per dip p-value
JITTER_SEEDS = [42, 43, 44, 45, 46]  # seeds for the jitter robustness sweep
# NOTE(review): currently unused in this script; retained because sibling
# scripts in this series share the constant. Confirm before removing.
SINGLE_FIRM_MIN_SIG = 500


def load_signatures():
    """Load (firm, accountant, max_cosine, min_dhash) for every signature.

    Only rows with all four fields present are returned.  The dHash column
    is CAST to REAL in SQL so the Python side receives floats even though
    the underlying values are integers.
    """
    conn = sqlite3.connect(f'file:{DB}?mode=ro', uri=True)
    cur = conn.cursor()
    cur.execute('''
        SELECT a.firm,
               s.assigned_accountant,
               s.max_similarity_to_same_accountant,
               CAST(s.min_dhash_independent AS REAL)
        FROM signatures s
        JOIN accountants a ON s.assigned_accountant = a.name
        WHERE s.assigned_accountant IS NOT NULL
          AND s.max_similarity_to_same_accountant IS NOT NULL
          AND s.min_dhash_independent IS NOT NULL
          AND a.firm IS NOT NULL
    ''')
    rows = cur.fetchall()
    conn.close()
    return rows


def dip(values, n_boot=N_BOOT):
    """Return (dip statistic, bootstrap p-value) over finite values only."""
    arr = np.asarray(values, dtype=float)
    arr = arr[np.isfinite(arr)]
    d, p = diptest.diptest(arr, boot_pval=True, n_boot=n_boot)
    return float(d), float(p)


def multi_seed_jitter_dip(values, seeds=JITTER_SEEDS, n_boot=N_BOOT):
    """Compute dip stat + p-value across seeds; return distribution.

    Each seed draws an independent uniform [-0.5, +0.5] jitter; the summary
    records the p-value / dip ranges and how many seeds reject at 0.05.
    """
    arr = np.asarray(values, dtype=float)
    arr = arr[np.isfinite(arr)]
    stats = []
    for seed in seeds:
        rng = np.random.default_rng(seed)
        j = arr + rng.uniform(-0.5, 0.5, len(arr))
        d, p = diptest.diptest(j, boot_pval=True, n_boot=n_boot)
        stats.append({'seed': seed, 'dip': float(d), 'p': float(p)})
    return {
        'n_seeds': len(seeds),
        'p_min': min(s['p'] for s in stats),
        'p_max': max(s['p'] for s in stats),
        'p_median': float(np.median([s['p'] for s in stats])),
        'dip_min': min(s['dip'] for s in stats),
        'dip_max': max(s['dip'] for s in stats),
        'reject_at_05_count': int(sum(1 for s in stats if s['p'] <= 0.05)),
        'per_seed': stats,
    }


def integer_histogram_valleys(values, max_bin=20):
    """For integer-valued data, locate local minima in the count histogram
    on bins 0..max_bin.  Returns valley positions and depths relative to
    flanking peaks."""
    arr = np.asarray(values, dtype=float)
    arr = arr[np.isfinite(arr)]
    bins = np.arange(0, max_bin + 2)  # edges 0, 1, ..., max_bin+1
    counts, edges = np.histogram(arr, bins=bins)
    # BUGFIX: report the integer dh value each bin holds (its left edge),
    # not the geometric midpoint k + 0.5.  The old midpoints made the
    # report's '{:.0f}' rendering round half-to-even (5.5 -> '6'),
    # misreporting valley positions by one, while the flanking-peak bins
    # were already truncated to the integer value — an inconsistency.
    centers = edges[:-1]
    valleys = []
    for i in range(1, len(counts) - 1):
        # A valley bin is strictly below both neighbours.
        if counts[i] < counts[i - 1] and counts[i] < counts[i + 1]:
            left_peak = counts[i - 1]
            right_peak = counts[i + 1]
            min_peak = min(left_peak, right_peak)
            depth_rel = (min_peak - counts[i]) / min_peak if min_peak else 0
            valleys.append({
                'bin_center': float(centers[i]),
                'count': int(counts[i]),
                'left_peak_bin': int(centers[i - 1]),
                'left_peak_count': int(left_peak),
                'right_peak_bin': int(centers[i + 1]),
                'right_peak_count': int(right_peak),
                'depth_rel': float(depth_rel),
            })
    return {
        'histogram_bins_0_to_max': counts[:max_bin + 1].tolist(),
        'valleys': valleys,
        'note': ('valleys are bins where count < both neighbours; '
                 'depth_rel = (min(neighbour) - bin) / min(neighbour). '
                 'A genuine antimode would have a deep, stable valley '
                 'with depth_rel > 0.1.'),
    }


def firm_residualized(values, firm_labels):
    """Return values with firm means subtracted (centered to grand mean
    over firms).

    Used to test whether residual within-firm structure rejects
    unimodality."""
    arr = np.asarray(values, dtype=float)
    firms = np.asarray(firm_labels)
    out = arr.copy()
    grand = float(np.mean(arr))
    for f in np.unique(firms):
        m = firms == f
        # Remove the firm's mean shift but keep the overall location.
        out[m] = arr[m] - float(np.mean(arr[m])) + grand
    return out


def pair_coincidence_rate():
    """Fraction of signatures whose max-cosine partner equals the
    min-dHash partner within the same-CPA cross-year pool.

    Raises sqlite3.OperationalError if the pair-id columns do not exist
    in this DB schema; the caller handles that case.
    """
    conn = sqlite3.connect(f'file:{DB}?mode=ro', uri=True)
    cur = conn.cursor()
    cur.execute('''
        SELECT COUNT(*) AS n_total,
               SUM(CASE WHEN max_cosine_pair_id IS NOT NULL
                         AND min_dhash_pair_id IS NOT NULL
                         AND max_cosine_pair_id = min_dhash_pair_id
                        THEN 1 ELSE 0 END) AS n_same_pair,
               SUM(CASE WHEN max_cosine_pair_id IS NOT NULL
                         AND min_dhash_pair_id IS NOT NULL
                         AND max_cosine_pair_id != min_dhash_pair_id
                        THEN 1 ELSE 0 END) AS n_diff_pair,
               SUM(CASE WHEN max_cosine_pair_id IS NULL
                         OR min_dhash_pair_id IS NULL
                        THEN 1 ELSE 0 END) AS n_null
        FROM signatures
    ''')
    row = cur.fetchone()
    conn.close()
    n_total, n_same, n_diff, n_null = row
    # SUM over zero rows yields NULL; coalesce to 0 before arithmetic.
    n_with_both = (n_same or 0) + (n_diff or 0)
    return {
        'n_total': int(n_total or 0),
        'n_with_both_pair_ids': int(n_with_both),
        'n_same_pair': int(n_same or 0),
        'n_diff_pair': int(n_diff or 0),
        'n_null': int(n_null or 0),
        'same_pair_rate': (float(n_same) / n_with_both
                           if n_with_both else None),
        'note': ('rate computed over signatures where both '
                 'max_cosine_pair_id and min_dhash_pair_id are present'),
    }


def _fmt_p(p):
    # With N_BOOT=2000 bootstrap replicates the smallest resolvable
    # p-value is 1/2000; a returned 0.0 means "below resolution".
    return '< 5e-4' if p == 0.0 else f'{p:.4g}'


def main():
    print('=' * 72)
    print('Script 39d: dHash Discrete-Value Robustness Diagnostics')
    print('=' * 72)

    rows = load_signatures()
    firms_raw = np.array([r[0] for r in rows])
    cos = np.array([r[2] for r in rows], dtype=float)
    dh = np.array([r[3] for r in rows], dtype=float)
    is_big4 = np.isin(firms_raw, BIG4)
    n = len(rows)
    print(f'\nLoaded {n:,} signatures; Big-4 {is_big4.sum():,}, '
          f'non-Big-4 {(~is_big4).sum():,}')

    results = {
        'meta': {
            'script': '39d',
            'timestamp': datetime.now().isoformat(timespec='seconds'),
            'n_total_signatures': int(n),
            'n_big4': int(is_big4.sum()),
            'n_non_big4': int((~is_big4).sum()),
            'n_boot': N_BOOT,
            'jitter_seeds': JITTER_SEEDS,
            'note': ('Diagnostic for dHash integer-mass-point artifact '
                     'in dip test; codex round-29 attack on Script 39b/c'),
        },
    }

    # ---- A. Raw vs multi-seed jittered dip ----
    # Seed count is derived from JITTER_SEEDS everywhere (was hard-coded 5).
    print(f'\n[A] Raw vs jittered dip ({len(JITTER_SEEDS)} seeds, '
          f'n_boot={N_BOOT})')
    panels = {}

    # Big-4 pooled
    print('  Big-4 pooled:')
    raw_d, raw_p = dip(dh[is_big4])
    j = multi_seed_jitter_dip(dh[is_big4])
    panels['big4_pooled'] = {
        'n': int(is_big4.sum()),
        'raw': {'dip': raw_d, 'p': raw_p},
        'jittered': j,
    }
    print(f'    raw: dip={raw_d:.5f}, p={_fmt_p(raw_p)}')
    print(f'    jitter: p_median={j["p_median"]:.4g}, '
          f'p_range=[{j["p_min"]:.4g}, {j["p_max"]:.4g}], '
          f'reject@.05 in {j["reject_at_05_count"]}/{j["n_seeds"]} seeds')

    # Each Big-4 firm
    for f in BIG4:
        mask = firms_raw == f
        if mask.sum() == 0:
            continue
        raw_d, raw_p = dip(dh[mask])
        j = multi_seed_jitter_dip(dh[mask])
        panels[ALIAS[f]] = {
            'n': int(mask.sum()),
            'raw': {'dip': raw_d, 'p': raw_p},
            'jittered': j,
        }
        print(f'  {ALIAS[f]} (n={mask.sum():,}):')
        print(f'    raw: dip={raw_d:.5f}, p={_fmt_p(raw_p)}')
        print(f'    jitter: p_median={j["p_median"]:.4g}, '
              f'reject@.05 in {j["reject_at_05_count"]}/{j["n_seeds"]} seeds')

    # Non-Big-4 pooled
    print('  Non-Big-4 pooled:')
    raw_d, raw_p = dip(dh[~is_big4])
    j = multi_seed_jitter_dip(dh[~is_big4])
    panels['non_big4_pooled'] = {
        'n': int((~is_big4).sum()),
        'raw': {'dip': raw_d, 'p': raw_p},
        'jittered': j,
    }
    print(f'    raw: dip={raw_d:.5f}, p={_fmt_p(raw_p)}')
    print(f'    jitter: p_median={j["p_median"]:.4g}, '
          f'reject@.05 in {j["reject_at_05_count"]}/{j["n_seeds"]} seeds')

    results['raw_vs_jittered_dip'] = panels

    # ---- B. Integer-histogram valley analysis ----
    print('\n[B] Integer-histogram valley analysis (bins 0..20)')
    valleys = {}
    valleys['big4_pooled'] = integer_histogram_valleys(dh[is_big4])
    print(f'  Big-4 pooled: {len(valleys["big4_pooled"]["valleys"])} valleys')
    for v in valleys['big4_pooled']['valleys']:
        print(f'    bin {v["bin_center"]:.1f}: count={v["count"]}, '
              f'depth_rel={v["depth_rel"]:.3f}')
    for f in BIG4:
        mask = firms_raw == f
        if mask.sum() == 0:
            continue
        valleys[ALIAS[f]] = integer_histogram_valleys(dh[mask])
        print(f'  {ALIAS[f]}: '
              f'{len(valleys[ALIAS[f]]["valleys"])} valleys')
        for v in valleys[ALIAS[f]]['valleys']:
            print(f'    bin {v["bin_center"]:.1f}: count={v["count"]}, '
                  f'depth_rel={v["depth_rel"]:.3f}')
    valleys['non_big4_pooled'] = integer_histogram_valleys(dh[~is_big4])
    print(f'  Non-Big-4 pooled: '
          f'{len(valleys["non_big4_pooled"]["valleys"])} valleys')
    for v in valleys['non_big4_pooled']['valleys']:
        print(f'    bin {v["bin_center"]:.1f}: count={v["count"]}, '
              f'depth_rel={v["depth_rel"]:.3f}')
    results['integer_histogram_valleys'] = valleys

    # ---- C. Firm-residualized dip on dHash, signature level ----
    print('\n[C] Firm-residualized dHash dip (signature level)')
    firm_labels = np.array([
        ALIAS[f] if f in ALIAS else f'M:{f}' for f in firms_raw
    ])
    # Big-4 only residualized over A/B/C/D.
    dh_resid_big4 = firm_residualized(dh[is_big4], firm_labels[is_big4])
    # Reuse the pooled raw dips from section A: bootstrap p-values are
    # stochastic, so recomputing here (as before) could report baseline
    # numbers that disagree with section A — and it wastes 2000 replicates.
    raw_d = panels['big4_pooled']['raw']['dip']
    raw_p = panels['big4_pooled']['raw']['p']
    res_d, res_p = dip(dh_resid_big4)
    print(f'  Big-4 raw: dip={raw_d:.5f}, p={_fmt_p(raw_p)}')
    print(f'  Big-4 residualized: dip={res_d:.5f}, p={_fmt_p(res_p)}')
    # Also non-Big-4 residualized over their firms.
    dh_resid_nbig4 = firm_residualized(dh[~is_big4], firm_labels[~is_big4])
    raw_d_n = panels['non_big4_pooled']['raw']['dip']
    raw_p_n = panels['non_big4_pooled']['raw']['p']
    res_d_n, res_p_n = dip(dh_resid_nbig4)
    print(f'  Non-Big-4 raw: dip={raw_d_n:.5f}, p={_fmt_p(raw_p_n)}')
    print(f'  Non-Big-4 residualized: dip={res_d_n:.5f}, p={_fmt_p(res_p_n)}')
    results['firm_residualized_dh_dip'] = {
        'big4': {
            'raw': {'dip': raw_d, 'p': raw_p},
            'firm_residualized': {'dip': res_d, 'p': res_p},
        },
        'non_big4': {
            'raw': {'dip': raw_d_n, 'p': raw_p_n},
            'firm_residualized': {'dip': res_d_n, 'p': res_p_n},
        },
        'note': ('Residualization subtracts each firm mean dh and adds '
                 'back the grand mean. If residual dip rejects, there is '
                 'genuine within-firm dh multimodality independent of '
                 'between-firm mean shifts. If residual fails to reject, '
                 'all dh "multimodality" was between-firm composition.'),
    }

    # ---- D. Pair-coincidence rate ----
    print('\n[D] Pair-coincidence rate (max-cos pair vs min-dh pair)')
    try:
        pc = pair_coincidence_rate()
        if pc['same_pair_rate'] is not None:
            print(f'  n_with_both: {pc["n_with_both_pair_ids"]:,}, '
                  f'same-pair rate: {pc["same_pair_rate"]:.4f}')
        else:
            print('  Pair IDs not stored in signatures table (skipped)')
        results['pair_coincidence'] = pc
    except sqlite3.OperationalError as e:
        # Schema-dependent analysis: older DBs lack the pair-id columns.
        print(f'  SQL error (pair_id columns may not exist): {e}')
        results['pair_coincidence'] = {
            'error': str(e),
            'note': ('signatures table lacks max_cosine_pair_id / '
                     'min_dhash_pair_id columns; analysis skipped'),
        }

    json_path = OUT / 'dhash_discrete_results.json'
    json_path.write_text(json.dumps(results, indent=2, ensure_ascii=False),
                         encoding='utf-8')
    print(f'\n[json] {json_path}')

    # ---- Report markdown ----
    md = ['# dHash Discrete-Value Robustness Diagnostics (Script 39d)',
          '',
          f'Generated: {results["meta"]["timestamp"]}',
          f'Bootstrap replicates: {N_BOOT}; jitter seeds: {JITTER_SEEDS}',
          '',
          '## A. Raw vs jittered dHash dip (signature level)',
          '',
          ('dHash is integer-valued in [0, 64]. A raw dip test on '
           'integer mass points may reject unimodality due to discrete '
           'spikes rather than a continuous bimodal density. We add '
           f'uniform jitter in [-0.5, +0.5] over {len(JITTER_SEEDS)} '
           'seeds and re-test.'),
          '',
          ('| Scope | n | raw dip | raw p | jitter p median '
           f'| jitter reject@.05 / {len(JITTER_SEEDS)} seeds |'),
          '|---|---|---|---|---|---|']
    for key, label in [('big4_pooled', 'Big-4 pooled')] + \
                      [(ALIAS[f], ALIAS[f]) for f in BIG4] + \
                      [('non_big4_pooled', 'Non-Big-4 pooled')]:
        if key in panels:
            p = panels[key]
            md.append(f'| {label} | {p["n"]:,} | '
                      f'{p["raw"]["dip"]:.5f} | '
                      f'{_fmt_p(p["raw"]["p"])} | '
                      f'{p["jittered"]["p_median"]:.4g} | '
                      f'{p["jittered"]["reject_at_05_count"]}'
                      f'/{p["jittered"]["n_seeds"]} |')
    md += ['',
           '**Interpretation.** If jittered dip ceases to reject in all '
           'panels, the raw-data rejection was driven by integer ties '
           'rather than a continuous bimodal density. Codex round-29 '
           'observed this pattern; this script confirms with multi-seed '
           'robustness.',
           '',
           '## B. Integer-histogram valley locations (bins 0..20)',
           '',
           ('For each scope, list bins where count is strictly less '
            'than both neighbours, with relative depth '
            '(min(neighbour) - bin) / min(neighbour). A genuine '
            'antimode would show a deep, stable valley; integer-noise '
            'valleys are shallow and inconsistent across firms.'),
           '']
    for key, label in [('big4_pooled', 'Big-4 pooled')] + \
                      [(ALIAS[f], ALIAS[f]) for f in BIG4] + \
                      [('non_big4_pooled', 'Non-Big-4 pooled')]:
        if key in valleys:
            v_list = valleys[key]['valleys']
            if not v_list:
                md.append(f'- **{label}**: no integer-histogram valleys '
                          f'in 0..20')
            else:
                desc = ', '.join(
                    f'dh={v["bin_center"]:.0f} (depth_rel={v["depth_rel"]:.3f})'
                    for v in v_list)
                md.append(f'- **{label}**: {desc}')
    md += ['',
           '## C. Firm-residualized dHash dip',
           '',
           ('Subtract each firm mean dHash; add back grand mean. If '
            'residual rejects, within-firm multimodality is genuine. '
            'If residual fails to reject, all dh "multimodality" was '
            'between-firm composition.'),
           '',
           '| Scope | raw dip | raw p | residualized dip | residualized p |',
           '|---|---|---|---|---|']
    fr = results['firm_residualized_dh_dip']
    md += [f'| Big-4 | {fr["big4"]["raw"]["dip"]:.5f} | '
           f'{_fmt_p(fr["big4"]["raw"]["p"])} | '
           f'{fr["big4"]["firm_residualized"]["dip"]:.5f} | '
           f'{_fmt_p(fr["big4"]["firm_residualized"]["p"])} |',
           f'| Non-Big-4 | {fr["non_big4"]["raw"]["dip"]:.5f} | '
           f'{_fmt_p(fr["non_big4"]["raw"]["p"])} | '
           f'{fr["non_big4"]["firm_residualized"]["dip"]:.5f} | '
           f'{_fmt_p(fr["non_big4"]["firm_residualized"]["p"])} |']
    md += ['',
           '## D. Max-cos pair vs min-dh pair coincidence',
           '']
    pc = results.get('pair_coincidence', {})
    if 'same_pair_rate' in pc and pc['same_pair_rate'] is not None:
        md += [f'- n_signatures with both pair IDs: '
               f'{pc["n_with_both_pair_ids"]:,}',
               f'- same-pair rate: {pc["same_pair_rate"]:.4f} '
               f'({pc["n_same_pair"]:,} of '
               f'{pc["n_with_both_pair_ids"]:,})',
               '',
               ('A high rate (>0.8) supports a single-pair regime '
                'descriptor language (cos and dh attached to the same '
                'partner). A low rate indicates the two descriptors '
                'attach to different partners and should be discussed '
                'as parallel-but-different evidence.')]
    elif 'error' in pc:
        md += [f'- column not present in DB: {pc["error"]}',
               ('- note: schema-dependent; pair IDs not currently stored '
                'in signatures table.')]
    md.append('')
    md_path = OUT / 'dhash_discrete_report.md'
    md_path.write_text('\n'.join(md), encoding='utf-8')
    print(f'[md  ] {md_path}')


if __name__ == '__main__':
    main()