#!/usr/bin/env python3
"""
Script 31: Within-Year Same-CPA Ranking Robustness Check
==========================================================
Recomputes the per-auditor-year mean cosine ranking of Table XIV using
within-year same-CPA matching only (instead of the cross-year same-CPA
pool which Table XIV uses by construction). Reports pooled top-10/20/30%
Firm A share under the within-year restriction so the partner-level
ranking finding can be checked against the cross-year aggregation choice
flagged in Section IV-G.2.

Definition (within-year statistic):
    For each signature s, with CPA = c, year = y:
        cos_within(s) = max cosine(s, s') over s' != s,
                        CPA(s') = c, year(s') = y
    If a (CPA, year) block has only one signature, cos_within is
    undefined and that signature is dropped from the auditor-year
    aggregation (matching the same-CPA pair-existence requirement of
    Section III-G).

Outputs:
    reports/within_year_ranking/within_year_ranking.json
    reports/within_year_ranking/within_year_ranking.md
"""

import json
import sqlite3
from collections import defaultdict
from datetime import datetime
from pathlib import Path

import numpy as np

DB = '/Volumes/NV2/PDF-Processing/signature-analysis/signature_analysis.db'
# NOTE: the output directory is created in main(), not at import time, so
# importing this module has no filesystem side effects.
OUT = Path('/Volumes/NV2/PDF-Processing/signature-analysis/reports/'
           'within_year_ranking')

FIRM_A = '勤業眾信聯合'
MIN_SIGS_PER_AUDITOR_YEAR = 5

# Percent cut-offs reported in the pooled and the year-by-year tables.
# The year-by-year Markdown header in _render_markdown is written out for
# exactly these three yearly cut-offs; keep the two in sync.
POOLED_K_PCTS = (10, 20, 25, 30, 50)
YEARLY_K_PCTS = (10, 20, 30)

# Firm name (as stored in accountants.firm) -> anonymised reporting label.
_FIRM_BUCKETS = {
    FIRM_A: 'Firm A',
    '安侯建業聯合': 'Firm B',
    '資誠聯合': 'Firm C',
    '安永聯合': 'Firm D',
}


def firm_bucket(firm):
    """Map a raw firm name to its reporting label.

    Any name that is not one of the four listed firms (including ``None``,
    which the LEFT JOIN in ``load_signatures`` can produce) is labelled
    ``'Non-Big-4'``.
    """
    return _FIRM_BUCKETS.get(firm, 'Non-Big-4')


def load_signatures():
    """Fetch every signature that has a feature vector, a CPA and a date.

    Returns:
        A list of ``(signature_id, assigned_accountant, firm, year,
        feature_vector)`` tuples. ``firm`` is ``None`` when the accountant
        has no match in the ``accountants`` table; ``year`` is the first
        four characters of ``year_month`` cast to an integer.
    """
    conn = sqlite3.connect(DB)
    try:
        # try/finally so the connection is released even if the query fails.
        cur = conn.cursor()
        cur.execute("""
            SELECT s.signature_id, s.assigned_accountant, a.firm,
                   CAST(substr(s.year_month, 1, 4) AS INTEGER) AS year,
                   s.feature_vector
            FROM signatures s
            LEFT JOIN accountants a ON s.assigned_accountant = a.name
            WHERE s.feature_vector IS NOT NULL
              AND s.assigned_accountant IS NOT NULL
              AND s.year_month IS NOT NULL
        """)
        return cur.fetchall()
    finally:
        conn.close()


def compute_within_year_max(rows):
    """Group by (CPA, year), compute max similarity to other same-block sigs.

    Args:
        rows: iterable of ``(sig_id, cpa, firm, year, blob)`` tuples, where
            ``blob`` is a bytes buffer of float32 values.

    Returns:
        ``(sig_max_within, sig_meta)``: the first maps ``sig_id`` to its
        best within-block similarity, the second maps ``sig_id`` to
        ``(cpa, year, firm)``. Signatures in singleton blocks, and rows
        whose year is ``None``, appear in neither mapping.
    """
    blocks = defaultdict(list)  # (cpa, year) -> [(sig_id, feat, firm)]
    for sig_id, cpa, firm, year, blob in rows:
        if year is None:
            continue
        feat = np.frombuffer(blob, dtype=np.float32)
        blocks[(cpa, int(year))].append((sig_id, feat, firm))

    sig_max_within = {}  # sig_id -> max within-year same-CPA similarity
    sig_meta = {}        # sig_id -> (cpa, year, firm)
    for (cpa, year), entries in blocks.items():
        if len(entries) < 2:
            continue  # singleton: max-within is undefined
        feats = np.stack([e[1] for e in entries])  # (n, d)
        # NOTE(review): a plain dot product equals the cosine only if the
        # stored vectors are L2-normalised -- TODO confirm upstream.
        sims = feats @ feats.T  # (n, n)
        # Exclude each signature's match with itself.
        np.fill_diagonal(sims, -np.inf)
        maxs = sims.max(axis=1)
        for i, (sig_id, _, firm) in enumerate(entries):
            sig_max_within[sig_id] = float(maxs[i])
            sig_meta[sig_id] = (cpa, year, firm)
    return sig_max_within, sig_meta


