#!/usr/bin/env python3
# Provenance: added as signature_analysis/27_within_year_uniformity.py by
# commit c0ed9aa5dc9b0cf3039117480e73b2635d93537f ("Add script 27:
# within-auditor-year uniformity empirical check (A2 test)"). Per that commit
# message, the result falsified A2 and led to its removal in Paper A v3.14;
# the script is retained as due-diligence evidence in the repo.
"""
Script 27: Within-Auditor-Year Uniformity Empirical Check (A2 Test)
=====================================================================
Opus 4.7 max-effort round-12 review flagged the A2 assumption
(within-year label uniformity; Methodology Section III-G) as
load-bearing for Section IV-H.1's partner-level "minority of
hand-signers" reading, yet lacking empirical verification. This
script provides the empirical check that Section III-G previously
described as 'left to future work'.

For each (CPA, fiscal year) unit with >= 3 signatures, we compute:
  - max_cos_yr: maximum pairwise cosine similarity within the year
  - min_cos_yr: minimum pairwise cosine similarity within the year

Classification via **frac_high** (the fraction of within-year pairs with
cosine >= 0.95); this is robust to stamp-output variance, template
switches, and isolated outliers in a way that raw max/min extremes are
not. Auxiliary: frac_low (fraction of pairs with cosine < 0.837).

  - strict_full_hand    : frac_high == 0
                          (no replicated pair anywhere; full-year hand-sign)
  - mostly_hand         : 0 < frac_high <= 0.1
                          (isolated near-identical pair, possibly one
                          template reuse; dominant hand-sign)
  - substantial_mixture : 0.1 < frac_high <= 0.5
                          (clear A2 violation: a material minority of
                          signatures are replicated)
  - mostly_stamp        : 0.5 < frac_high <= 0.9
                          (stamp-dominant but with non-trivial variance
                          or a minority of non-stamped signatures)
  - strict_full_stamp   : frac_high > 0.9
                          (near-all pairs near-identical; full-year
                          replication with modest variance allowed)

Groups in which no valid pair remains after dropping degenerate feature
vectors (zero norm or non-finite entries) cannot be classified; they are
skipped and reported as a separate count instead of being labelled
strict_full_hand.

Thresholds:
  0.95  = whole-sample Firm A P7.5 heuristic (Section III-L)
  0.837 = all-pairs intra/inter KDE crossover (Section III-L,
          likely-hand-signed boundary)

Stratification:
  - Firm bucket: Firm A (Deloitte / 勤業眾信), Firm B-D (KPMG/PwC/EY),
    Non-Big-4
  - Period: 2013-2018 (pre-digitalization),
            2019-2021 (transition),
            2022-2023 (post)
  - Firm x Period grid for the A2-violation rate

Output:
  reports/within_year_uniformity/within_year_uniformity.md
  reports/within_year_uniformity/within_year_uniformity.json
  reports/within_year_uniformity/all_cpa_year_rows.csv (audit trail)
  reports/within_year_uniformity/substantial_mixture_candidates.csv
"""

import sqlite3
import json
import csv
import numpy as np
from pathlib import Path
from datetime import datetime, timezone
from collections import defaultdict

DB = '/Volumes/NV2/PDF-Processing/signature-analysis/signature_analysis.db'
OUT = Path('/Volumes/NV2/PDF-Processing/signature-analysis/reports/'
           'within_year_uniformity')
# The output directory is created at the start of main(), not at import
# time, so importing this module has no filesystem side effects.

FIRM_A = '勤業眾信聯合'
BIG4_OTHER = {'安侯建業聯合', '資誠聯合', '安永聯合'}

THRESH_REPLICATED = 0.95
THRESH_HANDSIGN = 0.837
MIN_SIGS = 3

FIRM_BUCKETS = ['Firm A', 'Firm B-D (Big-4 others)', 'Non-Big-4']
PERIODS = ['2013-2018 (pre)', '2019-2021 (transition)', '2022-2023 (post)']
CLASSES = ['strict_full_hand', 'mostly_hand', 'substantial_mixture',
           'mostly_stamp', 'strict_full_stamp']
