interview_agents_api / services /graph_service.py
QuentinL52's picture
Update services/graph_service.py
bf11e49 verified
raw
history blame
8.86 kB
import os
import logging
import json
from typing import TypedDict, Annotated, Sequence, Dict, Any, List
from langchain_openai import ChatOpenAI
from langchain_core.runnables import Runnable
from langchain_core.messages import BaseMessage, AIMessage, HumanMessage, SystemMessage
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
from langgraph.graph import StateGraph, END
from langgraph.prebuilt import ToolNode
from tools.analysis_tools import trigger_interview_analysis
class AgentState(TypedDict):
messages: Annotated[Sequence[BaseMessage], lambda x, y: x + y]
user_id: str
job_offer_id: str
job_description: str
class GraphInterviewProcessor:
"""
Cette classe encapsule la logique d'un entretien en utilisant LangGraph.
Elle prépare toutes les données nécessaires à l'initialisation.
"""
def __init__(self, payload: Dict[str, Any]):
logging.info("Initialisation de GraphInterviewProcessor...")
self.user_id = payload["user_id"]
self.job_offer_id = payload["job_offer_id"]
self.job_offer = payload["job_offer"]
self.cv_data = payload.get("cv_document", {}).get('candidat', {})
if not self.cv_data:
raise ValueError("Données du candidat non trouvées dans le payload.")
self.system_prompt_template = self._load_prompt_template('prompts/rag_prompt_old.txt')
self.formatted_cv_str = self._format_cv_for_prompt()
self.skills_summary = self._extract_skills_summary()
self.reconversion_info = self._extract_reconversion_info()
self.agent_runnable = self._create_agent_runnable()
self.graph = self._build_graph()
logging.info("GraphInterviewProcessor initialisé avec succès.")
def _load_prompt_template(self, file_path: str) -> str:
try:
with open(file_path, 'r', encoding='utf-8') as f:
return f.read()
except FileNotFoundError:
logging.error(f"Fichier prompt introuvable: {file_path}")
return "Vous êtes un assistant RH."
def _format_cv_for_prompt(self) -> str:
return json.dumps(self.cv_data, indent=2, ensure_ascii=False)
def _extract_skills_summary(self) -> str:
competences = self.cv_data.get('analyse_competences', [])
if not competences: return "Aucune analyse de compétences disponible."
summary = [f"{comp.get('skill', '')}: {comp.get('level', 'débutant')}" for comp in competences]
return "Niveaux de compétences du candidat: " + " | ".join(summary)
def _extract_reconversion_info(self) -> str:
reconversion = self.cv_data.get('reconversion', {})
if reconversion.get('is_reconversion'):
return f"CANDIDAT EN RECONVERSION: {reconversion.get('analysis', '')}"
return "Le candidat n'est pas identifié comme étant en reconversion."
def _create_agent_runnable(self) -> Runnable:
"""Crée une chaîne (runnable) qui agit comme notre agent."""
prompt = ChatPromptTemplate.from_messages([
("system", "{system_prompt_content}"),
MessagesPlaceholder(variable_name="messages"),
])
llm = ChatOpenAI(api_key=os.getenv("OPENAI_API_KEY"), model="gpt-4o-mini", temperature=0.7)
tools = [trigger_interview_analysis]
llm_with_tools = llm.bind_tools(tools)
return prompt | llm_with_tools
def _should_continue(self, state: AgentState) -> str:
"""
Détermine si l'entretien doit continuer ou se terminer.
"""
messages = state.get('messages', [])
last_message = messages[-1]
if hasattr(last_message, 'tool_calls') and last_message.tool_calls:
for tool_call in last_message.tool_calls:
if tool_call.get('name') == 'trigger_interview_analysis':
print("Condition de fin détectée : appel à trigger_interview_analysis.")
return "end"
return "continue"
def _agent_node(self, state: AgentState):
"""Prépare le prompt et appelle le runnable de l'agent."""
context_header = (
f"--- CONTEXTE TECHNIQUE POUR L'AGENT (ne pas mentionner à l'utilisateur) ---\n"
f"L'ID de l'utilisateur actuel est : {state['user_id']}\n"
f"L'ID de l'offre d'emploi actuelle est : {state['job_offer_id']}\n"
f"Quand tu appelleras l'outil 'trigger_interview_analysis', tu devras OBLIGATOIREMENT utiliser ces IDs exacts.\n"
f"--- FIN DU CONTEXTE TECHNIQUE ---\n\n"
)
job_description_str = json.dumps(self.job_offer, ensure_ascii=False)
system_prompt_content = self.system_prompt_template.format(
entreprise=self.job_offer.get('entreprise', 'notre entreprise'),
poste=self.job_offer.get('poste', 'ce poste'),
mission=self.job_offer.get('mission', 'Non spécifiée'),
profil_recherche=self.job_offer.get('profil_recherche', 'Non spécifié'),
competences=self.job_offer.get('competences', 'Non spécifiées'),
pole=self.job_offer.get('pole', 'Non spécifié'),
cv=self.formatted_cv_str,
skills_analysis=self.skills_summary,
reconversion_analysis=self.reconversion_info,
job_description=job_description_str,
user_id=state['user_id'],
job_offer_id=state['job_offer_id'],
)
final_system_prompt = context_header + system_prompt_content
job_description = json.dumps(self.job_offer)
response = self.agent_runnable.invoke({
"system_prompt_content": final_system_prompt,
"messages": state["messages"],
"job_description": job_description
})
return {
"messages": [response],
"job_description": job_description_str
}
def _router(self, state: AgentState) -> str:
"""
Route le flux du graphe en fonction de la dernière réponse de l'agent.
- Si un outil d'analyse final est appelé, termine le graphe.
- Si un autre outil est appelé, va au noeud d'outils.
- Sinon, termine le tour de conversation.
"""
last_message = state["messages"][-1]
if hasattr(last_message, 'tool_calls') and last_message.tool_calls:
if any(tool_call.get('name') == 'trigger_interview_analysis' for tool_call in last_message.tool_calls):
print(">>> Routeur : Appel à l'outil final détecté. Terminaison du graphe.")
return "call_final_tool"
return "call_tool"
return "end_turn"
def _build_graph(self) -> any:
"""Construit et compile le graphe d'états."""
tool_node = ToolNode([trigger_interview_analysis])
graph = StateGraph(AgentState)
graph.add_node("agent", self._agent_node)
graph.add_node("tools", tool_node)
graph.add_node("final_tool_node", tool_node)
graph.set_entry_point("agent")
graph.add_conditional_edges(
"agent",
self._router,
{
"call_tool": "tools",
"call_final_tool": "final_tool_node",
"end_turn": END
}
)
graph.add_edge("tools", "agent")
graph.add_edge("final_tool_node", END)
return graph.compile()
def invoke(self, messages: List[Dict[str, Any]]):
"""Point d'entrée pour lancer une conversation dans le graphe."""
langchain_messages = [HumanMessage(content=m["content"]) if m["role"] == "user" else AIMessage(content=m["content"]) for m in messages]
if not langchain_messages:
logging.info("Historique de conversation vide. Ajout d'un message de démarrage interne.")
langchain_messages.append(HumanMessage(content="Bonjour, je suis prêt à commencer l'entretien."))
initial_state = {
"user_id": self.user_id,
"job_offer_id": self.job_offer_id,
"messages": langchain_messages,
"job_description": json.dumps(self.job_offer, ensure_ascii=False),
}
final_state = self.graph.invoke(initial_state)
if not final_state or not final_state.get('messages'):
logging.error("L'état final est vide ou ne contient pas de messages.")
return {"response": "Erreur: Impossible de générer une réponse.", "status": "finished"}
last_message = final_state['messages'][-1]
status = "finished" if hasattr(last_message, 'tool_calls') and last_message.tool_calls else "interviewing"
response_content = last_message.content
return {
"response": response_content,
"status": status
}