#!/usr/bin/env python3
"""
Script 27: Within-Auditor-Year Uniformity Empirical Check (A2 Test)
=====================================================================
Opus 4.7 max-effort round-12 review flagged the A2 assumption (within-year
label uniformity; Methodology Section III-G) as load-bearing for Section
IV-H.1's partner-level "minority of hand-signers" reading, yet lacking
empirical verification. This script provides the empirical check that
Section III-G previously described as 'left to future work'.

For each (CPA, fiscal year) unit with >= 3 signatures, we compute:
  - max_cos: maximum pairwise cosine similarity within the year
  - min_cos: minimum pairwise cosine similarity within the year

Classification via **frac_high** (the fraction of within-year pairs with
cosine >= 0.95); this is robust to stamp-output variance, template switches,
and isolated outliers in a way that raw max/min extremes are not.
Auxiliary: frac_low (fraction of pairs with cosine < 0.837).

  - strict_full_hand    : frac_high == 0
        (no replicated pair anywhere; full-year hand-sign)
  - mostly_hand         : 0 < frac_high <= 0.1
        (isolated near-identical pair, possibly one template reuse;
        dominant hand-sign)
  - substantial_mixture : 0.1 < frac_high <= 0.5
        (clear A2 violation: a material minority of signatures are
        replicated)
  - mostly_stamp        : 0.5 < frac_high <= 0.9
        (stamp-dominant but with non-trivial variance or a minority of
        non-stamped signatures)
  - strict_full_stamp   : frac_high > 0.9
        (near-all pairs near-identical; full-year replication with modest
        variance allowed)

Groups in which no valid pair can be formed (every feature vector, or all
but one, is zero-norm or non-finite) carry no measurement; they are counted
separately and excluded from the classification instead of being reported
as strict_full_hand.

Thresholds:
  0.95  = whole-sample Firm A P7.5 heuristic (Section III-L)
  0.837 = all-pairs intra/inter KDE crossover (Section III-L,
          likely-hand-signed boundary)

Stratification:
  - Firm bucket: Firm A (Deloitte / 勤業眾信), Firm B-D (KPMG/PwC/EY),
    Non-Big-4
  - Period: 2013-2018 (pre-digitalization), 2019-2021 (transition),
    2022-2023 (post)
  - Firm x Period grid for the A2-violation rate

Output:
  reports/within_year_uniformity/within_year_uniformity.md
  reports/within_year_uniformity/within_year_uniformity.json
  reports/within_year_uniformity/all_cpa_year_rows.csv (audit trail)
  reports/within_year_uniformity/substantial_mixture_candidates.csv
"""

import csv
import json
import sqlite3
from collections import Counter, defaultdict
from contextlib import closing
from datetime import datetime, timezone
from itertools import groupby
from operator import itemgetter
from pathlib import Path

import numpy as np

DB = '/Volumes/NV2/PDF-Processing/signature-analysis/signature_analysis.db'
# The output directory is created at the start of main(), not at import time,
# so the helpers below can be imported without touching the filesystem.
OUT = Path('/Volumes/NV2/PDF-Processing/signature-analysis/reports/'
           'within_year_uniformity')

FIRM_A = '勤業眾信聯合'
BIG4_OTHER = {'安侯建業聯合', '資誠聯合', '安永聯合'}

THRESH_REPLICATED = 0.95
THRESH_HANDSIGN = 0.837
MIN_SIGS = 3

FIRM_BUCKETS = ['Firm A', 'Firm B-D (Big-4 others)', 'Non-Big-4']
PERIODS = ['2013-2018 (pre)', '2019-2021 (transition)', '2022-2023 (post)']
CLASSES = ['strict_full_hand', 'mostly_hand', 'substantial_mixture',
           'mostly_stamp', 'strict_full_stamp']

# A2 violation candidates = {mostly_hand, substantial_mixture, mostly_stamp}
# (i.e., not strict_full_hand and not strict_full_stamp)
A2_VIOLATION_CLASSES = frozenset(
    {'mostly_hand', 'substantial_mixture', 'mostly_stamp'})

# Column order shared by both CSV outputs.
CSV_FIELDS = [
    'cpa', 'firm', 'firm_bucket', 'year', 'period',
    'n_sigs', 'n_pairs', 'max_cos', 'min_cos',
    'frac_high', 'frac_low', 'class', 'is_a2_violation',
]