# A2 violation candidates = {mostly_hand, substantial_mixture, mostly_stamp}
# (i.e., not strict_full_hand and not strict_full_stamp)


def period_bin(year):
    """Map a year (int or numeric string) to one of the PERIODS labels.

    Only upper bounds are checked: any year <= 2018 (including years
    before 2013) maps to the 'pre' bin and any year > 2021 (including
    years after 2023) maps to the 'post' bin.

    Raises:
        ValueError: if ``year`` cannot be converted with ``int()``.
    """
    y = int(year)
    if y <= 2018:
        return '2013-2018 (pre)'
    if y <= 2021:
        return '2019-2021 (transition)'
    return '2022-2023 (post)'


def firm_bucket(firm):
    """Map a firm name to one of the FIRM_BUCKETS labels.

    Any value that is neither FIRM_A nor in BIG4_OTHER, including ``None``
    (no matching accountants row), falls through to 'Non-Big-4'.
    """
    if firm == FIRM_A:
        return 'Firm A'
    if firm in BIG4_OTHER:
        return 'Firm B-D (Big-4 others)'
    return 'Non-Big-4'


def classify(frac_high):
    """Return the CLASSES label for a fraction of replicated pairs.

    Bins are right-closed: 0, (0, 0.1], (0.1, 0.5], (0.5, 0.9], (0.9, ...).
    """
    if frac_high == 0:
        return 'strict_full_hand'
    if frac_high <= 0.1:
        return 'mostly_hand'
    if frac_high <= 0.5:
        return 'substantial_mixture'
    if frac_high <= 0.9:
        return 'mostly_stamp'
    return 'strict_full_stamp'


def is_a2_violation(cls):
    """A2 violation candidates: not strictly full_hand and not strictly full_stamp."""
    return cls in {'mostly_hand', 'substantial_mixture', 'mostly_stamp'}


def pairwise_stats(feats):
    """Return (max_cos, min_cos, frac_high, frac_low, n_pairs) over
    within-year pairs.

    Degenerate features (zero norm or non-finite entries) are dropped
    before computing. When fewer than two usable vectors remain (or
    ``feats`` is empty) the result is ``(nan, nan, 0.0, 0.0, 0)``; callers
    must check ``n_pairs`` before interpreting ``frac_high``, because 0.0
    then means "no data", not "no replicated pair".

    Args:
        feats: sequence of equal-length 1-D numeric vectors.

    Raises:
        ValueError: from ``np.stack`` if the vectors differ in length.
    """
    no_pairs = (float('nan'), float('nan'), 0.0, 0.0, 0)
    if len(feats) == 0:
        # np.stack rejects an empty sequence; treat it as "no pairs".
        return no_pairs
    mat = np.stack(feats).astype(np.float64)
    # Drop rows with non-finite entries or zero norm
    finite = np.all(np.isfinite(mat), axis=1)
    norms = np.linalg.norm(mat, axis=1)
    keep = finite & (norms > 1e-6)
    mat = mat[keep]
    norms = norms[keep]
    if len(mat) < 2:
        return no_pairs
    mat_n = mat / norms[:, None]
    sim = mat_n @ mat_n.T
    # Strict upper triangle: each unordered pair once, no self-pairs.
    iu = np.triu_indices(len(mat), k=1)
    vals = sim[iu]
    vals = vals[np.isfinite(vals)]
    n_pairs = len(vals)
    if n_pairs == 0:
        return no_pairs
    n_high = int(np.sum(vals >= THRESH_REPLICATED))
    n_low = int(np.sum(vals < THRESH_HANDSIGN))
    return (float(vals.max()), float(vals.min()),
            n_high / n_pairs, n_low / n_pairs, n_pairs)


