"""Expert external data source service.

Provides helpers for fetching occupation / domain data from the Wikidata
SPARQL endpoint and the ConceptNet API.
"""

import logging
import random
import re
from abc import ABC, abstractmethod
from typing import List, Optional, Tuple

import httpx

from .expert_cache import expert_cache

logger = logging.getLogger(__name__)

# Shape of a language tag ("zh", "en", "zh-tw", ...). Used to reject values
# that could break out of the quoted literal inside the SPARQL query text.
_LANGUAGE_TAG_RE = re.compile(r"[A-Za-z]{2,3}(?:-[A-Za-z0-9]{1,8})*")


class ExpertSourceProvider(ABC):
    """Abstract base class for external occupation sources."""

    @abstractmethod
    async def fetch_occupations(
        self, count: int, language: str = "zh"
    ) -> List[dict]:
        """Fetch a list of occupations.

        Args:
            count: Number of occupations wanted.
            language: Language code (zh/en).

        Returns:
            Occupation records: [{"name": "...", "domain": "..."}, ...]
        """

    def _random_select(self, items: List[dict], count: int) -> List[dict]:
        """Return up to ``count`` randomly chosen entries of ``items``.

        When ``items`` has ``count`` entries or fewer, all of them are
        returned in their existing order. A new list is always returned so
        callers cannot mutate a list that is also stored in the cache.
        """
        if len(items) <= count:
            return list(items)
        return random.sample(items, count)

    @staticmethod
    def _dedupe_by_name(items: List[dict]) -> List[dict]:
        """Drop entries whose "name" was already seen, keeping first ones."""
        seen = set()
        unique = []
        for item in items:
            name = item["name"]
            if name not in seen:
                seen.add(name)
                unique.append(item)
        return unique


