QuentinL52's picture
Update main.py
c42e6f5 verified
raw
history blame
10 kB
import tempfile
import requests
from fastapi import FastAPI, UploadFile, File, HTTPException, Body
from fastapi.concurrency import run_in_threadpool
from pydantic import BaseModel, Field
from typing import List, Dict, Any, Optional
from datetime import datetime
import uvicorn
import os
import logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
from src.cv_parsing_agents import CvParserAgent
from src.interview_simulator.entretient_version_prod import InterviewProcessor
from src.scoring_engine import ContextualScoringEngine
from src.rag_handler import RAGHandler
app = FastAPI(
title="API d'IA pour la RH",
description="Une API pour le parsing de CV et la simulation d'entretiens avec analyse asynchrone.",
version="1.3.0"
)
# Configuration de l'API Celery externe
CELERY_API_URL = os.getenv("CELERY_API_URL", "https://celery-7as1.onrender.com")
# Initialisation des services au démarrage
try:
logger.info("Initialisation du RAG Handler...")
rag_handler = RAGHandler()
if rag_handler.vector_store:
logger.info(f"Vector store chargé avec {rag_handler.vector_store.index.ntotal} vecteurs.")
else:
logger.warning("Le RAG Handler n'a pas pu être initialisé (pas de documents ?). Le feedback contextuel sera désactivé.")
except Exception as e:
logger.error(f"Erreur critique lors de l'initialisation du RAG Handler: {e}", exc_info=True)
rag_handler = None
class InterviewRequest(BaseModel):
user_id: str = Field(..., example="google_user_12345")
job_offer_id: str = Field(..., example="job_offer_abcde")
cv_document: Dict[str, Any] = Field(..., example={"candidat": {"nom": "John Doe", "compétences": {"hard_skills": ["Python", "FastAPI"]}}})
job_offer: Dict[str, Any] = Field(..., example={"poste": "Développeur Python", "description": "Recherche développeur expérimenté..."})
messages: List[Dict[str, Any]]
conversation_history: List[Dict[str, Any]]
class AnalysisRequest(BaseModel):
conversation_history: List[Dict[str, Any]]
job_description_text: str
candidate_id: Optional[str] = None
class TaskResponse(BaseModel):
task_id: str
status: str
result: Any = None
message: Optional[str] = None
class HealthCheck(BaseModel):
status: str = Field(default="ok", example="ok")
celery_api_status: Optional[str] = None
@app.get("/", tags=["Status"], summary="Vérification de l'état de l'API")
async def read_root() -> HealthCheck:
"""Vérifie que l'API est en cours d'exécution et teste la connexion à l'API Celery."""
celery_status = "unknown"
try:
response = requests.get(f"{CELERY_API_URL}/", timeout=5)
if response.status_code == 200:
celery_status = "connected"
else:
celery_status = "error"
except Exception as e:
logger.warning(f"Impossible de se connecter à l'API Celery: {e}")
celery_status = "disconnected"
return HealthCheck(status="ok", celery_api_status=celery_status)
# --- Endpoint du parser de CV ---
@app.post("/parse-cv/", tags=["CV Parsing"], summary="Analyser un CV au format PDF avec scoring contextuel")
async def parse_cv_endpoint(file: UploadFile = File(...)):
if file.content_type != "application/pdf":
raise HTTPException(status_code=400, detail="Le fichier doit être au format PDF.")
tmp_path = None
try:
contents = await file.read()
with tempfile.NamedTemporaryFile(delete=False, suffix=".pdf") as tmp:
tmp.write(contents)
tmp.flush()
tmp_path = tmp.name
logger.info(f"Début du parsing du CV temporaire : {tmp_path}")
cv_agent = CvParserAgent(pdf_path=tmp_path)
parsed_data = await run_in_threadpool(cv_agent.process)
if not parsed_data:
raise HTTPException(status_code=500, detail="Échec du parsing du CV.")
logger.info("Parsing du CV réussi. Lancement du scoring contextuel.")
scoring_engine = ContextualScoringEngine(parsed_data)
scored_skills_data = await run_in_threadpool(scoring_engine.calculate_scores)
if parsed_data.get("candidat"):
parsed_data["candidat"].update(scored_skills_data)
else:
parsed_data.update(scored_skills_data)
logger.info("Scoring terminé. Retour de la réponse complète.")
return parsed_data
except Exception as e:
logger.error(f"Erreur lors du parsing ou du scoring du CV : {e}", exc_info=True)
raise HTTPException(status_code=500, detail=f"Erreur interne du serveur : {e}")
finally:
if tmp_path and os.path.exists(tmp_path):
try:
os.remove(tmp_path)
logger.info(f"Fichier temporaire supprimé : {tmp_path}")
except Exception as cleanup_error:
logger.warning(f"Erreur lors de la suppression du fichier temporaire : {cleanup_error}")
# --- Endpoint de simulation d'entretien ---
@app.post("/simulate-interview/", tags=["Simulation d'Entretien"], summary="Gérer une conversation d'entretien")
async def simulate_interview_endpoint(request: InterviewRequest):
try:
processor = InterviewProcessor(
cv_document=request.cv_document,
job_offer=request.job_offer,
conversation_history=request.conversation_history
)
ai_response_object = await run_in_threadpool(processor.run, messages=request.messages)
# On retourne juste la réponse de l'assistant pour le chat
return {"response": ai_response_object["messages"][-1].content}
except Exception as e:
logger.error(f"Erreur interne dans /simulate-interview/: {e}", exc_info=True)
raise HTTPException(status_code=500, detail=f"Erreur interne du serveur : {e}")
# --- Endpoints pour l'analyse asynchrone via API Celery externe ---
@app.post("/trigger-analysis/", tags=["Analyse Asynchrone"], response_model=TaskResponse, status_code=202)
async def trigger_analysis(request: AnalysisRequest):
"""
Déclenche l'analyse de l'entretien en tâche de fond via l'API Celery externe.
Retourne immédiatement un ID de tâche.
"""
try:
logger.info(f"Déclenchement d'analyse via API Celery pour candidat: {request.candidate_id}")
# Appel à l'API Celery externe
celery_response = requests.post(
f"{CELERY_API_URL}/trigger-analysis",
json={
"conversation_history": request.conversation_history,
"job_description_text": request.job_description_text,
"candidate_id": request.candidate_id
},
headers={"Content-Type": "application/json"},
timeout=30
)
if celery_response.status_code == 202:
celery_data = celery_response.json()
return TaskResponse(
task_id=celery_data["task_id"],
status=celery_data["status"],
result=celery_data.get("result"),
message="Analyse démarrée avec succès"
)
else:
logger.error(f"Erreur API Celery: {celery_response.status_code} - {celery_response.text}")
raise HTTPException(
status_code=503,
detail=f"Service d'analyse indisponible: {celery_response.status_code}"
)
except requests.exceptions.RequestException as e:
logger.error(f"Erreur de connexion à l'API Celery: {e}")
raise HTTPException(
status_code=503,
detail="Service d'analyse temporairement indisponible"
)
except Exception as e:
logger.error(f"Erreur inattendue lors du déclenchement de l'analyse: {e}")
raise HTTPException(status_code=500, detail=str(e))
@app.get("/analysis-status/{task_id}", tags=["Analyse Asynchrone"], response_model=TaskResponse)
async def get_analysis_status(task_id: str):
"""
Vérifie le statut de la tâche d'analyse via l'API Celery externe.
Si terminée, retourne le résultat.
"""
try:
logger.info(f"Vérification du statut pour la tâche: {task_id}")
# Appel à l'API Celery externe
celery_response = requests.get(
f"{CELERY_API_URL}/task-status/{task_id}",
timeout=10
)
if celery_response.status_code == 200:
celery_data = celery_response.json()
return TaskResponse(
task_id=task_id,
status=celery_data["status"],
result=celery_data.get("result"),
message=celery_data.get("progress", "Statut récupéré")
)
else:
logger.error(f"Erreur API Celery: {celery_response.status_code}")
raise HTTPException(
status_code=503,
detail="Service d'analyse indisponible"
)
except requests.exceptions.RequestException as e:
logger.error(f"Erreur de connexion à l'API Celery: {e}")
raise HTTPException(
status_code=503,
detail="Service d'analyse temporairement indisponible"
)
except Exception as e:
logger.error(f"Erreur lors de la vérification du statut: {e}")
raise HTTPException(status_code=500, detail=str(e))
@app.get("/celery-stats", tags=["Debug"], summary="Statistiques de l'API Celery")
async def get_celery_stats():
"""Récupère les statistiques de l'API Celery externe."""
try:
response = requests.get(f"{CELERY_API_URL}/worker-stats", timeout=10)
if response.status_code == 200:
return response.json()
else:
return {"error": f"API Celery inaccessible: {response.status_code}"}
except Exception as e:
return {"error": f"Impossible de récupérer les stats: {e}"}
if __name__ == "__main__":
uvicorn.run(app, host="0.0.0.0", port=8000)