""" AI AGENT WITH LANGGRAPH + AI-DRIVEN TOOL CALLING Flow: 1. AI phân loại câu hỏi và quyết định tool 2. LangGraph nodes thực hiện tools 3. AI quyết định tiếp tục hoặc kết thúc 4. Qwen3-8B làm main reasoning engine Architecture: - Qwen3-8B via HuggingFace InferenceClient - LangGraph workflow với dynamic routing - AI-powered decision making (không hardcode) """ import os import json import tempfile import requests from typing import List, Dict, Any, Annotated from dotenv import load_dotenv # LangGraph imports from langgraph.graph import StateGraph, END from langgraph.graph.message import add_messages from typing_extensions import TypedDict # HuggingFace imports from huggingface_hub import InferenceClient # Other imports import wikipedia from PIL import Image import pandas as pd import yt_dlp from groq import Groq # OCR alternative - fallback to basic image processing try: import easyocr OCR_AVAILABLE = True except ImportError: OCR_AVAILABLE = False print("⚠️ EasyOCR not available, using fallback image processing") # Load environment load_dotenv() # ============================================================================= # STATE DEFINITION # ============================================================================= class AgentState(TypedDict): messages: Annotated[list, add_messages] question: str task_id: str file_name: str ai_decision: Dict[str, Any] # AI's decision about what to do tool_results: Dict[str, Any] answer: str continue_workflow: bool # ============================================================================= # QWEN3-8B AI BRAIN # ============================================================================= class Qwen3Brain: """Main AI brain using Qwen3-8B for all decisions""" def __init__(self): self.client = InferenceClient( provider="auto", api_key=os.environ.get("HF_TOKEN", "") ) self.model_name = "Qwen/Qwen3-8B" print("🧠 Qwen3-8B AI Brain initialized") def think(self, prompt: str) -> str: """Main thinking function""" try: completion = self.client.chat.completions.create( model=self.model_name, messages=[ { "role": "user", "content": prompt } ], max_tokens=2048, temperature=0.6 ) return completion.choices[0].message.content except Exception as e: return f"AI Error: {str(e)}" def decide_action(self, question: str, task_id: str = "", file_name: str = "") -> Dict[str, Any]: """AI decides what action to take""" prompt = f"""You are an intelligent AI agent. Analyze this question and decide the next action. Question: {question} Task ID: {task_id} File name: {file_name} Available actions: 1. "answer_directly" - if you can answer without tools 2. "transcribe_audio" - for audio files 3. "ocr_image" - for images with text 4. "read_file" - for Python/Excel/text files 5. "search_wikipedia" - for factual information 6. "calculate_math" - for math calculations 7. "get_youtube" - for YouTube videos 8. "download_file" - to get files from API Respond in JSON format: {{ "action": "action_name", "reasoning": "why you chose this", "params": "parameters needed (if any)", "can_answer_now": true/false }} Be decisive and clear about your choice.""" try: response = self.think(prompt) # Try to parse JSON return json.loads(response) except: # Fallback if JSON parsing fails return { "action": "answer_directly", "reasoning": "JSON parsing failed, answering directly", "params": "", "can_answer_now": True } def final_answer(self, question: str, tool_results: Dict[str, Any]) -> str: """Generate final answer based on question and tool results""" prompt = f"""Generate the final answer based on the question and any tool results. Question: {question} Tool results: {json.dumps(tool_results, indent=2)} Provide a clear, direct answer to the original question. Use the tool results if available.""" return self.think(prompt) # ============================================================================= # TOOLS AS LANGGRAPH NODES # ============================================================================= # Initialize components ai_brain = Qwen3Brain() # Initialize Groq client with error handling try: groq_client = Groq(api_key=os.environ.get("GROQ_API_KEY", "")) print("✅ Groq client initialized") except Exception as e: print(f"⚠️ Groq client initialization failed: {e}") groq_client = None # Initialize OCR with fallback if OCR_AVAILABLE: ocr_reader = easyocr.Reader(['en']) else: ocr_reader = None def ai_decision_node(state: AgentState) -> AgentState: """AI decides what to do next""" question = state["question"] task_id = state.get("task_id", "") file_name = state.get("file_name", "") decision = ai_brain.decide_action(question, task_id, file_name) state["ai_decision"] = decision print(f"🧠 AI Decision: {decision['action']} - {decision['reasoning']}") return state def answer_directly_node(state: AgentState) -> AgentState: """Answer question directly without tools""" question = state["question"] prompt = f"Answer this question directly: {question}" answer = ai_brain.think(prompt) state["answer"] = answer state["continue_workflow"] = False return state def transcribe_audio_node(state: AgentState) -> AgentState: """Transcribe audio files""" task_id = state.get("task_id", "") try: # Download file file_path = download_file(task_id) if not file_path.startswith("Error") and groq_client: # Transcribe with open(file_path, "rb") as f: transcription = groq_client.audio.transcriptions.create( file=(file_path, f.read()), model="whisper-large-v3-turbo", response_format="text", language="en" ) result = transcription.text elif not groq_client: result = "Audio transcription not available - Groq client not initialized" else: result = file_path state["tool_results"]["audio_transcript"] = result except Exception as e: state["tool_results"]["audio_transcript"] = f"Audio error: {str(e)}" state["continue_workflow"] = True return state def ocr_image_node(state: AgentState) -> AgentState: """Extract text from images""" task_id = state.get("task_id", "") try: # Download file file_path = download_file(task_id) if not file_path.startswith("Error"): if OCR_AVAILABLE and ocr_reader: # Use EasyOCR results = ocr_reader.readtext(file_path) text = " ".join([result[1] for result in results]) result = text if text.strip() else "No text found" else: # Fallback: Basic image info try: img = Image.open(file_path) result = f"Image info: {img.format} {img.size} {img.mode} - OCR not available, please describe the image content" except: result = "Image file detected but cannot process without OCR" else: result = file_path state["tool_results"]["ocr_text"] = result except Exception as e: state["tool_results"]["ocr_text"] = f"OCR error: {str(e)}" state["continue_workflow"] = True return state def read_file_node(state: AgentState) -> AgentState: """Read various file types""" task_id = state.get("task_id", "") try: # Download file file_path = download_file(task_id) if not file_path.startswith("Error"): # Read based on file type if file_path.endswith('.py'): with open(file_path, 'r', encoding='utf-8') as f: result = f"Python code:\n{f.read()}" elif file_path.endswith(('.xlsx', '.xls')): df = pd.read_excel(file_path) result = f"Excel data:\n{df.to_string()}" elif file_path.endswith('.csv'): df = pd.read_csv(file_path) result = f"CSV data:\n{df.to_string()}" else: with open(file_path, 'r', encoding='utf-8') as f: result = f"File content:\n{f.read()}" else: result = file_path state["tool_results"]["file_content"] = result except Exception as e: state["tool_results"]["file_content"] = f"File reading error: {str(e)}" state["continue_workflow"] = True return state def search_wikipedia_node(state: AgentState) -> AgentState: """Search Wikipedia""" question = state["question"] params = state["ai_decision"].get("params", "") # Use AI to determine search query if params not provided if not params: query_prompt = f"Extract the main search term for Wikipedia from: '{question}'. Return only the search term." search_query = ai_brain.think(query_prompt).strip() else: search_query = params try: wikipedia.set_lang("en") page = wikipedia.page(search_query) result = f"Title: {page.title}\nSummary: {page.summary[:2000]}" except: try: results = wikipedia.search(search_query, results=1) if results: page = wikipedia.page(results[0]) result = f"Title: {page.title}\nSummary: {page.summary[:2000]}" else: result = f"No Wikipedia results for: {search_query}" except: result = f"Wikipedia search failed for: {search_query}" state["tool_results"]["wikipedia"] = result state["continue_workflow"] = True return state def calculate_math_node(state: AgentState) -> AgentState: """Perform math calculations""" question = state["question"] # Extract math expression using AI extract_prompt = f"Extract ONLY the mathematical expression from: '{question}'. Return just the expression like '15+27'." expression = ai_brain.think(extract_prompt).strip() # Clean expression import re cleaned = re.findall(r'[\d+\-*/\(\)\.\s]+', expression) if cleaned: expression = cleaned[0].strip() try: # Safe evaluation allowed_chars = set('0123456789+-*/.() ') if all(c in allowed_chars for c in expression): result = str(eval(expression)) else: result = "Invalid mathematical expression" except Exception as e: result = f"Calculation error: {str(e)}" state["tool_results"]["calculation"] = result state["continue_workflow"] = True return state def get_youtube_node(state: AgentState) -> AgentState: """Get YouTube video info""" params = state["ai_decision"].get("params", "") try: ydl_opts = { 'writesubtitles': True, 'writeautomaticsub': True, 'subtitleslangs': ['en'], 'skip_download': True, 'quiet': True } with yt_dlp.YoutubeDL(ydl_opts) as ydl: info = ydl.extract_info(params, download=False) title = info.get('title', 'Unknown') description = info.get('description', 'No description')[:500] result = f"Video: {title}\nDescription: {description}" except Exception as e: result = f"YouTube error: {str(e)}" state["tool_results"]["youtube"] = result state["continue_workflow"] = True return state def download_file_node(state: AgentState) -> AgentState: """Download file from API""" task_id = state.get("task_id", "") try: result = download_file(task_id) state["tool_results"]["downloaded_file"] = result except Exception as e: state["tool_results"]["downloaded_file"] = f"Download error: {str(e)}" state["continue_workflow"] = True return state def final_answer_node(state: AgentState) -> AgentState: """Generate final answer using AI""" question = state["question"] tool_results = state.get("tool_results", {}) answer = ai_brain.final_answer(question, tool_results) state["answer"] = answer state["continue_workflow"] = False return state # ============================================================================= # HELPER FUNCTIONS # ============================================================================= def download_file(task_id: str) -> str: """Download file from API""" try: api_url = "https://agents-course-unit4-scoring.hf.space" file_url = f"{api_url}/files/{task_id}" response = requests.get(file_url, timeout=30) if response.status_code == 200: # Determine file extension content_type = response.headers.get('content-type', '') if 'audio' in content_type: suffix = '.mp3' elif 'image' in content_type: suffix = '.png' elif 'excel' in content_type: suffix = '.xlsx' elif 'python' in content_type: suffix = '.py' else: suffix = '.tmp' with tempfile.NamedTemporaryFile(delete=False, suffix=suffix) as tmp_file: tmp_file.write(response.content) return tmp_file.name else: return f"Error: HTTP {response.status_code}" except Exception as e: return f"Error: {str(e)}" # ============================================================================= # LANGGRAPH WORKFLOW # ============================================================================= def create_ai_agent_workflow(): """Create LangGraph workflow with AI-driven routing""" workflow = StateGraph(AgentState) # Add all nodes workflow.add_node("decision", ai_decision_node) workflow.add_node("direct_answer", answer_directly_node) workflow.add_node("audio_transcribe", transcribe_audio_node) workflow.add_node("image_ocr", ocr_image_node) workflow.add_node("file_read", read_file_node) workflow.add_node("wiki_search", search_wikipedia_node) workflow.add_node("math_calc", calculate_math_node) workflow.add_node("youtube_get", get_youtube_node) workflow.add_node("file_download", download_file_node) workflow.add_node("generate_answer", final_answer_node) # Dynamic routing based on AI decision def route_by_ai_decision(state: AgentState) -> str: action = state.get("ai_decision", {}).get("action", "answer_directly") print(f"🔀 Routing to: {action}") return action # Conditional routing from decision workflow.add_conditional_edges( "decision", route_by_ai_decision, { "answer_directly": "direct_answer", "transcribe_audio": "audio_transcribe", "ocr_image": "image_ocr", "read_file": "file_read", "search_wikipedia": "wiki_search", "calculate_math": "math_calc", "get_youtube": "youtube_get", "download_file": "file_download" } ) # Continue or end based on workflow state def should_continue(state: AgentState) -> str: if state.get("continue_workflow", False): return "generate_answer" else: return END # Add continue/end logic for tool nodes tool_nodes = [ "audio_transcribe", "image_ocr", "file_read", "wiki_search", "math_calc", "youtube_get", "file_download" ] for node in tool_nodes: workflow.add_conditional_edges( node, should_continue, { "generate_answer": "generate_answer", END: END } ) # End edges workflow.add_edge("direct_answer", END) workflow.add_edge("generate_answer", END) # Set entry point workflow.set_entry_point("decision") return workflow.compile() # ============================================================================= # MAIN AGENT CLASS # ============================================================================= class LangGraphAIAgent: """LangGraph agent with AI-driven tool calling""" def __init__(self): self.workflow = create_ai_agent_workflow() print("🤖 LangGraph AI Agent with Qwen3-8B ready!") print("🔧 Available tools: transcribe_audio, ocr_image, read_file, search_wikipedia, calculate_math, get_youtube") def process_question(self, question: str, task_id: str = "", file_name: str = "") -> str: """Process question through AI-driven workflow""" try: # Initialize state initial_state = { "messages": [], "question": question, "task_id": task_id, "file_name": file_name, "ai_decision": {}, "tool_results": {}, "answer": "", "continue_workflow": False } # Run workflow result = self.workflow.invoke(initial_state) return result.get("answer", "No answer generated") except Exception as e: return f"Agent error: {str(e)}" # ============================================================================= # GLOBAL AGENT # ============================================================================= # Create global agent instance agent = LangGraphAIAgent() def process_question(question: str, task_id: str = "", file_name: str = "") -> str: """Main entry point""" return agent.process_question(question, task_id, file_name) # ============================================================================= # TEST # ============================================================================= if __name__ == "__main__": test_questions = [ "What is 25 + 17?", "Who was Mercedes Sosa?", "What is the opposite of left?", "How many continents are there?" ] print("🧪 Testing LangGraph AI Agent:") for i, q in enumerate(test_questions): print(f"\n--- Test {i+1} ---") print(f"Q: {q}") answer = process_question(q) print(f"A: {answer}") print("-" * 50)