class WikidataProvider(ExpertSourceProvider):
    """Occupation provider backed by Wikidata SPARQL queries."""

    ENDPOINT = "https://query.wikidata.org/sparql"

    # (keyword, domain) pairs checked in order; the first keyword contained
    # in the occupation name wins. The generic suffix "師" is deliberately
    # last so that it does not shadow more specific keywords such as
    # "工程" or "設計" (e.g. "工程師" must map to "工程技術").
    _DOMAIN_KEYWORDS = (
        ("醫", "醫療健康"),
        ("工程", "工程技術"),
        ("設計", "設計創意"),
        ("藝術", "藝術文化"),
        ("運動", "體育運動"),
        ("農", "農業"),
        ("漁", "漁業"),
        ("商", "商業貿易"),
        ("法", "法律"),
        ("教", "教育"),
        ("研究", "學術研究"),
        ("師", "專業服務"),
    )
    _DEFAULT_DOMAIN = "專業領域"

    def __init__(self):
        self.client = httpx.AsyncClient(timeout=30.0)

    async def fetch_occupations(
        self, count: int, language: str = "zh"
    ) -> List[dict]:
        """Fetch occupations from Wikidata, using the cache when possible.

        Args:
            count: Number of occupations wanted.
            language: Language code used for labels and for the cache key.

        Returns:
            Up to ``count`` randomly selected occupation records.

        Raises:
            ValueError: If ``language`` is not a well-formed language tag.
            Exception: Any error raised by the HTTP request or by parsing
                the response is logged and re-raised.
        """
        cache_key = f"wikidata:{language}:occupations"

        # Serve from cache when a non-empty result is stored.
        cached = expert_cache.get(cache_key)
        if cached:
            logger.info("Wikidata cache hit: %s occupations", len(cached))
            return self._random_select(cached, count)

        query = self._build_sparql_query(language)

        try:
            response = await self.client.get(
                self.ENDPOINT,
                params={"query": query, "format": "json"},
                headers={"Accept": "application/sparql-results+json"},
            )
            response.raise_for_status()

            occupations = self._parse_sparql_response(
                response.json(), language
            )

            # Only cache non-empty results so a later call can retry.
            if occupations:
                expert_cache.set(cache_key, occupations)
                logger.info(
                    "Wikidata fetched: %s occupations", len(occupations)
                )

            return self._random_select(occupations, count)
        except Exception as e:
            logger.error("Wikidata query failed: %s", e)
            raise

    def _build_sparql_query(self, language: str) -> str:
        """Build the SPARQL query for occupations labelled in ``language``.

        Raises:
            ValueError: If ``language`` is not a well-formed language tag.
                The value is embedded in the query text, so anything else
                is rejected to prevent query injection.
        """
        if not isinstance(language, str) or not _LANGUAGE_TAG_RE.fullmatch(
            language
        ):
            raise ValueError(f"Invalid language code: {language!r}")

        lang_filter = f'FILTER(LANG(?occupationLabel) = "{language}")'
        return f"""
        SELECT DISTINCT ?occupation ?occupationLabel ?fieldLabel WHERE {{
            ?occupation wdt:P31 wd:Q28640.
            ?occupation rdfs:label ?occupationLabel.
            {lang_filter}
            OPTIONAL {{
                ?occupation wdt:P425 ?field.
                ?field rdfs:label ?fieldLabel.
                FILTER(LANG(?fieldLabel) = "{language}")
            }}
        }}
        LIMIT 500
        """

    def _parse_sparql_response(self, data: dict, language: str) -> List[dict]:
        """Convert a SPARQL JSON response into occupation records.

        Rows whose occupation label is shorter than two characters are
        skipped. When a row has no field label the domain is inferred from
        the occupation name. Records are de-duplicated by name because one
        occupation yields one row per matching field.
        """
        results = []
        bindings = data.get("results", {}).get("bindings", [])

        for item in bindings:
            name = item.get("occupationLabel", {}).get("value", "")
            field = item.get("fieldLabel", {}).get("value", "")

            if name and len(name) >= 2:
                results.append({
                    "name": name,
                    "domain": field if field else self._infer_domain(name),
                })

        return self._dedupe_by_name(results)

    def _infer_domain(self, occupation_name: str) -> str:
        """Infer a domain from keywords contained in the occupation name."""
        for keyword, domain in self._DOMAIN_KEYWORDS:
            if keyword in occupation_name:
                return domain
        return self._DEFAULT_DOMAIN

    async def close(self):
        """Close the underlying HTTP client."""
        await self.client.aclose()


