# final_agent_course / agent.py — author: tuan3335, commit a9b5cb5 (18.9 kB)
# Commit message: "Add structured output with Pydantic, fix tool selection logic,
# add YouTube cookies support, disable thinking mode"
"""
AI AGENT WITH LANGGRAPH + UTILS SYSTEM
Architecture:
- LangChain/LangGraph workflow with AI-driven routing
- Qwen3-8B as the main reasoning engine
- Utils system provides the tools
- The AI itself decides which tools to use and how to process each question
"""
import os
import json
import time
from typing import Dict, Any, List, Optional, Annotated
from dotenv import load_dotenv
# LangChain imports
from langchain_core.messages import HumanMessage, AIMessage, SystemMessage
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import JsonOutputParser, StrOutputParser
# LangGraph imports
from langgraph.graph import StateGraph, END
from langgraph.graph.message import add_messages
from typing_extensions import TypedDict
# HuggingFace imports
from huggingface_hub import InferenceClient
# Groq imports for fallback
from groq import Groq
# Pydantic for structured output
from pydantic import BaseModel, Field
# Utils system imports
from utils import (
process_question_with_tools,
get_agent_state,
reset_agent_state,
ToolOrchestrator,
get_system_prompt,
get_response_prompt,
build_context_summary,
analyze_question_type
)
# Load environment variables from a local .env file into os.environ so that
# HF_TOKEN / GROQ_API_KEY are available when the AI brain below is created.
load_dotenv()
# =============================================================================
# LANGGRAPH STATE DEFINITION
# =============================================================================
class AgentState(TypedDict):
    """LangGraph state for AI agent.

    One dict flows through every node of the workflow; each node in this
    module receives it, fills in its own keys and returns it.
    """
    # Conversation history. add_messages is LangGraph's reducer for message
    # lists: messages returned by a node are merged into the existing list.
    messages: Annotated[List, add_messages]
    question: str  # the raw user question
    task_id: str  # "" when no task/attachment id is given; forwarded to the tool layer
    # AI Analysis
    ai_analysis: Dict[str, Any]  # dict produced by LangChainQwen3Brain.analyze_question
    should_use_tools: bool  # copy of ai_analysis["needs_tools"]; read by the router
    # Tool processing
    tool_processing_result: Dict[str, Any]  # process_question_with_tools output, or an error-shaped dict
    # Final response
    final_answer: str
    processing_complete: bool  # set True by whichever node writes final_answer
# =============================================================================
# PYDANTIC SCHEMAS FOR STRUCTURED OUTPUT
# =============================================================================
class QuestionAnalysis(BaseModel):
    """Schema for AI question analysis.

    Passed to ``with_structured_output`` by LangChainQwen3Brain so the LLM
    replies with exactly these fields. LangChain derives the schema shown to
    the model from the field names and descriptions, so the description
    strings effectively act as part of the prompt.
    """
    # One of the labels in the description. In this file it is only logged
    # (analyze_question_node); process_question_with_tools is called without it.
    question_type: str = Field(description="Type: youtube|image|audio|wiki|file|text|math")
    # Copied into state["should_use_tools"] by analyze_question_node.
    needs_tools: bool = Field(description="Whether tools are needed")
    # Free-text justification; only logged in this file.
    reasoning: str = Field(description="AI reasoning for the decision")
    # Plain str: not validated against high|medium|low.
    confidence: str = Field(description="Confidence level: high|medium|low")
    # The router picks the direct-answer branch only when this is True AND
    # needs_tools is False.
    can_answer_directly: bool = Field(description="Can answer without tools")
    # Not read anywhere in this file.
    suggested_approach: str = Field(description="Brief description of approach")
class TextDecision(BaseModel):
    """Schema for reversed text decision.

    Structured-output schema used by
    LangChainQwen3Brain.decide_on_reversed_text to choose between a text and
    its reversed form. No caller of that method is visible in this file;
    presumably the utils package uses it for reversed questions — verify.
    """
    # Expected "original" or "reversed"; plain str, not validated.
    chosen_version: str = Field(description="original|reversed")
    reasoning: str = Field(description="Reasoning for the choice")
    # Plain str: not validated against high|medium|low.
    confidence: str = Field(description="Confidence level: high|medium|low")
