""" AI AGENT WITH LANGGRAPH + UTILS SYSTEM Architecture: - LangChain/LangGraph workflow với AI-driven routing - Qwen3-8B làm main reasoning engine - Utils system cung cấp tools - AI tự quyết định tools và logic xử lý """ import os import json import time from typing import Dict, Any, List, Optional, Annotated from dotenv import load_dotenv # LangChain imports from langchain_core.messages import HumanMessage, AIMessage, SystemMessage from langchain_core.prompts import ChatPromptTemplate from langchain_core.output_parsers import JsonOutputParser, StrOutputParser # LangGraph imports from langgraph.graph import StateGraph, END from langgraph.graph.message import add_messages from typing_extensions import TypedDict # HuggingFace imports from huggingface_hub import InferenceClient # Utils system imports from utils import ( process_question_with_tools, get_agent_state, reset_agent_state, ToolOrchestrator, get_system_prompt, get_response_prompt, build_context_summary, analyze_question_type ) # Load environment load_dotenv() # ============================================================================= # LANGGRAPH STATE DEFINITION # ============================================================================= class AgentState(TypedDict): """LangGraph state for AI agent""" messages: Annotated[List, add_messages] question: str task_id: str # AI Analysis ai_analysis: Dict[str, Any] should_use_tools: bool # Tool processing tool_processing_result: Dict[str, Any] # Final response final_answer: str processing_complete: bool # ============================================================================= # AI BRAIN WITH LANGCHAIN # ============================================================================= class LangChainQwen3Brain: """AI Brain using LangChain + Qwen3-8B""" def __init__(self): self.client = InferenceClient( provider="auto", api_key=os.environ.get("HF_TOKEN", "") ) self.model_name = "Qwen/Qwen2.5-7B-Instruct" # Setup parsers self.json_parser = JsonOutputParser() self.str_parser = StrOutputParser() print("🧠 LangChain Qwen3 Brain initialized") def _invoke_model(self, messages: List[Dict[str, str]]) -> str: """Invoke model with messages""" try: completion = self.client.chat.completions.create( model=self.model_name, messages=messages, max_tokens=2048, temperature=0.7 ) return completion.choices[0].message.content except Exception as e: return f"AI Error: {str(e)}" def analyze_question(self, question: str, task_id: str = "") -> Dict[str, Any]: """AI analyzes question and decides approach""" system_prompt = get_system_prompt("main_agent") analysis_prompt = f""" Analyze this question and decide the approach: Question: "{question}" Task ID: "{task_id}" Provide your analysis in JSON format: {{ "question_type": "youtube|image|audio|wiki|file|text|math", "needs_tools": true/false, "reasoning": "your reasoning", "confidence": "high|medium|low", "can_answer_directly": true/false, "suggested_approach": "brief description" }} Important: - If task_id is provided, likely has file attachment - Look for URLs, especially YouTube - Consider if question seems reversed/malformed - Be intelligent about what tools are actually needed """ messages = [ {"role": "system", "content": system_prompt}, {"role": "user", "content": analysis_prompt} ] response = self._invoke_model(messages) # Try to parse JSON try: # Extract JSON from response import re json_match = re.search(r'\{.*\}', response, re.DOTALL) if json_match: analysis = json.loads(json_match.group()) return analysis else: raise ValueError("No JSON found") except: # Fallback analysis question_type = analyze_question_type(question) return { "question_type": question_type, "needs_tools": bool(task_id) or question_type != "text", "reasoning": "JSON parsing failed, using fallback analysis", "confidence": "medium", "can_answer_directly": question_type == "text" and not task_id, "suggested_approach": f"Use {question_type} processing" } def generate_final_answer(self, question: str, tool_results: Dict[str, Any], context: str = "") -> str: """Generate final answer using LangChain""" # Build context summary if tool_results and tool_results.get("tool_results"): context_summary = build_context_summary( tool_results.get("tool_results", []), tool_results.get("cached_data", {}) ) else: context_summary = context or "No additional context available" answer_prompt = get_response_prompt( "final_answer", question=question, context_summary=context_summary ) messages = [ {"role": "system", "content": get_system_prompt("reasoning_agent")}, {"role": "user", "content": answer_prompt} ] return self._invoke_model(messages) def decide_on_reversed_text(self, original: str, reversed: str) -> Dict[str, Any]: """AI decides which version of text to use""" decision_prompt = f""" You are analyzing two versions of the same text to determine which makes more sense: Original: "{original}" Reversed: "{reversed}" Analyze both versions and decide which one is more likely to be the correct question. Consider grammar, word order, and meaning. Respond in JSON format: {{ "chosen_version": "original|reversed", "reasoning": "your reasoning", "confidence": "high|medium|low" }} """ messages = [ {"role": "system", "content": "You are a text analysis expert."}, {"role": "user", "content": decision_prompt} ] response = self._invoke_model(messages) try: import re json_match = re.search(r'\{.*\}', response, re.DOTALL) if json_match: return json.loads(json_match.group()) except: pass # Fallback decision return { "chosen_version": "reversed" if len(reversed.split()) > 3 else "original", "reasoning": "Fallback decision based on text structure", "confidence": "low" } # ============================================================================= # LANGGRAPH NODES # ============================================================================= # Initialize AI brain ai_brain = LangChainQwen3Brain() def analyze_question_node(state: AgentState) -> AgentState: """AI analyzes the question and decides approach""" question = state["question"] task_id = state.get("task_id", "") print(f"🔍 AI analyzing question: {question[:50]}...") # Get AI analysis analysis = ai_brain.analyze_question(question, task_id) state["ai_analysis"] = analysis # Determine if tools are needed state["should_use_tools"] = analysis.get("needs_tools", True) print(f"📊 AI Analysis:") print(f" Type: {analysis.get('question_type', 'unknown')}") print(f" Needs tools: {analysis.get('needs_tools', True)}") print(f" Confidence: {analysis.get('confidence', 'medium')}") print(f" Reasoning: {analysis.get('reasoning', 'No reasoning provided')}") return state def process_with_tools_node(state: AgentState) -> AgentState: """Process question using utils tool system""" question = state["question"] task_id = state.get("task_id", "") print(f"🔧 Processing with tools...") try: # Use utils tool orchestrator result = process_question_with_tools(question, task_id) state["tool_processing_result"] = result print(f"✅ Tool processing completed:") print(f" Question type: {result.get('question_type', 'unknown')}") print(f" Successful tools: {result.get('successful_tools', [])}") print(f" Failed tools: {result.get('failed_tools', [])}") except Exception as e: print(f"❌ Tool processing failed: {str(e)}") state["tool_processing_result"] = { "error": str(e), "processed_question": question, "question_type": "error", "tools_used": [], "successful_tools": [], "failed_tools": [], "tool_results": [], "cached_data": {} } return state def answer_directly_node(state: AgentState) -> AgentState: """Answer question directly without tools""" question = state["question"] print(f"💭 AI answering directly...") # Generate direct answer direct_prompt = f""" Answer this question directly based on your knowledge: Question: {question} Provide a clear, accurate, and helpful answer. """ messages = [ {"role": "system", "content": get_system_prompt("reasoning_agent")}, {"role": "user", "content": direct_prompt} ] answer = ai_brain._invoke_model(messages) state["final_answer"] = answer state["processing_complete"] = True return state def generate_final_answer_node(state: AgentState) -> AgentState: """Generate final answer using AI + tool results""" question = state["question"] tool_results = state.get("tool_processing_result", {}) print(f"🎯 Generating final answer...") # Generate comprehensive answer answer = ai_brain.generate_final_answer(question, tool_results) state["final_answer"] = answer state["processing_complete"] = True print(f"✅ Final answer generated") return state # ============================================================================= # LANGGRAPH WORKFLOW # ============================================================================= def create_agent_workflow(): """Create LangGraph workflow""" workflow = StateGraph(AgentState) # Add nodes workflow.add_node("analyze", analyze_question_node) workflow.add_node("use_tools", process_with_tools_node) workflow.add_node("direct_answer", answer_directly_node) workflow.add_node("generate_answer", generate_final_answer_node) # Routing logic def should_use_tools(state: AgentState) -> str: """AI-driven routing decision""" should_use = state.get("should_use_tools", True) can_answer_directly = state.get("ai_analysis", {}).get("can_answer_directly", False) if can_answer_directly and not should_use: print("🚀 AI decided to answer directly") return "direct_answer" else: print("🔧 AI decided to use tools") return "use_tools" # Add conditional edges workflow.add_conditional_edges( "analyze", should_use_tools, { "use_tools": "use_tools", "direct_answer": "direct_answer" } ) # Connect tool processing to final answer workflow.add_edge("use_tools", "generate_answer") # End edges workflow.add_edge("direct_answer", END) workflow.add_edge("generate_answer", END) # Set entry point workflow.set_entry_point("analyze") return workflow.compile() # ============================================================================= # MAIN AGENT CLASS # ============================================================================= class LangGraphUtilsAgent: """Main AI Agent using LangGraph + Utils system""" def __init__(self): self.workflow = create_agent_workflow() self.ai_brain = ai_brain print("🤖 LangGraph Utils Agent initialized!") print("🧠 AI Brain: Qwen3-8B with LangChain") print("🔧 Tools: YouTube, Image OCR, Audio Transcript, Wikipedia, File Reader, Text Processor") print("⚡ Features: AI-driven routing, Smart tool selection, Multimodal processing") def process_question(self, question: str, task_id: str = "") -> str: """Main entry point for processing questions""" try: print(f"\n🚀 Processing question: {question}") print(f"📄 Task ID: {task_id or 'None'}") # Reset agent state for new question reset_agent_state() # Initialize LangGraph state initial_state = { "messages": [HumanMessage(content=question)], "question": question, "task_id": task_id, "ai_analysis": {}, "should_use_tools": True, "tool_processing_result": {}, "final_answer": "", "processing_complete": False } # Execute workflow print("\n🔄 Starting LangGraph workflow...") start_time = time.time() final_state = self.workflow.invoke(initial_state) execution_time = time.time() - start_time print(f"\n⏱️ Total execution time: {execution_time:.2f} seconds") # Return final answer answer = final_state.get("final_answer", "No answer generated") print(f"\n✅ Question processed successfully!") return answer except Exception as e: error_msg = f"Agent processing error: {str(e)}" print(f"\n❌ {error_msg}") import traceback traceback.print_exc() return error_msg # ============================================================================= # GLOBAL AGENT INSTANCE # ============================================================================= # Create global agent agent = LangGraphUtilsAgent() def process_question(question: str, task_id: str = "") -> str: """Global function for processing questions""" return agent.process_question(question, task_id) # ============================================================================= # TESTING # ============================================================================= if __name__ == "__main__": print("🧪 Testing LangGraph Utils Agent\n") test_cases = [ { "question": "Who was Marie Curie?", "task_id": "", "description": "Wikipedia factual question" }, { "question": "What is 25 + 17 * 3?", "task_id": "", "description": "Math calculation" }, { "question": ".rewsna eht sa \"tfel\" drow eht fo etisoppo eht etirw ,ecnetnes siht dnatsrednu uoy fI", "task_id": "", "description": "Reversed text question" }, { "question": "How many continents are there?", "task_id": "", "description": "General knowledge" } ] for i, test_case in enumerate(test_cases, 1): print(f"\n{'='*60}") print(f"TEST {i}: {test_case['description']}") print(f"{'='*60}") print(f"Question: {test_case['question']}") try: answer = process_question(test_case["question"], test_case["task_id"]) print(f"\nAnswer: {answer}") except Exception as e: print(f"\nTest failed: {str(e)}") print(f"\n{'-'*60}") print("\n✅ All tests completed!")