Spaces:
Sleeping
Sleeping
# --- Standard library ---
import tempfile
# --- Third-party ---
import requests
from fastapi import FastAPI, UploadFile, File, HTTPException, Body
from fastapi.concurrency import run_in_threadpool
from pydantic import BaseModel, Field
from typing import List, Dict, Any, Optional
from datetime import datetime
import uvicorn
import os
import logging
# Configure root logging before importing project modules so their loggers
# inherit the INFO level.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# --- Project-local services ---
from src.cv_parsing_agents import CvParserAgent
from src.interview_simulator.entretient_version_prod import InterviewProcessor
from src.scoring_engine import ContextualScoringEngine
from src.rag_handler import RAGHandler
# FastAPI application instance; metadata feeds the auto-generated OpenAPI docs.
app = FastAPI(
    title="API d'IA pour la RH",
    description="Une API pour le parsing de CV et la simulation d'entretiens avec analyse asynchrone.",
    version="1.3.0"
)
# Configuration de l'API Celery externe
# Base URL of the external Celery task API; overridable via the CELERY_API_URL
# environment variable (defaults to the hosted Render instance).
CELERY_API_URL = os.getenv("CELERY_API_URL", "https://celery-7as1.onrender.com")
# Initialisation des services au démarrage
# Module-level startup: build the RAG handler once at import time. On any
# failure, rag_handler is set to None so the rest of the API can still serve
# requests without contextual feedback.
try:
    logger.info("Initialisation du RAG Handler...")
    rag_handler = RAGHandler()
    if rag_handler.vector_store:
        logger.info(f"Vector store chargé avec {rag_handler.vector_store.index.ntotal} vecteurs.")
    else:
        # vector_store being falsy presumably means no documents were indexed —
        # degraded mode, not a fatal error.
        logger.warning("Le RAG Handler n'a pas pu être initialisé (pas de documents ?). Le feedback contextuel sera désactivé.")
except Exception as e:
    logger.error(f"Erreur critique lors de l'initialisation du RAG Handler: {e}", exc_info=True)
    rag_handler = None
class InterviewRequest(BaseModel):
    """Request payload for the interview-simulation endpoint."""
    # Identifier of the authenticated user driving the interview.
    user_id: str = Field(..., example="google_user_12345")
    # Identifier of the job offer the interview targets.
    job_offer_id: str = Field(..., example="job_offer_abcde")
    # Parsed CV as produced by the CV-parsing endpoint (free-form dict).
    cv_document: Dict[str, Any] = Field(..., example={"candidat": {"nom": "John Doe", "compétences": {"hard_skills": ["Python", "FastAPI"]}}})
    # Job offer details (free-form dict).
    job_offer: Dict[str, Any] = Field(..., example={"poste": "Développeur Python", "description": "Recherche développeur expérimenté..."})
    # Chat messages for the current turn, forwarded to InterviewProcessor.run.
    messages: List[Dict[str, Any]]
    # Full prior conversation, used to seed the InterviewProcessor.
    conversation_history: List[Dict[str, Any]]
class AnalysisRequest(BaseModel):
    """Request payload for triggering an asynchronous interview analysis."""
    # Complete interview transcript to analyse.
    conversation_history: List[Dict[str, Any]]
    # Raw text of the job description the candidate is evaluated against.
    job_description_text: str
    # Optional candidate identifier, forwarded to the Celery API as-is.
    candidate_id: Optional[str] = None
class TaskResponse(BaseModel):
    """Response envelope for async-task endpoints (trigger and status polling)."""
    # Celery task identifier, used to poll for the result.
    task_id: str
    # Task state as reported by the Celery API.
    status: str
    # Analysis result when available; None while the task is pending.
    result: Any = None
    # Human-readable status or progress message.
    message: Optional[str] = None
class HealthCheck(BaseModel):
    """Response model for the health-check endpoint."""
    # Always "ok" when this API is able to respond.
    status: str = Field(default="ok", example="ok")
    # Connectivity to the external Celery API: "connected", "error",
    # "disconnected" or "unknown".
    celery_api_status: Optional[str] = None
