| """ | |
| AI AGENT WITH LANGGRAPH + UTILS SYSTEM | |
| Architecture: | |
| - LangChain/LangGraph workflow với AI-driven routing | |
| - Qwen3-8B làm main reasoning engine | |
| - Utils system cung cấp tools | |
| - AI tự quyết định tools và logic xử lý | |
| """ | |
import os
import json
import time
from typing import Dict, Any, List, Optional, Annotated
from dotenv import load_dotenv

# LangChain imports
from langchain_core.messages import HumanMessage, AIMessage, SystemMessage
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import JsonOutputParser, StrOutputParser

# LangGraph imports
from langgraph.graph import StateGraph, END
from langgraph.graph.message import add_messages
from typing_extensions import TypedDict

# HuggingFace imports
from huggingface_hub import InferenceClient

# Groq imports for fallback
from groq import Groq

# Pydantic for structured output
from pydantic import BaseModel, Field

# Utils system imports
from utils import (
    process_question_with_tools,
    get_agent_state,
    reset_agent_state,
    ToolOrchestrator,
    get_system_prompt,
    get_response_prompt,
    build_context_summary,
    analyze_question_type
)

# Load environment
load_dotenv()
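# HF_TOKEN (HuggingFace) and GROQ_API_KEY (Groq) are read from the environment,
# or from a local .env file, by the clients constructed below.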


# =============================================================================
# LANGGRAPH STATE DEFINITION
# =============================================================================
class AgentState(TypedDict):
    """LangGraph state for AI agent"""
    messages: Annotated[List, add_messages]
    question: str
    task_id: str
    # AI Analysis
    ai_analysis: Dict[str, Any]
    should_use_tools: bool
    # Tool processing
    tool_processing_result: Dict[str, Any]
    # Final response
    final_answer: str
    processing_complete: bool


# =============================================================================
# PYDANTIC SCHEMAS FOR STRUCTURED OUTPUT
# =============================================================================
class QuestionAnalysis(BaseModel):
    """Schema for AI question analysis"""
    question_type: str = Field(description="Type: youtube|image|audio|wiki|file|text|math")
    needs_tools: bool = Field(description="Whether tools are needed")
    reasoning: str = Field(description="AI reasoning for the decision")
    confidence: str = Field(description="Confidence level: high|medium|low")
    can_answer_directly: bool = Field(description="Can answer without tools")
    suggested_approach: str = Field(description="Brief description of approach")


class TextDecision(BaseModel):
    """Schema for reversed text decision"""
    chosen_version: str = Field(description="original|reversed")
    reasoning: str = Field(description="Reasoning for the choice")
    confidence: str = Field(description="Confidence level: high|medium|low")


# =============================================================================
# AI BRAIN WITH LANGCHAIN
# =============================================================================
class LangChainQwen3Brain:
    """AI Brain using LangChain + HuggingFace with Groq fallback"""

    def __init__(self):
        # Primary: HuggingFace
        self.hf_client = InferenceClient(
            provider="auto",
            api_key=os.environ.get("HF_TOKEN", "")
        )
        self.hf_model = "Qwen/Qwen3-8B"
        # Fallback: Groq
        self.groq_client = Groq(
            api_key=os.environ.get("GROQ_API_KEY", "")
        )
        self.groq_model = "llama3-8b-8192"
        # Setup parsers
        self.json_parser = JsonOutputParser()
        self.str_parser = StrOutputParser()
        print("🧠 LangChain Hybrid Brain initialized (HF + Groq fallback)")

    def _create_structured_model(self, schema: BaseModel):
        """Create model with structured output - try HF first, fallback to Groq"""
        try:
            # ChatHuggingFace wraps a LangChain LLM, not a raw InferenceClient,
            # so build a HuggingFaceEndpoint for the same model first
            from langchain_huggingface import ChatHuggingFace, HuggingFaceEndpoint
            hf_llm = HuggingFaceEndpoint(
                repo_id=self.hf_model,
                huggingfacehub_api_token=os.environ.get("HF_TOKEN", "")
            )
            hf_model = ChatHuggingFace(llm=hf_llm)
            return hf_model.with_structured_output(schema)
        except Exception as hf_error:
            print(f"⚠️ HF structured output failed: {str(hf_error)[:50]}...")
            try:
                # Fallback to Groq with structured output
                from langchain_groq import ChatGroq
                groq_model = ChatGroq(
                    api_key=os.environ.get("GROQ_API_KEY", ""),
                    model=self.groq_model
                )
                return groq_model.with_structured_output(schema)
            except Exception as groq_error:
                print(f"⚠️ Groq structured output also failed: {str(groq_error)[:50]}...")
                return None

    def _invoke_model(self, messages: List[Dict[str, str]]) -> str:
        """Invoke model with messages - try HF first, fallback to Groq"""
        # Try HuggingFace first
        try:
            completion = self.hf_client.chat.completions.create(
                model=self.hf_model,
                messages=messages,
                max_tokens=2048,
                temperature=0.7
            )
            return completion.choices[0].message.content
        except Exception as hf_error:
            print(f"⚠️ HuggingFace failed: {str(hf_error)[:100]}...")
            print("🔄 Falling back to Groq...")
            # Fallback to Groq
            try:
                completion = self.groq_client.chat.completions.create(
                    model=self.groq_model,
                    messages=messages,
                    max_tokens=2048,
                    temperature=0.7
                )
                return completion.choices[0].message.content
            except Exception as groq_error:
                return f"AI Error: Both HF ({str(hf_error)[:50]}) and Groq ({str(groq_error)[:50]}) failed"

    def analyze_question(self, question: str, task_id: str = "") -> Dict[str, Any]:
        """AI analyzes question and decides approach with structured output"""
        # Create structured model
        structured_model = self._create_structured_model(QuestionAnalysis)
        if structured_model:
            # "/no_think" is Qwen3's soft switch for disabling thinking mode
            analysis_prompt = f"""
Analyze this question and decide the approach:
Question: "{question}"
Task ID: "{task_id}"
Important rules:
- If question asks about Mercedes Sosa albums, Wikipedia, historical facts -> use "wiki"
- If YouTube URL present -> use "youtube"
- If mentions image, photo, chess position -> use "image"
- If mentions audio, voice, mp3 -> use "audio"
- If mentions file attachment, Excel, CSV -> use "file"
- For math, tables, logic problems -> use "text" but needs_tools=false
- Be accurate about question_type to trigger correct tools
/no_think
"""
            try:
                result = structured_model.invoke(analysis_prompt)
                return result.model_dump()  # Pydantic v2
            except Exception as e:
                print(f"⚠️ Structured analysis failed: {str(e)[:50]}...")
        # Fallback analysis
        question_type = analyze_question_type(question)
        return {
            "question_type": question_type,
            "needs_tools": bool(task_id) or question_type in ["wiki", "youtube", "image", "audio", "file"],
            "reasoning": "Fallback analysis - structured output failed",
            "confidence": "medium",
            "can_answer_directly": question_type == "text" and not task_id,
            "suggested_approach": f"Use {question_type} processing"
        }

    def generate_final_answer(self, question: str, tool_results: Dict[str, Any], context: str = "") -> str:
        """Generate final answer using LangChain"""
        # Build context summary
        if tool_results and tool_results.get("tool_results"):
            context_summary = build_context_summary(
                tool_results.get("tool_results", []),
                tool_results.get("cached_data", {})
            )
        else:
            context_summary = context or "No additional context available"
        answer_prompt = get_response_prompt(
            "final_answer",
            question=question,
            context_summary=context_summary
        ) + "\n\n/no_think"
        messages = [
            {"role": "system", "content": get_system_prompt("reasoning_agent")},
            {"role": "user", "content": answer_prompt}
        ]
        return self._invoke_model(messages)

    def decide_on_reversed_text(self, original: str, reversed_text: str) -> Dict[str, Any]:
        """AI decides which version of text to use with structured output"""
        # Create structured model
        structured_model = self._create_structured_model(TextDecision)
        if structured_model:
            decision_prompt = f"""
You are analyzing two versions of the same text to determine which makes more sense:
Original: "{original}"
Reversed: "{reversed_text}"
Analyze both versions and decide which one is more likely to be the correct question.
Consider grammar, word order, and meaning.
/no_think
"""
            try:
                result = structured_model.invoke(decision_prompt)
                return result.model_dump()  # Pydantic v2
            except Exception as e:
                print(f"⚠️ Structured decision failed: {str(e)[:50]}...")
        # Fallback decision
        return {
            "chosen_version": "reversed" if len(reversed_text.split()) > 3 else "original",
            "reasoning": "Fallback decision based on text structure",
            "confidence": "low"
        }


# =============================================================================
# LANGGRAPH NODES
# =============================================================================
# Initialize AI brain
ai_brain = LangChainQwen3Brain()
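# Each node below receives the shared AgentState, updates the keys it owns,
# and returns the state for LangGraph to carry to the next node.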


def analyze_question_node(state: AgentState) -> AgentState:
    """AI analyzes the question and decides approach"""
    question = state["question"]
    task_id = state.get("task_id", "")
    print(f"🔍 AI analyzing question: {question[:50]}...")
    # Get AI analysis
    analysis = ai_brain.analyze_question(question, task_id)
    state["ai_analysis"] = analysis
    # Determine if tools are needed
    state["should_use_tools"] = analysis.get("needs_tools", True)
    print("📊 AI Analysis:")
    print(f"   Type: {analysis.get('question_type', 'unknown')}")
    print(f"   Needs tools: {analysis.get('needs_tools', True)}")
    print(f"   Confidence: {analysis.get('confidence', 'medium')}")
    print(f"   Reasoning: {analysis.get('reasoning', 'No reasoning provided')}")
    return state


def process_with_tools_node(state: AgentState) -> AgentState:
    """Process question using utils tool system"""
    question = state["question"]
    task_id = state.get("task_id", "")
    print("🔧 Processing with tools...")
    try:
        # Use utils tool orchestrator
        result = process_question_with_tools(question, task_id)
        state["tool_processing_result"] = result
        print("✅ Tool processing completed:")
        print(f"   Question type: {result.get('question_type', 'unknown')}")
        print(f"   Successful tools: {result.get('successful_tools', [])}")
        print(f"   Failed tools: {result.get('failed_tools', [])}")
    except Exception as e:
        print(f"❌ Tool processing failed: {str(e)}")
        state["tool_processing_result"] = {
            "error": str(e),
            "processed_question": question,
            "question_type": "error",
            "tools_used": [],
            "successful_tools": [],
            "failed_tools": [],
            "tool_results": [],
            "cached_data": {}
        }
    return state


def answer_directly_node(state: AgentState) -> AgentState:
    """Answer question directly without tools"""
    question = state["question"]
    print("💭 AI answering directly...")
    # Generate direct answer
    direct_prompt = f"""
Answer this question directly based on your knowledge:
Question: {question}
Provide a clear, accurate, and helpful answer.
"""
    messages = [
        {"role": "system", "content": get_system_prompt("reasoning_agent")},
        {"role": "user", "content": direct_prompt}
    ]
    answer = ai_brain._invoke_model(messages)
    state["final_answer"] = answer
    state["processing_complete"] = True
    return state


def generate_final_answer_node(state: AgentState) -> AgentState:
    """Generate final answer using AI + tool results"""
    question = state["question"]
    tool_results = state.get("tool_processing_result", {})
    print("🎯 Generating final answer...")
    # Generate comprehensive answer
    answer = ai_brain.generate_final_answer(question, tool_results)
    state["final_answer"] = answer
    state["processing_complete"] = True
    print("✅ Final answer generated")
    return state


# =============================================================================
# LANGGRAPH WORKFLOW
# =============================================================================
def create_agent_workflow():
    """Create LangGraph workflow"""
    workflow = StateGraph(AgentState)
    # Add nodes
    workflow.add_node("analyze", analyze_question_node)
    workflow.add_node("use_tools", process_with_tools_node)
    workflow.add_node("direct_answer", answer_directly_node)
    workflow.add_node("generate_answer", generate_final_answer_node)

    # Routing logic
    def should_use_tools(state: AgentState) -> str:
        """AI-driven routing decision"""
        should_use = state.get("should_use_tools", True)
        can_answer_directly = state.get("ai_analysis", {}).get("can_answer_directly", False)
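        # Tools are skipped only when the analysis both allows a direct answer
        # and reports that no tools are needed; any doubt routes to the tools.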
        if can_answer_directly and not should_use:
            print("🚀 AI decided to answer directly")
            return "direct_answer"
        else:
            print("🔧 AI decided to use tools")
            return "use_tools"

    # Add conditional edges
    workflow.add_conditional_edges(
        "analyze",
        should_use_tools,
        {
            "use_tools": "use_tools",
            "direct_answer": "direct_answer"
        }
    )
    # Connect tool processing to final answer
    workflow.add_edge("use_tools", "generate_answer")
    # End edges
    workflow.add_edge("direct_answer", END)
    workflow.add_edge("generate_answer", END)
    # Set entry point
    workflow.set_entry_point("analyze")
    return workflow.compile()


# =============================================================================
# MAIN AGENT CLASS
# =============================================================================
class LangGraphUtilsAgent:
    """Main AI Agent using LangGraph + Utils system"""

    def __init__(self):
        self.workflow = create_agent_workflow()
        self.ai_brain = ai_brain
        print("🤖 LangGraph Utils Agent initialized!")
        print("🧠 AI Brain: LangChain + HuggingFace with Groq fallback")
        print("🔧 Tools: YouTube, Image OCR, Audio Transcript, Wikipedia, File Reader, Text Processor")
        print("⚡ Features: AI-driven routing, Smart tool selection, Multimodal processing")

    def process_question(self, question: str, task_id: str = "") -> str:
        """Main entry point for processing questions"""
        try:
            print(f"\n🚀 Processing question: {question}")
            print(f"📄 Task ID: {task_id or 'None'}")
            # Reset agent state for new question
            reset_agent_state()
            # Initialize LangGraph state
            initial_state = {
                "messages": [HumanMessage(content=question)],
                "question": question,
                "task_id": task_id,
                "ai_analysis": {},
                "should_use_tools": True,
                "tool_processing_result": {},
                "final_answer": "",
                "processing_complete": False
            }
            # Execute workflow
            print("\n🔄 Starting LangGraph workflow...")
            start_time = time.time()
            final_state = self.workflow.invoke(initial_state)
            execution_time = time.time() - start_time
            print(f"\n⏱️ Total execution time: {execution_time:.2f} seconds")
            # Return final answer
            answer = final_state.get("final_answer", "No answer generated")
            print("\n✅ Question processed successfully!")
            return answer
        except Exception as e:
            error_msg = f"Agent processing error: {str(e)}"
            print(f"\n❌ {error_msg}")
            import traceback
            traceback.print_exc()
            return error_msg


# =============================================================================
# GLOBAL AGENT INSTANCE
# =============================================================================
# Create global agent
agent = LangGraphUtilsAgent()
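# Instantiated at import time so repeated calls reuse the same compiled graph.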


def process_question(question: str, task_id: str = "") -> str:
    """Global function for processing questions"""
    return agent.process_question(question, task_id)


# =============================================================================
# TESTING
# =============================================================================
if __name__ == "__main__":
    print("🧪 Testing LangGraph Utils Agent\n")
    test_cases = [
        {
            "question": "Who was Marie Curie?",
            "task_id": "",
            "description": "Wikipedia factual question"
        },
        {
            "question": "What is 25 + 17 * 3?",
            "task_id": "",
            "description": "Math calculation"
        },
        {
            "question": ".rewsna eht sa \"tfel\" drow eht fo etisoppo eht etirw ,ecnetnes siht dnatsrednu uoy fI",
            "task_id": "",
            "description": "Reversed text question"
        },
        {
            "question": "How many continents are there?",
            "task_id": "",
            "description": "General knowledge"
        }
    ]
    for i, test_case in enumerate(test_cases, 1):
        print(f"\n{'='*60}")
        print(f"TEST {i}: {test_case['description']}")
        print(f"{'='*60}")
        print(f"Question: {test_case['question']}")
        try:
            answer = process_question(test_case["question"], test_case["task_id"])
            print(f"\nAnswer: {answer}")
        except Exception as e:
            print(f"\nTest failed: {str(e)}")
        print(f"\n{'-'*60}")
    print("\n✅ All tests completed!")