"""
AI AGENT WITH LANGGRAPH + UTILS SYSTEM

Architecture:
- LangChain/LangGraph workflow with AI-driven routing
- Qwen3-8B as the main reasoning engine
- Utils system provides the tools
- The AI decides by itself which tools and processing logic to use
"""

import os
import json
import re
import time
import traceback
from typing import Dict, Any, List, Optional, Annotated, Type

from dotenv import load_dotenv

# LangChain imports
from langchain_core.messages import HumanMessage, AIMessage, SystemMessage
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import JsonOutputParser, StrOutputParser

# LangGraph imports
from langgraph.graph import StateGraph, END
from langgraph.graph.message import add_messages
from typing_extensions import TypedDict

# HuggingFace imports
from huggingface_hub import InferenceClient

# Groq imports for fallback
from groq import Groq

# Pydantic for structured output
from pydantic import BaseModel, Field

# Utils system imports
from utils import (
    process_question_with_tools,
    get_agent_state,
    reset_agent_state,
    ToolOrchestrator,
    get_system_prompt,
    get_response_prompt,
    build_context_summary,
    analyze_question_type,
)

# Load environment
load_dotenv()

# Qwen3 soft switch that asks the model to skip its reasoning phase.
# The documented spelling is "/no_think" (not "/no_thinking").
NO_THINK_SWITCH = "/no_think"

# Matches a (possibly empty, possibly multi-line) <think>...</think> block.
_THINK_BLOCK_RE = re.compile(r"<think>.*?</think>", re.DOTALL)


def _strip_think_blocks(text: Optional[str]) -> str:
    """Return *text* without ``<think>...</think>`` blocks and outer whitespace.

    ``None`` or empty input yields an empty string so callers always get a str.
    """
    if not text:
        return ""
    return _THINK_BLOCK_RE.sub("", text).strip()


def _structured_result_to_dict(result: Any) -> Dict[str, Any]:
    """Convert a structured-output result into a plain dictionary.

    Accepts a dict (returned unchanged), a Pydantic v2 model (``model_dump``)
    or a Pydantic v1 model (``dict``).

    Raises:
        TypeError: if *result* is none of the above (e.g. ``None``).
    """
    if isinstance(result, dict):
        return result
    if hasattr(result, "model_dump"):
        return result.model_dump()
    if hasattr(result, "dict"):
        return result.dict()
    raise TypeError(f"Unsupported structured output type: {type(result).__name__}")


# =============================================================================
# LANGGRAPH STATE DEFINITION
# =============================================================================


class AgentState(TypedDict):
    """LangGraph state for AI agent"""

    # Conversation messages, merged by LangGraph's add_messages reducer
    messages: Annotated[List, add_messages]
    question: str
    task_id: str

    # AI Analysis
    ai_analysis: Dict[str, Any]
    should_use_tools: bool

    # Tool processing
    tool_processing_result: Dict[str, Any]

    # Final response
    final_answer: str
    processing_complete: bool


# =============================================================================
# PYDANTIC SCHEMAS FOR STRUCTURED OUTPUT
# =============================================================================


class QuestionAnalysis(BaseModel):
    """Schema for AI question analysis"""

    question_type: str = Field(description="Type: youtube|image|audio|wiki|file|text|math")
    needs_tools: bool = Field(description="Whether tools are needed")
    reasoning: str = Field(description="AI reasoning for the decision")
    confidence: str = Field(description="Confidence level: high|medium|low")
    can_answer_directly: bool = Field(description="Can answer without tools")
    suggested_approach: str = Field(description="Brief description of approach")


class TextDecision(BaseModel):
    """Schema for reversed text decision"""

    chosen_version: str = Field(description="original|reversed")
    reasoning: str = Field(description="Reasoning for the choice")
    confidence: str = Field(description="Confidence level: high|medium|low")


# =============================================================================
# AI BRAIN WITH LANGCHAIN
# =============================================================================