# =============================================================================
# AI BRAIN WITH LANGCHAIN
# =============================================================================
class LangChainQwen3Brain:
    """AI Brain using LangChain + HuggingFace with Groq fallback.

    Qwen3-8B (HuggingFace Inference Providers) is the primary model and
    Groq is the fallback. Operations used by the LangGraph nodes:

    * ``analyze_question``        - structured analysis (QuestionAnalysis fields)
    * ``generate_final_answer``   - free-text answer built from tool results
    * ``decide_on_reversed_text`` - structured original/reversed choice (TextDecision)
    * ``_invoke_model``           - raw chat completion, HF first then Groq
    """

    # Qwen3 soft switch that disables thinking mode for one turn. The model
    # recognises "/no_think"; "/no_thinking" is not a switch and left
    # thinking enabled.
    NO_THINK = "/no_think"

    # Common English function words for the reversed-text fallback: readable
    # English contains many of them, its character-reversed form almost none.
    # (Palindromes such as "a"/"i" score equally for both versions.)
    _COMMON_WORDS = frozenset({
        "a", "an", "the", "is", "are", "was", "were", "be", "been", "of",
        "to", "in", "on", "at", "for", "and", "or", "not", "if", "you",
        "your", "i", "it", "this", "that", "what", "who", "how", "which",
        "when", "where", "why", "as", "with", "by", "from", "do", "does",
        "can", "there", "many", "much",
    })

    def __init__(self):
        # Primary: HuggingFace Inference Providers (provider chosen automatically)
        self.hf_client = InferenceClient(
            provider="auto",
            api_key=os.environ.get("HF_TOKEN", "")
        )
        self.hf_model = "Qwen/Qwen3-8B"
        # Fallback: Groq
        self.groq_client = Groq(
            api_key=os.environ.get("GROQ_API_KEY", "")
        )
        self.groq_model = "llama3-8b-8192"
        # Setup parsers (kept as public attributes; not read inside this class)
        self.json_parser = JsonOutputParser()
        self.str_parser = StrOutputParser()
        # Structured-output runnables, built lazily and cached per schema class
        self._structured_models: Dict[Any, Any] = {}
        print("🧠 LangChain Hybrid Brain initialized (HF + Groq fallback)")

    @staticmethod
    def _strip_think(text: Optional[str]) -> str:
        """Drop Qwen3 ``<think>...</think>`` output and surrounding whitespace.

        Even with thinking disabled via the soft switch, Qwen3 may emit an
        empty think block; that markup must not reach the final answer.
        Everything up to and including the last ``</think>`` is removed.
        ``None`` (a completion without content) becomes ``""``.
        """
        if not text:
            return ""
        if "</think>" in text:
            text = text.rsplit("</think>", 1)[1]
        return text.strip()

    @staticmethod
    def _model_to_dict(result: Any) -> Dict[str, Any]:
        """Convert a structured-output result to a plain dict.

        Accepts a dict, a Pydantic v2 model (``model_dump``) or a Pydantic v1
        model (``dict``). Raises ValueError for ``None`` (the model produced
        no structured result) so callers fall back to their heuristics.
        """
        if result is None:
            raise ValueError("structured output returned no result")
        if isinstance(result, dict):
            return dict(result)
        if hasattr(result, "model_dump"):
            return result.model_dump()
        return result.dict()

    def _create_structured_model(self, schema: "type[BaseModel]"):
        """Return a runnable whose ``invoke`` yields ``schema`` objects, or None.

        HuggingFace is tried first and Groq second:

        * ``ChatHuggingFace`` expects a LangChain HF LLM wrapper such as
          ``HuggingFaceEndpoint`` as ``llm``; a raw ``InferenceClient`` is
          rejected at construction, which silently disabled the HF path.
        * Groq is attached with ``with_fallbacks`` so errors raised while
          invoking HF (not only while constructing it) also reach Groq.

        Successfully built runnables are cached per schema class.
        """
        cached = self._structured_models.get(schema)
        if cached is not None:
            return cached

        hf_structured = None
        try:
            # Try HuggingFace with structured output
            from langchain_huggingface import ChatHuggingFace, HuggingFaceEndpoint
            endpoint = HuggingFaceEndpoint(
                repo_id=self.hf_model,
                huggingfacehub_api_token=os.environ.get("HF_TOKEN") or None,
            )
            hf_structured = ChatHuggingFace(llm=endpoint).with_structured_output(schema)
        except Exception as hf_error:
            print(f"⚠️ HF structured output failed: {str(hf_error)[:50]}...")

        groq_structured = None
        try:
            # Groq with structured output (fallback)
            from langchain_groq import ChatGroq
            groq_structured = ChatGroq(
                api_key=os.environ.get("GROQ_API_KEY", ""),
                model=self.groq_model
            ).with_structured_output(schema)
        except Exception as groq_error:
            print(f"⚠️ Groq structured output failed: {str(groq_error)[:50]}...")

        if hf_structured is not None and groq_structured is not None:
            model = hf_structured.with_fallbacks([groq_structured])
        else:
            model = hf_structured if hf_structured is not None else groq_structured

        if model is None:
            print("⚠️ Both structured output failed")
        else:
            self._structured_models[schema] = model
        return model

    def _invoke_model(self, messages: List[Dict[str, str]]) -> str:
        """Run a chat completion: HuggingFace first, Groq on any HF error.

        Args:
            messages: OpenAI-style ``{"role": ..., "content": ...}`` dicts.

        Returns:
            The reply text with any Qwen3 think block removed, or an
            ``"AI Error: ..."`` string when both providers fail (no raise).
        """
        try:
            completion = self.hf_client.chat.completions.create(
                model=self.hf_model,
                messages=messages,
                max_tokens=2048,
                temperature=0.7
            )
            return self._strip_think(completion.choices[0].message.content)
        except Exception as hf_error:
            # Save the text: the name hf_error is unbound once this handler ends.
            hf_error_text = str(hf_error)
            print(f"⚠️ HuggingFace failed: {hf_error_text[:100]}...")
            print("🔄 Falling back to Groq...")

        try:
            completion = self.groq_client.chat.completions.create(
                model=self.groq_model,
                messages=messages,
                max_tokens=2048,
                temperature=0.7
            )
            return self._strip_think(completion.choices[0].message.content)
        except Exception as groq_error:
            return f"AI Error: Both HF ({hf_error_text[:50]}) and Groq ({str(groq_error)[:50]}) failed"

    def analyze_question(self, question: str, task_id: str = "") -> Dict[str, Any]:
        """Classify ``question`` and decide whether tools are needed.

        Returns a dict with the QuestionAnalysis fields. If structured output
        is unavailable or fails, a heuristic based on ``analyze_question_type``
        is returned instead (confidence "medium").
        """
        structured_model = self._create_structured_model(QuestionAnalysis)
        if structured_model is not None:
            analysis_prompt = f"""
Analyze this question and decide the approach:
Question: "{question}"
Task ID: "{task_id}"
Important rules:
- If question asks about Mercedes Sosa albums, Wikipedia, historical facts -> use "wiki"
- If YouTube URL present -> use "youtube"
- If mentions image, photo, chess position -> use "image"
- If mentions audio, voice, mp3 -> use "audio"
- If mentions file attachment, Excel, CSV -> use "file"
- For math, tables, logic problems -> use "text" but needs_tools=false
- Be accurate about question_type to trigger correct tools
{self.NO_THINK}
"""
            try:
                return self._model_to_dict(structured_model.invoke(analysis_prompt))
            except Exception as e:
                print(f"⚠️ Structured analysis failed: {str(e)[:50]}...")
        # Fallback analysis
        question_type = analyze_question_type(question)
        return {
            "question_type": question_type,
            "needs_tools": bool(task_id) or question_type in ["wiki", "youtube", "image", "audio", "file"],
            "reasoning": "Fallback analysis - structured output failed",
            "confidence": "medium",
            "can_answer_directly": question_type == "text" and not task_id,
            "suggested_approach": f"Use {question_type} processing"
        }

    def generate_final_answer(self, question: str, tool_results: Dict[str, Any], context: str = "") -> str:
        """Generate the final answer from the question plus tool output.

        Uses ``build_context_summary`` when ``tool_results`` carries a
        non-empty "tool_results" list, otherwise ``context`` (or a
        placeholder). Returns the text from ``_invoke_model``.
        """
        if tool_results and tool_results.get("tool_results"):
            context_summary = build_context_summary(
                tool_results.get("tool_results", []),
                tool_results.get("cached_data", {})
            )
        else:
            context_summary = context or "No additional context available"
        answer_prompt = get_response_prompt(
            "final_answer",
            question=question,
            context_summary=context_summary
        ) + "\n\n" + self.NO_THINK
        messages = [
            {"role": "system", "content": get_system_prompt("reasoning_agent")},
            {"role": "user", "content": answer_prompt}
        ]
        return self._invoke_model(messages)

    @classmethod
    def _common_word_score(cls, text: str) -> int:
        """Count whitespace-separated words of ``text`` that are common English words."""
        return sum(
            1 for word in text.lower().split()
            if word.strip(".,;:!?\"'()[]") in cls._COMMON_WORDS
        )

    @classmethod
    def _fallback_text_decision(cls, original: str, reversed_text: str) -> Dict[str, Any]:
        """Heuristic original-vs-reversed choice used when the LLM is unavailable.

        Picks the version containing more common English words. Reversing a
        string keeps its word count, so the count alone carries no signal;
        it is only used as the tie-breaker (the previous rule: "reversed"
        for texts of more than 3 words).
        """
        original_score = cls._common_word_score(original)
        reversed_score = cls._common_word_score(reversed_text)
        if original_score != reversed_score:
            chosen = "reversed" if reversed_score > original_score else "original"
        else:
            chosen = "reversed" if len(reversed_text.split()) > 3 else "original"
        return {
            "chosen_version": chosen,
            "reasoning": "Fallback decision based on common English word frequency",
            "confidence": "low"
        }

    def decide_on_reversed_text(self, original: str, reversed: str) -> Dict[str, Any]:
        """Decide whether ``original`` or ``reversed`` is the intended text.

        (``reversed`` shadows the builtin but keeps its name for keyword
        callers.) Returns a dict with the TextDecision fields, falling back
        to a word-frequency heuristic if structured output is unavailable.
        """
        structured_model = self._create_structured_model(TextDecision)
        if structured_model is not None:
            decision_prompt = f"""
You are analyzing two versions of the same text to determine which makes more sense:
Original: "{original}"
Reversed: "{reversed}"
Analyze both versions and decide which one is more likely to be the correct question.
Consider grammar, word order, and meaning.
{self.NO_THINK}
"""
            try:
                return self._model_to_dict(structured_model.invoke(decision_prompt))
            except Exception as e:
                print(f"⚠️ Structured decision failed: {str(e)[:50]}...")
        return self._fallback_text_decision(original, reversed)