# Returned by pairwise_stats() when no valid pair exists.
_NO_PAIRS = (float('nan'), float('nan'), 0.0, 0.0, 0)


def period_bin(year) -> str:
    """Map a year (int or numeric string) to its PERIODS label.

    Anything up to 2018 falls into the 'pre' bin, 2019-2021 into
    'transition', and everything later into 'post'.
    """
    y = int(year)
    if y <= 2018:
        return '2013-2018 (pre)'
    if y <= 2021:
        return '2019-2021 (transition)'
    return '2022-2023 (post)'


def firm_bucket(firm) -> str:
    """Map a firm name to its FIRM_BUCKETS label.

    Any value that is neither FIRM_A nor in BIG4_OTHER (including None)
    is bucketed as 'Non-Big-4'.
    """
    if firm == FIRM_A:
        return 'Firm A'
    if firm in BIG4_OTHER:
        return 'Firm B-D (Big-4 others)'
    return 'Non-Big-4'


def classify(frac_high: float) -> str:
    """Return the CLASSES label for a within-year frac_high value."""
    if frac_high == 0:
        return 'strict_full_hand'
    if frac_high <= 0.1:
        return 'mostly_hand'
    if frac_high <= 0.5:
        return 'substantial_mixture'
    if frac_high <= 0.9:
        return 'mostly_stamp'
    return 'strict_full_stamp'


def is_a2_violation(cls: str) -> bool:
    """A2 violation candidates: not strictly full_hand and not strictly
    full_stamp."""
    return cls in A2_VIOLATION_CLASSES


def pairwise_stats(feats):
    """Return (max_cos, min_cos, frac_high, frac_low, n_pairs) over
    within-year pairs.

    Filters out degenerate features (zero norm or non-finite entries)
    before computing. When fewer than two usable vectors remain, the
    result is (nan, nan, 0.0, 0.0, 0); callers must check n_pairs before
    interpreting frac_high, because 0.0 is then a placeholder and not a
    measurement.
    """
    mat = np.stack(feats).astype(np.float64)
    # Drop rows with non-finite entries or (near-)zero norm
    finite = np.all(np.isfinite(mat), axis=1)
    norms = np.linalg.norm(mat, axis=1)
    keep = finite & (norms > 1e-6)
    mat = mat[keep]
    norms = norms[keep]
    if len(mat) < 2:
        return _NO_PAIRS
    unit = mat / norms[:, None]
    sim = unit @ unit.T
    # Strict upper triangle = each unordered pair exactly once.
    vals = sim[np.triu_indices(len(mat), k=1)]
    vals = vals[np.isfinite(vals)]
    n_pairs = len(vals)
    if n_pairs == 0:
        return _NO_PAIRS
    n_high = int(np.sum(vals >= THRESH_REPLICATED))
    n_low = int(np.sum(vals < THRESH_HANDSIGN))
    return (float(vals.max()), float(vals.min()),
            n_high / n_pairs, n_low / n_pairs, n_pairs)


def iterate_groups():
    """Stream rows ordered by (CPA, year); yield completed groups.

    Yields ((cpa, year), feats, firm) where feats is a list of 1-D float32
    arrays decoded from the feature_vector column and firm is taken from
    the first row of the group (None when the LEFT JOIN finds no match).
    The connection is closed when the generator finishes or is discarded.
    """
    # NOTE(review): if accountants.name is not unique, the LEFT JOIN would
    # duplicate signature rows and create artificial cos = 1.0 pairs --
    # confirm uniqueness against the schema.
    with closing(sqlite3.connect(DB)) as conn:
        cur = conn.cursor()
        cur.execute('''
            SELECT s.assigned_accountant,
                   substr(s.year_month, 1, 4) AS year,
                   s.feature_vector,
                   a.firm
            FROM signatures s
            LEFT JOIN accountants a ON a.name = s.assigned_accountant
            WHERE s.feature_vector IS NOT NULL
              AND s.assigned_accountant IS NOT NULL
              AND s.year_month IS NOT NULL
            ORDER BY s.assigned_accountant, year
        ''')
        # Rows arrive sorted by the grouping key, as groupby requires.
        for key, rows in groupby(cur, key=itemgetter(0, 1)):
            rows = list(rows)
            feats = [np.frombuffer(fv, dtype=np.float32).copy()
                     for _, _, fv, _ in rows]
            yield key, feats, rows[0][3]


