#!/usr/bin/env python3
"""
Script 46: Alert-Rate Sensitivity / Threshold-Plateau Analysis
==============================================================
Anchor-based screening framework supplementary validation.

With no ground-truth labels, "threshold validation" can only be done
via proxies. One proxy: alert-rate sensitivity to threshold
perturbation. If the v3-inherited threshold (cos>0.95 AND dh<=5) sits
at a low-gradient region of the (cos, dh) -> alert-rate surface, that
is weak evidence the threshold is a stable operating point. If the
surface is everywhere smooth with no plateau, the threshold is an
arbitrary point in a continuous specificity-recall tradeoff -- which is
consistent with the "no natural threshold" finding from Scripts 39b-39e
(composition decomposition) and supports the multi-level screening
framework framing.

This script computes alert rates (using actual observed Big-4
descriptors, NOT inter-CPA simulated pools) across:
  - 1D cos threshold sweep at fixed dh<=5
  - 1D dh threshold sweep at fixed cos>0.95
  - 2D (cos, dh) grid
Per firm and pooled. Gradient-based plateau detection.

Note: this uses observed (max_cos, min_dh) from each Big-4 signature's
real same-CPA pool, i.e., the deployment-side behavior of the rule on
the actual corpus (not the inter-CPA negative anchor).

Outputs:
  reports/v4_big4/alert_rate_sensitivity/
    alert_rate_results.json
    alert_rate_report.md
"""

import json
import sqlite3
from collections import defaultdict
from contextlib import closing
from datetime import datetime
from pathlib import Path

import numpy as np

DB = '/Volumes/NV2/PDF-Processing/signature-analysis/signature_analysis.db'
# The output directory is created at the start of main(), not at import
# time, so importing this module has no filesystem side effects.
OUT = Path('/Volumes/NV2/PDF-Processing/signature-analysis/reports/'
           'v4_big4/alert_rate_sensitivity')

BIG4 = ('勤業眾信聯合', '安侯建業聯合', '資誠聯合', '安永聯合')
ALIAS = {'勤業眾信聯合': 'Firm A', '安侯建業聯合': 'Firm B',
         '資誠聯合': 'Firm C', '安永聯合': 'Firm D'}
# Fixed reporting order; used instead of "firms present in the data" so
# the tables never hit a KeyError when a firm contributes no rows.
FIRM_LABELS = tuple(ALIAS[name] for name in BIG4)

# v3-inherited operating point (rule: cos > V3_COS AND dh <= V3_DH).
# Report labels below spell these values out literally.
V3_COS = 0.95
V3_DH = 5

# Threshold grids.  np.linspace pins the number of points; a float-step
# np.arange has an endpoint-dependent length (the former
# np.arange(0.80, 1.00, 0.005) produced 40 points, not 41).
COS_GRID = np.linspace(0.80, 0.995, 40)    # 40 points, step 0.005
DH_GRID = np.arange(0, 21, 1)              # 21 integer points
COS_FOR_2D = np.linspace(0.85, 1.00, 16)   # 16 cos points for 2D
DH_FOR_2D = np.arange(0, 21, 1)            # 21 dh points for 2D


def load_big4():
    """Fetch one row per Big-4 signature from the read-only database.

    Returns:
        A list of tuples ``(assigned_accountant, firm, source_pdf,
        max_similarity_to_same_accountant, min_dhash_independent)``;
        rows with a NULL accountant, similarity or dhash are excluded
        by the query, and ``min_dhash_independent`` is cast to REAL.
    """
    # closing() guarantees the connection is released even when the
    # query raises.
    with closing(sqlite3.connect(f'file:{DB}?mode=ro', uri=True)) as conn:
        cur = conn.cursor()
        cur.execute('''
            SELECT s.assigned_accountant, a.firm, s.source_pdf,
                   s.max_similarity_to_same_accountant,
                   CAST(s.min_dhash_independent AS REAL)
            FROM signatures s
            JOIN accountants a ON s.assigned_accountant = a.name
            WHERE s.assigned_accountant IS NOT NULL
              AND s.max_similarity_to_same_accountant IS NOT NULL
              AND s.min_dhash_independent IS NOT NULL
              AND a.firm IN (?, ?, ?, ?)
        ''', BIG4)
        return cur.fetchall()