def iterate_groups(db_path=DB):
    """Stream rows ordered by (CPA, year); yield completed groups.

    Yields ``((cpa, year), feats, firm)`` where ``year`` is the first four
    characters of ``year_month``, ``feats`` is a list of float32 arrays
    decoded from the ``feature_vector`` blobs, and ``firm`` is the firm of
    the first row of the group (``None`` when the LEFT JOIN finds no
    accountants row).

    Args:
        db_path: path of the SQLite database; defaults to ``DB``.

    The connection is closed when the generator is exhausted, closed
    early, or interrupted by an exception.
    """
    conn = sqlite3.connect(db_path)
    try:
        cur = conn.cursor()
        cur.execute('''
            SELECT s.assigned_accountant,
                   substr(s.year_month, 1, 4) AS year,
                   s.feature_vector,
                   a.firm
            FROM signatures s
            LEFT JOIN accountants a ON a.name = s.assigned_accountant
            WHERE s.feature_vector IS NOT NULL
              AND s.assigned_accountant IS NOT NULL
              AND s.year_month IS NOT NULL
            ORDER BY s.assigned_accountant, year
        ''')
        cur_key = None
        cur_feats = []
        cur_firm = None
        for cpa, year, fv, firm in cur:
            key = (cpa, year)
            if key != cur_key:
                # Rows arrive sorted, so a key change closes the group.
                if cur_key is not None and cur_feats:
                    yield cur_key, cur_feats, cur_firm
                cur_key = key
                cur_feats = []
                cur_firm = firm
            # .copy() detaches the array from the read-only blob buffer.
            cur_feats.append(np.frombuffer(fv, dtype=np.float32).copy())
        if cur_key is not None and cur_feats:
            yield cur_key, cur_feats, cur_firm
    finally:
        conn.close()


def _summarize_group(cpa, year, firm, feats):
    """Build the result row for one (CPA, year) group.

    Returns None when no valid pair exists, so the group is not
    mislabelled as strict_full_hand on the strength of frac_high == 0.0.
    """
    max_c, min_c, frac_high, frac_low, n_pairs = pairwise_stats(feats)
    if n_pairs == 0:
        return None
    cls = classify(frac_high)
    return {
        'cpa': cpa,
        'year': year,
        'n_sigs': len(feats),
        'n_pairs': n_pairs,
        'firm': firm or 'UNKNOWN',
        'firm_bucket': firm_bucket(firm),
        'period': period_bin(year),
        'max_cos': round(max_c, 4),
        'min_cos': round(min_c, 4),
        'frac_high': round(frac_high, 4),
        'frac_low': round(frac_low, 4),
        'class': cls,
        'is_a2_violation': is_a2_violation(cls),
    }


def _tally(results):
    """Count rows overall and per firm bucket, period and (firm, period)."""
    overall = defaultdict(int)
    by_firm = defaultdict(lambda: defaultdict(int))
    by_period = defaultdict(lambda: defaultdict(int))
    by_fp = defaultdict(lambda: defaultdict(int))
    for r in results:
        overall[r['class']] += 1
        cells = (
            by_firm[r['firm_bucket']],
            by_period[r['period']],
            by_fp[(r['firm_bucket'], r['period'])],
        )
        for cell in cells:
            cell['total'] += 1
            cell[r['class']] += 1
            if r['is_a2_violation']:
                cell['a2_violation'] += 1
    return overall, by_firm, by_period, by_fp


def _normalized_counts(cell):
    """Return one tally cell as a plain dict with a fixed key set."""
    counts = {'total': cell.get('total', 0)}
    for c in CLASSES:
        counts[c] = cell.get(c, 0)
    counts['a2_violation'] = cell.get('a2_violation', 0)
    return counts


def _rate_cell(cell, key):
    """Format 'rate% (count/total)' for one grid cell; '-' if it is empty."""
    t = cell.get('total', 0)
    if t == 0:
        return '-'
    n = cell.get(key, 0)
    rate = 100 * n / t
    return f'{rate:.2f}% ({n}/{t})'