def summarize_group(cpa, year, feats, firm):
    """Build the result row for one (CPA, year) group.

    Returns None when no valid pair can be formed from feats, so that
    groups without any measurement are not classified (the placeholder
    frac_high of 0.0 would otherwise read as strict_full_hand).
    """
    max_c, min_c, frac_high, frac_low, n_pairs = pairwise_stats(feats)
    if n_pairs == 0:
        return None
    cls = classify(frac_high)
    return {
        'cpa': cpa,
        'year': year,
        'n_sigs': len(feats),
        'n_pairs': n_pairs,
        'firm': firm or 'UNKNOWN',
        'firm_bucket': firm_bucket(firm),
        'period': period_bin(year),
        'max_cos': round(max_c, 4),
        'min_cos': round(min_c, 4),
        'frac_high': round(frac_high, 4),
        'frac_low': round(frac_low, 4),
        'class': cls,
        'is_a2_violation': is_a2_violation(cls),
    }


def _empty_tally():
    """Return a zeroed counter with every key the reports read."""
    tally = {'total': 0}
    tally.update((c, 0) for c in CLASSES)
    tally['a2_violation'] = 0
    return tally


def _tally(results):
    """Count classes overall, per firm bucket, per period and per cell."""
    overall = Counter()
    by_firm = defaultdict(_empty_tally)
    by_period = defaultdict(_empty_tally)
    by_fp = defaultdict(_empty_tally)
    for r in results:
        overall[r['class']] += 1
        targets = (by_firm[r['firm_bucket']],
                   by_period[r['period']],
                   by_fp[(r['firm_bucket'], r['period'])])
        for d in targets:
            d['total'] += 1
            d[r['class']] += 1
            d['a2_violation'] += int(r['is_a2_violation'])
    return overall, by_firm, by_period, by_fp


def _rate_cells(by_fp, fb, count_key):
    """Format one Firm x Period row: 'rate% (n/t)' per period, '-' if empty."""
    cells = []
    for p in PERIODS:
        d = by_fp.get((fb, p), {})
        t = d.get('total', 0)
        if t == 0:
            cells.append('-')
            continue
        n = d.get(count_key, 0)
        rate = 100 * n / t
        cells.append(f'{rate:.2f}% ({n}/{t})')
    return cells


def _print_breakdown(table, labels):
    """Print the per-class counts and shares for each non-empty label."""
    for label in labels:
        d = table.get(label, {})
        t = d.get('total', 0)
        if t == 0:
            continue
        print(f' {label} (N = {t}):')
        for c in CLASSES:
            n = d.get(c, 0)
            print(f' {c:25s}: {n:5d} ({100*n/t:.2f}%)')


def _print_grid(by_fp, count_key):
    """Print the Firm x Period console grid for one counter key."""
    print(' {:25s}'.format('') + ''.join(f'{p[:18]:>22}' for p in PERIODS))
    for fb in FIRM_BUCKETS:
        cells = _rate_cells(by_fp, fb, count_key)
        print(' {:25s}'.format(fb) + ''.join(f'{c:>22}' for c in cells))


def _write_csv(path, rows):
    """Write rows (dicts) to path using the shared CSV_FIELDS columns."""
    with open(path, 'w', newline='', encoding='utf-8') as f:
        w = csv.DictWriter(f, fieldnames=CSV_FIELDS)
        w.writeheader()
        for r in rows:
            w.writerow({k: r[k] for k in CSV_FIELDS})