def alert_rate(cos_arr, dh_arr, cos_k, dh_k):
    """Fraction of (cos, dh) pairs satisfying cos>cos_k AND dh<=dh_k.

    Args:
        cos_arr: array-like of cosine similarities.
        dh_arr: array-like of dhash distances, aligned with ``cos_arr``.
        cos_k: strict lower bound on cos.
        dh_k: inclusive upper bound on dh.

    Returns:
        The alert rate as a Python float; 0.0 for empty input.
    """
    cos_arr = np.asarray(cos_arr)
    dh_arr = np.asarray(dh_arr)
    if cos_arr.size == 0:
        return 0.0
    return float(((cos_arr > cos_k) & (dh_arr <= dh_k)).mean())


def doc_alert_rate(cos_arr, dh_arr, doc_ids, n_docs, cos_k, dh_k):
    """Fraction of documents with at least one signature hitting the rule.

    Args:
        cos_arr: array-like of cosine similarities, one per signature.
        dh_arr: array-like of dhash distances, aligned with ``cos_arr``.
        doc_ids: array-like of integer document codes, aligned with
            ``cos_arr``; equal codes mean "same document".
        n_docs: total number of distinct documents (the denominator).
        cos_k: strict lower bound on cos.
        dh_k: inclusive upper bound on dh.

    Returns:
        ``hit documents / n_docs`` as a float; 0.0 when ``n_docs`` is 0.
    """
    if n_docs == 0:
        return 0.0
    hits = (np.asarray(cos_arr) > cos_k) & (np.asarray(dh_arr) <= dh_k)
    # Distinct document codes among the hitting signatures.
    return np.unique(np.asarray(doc_ids)[hits]).size / n_docs


def plateau_gradient(cos_grid, rates):
    """Summarise |d(rate)/d(threshold)| over consecutive grid intervals.

    The slope is a forward difference, so there is one value per
    interval (``len(rates) - 1`` values).  Despite the parameter name,
    any threshold grid may be passed (it is also used for the dh grid).

    Returns:
        dict with ``gradients`` (list), ``min``, ``median``, ``max`` and
        ``argmin_threshold`` (the left endpoint of the flattest
        interval).  The four summary values are None when fewer than
        two grid points are given.
    """
    rates = np.asarray(rates)
    grads = np.abs(np.diff(rates) / np.diff(cos_grid))
    if not len(grads):
        return {'gradients': [], 'min': None, 'median': None,
                'max': None, 'argmin_threshold': None}
    return {
        'gradients': grads.tolist(),
        'min': float(grads.min()),
        'median': float(np.median(grads)),
        'max': float(grads.max()),
        'argmin_threshold': float(cos_grid[int(np.argmin(grads))]),
    }


def central_gradient(grid, rates, idx):
    """Signed central-difference slope of ``rates`` at ``grid[idx]``.

    Returns None when ``idx`` is the first or last index (no neighbour
    on one side).
    """
    if not 0 < idx < len(rates) - 1:
        return None
    return float((rates[idx + 1] - rates[idx - 1])
                 / (grid[idx + 1] - grid[idx - 1]))


def fmt_num(value, spec):
    """Format ``value`` with format-spec ``spec``; 'n/a' for None."""
    return 'n/a' if value is None else format(value, spec)


def classify_ratio(ratio):
    """Label a local/median gradient ratio for the markdown report.

    '<0.5' -> 'plateau', '>1.5' -> 'cliff', otherwise 'moderate';
    'n/a' when the ratio could not be computed.
    """
    if ratio is None:
        return 'n/a'
    if ratio < 0.5:
        return 'plateau'
    if ratio > 1.5:
        return 'cliff'
    return 'moderate'


def _gradient_ratio(local, median):
    """local/median, or None if either is unavailable or median <= 0."""
    if local is None or not median or median <= 0:
        return None
    return local / median


def _sweep_by_firm(cos, dh, firms, rate_list):
    """Apply ``rate_list(cos, dh)`` to the pooled data and to each firm."""
    rates = {'pooled': rate_list(cos, dh)}
    for label in FIRM_LABELS:
        sel = firms == label
        rates[label] = rate_list(cos[sel], dh[sel])
    return rates


def _rate_cells(rates, i):
    """Formatted pooled + per-firm rates at grid index ``i``."""
    return [f'{rates[key][i]:.4f}' for key in ('pooled',) + FIRM_LABELS]


def _console_row(label, rates, i):
    """One console table row: label followed by pooled/per-firm rates."""
    return ' | '.join([label] + _rate_cells(rates, i))


def _md_row(label, rates, i):
    """One markdown table row: label followed by pooled/per-firm rates."""
    return '| ' + ' | '.join([label] + _rate_cells(rates, i)) + ' |'


