Files
pdf_signature_extraction/signature_analysis/27_within_year_uniformity.py
T
gbanyan c0ed9aa5dc Add script 27: within-auditor-year uniformity empirical check (A2 test)
Empirical verification of the A2 within-year label-uniformity
assumption flagged by Opus round-12. Result falsified A2 and led to
its removal in Paper A v3.14; script retained as due-diligence
evidence in the repo.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-12 11:34:17 +08:00

490 lines
18 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""
Script 27: Within-Auditor-Year Uniformity Empirical Check (A2 Test)
=====================================================================
Opus 4.7 max-effort round-12 review flagged the A2 assumption
(within-year label uniformity; Methodology Section III-G) as
load-bearing for Section IV-H.1's partner-level "minority of
hand-signers" reading, yet lacking empirical verification. This
script provides the empirical check that Section III-G previously
described as 'left to future work'.
For each (CPA, fiscal year) unit with >= 3 signatures, we compute:
- max_cos_yr: maximum pairwise cosine similarity within the year
- min_cos_yr: minimum pairwise cosine similarity within the year
Classification via **frac_high** (the fraction of within-year pairs with
cosine >= 0.95); this is robust to stamp-output variance, template
switches, and isolated outliers in a way that raw max/min extremes are
not. Auxiliary: frac_low (fraction of pairs with cosine < 0.837).
- strict_full_hand : frac_high == 0
(no replicated pair anywhere; full-year hand-sign)
- mostly_hand : 0 < frac_high <= 0.1
(isolated near-identical pair, possibly one
template reuse; dominant hand-sign)
- substantial_mixture : 0.1 < frac_high <= 0.5
(clear A2 violation: a material minority of
signatures are replicated)
- mostly_stamp : 0.5 < frac_high <= 0.9
(stamp-dominant but with non-trivial variance
or a minority of non-stamped signatures)
- strict_full_stamp : frac_high > 0.9
(near-all pairs near-identical; full-year
replication with modest variance allowed)
Thresholds:
0.95 = whole-sample Firm A P7.5 heuristic (Section III-L)
0.837 = all-pairs intra/inter KDE crossover (Section III-L,
likely-hand-signed boundary)
Stratification:
- Firm bucket: Firm A (Deloitte / 勤業眾信), Firm B-D (KPMG/PwC/EY),
Non-Big-4
- Period: 2013-2018 (pre-digitalization),
2019-2021 (transition),
2022-2023 (post)
- Firm x Period grid for mixed_a2_violation rate
Output:
reports/within_year_uniformity/within_year_uniformity.md
reports/within_year_uniformity/within_year_uniformity.json
reports/within_year_uniformity/all_cpa_year_rows.csv (full audit trail)
reports/within_year_uniformity/substantial_mixture_candidates.csv
"""
import sqlite3
import json
import csv
import numpy as np
from pathlib import Path
from datetime import datetime, timezone
from collections import defaultdict
# --- Paths ----------------------------------------------------------------
DB = '/Volumes/NV2/PDF-Processing/signature-analysis/signature_analysis.db'
OUT = Path('/Volumes/NV2/PDF-Processing/signature-analysis/reports/'
           'within_year_uniformity')
# Import-time side effect: make sure the report directory exists.
OUT.mkdir(parents=True, exist_ok=True)
# --- Firm identification --------------------------------------------------
# Firm A = Deloitte (勤業眾信); BIG4_OTHER covers KPMG / PwC / EY
# (per the firm-bucket description in the module docstring).
FIRM_A = '勤業眾信聯合'
BIG4_OTHER = {'安侯建業聯合', '資誠聯合', '安永聯合'}
# --- Thresholds (Section III-L; see module docstring) ---------------------
THRESH_REPLICATED = 0.95   # cos >= this: pair counted as replicated (frac_high)
THRESH_HANDSIGN = 0.837    # cos < this: pair counted as likely hand-signed (frac_low)
MIN_SIGS = 3               # minimum signatures per (CPA, year) group to analyze
# --- Stratification labels and class names --------------------------------
FIRM_BUCKETS = ['Firm A', 'Firm B-D (Big-4 others)', 'Non-Big-4']
PERIODS = ['2013-2018 (pre)', '2019-2021 (transition)', '2022-2023 (post)']
CLASSES = ['strict_full_hand', 'mostly_hand', 'substantial_mixture',
           'mostly_stamp', 'strict_full_stamp']
# A2 violation candidates = {mostly_hand, substantial_mixture, mostly_stamp}
# (i.e., not strict_full_hand and not strict_full_stamp)
def period_bin(year):
    """Map a fiscal year (str or int) to one of the three analysis periods.

    NOTE(review): years before 2013 also fall into '2013-2018 (pre)' and
    years after 2023 into '2022-2023 (post)' — confirm input range upstream.
    """
    yr = int(year)
    if yr > 2021:
        return '2022-2023 (post)'
    return '2013-2018 (pre)' if yr <= 2018 else '2019-2021 (transition)'
def firm_bucket(firm):
    """Bucket a firm name: Firm A, other Big-4 (B-D), or Non-Big-4.

    Any value that is neither FIRM_A nor in BIG4_OTHER (including None)
    lands in 'Non-Big-4'.
    """
    if firm == FIRM_A:
        return 'Firm A'
    return 'Firm B-D (Big-4 others)' if firm in BIG4_OTHER else 'Non-Big-4'
def classify(frac_high):
    """Map frac_high (fraction of near-identical pairs) to a class label.

    Bands (see module docstring): exactly 0 -> strict_full_hand;
    (0, 0.1] -> mostly_hand; (0.1, 0.5] -> substantial_mixture;
    (0.5, 0.9] -> mostly_stamp; > 0.9 -> strict_full_stamp.
    """
    if frac_high == 0:
        return 'strict_full_hand'
    for upper_bound, label in ((0.1, 'mostly_hand'),
                               (0.5, 'substantial_mixture'),
                               (0.9, 'mostly_stamp')):
        if frac_high <= upper_bound:
            return label
    return 'strict_full_stamp'
def is_a2_violation(cls):
    """Return True for the three mixed classes that challenge A2.

    A2 violation candidates are mostly_hand, substantial_mixture and
    mostly_stamp — i.e. anything that is neither strict extreme.
    """
    return cls in ('mostly_hand', 'substantial_mixture', 'mostly_stamp')
def pairwise_stats(feats):
    """Compute within-year pairwise cosine statistics.

    Args:
        feats: sequence of 1-D feature arrays (one per signature), all the
            same length (required by np.stack).

    Returns:
        (max_cos, min_cos, frac_high, frac_low, n_pairs) over all unique
        within-year pairs, where frac_high is the share of pairs with
        cosine >= THRESH_REPLICATED and frac_low the share with cosine
        < THRESH_HANDSIGN. Degenerate rows (non-finite entries or ~zero
        norm) are dropped before pairing.

    NOTE(review): when fewer than 2 valid rows survive filtering, this
    returns (nan, nan, 0.0, 0.0, 0); downstream classify(0.0) labels such
    groups strict_full_hand — confirm that fallback is intended.
    """
    stacked = np.stack(feats).astype(np.float64)
    row_norms = np.linalg.norm(stacked, axis=1)
    # Keep only rows that are fully finite and have a usable norm.
    valid = np.all(np.isfinite(stacked), axis=1) & (row_norms > 1e-6)
    stacked = stacked[valid]
    row_norms = row_norms[valid]
    n_rows = len(stacked)
    if n_rows < 2:
        return (float('nan'), float('nan'), 0.0, 0.0, 0)
    # Normalize rows so the Gram matrix holds cosine similarities.
    unit = stacked / row_norms[:, None]
    gram = unit @ unit.T
    cos_vals = gram[np.triu_indices(n_rows, k=1)]
    cos_vals = cos_vals[np.isfinite(cos_vals)]
    n_pairs = len(cos_vals)
    if n_pairs == 0:
        return (float('nan'), float('nan'), 0.0, 0.0, 0)
    n_high = int(np.sum(cos_vals >= THRESH_REPLICATED))
    n_low = int(np.sum(cos_vals < THRESH_HANDSIGN))
    return (float(cos_vals.max()), float(cos_vals.min()),
            n_high / n_pairs, n_low / n_pairs, n_pairs)
def iterate_groups():
    """Stream rows ordered by (CPA, year); yield completed groups.

    Yields:
        ((cpa, year), feats, firm) where feats is a list of float32 numpy
        arrays (one decoded feature vector per signature) and firm is the
        accountant's firm (may be None when the LEFT JOIN finds no match).

    Fix vs. previous revision: the sqlite connection is now closed in a
    ``finally`` block, so it is released even when the consumer abandons
    the generator early (GeneratorExit triggers the finally) or the query
    raises — previously ``conn.close()`` was unreachable in those cases.
    """
    conn = sqlite3.connect(DB)
    try:
        cur = conn.cursor()
        cur.execute('''
            SELECT s.assigned_accountant,
                   substr(s.year_month, 1, 4) AS year,
                   s.feature_vector,
                   a.firm
            FROM signatures s
            LEFT JOIN accountants a ON a.name = s.assigned_accountant
            WHERE s.feature_vector IS NOT NULL
              AND s.assigned_accountant IS NOT NULL
              AND s.year_month IS NOT NULL
            ORDER BY s.assigned_accountant, year
        ''')
        cur_key = None
        cur_feats = []
        cur_firm = None
        for cpa, year, fv, firm in cur:
            key = (cpa, year)
            if key != cur_key:
                # Key changed: emit the finished group before starting anew.
                if cur_key is not None and cur_feats:
                    yield cur_key, cur_feats, cur_firm
                cur_key = key
                cur_feats = []
                cur_firm = firm
            # .copy() detaches the array from the sqlite row buffer.
            cur_feats.append(np.frombuffer(fv, dtype=np.float32).copy())
        # Flush the trailing group.
        if cur_key is not None and cur_feats:
            yield cur_key, cur_feats, cur_firm
    finally:
        conn.close()
def main():
    """Run the A2 within-year uniformity check end to end.

    Streams (CPA, year) groups from the DB, classifies each group by its
    frac_high statistic, prints overall / by-firm / by-period / firm-x-period
    tallies, and writes JSON, CSV and markdown outputs under OUT.
    """
    print('Streaming (CPA, year) groups from DB...')
    results = []        # one dict per kept (CPA, year) group
    total_groups = 0
    kept_groups = 0
    for (cpa, year), feats, firm in iterate_groups():
        total_groups += 1
        if len(feats) < MIN_SIGS:
            continue  # too few signatures for meaningful pairwise stats
        kept_groups += 1
        max_c, min_c, frac_high, frac_low, n_pairs = pairwise_stats(feats)
        cls = classify(frac_high)
        # NOTE(review): groups whose features are all degenerate yield
        # n_pairs == 0 and max/min of NaN, yet classify(0.0) labels them
        # strict_full_hand — confirm that fallback is intended.
        results.append({
            'cpa': cpa,
            'year': year,
            'n_sigs': len(feats),
            'n_pairs': n_pairs,
            'firm': firm or 'UNKNOWN',
            'firm_bucket': firm_bucket(firm),
            'period': period_bin(year),
            'max_cos': round(max_c, 4),
            'min_cos': round(min_c, 4),
            'frac_high': round(frac_high, 4),
            'frac_low': round(frac_low, 4),
            'class': cls,
            'is_a2_violation': is_a2_violation(cls),
        })
    print(f' total groups: {total_groups}')
    print(f' groups with n >= {MIN_SIGS}: {kept_groups}')
    total = len(results)
    if total == 0:
        print('No groups to analyze.')
        return
    # Overall tally
    overall = defaultdict(int)
    for r in results:
        overall[r['class']] += 1
    print('\n=== Overall classification ===')
    for c in CLASSES:
        n = overall[c]
        print(f' {c:25s}: {n:5d} ({100*n/total:.2f}%)')
    # Stratifications: nested defaultdicts keyed by bucket / period /
    # (bucket, period); each inner dict mixes class counts with the
    # special keys 'total' and 'a2_violation'.
    by_firm = defaultdict(lambda: defaultdict(int))
    by_period = defaultdict(lambda: defaultdict(int))
    by_fp = defaultdict(lambda: defaultdict(int))
    for r in results:
        by_firm[r['firm_bucket']]['total'] += 1
        by_firm[r['firm_bucket']][r['class']] += 1
        if r['is_a2_violation']:
            by_firm[r['firm_bucket']]['a2_violation'] += 1
        by_period[r['period']]['total'] += 1
        by_period[r['period']][r['class']] += 1
        if r['is_a2_violation']:
            by_period[r['period']]['a2_violation'] += 1
        key = (r['firm_bucket'], r['period'])
        by_fp[key]['total'] += 1
        by_fp[key][r['class']] += 1
        if r['is_a2_violation']:
            by_fp[key]['a2_violation'] += 1
    print('\n=== By firm bucket ===')
    for fb in FIRM_BUCKETS:
        d = by_firm[fb]
        t = d['total']
        if t == 0:
            continue
        print(f' {fb} (N = {t}):')
        for c in CLASSES:
            n = d[c]
            print(f' {c:25s}: {n:5d} ({100*n/t:.2f}%)')
    print('\n=== By period ===')
    for p in PERIODS:
        d = by_period[p]
        t = d['total']
        if t == 0:
            continue
        print(f' {p} (N = {t}):')
        for c in CLASSES:
            n = d[c]
            print(f' {c:25s}: {n:5d} ({100*n/t:.2f}%)')
    print('\n=== Firm x Period: A2 violation rate (any of mostly_hand, '
          'substantial_mixture, mostly_stamp) ===')
    # Header is reused for the substantial_mixture table below.
    header = ' {:25s}'.format('') + \
        ''.join(f'{p[:18]:>22}' for p in PERIODS)
    print(header)
    for fb in FIRM_BUCKETS:
        cells = []
        for p in PERIODS:
            d = by_fp[(fb, p)]
            t = d['total']
            if t == 0:
                cells.append('-')
            else:
                rate = 100 * d['a2_violation'] / t
                cells.append(f'{rate:.2f}% ({d["a2_violation"]}/{t})')
        row = ' {:25s}'.format(fb) + ''.join(f'{c:>22}' for c in cells)
        print(row)
    # Substantial-mixture-only Firm x Period (strictest A2 violation subset)
    print('\n=== Firm x Period: substantial_mixture rate (strictest) ===')
    print(header)
    for fb in FIRM_BUCKETS:
        cells = []
        for p in PERIODS:
            d = by_fp[(fb, p)]
            t = d['total']
            if t == 0:
                cells.append('-')
            else:
                rate = 100 * d['substantial_mixture'] / t
                cells.append(
                    f'{rate:.2f}% ({d["substantial_mixture"]}/{t})')
        row = ' {:25s}'.format(fb) + ''.join(f'{c:>22}' for c in cells)
        print(row)
    # Outputs
    json_out = {
        'generated_at': datetime.now(timezone.utc).isoformat(),
        'thresholds': {
            'replicated_cosine': THRESH_REPLICATED,
            'handsigned_cosine': THRESH_HANDSIGN,
        },
        'min_signatures_per_year': MIN_SIGS,
        'N_total_groups': total_groups,
        'N_kept_groups': kept_groups,
        'overall': {c: overall[c] for c in CLASSES},
        'by_firm_bucket': {
            fb: dict(by_firm[fb]) for fb in FIRM_BUCKETS if by_firm[fb]['total']
        },
        'by_period': {
            p: dict(by_period[p]) for p in PERIODS if by_period[p]['total']
        },
        'by_firm_x_period': {
            f'{fb}|{p}': dict(by_fp[(fb, p)])
            for fb in FIRM_BUCKETS for p in PERIODS
            if by_fp[(fb, p)]['total']
        },
    }
    with open(OUT / 'within_year_uniformity.json', 'w', encoding='utf-8') as f:
        json.dump(json_out, f, ensure_ascii=False, indent=2)
    # CSV audit trail: all rows with all metrics
    csv_fields = [
        'cpa', 'firm', 'firm_bucket', 'year', 'period',
        'n_sigs', 'n_pairs', 'max_cos', 'min_cos',
        'frac_high', 'frac_low', 'class', 'is_a2_violation',
    ]
    csv_path = OUT / 'all_cpa_year_rows.csv'
    with open(csv_path, 'w', newline='', encoding='utf-8') as f:
        w = csv.DictWriter(f, fieldnames=csv_fields)
        w.writeheader()
        for r in sorted(results,
                        key=lambda x: (x['firm_bucket'], x['year'], x['cpa'])):
            w.writerow({k: r[k] for k in csv_fields})
    # CSV: substantial_mixture rows only (strictest A2 violation subset)
    mixed_path = OUT / 'substantial_mixture_candidates.csv'
    with open(mixed_path, 'w', newline='', encoding='utf-8') as f:
        w = csv.DictWriter(f, fieldnames=csv_fields)
        w.writeheader()
        for r in sorted(results,
                        key=lambda x: (x['firm_bucket'], x['year'], x['cpa'])):
            if r['class'] == 'substantial_mixture':
                w.writerow({k: r[k] for k in csv_fields})
    # Markdown
    md = build_markdown(overall, by_firm, by_period, by_fp, total,
                        total_groups, kept_groups)
    with open(OUT / 'within_year_uniformity.md', 'w', encoding='utf-8') as f:
        f.write(md)
    print(f'\n=> Outputs in {OUT}')
def build_markdown(overall, by_firm, by_period, by_fp, total,
                   total_groups, kept_groups):
    """Render the markdown report as a single string.

    Args mirror the tallies built in main(): `overall` maps class -> count;
    `by_firm` / `by_period` / `by_fp` are nested defaultdict(int) tallies
    keyed by firm bucket, period, and (bucket, period) respectively, each
    holding class counts plus 'total' and 'a2_violation'; `total` is the
    number of kept groups; `total_groups` / `kept_groups` are raw counts.
    """
    ts = datetime.now(timezone.utc).isoformat()
    L = []  # accumulated markdown lines, joined at the end
    L.append('# Within-Auditor-Year Uniformity Check (A2 Empirical Test)')
    L.append('')
    L.append(f'Generated: {ts}')
    L.append('')
    L.append('## Method')
    L.append('')
    L.append(f'For each (CPA, fiscal year) with >= {MIN_SIGS} signatures, '
             'compute all within-year pairwise cosine similarities and '
             f'derive frac_high = fraction of pairs with cos >= {THRESH_REPLICATED}. '
             'Classification is based on frac_high; this is robust to stamp-'
             'output variance, template switches, and isolated outliers.')
    L.append('')
    L.append(f'- `strict_full_hand`: frac_high = 0 '
             '(no near-identical pair; full-year hand-signing)')
    L.append(f'- `mostly_hand`: 0 < frac_high <= 0.1 '
             '(isolated near-identical pair; dominant hand-sign with possibly '
             'one template reuse)')
    L.append(f'- `substantial_mixture`: 0.1 < frac_high <= 0.5 '
             '(material minority of signatures replicated; clearest A2 '
             'violation signature)')
    L.append(f'- `mostly_stamp`: 0.5 < frac_high <= 0.9 '
             '(stamp-dominant with non-trivial variance or minority of '
             'non-stamped signatures)')
    L.append(f'- `strict_full_stamp`: frac_high > 0.9 '
             '(near-all pairs near-identical; full-year replication with '
             'modest variance allowed)')
    L.append('')
    L.append('**A2 violation candidates** = `mostly_hand` '
             '`substantial_mixture` `mostly_stamp` (anything that is not '
             '`strict_full_hand` and not `strict_full_stamp`).')
    L.append('')
    L.append(f'Total (CPA, year) groups in DB: {total_groups}; '
             f'groups with n >= {MIN_SIGS}: {kept_groups}.')
    L.append('')
    L.append('## Overall')
    L.append('')
    L.append('| Class | N | Share |')
    L.append('|---|---|---|')
    for c in CLASSES:
        n = overall[c]
        L.append(f'| `{c}` | {n} | {100*n/total:.2f}% |')
    L.append('')

    # Render one stratification table row: label, N, per-class counts
    # with shares, then the A2-violation union count.
    def row(label, d, t):
        cells = [label, str(t)]
        for c in CLASSES:
            n = d[c]
            cells.append(f'{n} ({100*n/t:.2f}%)')
        av = d['a2_violation']
        cells.append(f'{av} ({100*av/t:.2f}%)')
        return '| ' + ' | '.join(cells) + ' |'

    # Shared header/separator for the firm-bucket and period tables.
    header = ('| Bucket | N | ' + ' | '.join(f'`{c}`' for c in CLASSES)
              + ' | A2 violation (union) |')
    sep = '|' + '|'.join(['---'] * (len(CLASSES) + 3)) + '|'
    L.append('## By firm bucket')
    L.append('')
    L.append(header)
    L.append(sep)
    for fb in FIRM_BUCKETS:
        d = by_firm[fb]
        t = d['total']
        if t == 0:
            continue  # skip empty buckets to avoid division by zero
        L.append(row(fb, d, t))
    L.append('')
    L.append('## By period')
    L.append('')
    L.append(header.replace('Bucket', 'Period'))
    L.append(sep)
    for p in PERIODS:
        d = by_period[p]
        t = d['total']
        if t == 0:
            continue
        L.append(row(p, d, t))
    L.append('')
    L.append('## Firm x Period: A2 violation rate (union of '
             '`mostly_hand`, `substantial_mixture`, `mostly_stamp`)')
    L.append('')
    L.append('| Firm | 2013-2018 (pre) | 2019-2021 (transition) | '
             '2022-2023 (post) |')
    L.append('|---|---|---|---|')
    for fb in FIRM_BUCKETS:
        cells = []
        for p in PERIODS:
            d = by_fp[(fb, p)]
            t = d['total']
            if t == 0:
                cells.append('-')
            else:
                rate = 100 * d['a2_violation'] / t
                cells.append(f'{rate:.2f}% ({d["a2_violation"]}/{t})')
        L.append(f'| {fb} | ' + ' | '.join(cells) + ' |')
    L.append('')
    L.append('## Firm x Period: `substantial_mixture` rate (strictest subset)')
    L.append('')
    L.append('| Firm | 2013-2018 (pre) | 2019-2021 (transition) | '
             '2022-2023 (post) |')
    L.append('|---|---|---|---|')
    for fb in FIRM_BUCKETS:
        cells = []
        for p in PERIODS:
            d = by_fp[(fb, p)]
            t = d['total']
            if t == 0:
                cells.append('-')
            else:
                rate = 100 * d['substantial_mixture'] / t
                cells.append(
                    f'{rate:.2f}% ({d["substantial_mixture"]}/{t})')
        L.append(f'| {fb} | ' + ' | '.join(cells) + ' |')
    L.append('')
    L.append('## Interpretation guide')
    L.append('')
    L.append('- Low A2-violation union rate overall (e.g. < 10%): A2 is '
             'empirically well-supported; report as Methodology III-G '
             'robustness check.')
    L.append('- High `substantial_mixture` rate specifically (e.g. > 5% '
             'at Big-4 B-D in 2019-2021): A2 weakens in the digitalization '
             'transition; IV-H.1 partner-level reading may need restriction '
             'to Firm A or pre-2019 period.')
    L.append('- High `substantial_mixture` rate at Firm A itself: unexpected; '
             'Firm A industry-practice defense of A2 would need revisiting.')
    L.append('')
    return '\n'.join(L)
# Script entry point: run the full analysis when executed directly.
if __name__ == '__main__':
    main()