From 43785db5957793349279ea11eb445a9beb4326f2 Mon Sep 17 00:00:00 2001 From: gbanyan Date: Thu, 4 Dec 2025 11:42:48 +0800 Subject: [PATCH] feat: Add external expert sources (Wikidata SPARQL + ConceptNet API) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Add expert_cache.py: TTL-based in-memory cache (1 hour default) - Add expert_source_service.py: WikidataProvider and ConceptNetProvider - Wikidata SPARQL queries for occupations with Chinese labels - ConceptNet API queries for occupation-related concepts - Random selection from cached pool - Update schemas.py: Add ExpertSource enum (llm/wikidata/conceptnet) - Update ExpertTransformationRequest with expert_source and expert_language - Update router: Conditionally use external sources with LLM fallback - New SSE events: expert_source, expert_fallback - Update frontend types with ExpertSource 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude --- backend/app/models/schemas.py | 11 + backend/app/routers/expert_transformation.py | 108 ++++-- backend/app/services/expert_cache.py | 92 +++++ backend/app/services/expert_source_service.py | 331 ++++++++++++++++++ frontend/src/types/index.ts | 4 + 5 files changed, 524 insertions(+), 22 deletions(-) create mode 100644 backend/app/services/expert_cache.py create mode 100644 backend/app/services/expert_source_service.py diff --git a/backend/app/models/schemas.py b/backend/app/models/schemas.py index 2b7451c..25e3648 100644 --- a/backend/app/models/schemas.py +++ b/backend/app/models/schemas.py @@ -206,6 +206,13 @@ class ExpertTransformationDAGResult(BaseModel): results: List[ExpertTransformationCategoryResult] +class ExpertSource(str, Enum): + """專家來源類型""" + LLM = "llm" + WIKIDATA = "wikidata" + CONCEPTNET = "conceptnet" + + class ExpertTransformationRequest(BaseModel): """Expert Transformation Agent 請求""" query: str @@ -217,6 +224,10 @@ class ExpertTransformationRequest(BaseModel): keywords_per_expert: int = 1 # 每個專家為每個屬性生成幾個關鍵字 (1-3) custom_experts: Optional[List[str]] = None # 用戶指定專家 ["藥師", "工程師"] + # Expert source parameters + expert_source: ExpertSource = ExpertSource.LLM # 專家來源 + expert_language: str = "zh" # 外部來源的語言 + # LLM parameters model: Optional[str] = None temperature: Optional[float] = 0.7 diff --git a/backend/app/routers/expert_transformation.py b/backend/app/routers/expert_transformation.py index a88d3ad..0019e68 100644 --- a/backend/app/routers/expert_transformation.py +++ b/backend/app/routers/expert_transformation.py @@ -13,6 +13,7 @@ from ..models.schemas import ( ExpertKeyword, ExpertTransformationCategoryResult, ExpertTransformationDescription, + ExpertSource, ) from ..prompts.expert_transformation_prompt import ( get_expert_generation_prompt, @@ -20,6 +21,7 @@ from ..prompts.expert_transformation_prompt import ( get_single_description_prompt, ) from ..services.llm_service import ollama_provider, extract_json_from_response +from ..services.expert_source_service import expert_source_service logger = logging.getLogger(__name__) router = APIRouter(prefix="/api/expert-transformation", tags=["expert-transformation"]) @@ -35,36 +37,98 @@ async def generate_expert_transformation_events( model = request.model # ========== Step 0: Generate expert team ========== - yield f"event: expert_start\ndata: {json.dumps({'message': '正在組建專家團隊...'}, ensure_ascii=False)}\n\n" + yield f"event: expert_start\ndata: {json.dumps({'message': '正在組建專家團隊...', 'source': request.expert_source.value}, ensure_ascii=False)}\n\n" experts: List[ExpertProfile] = [] + actual_source = request.expert_source.value - try: - expert_prompt = get_expert_generation_prompt( - query=request.query, - categories=all_categories, - expert_count=request.expert_count, - custom_experts=request.custom_experts - ) - logger.info(f"Expert prompt: {expert_prompt[:200]}") + # 決定使用哪種來源生成專家 + use_llm = ( + request.expert_source == ExpertSource.LLM or + request.custom_experts # 有自訂專家時,使用 LLM 補充 + ) - expert_response = await ollama_provider.generate( - expert_prompt, model=model, temperature=temperature - ) - logger.info(f"Expert response: {expert_response[:500]}") + if use_llm: + # LLM 生成專家 + try: + expert_prompt = get_expert_generation_prompt( + query=request.query, + categories=all_categories, + expert_count=request.expert_count, + custom_experts=request.custom_experts + ) + logger.info(f"Expert prompt: {expert_prompt[:200]}") - expert_data = extract_json_from_response(expert_response) - experts_raw = expert_data.get("experts", []) + expert_response = await ollama_provider.generate( + expert_prompt, model=model, temperature=temperature + ) + logger.info(f"Expert response: {expert_response[:500]}") - for exp in experts_raw: - if isinstance(exp, dict) and all(k in exp for k in ["id", "name", "domain"]): - experts.append(ExpertProfile(**exp)) + expert_data = extract_json_from_response(expert_response) + experts_raw = expert_data.get("experts", []) - except Exception as e: - logger.error(f"Failed to generate experts: {e}") - yield f"event: error\ndata: {json.dumps({'error': f'專家團隊生成失敗: {str(e)}'}, ensure_ascii=False)}\n\n" - return + for exp in experts_raw: + if isinstance(exp, dict) and all(k in exp for k in ["id", "name", "domain"]): + experts.append(ExpertProfile(**exp)) + actual_source = "llm" + + except Exception as e: + logger.error(f"Failed to generate experts via LLM: {e}") + yield f"event: error\ndata: {json.dumps({'error': f'專家團隊生成失敗: {str(e)}'}, ensure_ascii=False)}\n\n" + return + else: + # 外部來源生成專家 + try: + experts_data, actual_source = await expert_source_service.get_experts( + source=request.expert_source.value, + count=request.expert_count, + language=request.expert_language + ) + + for i, exp_data in enumerate(experts_data): + experts.append(ExpertProfile( + id=f"expert-{i}", + name=exp_data["name"], + domain=exp_data["domain"], + perspective=f"從{exp_data['domain']}角度思考" + )) + + logger.info(f"Generated {len(experts)} experts from {actual_source}") + + except Exception as e: + # 外部來源失敗,fallback 到 LLM + logger.warning(f"External source failed: {e}, falling back to LLM") + yield f"event: expert_fallback\ndata: {json.dumps({'original': request.expert_source.value, 'fallback': 'llm', 'reason': str(e)}, ensure_ascii=False)}\n\n" + + try: + expert_prompt = get_expert_generation_prompt( + query=request.query, + categories=all_categories, + expert_count=request.expert_count, + custom_experts=request.custom_experts + ) + + expert_response = await ollama_provider.generate( + expert_prompt, model=model, temperature=temperature + ) + + expert_data = extract_json_from_response(expert_response) + experts_raw = expert_data.get("experts", []) + + for exp in experts_raw: + if isinstance(exp, dict) and all(k in exp for k in ["id", "name", "domain"]): + experts.append(ExpertProfile(**exp)) + + actual_source = "llm" + + except Exception as llm_error: + logger.error(f"LLM fallback also failed: {llm_error}") + yield f"event: error\ndata: {json.dumps({'error': f'專家團隊生成失敗: {str(llm_error)}'}, ensure_ascii=False)}\n\n" + return + + # 回報來源資訊 + yield f"event: expert_source\ndata: {json.dumps({'source': actual_source}, ensure_ascii=False)}\n\n" yield f"event: expert_complete\ndata: {json.dumps({'experts': [e.model_dump() for e in experts]}, ensure_ascii=False)}\n\n" if not experts: diff --git a/backend/app/services/expert_cache.py b/backend/app/services/expert_cache.py new file mode 100644 index 0000000..2df8a4d --- /dev/null +++ b/backend/app/services/expert_cache.py @@ -0,0 +1,92 @@ +"""Expert 資料快取模組 + +提供 TTL-based 快取機制,減少外部 API 呼叫。 +""" + +import time +from dataclasses import dataclass +from typing import Dict, List, Optional + + +@dataclass +class CacheEntry: + """快取項目""" + data: List[dict] + timestamp: float + + +class ExpertCache: + """TTL 快取,用於儲存外部來源的職業資料""" + + def __init__(self, ttl_seconds: int = 3600): + """ + 初始化快取 + + Args: + ttl_seconds: 快取存活時間(預設 1 小時) + """ + self._cache: Dict[str, CacheEntry] = {} + self._ttl = ttl_seconds + + def get(self, key: str) -> Optional[List[dict]]: + """ + 取得快取資料 + + Args: + key: 快取鍵(如 "wikidata:zh:occupations") + + Returns: + 快取的資料列表,若不存在或已過期則回傳 None + """ + entry = self._cache.get(key) + if entry is None: + return None + + # 檢查是否過期 + if time.time() - entry.timestamp > self._ttl: + del self._cache[key] + return None + + return entry.data + + def set(self, key: str, data: List[dict]) -> None: + """ + 設定快取資料 + + Args: + key: 快取鍵 + data: 要快取的資料列表 + """ + self._cache[key] = CacheEntry( + data=data, + timestamp=time.time() + ) + + def invalidate(self, key: Optional[str] = None) -> None: + """ + 清除快取 + + Args: + key: 要清除的鍵,若為 None 則清除全部 + """ + if key is None: + self._cache.clear() + elif key in self._cache: + del self._cache[key] + + def get_stats(self) -> dict: + """取得快取統計資訊""" + now = time.time() + valid_count = sum( + 1 for entry in self._cache.values() + if now - entry.timestamp <= self._ttl + ) + return { + "total_entries": len(self._cache), + "valid_entries": valid_count, + "ttl_seconds": self._ttl + } + + +# 全域快取實例 +expert_cache = ExpertCache() diff --git a/backend/app/services/expert_source_service.py b/backend/app/services/expert_source_service.py new file mode 100644 index 0000000..29a7037 --- /dev/null +++ b/backend/app/services/expert_source_service.py @@ -0,0 +1,331 @@ +"""Expert 外部資料來源服務 + +提供從 Wikidata SPARQL 和 ConceptNet API 獲取職業/領域資料的功能。 +""" + +import logging +import random +from abc import ABC, abstractmethod +from typing import List, Optional, Tuple + +import httpx + +from .expert_cache import expert_cache + +logger = logging.getLogger(__name__) + + +class ExpertSourceProvider(ABC): + """外部來源提供者抽象類""" + + @abstractmethod + async def fetch_occupations( + self, count: int, language: str = "zh" + ) -> List[dict]: + """ + 獲取職業列表 + + Args: + count: 需要的職業數量 + language: 語言代碼 (zh/en) + + Returns: + 職業資料列表 [{"name": "...", "domain": "..."}, ...] + """ + pass + + +class WikidataProvider(ExpertSourceProvider): + """Wikidata SPARQL 查詢提供者""" + + ENDPOINT = "https://query.wikidata.org/sparql" + + def __init__(self): + self.client = httpx.AsyncClient(timeout=30.0) + + async def fetch_occupations( + self, count: int, language: str = "zh" + ) -> List[dict]: + """從 Wikidata 獲取職業列表""" + cache_key = f"wikidata:{language}:occupations" + + # 檢查快取 + cached = expert_cache.get(cache_key) + if cached: + logger.info(f"Wikidata cache hit: {len(cached)} occupations") + return self._random_select(cached, count) + + # SPARQL 查詢 + query = self._build_sparql_query(language) + + try: + response = await self.client.get( + self.ENDPOINT, + params={"query": query, "format": "json"}, + headers={"Accept": "application/sparql-results+json"} + ) + response.raise_for_status() + + data = response.json() + occupations = self._parse_sparql_response(data, language) + + if occupations: + expert_cache.set(cache_key, occupations) + logger.info(f"Wikidata fetched: {len(occupations)} occupations") + + return self._random_select(occupations, count) + + except Exception as e: + logger.error(f"Wikidata query failed: {e}") + raise + + def _build_sparql_query(self, language: str) -> str: + """建構 SPARQL 查詢""" + lang_filter = f'FILTER(LANG(?occupationLabel) = "{language}")' + + return f""" + SELECT DISTINCT ?occupation ?occupationLabel ?fieldLabel WHERE {{ + ?occupation wdt:P31 wd:Q28640. + ?occupation rdfs:label ?occupationLabel. + {lang_filter} + + OPTIONAL {{ + ?occupation wdt:P425 ?field. + ?field rdfs:label ?fieldLabel. + FILTER(LANG(?fieldLabel) = "{language}") + }} + }} + LIMIT 500 + """ + + def _parse_sparql_response(self, data: dict, language: str) -> List[dict]: + """解析 SPARQL 回應""" + results = [] + bindings = data.get("results", {}).get("bindings", []) + + for item in bindings: + name = item.get("occupationLabel", {}).get("value", "") + field = item.get("fieldLabel", {}).get("value", "") + + if name and len(name) >= 2: + results.append({ + "name": name, + "domain": field if field else self._infer_domain(name) + }) + + return results + + def _infer_domain(self, occupation_name: str) -> str: + """根據職業名稱推斷領域""" + # 簡單的領域推斷規則 + domain_keywords = { + "醫": "醫療健康", + "師": "專業服務", + "工程": "工程技術", + "設計": "設計創意", + "藝術": "藝術文化", + "運動": "體育運動", + "農": "農業", + "漁": "漁業", + "商": "商業貿易", + "法": "法律", + "教": "教育", + "研究": "學術研究", + } + + for keyword, domain in domain_keywords.items(): + if keyword in occupation_name: + return domain + + return "專業領域" + + def _random_select(self, items: List[dict], count: int) -> List[dict]: + """隨機選取指定數量""" + if len(items) <= count: + return items + return random.sample(items, count) + + async def close(self): + await self.client.aclose() + + +class ConceptNetProvider(ExpertSourceProvider): + """ConceptNet API 查詢提供者""" + + ENDPOINT = "https://api.conceptnet.io" + + def __init__(self): + self.client = httpx.AsyncClient(timeout=30.0) + + async def fetch_occupations( + self, count: int, language: str = "zh" + ) -> List[dict]: + """從 ConceptNet 獲取職業相關概念""" + cache_key = f"conceptnet:{language}:occupations" + + # 檢查快取 + cached = expert_cache.get(cache_key) + if cached: + logger.info(f"ConceptNet cache hit: {len(cached)} concepts") + return self._random_select(cached, count) + + # 查詢職業相關概念 + lang_code = "zh" if language == "zh" else "en" + start_concept = f"/c/{lang_code}/職業" if lang_code == "zh" else f"/c/{lang_code}/occupation" + + try: + occupations = [] + + # 查詢 IsA 關係 + response = await self.client.get( + f"{self.ENDPOINT}/query", + params={ + "start": start_concept, + "rel": "/r/IsA", + "limit": 100 + } + ) + response.raise_for_status() + data = response.json() + occupations.extend(self._parse_conceptnet_response(data, lang_code)) + + # 也查詢 RelatedTo 關係以獲取更多結果 + response2 = await self.client.get( + f"{self.ENDPOINT}/query", + params={ + "node": start_concept, + "rel": "/r/RelatedTo", + "limit": 100 + } + ) + response2.raise_for_status() + data2 = response2.json() + occupations.extend(self._parse_conceptnet_response(data2, lang_code)) + + # 去重 + seen = set() + unique_occupations = [] + for occ in occupations: + if occ["name"] not in seen: + seen.add(occ["name"]) + unique_occupations.append(occ) + + if unique_occupations: + expert_cache.set(cache_key, unique_occupations) + logger.info(f"ConceptNet fetched: {len(unique_occupations)} concepts") + + return self._random_select(unique_occupations, count) + + except Exception as e: + logger.error(f"ConceptNet query failed: {e}") + raise + + def _parse_conceptnet_response(self, data: dict, lang_code: str) -> List[dict]: + """解析 ConceptNet 回應""" + results = [] + edges = data.get("edges", []) + + for edge in edges: + # 取得 start 或 end 節點(取決於查詢方向) + start = edge.get("start", {}) + end = edge.get("end", {}) + + # 選擇非起始節點的概念 + node = end if start.get("@id", "").endswith("職業") or start.get("@id", "").endswith("occupation") else start + + label = node.get("label", "") + term = node.get("term", "") + + # 過濾:確保是目標語言且有意義 + node_id = node.get("@id", "") + if f"/c/{lang_code}/" in node_id and label and len(label) >= 2: + results.append({ + "name": label, + "domain": self._infer_domain_from_edge(edge) + }) + + return results + + def _infer_domain_from_edge(self, edge: dict) -> str: + """從 edge 資訊推斷領域""" + # ConceptNet 的 edge 包含 surfaceText 可能有額外資訊 + surface = edge.get("surfaceText", "") + rel = edge.get("rel", {}).get("label", "") + + if "專業" in surface: + return "專業領域" + elif "技術" in surface: + return "技術領域" + else: + return "知識領域" + + def _random_select(self, items: List[dict], count: int) -> List[dict]: + """隨機選取指定數量""" + if len(items) <= count: + return items + return random.sample(items, count) + + async def close(self): + await self.client.aclose() + + +class ExpertSourceService: + """統一的專家來源服務""" + + def __init__(self): + self.wikidata = WikidataProvider() + self.conceptnet = ConceptNetProvider() + + async def get_experts( + self, + source: str, + count: int, + language: str = "zh", + fallback_to_llm: bool = True + ) -> Tuple[List[dict], str]: + """ + 從指定來源獲取專家資料 + + Args: + source: 來源類型 ("wikidata" | "conceptnet") + count: 需要的專家數量 + language: 語言代碼 + fallback_to_llm: 失敗時是否允許 fallback(由呼叫者處理) + + Returns: + (專家資料列表, 實際使用的來源) + + Raises: + Exception: 當獲取失敗且不 fallback 時 + """ + provider = self._get_provider(source) + + try: + experts = await provider.fetch_occupations(count, language) + + if not experts: + raise ValueError(f"No occupations found from {source}") + + return experts, source + + except Exception as e: + logger.warning(f"Failed to fetch from {source}: {e}") + raise + + def _get_provider(self, source: str) -> ExpertSourceProvider: + """根據來源類型取得對應的 provider""" + if source == "wikidata": + return self.wikidata + elif source == "conceptnet": + return self.conceptnet + else: + raise ValueError(f"Unknown source: {source}") + + async def close(self): + """關閉所有 HTTP clients""" + await self.wikidata.close() + await self.conceptnet.close() + + +# 全域服務實例 +expert_source_service = ExpertSourceService() diff --git a/frontend/src/types/index.ts b/frontend/src/types/index.ts index 6564381..b1a0a88 100644 --- a/frontend/src/types/index.ts +++ b/frontend/src/types/index.ts @@ -230,6 +230,8 @@ export interface ExpertTransformationDAGResult { results: ExpertTransformationCategoryResult[]; } +export type ExpertSource = 'llm' | 'wikidata' | 'conceptnet'; + export interface ExpertTransformationRequest { query: string; category: string; @@ -237,6 +239,8 @@ export interface ExpertTransformationRequest { expert_count: number; // 2-8 keywords_per_expert: number; // 1-3 custom_experts?: string[]; // ["藥師", "工程師"] + expert_source?: ExpertSource; // 專家來源 (default: 'llm') + expert_language?: string; // 外部來源語言 (default: 'zh') model?: string; temperature?: number; }