novelty-seeking/backend/app/routers/expert_transformation.py
gbanyan 43785db595 feat: Add external expert sources (Wikidata SPARQL + ConceptNet API)
- Add expert_cache.py: TTL-based in-memory cache (1 hour default; see the sketch below)
- Add expert_source_service.py: WikidataProvider and ConceptNetProvider
  - Wikidata SPARQL queries for occupations with Chinese labels
  - ConceptNet API queries for occupation-related concepts
  - Random selection from cached pool
- Update schemas.py: Add ExpertSource enum (llm/wikidata/conceptnet)
- Update ExpertTransformationRequest with expert_source and expert_language
- Update router: Conditionally use external sources with LLM fallback
  - New SSE events: expert_source, expert_fallback
- Update frontend types with ExpertSource

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-12-04 11:42:48 +08:00
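
The expert_cache.py and ExpertSource pieces referenced in the commit message are not shown in this file. As a rough illustration of what the commit describes, here is a minimal sketch; the class and method names are assumptions, not the actual code — only the enum values and the 1-hour TTL come from the commit message:

import time
from enum import Enum
from typing import Any, Optional

class ExpertSource(str, Enum):
    LLM = "llm"
    WIKIDATA = "wikidata"
    CONCEPTNET = "conceptnet"

class ExpertCache:
    """TTL-based in-memory cache (1 hour default). Hypothetical sketch."""

    def __init__(self, ttl_seconds: int = 3600):
        self.ttl = ttl_seconds
        self._store: dict[str, tuple[float, Any]] = {}

    def get(self, key: str) -> Optional[Any]:
        entry = self._store.get(key)
        if entry is None:
            return None
        stored_at, value = entry
        if time.monotonic() - stored_at > self.ttl:
            del self._store[key]  # evict expired entries lazily on read
            return None
        return value

    def set(self, key: str, value: Any) -> None:
        self._store[key] = (time.monotonic(), value)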

"""Expert Transformation Agent 路由模組"""
import json
import logging
from typing import AsyncGenerator, List
from fastapi import APIRouter
from fastapi.responses import StreamingResponse
from ..models.schemas import (
ExpertTransformationRequest,
ExpertProfile,
ExpertKeyword,
ExpertTransformationCategoryResult,
ExpertTransformationDescription,
ExpertSource,
)
from ..prompts.expert_transformation_prompt import (
get_expert_generation_prompt,
get_expert_keyword_generation_prompt,
get_single_description_prompt,
)
from ..services.llm_service import ollama_provider, extract_json_from_response
from ..services.expert_source_service import expert_source_service
logger = logging.getLogger(__name__)
router = APIRouter(prefix="/api/expert-transformation", tags=["expert-transformation"])
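

# SSE event sequence emitted by the generator below:
#   expert_start      -- team assembly begins (reports the requested source)
#   expert_fallback   -- external source failed; falling back to the LLM
#   expert_source     -- the source actually used ("llm", "wikidata", or "conceptnet")
#   expert_complete   -- the assembled expert list
#   keyword_start / keyword_progress / keyword_complete
#   description_start / description_progress / description_complete
#   done              -- final aggregated result
#   error             -- terminal failure at any step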
async def generate_expert_transformation_events(
    request: ExpertTransformationRequest,
    all_categories: List[str]  # For expert generation context
) -> AsyncGenerator[str, None]:
    """Generate SSE events for the expert transformation process."""
    try:
        temperature = request.temperature if request.temperature is not None else 0.7
        model = request.model

        # ========== Step 0: Generate expert team ==========
        yield f"event: expert_start\ndata: {json.dumps({'message': '正在組建專家團隊...', 'source': request.expert_source.value}, ensure_ascii=False)}\n\n"

        experts: List[ExpertProfile] = []
        actual_source = request.expert_source.value

        # Decide which source generates the experts
        use_llm = (
            request.expert_source == ExpertSource.LLM or
            request.custom_experts  # with custom experts, use the LLM to fill out the team
        )

        if use_llm:
            # Generate the experts with the LLM
            try:
                expert_prompt = get_expert_generation_prompt(
                    query=request.query,
                    categories=all_categories,
                    expert_count=request.expert_count,
                    custom_experts=request.custom_experts
                )
                logger.info(f"Expert prompt: {expert_prompt[:200]}")
                expert_response = await ollama_provider.generate(
                    expert_prompt, model=model, temperature=temperature
                )
                logger.info(f"Expert response: {expert_response[:500]}")
                expert_data = extract_json_from_response(expert_response)
                experts_raw = expert_data.get("experts", [])
                for exp in experts_raw:
                    if isinstance(exp, dict) and all(k in exp for k in ["id", "name", "domain"]):
                        experts.append(ExpertProfile(**exp))
                actual_source = "llm"
            except Exception as e:
                logger.error(f"Failed to generate experts via LLM: {e}")
                yield f"event: error\ndata: {json.dumps({'error': f'專家團隊生成失敗: {str(e)}'}, ensure_ascii=False)}\n\n"
                return
        else:
            # Generate the experts from an external source
            try:
                experts_data, actual_source = await expert_source_service.get_experts(
                    source=request.expert_source.value,
                    count=request.expert_count,
                    language=request.expert_language
                )
                for i, exp_data in enumerate(experts_data):
                    experts.append(ExpertProfile(
                        id=f"expert-{i}",
                        name=exp_data["name"],
                        domain=exp_data["domain"],
                        perspective=f"{exp_data['domain']}角度思考"
                    ))
                logger.info(f"Generated {len(experts)} experts from {actual_source}")
            except Exception as e:
                # External source failed; fall back to the LLM
                logger.warning(f"External source failed: {e}, falling back to LLM")
                yield f"event: expert_fallback\ndata: {json.dumps({'original': request.expert_source.value, 'fallback': 'llm', 'reason': str(e)}, ensure_ascii=False)}\n\n"
                try:
                    expert_prompt = get_expert_generation_prompt(
                        query=request.query,
                        categories=all_categories,
                        expert_count=request.expert_count,
                        custom_experts=request.custom_experts
                    )
                    expert_response = await ollama_provider.generate(
                        expert_prompt, model=model, temperature=temperature
                    )
                    expert_data = extract_json_from_response(expert_response)
                    experts_raw = expert_data.get("experts", [])
                    for exp in experts_raw:
                        if isinstance(exp, dict) and all(k in exp for k in ["id", "name", "domain"]):
                            experts.append(ExpertProfile(**exp))
                    actual_source = "llm"
                except Exception as llm_error:
                    logger.error(f"LLM fallback also failed: {llm_error}")
                    yield f"event: error\ndata: {json.dumps({'error': f'專家團隊生成失敗: {str(llm_error)}'}, ensure_ascii=False)}\n\n"
                    return

        # Report which source was actually used
        yield f"event: expert_source\ndata: {json.dumps({'source': actual_source}, ensure_ascii=False)}\n\n"
        yield f"event: expert_complete\ndata: {json.dumps({'experts': [e.model_dump() for e in experts]}, ensure_ascii=False)}\n\n"

        if not experts:
            yield f"event: error\ndata: {json.dumps({'error': '無法生成專家團隊'}, ensure_ascii=False)}\n\n"
            return

        # ========== Step 1: Generate keywords from expert perspectives ==========
        yield f"event: keyword_start\ndata: {json.dumps({'message': f'專家團隊為「{request.category}」的屬性生成關鍵字...'}, ensure_ascii=False)}\n\n"

        all_expert_keywords: List[ExpertKeyword] = []

        # For each attribute, ask all experts to generate keywords
        for attribute in request.attributes:
            try:
                kw_prompt = get_expert_keyword_generation_prompt(
                    category=request.category,
                    attribute=attribute,
                    experts=[e.model_dump() for e in experts],
                    keywords_per_expert=request.keywords_per_expert
                )
                logger.info(f"Keyword prompt for '{attribute}': {kw_prompt[:300]}")
                kw_response = await ollama_provider.generate(
                    kw_prompt, model=model, temperature=temperature
                )
                logger.info(f"Keyword response for '{attribute}': {kw_response[:500]}")
                kw_data = extract_json_from_response(kw_response)
                keywords_raw = kw_data.get("keywords", [])
                # Add source_attribute to each keyword
                for kw in keywords_raw:
                    if isinstance(kw, dict) and all(k in kw for k in ["keyword", "expert_id", "expert_name"]):
                        all_expert_keywords.append(ExpertKeyword(
                            keyword=kw["keyword"],
                            expert_id=kw["expert_id"],
                            expert_name=kw["expert_name"],
                            source_attribute=attribute
                        ))
                # Emit progress
                yield f"event: keyword_progress\ndata: {json.dumps({'attribute': attribute, 'count': len(keywords_raw)}, ensure_ascii=False)}\n\n"
            except Exception as e:
                logger.warning(f"Failed to generate keywords for '{attribute}': {e}")
                yield f"event: keyword_progress\ndata: {json.dumps({'attribute': attribute, 'count': 0, 'error': str(e)}, ensure_ascii=False)}\n\n"
                # Continue with next attribute instead of stopping

        yield f"event: keyword_complete\ndata: {json.dumps({'total_keywords': len(all_expert_keywords)}, ensure_ascii=False)}\n\n"

        if not all_expert_keywords:
            yield f"event: error\ndata: {json.dumps({'error': '無法生成關鍵字'}, ensure_ascii=False)}\n\n"
            return

        # ========== Step 2: Generate descriptions one by one ==========
        yield f"event: description_start\ndata: {json.dumps({'message': '為專家關鍵字生成創新應用描述...', 'total': len(all_expert_keywords)}, ensure_ascii=False)}\n\n"

        descriptions: List[ExpertTransformationDescription] = []
        # Build expert lookup for domain info
        expert_lookup = {exp.id: exp for exp in experts}

        for idx, kw in enumerate(all_expert_keywords):
            try:
                expert = expert_lookup.get(kw.expert_id)
                expert_domain = expert.domain if expert else ""
                desc_prompt = get_single_description_prompt(
                    query=request.query,
                    keyword=kw.keyword,
                    expert_id=kw.expert_id,
                    expert_name=kw.expert_name,
                    expert_domain=expert_domain
                )
                desc_response = await ollama_provider.generate(
                    desc_prompt, model=model, temperature=temperature
                )
                desc_data = extract_json_from_response(desc_response)
                desc_text = desc_data.get("description", "")
                if desc_text:
                    descriptions.append(ExpertTransformationDescription(
                        keyword=kw.keyword,
                        expert_id=kw.expert_id,
                        expert_name=kw.expert_name,
                        description=desc_text
                    ))
                # Send progress update
                yield f"event: description_progress\ndata: {json.dumps({'current': idx + 1, 'total': len(all_expert_keywords), 'keyword': kw.keyword}, ensure_ascii=False)}\n\n"
            except Exception as e:
                logger.warning(f"Failed to generate description for '{kw.keyword}': {e}")
                # Continue with next keyword

        yield f"event: description_complete\ndata: {json.dumps({'count': len(descriptions)}, ensure_ascii=False)}\n\n"

        # ========== Build final result ==========
        result = ExpertTransformationCategoryResult(
            category=request.category,
            original_attributes=request.attributes,
            expert_keywords=all_expert_keywords,
            descriptions=descriptions
        )
        final_data = {
            "result": result.model_dump(),
            "experts": [e.model_dump() for e in experts]
        }
        yield f"event: done\ndata: {json.dumps(final_data, ensure_ascii=False)}\n\n"

    except Exception as e:
        logger.error(f"Expert transformation error: {e}", exc_info=True)
        yield f"event: error\ndata: {json.dumps({'error': str(e)}, ensure_ascii=False)}\n\n"


@router.post("/category")
async def expert_transform_category(request: ExpertTransformationRequest):
    """Handle the expert-perspective transformation for a single category."""
    # Extract all categories from request (should be passed separately in production)
    # For now, use just the single category
    return StreamingResponse(
        generate_expert_transformation_events(request, [request.category]),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
            "X-Accel-Buffering": "no",
        },
    )
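
For reference, a minimal client sketch for consuming this endpoint's SSE stream. The base URL, payload values, and choice of httpx are assumptions for illustration; the field names mirror how ExpertTransformationRequest is read in the code above, but the exact required fields depend on schemas.py:

import asyncio
import httpx

async def main() -> None:
    payload = {
        "query": "improve urban commuting",  # hypothetical example values
        "category": "vehicles",
        "attributes": ["speed", "comfort"],
        "expert_count": 3,
        "keywords_per_expert": 2,
        "expert_source": "wikidata",  # "llm", "wikidata", or "conceptnet"
        "expert_language": "zh",
    }
    async with httpx.AsyncClient(timeout=None) as client:
        async with client.stream(
            "POST",
            "http://localhost:8000/api/expert-transformation/category",  # assumed host/port
            json=payload,
        ) as response:
            # Print each SSE line as it arrives (event: ... / data: ...)
            async for line in response.aiter_lines():
                if line:  # skip the blank separator lines between events
                    print(line)

asyncio.run(main())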