class LangChainQwen3Brain:
    """AI Brain using LangChain + HuggingFace with Groq fallback"""

    def __init__(self):
        """Create the HuggingFace (primary) and Groq (fallback) clients.

        API keys are read from the ``HF_TOKEN`` and ``GROQ_API_KEY``
        environment variables; a missing variable becomes an empty string.
        """
        # Primary: HuggingFace
        self.hf_client = InferenceClient(
            provider="auto",
            api_key=os.environ.get("HF_TOKEN", ""),
        )
        self.hf_model = "Qwen/Qwen3-8B"

        # Fallback: Groq
        self.groq_client = Groq(
            api_key=os.environ.get("GROQ_API_KEY", ""),
        )
        self.groq_model = "llama3-8b-8192"

        # Setup parsers
        self.json_parser = JsonOutputParser()
        self.str_parser = StrOutputParser()

        print("🧠 LangChain Hybrid Brain initialized (HF + Groq fallback)")

    def _create_structured_model(self, schema: Type[BaseModel]):
        """Create a model that returns output structured as *schema*.

        Tries HuggingFace first, then Groq.

        Returns:
            A runnable with structured output, or ``None`` when both
            providers fail to build one.
        """
        try:
            # Try HuggingFace with structured output
            from langchain_huggingface import ChatHuggingFace

            # NOTE(review): ChatHuggingFace is documented to wrap a LangChain
            # HF LLM (e.g. HuggingFaceEndpoint); passing a raw InferenceClient
            # presumably raises here so the Groq branch is always used -
            # confirm against the installed langchain_huggingface version.
            hf_model = ChatHuggingFace(
                llm=self.hf_client,
                model_id=self.hf_model,
            )
            return hf_model.with_structured_output(schema)
        except Exception as hf_error:
            print(f"⚠️ HF structured output failed: {str(hf_error)[:50]}...")

        try:
            # Fallback to Groq with structured output
            from langchain_groq import ChatGroq

            groq_model = ChatGroq(
                api_key=os.environ.get("GROQ_API_KEY", ""),
                model=self.groq_model,
            )
            return groq_model.with_structured_output(schema)
        except Exception:
            print("⚠️ Both structured output failed")
            return None

    def _invoke_model(self, messages: List[Dict[str, str]]) -> str:
        """Invoke model with messages - try HF first, fallback to Groq.

        Args:
            messages: chat messages as ``{"role": ..., "content": ...}`` dicts.

        Returns:
            The model reply with any ``<think>`` block removed, or an
            ``"AI Error: ..."`` string when both providers fail.
        """
        # Try HuggingFace first
        try:
            completion = self.hf_client.chat.completions.create(
                model=self.hf_model,
                messages=messages,
                max_tokens=2048,
                temperature=0.7,
            )
            return _strip_think_blocks(completion.choices[0].message.content)
        except Exception as hf_error:
            # Keep the text: the name bound by "as" is deleted when this
            # except clause ends, but the Groq error path below needs it.
            hf_failure = str(hf_error)
            print(f"⚠️ HuggingFace failed: {hf_failure[:100]}...")
            print("🔄 Falling back to Groq...")

        # Fallback to Groq
        try:
            completion = self.groq_client.chat.completions.create(
                model=self.groq_model,
                messages=messages,
                max_tokens=2048,
                temperature=0.7,
            )
            return _strip_think_blocks(completion.choices[0].message.content)
        except Exception as groq_error:
            return f"AI Error: Both HF ({hf_failure[:50]}) and Groq ({str(groq_error)[:50]}) failed"

    def analyze_question(self, question: str, task_id: str = "") -> Dict[str, Any]:
        """AI analyzes question and decides approach with structured output.

        Returns:
            A dict with the ``QuestionAnalysis`` fields. When structured
            output is unavailable or fails, the dict is built from
            ``analyze_question_type`` instead.
        """
        # Create structured model
        structured_model = self._create_structured_model(QuestionAnalysis)

        if structured_model:
            analysis_prompt = "\n".join(
                [
                    "",
                    "Analyze this question and decide the approach:",
                    "",
                    f'Question: "{question}"',
                    f'Task ID: "{task_id}"',
                    "",
                    "Important rules:",
                    '- If question asks about Mercedes Sosa albums, Wikipedia, historical facts -> use "wiki"',
                    '- If YouTube URL present -> use "youtube"',
                    '- If mentions image, photo, chess position -> use "image"',
                    '- If mentions audio, voice, mp3 -> use "audio"',
                    '- If mentions file attachment, Excel, CSV -> use "file"',
                    '- For math, tables, logic problems -> use "text" but needs_tools=false',
                    "- Be accurate about question_type to trigger correct tools",
                    "",
                    NO_THINK_SWITCH,
                    "",
                ]
            )

            try:
                result = structured_model.invoke(analysis_prompt)
                return _structured_result_to_dict(result)
            except Exception as e:
                print(f"⚠️ Structured analysis failed: {str(e)[:50]}...")

        # Fallback analysis
        question_type = analyze_question_type(question)
        return {
            "question_type": question_type,
            # A task id is treated as a reason to run the tools.
            "needs_tools": bool(task_id)
            or question_type in ["wiki", "youtube", "image", "audio", "file"],
            "reasoning": "Fallback analysis - structured output failed",
            "confidence": "medium",
            "can_answer_directly": question_type == "text" and not task_id,
            "suggested_approach": f"Use {question_type} processing",
        }

    def generate_final_answer(
        self, question: str, tool_results: Dict[str, Any], context: str = ""
    ) -> str:
        """Generate final answer using LangChain.

        Args:
            question: the user question.
            tool_results: tool-processing result; its ``"tool_results"`` and
                ``"cached_data"`` entries feed the context summary.
            context: fallback context used when there are no tool results.
        """
        # Build context summary
        if tool_results and tool_results.get("tool_results"):
            context_summary = build_context_summary(
                tool_results.get("tool_results", []),
                tool_results.get("cached_data", {}),
            )
        else:
            context_summary = context or "No additional context available"

        answer_prompt = (
            get_response_prompt(
                "final_answer",
                question=question,
                context_summary=context_summary,
            )
            + f"\n\n{NO_THINK_SWITCH}"
        )

        messages = [
            {"role": "system", "content": get_system_prompt("reasoning_agent")},
            {"role": "user", "content": answer_prompt},
        ]

        return self._invoke_model(messages)

    def decide_on_reversed_text(self, original: str, reversed: str) -> Dict[str, Any]:
        """AI decides which version of text to use with structured output.

        The ``reversed`` parameter name shadows the builtin; it is kept so
        existing keyword callers keep working.

        Returns:
            A dict with the ``TextDecision`` fields; a word-count heuristic
            is used when structured output is unavailable or fails.
        """
        # Create structured model
        structured_model = self._create_structured_model(TextDecision)

        if structured_model:
            decision_prompt = "\n".join(
                [
                    "",
                    "You are analyzing two versions of the same text to determine which makes more sense:",
                    "",
                    f'Original: "{original}"',
                    f'Reversed: "{reversed}"',
                    "",
                    "Analyze both versions and decide which one is more likely to be the correct question.",
                    "Consider grammar, word order, and meaning.",
                    "",
                    NO_THINK_SWITCH,
                    "",
                ]
            )

            try:
                result = structured_model.invoke(decision_prompt)
                return _structured_result_to_dict(result)
            except Exception as e:
                print(f"⚠️ Structured decision failed: {str(e)[:50]}...")

        # Fallback decision
        return {
            "chosen_version": "reversed" if len(reversed.split()) > 3 else "original",
            "reasoning": "Fallback decision based on text structure",
            "confidence": "low",
        }


