"""Expert Transformation Agent 路由模組""" import json import logging from typing import AsyncGenerator, List from fastapi import APIRouter from fastapi.responses import StreamingResponse from ..models.schemas import ( ExpertTransformationRequest, ExpertProfile, ExpertKeyword, ExpertTransformationCategoryResult, ExpertTransformationDescription, ) from ..prompts.expert_transformation_prompt import ( get_expert_generation_prompt, get_expert_keyword_generation_prompt, get_expert_batch_description_prompt, ) from ..services.llm_service import ollama_provider, extract_json_from_response logger = logging.getLogger(__name__) router = APIRouter(prefix="/api/expert-transformation", tags=["expert-transformation"]) async def generate_expert_transformation_events( request: ExpertTransformationRequest, all_categories: List[str] # For expert generation context ) -> AsyncGenerator[str, None]: """Generate SSE events for expert transformation process""" try: temperature = request.temperature if request.temperature is not None else 0.7 model = request.model # ========== Step 0: Generate expert team ========== yield f"event: expert_start\ndata: {json.dumps({'message': '正在組建專家團隊...'}, ensure_ascii=False)}\n\n" experts: List[ExpertProfile] = [] try: expert_prompt = get_expert_generation_prompt( query=request.query, categories=all_categories, expert_count=request.expert_count, custom_experts=request.custom_experts ) logger.info(f"Expert prompt: {expert_prompt[:200]}") expert_response = await ollama_provider.generate( expert_prompt, model=model, temperature=temperature ) logger.info(f"Expert response: {expert_response[:500]}") expert_data = extract_json_from_response(expert_response) experts_raw = expert_data.get("experts", []) for exp in experts_raw: if isinstance(exp, dict) and all(k in exp for k in ["id", "name", "domain"]): experts.append(ExpertProfile(**exp)) except Exception as e: logger.error(f"Failed to generate experts: {e}") yield f"event: error\ndata: {json.dumps({'error': f'專家團隊生成失敗: {str(e)}'}, ensure_ascii=False)}\n\n" return yield f"event: expert_complete\ndata: {json.dumps({'experts': [e.model_dump() for e in experts]}, ensure_ascii=False)}\n\n" if not experts: yield f"event: error\ndata: {json.dumps({'error': '無法生成專家團隊'}, ensure_ascii=False)}\n\n" return # ========== Step 1: Generate keywords from expert perspectives ========== yield f"event: keyword_start\ndata: {json.dumps({'message': f'專家團隊為「{request.category}」的屬性生成關鍵字...'}, ensure_ascii=False)}\n\n" all_expert_keywords: List[ExpertKeyword] = [] # For each attribute, ask all experts to generate keywords for attr_index, attribute in enumerate(request.attributes): try: kw_prompt = get_expert_keyword_generation_prompt( category=request.category, attribute=attribute, experts=[e.model_dump() for e in experts], keywords_per_expert=request.keywords_per_expert ) logger.info(f"Keyword prompt for '{attribute}': {kw_prompt[:300]}") kw_response = await ollama_provider.generate( kw_prompt, model=model, temperature=temperature ) logger.info(f"Keyword response for '{attribute}': {kw_response[:500]}") kw_data = extract_json_from_response(kw_response) keywords_raw = kw_data.get("keywords", []) # Add source_attribute to each keyword for kw in keywords_raw: if isinstance(kw, dict) and all(k in kw for k in ["keyword", "expert_id", "expert_name"]): all_expert_keywords.append(ExpertKeyword( keyword=kw["keyword"], expert_id=kw["expert_id"], expert_name=kw["expert_name"], source_attribute=attribute )) # Emit progress yield f"event: keyword_progress\ndata: {json.dumps({'attribute': attribute, 'count': len(keywords_raw)}, ensure_ascii=False)}\n\n" except Exception as e: logger.warning(f"Failed to generate keywords for '{attribute}': {e}") yield f"event: keyword_progress\ndata: {json.dumps({'attribute': attribute, 'count': 0, 'error': str(e)}, ensure_ascii=False)}\n\n" # Continue with next attribute instead of stopping yield f"event: keyword_complete\ndata: {json.dumps({'total_keywords': len(all_expert_keywords)}, ensure_ascii=False)}\n\n" if not all_expert_keywords: yield f"event: error\ndata: {json.dumps({'error': '無法生成關鍵字'}, ensure_ascii=False)}\n\n" return # ========== Step 2: Generate descriptions for each expert keyword ========== yield f"event: description_start\ndata: {json.dumps({'message': '為專家關鍵字生成創新應用描述...'}, ensure_ascii=False)}\n\n" descriptions: List[ExpertTransformationDescription] = [] try: desc_prompt = get_expert_batch_description_prompt( query=request.query, category=request.category, expert_keywords=[kw.model_dump() for kw in all_expert_keywords] ) logger.info(f"Description prompt: {desc_prompt[:300]}") desc_response = await ollama_provider.generate( desc_prompt, model=model, temperature=temperature ) logger.info(f"Description response: {desc_response[:500]}") desc_data = extract_json_from_response(desc_response) descriptions_raw = desc_data.get("descriptions", []) for desc in descriptions_raw: if isinstance(desc, dict) and all(k in desc for k in ["keyword", "expert_id", "expert_name", "description"]): descriptions.append(ExpertTransformationDescription(**desc)) except Exception as e: logger.warning(f"Failed to generate descriptions: {e}") # Continue without descriptions - at least we have keywords yield f"event: description_complete\ndata: {json.dumps({'count': len(descriptions)}, ensure_ascii=False)}\n\n" # ========== Build final result ========== result = ExpertTransformationCategoryResult( category=request.category, original_attributes=request.attributes, expert_keywords=all_expert_keywords, descriptions=descriptions ) final_data = { "result": result.model_dump(), "experts": [e.model_dump() for e in experts] } yield f"event: done\ndata: {json.dumps(final_data, ensure_ascii=False)}\n\n" except Exception as e: logger.error(f"Expert transformation error: {e}", exc_info=True) yield f"event: error\ndata: {json.dumps({'error': str(e)}, ensure_ascii=False)}\n\n" @router.post("/category") async def expert_transform_category(request: ExpertTransformationRequest): """處理單一類別的專家視角轉換""" # Extract all categories from request (should be passed separately in production) # For now, use just the single category return StreamingResponse( generate_expert_transformation_events(request, [request.category]), media_type="text/event-stream", headers={ "Cache-Control": "no-cache", "Connection": "keep-alive", "X-Accel-Buffering": "no", }, )