def _print_breakdown(labels, table):
    """Print the per-class counts and shares for each non-empty label."""
    for label in labels:
        cell = table.get(label, {})
        t = cell.get('total', 0)
        if t == 0:
            continue
        print(f' {label} (N = {t}):')
        for c in CLASSES:
            n = cell.get(c, 0)
            print(f' {c:25s}: {n:5d} ({100*n/t:.2f}%)')


def _print_grid(by_fp, key):
    """Print the Firm x Period console grid for the count named ``key``."""
    header = ' {:25s}'.format('') + \
        ''.join(f'{p[:18]:>22}' for p in PERIODS)
    print(header)
    for fb in FIRM_BUCKETS:
        cells = [_rate_cell(by_fp.get((fb, p), {}), key) for p in PERIODS]
        row = ' {:25s}'.format(fb) + ''.join(f'{c:>22}' for c in cells)
        print(row)


def _markdown_grid(by_fp, key):
    """Return the Firm x Period markdown table lines for the count ``key``."""
    lines = [
        '| Firm | 2013-2018 (pre) | 2019-2021 (transition) | '
        '2022-2023 (post) |',
        '|---|---|---|---|',
    ]
    for fb in FIRM_BUCKETS:
        cells = [_rate_cell(by_fp.get((fb, p), {}), key) for p in PERIODS]
        lines.append(f'| {fb} | ' + ' | '.join(cells) + ' |')
    return lines


def _write_csv(path, fields, rows):
    """Write ``rows`` (dicts) to ``path`` with the given column order."""
    with open(path, 'w', newline='', encoding='utf-8') as f:
        w = csv.DictWriter(f, fieldnames=fields)
        w.writeheader()
        for r in rows:
            w.writerow({k: r[k] for k in fields})


