QuentinL52's picture
Update main.py
037629b verified
raw
history blame
5.13 kB
import tempfile
import os
import json
from fastapi import FastAPI, UploadFile, File, HTTPException, BackgroundTasks
from fastapi.concurrency import run_in_threadpool
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel, Field
from typing import List, Dict, Any, Optional
from src.models import load_all_models
from src.services.cv_service import CVParsingService
from src.services.interview_service import InterviewProcessor
from src.services.analysis_service import AnalysisService
from services.graph_service import GraphConversationManager
from fastapi.responses import JSONResponse
from bson import ObjectId
os.environ['HOME'] = '/tmp'
os.makedirs('/tmp/feedbacks', exist_ok=True)
graph_manager = GraphConversationManager()
app = FastAPI(
title="AIrh Interview Assistant",
description="API pour l'analyse de CV et la simulation d'entretiens d'embauche avec analyse asynchrone.",
version="2.0.0",
docs_url="/docs",
redoc_url="/redoc"
)
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
models = load_all_models()
cv_service = CVParsingService(models)
analysis_service = AnalysisService(models)
class InterviewRequest(BaseModel):
user_id: str = Field(..., example="user_12345")
job_offer_id: str = Field(..., example="job_offer_abcde")
cv_document: Dict[str, Any]
job_offer: Dict[str, Any]
messages: List[Dict[str, Any]]
conversation_history: List[Dict[str, Any]]
class Feedback(BaseModel):
status: str
feedback_data: Optional[Dict[str, Any]] = None
class HealthCheck(BaseModel):
status: str = "ok"
services: Dict[str, bool] = Field(default_factory=dict)
message: str = "API AIrh fonctionnelle"
def background_analysis_task(user_id: str, conversation_history: list, job_description: str):
feedback_path = f"/tmp/feedbacks/{user_id}.json"
with open(feedback_path, "w", encoding="utf-8") as f:
json.dump({"status": "processing"}, f, ensure_ascii=False, indent=4)
result = analysis_service.run_analysis(conversation_history, job_description)
with open(feedback_path, "w", encoding="utf-8") as f:
json.dump({"status": "completed", "feedback_data": result}, f, ensure_ascii=False, indent=4)
@app.get("/", response_model=HealthCheck, tags=["Status"])
async def health_check():
services = {
"models_loaded": models.get("status", False),
"cv_parsing": True,
"interview_simulation": True,
"scoring_engine": True
}
return HealthCheck(services=services)
class MongoJSONEncoder(json.JSONEncoder):
def default(self, o):
if isinstance(o, ObjectId):
return str(o)
return super().default(o)
@app.post("/simulate-interview/")
async def simulate_interview(request: Request):
try:
payload = await request.json()
result = graph_manager.invoke(payload)
return JSONResponse(content=result)
except Exception as e:
logger.error(f"Error in simulate-interview endpoint: {e}", exc_info=True)
return JSONResponse(content={"error": "An internal error occurred."}, status_code=500)
@app.post("/parse-cv/", tags=["CV Parsing"])
async def parse_cv(file: UploadFile = File(...)):
if file.content_type != "application/pdf":
raise HTTPException(status_code=400, detail="Fichier PDF requis")
contents = await file.read()
with tempfile.NamedTemporaryFile(delete=False, suffix=".pdf") as tmp:
tmp.write(contents)
tmp_path = tmp.name
result = await run_in_threadpool(cv_service.parse_cv, tmp_path)
if os.path.exists(tmp_path):
os.remove(tmp_path)
return result
'''
@app.post("/simulate-interview/", tags=["Interview"])
async def simulate_interview(request: InterviewRequest, background_tasks: BackgroundTasks):
processor = InterviewProcessor(
request.cv_document,
request.job_offer,
request.conversation_history
)
result = await run_in_threadpool(
processor.run,
request.messages
)
response_content = result["messages"][-1].content
if "nous allons maintenant passer a l'analyse" in response_content.lower():
job_description = request.job_offer.get('description', '')
background_tasks.add_task(
background_analysis_task,
request.user_id,
request.conversation_history + request.messages,
job_description
)
return {"response": response_content}
'''
@app.get("/get-feedback/{user_id}", response_model=Feedback, tags=["Analysis"])
async def get_feedback(user_id: str):
feedback_path = f"/tmp/feedbacks/{user_id}.json"
if not os.path.exists(feedback_path):
raise HTTPException(status_code=404, detail="Feedback non trouvé ou non encore traité.")
with open(feedback_path, "r", encoding="utf-8") as f:
data = json.load(f)
return Feedback(**data)
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=7860)