feat: Add curated expert occupations with local data sources

- Add curated occupations seed files (210 entries in zh/en) with specific domains
- Add DBpedia occupations data (2164 entries) for external source option
- Refactor expert_source_service to read from local JSON files
- Improve keyword generation prompts to leverage expert domain context
- Add architecture analysis documentation (ARCHITECTURE_ANALYSIS.md)
- Fix expert source selection bug (proper handling of empty custom_experts)
- Update frontend to support curated/dbpedia/wikidata expert sources

Key changes:
- backend/app/data/: Local occupation data files
- backend/app/services/expert_source_service.py: Simplified local file reading
- backend/app/prompts/expert_transformation_prompt.py: Better domain-aware prompts
- Removed expert_cache.py (no longer needed with local files)

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
This commit is contained in:
2025-12-04 16:34:35 +08:00
parent 8777e27cbb
commit 5571076406
15 changed files with 9970 additions and 380 deletions

View File

@@ -1,293 +1,111 @@
"""Expert 外部資料來源服務
"""Expert 本地資料來源服務
提供從 Wikidata SPARQL 和 ConceptNet API 獲取職業/領域資料的功能。
從本地 JSON 檔案讀取職業資料,提供隨機選取功能。
"""
import json
import logging
import random
from abc import ABC, abstractmethod
from typing import List, Optional, Tuple
import httpx
from .expert_cache import expert_cache
from pathlib import Path
from typing import List, Tuple
logger = logging.getLogger(__name__)
# 資料目錄
DATA_DIR = Path(__file__).parent.parent / "data"
class ExpertSourceProvider(ABC):
"""外部來源提供者抽象類"""
@abstractmethod
async def fetch_occupations(
self, count: int, language: str = "zh"
) -> List[dict]:
class LocalDataProvider:
"""從本地 JSON 檔案讀取職業資料"""
def __init__(self, source: str):
"""
獲取職業列表
Args:
source: 資料來源名稱 (dbpedia/wikidata)
"""
self.source = source
self._cache: dict = {} # 記憶體快取
def load_occupations(self, language: str = "en") -> List[dict]:
"""
載入職業資料
Args:
count: 需要的職業數量
language: 語言代碼 (zh/en)
language: 語言代碼 (en/zh)
Returns:
職業資料列表 [{"name": "...", "domain": "..."}, ...]
職業列表 [{"name": "...", "domain": "..."}, ...]
"""
pass
cache_key = f"{self.source}:{language}"
# 檢查記憶體快取
if cache_key in self._cache:
return self._cache[cache_key]
class WikidataProvider(ExpertSourceProvider):
"""Wikidata SPARQL 查詢提供者"""
# 讀取檔案
file_path = DATA_DIR / f"{self.source}_occupations_{language}.json"
ENDPOINT = "https://query.wikidata.org/sparql"
def __init__(self):
self.client = httpx.AsyncClient(timeout=30.0)
async def fetch_occupations(
self, count: int, language: str = "zh"
) -> List[dict]:
"""從 Wikidata 獲取職業列表"""
cache_key = f"wikidata:{language}:occupations"
# 檢查快取
cached = expert_cache.get(cache_key)
if cached:
logger.info(f"Wikidata cache hit: {len(cached)} occupations")
return self._random_select(cached, count)
# SPARQL 查詢
query = self._build_sparql_query(language)
if not file_path.exists():
logger.warning(f"資料檔案不存在: {file_path}")
return []
try:
response = await self.client.get(
self.ENDPOINT,
params={"query": query, "format": "json"},
headers={"Accept": "application/sparql-results+json"}
)
response.raise_for_status()
with open(file_path, "r", encoding="utf-8") as f:
data = json.load(f)
data = response.json()
occupations = self._parse_sparql_response(data, language)
occupations = data.get("occupations", [])
logger.info(f"載入 {len(occupations)}{self.source} {language} 職業")
if occupations:
expert_cache.set(cache_key, occupations)
logger.info(f"Wikidata fetched: {len(occupations)} occupations")
return self._random_select(occupations, count)
# 存入快取
self._cache[cache_key] = occupations
return occupations
except Exception as e:
logger.error(f"Wikidata query failed: {e}")
raise
logger.error(f"讀取職業資料失敗: {e}")
return []
def _build_sparql_query(self, language: str) -> str:
"""建構 SPARQL 查詢"""
lang_filter = f'FILTER(LANG(?occupationLabel) = "{language}")'
return f"""
SELECT DISTINCT ?occupation ?occupationLabel ?fieldLabel WHERE {{
?occupation wdt:P31 wd:Q28640.
?occupation rdfs:label ?occupationLabel.
{lang_filter}
OPTIONAL {{
?occupation wdt:P425 ?field.
?field rdfs:label ?fieldLabel.
FILTER(LANG(?fieldLabel) = "{language}")
}}
}}
LIMIT 500
def random_select(self, count: int, language: str = "en") -> List[dict]:
"""
隨機選取指定數量的職業
def _parse_sparql_response(self, data: dict, language: str) -> List[dict]:
"""解析 SPARQL 回應"""
results = []
bindings = data.get("results", {}).get("bindings", [])
Args:
count: 需要的數量
language: 語言代碼
for item in bindings:
name = item.get("occupationLabel", {}).get("value", "")
field = item.get("fieldLabel", {}).get("value", "")
Returns:
隨機選取的職業列表
"""
all_occupations = self.load_occupations(language)
if name and len(name) >= 2:
results.append({
"name": name,
"domain": field if field else self._infer_domain(name)
})
if not all_occupations:
return []
return results
if len(all_occupations) <= count:
return all_occupations
def _infer_domain(self, occupation_name: str) -> str:
"""根據職業名稱推斷領域"""
# 簡單的領域推斷規則
domain_keywords = {
"": "醫療健康",
"": "專業服務",
"工程": "工程技術",
"設計": "設計創意",
"藝術": "藝術文化",
"運動": "體育運動",
"": "農業",
"": "漁業",
"": "商業貿易",
"": "法律",
"": "教育",
"研究": "學術研究",
}
for keyword, domain in domain_keywords.items():
if keyword in occupation_name:
return domain
return "專業領域"
def _random_select(self, items: List[dict], count: int) -> List[dict]:
"""隨機選取指定數量"""
if len(items) <= count:
return items
return random.sample(items, count)
async def close(self):
await self.client.aclose()
class ConceptNetProvider(ExpertSourceProvider):
"""ConceptNet API 查詢提供者"""
ENDPOINT = "https://api.conceptnet.io"
def __init__(self):
self.client = httpx.AsyncClient(timeout=30.0)
async def fetch_occupations(
self, count: int, language: str = "zh"
) -> List[dict]:
"""從 ConceptNet 獲取職業相關概念"""
cache_key = f"conceptnet:{language}:occupations"
# 檢查快取
cached = expert_cache.get(cache_key)
if cached:
logger.info(f"ConceptNet cache hit: {len(cached)} concepts")
return self._random_select(cached, count)
# 查詢職業相關概念
lang_code = "zh" if language == "zh" else "en"
start_concept = f"/c/{lang_code}/職業" if lang_code == "zh" else f"/c/{lang_code}/occupation"
try:
occupations = []
# 查詢 IsA 關係
response = await self.client.get(
f"{self.ENDPOINT}/query",
params={
"start": start_concept,
"rel": "/r/IsA",
"limit": 100
}
)
response.raise_for_status()
data = response.json()
occupations.extend(self._parse_conceptnet_response(data, lang_code))
# 也查詢 RelatedTo 關係以獲取更多結果
response2 = await self.client.get(
f"{self.ENDPOINT}/query",
params={
"node": start_concept,
"rel": "/r/RelatedTo",
"limit": 100
}
)
response2.raise_for_status()
data2 = response2.json()
occupations.extend(self._parse_conceptnet_response(data2, lang_code))
# 去重
seen = set()
unique_occupations = []
for occ in occupations:
if occ["name"] not in seen:
seen.add(occ["name"])
unique_occupations.append(occ)
if unique_occupations:
expert_cache.set(cache_key, unique_occupations)
logger.info(f"ConceptNet fetched: {len(unique_occupations)} concepts")
return self._random_select(unique_occupations, count)
except Exception as e:
logger.error(f"ConceptNet query failed: {e}")
raise
def _parse_conceptnet_response(self, data: dict, lang_code: str) -> List[dict]:
"""解析 ConceptNet 回應"""
results = []
edges = data.get("edges", [])
for edge in edges:
# 取得 start 或 end 節點(取決於查詢方向)
start = edge.get("start", {})
end = edge.get("end", {})
# 選擇非起始節點的概念
node = end if start.get("@id", "").endswith("職業") or start.get("@id", "").endswith("occupation") else start
label = node.get("label", "")
term = node.get("term", "")
# 過濾:確保是目標語言且有意義
node_id = node.get("@id", "")
if f"/c/{lang_code}/" in node_id and label and len(label) >= 2:
results.append({
"name": label,
"domain": self._infer_domain_from_edge(edge)
})
return results
def _infer_domain_from_edge(self, edge: dict) -> str:
"""從 edge 資訊推斷領域"""
# ConceptNet 的 edge 包含 surfaceText 可能有額外資訊
surface = edge.get("surfaceText", "")
rel = edge.get("rel", {}).get("label", "")
if "專業" in surface:
return "專業領域"
elif "技術" in surface:
return "技術領域"
else:
return "知識領域"
def _random_select(self, items: List[dict], count: int) -> List[dict]:
"""隨機選取指定數量"""
if len(items) <= count:
return items
return random.sample(items, count)
async def close(self):
await self.client.aclose()
return random.sample(all_occupations, count)
class ExpertSourceService:
"""統一的專家來源服務"""
def __init__(self):
self.wikidata = WikidataProvider()
self.conceptnet = ConceptNetProvider()
self.curated = LocalDataProvider("curated") # 精選職業
self.dbpedia = LocalDataProvider("dbpedia")
self.wikidata = LocalDataProvider("wikidata")
async def get_experts(
def get_experts(
self,
source: str,
count: int,
language: str = "zh",
language: str = "en",
fallback_to_llm: bool = True
) -> Tuple[List[dict], str]:
"""
從指定來源獲取專家資料
Args:
source: 來源類型 ("wikidata" | "conceptnet")
source: 來源類型 ("dbpedia" | "wikidata")
count: 需要的專家數量
language: 語言代碼
fallback_to_llm: 失敗時是否允許 fallback由呼叫者處理
@@ -296,35 +114,87 @@ class ExpertSourceService:
(專家資料列表, 實際使用的來源)
Raises:
Exception: 當獲取失敗且不 fallback
ValueError: 當獲取失敗且資料為空
"""
provider = self._get_provider(source)
try:
experts = await provider.fetch_occupations(count, language)
if not experts:
raise ValueError(f"No occupations found from {source}")
return experts, source
except Exception as e:
logger.warning(f"Failed to fetch from {source}: {e}")
raise
def _get_provider(self, source: str) -> ExpertSourceProvider:
"""根據來源類型取得對應的 provider"""
if source == "wikidata":
return self.wikidata
elif source == "conceptnet":
return self.conceptnet
# 選擇 provider
if source == "curated":
provider = self.curated
# 精選職業支援 zh 和 en預設使用 zh
if language not in ["zh", "en"]:
language = "zh"
elif source == "wikidata":
provider = self.wikidata
else:
raise ValueError(f"Unknown source: {source}")
# 預設使用 dbpedia
provider = self.dbpedia
source = "dbpedia"
async def close(self):
"""關閉所有 HTTP clients"""
await self.wikidata.close()
await self.conceptnet.close()
experts = provider.random_select(count, language)
if not experts:
raise ValueError(f"No occupations found from {source} ({language})")
logger.info(f"{source} 取得 {len(experts)} 位專家")
return experts, source
def get_available_sources(self) -> List[dict]:
"""
取得可用的資料來源資訊
Returns:
來源資訊列表
"""
sources = []
# 檢查精選職業(中文)
curated_zh = DATA_DIR / "curated_occupations_zh.json"
if curated_zh.exists():
with open(curated_zh, "r", encoding="utf-8") as f:
data = json.load(f)
sources.append({
"source": "curated",
"language": "zh",
"count": data["metadata"]["total_count"],
"created_at": data["metadata"]["created_at"]
})
# 檢查精選職業(英文)
curated_en = DATA_DIR / "curated_occupations_en.json"
if curated_en.exists():
with open(curated_en, "r", encoding="utf-8") as f:
data = json.load(f)
sources.append({
"source": "curated",
"language": "en",
"count": data["metadata"]["total_count"],
"created_at": data["metadata"]["created_at"]
})
# 檢查 DBpedia
dbpedia_en = DATA_DIR / "dbpedia_occupations_en.json"
if dbpedia_en.exists():
with open(dbpedia_en, "r", encoding="utf-8") as f:
data = json.load(f)
sources.append({
"source": "dbpedia",
"language": "en",
"count": data["metadata"]["total_count"],
"fetched_at": data["metadata"]["fetched_at"]
})
# 檢查 Wikidata
wikidata_zh = DATA_DIR / "wikidata_occupations_zh.json"
if wikidata_zh.exists():
with open(wikidata_zh, "r", encoding="utf-8") as f:
data = json.load(f)
sources.append({
"source": "wikidata",
"language": "zh",
"count": data["metadata"]["total_count"],
"fetched_at": data["metadata"]["fetched_at"]
})
return sources
# 全域服務實例