""" AI AGENT WITH LANGGRAPH + HUGGINGFACE INTEGRATION Clean architecture with LangChain HuggingFace Pipeline """ import os import json import time from typing import Dict, Any, List, Optional, Annotated from dotenv import load_dotenv from langchain_core.messages import HumanMessage, AIMessage, SystemMessage from langchain_core.prompts import ChatPromptTemplate from langchain_core.output_parsers import JsonOutputParser, StrOutputParser from langgraph.graph import StateGraph, END from langgraph.graph.message import add_messages from typing_extensions import TypedDict from pydantic import BaseModel, Field # LangChain HuggingFace Integration from transformers import AutoModelForCausalLM, AutoTokenizer from utils import ( process_question_with_tools, get_agent_state, reset_agent_state, ToolOrchestrator, get_system_prompt, get_response_prompt, build_context_summary, analyze_question_type ) load_dotenv() class AgentState(TypedDict): messages: Annotated[List, add_messages] question: str task_id: str ai_analysis: Dict[str, Any] should_use_tools: bool tool_processing_result: Dict[str, Any] final_answer: str processing_complete: bool class QuestionAnalysis(BaseModel): question_type: str = Field(description="Type: youtube|image|audio|wiki|file|text|math") needs_tools: bool = Field(description="Whether tools are needed") reasoning: str = Field(description="AI reasoning for the decision") confidence: str = Field(description="Confidence level: high|medium|low") class AIBrain: def __init__(self): self.model_name = "Qwen/Qwen3-8B" print("🧠 Initializing Qwen3-8B với transformers gốc...") self.tokenizer = AutoTokenizer.from_pretrained(self.model_name) self.model = AutoModelForCausalLM.from_pretrained( self.model_name, torch_dtype="auto", device_map="auto" ) print("✅ Qwen3 AI Brain với transformers đã sẵn sàng") def _generate_with_qwen3(self, prompt: str, max_tokens: int = 2048) -> str: """Sinh text với Qwen3 bằng transformers gốc, thinking mode tắt""" try: messages = [{"role": "user", "content": prompt}] text = self.tokenizer.apply_chat_template( messages, tokenize=False, add_generation_prompt=True, enable_thinking=False ) model_inputs = self.tokenizer([text], return_tensors="pt").to(self.model.device) generated_ids = self.model.generate( **model_inputs, max_new_tokens=max_tokens ) output_ids = generated_ids[0][len(model_inputs.input_ids[0]):].tolist() response = self.tokenizer.decode(output_ids, skip_special_tokens=True).strip("\n") return response except Exception as e: print(f"⚠️ Qwen3 generation error: {str(e)}") return f"AI generation failed: {str(e)}" def analyze_question(self, question: str, task_id: str = "") -> Dict[str, Any]: """Analyze question type using Qwen3 with strict JSON output""" prompt = f""" Analyze this question and determine the correct tool approach. Return ONLY valid JSON. 
class AIBrain:
    def __init__(self):
        self.model_name = "Qwen/Qwen3-8B"
        print("🧠 Initializing Qwen3-8B with native transformers...")

        self.tokenizer = AutoTokenizer.from_pretrained(self.model_name)
        self.model = AutoModelForCausalLM.from_pretrained(
            self.model_name,
            torch_dtype="auto",
            device_map="auto",
        )
        print("✅ Qwen3 AI Brain (native transformers) is ready")

    def _generate_with_qwen3(self, prompt: str, max_tokens: int = 2048) -> str:
        """Generate text with Qwen3 via native transformers, thinking mode disabled."""
        try:
            messages = [{"role": "user", "content": prompt}]
            text = self.tokenizer.apply_chat_template(
                messages,
                tokenize=False,
                add_generation_prompt=True,
                enable_thinking=False,  # suppress Qwen3's <think> reasoning blocks
            )
            model_inputs = self.tokenizer([text], return_tensors="pt").to(self.model.device)

            generated_ids = self.model.generate(
                **model_inputs,
                max_new_tokens=max_tokens,
            )
            # generate() returns prompt + continuation; slice off the prompt tokens
            output_ids = generated_ids[0][len(model_inputs.input_ids[0]):].tolist()
            response = self.tokenizer.decode(output_ids, skip_special_tokens=True).strip("\n")

            return response
        except Exception as e:
            print(f"⚠️ Qwen3 generation error: {str(e)}")
            return f"AI generation failed: {str(e)}"

    def analyze_question(self, question: str, task_id: str = "") -> Dict[str, Any]:
        """Analyze question type using Qwen3 with strict JSON output."""
        prompt = f"""Analyze this question and determine the correct tool approach. Return ONLY valid JSON.

Question: {question}
Task ID: {task_id}

Classification rules:
- YouTube URLs (youtube.com, youtu.be): "youtube"
- Images, photos, chess positions, visual content: "image"
- Audio files, voice, sound, mp3: "audio"
- Excel, CSV, documents, file uploads: "file"
- Wikipedia searches, historical facts, people info: "wiki"
- Math calculations, logic, text analysis: "text"

Return this exact JSON format:
{{
    "question_type": "youtube|image|audio|wiki|file|text",
    "needs_tools": true,
    "reasoning": "Brief explanation of classification",
    "confidence": "high"
}}"""

        try:
            response = self._generate_with_qwen3(prompt, 512)

            # Extract the first flat JSON object from the response; the pattern
            # does not handle nested braces, which is sufficient for the schema
            # requested above
            json_pattern = r'\{[^{}]*\}'
            json_match = re.search(json_pattern, response)

            if json_match:
                result = json.loads(json_match.group())
                # Validate required fields
                required_fields = ["question_type", "needs_tools", "reasoning", "confidence"]
                if all(field in result for field in required_fields):
                    return result

            raise ValueError("Invalid JSON structure in response")

        except Exception as e:
            print(f"⚠️ Qwen3 analysis failed: {str(e)[:100]}...")
            # Fallback analysis
            question_type = analyze_question_type(question)
            return {
                "question_type": question_type,
                "needs_tools": question_type in ["wiki", "youtube", "image", "audio", "file"],
                "reasoning": f"Fallback classification: detected {question_type}",
                "confidence": "medium",
            }

    def generate_answer(self, question: str, tool_results: Dict[str, Any]) -> str:
        """Generate final answer using Qwen3 with context."""
        if tool_results and tool_results.get("tool_results"):
            context = build_context_summary(
                tool_results.get("tool_results", []),
                tool_results.get("cached_data", {}),
            )
        else:
            context = "No additional context available"

        prompt = f"""Generate a comprehensive answer to the user's question using the provided context.

Question: {question}

Context: {context}

Guidelines:
- Provide direct, accurate answers
- Use context information when relevant
- Be concise but complete
- No thinking process in output
- Professional tone

Answer:"""

        response = self._generate_with_qwen3(prompt, 2048)

        # If the model echoed the "Answer:" marker, keep only the text after it
        if "Answer:" in response:
            response = response.split("Answer:")[-1].strip()

        return response


# Initialize AI Brain globally (loads Qwen3-8B once at import time)
ai_brain = AIBrain()


def analyze_question_node(state: AgentState) -> AgentState:
    """Analyze question using Qwen3 AI Brain."""
    question = state["question"]
    task_id = state.get("task_id", "")

    print("🔍 Analyzing question with Qwen3...")
    analysis = ai_brain.analyze_question(question, task_id)

    state["ai_analysis"] = analysis
    state["should_use_tools"] = analysis.get("needs_tools", True)

    print(f"📊 Type: {analysis.get('question_type')} | Tools: {analysis.get('needs_tools')} | Confidence: {analysis.get('confidence')}")
    return state


def process_with_tools_node(state: AgentState) -> AgentState:
    """Process question with appropriate tools."""
    question = state["question"]
    task_id = state.get("task_id", "")

    print("🔧 Processing with specialized tools...")
    tool_results = process_question_with_tools(question, task_id)

    state["tool_processing_result"] = tool_results

    successful_tools = [
        result.tool_name
        for result in tool_results.get("tool_results", [])
        if result.success
    ]
    if successful_tools:
        print(f"✅ Successful tools: {successful_tools}")
    else:
        print("⚠️ No tools succeeded")

    return state


def answer_directly_node(state: AgentState) -> AgentState:
    """Answer directly without tools using Qwen3."""
    question = state["question"]

    print("💭 Generating direct answer with Qwen3...")
    answer = ai_brain.generate_answer(question, {})

    state["final_answer"] = answer
    state["processing_complete"] = True
    return state
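# A minimal sketch, assuming the Qwen3 weights loaded above: graph nodes are
# plain functions over AgentState, so they can be exercised in isolation
# before compiling the workflow. Helper name and question are hypothetical.
def _example_run_node_in_isolation() -> str:
    state: AgentState = {
        "messages": [HumanMessage(content="How many continents are there?")],
        "question": "How many continents are there?",
        "task_id": "",
        "ai_analysis": {},
        "should_use_tools": False,
        "tool_processing_result": {},
        "final_answer": "",
        "processing_complete": False,
    }
    return answer_directly_node(state)["final_answer"]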
def generate_final_answer_node(state: AgentState) -> AgentState:
    """Generate final answer combining tool results and AI analysis."""
    question = state["question"]
    tool_results = state.get("tool_processing_result", {})

    print("🎯 Generating final answer with context...")
    answer = ai_brain.generate_answer(question, tool_results)

    state["final_answer"] = answer
    state["processing_complete"] = True
    return state


def create_agent_workflow():
    """Create LangGraph workflow for question processing."""
    workflow = StateGraph(AgentState)

    # Add nodes
    workflow.add_node("analyze_question", analyze_question_node)
    workflow.add_node("process_with_tools", process_with_tools_node)
    workflow.add_node("answer_directly", answer_directly_node)
    workflow.add_node("generate_final_answer", generate_final_answer_node)

    # Define routing logic
    def should_use_tools(state: AgentState) -> str:
        return "process_with_tools" if state.get("should_use_tools", True) else "answer_directly"

    # Set up the flow
    workflow.set_entry_point("analyze_question")
    workflow.add_conditional_edges("analyze_question", should_use_tools)
    workflow.add_edge("process_with_tools", "generate_final_answer")
    workflow.add_edge("answer_directly", END)
    workflow.add_edge("generate_final_answer", END)

    return workflow.compile()
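# Compiled graph topology, for reference (derived from the edges added above):
#
#   analyze_question ──► process_with_tools ──► generate_final_answer ──► END
#          │
#          └──────────► answer_directly ──────────────────────────────► END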
class LangGraphUtilsAgent:
    def __init__(self):
        self.app = create_agent_workflow()
        print("🚀 LangGraph Agent with Qwen3 + Utils System ready")

    def process_question(self, question: str, task_id: str = "") -> str:
        """Process question through the workflow."""
        try:
            print(f"\n🎯 Processing: {question[:100]}...")

            # Initialize state
            initial_state = {
                "messages": [HumanMessage(content=question)],
                "question": question,
                "task_id": task_id,
                "ai_analysis": {},
                "should_use_tools": True,
                "tool_processing_result": {},
                "final_answer": "",
                "processing_complete": False,
            }

            # Run workflow
            start_time = time.time()
            result = self.app.invoke(initial_state)
            elapsed_time = time.time() - start_time

            final_answer = result.get("final_answer", "No answer generated")
            print(f"✅ Completed in {elapsed_time:.2f}s")

            return final_answer

        except Exception as e:
            print(f"❌ Agent error: {str(e)}")
            return f"I apologize, but I encountered an error processing your question: {str(e)}"


# Global agent instance
agent = LangGraphUtilsAgent()


def process_question(question: str, task_id: str = "") -> str:
    """Main entry point for question processing."""
    if not question or not question.strip():
        return "Please provide a valid question."
    return agent.process_question(question.strip(), task_id)


# =============================================================================
# TESTING
# =============================================================================

if __name__ == "__main__":
    print("🧪 Testing LangGraph Utils Agent\n")

    test_cases = [
        {
            "question": "Who was Marie Curie?",
            "task_id": "",
            "description": "Wikipedia factual question",
        },
        {
            "question": "What is 25 + 17 * 3?",
            "task_id": "",
            "description": "Math calculation",
        },
        {
            # Reversed text; reads: 'If you understand this sentence, write
            # the opposite of the word "left" as the answer.'
            "question": ".rewsna eht sa \"tfel\" drow eht fo etisoppo eht etirw ,ecnetnes siht dnatsrednu uoy fI",
            "task_id": "",
            "description": "Reversed text question",
        },
        {
            "question": "How many continents are there?",
            "task_id": "",
            "description": "General knowledge",
        },
    ]

    for i, test_case in enumerate(test_cases, 1):
        print(f"\n{'='*60}")
        print(f"TEST {i}: {test_case['description']}")
        print(f"{'='*60}")
        print(f"Question: {test_case['question']}")

        try:
            answer = process_question(test_case["question"], test_case["task_id"])
            print(f"\nAnswer: {answer}")
        except Exception as e:
            print(f"\nTest failed: {str(e)}")

        print(f"\n{'-'*60}")

    print("\n✅ All tests completed!")