import os
import logging
import json
from typing import TypedDict, Annotated, Sequence, Dict, Any, List
from langchain_openai import ChatOpenAI
from langchain_core.runnables import Runnable
from langchain_core.messages import BaseMessage, AIMessage, HumanMessage, ToolMessage
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
from langgraph.graph import StateGraph, END
from langgraph.prebuilt import ToolNode
from tools.analysis_tools import trigger_interview_analysis

class AgentState(TypedDict):
    messages: Annotated[Sequence[BaseMessage], lambda x, y: x + y]
    user_id: str
    job_offer_id: str
    job_description: str
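
# How the state merge behaves (illustrative only): LangGraph applies the
# Annotated reducer above when folding a node's partial return back into the
# full state, roughly:
#     state["messages"] = reducer(state["messages"], node_output["messages"])
# so a node returning {"messages": [response]} appends rather than overwrites.
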
class GraphInterviewProcessor:
    """
    Encapsulates the logic of an interview using LangGraph.
    Prepares all the data required for initialization.
    """
    def __init__(self, payload: Dict[str, Any]):
        logging.info("Initializing GraphInterviewProcessor...")
        self.user_id = payload["user_id"]
        self.job_offer_id = payload["job_offer_id"]
        self.job_offer = payload["job_offer"]
        self.cv_data = payload.get("cv_document", {}).get('candidat', {})
        if not self.cv_data:
            raise ValueError("Candidate data not found in the payload.")
        self.system_prompt_template = self._load_prompt_template('prompts/rag_prompt_old.txt')
        self.formatted_cv_str = self._format_cv_for_prompt()
        self.skills_summary = self._extract_skills_summary()
        self.reconversion_info = self._extract_reconversion_info()
        self.agent_runnable = self._create_agent_runnable()
        self.graph = self._build_graph()
        logging.info("GraphInterviewProcessor initialized successfully.")
    def _load_prompt_template(self, file_path: str) -> str:
        try:
            with open(file_path, 'r', encoding='utf-8') as f:
                return f.read()
        except FileNotFoundError:
            logging.error(f"Prompt file not found: {file_path}")
            # Minimal French fallback system prompt ("You are an HR assistant.").
            return "Vous êtes un assistant RH."
    def _format_cv_for_prompt(self) -> str:
        return json.dumps(self.cv_data, indent=2, ensure_ascii=False)

    def _extract_skills_summary(self) -> str:
        # Returned strings feed the French-language system prompt, so they
        # stay in French.
        competences = self.cv_data.get('analyse_competences', [])
        if not competences:
            return "Aucune analyse de compétences disponible."
        summary = [f"{comp.get('skill', '')}: {comp.get('level', 'débutant')}" for comp in competences]
        return "Niveaux de compétences du candidat: " + " | ".join(summary)

    def _extract_reconversion_info(self) -> str:
        reconversion = self.cv_data.get('reconversion', {})
        if reconversion.get('is_reconversion'):
            return f"CANDIDAT EN RECONVERSION: {reconversion.get('analysis', '')}"
        return "Le candidat n'est pas identifié comme étant en reconversion."
    def _create_agent_runnable(self) -> Runnable:
        """Creates a chain (runnable) that acts as our agent."""
        prompt = ChatPromptTemplate.from_messages([
            ("system", "{system_prompt_content}"),
            MessagesPlaceholder(variable_name="messages"),
        ])
        llm = ChatOpenAI(api_key=os.getenv("OPENAI_API_KEY"), model="gpt-4o-mini", temperature=0.7)
        tools = [trigger_interview_analysis]
        llm_with_tools = llm.bind_tools(tools)
        return prompt | llm_with_tools
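
    # The runnable above expects {"system_prompt_content": str,
    # "messages": [BaseMessage, ...]} and returns an AIMessage that may carry
    # tool_calls for trigger_interview_analysis.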
    def _should_continue(self, state: AgentState) -> str:
        """
        Determines whether the interview should continue or end.
        NOTE: the compiled graph routes through _router below; this helper is
        currently unused and kept for reference.
        """
        messages = state.get('messages', [])
        last_message = messages[-1]
        if hasattr(last_message, 'tool_calls') and last_message.tool_calls:
            for tool_call in last_message.tool_calls:
                if tool_call.get('name') == 'trigger_interview_analysis':
                    logging.info("End condition detected: call to trigger_interview_analysis.")
                    return "end"
        return "continue"
    def _agent_node(self, state: AgentState):
        """Builds the prompt and invokes the agent runnable."""
        # Technical context injected for the agent (in French, like the rest of
        # the prompt): it must reuse these exact IDs when calling the
        # trigger_interview_analysis tool, without mentioning them to the user.
        context_header = (
            f"--- CONTEXTE TECHNIQUE POUR L'AGENT (ne pas mentionner à l'utilisateur) ---\n"
            f"L'ID de l'utilisateur actuel est : {state['user_id']}\n"
            f"L'ID de l'offre d'emploi actuelle est : {state['job_offer_id']}\n"
            f"Quand tu appelleras l'outil 'trigger_interview_analysis', tu devras OBLIGATOIREMENT utiliser ces IDs exacts.\n"
            f"--- FIN DU CONTEXTE TECHNIQUE ---\n\n"
        )
        job_description_str = json.dumps(self.job_offer, ensure_ascii=False)
        system_prompt_content = self.system_prompt_template.format(
            entreprise=self.job_offer.get('entreprise', 'notre entreprise'),
            poste=self.job_offer.get('poste', 'ce poste'),
            mission=self.job_offer.get('mission', 'Non spécifiée'),
            profil_recherche=self.job_offer.get('profil_recherche', 'Non spécifié'),
            competences=self.job_offer.get('competences', 'Non spécifiées'),
            pole=self.job_offer.get('pole', 'Non spécifié'),
            cv=self.formatted_cv_str,
            skills_analysis=self.skills_summary,
            reconversion_analysis=self.reconversion_info,
            job_description=job_description_str,
            user_id=state['user_id'],
            job_offer_id=state['job_offer_id'],
        )
        final_system_prompt = context_header + system_prompt_content
        # The prompt template only declares "system_prompt_content" and
        # "messages"; the job description is already baked into the system
        # prompt above, so it is not passed to the runnable again.
        response = self.agent_runnable.invoke({
            "system_prompt_content": final_system_prompt,
            "messages": state["messages"],
        })
        return {
            "messages": [response],
            "job_description": job_description_str,
        }
    def _router(self, state: AgentState) -> str:
        """
        Routes the graph flow based on the agent's last response.
        - If the final analysis tool is called, end the graph.
        - If another tool is called, go to the tool node.
        - Otherwise, end the conversation turn.
        """
        last_message = state["messages"][-1]
        if hasattr(last_message, 'tool_calls') and last_message.tool_calls:
            if any(tool_call.get('name') == 'trigger_interview_analysis' for tool_call in last_message.tool_calls):
                logging.info(">>> Router: final tool call detected. Ending the graph.")
                return "call_final_tool"
            return "call_tool"
        return "end_turn"
    def _build_graph(self) -> Any:
        """Builds and compiles the state graph."""
        tool_node = ToolNode([trigger_interview_analysis])
        graph = StateGraph(AgentState)
        graph.add_node("agent", self._agent_node)
        graph.add_node("tools", tool_node)
        graph.add_node("final_tool_node", tool_node)
        graph.set_entry_point("agent")
        graph.add_conditional_edges(
            "agent",
            self._router,
            {
                "call_tool": "tools",
                "call_final_tool": "final_tool_node",
                "end_turn": END,
            }
        )
        graph.add_edge("tools", "agent")
        graph.add_edge("final_tool_node", END)
        return graph.compile()
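
    # Compiled topology, for reference:
    #
    #     entry --> [agent] --end_turn---------> END
    #                 |  \---call_tool---------> [tools] --> [agent]
    #                 \------call_final_tool--> [final_tool_node] --> END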
    def invoke(self, messages: List[Dict[str, Any]]):
        """Entry point for running a conversation turn through the graph."""
        langchain_messages = [
            HumanMessage(content=m["content"]) if m["role"] == "user" else AIMessage(content=m["content"])
            for m in messages
        ]
        if not langchain_messages:
            logging.info("Empty conversation history. Adding an internal kick-off message.")
            # French kick-off message: "Hello, I am ready to start the interview."
            langchain_messages.append(HumanMessage(content="Bonjour, je suis prêt à commencer l'entretien."))
        initial_state = {
            "user_id": self.user_id,
            "job_offer_id": self.job_offer_id,
            "messages": langchain_messages,
            "job_description": json.dumps(self.job_offer, ensure_ascii=False),
        }
        final_state = self.graph.invoke(initial_state)
        if not final_state or not final_state.get('messages'):
            logging.error("The final state is empty or contains no messages.")
            return {"response": "Erreur: Impossible de générer une réponse.", "status": "finished"}
        last_message = final_state['messages'][-1]
        # When final_tool_node runs, the last message is the ToolMessage it
        # appended; otherwise the interview is finished only if the agent's
        # last AIMessage still carries tool calls.
        is_finished = isinstance(last_message, ToolMessage) or bool(getattr(last_message, 'tool_calls', None))
        status = "finished" if is_finished else "interviewing"
        return {
            "response": last_message.content,
            "status": status,
        }