"""Per-signature coincidence-rate checks on the signature-analysis database.

Each row returned by :func:`load` is a tuple
``(assigned_accountant, firm, source_pdf, feature_vector, dhash_vector, year)``
where the two vectors are raw byte blobs: ``feature_vector`` is read as
float32 and ``dhash_vector`` as uint8.  Similarity between two signatures is
measured as the cosine of their L2-normalised feature vectors and as the
Hamming distance between their dHash bytes.

Run as a script, it prints the rates for the non-Firm-A Big-4 signatures of
2013-2019 and for Firm A signatures compared against the other three firms.
"""
import sqlite3
from collections import defaultdict, Counter
from contextlib import closing

import numpy as np

DB = '/Volumes/NV2/PDF-Processing/signature-analysis/signature_analysis.db'
FIRM_A = '勤業眾信聯合'
BIG4 = ('勤業眾信聯合', '安侯建業聯合', '資誠聯合', '安永聯合')
ALIAS = {'勤業眾信聯合': 'A', '安侯建業聯合': 'B', '資誠聯合': 'C', '安永聯合': 'D'}
SEED = 42
# POP[b] is the number of set bits in byte value b (popcount lookup table).
POP = np.array([bin(i).count('1') for i in range(256)], dtype=np.uint8)

# Decision thresholds used by the band definitions below.
_COS_HIGH = 0.95
_COS_LOW = 0.837
_DHASH_TIGHT = 5
_DHASH_LOOSE = 15
# Value left in the min-distance array for signatures that are never sampled.
_DHASH_SENTINEL = 64

# Kept as adjacent literals so the statement text stays exactly what it was.
_QUERY = (
    "SELECT s.assigned_accountant,a.firm,s.source_pdf,s.feature_vector,s.dhash_vector, "
    "CAST(substr(s.year_month,1,4) AS INT) "
    "FROM signatures s JOIN accountants a ON s.assigned_accountant=a.name "
    "WHERE s.assigned_accountant IS NOT NULL AND a.firm IS NOT NULL "
    "AND s.feature_vector IS NOT NULL AND s.dhash_vector IS NOT NULL"
)


def wilson(k, n, z=1.96):
    """Return the Wilson score interval ``(lo, hi)`` for ``k`` successes in ``n``.

    Returns ``(None, None)`` when ``n`` is 0.  The bounds are clipped to
    ``[0, 1]``.
    """
    if n == 0:
        return (None, None)
    p = k / n
    d = 1 + z * z / n
    c = (p + z * z / (2 * n)) / d
    h = z * np.sqrt(p * (1 - p) / n + z * z / (4 * n * n)) / d
    return (max(0, c - h), min(1, c + h))


def load():
    """Fetch every usable signature row from the database, opened read-only.

    Rows with a NULL accountant, firm, feature vector or dHash vector are
    excluded by the query.  The year column (index 5) is the integer cast of
    the first four characters of ``year_month`` and is ``None`` when
    ``year_month`` is NULL.
    """
    # closing() guarantees the connection is released even if the query fails;
    # sqlite3's own context manager only manages transactions, not closing.
    with closing(sqlite3.connect(f'file:{DB}?mode=ro', uri=True)) as conn:
        cur = conn.cursor()
        cur.execute(_QUERY)
        return cur.fetchall()


def canonical_sampler(rng, n, n_pool, same_cpa, all_idx):
    """Draw ``n_pool`` indices from ``range(n)`` that are not in ``same_cpa``.

    Up to ten rounds of rejection sampling are tried first: each round draws
    twice the number still needed, with replacement, and keeps the draws that
    are not in ``same_cpa`` (so the result may contain repeated indices).  If
    indices are still missing afterwards, the remainder is drawn without
    replacement from the eligible entries of ``all_idx``.

    Raises:
        ValueError: from ``rng.choice`` when the fallback needs more indices
            than there are eligible ones.
    """
    need = n_pool
    cand = []
    att = 0
    while need > 0 and att < 10:
        draw = rng.choice(n, size=need * 2, replace=True)
        ok = draw[~np.isin(draw, same_cpa)]
        accepted = ok[:need]
        cand.extend(accepted.tolist())
        need -= len(accepted)
        att += 1
    if need > 0:
        pm = np.ones(n, bool)
        pm[same_cpa] = False
        cand.extend(rng.choice(all_idx[pm], size=need, replace=False).tolist())
    return np.array(cand[:n_pool], dtype=np.int64)


def _unit_features(rows):
    """Stack the float32 feature blobs of ``rows`` and L2-normalise each row."""
    feats = np.stack([np.frombuffer(r[3], np.float32) for r in rows]).astype(np.float32)
    norms = np.linalg.norm(feats, axis=1, keepdims=True)
    norms[norms == 0] = 1  # leave all-zero vectors as zeros instead of dividing by 0
    return feats / norms


def _dhash_bytes(rows):
    """Stack the dHash blobs of ``rows`` into a 2-D uint8 array."""
    return np.stack([np.frombuffer(r[4], np.uint8) for r in rows])


def _min_hamming(pool_dh, query_dh):
    """Return the smallest Hamming distance between ``query_dh`` and any pool row."""
    return int(POP[pool_dh ^ query_dh].sum(axis=1).min())


def _rate_with_ci(k, n, ci_digits):
    """Format ``k/n`` with its Wilson interval, or ``n/a`` when ``n`` is 0."""
    lo, hi = wilson(k, n)
    if lo is None:
        return f"n/a ({k}/{n})"
    return f"{k/n:.6f} ({k}/{n}) [{lo:.{ci_digits}f},{hi:.{ci_digits}f}]"


