import os import logging import json from typing import TypedDict, Annotated, Sequence, Dict, Any, List from langchain_openai import ChatOpenAI from langchain_core.runnables import Runnable from langchain_core.messages import BaseMessage, AIMessage, HumanMessage from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder from langgraph.graph import StateGraph, END from langgraph.prebuilt import ToolNode from tools.analysis_tools import trigger_interview_analysis class AgentState(TypedDict): messages: Annotated[Sequence[BaseMessage], lambda x, y: x + y] user_id: str job_offer_id: str job_description: str class GraphInterviewProcessor: """ Cette classe encapsule la logique d'un entretien en utilisant LangGraph. Elle prépare toutes les données nécessaires à l'initialisation. """ def __init__(self, payload: Dict[str, Any]): logging.info("Initialisation de GraphInterviewProcessor...") self.user_id = payload["user_id"] self.job_offer_id = payload["job_offer_id"] self.job_offer = payload["job_offer"] self.cv_data = payload.get("cv_document", {}).get('candidat', {}) if not self.cv_data: raise ValueError("Données du candidat non trouvées dans le payload.") self.system_prompt_template = self._load_prompt_template('prompts/rag_prompt_old.txt') self.formatted_cv_str = self._format_cv_for_prompt() self.skills_summary = self._extract_skills_summary() self.reconversion_info = self._extract_reconversion_info() self.agent_runnable = self._create_agent_runnable() self.graph = self._build_graph() logging.info("GraphInterviewProcessor initialisé avec succès.") def _load_prompt_template(self, file_path: str) -> str: try: with open(file_path, 'r', encoding='utf-8') as f: return f.read() except FileNotFoundError: logging.error(f"Fichier prompt introuvable: {file_path}") return "Vous êtes un assistant RH." def _format_cv_for_prompt(self) -> str: return json.dumps(self.cv_data, indent=2, ensure_ascii=False) def _extract_skills_summary(self) -> str: competences = self.cv_data.get('analyse_competences', []) if not competences: return "Aucune analyse de compétences disponible." summary = [f"{comp.get('skill', '')}: {comp.get('level', 'débutant')}" for comp in competences] return "Niveaux de compétences du candidat: " + " | ".join(summary) def _extract_reconversion_info(self) -> str: reconversion = self.cv_data.get('reconversion', {}) if reconversion.get('is_reconversion'): return f"CANDIDAT EN RECONVERSION: {reconversion.get('analysis', '')}" return "Le candidat n'est pas identifié comme étant en reconversion." def _create_agent_runnable(self) -> Runnable: """Crée une chaîne (runnable) qui agit comme notre agent.""" prompt = ChatPromptTemplate.from_messages([ ("system", "{system_prompt_content}"), MessagesPlaceholder(variable_name="messages"), ]) llm = ChatOpenAI(api_key=os.getenv("OPENAI_API_KEY"), model="gpt-4o-mini", temperature=0.7) tools = [trigger_interview_analysis] llm_with_tools = llm.bind_tools(tools) return prompt | llm_with_tools def _agent_node(self, state: AgentState): """Prépare le prompt et appelle le runnable de l'agent.""" job_description_str = json.dumps(self.job_offer, ensure_ascii=False) system_prompt_content = self.system_prompt_template.format( user_id=state['user_id'], job_offer_id=state['job_offer_id'], entreprise=self.job_offer.get('entreprise', 'notre entreprise'), poste=self.job_offer.get('poste', 'ce poste'), mission=self.job_offer.get('mission', 'Non spécifiée'), profil_recherche=self.job_offer.get('profil_recherche', 'Non spécifié'), competences=self.job_offer.get('competences', 'Non spécifiées'), pole=self.job_offer.get('pole', 'Non spécifié'), cv=self.formatted_cv_str, skills_analysis=self.skills_summary, reconversion_analysis=self.reconversion_info, job_description=job_description_str ) response = self.agent_runnable.invoke({ "system_prompt_content": system_prompt_content, "messages": state["messages"] }) return {"messages": [response]} def _router(self, state: AgentState) -> str: """Route le flux du graphe en fonction de la dernière réponse de l'agent.""" last_message = state["messages"][-1] if hasattr(last_message, 'tool_calls') and last_message.tool_calls: if any(tool_call.get('name') == 'trigger_interview_analysis' for tool_call in last_message.tool_calls): return "call_final_tool" return "call_tool" return "end_turn" def _final_analysis_node(self, state: AgentState): """ Appelle l'outil d'analyse finale. Construit les arguments manuellement à partir de l'état du graphe pour garantir la fiabilité. """ conversation_history = [] for msg in state["messages"]: if isinstance(msg, HumanMessage): role = "user" elif isinstance(msg, AIMessage): role = "assistant" else: continue conversation_history.append({"role": role, "content": msg.content}) tool_input = { "user_id": state['user_id'], "job_offer_id": state['job_offer_id'], "job_description": state['job_description'], "conversation_history": conversation_history } trigger_interview_analysis.invoke(tool_input) return {} def _build_graph(self) -> any: """Construit et compile le graphe d'états.""" tool_node = ToolNode([trigger_interview_analysis]) graph = StateGraph(AgentState) graph.add_node("agent", self._agent_node) graph.add_node("tools", tool_node) graph.add_node("final_tool_node", self._final_analysis_node) graph.set_entry_point("agent") graph.add_conditional_edges( "agent", self._router, { "call_tool": "tools", "call_final_tool": "final_tool_node", "end_turn": END } ) graph.add_edge("tools", "agent") graph.add_edge("final_tool_node", END) return graph.compile() def invoke(self, messages: List[Dict[str, Any]]): """Point d'entrée pour lancer une conversation dans le graphe.""" langchain_messages = [HumanMessage(content=m["content"]) if m["role"] == "user" else AIMessage(content=m["content"]) for m in messages] if not langchain_messages: logging.info("Historique de conversation vide. Ajout d'un message de démarrage interne.") langchain_messages.append(HumanMessage(content="Bonjour, je suis prêt à commencer l'entretien.")) initial_state = { "user_id": self.user_id, "job_offer_id": self.job_offer_id, "messages": langchain_messages, "job_description": json.dumps(self.job_offer, ensure_ascii=False), } final_state = self.graph.invoke(initial_state) if not final_state or not final_state.get('messages'): logging.error("L'état final est vide ou ne contient pas de messages.") return {"response": "Erreur: Impossible de générer une réponse.", "status": "finished"} last_message = final_state['messages'][-1] status = "finished" if hasattr(last_message, 'tool_calls') and last_message.tool_calls else "interviewing" response_content = last_message.content return { "response": response_content, "status": status }