# =============================================================================
# LANGGRAPH NODES
# =============================================================================
# Initialize AI brain: one shared module-level instance, created at import
# time. The graph nodes below and LangGraphUtilsAgent all use this object.
ai_brain = LangChainQwen3Brain()
def analyze_question_node(state: AgentState) -> AgentState:
    """Graph node: have the AI brain analyse the question and record it.

    Stores the analysis dict in ``state["ai_analysis"]`` and its
    ``needs_tools`` flag (True when missing) in ``state["should_use_tools"]``,
    logs a short summary, and returns the same mutated state.
    """
    question = state["question"]
    task_id = state.get("task_id", "")
    print(f"🔍 AI analyzing question: {question[:50]}...")

    analysis = ai_brain.analyze_question(question, task_id)
    state["ai_analysis"] = analysis
    state["should_use_tools"] = analysis.get("needs_tools", True)

    print("📊 AI Analysis:")
    # (label, analysis key, value shown when the key is missing)
    summary_rows = (
        ("Type", "question_type", "unknown"),
        ("Needs tools", "needs_tools", True),
        ("Confidence", "confidence", "medium"),
        ("Reasoning", "reasoning", "No reasoning provided"),
    )
    for label, key, missing in summary_rows:
        print(f" {label}: {analysis.get(key, missing)}")
    return state