def auditor_year_aggregation(sig_max_within, sig_meta):
    """Average the per-signature statistic over each (CPA, year).

    Auditor-years with fewer than ``MIN_SIGS_PER_AUDITOR_YEAR`` signatures
    are omitted. The firm reported for an auditor-year is the firm of the
    first of its signatures encountered in ``sig_max_within``.

    Returns:
        A list of dicts with keys ``acct``, ``year``, ``firm``,
        ``cos_mean_within_year`` and ``n``.
    """
    cos_by_ay = defaultdict(list)  # (cpa, year) -> list of cos
    firm_by_ay = {}                # (cpa, year) -> firm of first signature
    for sig_id, cos in sig_max_within.items():
        cpa, year, firm = sig_meta[sig_id]
        key = (cpa, year)
        cos_by_ay[key].append(cos)
        # Recorded in the same pass; avoids rescanning every signature
        # once per auditor-year to recover the firm.
        firm_by_ay.setdefault(key, firm)

    rows = []
    for (cpa, year), vals in cos_by_ay.items():
        if len(vals) < MIN_SIGS_PER_AUDITOR_YEAR:
            continue
        rows.append({
            'acct': cpa,
            'year': year,
            'firm': firm_by_ay[(cpa, year)],
            'cos_mean_within_year': float(np.mean(vals)),
            'n': len(vals),
        })
    return rows


def top_k_breakdown(rows, k_pcts=POOLED_K_PCTS):
    """Count firm buckets among the top k% auditor-years by mean cosine.

    For each percentage the cut-off is ``k = max(1, int(N * pct / 100))``,
    so ``k`` is 1 even when ``rows`` is empty (the share is then 0).

    Returns:
        ``{'top_<pct>pct': {'k', 'firm_counts', 'firm_a_share'}}``.
    """
    sorted_rows = sorted(rows, key=lambda r: -r['cos_mean_within_year'])
    n_rows = len(sorted_rows)
    out = {}
    for k_pct in k_pcts:
        k = max(1, int(n_rows * k_pct / 100))
        counts = defaultdict(int)
        for r in sorted_rows[:k]:
            counts[firm_bucket(r['firm'])] += 1
        out[f'top_{k_pct}pct'] = {
            'k': k,
            # Snapshot first: only buckets actually present are listed.
            'firm_counts': dict(counts),
            'firm_a_share': counts.get('Firm A', 0) / k,
        }
    return out


def per_year_top_k(rows, k_pcts=YEARLY_K_PCTS):
    """Compute the Firm A share of the top k% auditor-years, year by year.

    Returns:
        ``{year: {'n_auditor_years', 'firm_a_baseline_share', 'top_k'}}``
        where ``top_k`` maps ``'top_<pct>pct'`` to ``k``, ``firm_a_in_top``
        and ``firm_a_share``.
    """
    rows_by_year = defaultdict(list)
    for r in rows:
        rows_by_year[r['year']].append(r)

    out = {}
    for y in sorted(rows_by_year):
        ranked = sorted(rows_by_year[y],
                        key=lambda r: -r['cos_mean_within_year'])
        n_y = len(ranked)
        n_a = sum(1 for r in ranked if r['firm'] == FIRM_A)
        per = {'n_auditor_years': n_y,
               'firm_a_baseline_share': n_a / n_y,
               'top_k': {}}
        for kp in k_pcts:
            k = max(1, int(n_y * kp / 100))
            n_a_top = sum(1 for r in ranked[:k] if r['firm'] == FIRM_A)
            per['top_k'][f'top_{kp}pct'] = {
                'k': k,
                'firm_a_in_top': n_a_top,
                'firm_a_share': n_a_top / k,
            }
        out[y] = per
    return out


def _build_payload(n_loaded, n_with_pair, ay_rows, pooled, yearly):
    """Assemble the JSON-serialisable summary of one run."""
    return {
        'generated_at': datetime.now().isoformat(timespec='seconds'),
        'n_signatures_loaded': n_loaded,
        'n_signatures_with_within_year_pair': n_with_pair,
        'n_singleton_dropped': n_loaded - n_with_pair,
        'min_sigs_per_auditor_year': MIN_SIGS_PER_AUDITOR_YEAR,
        'n_auditor_years': len(ay_rows),
        'n_firm_a_auditor_years': sum(1 for r in ay_rows
                                      if r['firm'] == FIRM_A),
        'pooled_top_k': pooled,
        'yearly_top_k': yearly,
    }