def _in_years(row, first, last):
    """True when the row's year (index 5) is known and within ``[first, last]``."""
    return row[5] is not None and first <= row[5] <= last


def simulate(keep):
    """Compare every signature with a random pool of other accountants' signatures.

    For a signature whose accountant has ``m`` signatures in ``keep``, a pool
    of ``m - 1`` indices belonging to other accountants is drawn with
    :func:`canonical_sampler` (generator seeded with ``SEED``).

    Returns:
        ``(mc, md)``: per-signature maximum cosine similarity (float32) and
        minimum dHash Hamming distance (int32) against its pool.  Signatures
        that are not sampled keep ``mc == 0`` and ``md == 64``; that happens
        when the accountant has a single signature or when ``keep`` contains
        no signature from any other accountant.
    """
    n = len(keep)
    mc = np.zeros(n, np.float32)
    md = np.full(n, _DHASH_SENTINEL, np.int32)
    if n == 0:
        # np.stack cannot handle an empty list; nothing to compare anyway.
        return mc, md

    feats = _unit_features(keep)
    dh = _dhash_bytes(keep)
    cpas = np.array([r[0] for r in keep])

    groups = defaultdict(list)
    for i, c in enumerate(cpas):
        groups[c].append(i)
    cpa_idx = {c: np.array(v) for c, v in groups.items()}

    all_idx = np.arange(n)
    rng = np.random.default_rng(SEED)
    for si in range(n):
        own = cpa_idx[cpas[si]]
        p = len(own) - 1
        # Skip singletons, and skip when there is nobody else to sample from
        # (the sampler would otherwise fail on an empty eligible set).
        if p <= 0 or len(own) == n:
            continue
        cand = canonical_sampler(rng, n, p, own, all_idx)
        mc[si] = (feats[cand] @ feats[si]).max()
        md[si] = _min_hamming(dh[cand], dh[si])
    return mc, md


def iccr(keep, label):
    """Run :func:`simulate` on ``keep`` and print the per-signature band rates.

    Four bands are reported, each as ``count / len(keep)`` with a Wilson
    interval: ``HC`` (cosine > 0.95 and distance <= 5), ``HC+MC`` (cosine >
    0.95 and distance <= 15), ``UN-band`` (0.837 < cosine <= 0.95) and
    ``HSC-band`` (cosine > 0.95 and distance > 15).  The denominator includes
    signatures that were never sampled.  With an empty ``keep`` the rates are
    printed as ``n/a``.
    """
    mc, md = simulate(keep)
    n = len(keep)
    high_cos = mc > _COS_HIGH
    bands = [
        ('HC', high_cos & (md <= _DHASH_TIGHT)),
        ('HC+MC', high_cos & (md <= _DHASH_LOOSE)),
        ('UN-band', (mc > _COS_LOW) & (mc <= _COS_HIGH)),
        ('HSC-band', high_cos & (md > _DHASH_LOOSE)),
    ]
    print(f"\n== {label} (n_sig={n:,}) ==")
    for nm, mask in bands:
        print(f" ICCR per-sig {nm}: {_rate_with_ci(int(mask.sum()), n, 5)}")


def a_oos(rows, label):
    """Print the HC rate of Firm A signatures against the other Big-4 firms.

    For each Firm A signature whose accountant has ``m`` Firm A signatures in
    ``rows``, ``m - 1`` signatures are drawn with replacement from the rows of
    the three other ``BIG4`` firms (generator seeded with ``SEED``).  The
    signature counts as HC when its maximum cosine similarity to the draw
    exceeds 0.95 and its minimum dHash Hamming distance is at most 5.  The
    rate is taken over all Firm A signatures and printed with a Wilson
    interval; ``n/a`` is printed when there is no Firm A signature or no
    comparison pool.
    """
    A = [r for r in rows if r[1] == FIRM_A]
    BCD = [r for r in rows if r[1] in BIG4 and r[1] != FIRM_A]
    n = len(A)
    header = f"\n== Firm A OOS vs {label} BCD pool == per-sig HC: "
    if not BCD:
        # Without a pool no comparison is possible; a 0 rate would mislead.
        print(f"{header}n/a (empty BCD pool, n_A={n})")
        return

    hc = np.zeros(n, bool)
    if A:
        bf = _unit_features(BCD)
        bdh = _dhash_bytes(BCD)
        nb = bf.shape[0]
        per_cpa = Counter(r[0] for r in A)
        rng = np.random.default_rng(SEED)
        for i, r in enumerate(A):
            p = per_cpa[r[0]] - 1
            if p <= 0:
                continue
            cand = rng.choice(nb, size=p, replace=True)
            sf = np.frombuffer(r[3], np.float32).astype(np.float32)
            sf = sf / max(np.linalg.norm(sf), 1e-9)
            mc = (bf[cand] @ sf).max()
            mdv = _min_hamming(bdh[cand], np.frombuffer(r[4], np.uint8))
            hc[i] = (mc > _COS_HIGH) and (mdv <= _DHASH_TIGHT)
    k = int(hc.sum())
    print(f"{header}{_rate_with_ci(k, n, 6)}")


# Guarded so that importing this module does not open the database; running
# the file as a script behaves as before.
if __name__ == "__main__":
    rows = load()
    bcd_all = [r for r in rows if r[1] in BIG4 and r[1] != FIRM_A]
    # Rows with an unknown year are left out of the year-restricted subsets.
    bcd_19 = [r for r in bcd_all if _in_years(r, 2013, 2019)]
    iccr(bcd_19, 'BCD 2013-2019 (verify per-sig HC~0.0059)')
    a_oos([r for r in rows if _in_years(r, 2013, 2019)], '2013-2019')
    a_oos(rows, 'full-period')