def process_with_tools_node(state: AgentState) -> AgentState:
    """Graph node: run the utils tool orchestrator and store its result.

    The orchestrator output goes to ``state["tool_processing_result"]``. If
    anything in the processing or logging raises, an error-shaped result
    with the same keys (empty tool lists) is stored instead, so later nodes
    still run. Returns the same mutated state.
    """
    question = state["question"]
    task_id = state.get("task_id", "")
    print("🔧 Processing with tools...")

    try:
        outcome = process_question_with_tools(question, task_id)
        state["tool_processing_result"] = outcome
        print("✅ Tool processing completed:")
        # (label, result key, value shown when the key is missing)
        for label, key, missing in (
            ("Question type", "question_type", "unknown"),
            ("Successful tools", "successful_tools", []),
            ("Failed tools", "failed_tools", []),
        ):
            print(f" {label}: {outcome.get(key, missing)}")
    except Exception as exc:
        message = str(exc)
        print(f"❌ Tool processing failed: {message}")
        state["tool_processing_result"] = {
            "error": message,
            "processed_question": question,
            "question_type": "error",
            "tools_used": [],
            "successful_tools": [],
            "failed_tools": [],
            "tool_results": [],
            "cached_data": {}
        }
    return state
def answer_directly_node(state: AgentState) -> AgentState:
    """Graph node: answer from the model's own knowledge, without tools.

    Writes the reply to ``state["final_answer"]``, sets
    ``processing_complete`` and returns the same mutated state.
    """
    question = state["question"]
    print("💭 AI answering directly...")
    # "/no_think" is Qwen3's soft switch that disables thinking mode for
    # this turn; without it the reply can carry a long <think> section.
    direct_prompt = f"""
Answer this question directly based on your knowledge:
Question: {question}
Provide a clear, accurate, and helpful answer.

/no_think
"""
    messages = [
        {"role": "system", "content": get_system_prompt("reasoning_agent")},
        {"role": "user", "content": direct_prompt}
    ]
    # Guard against an empty completion (content None) so the answer stays a str.
    answer = ai_brain._invoke_model(messages) or ""
    # Qwen3 may still emit an (empty) <think></think> block with thinking
    # disabled; keep only the text after it.
    if "</think>" in answer:
        answer = answer.rsplit("</think>", 1)[1].strip()
    state["final_answer"] = answer
    state["processing_complete"] = True
    return state