def main():
    """Run the A2 check end to end and write the JSON/CSV/Markdown reports."""
    OUT.mkdir(parents=True, exist_ok=True)

    print('Streaming (CPA, year) groups from DB...')
    results = []
    total_groups = 0
    kept_groups = 0
    degenerate_groups = 0
    for (cpa, year), feats, firm in iterate_groups():
        total_groups += 1
        if len(feats) < MIN_SIGS:
            continue
        kept_groups += 1
        row = summarize_group(cpa, year, feats, firm)
        if row is None:
            # No valid pair => nothing was measured; do not classify.
            degenerate_groups += 1
            continue
        results.append(row)

    print(f' total groups: {total_groups}')
    print(f' groups with n >= {MIN_SIGS}: {kept_groups}')
    if degenerate_groups:
        print(f' groups skipped (no valid pair): {degenerate_groups}')

    total = len(results)
    if total == 0:
        print('No groups to analyze.')
        return

    overall, by_firm, by_period, by_fp = _tally(results)

    print('\n=== Overall classification ===')
    for c in CLASSES:
        n = overall[c]
        print(f' {c:25s}: {n:5d} ({100*n/total:.2f}%)')

    print('\n=== By firm bucket ===')
    _print_breakdown(by_firm, FIRM_BUCKETS)

    print('\n=== By period ===')
    _print_breakdown(by_period, PERIODS)

    print('\n=== Firm x Period: A2 violation rate (any of mostly_hand, '
          'substantial_mixture, mostly_stamp) ===')
    _print_grid(by_fp, 'a2_violation')

    # Substantial-mixture-only Firm x Period (strictest A2 violation subset)
    print('\n=== Firm x Period: substantial_mixture rate (strictest) ===')
    _print_grid(by_fp, 'substantial_mixture')

    # Outputs
    json_out = {
        'generated_at': datetime.now(timezone.utc).isoformat(),
        'thresholds': {
            'replicated_cosine': THRESH_REPLICATED,
            'handsigned_cosine': THRESH_HANDSIGN,
        },
        'min_signatures_per_year': MIN_SIGS,
        'N_total_groups': total_groups,
        'N_kept_groups': kept_groups,
        'N_degenerate_groups': degenerate_groups,
        'overall': {c: overall[c] for c in CLASSES},
        'by_firm_bucket': {
            fb: dict(by_firm[fb]) for fb in FIRM_BUCKETS
            if by_firm.get(fb, {}).get('total')
        },
        'by_period': {
            p: dict(by_period[p]) for p in PERIODS
            if by_period.get(p, {}).get('total')
        },
        'by_firm_x_period': {
            f'{fb}|{p}': dict(by_fp[(fb, p)])
            for fb in FIRM_BUCKETS for p in PERIODS
            if by_fp.get((fb, p), {}).get('total')
        },
    }
    with open(OUT / 'within_year_uniformity.json', 'w',
              encoding='utf-8') as f:
        json.dump(json_out, f, ensure_ascii=False, indent=2)

    ordered = sorted(
        results, key=lambda x: (x['firm_bucket'], x['year'], x['cpa']))

    # CSV audit trail: all rows with all metrics
    _write_csv(OUT / 'all_cpa_year_rows.csv', ordered)

    # CSV: substantial_mixture rows only (strictest A2 violation subset)
    _write_csv(OUT / 'substantial_mixture_candidates.csv',
               [r for r in ordered if r['class'] == 'substantial_mixture'])

    # Markdown
    md = build_markdown(overall, by_firm, by_period, by_fp,
                        total, total_groups, kept_groups,
                        degenerate_groups)
    with open(OUT / 'within_year_uniformity.md', 'w', encoding='utf-8') as f:
        f.write(md)

    print(f'\n=> Outputs in {OUT}')


