#!/usr/bin/env python3 """Script 53: BCD-only firm-effect logistic regression (Firm D reference) and BCD-only cross-firm hit matrix. Candidate pool = BCD (exclude Firm A and same-CPA). Canonical retry-loop sampler, any-pair + same-pair. Read-only. Replicates Script 44's logistic_fit and matrix logic, restricted to BCD. """ import sqlite3 from collections import defaultdict import numpy as np DB = '/Volumes/NV2/PDF-Processing/signature-analysis/signature_analysis.db' FIRM_A = '勤業眾信聯合' BIG4 = ('勤業眾信聯合', '安侯建業聯合', '資誠聯合', '安永聯合') ALIAS = {'勤業眾信聯合': 'A', '安侯建業聯合': 'B', '資誠聯合': 'C', '安永聯合': 'D'} SEED = 42 POP = np.array([bin(i).count('1') for i in range(256)], dtype=np.uint8) def logistic_fit(X, y, max_iter=200, l2=0.001): n, k = X.shape beta = np.zeros(k) for _ in range(max_iter): eta = np.clip(X @ beta, -30, 30) p = 1.0/(1.0+np.exp(-eta)) grad = X.T @ (y-p) - l2*beta W = p*(1-p) H = -(X.T*W) @ X - l2*np.eye(k) try: delta = np.linalg.solve(H, grad) except np.linalg.LinAlgError: delta = 0.3*grad nb = beta - delta if np.max(np.abs(nb-beta)) < 1e-8: beta = nb; break beta = nb eta = np.clip(X @ beta, -30, 30) p = 1.0/(1.0+np.exp(-eta)); W = p*(1-p) cov = np.linalg.inv((X.T*W) @ X + l2*np.eye(k)) return beta, np.sqrt(np.diag(cov)) conn = sqlite3.connect(f'file:{DB}?mode=ro', uri=True) cur = conn.cursor() cur.execute("""SELECT s.assigned_accountant, a.firm, s.feature_vector, s.dhash_vector FROM signatures s JOIN accountants a ON s.assigned_accountant=a.name WHERE s.assigned_accountant IS NOT NULL AND a.firm IN (?,?,?) AND s.feature_vector IS NOT NULL AND s.dhash_vector IS NOT NULL""", ('安侯建業聯合', '資誠聯合', '安永聯合')) rows = cur.fetchall() conn.close() n = len(rows) feats = np.stack([np.frombuffer(r[2], np.float32) for r in rows]).astype(np.float32) norms = np.linalg.norm(feats, axis=1, keepdims=True); norms[norms == 0] = 1.0 feats = feats / norms dh = np.stack([np.frombuffer(r[3], np.uint8) for r in rows]) cpas = np.array([r[0] for r in rows]) firms = np.array([ALIAS[r[1]] for r in rows]) cpa_idx = defaultdict(list) for i, c in enumerate(cpas): cpa_idx[c].append(i) cpa_idx = {c: np.array(v) for c, v in cpa_idx.items()} pool_size = {c: len(v)-1 for c, v in cpa_idx.items()} all_idx = np.arange(n) print(f'BCD signatures: {n:,}; CPAs: {len(cpa_idx)}') rng = np.random.default_rng(SEED) hit_any = np.zeros(n, bool) hit_same = np.zeros(n, bool) cand_firm_maxcos = np.empty(n, dtype=object) # any-pair partner firm cand_firm_same = np.empty(n, dtype=object) psize = np.zeros(n, np.int32) for si in range(n): np_ = pool_size[cpas[si]]; psize[si] = np_ if np_ <= 0: continue same = cpa_idx[cpas[si]] need = np_; cand = []; att = 0 while need > 0 and att < 10: draw = rng.choice(n, size=need*2, replace=True) ok = draw[~np.isin(draw, same)] cand.extend(ok[:need].tolist()); need -= len(ok[:need]); att += 1 if need > 0: pm = np.ones(n, bool); pm[same] = False cand.extend(rng.choice(all_idx[pm], size=need, replace=False).tolist()) cand = np.array(cand[:np_], dtype=np.int64) cosv = feats[cand] @ feats[si] dist = POP[dh[cand] ^ dh[si]].sum(axis=1) mc = int(np.argmax(cosv)); md = int(np.argmin(dist)) if cosv[mc] > 0.95 and dist[md] <= 5: hit_any[si] = True cand_firm_maxcos[si] = firms[cand[mc]] spm = (cosv > 0.95) & (dist <= 5) if spm.any(): hit_same[si] = True cand_firm_same[si] = firms[cand[int(np.argmax(spm))]] # ---- Logistic regression: hit_any ~ FirmB + FirmC + log(pool), Firm D reference ---- hp = psize > 0 y = hit_any[hp].astype(np.float64) fa = firms[hp] lp = np.log(psize[hp].astype(np.float64)); lp = lp - lp.mean() X = np.column_stack([np.ones(y.shape), (fa == 'B').astype(float), (fa == 'C').astype(float), lp]) beta, se = logistic_fit(X, y) print(f'\n[BCD logistic: hit_any ~ FirmB + FirmC + log(pool); Firm D = reference] n={len(y):,}, y_mean={y.mean():.4f}') for nm, b, s in zip(['intercept(FirmD)', 'FirmB', 'FirmC', 'log(pool,centred)'], beta, se): print(f' {nm}: beta={b:+.4f} SE={s:.4f} OR={np.exp(b):.4f} z~{abs(b)/s if s>0 else float("inf"):.2f}') # ---- Cross-firm hit matrix (any-pair max-cos partner) ---- print('\n[BCD cross-firm hit matrix: any-pair, source firm x max-cos partner firm]') print(' src -> B C D | within-firm% (n hits)') for sf in ['B', 'C', 'D']: m = (firms == sf) & hit_any parts = cand_firm_maxcos[m] tot = len(parts) cnt = {f: int((parts == f).sum()) for f in ['B', 'C', 'D']} wf = cnt[sf]/tot if tot else 0 print(f' {sf} {cnt["B"]:5d} {cnt["C"]:5d} {cnt["D"]:5d} | {wf:6.1%} ({tot})') print('\n[BCD same-pair within-firm concentration]') for sf in ['B', 'C', 'D']: m = (firms == sf) & hit_same parts = cand_firm_same[m]; tot = len(parts) wf = int((parts == sf).sum())/tot if tot else 0 print(f' Firm {sf}: {wf:.1%} within-firm ({int((parts==sf).sum())}/{tot})')