def main(db_path=DB, out_dir=OUT):
    """Run the uniformity check and write the JSON, CSV and markdown reports.

    Args:
        db_path: SQLite database to read; defaults to ``DB``.
        out_dir: directory for the reports; defaults to ``OUT``. It is
            created if missing.

    Groups with fewer than MIN_SIGS signatures are ignored. Groups that
    pass that filter but have no valid pair are skipped and counted under
    ``N_skipped_degenerate``. If nothing is left to analyze, no report
    file is written.
    """
    out_dir = Path(out_dir)
    out_dir.mkdir(parents=True, exist_ok=True)

    print('Streaming (CPA, year) groups from DB...')
    results = []
    total_groups = 0
    kept_groups = 0
    skipped_degenerate = 0
    for (cpa, year), feats, firm in iterate_groups(db_path):
        total_groups += 1
        if len(feats) < MIN_SIGS:
            continue
        kept_groups += 1
        row = _summarize_group(cpa, year, firm, feats)
        if row is None:
            skipped_degenerate += 1
            continue
        results.append(row)
    print(f' total groups: {total_groups}')
    print(f' groups with n >= {MIN_SIGS}: {kept_groups}')
    if skipped_degenerate:
        print(f' skipped (no valid pair after dropping degenerate '
              f'vectors): {skipped_degenerate}')

    total = len(results)
    if total == 0:
        print('No groups to analyze.')
        return

    overall, by_firm, by_period, by_fp = _tally(results)

    print('\n=== Overall classification ===')
    for c in CLASSES:
        n = overall.get(c, 0)
        print(f' {c:25s}: {n:5d} ({100*n/total:.2f}%)')

    print('\n=== By firm bucket ===')
    _print_breakdown(FIRM_BUCKETS, by_firm)

    print('\n=== By period ===')
    _print_breakdown(PERIODS, by_period)

    print('\n=== Firm x Period: A2 violation rate (any of mostly_hand, '
          'substantial_mixture, mostly_stamp) ===')
    _print_grid(by_fp, 'a2_violation')

    # Substantial-mixture-only Firm x Period (strictest A2 violation subset)
    print('\n=== Firm x Period: substantial_mixture rate (strictest) ===')
    _print_grid(by_fp, 'substantial_mixture')

    # Outputs. Every tally cell is normalized so the JSON key set does not
    # depend on which counts happened to be non-zero.
    json_out = {
        'generated_at': datetime.now(timezone.utc).isoformat(),
        'thresholds': {
            'replicated_cosine': THRESH_REPLICATED,
            'handsigned_cosine': THRESH_HANDSIGN,
        },
        'min_signatures_per_year': MIN_SIGS,
        'N_total_groups': total_groups,
        'N_kept_groups': kept_groups,
        'N_skipped_degenerate': skipped_degenerate,
        'overall': {c: overall.get(c, 0) for c in CLASSES},
        'by_firm_bucket': {
            fb: _normalized_counts(by_firm[fb])
            for fb in FIRM_BUCKETS
            if by_firm.get(fb, {}).get('total', 0)
        },
        'by_period': {
            p: _normalized_counts(by_period[p])
            for p in PERIODS
            if by_period.get(p, {}).get('total', 0)
        },
        'by_firm_x_period': {
            f'{fb}|{p}': _normalized_counts(by_fp[(fb, p)])
            for fb in FIRM_BUCKETS for p in PERIODS
            if by_fp.get((fb, p), {}).get('total', 0)
        },
    }
    with open(out_dir / 'within_year_uniformity.json', 'w',
              encoding='utf-8') as f:
        json.dump(json_out, f, ensure_ascii=False, indent=2)

    csv_fields = [
        'cpa', 'firm', 'firm_bucket', 'year', 'period',
        'n_sigs', 'n_pairs', 'max_cos', 'min_cos',
        'frac_high', 'frac_low', 'class', 'is_a2_violation',
    ]
    ordered = sorted(results,
                     key=lambda x: (x['firm_bucket'], x['year'], x['cpa']))

    # CSV audit trail: all rows with all metrics
    _write_csv(out_dir / 'all_cpa_year_rows.csv', csv_fields, ordered)

    # CSV: substantial_mixture rows only (strictest A2 violation subset)
    _write_csv(out_dir / 'substantial_mixture_candidates.csv', csv_fields,
               [r for r in ordered if r['class'] == 'substantial_mixture'])

    # Markdown
    md = build_markdown(overall, by_firm, by_period, by_fp, total,
                        total_groups, kept_groups)
    with open(out_dir / 'within_year_uniformity.md', 'w',
              encoding='utf-8') as f:
        f.write(md)

    print(f'\n=> Outputs in {out_dir}')