def build_markdown(overall, by_firm, by_period, by_fp,
                   total, total_groups, kept_groups, degenerate_groups=0):
    """Render the Markdown report and return it as a single string.

    overall maps class -> count; by_firm, by_period and by_fp map a firm
    bucket, a period, or a (firm bucket, period) tuple to a counter with
    'total', per-class and 'a2_violation' entries. Missing entries are
    read as zero. total is the number of classified groups (denominator of
    the overall shares); degenerate_groups, when non-zero, adds a sentence
    stating how many groups were excluded for lack of a valid pair.
    """
    ts = datetime.now(timezone.utc).isoformat()

    def share(n, t):
        # Guard the empty case so the report can still be rendered.
        return 100 * n / t if t else 0.0

    L = [
        '# Within-Auditor-Year Uniformity Check (A2 Empirical Test)',
        '',
        f'Generated: {ts}',
        '',
        '## Method',
        '',
        f'For each (CPA, fiscal year) with >= {MIN_SIGS} signatures, '
        'compute all within-year pairwise cosine similarities and '
        f'derive frac_high = fraction of pairs with cos >= {THRESH_REPLICATED}. '
        'Classification is based on frac_high; this is robust to stamp-'
        'output variance, template switches, and isolated outliers.',
        '',
        '- `strict_full_hand`: frac_high = 0 '
        '(no near-identical pair; full-year hand-signing)',
        '- `mostly_hand`: 0 < frac_high <= 0.1 '
        '(isolated near-identical pair; dominant hand-sign with possibly '
        'one template reuse)',
        '- `substantial_mixture`: 0.1 < frac_high <= 0.5 '
        '(material minority of signatures replicated; clearest A2 '
        'violation signature)',
        '- `mostly_stamp`: 0.5 < frac_high <= 0.9 '
        '(stamp-dominant with non-trivial variance or minority of '
        'non-stamped signatures)',
        '- `strict_full_stamp`: frac_high > 0.9 '
        '(near-all pairs near-identical; full-year replication with '
        'modest variance allowed)',
        '',
        '**A2 violation candidates** = `mostly_hand` ∪ '
        '`substantial_mixture` ∪ `mostly_stamp` (anything that is not '
        '`strict_full_hand` and not `strict_full_stamp`).',
        '',
        f'Total (CPA, year) groups in DB: {total_groups}; '
        f'groups with n >= {MIN_SIGS}: {kept_groups}.',
    ]
    if degenerate_groups:
        L.append('')
        L.append(f'{degenerate_groups} of these groups had no valid '
                 'signature pair (zero-norm or non-finite feature vectors) '
                 'and are excluded from all tallies below.')
    L.append('')

    L.append('## Overall')
    L.append('')
    L.append('| Class | N | Share |')
    L.append('|---|---|---|')
    for c in CLASSES:
        n = overall.get(c, 0)
        L.append(f'| `{c}` | {n} | {share(n, total):.2f}% |')
    L.append('')

    def row(label, d, t):
        cells = [label, str(t)]
        for c in CLASSES:
            n = d.get(c, 0)
            cells.append(f'{n} ({100*n/t:.2f}%)')
        av = d.get('a2_violation', 0)
        cells.append(f'{av} ({100*av/t:.2f}%)')
        return '| ' + ' | '.join(cells) + ' |'

    def breakdown(title, first_col, table, labels):
        out = [title, '',
               ('| ' + first_col + ' | N | '
                + ' | '.join(f'`{c}`' for c in CLASSES)
                + ' | A2 violation (union) |'),
               '|' + '|'.join(['---'] * (len(CLASSES) + 3)) + '|']
        for label in labels:
            d = table.get(label, {})
            t = d.get('total', 0)
            if t == 0:
                continue
            out.append(row(label, d, t))
        out.append('')
        return out

    def grid(title, count_key):
        out = [title, '',
               '| Firm | ' + ' | '.join(PERIODS) + ' |',
               '|' + '---|' * (len(PERIODS) + 1)]
        for fb in FIRM_BUCKETS:
            cells = _rate_cells(by_fp, fb, count_key)
            out.append(f'| {fb} | ' + ' | '.join(cells) + ' |')
        out.append('')
        return out

    L.extend(breakdown('## By firm bucket', 'Bucket', by_firm, FIRM_BUCKETS))
    L.extend(breakdown('## By period', 'Period', by_period, PERIODS))
    L.extend(grid('## Firm x Period: A2 violation rate (union of '
                  '`mostly_hand`, `substantial_mixture`, `mostly_stamp`)',
                  'a2_violation'))
    L.extend(grid('## Firm x Period: `substantial_mixture` rate '
                  '(strictest subset)',
                  'substantial_mixture'))

    L.append('## Interpretation guide')
    L.append('')
    L.append('- Low A2-violation union rate overall (e.g. < 10%): A2 is '
             'empirically well-supported; report as Methodology III-G '
             'robustness check.')
    L.append('- High `substantial_mixture` rate specifically (e.g. > 5% '
             'at Big-4 B-D in 2019-2021): A2 weakens in the digitalization '
             'transition; IV-H.1 partner-level reading may need restriction '
             'to Firm A or pre-2019 period.')
    L.append('- High `substantial_mixture` rate at Firm A itself: unexpected; '
             'Firm A industry-practice defense of A2 would need revisiting.')
    L.append('')
    return '\n'.join(L)


if __name__ == '__main__':
    main()