novelty-seeking/backend/app/routers/expert_transformation.py
gbanyan 5571076406 feat: Add curated expert occupations with local data sources
- Add curated occupations seed files (210 entries in zh/en) with specific domains
- Add DBpedia occupations data (2164 entries) for external source option
- Refactor expert_source_service to read from local JSON files
- Improve keyword generation prompts to leverage expert domain context
- Add architecture analysis documentation (ARCHITECTURE_ANALYSIS.md)
- Fix expert source selection bug (proper handling of empty custom_experts)
- Update frontend to support curated/dbpedia/wikidata expert sources

Key changes:
- backend/app/data/: Local occupation data files
- backend/app/services/expert_source_service.py: Simplified local file reading (see the sketch after this list)
- backend/app/prompts/expert_transformation_prompt.py: Better domain-aware prompts
- Removed expert_cache.py (no longer needed with local files)
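
A minimal sketch of the local-file reading: the `get_experts(source, count, language)` signature and the `(experts_data, actual_source)` return shape match the router code below, but the data-file naming under backend/app/data/ and the sampling strategy are assumptions.

```python
# Minimal sketch of the local-file expert source (assumed file layout; the
# real implementation lives in backend/app/services/expert_source_service.py).
import json
import random
from pathlib import Path
from typing import List, Tuple

DATA_DIR = Path(__file__).parent.parent / "data"  # assumed location of seed files

def get_experts(source: str, count: int, language: str) -> Tuple[List[dict], str]:
    """Return (experts_data, actual_source); entries carry 'name' and 'domain'."""
    # Assumed naming scheme, e.g. occupations_curated_zh.json
    path = DATA_DIR / f"occupations_{source}_{language}.json"
    with path.open(encoding="utf-8") as f:
        occupations = json.load(f)
    return random.sample(occupations, min(count, len(occupations))), source
```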

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-12-04 16:34:35 +08:00

"""Expert Transformation Agent 路由模組"""
import json
import logging
from typing import AsyncGenerator, List
from fastapi import APIRouter
from fastapi.responses import StreamingResponse
from ..models.schemas import (
ExpertTransformationRequest,
ExpertProfile,
ExpertKeyword,
ExpertTransformationCategoryResult,
ExpertTransformationDescription,
ExpertSource,
)
from ..prompts.expert_transformation_prompt import (
get_expert_generation_prompt,
get_expert_keyword_generation_prompt,
get_single_description_prompt,
)
from ..services.llm_service import ollama_provider, extract_json_from_response
from ..services.expert_source_service import expert_source_service
logger = logging.getLogger(__name__)
router = APIRouter(prefix="/api/expert-transformation", tags=["expert-transformation"])
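

# Flow: Step 0 builds the expert team (via LLM or an external occupation
# source, optionally seeded with user-supplied custom experts); Step 1 has
# each expert generate keywords per attribute; Step 2 writes one innovation
# description per keyword. Everything streams to the client as SSE events.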
async def generate_expert_transformation_events(
    request: ExpertTransformationRequest,
    all_categories: List[str]  # For expert generation context
) -> AsyncGenerator[str, None]:
    """Generate SSE events for expert transformation process"""
    try:
        temperature = request.temperature if request.temperature is not None else 0.7
        model = request.model

        # ========== Step 0: Generate expert team ==========
        logger.info(f"[DEBUG] expert_source from request: {request.expert_source}")
        logger.info(f"[DEBUG] expert_source value: {request.expert_source.value}")
        logger.info(f"[DEBUG] custom_experts: {request.custom_experts}")
        yield f"event: expert_start\ndata: {json.dumps({'message': '正在組建專家團隊...', 'source': request.expert_source.value}, ensure_ascii=False)}\n\n"

        experts: List[ExpertProfile] = []
        actual_source = request.expert_source.value
        # Filter out custom experts that are effectively empty (blank strings)
        actual_custom_experts = [
            e.strip() for e in (request.custom_experts or [])
            if e and e.strip()
        ]
        logger.info(f"[DEBUG] actual_custom_experts (filtered): {actual_custom_experts}")

        # Decide which source to use for expert generation: use the LLM only
        # when it is explicitly selected, or when real custom experts are
        # provided (the LLM then fills out the rest of the team).
        use_llm = (
            request.expert_source == ExpertSource.LLM or
            len(actual_custom_experts) > 0
        )
        logger.info(f"[DEBUG] use_llm decision: {use_llm}")
        if use_llm:
            # Generate experts with the LLM
            try:
                expert_prompt = get_expert_generation_prompt(
                    query=request.query,
                    categories=all_categories,
                    expert_count=request.expert_count,
                    custom_experts=actual_custom_experts if actual_custom_experts else None
                )
                logger.info(f"Expert prompt: {expert_prompt[:200]}")
                expert_response = await ollama_provider.generate(
                    expert_prompt, model=model, temperature=temperature
                )
                logger.info(f"Expert response: {expert_response[:500]}")
                expert_data = extract_json_from_response(expert_response)
                experts_raw = expert_data.get("experts", [])
                for exp in experts_raw:
                    if isinstance(exp, dict) and all(k in exp for k in ["id", "name", "domain"]):
                        experts.append(ExpertProfile(**exp))
                actual_source = "llm"
            except Exception as e:
                logger.error(f"Failed to generate experts via LLM: {e}")
                yield f"event: error\ndata: {json.dumps({'error': f'專家團隊生成失敗: {str(e)}'}, ensure_ascii=False)}\n\n"
                return
        else:
            # Generate experts from an external source (local files, synchronous)
            try:
                experts_data, actual_source = expert_source_service.get_experts(
                    source=request.expert_source.value,
                    count=request.expert_count,
                    language=request.expert_language
                )
                for i, exp_data in enumerate(experts_data):
                    experts.append(ExpertProfile(
                        id=f"expert-{i}",
                        name=exp_data["name"],
                        domain=exp_data["domain"],
                        perspective=f"{exp_data['domain']}角度思考"
                    ))
                logger.info(f"Generated {len(experts)} experts from {actual_source}")
            except Exception as e:
                # External source failed; fall back to the LLM
                logger.warning(f"External source failed: {e}, falling back to LLM")
                yield f"event: expert_fallback\ndata: {json.dumps({'original': request.expert_source.value, 'fallback': 'llm', 'reason': str(e)}, ensure_ascii=False)}\n\n"
                try:
                    expert_prompt = get_expert_generation_prompt(
                        query=request.query,
                        categories=all_categories,
                        expert_count=request.expert_count,
                        custom_experts=actual_custom_experts if actual_custom_experts else None
                    )
                    expert_response = await ollama_provider.generate(
                        expert_prompt, model=model, temperature=temperature
                    )
                    expert_data = extract_json_from_response(expert_response)
                    experts_raw = expert_data.get("experts", [])
                    for exp in experts_raw:
                        if isinstance(exp, dict) and all(k in exp for k in ["id", "name", "domain"]):
                            experts.append(ExpertProfile(**exp))
                    actual_source = "llm"
                except Exception as llm_error:
                    logger.error(f"LLM fallback also failed: {llm_error}")
                    yield f"event: error\ndata: {json.dumps({'error': f'專家團隊生成失敗: {str(llm_error)}'}, ensure_ascii=False)}\n\n"
                    return
        # Report which source was actually used
        yield f"event: expert_source\ndata: {json.dumps({'source': actual_source}, ensure_ascii=False)}\n\n"
        yield f"event: expert_complete\ndata: {json.dumps({'experts': [e.model_dump() for e in experts]}, ensure_ascii=False)}\n\n"

        if not experts:
            yield f"event: error\ndata: {json.dumps({'error': '無法生成專家團隊'}, ensure_ascii=False)}\n\n"
            return
        # ========== Step 1: Generate keywords from expert perspectives ==========
        yield f"event: keyword_start\ndata: {json.dumps({'message': f'專家團隊為「{request.category}」的屬性生成關鍵字...'}, ensure_ascii=False)}\n\n"
        all_expert_keywords: List[ExpertKeyword] = []

        # For each attribute, ask all experts to generate keywords
        for attr_index, attribute in enumerate(request.attributes):
            try:
                kw_prompt = get_expert_keyword_generation_prompt(
                    category=request.category,
                    attribute=attribute,
                    experts=[e.model_dump() for e in experts],
                    keywords_per_expert=request.keywords_per_expert
                )
                logger.info(f"Keyword prompt for '{attribute}': {kw_prompt[:300]}")
                kw_response = await ollama_provider.generate(
                    kw_prompt, model=model, temperature=temperature
                )
                logger.info(f"Keyword response for '{attribute}': {kw_response[:500]}")
                kw_data = extract_json_from_response(kw_response)
                keywords_raw = kw_data.get("keywords", [])
                # Add source_attribute to each keyword
                for kw in keywords_raw:
                    if isinstance(kw, dict) and all(k in kw for k in ["keyword", "expert_id", "expert_name"]):
                        all_expert_keywords.append(ExpertKeyword(
                            keyword=kw["keyword"],
                            expert_id=kw["expert_id"],
                            expert_name=kw["expert_name"],
                            source_attribute=attribute
                        ))
                # Emit progress
                yield f"event: keyword_progress\ndata: {json.dumps({'attribute': attribute, 'count': len(keywords_raw)}, ensure_ascii=False)}\n\n"
            except Exception as e:
                logger.warning(f"Failed to generate keywords for '{attribute}': {e}")
                yield f"event: keyword_progress\ndata: {json.dumps({'attribute': attribute, 'count': 0, 'error': str(e)}, ensure_ascii=False)}\n\n"
                # Continue with the next attribute instead of stopping

        yield f"event: keyword_complete\ndata: {json.dumps({'total_keywords': len(all_expert_keywords)}, ensure_ascii=False)}\n\n"

        if not all_expert_keywords:
            yield f"event: error\ndata: {json.dumps({'error': '無法生成關鍵字'}, ensure_ascii=False)}\n\n"
            return
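        # Descriptions are generated one keyword at a time: each keyword gets
        # its own LLM call and a description_progress event, and a failure on
        # one keyword is logged and skipped rather than aborting the stream.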
        # ========== Step 2: Generate descriptions one by one ==========
        yield f"event: description_start\ndata: {json.dumps({'message': '為專家關鍵字生成創新應用描述...', 'total': len(all_expert_keywords)}, ensure_ascii=False)}\n\n"
        descriptions: List[ExpertTransformationDescription] = []

        # Build expert lookup for domain info
        expert_lookup = {exp.id: exp for exp in experts}

        for idx, kw in enumerate(all_expert_keywords):
            try:
                expert = expert_lookup.get(kw.expert_id)
                expert_domain = expert.domain if expert else ""
                desc_prompt = get_single_description_prompt(
                    query=request.query,
                    keyword=kw.keyword,
                    expert_id=kw.expert_id,
                    expert_name=kw.expert_name,
                    expert_domain=expert_domain
                )
                desc_response = await ollama_provider.generate(
                    desc_prompt, model=model, temperature=temperature
                )
                desc_data = extract_json_from_response(desc_response)
                desc_text = desc_data.get("description", "")
                if desc_text:
                    descriptions.append(ExpertTransformationDescription(
                        keyword=kw.keyword,
                        expert_id=kw.expert_id,
                        expert_name=kw.expert_name,
                        description=desc_text
                    ))
                # Send progress update
                yield f"event: description_progress\ndata: {json.dumps({'current': idx + 1, 'total': len(all_expert_keywords), 'keyword': kw.keyword}, ensure_ascii=False)}\n\n"
            except Exception as e:
                logger.warning(f"Failed to generate description for '{kw.keyword}': {e}")
                # Continue with next keyword

        yield f"event: description_complete\ndata: {json.dumps({'count': len(descriptions)}, ensure_ascii=False)}\n\n"
        # ========== Build final result ==========
        result = ExpertTransformationCategoryResult(
            category=request.category,
            original_attributes=request.attributes,
            expert_keywords=all_expert_keywords,
            descriptions=descriptions
        )
        final_data = {
            "result": result.model_dump(),
            "experts": [e.model_dump() for e in experts]
        }
        yield f"event: done\ndata: {json.dumps(final_data, ensure_ascii=False)}\n\n"
    except Exception as e:
        logger.error(f"Expert transformation error: {e}", exc_info=True)
        yield f"event: error\ndata: {json.dumps({'error': str(e)}, ensure_ascii=False)}\n\n"
@router.post("/category")
async def expert_transform_category(request: ExpertTransformationRequest):
"""處理單一類別的專家視角轉換"""
# Extract all categories from request (should be passed separately in production)
# For now, use just the single category
return StreamingResponse(
generate_expert_transformation_events(request, [request.category]),
media_type="text/event-stream",
headers={
"Cache-Control": "no-cache",
"Connection": "keep-alive",
"X-Accel-Buffering": "no",
},
)
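
For reference, a minimal client can consume this endpoint as a plain SSE stream. A sketch follows; the path, payload field names, and event framing come from the router above, while the concrete payload values, server address, and the use of httpx on the consuming side are assumptions (the authoritative request schema is ExpertTransformationRequest in app/models/schemas.py):

```python
# Minimal SSE client sketch for the /api/expert-transformation/category route.
# Payload keys mirror the ExpertTransformationRequest fields used above.
import httpx

payload = {
    "query": "smart water bottle",               # hypothetical example values
    "category": "materials",
    "attributes": ["lightweight", "insulating"],
    "expert_source": "curated",
    "expert_count": 3,
}

url = "http://localhost:8000/api/expert-transformation/category"
with httpx.stream("POST", url, json=payload, timeout=None) as resp:
    event = None
    for line in resp.iter_lines():
        if line.startswith("event: "):
            event = line[len("event: "):]
        elif line.startswith("data: "):
            print(event, line[len("data: "):])   # e.g. expert_complete, done
```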