# =============================================================================
# LANGGRAPH NODES
# =============================================================================

# Initialize AI brain
ai_brain = LangChainQwen3Brain()


def analyze_question_node(state: AgentState) -> AgentState:
    """AI analyzes the question and decides approach.

    Writes ``ai_analysis`` and ``should_use_tools`` into *state*.
    """
    question = state["question"]
    task_id = state.get("task_id", "")

    print(f"🔍 AI analyzing question: {question[:50]}...")

    # Get AI analysis
    analysis = ai_brain.analyze_question(question, task_id)
    state["ai_analysis"] = analysis

    # Determine if tools are needed (default to tools when the key is missing)
    state["should_use_tools"] = analysis.get("needs_tools", True)

    print("📊 AI Analysis:")
    print(f" Type: {analysis.get('question_type', 'unknown')}")
    print(f" Needs tools: {analysis.get('needs_tools', True)}")
    print(f" Confidence: {analysis.get('confidence', 'medium')}")
    print(f" Reasoning: {analysis.get('reasoning', 'No reasoning provided')}")

    return state


def process_with_tools_node(state: AgentState) -> AgentState:
    """Process question using utils tool system.

    Writes ``tool_processing_result`` into *state*; on failure the result is
    an error record with empty tool lists so downstream nodes still run.
    """
    question = state["question"]
    task_id = state.get("task_id", "")

    print("🔧 Processing with tools...")

    try:
        # Use utils tool orchestrator
        result = process_question_with_tools(question, task_id)
        state["tool_processing_result"] = result

        print("✅ Tool processing completed:")
        print(f" Question type: {result.get('question_type', 'unknown')}")
        print(f" Successful tools: {result.get('successful_tools', [])}")
        print(f" Failed tools: {result.get('failed_tools', [])}")
    except Exception as e:
        print(f"❌ Tool processing failed: {str(e)}")
        state["tool_processing_result"] = {
            "error": str(e),
            "processed_question": question,
            "question_type": "error",
            "tools_used": [],
            "successful_tools": [],
            "failed_tools": [],
            "tool_results": [],
            "cached_data": {},
        }

    return state