def build_markdown(overall, by_firm, by_period, by_fp, total,
                   total_groups, kept_groups):
    """Render the markdown report and return it as a single string.

    Args:
        overall: mapping class label -> count.
        by_firm: mapping firm bucket -> tally cell (counts keyed by
            'total', class labels and 'a2_violation').
        by_period: mapping period label -> tally cell.
        by_fp: mapping (firm bucket, period) -> tally cell.
        total: number of classified groups (denominator of 'Overall').
        total_groups: number of (CPA, year) groups read from the DB.
        kept_groups: number of groups with at least MIN_SIGS signatures.

    Missing keys are read as zero and the input mappings are not modified.
    """
    ts = datetime.now(timezone.utc).isoformat()
    L = []
    L.append('# Within-Auditor-Year Uniformity Check (A2 Empirical Test)')
    L.append('')
    L.append(f'Generated: {ts}')
    L.append('')
    L.append('## Method')
    L.append('')
    L.append(f'For each (CPA, fiscal year) with >= {MIN_SIGS} signatures, '
             'compute all within-year pairwise cosine similarities and '
             f'derive frac_high = fraction of pairs with cos >= {THRESH_REPLICATED}. '
             'Classification is based on frac_high; this is robust to stamp-'
             'output variance, template switches, and isolated outliers.')
    L.append('')
    L.append('- `strict_full_hand`: frac_high = 0 '
             '(no near-identical pair; full-year hand-signing)')
    L.append('- `mostly_hand`: 0 < frac_high <= 0.1 '
             '(isolated near-identical pair; dominant hand-sign with possibly '
             'one template reuse)')
    L.append('- `substantial_mixture`: 0.1 < frac_high <= 0.5 '
             '(material minority of signatures replicated; clearest A2 '
             'violation signature)')
    L.append('- `mostly_stamp`: 0.5 < frac_high <= 0.9 '
             '(stamp-dominant with non-trivial variance or minority of '
             'non-stamped signatures)')
    L.append('- `strict_full_stamp`: frac_high > 0.9 '
             '(near-all pairs near-identical; full-year replication with '
             'modest variance allowed)')
    L.append('')
    L.append('**A2 violation candidates** = `mostly_hand` ∪ '
             '`substantial_mixture` ∪ `mostly_stamp` (anything that is not '
             '`strict_full_hand` and not `strict_full_stamp`).')
    L.append('')
    L.append(f'Total (CPA, year) groups in DB: {total_groups}; '
             f'groups with n >= {MIN_SIGS}: {kept_groups}.')
    if kept_groups != total:
        # Make the gap between kept and classified groups visible.
        L.append('')
        L.append(f'Classified groups: {total} '
                 f'({kept_groups - total} kept groups had no valid pair '
                 'after dropping degenerate feature vectors and are '
                 'excluded).')
    L.append('')
    L.append('## Overall')
    L.append('')
    L.append('| Class | N | Share |')
    L.append('|---|---|---|')
    for c in CLASSES:
        n = overall.get(c, 0)
        share = 100 * n / total if total else 0.0
        L.append(f'| `{c}` | {n} | {share:.2f}% |')
    L.append('')

    def row(label, d, t):
        cells = [label, str(t)]
        for c in CLASSES:
            n = d.get(c, 0)
            cells.append(f'{n} ({100*n/t:.2f}%)')
        av = d.get('a2_violation', 0)
        cells.append(f'{av} ({100*av/t:.2f}%)')
        return '| ' + ' | '.join(cells) + ' |'

    header = ('| Bucket | N | ' + ' | '.join(f'`{c}`' for c in CLASSES)
              + ' | A2 violation (union) |')
    sep = '|' + '|'.join(['---'] * (len(CLASSES) + 3)) + '|'

    L.append('## By firm bucket')
    L.append('')
    L.append(header)
    L.append(sep)
    for fb in FIRM_BUCKETS:
        d = by_firm.get(fb, {})
        t = d.get('total', 0)
        if t == 0:
            continue
        L.append(row(fb, d, t))
    L.append('')
    L.append('## By period')
    L.append('')
    L.append(header.replace('Bucket', 'Period'))
    L.append(sep)
    for p in PERIODS:
        d = by_period.get(p, {})
        t = d.get('total', 0)
        if t == 0:
            continue
        L.append(row(p, d, t))
    L.append('')
    L.append('## Firm x Period: A2 violation rate (union of '
             '`mostly_hand`, `substantial_mixture`, `mostly_stamp`)')
    L.append('')
    L.extend(_markdown_grid(by_fp, 'a2_violation'))
    L.append('')
    L.append('## Firm x Period: `substantial_mixture` rate (strictest subset)')
    L.append('')
    L.extend(_markdown_grid(by_fp, 'substantial_mixture'))
    L.append('')
    L.append('## Interpretation guide')
    L.append('')
    L.append('- Low A2-violation union rate overall (e.g. < 10%): A2 is '
             'empirically well-supported; report as Methodology III-G '
             'robustness check.')
    L.append('- High `substantial_mixture` rate specifically (e.g. > 5% '
             'at Big-4 B-D in 2019-2021): A2 weakens in the digitalization '
             'transition; IV-H.1 partner-level reading may need restriction '
             'to Firm A or pre-2019 period.')
    L.append('- High `substantial_mixture` rate at Firm A itself: unexpected; '
             'Firm A industry-practice defense of A2 would need revisiting.')
    L.append('')
    return '\n'.join(L)


if __name__ == '__main__':
    main()