def _render_markdown(payload):
    """Render the payload from _build_payload as the Markdown report text."""
    n_ay = payload['n_auditor_years']
    n_firm_a = payload['n_firm_a_auditor_years']
    # Guard the baseline: with no qualifying auditor-years the original
    # expression divided by zero.
    baseline_pct = 100 * n_firm_a / n_ay if n_ay else 0.0

    md = ['# Within-Year Same-CPA Ranking Robustness', '',
          f"Generated: {payload['generated_at']}", '',
          ('Per-signature best-match cosine recomputed using within-year '
           'same-CPA matching only. See Script 31 docstring for the '
           'precise definition.'),
          '',
          f"- Signatures loaded: {payload['n_signatures_loaded']:,}",
          f"- Signatures with at least one within-year same-CPA pair: "
          f"{payload['n_signatures_with_within_year_pair']:,}",
          f"- Singletons dropped (no within-year pair): "
          f"{payload['n_singleton_dropped']:,}",
          f"- Auditor-years with >= {MIN_SIGS_PER_AUDITOR_YEAR} sigs: "
          f"{n_ay:,}",
          f"- Firm A auditor-years: {n_firm_a:,} "
          f"({baseline_pct:.1f}% baseline)",
          '',
          '## Pooled (2013-2023) top-K Firm A share', '',
          '| Top-K | k | Firm A share | A | B | C | D | NB4 |',
          '|-------|---|--------------|---|---|---|---|-----|']
    pooled = payload['pooled_top_k']
    for kp in POOLED_K_PCTS:
        d = pooled[f'top_{kp}pct']
        c = d['firm_counts']
        md.append(f"| {kp}% | {d['k']:,} | "
                  f"{100*d['firm_a_share']:.1f}% | "
                  f"{c.get('Firm A', 0)} | {c.get('Firm B', 0)} | "
                  f"{c.get('Firm C', 0)} | {c.get('Firm D', 0)} | "
                  f"{c.get('Non-Big-4', 0)} |")

    md.extend(['', '## Year-by-year top-K Firm A share', '',
               '| Year | n AY | Top-10% share | Top-20% share | '
               'Top-30% share | A baseline |',
               '|------|------|---------------|---------------|'
               '---------------|------------|'])
    yearly = payload['yearly_top_k']
    for y in sorted(yearly):
        per = yearly[y]
        line = f"| {y} | {per['n_auditor_years']:,} "
        for kp in YEARLY_K_PCTS:
            d = per['top_k'][f'top_{kp}pct']
            line += (f"| {100*d['firm_a_share']:.1f}% "
                     f"({d['firm_a_in_top']}/{d['k']}) ")
        line += f"| {100*per['firm_a_baseline_share']:.1f}% |"
        md.append(line)
    return '\n'.join(md) + '\n'


def main():
    """Run the robustness check and write the JSON and Markdown reports."""
    # Create the output directory up front so a bad path fails before the
    # similarity computation rather than after it.
    OUT.mkdir(parents=True, exist_ok=True)

    print('Loading signatures + features...')
    rows = load_signatures()
    print(f' loaded {len(rows):,}')

    print('Computing within-year same-CPA max cosine...')
    sig_max_within, sig_meta = compute_within_year_max(rows)
    print(f' signatures with within-year pair: {len(sig_max_within):,}')
    n_dropped = len(rows) - len(sig_max_within)
    print(f' dropped (singleton within year): {n_dropped:,}')

    ay_rows = auditor_year_aggregation(sig_max_within, sig_meta)
    print(f' auditor-years (>={MIN_SIGS_PER_AUDITOR_YEAR} sigs '
          f'with within-year pair): {len(ay_rows):,}')

    pooled = top_k_breakdown(ay_rows)
    yearly = per_year_top_k(ay_rows)
    payload = _build_payload(len(rows), len(sig_max_within), ay_rows,
                             pooled, yearly)

    json_path = OUT / 'within_year_ranking.json'
    json_path.write_text(json.dumps(payload, indent=2, ensure_ascii=False),
                         encoding='utf-8')
    print(f'\nWrote {json_path}')

    md_path = OUT / 'within_year_ranking.md'
    md_path.write_text(_render_markdown(payload), encoding='utf-8')
    print(f'Wrote {md_path}')


if __name__ == '__main__':
    main()