# NOTE(review): no route decorator is visible in this chunk — presumably this
# was registered as GET "/"; confirm against the original routing setup.
async def read_root() -> HealthCheck:
    """Vérifie que l'API est en cours d'exécution et teste la connexion à l'API Celery.

    Returns:
        HealthCheck: status is always "ok"; celery_api_status is one of
        "connected", "error", "disconnected" (or "unknown" if unset).
    """
    celery_status = "unknown"
    try:
        # Bug fix: requests.get is blocking and was called directly inside an
        # async handler, stalling the event loop for up to the 5 s timeout.
        # Run it in a worker thread instead.
        response = await run_in_threadpool(
            requests.get, f"{CELERY_API_URL}/", timeout=5
        )
        if response.status_code == 200:
            celery_status = "connected"
        else:
            celery_status = "error"
    except Exception as e:
        logger.warning(f"Impossible de se connecter à l'API Celery: {e}")
        celery_status = "disconnected"
    return HealthCheck(status="ok", celery_api_status=celery_status)
# --- Endpoint du parser de CV ---
# NOTE(review): no route decorator is visible in this chunk — presumably a
# POST route; confirm against the original routing setup.
async def parse_cv_endpoint(file: UploadFile = File(...)):
    """Parse an uploaded PDF CV, score its skills, and return the enriched data.

    Args:
        file: Uploaded file; must have content type "application/pdf".

    Returns:
        dict: Parsed CV data with contextual skill scores merged in (under
        "candidat" when that key exists, otherwise at the top level).

    Raises:
        HTTPException: 400 for non-PDF uploads, 500 on parsing/scoring failure.
    """
    if file.content_type != "application/pdf":
        raise HTTPException(status_code=400, detail="Le fichier doit être au format PDF.")
    tmp_path = None
    try:
        contents = await file.read()
        # Persist to a temp file because CvParserAgent works from a filesystem path.
        with tempfile.NamedTemporaryFile(delete=False, suffix=".pdf") as tmp:
            tmp.write(contents)
            tmp.flush()
            tmp_path = tmp.name
        logger.info(f"Début du parsing du CV temporaire : {tmp_path}")
        cv_agent = CvParserAgent(pdf_path=tmp_path)
        # Blocking parsing work is offloaded to a worker thread.
        parsed_data = await run_in_threadpool(cv_agent.process)
        if not parsed_data:
            raise HTTPException(status_code=500, detail="Échec du parsing du CV.")
        logger.info("Parsing du CV réussi. Lancement du scoring contextuel.")
        scoring_engine = ContextualScoringEngine(parsed_data)
        scored_skills_data = await run_in_threadpool(scoring_engine.calculate_scores)
        # Merge scores into the "candidat" section when present, else top level.
        if parsed_data.get("candidat"):
            parsed_data["candidat"].update(scored_skills_data)
        else:
            parsed_data.update(scored_skills_data)
        logger.info("Scoring terminé. Retour de la réponse complète.")
        return parsed_data
    except HTTPException:
        # Bug fix: HTTPException subclasses Exception, so the generic handler
        # below was catching the 500 raised above and re-wrapping it with a
        # different detail message. Re-raise it untouched instead.
        raise
    except Exception as e:
        logger.error(f"Erreur lors du parsing ou du scoring du CV : {e}", exc_info=True)
        raise HTTPException(status_code=500, detail=f"Erreur interne du serveur : {e}")
    finally:
        # Best-effort cleanup of the temp file regardless of outcome.
        if tmp_path and os.path.exists(tmp_path):
            try:
                os.remove(tmp_path)
                logger.info(f"Fichier temporaire supprimé : {tmp_path}")
            except Exception as cleanup_error:
                logger.warning(f"Erreur lors de la suppression du fichier temporaire : {cleanup_error}")
# --- Endpoint de simulation d'entretien ---
async def simulate_interview_endpoint(request: InterviewRequest):
    """Run one turn of the interview simulation and return the assistant's reply.

    Args:
        request: Interview context (CV, job offer, history) plus the current
            turn's messages.

    Returns:
        dict: {"response": <assistant reply text>}.

    Raises:
        HTTPException: 500 on any internal failure.
    """
    try:
        processor = InterviewProcessor(
            cv_document=request.cv_document,
            job_offer=request.job_offer,
            conversation_history=request.conversation_history,
        )
        # The processor is blocking, so it runs in a worker thread.
        result = await run_in_threadpool(processor.run, messages=request.messages)
        # The chat UI only needs the assistant's latest message.
        last_message = result["messages"][-1]
        return {"response": last_message.content}
    except Exception as e:
        logger.error(f"Erreur interne dans /simulate-interview/: {e}", exc_info=True)
        raise HTTPException(status_code=500, detail=f"Erreur interne du serveur : {e}")