def _print_gradient_summary(prefix, summary):
    """Print the min/median/max line for a plateau_gradient() result."""
    print(f'\n {prefix} gradient summary: '
          f'min={fmt_num(summary["min"], ".5f")}, '
          f'median={fmt_num(summary["median"], ".5f")}, '
          f'max={fmt_num(summary["max"], ".5f")}')


def main():
    """Run all sweeps, print the tables, and write the JSON/MD reports."""
    # Create the output directory first so an unwritable target fails
    # before any computation is done.
    OUT.mkdir(parents=True, exist_ok=True)

    print('=' * 72)
    print('Script 46: Alert-Rate Sensitivity / Threshold-Plateau Analysis')
    print('=' * 72)

    rows = load_big4()
    n_sigs = len(rows)
    print(f'\nLoaded {n_sigs:,} Big-4 signatures')
    if not rows:
        # Every rate would be 0/0; stop with a clear message instead.
        raise SystemExit('No Big-4 signatures matched the query; '
                         'nothing to analyse.')

    firms = np.array([ALIAS[r[1]] for r in rows], dtype=str)
    cos = np.array([r[3] for r in rows], dtype=np.float32)
    dh = np.array([r[4] for r in rows], dtype=np.int32)

    # Document grouping: map each source_pdf to a dense integer code.
    # The defaultdict hands out the next unused code on first sight.
    doc_code = defaultdict(lambda: len(doc_code))
    doc_ids = np.fromiter((doc_code[r[2]] for r in rows),
                          dtype=np.int64, count=n_sigs)
    n_docs = len(doc_code)
    print(f' Documents: {n_docs:,}')

    results = {
        'meta': {
            'script': '46',
            'timestamp': datetime.now().isoformat(timespec='seconds'),
            'n_signatures': n_sigs,
            'n_documents': n_docs,
            'note': ('Alert-rate sensitivity using observed descriptors '
                     '(not inter-CPA simulation). Per-signature and '
                     'per-document; pooled and per-firm.'),
        },
    }

    # ── 1D cos sweep at fixed dh<=5 ──
    print('\n[1D cos sweep at dh<=5]')
    sig_rates_cos = _sweep_by_firm(
        cos, dh, firms,
        lambda c, d: [alert_rate(c, d, k, V3_DH) for k in COS_GRID])
    print(' cos | pooled | Firm A | Firm B | Firm C | Firm D')
    for i, k in enumerate(COS_GRID):
        # Every 4th grid point, plus the v3 threshold row itself.
        if i % 4 == 0 or abs(k - 0.95) < 1e-6:
            print(_console_row(f' {k:.3f}', sig_rates_cos, i))
    cos_pooled_grad = plateau_gradient(COS_GRID, sig_rates_cos['pooled'])
    _print_gradient_summary('pooled', cos_pooled_grad)
    print(' argmin of |grad| at cos='
          f'{fmt_num(cos_pooled_grad["argmin_threshold"], ".3f")}')

    # ── 1D dh sweep at fixed cos>0.95 ──
    print('\n[1D dh sweep at cos>0.95]')
    sig_rates_dh = _sweep_by_firm(
        cos, dh, firms,
        lambda c, d: [alert_rate(c, d, V3_COS, k) for k in DH_GRID])
    print(' dh | pooled | Firm A | Firm B | Firm C | Firm D')
    for i, k in enumerate(DH_GRID):
        print(_console_row(f' {k:2d}', sig_rates_dh, i))
    dh_pooled_grad = plateau_gradient(DH_GRID, sig_rates_dh['pooled'])
    _print_gradient_summary('pooled', dh_pooled_grad)
    print(' argmin of |grad| at dh='
          f'{fmt_num(dh_pooled_grad["argmin_threshold"], ".0f")}')

    # ── 2D (cos, dh) surface ──
    print('\n[2D (cos, dh) alert-rate surface]')
    # float64 so the surface agrees digit-for-digit with the 1D sweeps.
    surface = np.array([[alert_rate(cos, dh, ck, dk) for dk in DH_FOR_2D]
                        for ck in COS_FOR_2D], dtype=np.float64)
    print(' Surface dimensions:', surface.shape)
    # Print a few key rows
    for i, ck in enumerate(COS_FOR_2D):
        if any(abs(ck - key) < 1e-6 for key in (0.85, 0.90, 0.95, 0.98)):
            line = f' cos>{ck:.2f}:'
            for j, dk in enumerate(DH_FOR_2D):
                if dk in [0, 3, 5, 8, 10, 15, 20]:
                    line += f' dh<={dk}: {surface[i, j]:.4f},'
            print(line)

    # 2D gradient at the key threshold (cos=0.95, dh=5), using central
    # differences along each axis of the surface.
    i95 = int(np.argmin(np.abs(COS_FOR_2D - V3_COS)))
    j5 = int(np.argmin(np.abs(DH_FOR_2D - V3_DH)))
    dcos = central_gradient(COS_FOR_2D, surface[:, j5], i95)
    ddh = central_gradient(DH_FOR_2D, surface[i95, :], j5)
    if dcos is None or ddh is None:
        # Threshold sits on the grid edge: report nothing rather than a
        # one-sided estimate.
        dcos = ddh = grad_mag = None
    else:
        grad_mag = float(np.sqrt(dcos ** 2 + ddh ** 2))
    print(f'\n At (cos=0.95, dh=5): rate={surface[i95, j5]:.4f}')
    print(f' d(rate)/d(cos) ~ {fmt_num(dcos, ".4f")} (per unit cos)')
    print(f' d(rate)/d(dh) ~ {fmt_num(ddh, ".4f")} (per unit dh)')
    print(f' gradient magnitude ~ {fmt_num(grad_mag, ".4f")}')

    # ── Document-level 1D cos sweep ──
    print('\n[Document-level 1D cos sweep at dh<=5]')
    doc_rates_cos = [doc_alert_rate(cos, dh, doc_ids, n_docs, k, V3_DH)
                     for k in COS_GRID]
    for i, k in enumerate(COS_GRID):
        if i % 4 == 0 or abs(k - 0.95) < 1e-6:
            print(f' cos > {k:.3f}: doc-FAR (HC) = {doc_rates_cos[i]:.4f}')
    doc_cos_grad = plateau_gradient(COS_GRID, doc_rates_cos)
    _print_gradient_summary('doc', doc_cos_grad)

    # ── Plateau detection summary ──
    print('\n[Plateau detection summary]')
    cos095_idx = int(np.argmin(np.abs(COS_GRID - V3_COS)))
    dh5_idx = int(np.argmin(np.abs(DH_GRID - V3_DH)))
    local_grad_cos = central_gradient(
        COS_GRID, sig_rates_cos['pooled'], cos095_idx)
    if local_grad_cos is not None:
        local_grad_cos = abs(local_grad_cos)
    local_grad_dh = central_gradient(
        DH_GRID, sig_rates_dh['pooled'], dh5_idx)
    if local_grad_dh is not None:
        local_grad_dh = abs(local_grad_dh)
    median_grad_cos = cos_pooled_grad['median']
    median_grad_dh = dh_pooled_grad['median']
    # None when the median gradient is zero or a gradient is missing.
    ratio_cos = _gradient_ratio(local_grad_cos, median_grad_cos)
    ratio_dh = _gradient_ratio(local_grad_dh, median_grad_dh)
    print(' v3 inherited cos=0.95 local |grad|='
          f'{fmt_num(local_grad_cos, ".5f")}, '
          f'median |grad|={fmt_num(median_grad_cos, ".5f")}, '
          f'ratio={fmt_num(ratio_cos, ".2f")}')
    print(' v3 inherited dh=5 local |grad|='
          f'{fmt_num(local_grad_dh, ".5f")}, '
          f'median |grad|={fmt_num(median_grad_dh, ".5f")}, '
          f'ratio={fmt_num(ratio_dh, ".2f")}')
    if ratio_cos is None:
        print(' -> cos=0.95 gradient ratio is undetermined.')
    elif ratio_cos < 0.5:
        print(' -> cos=0.95 IS at a low-gradient region (plateau-like).')
    elif ratio_cos > 1.5:
        print(' -> cos=0.95 IS at a high-gradient region (steep slope).')
    else:
        print(' -> cos=0.95 is at a moderate-gradient region '
              '(no clear plateau or cliff).')
    if ratio_dh is None:
        print(' -> dh=5 gradient ratio is undetermined.')
    elif ratio_dh < 0.5:
        print(' -> dh=5 IS at a low-gradient region (plateau-like).')
    elif ratio_dh > 1.5:
        print(' -> dh=5 IS at a high-gradient region.')
    else:
        print(' -> dh=5 is at a moderate-gradient region.')

    results['cos_sweep_at_dh_5'] = {
        'cos_grid': COS_GRID.tolist(),
        'sig_rates': dict(sig_rates_cos),
        'pooled_gradient_summary': cos_pooled_grad,
    }
    results['dh_sweep_at_cos_0_95'] = {
        'dh_grid': DH_GRID.tolist(),
        'sig_rates': dict(sig_rates_dh),
        'pooled_gradient_summary': dh_pooled_grad,
    }
    results['surface_2d'] = {
        'cos_axis': COS_FOR_2D.tolist(),
        'dh_axis': DH_FOR_2D.tolist(),
        'rates': surface.tolist(),
        'at_v3_threshold': {
            'cos_0.95_dh_5_rate': float(surface[i95, j5]),
            'd_rate_d_cos': dcos,
            'd_rate_d_dh': ddh,
            'gradient_magnitude': grad_mag,
        },
    }
    results['doc_level_cos_sweep_at_dh_5'] = {
        'cos_grid': COS_GRID.tolist(),
        'doc_rates': doc_rates_cos,
        'doc_gradient_summary': doc_cos_grad,
    }
    results['plateau_detection'] = {
        'v3_cos_0_95': {
            'local_gradient': local_grad_cos,
            'median_gradient': median_grad_cos,
            'ratio_local_to_median': ratio_cos,
        },
        'v3_dh_5': {
            'local_gradient': local_grad_dh,
            'median_gradient': median_grad_dh,
            'ratio_local_to_median': ratio_dh,
        },
    }

    json_path = OUT / 'alert_rate_results.json'
    json_path.write_text(json.dumps(results, indent=2, ensure_ascii=False),
                         encoding='utf-8')
    print(f'\n[json] {json_path}')

    md = [
        '# Alert-Rate Sensitivity / Threshold-Plateau Analysis '
        '(Script 46)',
        '',
        f'Generated: {results["meta"]["timestamp"]}',
        f'Big-4 signatures: {n_sigs:,}; documents: {n_docs:,}',
        '',
        ('Alert-rate sensitivity to threshold perturbation. If the '
         'v3-inherited threshold cos>0.95 AND dh<=5 sits at a '
         'low-gradient region, that is weak evidence the threshold is '
         'a stable operating point. If the alert-rate surface is '
         'everywhere smooth without a plateau, the threshold is one '
         'point on a continuous specificity-recall tradeoff -- '
         'consistent with the no-natural-threshold finding from '
         'Scripts 39b-39e.'),
        '',
        '## Plateau detection at v3 inherited thresholds',
        '',
        # Pipes in "|grad|" are escaped; unescaped they add header cells
        # and the 5-column table no longer matches its delimiter row.
        '| Threshold | local \\|grad\\| | median \\|grad\\| | ratio '
        '| interpretation |',
        '|---|---|---|---|---|',
        f'| cos=0.95 | {fmt_num(local_grad_cos, ".5f")} | '
        f'{fmt_num(median_grad_cos, ".5f")} | '
        f'{fmt_num(ratio_cos, ".2f")} | '
        f'{classify_ratio(ratio_cos)} |',
        f'| dh=5 | {fmt_num(local_grad_dh, ".5f")} | '
        f'{fmt_num(median_grad_dh, ".5f")} | '
        f'{fmt_num(ratio_dh, ".2f")} | '
        f'{classify_ratio(ratio_dh)} |',
        '',
        '## 1D cos sweep at dh<=5 (per-signature alert rate)',
        '',
        '| cos > k | pooled | Firm A | Firm B | Firm C | Firm D |',
        '|---|---|---|---|---|---|',
    ]
    md.extend(_md_row(f'{k:.3f}', sig_rates_cos, i)
              for i, k in enumerate(COS_GRID) if i % 2 == 0)
    md += ['', '## 1D dh sweep at cos>0.95 (per-signature alert rate)', '',
           '| dh <= k | pooled | Firm A | Firm B | Firm C | Firm D |',
           '|---|---|---|---|---|---|']
    md.extend(_md_row(f'{int(k):2d}', sig_rates_dh, i)
              for i, k in enumerate(DH_GRID))
    md += ['', '## Document-level cos sweep at dh<=5', '',
           '| cos > k | doc alert rate (HC) |',
           '|---|---|']
    md.extend(f'| {k:.3f} | {doc_rates_cos[i]:.4f} |'
              for i, k in enumerate(COS_GRID) if i % 2 == 0)
    md.append('')

    md_path = OUT / 'alert_rate_report.md'
    md_path.write_text('\n'.join(md), encoding='utf-8')
    print(f'[md ] {md_path}')


if __name__ == '__main__':
    main()