class ConceptNetProvider(ExpertSourceProvider):
    """Occupation provider backed by the ConceptNet API."""

    ENDPOINT = "https://api.conceptnet.io"

    def __init__(self):
        self.client = httpx.AsyncClient(timeout=30.0)

    async def fetch_occupations(
        self, count: int, language: str = "zh"
    ) -> List[dict]:
        """Fetch occupation-related concepts from ConceptNet.

        Args:
            count: Number of concepts wanted.
            language: Language code; anything other than "zh" is queried
                as English.

        Returns:
            Up to ``count`` randomly selected records, unique by name.

        Raises:
            Exception: Any error raised by the HTTP requests or by parsing
                the responses is logged and re-raised.
        """
        cache_key = f"conceptnet:{language}:occupations"

        # Serve from cache when a non-empty result is stored.
        cached = expert_cache.get(cache_key)
        if cached:
            logger.info("ConceptNet cache hit: %s concepts", len(cached))
            return self._random_select(cached, count)

        lang_code = "zh" if language == "zh" else "en"
        seed_concept = "/c/zh/職業" if lang_code == "zh" else "/c/en/occupation"

        try:
            occupations = []

            # ConceptNet edges read "start IsA end", so concepts that ARE
            # occupations have the seed concept as their *end* node.
            response = await self.client.get(
                f"{self.ENDPOINT}/query",
                params={
                    "end": seed_concept,
                    "rel": "/r/IsA",
                    "limit": 100,
                },
            )
            response.raise_for_status()
            occupations.extend(
                self._parse_conceptnet_response(response.json(), lang_code)
            )

            # Also query RelatedTo edges touching the seed (either
            # direction) to obtain more results.
            response2 = await self.client.get(
                f"{self.ENDPOINT}/query",
                params={
                    "node": seed_concept,
                    "rel": "/r/RelatedTo",
                    "limit": 100,
                },
            )
            response2.raise_for_status()
            occupations.extend(
                self._parse_conceptnet_response(response2.json(), lang_code)
            )

            unique_occupations = self._dedupe_by_name(occupations)

            # Only cache non-empty results so a later call can retry.
            if unique_occupations:
                expert_cache.set(cache_key, unique_occupations)
                logger.info(
                    "ConceptNet fetched: %s concepts", len(unique_occupations)
                )

            return self._random_select(unique_occupations, count)
        except Exception as e:
            logger.error("ConceptNet query failed: %s", e)
            raise

    def _parse_conceptnet_response(
        self, data: dict, lang_code: str
    ) -> List[dict]:
        """Convert a ConceptNet query response into occupation records.

        For each edge the node that is not the seed concept is used. It is
        kept only when its id belongs to ``lang_code`` and its label has at
        least two characters.
        """
        results = []

        for edge in data.get("edges", []):
            start = edge.get("start", {})
            end = edge.get("end", {})

            # Which side holds the seed depends on the query direction;
            # pick the opposite node.
            start_is_seed = start.get("@id", "").endswith(
                ("職業", "occupation")
            )
            node = end if start_is_seed else start

            label = node.get("label", "")
            node_id = node.get("@id", "")

            if f"/c/{lang_code}/" in node_id and label and len(label) >= 2:
                results.append({
                    "name": label,
                    "domain": self._infer_domain_from_edge(edge),
                })

        return results

    def _infer_domain_from_edge(self, edge: dict) -> str:
        """Infer a domain from keywords in the edge's "surfaceText"."""
        # The API may send an explicit null for surfaceText; treat as empty.
        surface = edge.get("surfaceText") or ""

        if "專業" in surface:
            return "專業領域"
        if "技術" in surface:
            return "技術領域"
        return "知識領域"

    async def close(self):
        """Close the underlying HTTP client."""
        await self.client.aclose()


class ExpertSourceService:
    """Unified front-end over the available expert source providers."""

    def __init__(self):
        self.wikidata = WikidataProvider()
        self.conceptnet = ConceptNetProvider()

    async def get_experts(
        self,
        source: str,
        count: int,
        language: str = "zh",
        fallback_to_llm: bool = True
    ) -> Tuple[List[dict], str]:
        """Fetch expert data from the requested source.

        Args:
            source: Source type ("wikidata" | "conceptnet").
            count: Number of experts wanted.
            language: Language code.
            fallback_to_llm: Whether falling back is allowed on failure.
                Not acted on here; the caller handles the fallback.

        Returns:
            (expert records, name of the source actually used)

        Raises:
            ValueError: If ``source`` is unknown or the source returned no
                occupations.
            Exception: Any provider error is logged and re-raised.
        """
        provider = self._get_provider(source)

        try:
            experts = await provider.fetch_occupations(count, language)

            if not experts:
                raise ValueError(f"No occupations found from {source}")

            return experts, source
        except Exception as e:
            logger.warning("Failed to fetch from %s: %s", source, e)
            raise

    def _get_provider(self, source: str) -> ExpertSourceProvider:
        """Return the provider registered for ``source``.

        Raises:
            ValueError: If ``source`` is not a known source type.
        """
        if source == "wikidata":
            return self.wikidata
        if source == "conceptnet":
            return self.conceptnet
        raise ValueError(f"Unknown source: {source}")

    async def close(self):
        """Close all HTTP clients.

        The ConceptNet client is closed even if closing the Wikidata
        client raises, so a failure in one does not leak the other.
        """
        try:
            await self.wikidata.close()
        finally:
            await self.conceptnet.close()


# Module-level service instance
expert_source_service = ExpertSourceService()