import tempfile import os import json from fastapi import FastAPI, UploadFile, File, HTTPException, BackgroundTasks from fastapi.concurrency import run_in_threadpool from fastapi.middleware.cors import CORSMiddleware from pydantic import BaseModel, Field from typing import List, Dict, Any, Optional from src.models import load_all_models from src.services.cv_service import CVParsingService from src.services.interview_service import InterviewProcessor from src.services.analysis_service import AnalysisService from services.graph_service import GraphConversationManager from fastapi.responses import JSONResponse from bson import ObjectId os.environ['HOME'] = '/tmp' os.makedirs('/tmp/feedbacks', exist_ok=True) graph_manager = GraphConversationManager() app = FastAPI( title="AIrh Interview Assistant", description="API pour l'analyse de CV et la simulation d'entretiens d'embauche avec analyse asynchrone.", version="2.0.0", docs_url="/docs", redoc_url="/redoc" ) app.add_middleware( CORSMiddleware, allow_origins=["*"], allow_credentials=True, allow_methods=["*"], allow_headers=["*"], ) models = load_all_models() cv_service = CVParsingService(models) analysis_service = AnalysisService(models) class InterviewRequest(BaseModel): user_id: str = Field(..., example="user_12345") job_offer_id: str = Field(..., example="job_offer_abcde") cv_document: Dict[str, Any] job_offer: Dict[str, Any] messages: List[Dict[str, Any]] conversation_history: List[Dict[str, Any]] class Feedback(BaseModel): status: str feedback_data: Optional[Dict[str, Any]] = None class HealthCheck(BaseModel): status: str = "ok" services: Dict[str, bool] = Field(default_factory=dict) message: str = "API AIrh fonctionnelle" def background_analysis_task(user_id: str, conversation_history: list, job_description: str): feedback_path = f"/tmp/feedbacks/{user_id}.json" with open(feedback_path, "w", encoding="utf-8") as f: json.dump({"status": "processing"}, f, ensure_ascii=False, indent=4) result = analysis_service.run_analysis(conversation_history, job_description) with open(feedback_path, "w", encoding="utf-8") as f: json.dump({"status": "completed", "feedback_data": result}, f, ensure_ascii=False, indent=4) @app.get("/", response_model=HealthCheck, tags=["Status"]) async def health_check(): services = { "models_loaded": models.get("status", False), "cv_parsing": True, "interview_simulation": True, "scoring_engine": True } return HealthCheck(services=services) class MongoJSONEncoder(json.JSONEncoder): def default(self, o): if isinstance(o, ObjectId): return str(o) return super().default(o) @app.post("/simulate-interview/") async def simulate_interview(request: Request): try: payload = await request.json() result = graph_manager.invoke(payload) return JSONResponse(content=result) except Exception as e: logger.error(f"Error in simulate-interview endpoint: {e}", exc_info=True) return JSONResponse(content={"error": "An internal error occurred."}, status_code=500) @app.post("/parse-cv/", tags=["CV Parsing"]) async def parse_cv(file: UploadFile = File(...)): if file.content_type != "application/pdf": raise HTTPException(status_code=400, detail="Fichier PDF requis") contents = await file.read() with tempfile.NamedTemporaryFile(delete=False, suffix=".pdf") as tmp: tmp.write(contents) tmp_path = tmp.name result = await run_in_threadpool(cv_service.parse_cv, tmp_path) if os.path.exists(tmp_path): os.remove(tmp_path) return result ''' @app.post("/simulate-interview/", tags=["Interview"]) async def simulate_interview(request: InterviewRequest, background_tasks: BackgroundTasks): processor = InterviewProcessor( request.cv_document, request.job_offer, request.conversation_history ) result = await run_in_threadpool( processor.run, request.messages ) response_content = result["messages"][-1].content if "nous allons maintenant passer a l'analyse" in response_content.lower(): job_description = request.job_offer.get('description', '') background_tasks.add_task( background_analysis_task, request.user_id, request.conversation_history + request.messages, job_description ) return {"response": response_content} ''' @app.get("/get-feedback/{user_id}", response_model=Feedback, tags=["Analysis"]) async def get_feedback(user_id: str): feedback_path = f"/tmp/feedbacks/{user_id}.json" if not os.path.exists(feedback_path): raise HTTPException(status_code=404, detail="Feedback non trouvé ou non encore traité.") with open(feedback_path, "r", encoding="utf-8") as f: data = json.load(f) return Feedback(**data) if __name__ == "__main__": import uvicorn uvicorn.run(app, host="0.0.0.0", port=7860)