def answer_directly_node(state: AgentState) -> AgentState:
    """Answer question directly without tools.

    Writes ``final_answer`` and sets ``processing_complete`` in *state*.
    """
    question = state["question"]

    print("💭 AI answering directly...")

    # Generate direct answer
    direct_prompt = "\n".join(
        [
            "",
            "Answer this question directly based on your knowledge:",
            "",
            f"Question: {question}",
            "",
            "Provide a clear, accurate, and helpful answer.",
            "",
        ]
    )

    messages = [
        {"role": "system", "content": get_system_prompt("reasoning_agent")},
        {"role": "user", "content": direct_prompt},
    ]

    answer = ai_brain._invoke_model(messages)

    state["final_answer"] = answer
    state["processing_complete"] = True

    return state


def generate_final_answer_node(state: AgentState) -> AgentState:
    """Generate final answer using AI + tool results.

    Writes ``final_answer`` and sets ``processing_complete`` in *state*.
    """
    question = state["question"]
    tool_results = state.get("tool_processing_result", {})

    print("🎯 Generating final answer...")

    # Generate comprehensive answer
    answer = ai_brain.generate_final_answer(question, tool_results)

    state["final_answer"] = answer
    state["processing_complete"] = True

    print("✅ Final answer generated")

    return state


# =============================================================================
# LANGGRAPH WORKFLOW
# =============================================================================


def create_agent_workflow():
    """Create LangGraph workflow.

    Graph shape: ``analyze`` -> (``use_tools`` -> ``generate_answer``) or
    ``direct_answer``; both branches finish at ``END``.

    Returns:
        The compiled workflow.
    """
    workflow = StateGraph(AgentState)

    # Add nodes
    workflow.add_node("analyze", analyze_question_node)
    workflow.add_node("use_tools", process_with_tools_node)
    workflow.add_node("direct_answer", answer_directly_node)
    workflow.add_node("generate_answer", generate_final_answer_node)

    # Routing logic
    def should_use_tools(state: AgentState) -> str:
        """AI-driven routing decision"""
        should_use = state.get("should_use_tools", True)
        can_answer_directly = state.get("ai_analysis", {}).get("can_answer_directly", False)

        # Only skip the tools when the analysis is explicit on both counts.
        if can_answer_directly and not should_use:
            print("🚀 AI decided to answer directly")
            return "direct_answer"

        print("🔧 AI decided to use tools")
        return "use_tools"

    # Add conditional edges
    workflow.add_conditional_edges(
        "analyze",
        should_use_tools,
        {
            "use_tools": "use_tools",
            "direct_answer": "direct_answer",
        },
    )

    # Connect tool processing to final answer
    workflow.add_edge("use_tools", "generate_answer")

    # End edges
    workflow.add_edge("direct_answer", END)
    workflow.add_edge("generate_answer", END)

    # Set entry point
    workflow.set_entry_point("analyze")

    return workflow.compile()