def generate_final_answer_node(state: AgentState) -> AgentState:
    """Graph node: compose the final answer from the question and tool results.

    Reads ``question`` and ``tool_processing_result`` (``{}`` if absent),
    writes ``final_answer`` and ``processing_complete``, and returns the
    same mutated state.
    """
    question, tool_results = state["question"], state.get("tool_processing_result", {})
    print("🎯 Generating final answer...")

    state.update(
        final_answer=ai_brain.generate_final_answer(question, tool_results),
        processing_complete=True,
    )
    print("✅ Final answer generated")
    return state
# =============================================================================
# LANGGRAPH WORKFLOW
# =============================================================================
def create_agent_workflow():
    """Build and compile the agent's LangGraph workflow.

    Flow: analyze -> use_tools -> generate_answer -> END, or
    analyze -> direct_answer -> END when the analysis says no tools are needed.
    Returns the compiled graph (run it with ``.invoke(initial_state)``).
    """
    graph = StateGraph(AgentState)

    # Register every node under the name the edges below refer to
    for node_name, node_fn in (
        ("analyze", analyze_question_node),
        ("use_tools", process_with_tools_node),
        ("direct_answer", answer_directly_node),
        ("generate_answer", generate_final_answer_node),
    ):
        graph.add_node(node_name, node_fn)

    def should_use_tools(state: AgentState) -> str:
        """Route after analysis: "direct_answer" only when tools are not
        wanted AND the model says it can answer directly, else "use_tools"."""
        wants_tools = state.get("should_use_tools", True)
        direct_ok = state.get("ai_analysis", {}).get("can_answer_directly", False)
        if not wants_tools and direct_ok:
            print("🚀 AI decided to answer directly")
            return "direct_answer"
        print("🔧 AI decided to use tools")
        return "use_tools"

    # The router's return value is used directly as the target node name
    graph.add_conditional_edges(
        "analyze",
        should_use_tools,
        {branch: branch for branch in ("use_tools", "direct_answer")},
    )
    graph.add_edge("use_tools", "generate_answer")
    for terminal_node in ("direct_answer", "generate_answer"):
        graph.add_edge(terminal_node, END)
    graph.set_entry_point("analyze")
    return graph.compile()