# --- Endpoints pour l'analyse asynchrone via API Celery externe ---
async def trigger_analysis(request: AnalysisRequest):
    """
    Déclenche l'analyse de l'entretien en tâche de fond via l'API Celery externe.
    Retourne immédiatement un ID de tâche.

    Raises:
        HTTPException: 503 when the Celery API is unreachable or rejects the
        request, 500 on unexpected internal errors.
    """
    try:
        logger.info(f"Déclenchement d'analyse via API Celery pour candidat: {request.candidate_id}")
        # Appel à l'API Celery externe
        # Bug fix: requests.post is blocking and was called directly in an
        # async handler (up to 30 s of event-loop stall); offload to a thread.
        celery_response = await run_in_threadpool(
            requests.post,
            f"{CELERY_API_URL}/trigger-analysis",
            json={
                "conversation_history": request.conversation_history,
                "job_description_text": request.job_description_text,
                "candidate_id": request.candidate_id
            },
            headers={"Content-Type": "application/json"},
            timeout=30,
        )
        if celery_response.status_code == 202:
            celery_data = celery_response.json()
            return TaskResponse(
                task_id=celery_data["task_id"],
                status=celery_data["status"],
                result=celery_data.get("result"),
                message="Analyse démarrée avec succès"
            )
        else:
            logger.error(f"Erreur API Celery: {celery_response.status_code} - {celery_response.text}")
            raise HTTPException(
                status_code=503,
                detail=f"Service d'analyse indisponible: {celery_response.status_code}"
            )
    except requests.exceptions.RequestException as e:
        logger.error(f"Erreur de connexion à l'API Celery: {e}")
        raise HTTPException(
            status_code=503,
            detail="Service d'analyse temporairement indisponible"
        )
    except HTTPException:
        # Bug fix: the 503 raised above was being caught by the generic
        # Exception handler below and converted to a 500. Re-raise as-is.
        raise
    except Exception as e:
        logger.error(f"Erreur inattendue lors du déclenchement de l'analyse: {e}")
        raise HTTPException(status_code=500, detail=str(e))
async def get_analysis_status(task_id: str):
    """
    Vérifie le statut de la tâche d'analyse via l'API Celery externe.
    Si terminée, retourne le résultat.

    Args:
        task_id: Celery task identifier returned by trigger_analysis.

    Raises:
        HTTPException: 503 when the Celery API is unreachable or errors,
        500 on unexpected internal errors.
    """
    try:
        logger.info(f"Vérification du statut pour la tâche: {task_id}")
        # Appel à l'API Celery externe
        # Bug fix: blocking requests.get in an async handler stalled the event
        # loop for up to 10 s; offload to a worker thread.
        celery_response = await run_in_threadpool(
            requests.get,
            f"{CELERY_API_URL}/task-status/{task_id}",
            timeout=10,
        )
        if celery_response.status_code == 200:
            celery_data = celery_response.json()
            return TaskResponse(
                task_id=task_id,
                status=celery_data["status"],
                result=celery_data.get("result"),
                message=celery_data.get("progress", "Statut récupéré")
            )
        else:
            logger.error(f"Erreur API Celery: {celery_response.status_code}")
            raise HTTPException(
                status_code=503,
                detail="Service d'analyse indisponible"
            )
    except requests.exceptions.RequestException as e:
        logger.error(f"Erreur de connexion à l'API Celery: {e}")
        raise HTTPException(
            status_code=503,
            detail="Service d'analyse temporairement indisponible"
        )
    except HTTPException:
        # Bug fix: without this clause, the 503 raised above fell through to
        # the generic handler and was re-raised as a 500 with str(e) detail.
        raise
    except Exception as e:
        logger.error(f"Erreur lors de la vérification du statut: {e}")
        raise HTTPException(status_code=500, detail=str(e))
async def get_celery_stats():
    """Récupère les statistiques de l'API Celery externe.

    Returns:
        dict: The Celery API's worker stats on success, otherwise a dict with
        a single "error" key (this endpoint is deliberately best-effort and
        never raises).
    """
    try:
        # Bug fix: blocking requests.get inside an async handler; run in a
        # worker thread so the event loop stays responsive.
        response = await run_in_threadpool(
            requests.get, f"{CELERY_API_URL}/worker-stats", timeout=10
        )
        if response.status_code == 200:
            return response.json()
        else:
            return {"error": f"API Celery inaccessible: {response.status_code}"}
    except Exception as e:
        return {"error": f"Impossible de récupérer les stats: {e}"}
# Local/dev entry point: serve the app on all interfaces, port 8000.
if __name__ == "__main__":
    uvicorn.run(app, host="0.0.0.0", port=8000)