# =============================================================================
# MAIN AGENT CLASS
# =============================================================================


class LangGraphUtilsAgent:
    """Main AI Agent using LangGraph + Utils system"""

    def __init__(self):
        """Compile the workflow and attach the shared module-level AI brain."""
        self.workflow = create_agent_workflow()
        self.ai_brain = ai_brain

        print("🤖 LangGraph Utils Agent initialized!")
        print("🧠 AI Brain: LangChain + HuggingFace with Groq fallback")
        print("🔧 Tools: YouTube, Image OCR, Audio Transcript, Wikipedia, File Reader, Text Processor")
        print("⚡ Features: AI-driven routing, Smart tool selection, Multimodal processing")

    def process_question(self, question: str, task_id: str = "") -> str:
        """Main entry point for processing questions.

        Returns:
            The final answer, ``"No answer generated"`` when the workflow
            produced an empty answer, or an ``"Agent processing error: ..."``
            string when the workflow raised.
        """
        try:
            print(f"\n🚀 Processing question: {question}")
            print(f"📄 Task ID: {task_id or 'None'}")

            # Reset agent state for new question
            reset_agent_state()

            # Initialize LangGraph state
            initial_state = {
                "messages": [HumanMessage(content=question)],
                "question": question,
                "task_id": task_id,
                "ai_analysis": {},
                "should_use_tools": True,
                "tool_processing_result": {},
                "final_answer": "",
                "processing_complete": False,
            }

            # Execute workflow
            print("\n🔄 Starting LangGraph workflow...")
            start_time = time.time()

            final_state = self.workflow.invoke(initial_state)

            execution_time = time.time() - start_time
            print(f"\n⏱️ Total execution time: {execution_time:.2f} seconds")

            # Return final answer. "final_answer" is initialised to "", so a
            # .get() default would never apply; fall back on any empty value.
            answer = final_state.get("final_answer") or "No answer generated"
            print("\n✅ Question processed successfully!")

            return answer
        except Exception as e:
            error_msg = f"Agent processing error: {str(e)}"
            print(f"\n❌ {error_msg}")
            traceback.print_exc()
            return error_msg


# =============================================================================
# GLOBAL AGENT INSTANCE
# =============================================================================

# Create global agent
agent = LangGraphUtilsAgent()


def process_question(question: str, task_id: str = "") -> str:
    """Global function for processing questions"""
    return agent.process_question(question, task_id)


# =============================================================================
# TESTING
# =============================================================================

if __name__ == "__main__":
    print("🧪 Testing LangGraph Utils Agent\n")

    test_cases = [
        {
            "question": "Who was Marie Curie?",
            "task_id": "",
            "description": "Wikipedia factual question",
        },
        {
            "question": "What is 25 + 17 * 3?",
            "task_id": "",
            "description": "Math calculation",
        },
        {
            "question": ".rewsna eht sa \"tfel\" drow eht fo etisoppo eht etirw ,ecnetnes siht dnatsrednu uoy fI",
            "task_id": "",
            "description": "Reversed text question",
        },
        {
            "question": "How many continents are there?",
            "task_id": "",
            "description": "General knowledge",
        },
    ]

    for i, test_case in enumerate(test_cases, 1):
        print(f"\n{'='*60}")
        print(f"TEST {i}: {test_case['description']}")
        print(f"{'='*60}")
        print(f"Question: {test_case['question']}")

        try:
            answer = process_question(test_case["question"], test_case["task_id"])
            print(f"\nAnswer: {answer}")
        except Exception as e:
            print(f"\nTest failed: {str(e)}")

        print(f"\n{'-'*60}")

    print("\n✅ All tests completed!")