# =============================================================================
# MAIN AGENT CLASS
# =============================================================================
class LangGraphUtilsAgent:
    """Main AI Agent using LangGraph + Utils system.

    Holds the compiled workflow from ``create_agent_workflow()``; each call
    to ``process_question`` runs one question through it.
    """

    def __init__(self):
        self.workflow = create_agent_workflow()
        # The same module-level brain the graph nodes use
        self.ai_brain = ai_brain
        print("🤖 LangGraph Utils Agent initialized!")
        print("🧠 AI Brain: LangChain + HuggingFace with Groq fallback")
        print("🔧 Tools: YouTube, Image OCR, Audio Transcript, Wikipedia, File Reader, Text Processor")
        print("⚡ Features: AI-driven routing, Smart tool selection, Multimodal processing")

    def process_question(self, question: str, task_id: str = "") -> str:
        """Run ``question`` through the workflow and return the answer text.

        Args:
            question: the user question.
            task_id: optional task identifier ("" for none), put into the
                graph state for the tool layer.

        Returns:
            The final answer, "No answer generated" when the workflow leaves
            it empty, or "Agent processing error: ..." if an exception was
            raised (the traceback is printed; the exception is not re-raised).
        """
        try:
            print(f"\n🚀 Processing question: {question}")
            print(f"📄 Task ID: {task_id or 'None'}")
            # Reset the utils agent state so nothing leaks between questions
            reset_agent_state()
            initial_state = {
                "messages": [HumanMessage(content=question)],
                "question": question,
                "task_id": task_id,
                "ai_analysis": {},
                "should_use_tools": True,
                "tool_processing_result": {},
                "final_answer": "",
                "processing_complete": False
            }
            print("\n🔄 Starting LangGraph workflow...")
            start_time = time.time()
            final_state = self.workflow.invoke(initial_state)
            execution_time = time.time() - start_time
            print(f"\n⏱️ Total execution time: {execution_time:.2f} seconds")
            # initial_state always contains "final_answer" (as ""), so a
            # .get() default could never apply; `or` also covers ""/None.
            answer = final_state.get("final_answer") or "No answer generated"
            print("\n✅ Question processed successfully!")
            return answer
        except Exception as e:
            error_msg = f"Agent processing error: {str(e)}"
            print(f"\n❌ {error_msg}")
            import traceback
            traceback.print_exc()
            return error_msg
# =============================================================================
# GLOBAL AGENT INSTANCE
# =============================================================================
# Create global agent at import time (this compiles the workflow);
# the module-level process_question() below delegates to it.
agent = LangGraphUtilsAgent()
def process_question(question: str, task_id: str = "") -> str:
    """Answer ``question`` with the shared module-level agent.

    Thin convenience wrapper: ``task_id`` ("" for none) is passed through
    and the agent's answer string is returned unchanged.
    """
    shared_agent = agent
    return shared_agent.process_question(question=question, task_id=task_id)
# =============================================================================
# TESTING
# =============================================================================
if __name__ == "__main__":
    # Manual smoke test: run a few sample questions and print the answers.
    print("🧪 Testing LangGraph Utils Agent\n")

    banner = "=" * 60
    divider = "-" * 60
    # Each case: (description, question, task_id)
    smoke_cases = (
        ("Wikipedia factual question", "Who was Marie Curie?", ""),
        ("Math calculation", "What is 25 + 17 * 3?", ""),
        (
            "Reversed text question",
            '.rewsna eht sa "tfel" drow eht fo etisoppo eht etirw ,ecnetnes siht dnatsrednu uoy fI',
            "",
        ),
        ("General knowledge", "How many continents are there?", ""),
    )

    for case_number, (description, case_question, case_task_id) in enumerate(smoke_cases, start=1):
        print(f"\n{banner}")
        print(f"TEST {case_number}: {description}")
        print(banner)
        print(f"Question: {case_question}")
        try:
            answer = process_question(case_question, case_task_id)
            print(f"\nAnswer: {answer}")
        except Exception as exc:
            print(f"\nTest failed: {str(exc)}")
        print(f"\n{divider}")

    print("\n✅ All tests completed!")