# Hugging Face Spaces page header captured with this file (Space status: Sleeping); not part of the program.
| from fastapi import FastAPI, File, UploadFile, Form, HTTPException, Request | |
| from fastapi.staticfiles import StaticFiles | |
| from fastapi.templating import Jinja2Templates | |
| from fastapi.responses import HTMLResponse, JSONResponse | |
| from fastapi.middleware.cors import CORSMiddleware | |
| from typing import Optional | |
| import requests | |
| import os | |
| import io | |
| import tempfile | |
| import logging | |
| import subprocess | |
# Logging configuration
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

app = FastAPI()

# CORS configuration.
# Any origin may call the API, so credentialed requests are disabled: a wildcard
# origin combined with allow_credentials=True would let every website issue
# cookie-bearing requests on behalf of a visitor. Nothing in this file reads cookies.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=False,
    allow_methods=["*"],
    allow_headers=["*"],
)

# Static files and HTML templates are both served from the "frontend" directory
# (resolved relative to the current working directory).
app.mount("/static", StaticFiles(directory="frontend"), name="static")
templates = Jinja2Templates(directory="frontend")
# NOTE(review): no route decorator was visible on this handler, so it was never
# registered with the app. Serving the page at "/" is an assumption — confirm.
@app.get("/", response_class=HTMLResponse)
async def serve_frontend(request: Request):
    """Render the frontend's ``index.html`` template.

    Args:
        request: The incoming request, passed to the template context under
            the ``"request"`` key.

    Returns:
        The template response for ``index.html``.
    """
    return templates.TemplateResponse("index.html", {"request": request})
# Hugging Face configuration
# The API token is read from the environment; when it is missing, requests are
# sent without an Authorization header.
HF_API_KEY = os.getenv("HF_API_KEY", "")
HEADERS = {"Authorization": f"Bearer {HF_API_KEY}"} if HF_API_KEY else {}

# Model identifier used for each task.
HF_MODELS = {
    "summary": "facebook/bart-large-cnn",
    # "qa": "deepset/roberta-base-squad2"  # <- previous model, commented out
    "qa": "HPAI-BSC/Llama3-Aloe-8B-Alpha",  # <- new model
}

# Base URL of the inference API; the model identifier is appended directly to it,
# so the trailing slash is required.
HF_API_URL = "https://api-inference.huggingface.co/models/"
def query_huggingface(model: str, payload: dict):
    """POST ``payload`` to the inference endpoint of ``model``.

    Args:
        model: Model identifier appended to ``HF_API_URL``.
        payload: JSON-serialisable request body.

    Returns:
        The decoded JSON body when the API answers with status 200; otherwise
        a dict of the form ``{"error": <message>}``. Exceptions are logged and
        reported through that same dict rather than raised.
    """
    try:
        endpoint = f"{HF_API_URL}{model}"
        logger.info(f"Requête à {endpoint}")
        reply = requests.post(endpoint, headers=HEADERS, json=payload, timeout=60)

        # Success path first; anything other than 200 is reported as an error.
        if reply.status_code == 200:
            return reply.json()

        logger.error(f"Erreur API Hugging Face: {reply.status_code}, {reply.text}")
        return {"error": f"Erreur API: {reply.status_code}"}
    except Exception as exc:
        logger.error(f"Erreur API: {str(exc)}")
        return {"error": str(exc)}
def _run_converter(content: bytes, suffix: str, build_command, label: str, tool: str) -> str:
    """Run an external converter on ``content`` and return its stdout or an error message.

    Args:
        content: Raw bytes of the uploaded document.
        suffix: Extension given to the temporary file (e.g. ``".pdf"``).
        build_command: Callable mapping the temporary file path to the argv list to run.
        label: Format name used in messages (``"PDF"`` or ``"DOCX"``).
        tool: Executable name used in the "not installed" messages.

    Returns:
        The converter's standard output on success, otherwise a French error message.
    """
    tmp_path = None
    try:
        # delete=False so the file outlives the `with` block; it is closed (and
        # therefore fully written) before the external tool opens it.
        with tempfile.NamedTemporaryFile(suffix=suffix, delete=False) as tmp:
            tmp_path = tmp.name
            tmp.write(content)
        result = subprocess.run(
            build_command(tmp_path),
            capture_output=True,
            text=True,
            # Decode explicitly instead of relying on the process locale, and
            # never fail on an undecodable byte (same policy as the .txt branch).
            encoding="utf-8",
            errors="replace",
            timeout=30,
        )
    except FileNotFoundError:
        logger.warning(f"{tool} non installé")
        return f"Conversion {label} non disponible ({tool} requis)"
    except subprocess.TimeoutExpired:
        return f"Timeout lors de la conversion {label}"
    finally:
        # Always remove the temporary file, whatever happened above.
        if tmp_path is not None and os.path.exists(tmp_path):
            os.unlink(tmp_path)

    if result.returncode == 0:
        return result.stdout
    error_msg = result.stderr or f"Erreur inconnue lors de la conversion {label}"
    logger.error(f"{label} conversion failed: {error_msg}")
    return f"Erreur de conversion {label}: {error_msg}"


async def convert_to_text(file: UploadFile):
    """Convert an uploaded .txt, .pdf, .docx or .doc file to plain text.

    Text files are decoded as UTF-8 (invalid bytes replaced). PDF files go
    through ``pdftotext`` and Word files through ``pandoc``, each with a
    30-second timeout.

    Args:
        file: The uploaded file; its extension selects the conversion path.

    Returns:
        The extracted text on success. On failure a human-readable French
        message is returned instead of raising; the endpoints in this file
        recognise such messages by their prefix.
    """
    # Local import keeps this block self-contained; fastapi is already a dependency.
    from fastapi.concurrency import run_in_threadpool

    try:
        if not file.filename:
            return "Aucun fichier fourni"
        ext = os.path.splitext(file.filename)[1].lower()
        content = await file.read()

        if ext == '.txt':
            return content.decode('utf-8', errors='replace')

        if ext == '.pdf':
            # The converters block for up to 30 s, so they run in a worker
            # thread rather than on the event loop.
            return await run_in_threadpool(
                _run_converter,
                content,
                '.pdf',
                lambda path: ["pdftotext", path, "-"],
                "PDF",
                "pdftotext",
            )

        if ext in ('.docx', '.doc'):
            # NOTE(review): legacy binary .doc files may not be readable by
            # pandoc; they would then surface as a DOCX conversion error — confirm.
            return await run_in_threadpool(
                _run_converter,
                content,
                ext,
                lambda path: ["pandoc", "-t", "plain", path],
                "DOCX",
                "pandoc",
            )

        return f"Format de fichier non supporté: {ext}"
    except Exception as e:
        logger.error(f"Erreur de conversion: {str(e)}")
        return f"Erreur lors de la conversion du fichier: {str(e)}"
# NOTE(review): no route decorator was visible on this handler, so it was never
# registered with the app. The "/summarize" path is inferred from this
# function's log message — confirm against the frontend.
@app.post("/summarize")
async def summarize_document(file: UploadFile = File(...)):
    """Summarize an uploaded document with the configured summarization model.

    Args:
        file: The uploaded document; it is converted to text by ``convert_to_text``.

    Returns:
        A dict with ``filename``, ``summary``, ``text_length`` and ``warning``.
        ``warning`` is True when ``summary`` carries an error message rather
        than a real summary.

    Raises:
        HTTPException: 400 when the converted text is empty, 500 on any
            unexpected error.
    """
    # Local import keeps this block self-contained; fastapi is already a dependency.
    from fastapi.concurrency import run_in_threadpool

    # Prefixes of every failure message convert_to_text can return. They are
    # kept specific so a real document that merely starts with "Conversion"
    # is not mistaken for an error.
    conversion_failure_prefixes = (
        "Aucun fichier fourni",
        "Erreur de conversion",
        "Erreur lors de la conversion du fichier",
        "Conversion PDF non disponible",
        "Conversion DOCX non disponible",
        "Timeout lors de la conversion",
        "Format de fichier non supporté",
    )

    try:
        logger.info(f"Traitement du fichier: {file.filename}")
        text = await convert_to_text(file)
        if not text or not text.strip():
            raise HTTPException(400, "Fichier vide ou problème de conversion")

        # The conversion failed: hand the message back as the "summary".
        if text.startswith(conversion_failure_prefixes):
            return {
                "filename": file.filename,
                "summary": text,
                "text_length": len(text),
                "warning": True
            }

        # Cap the input size sent to the API (kept short for reliability).
        text_to_process = text[:3000]
        # query_huggingface performs a blocking HTTP call (up to 60 s), so it
        # runs in a worker thread instead of stalling the event loop.
        response = await run_in_threadpool(
            query_huggingface,
            HF_MODELS["summary"],
            {
                "inputs": text_to_process,
                "parameters": {"max_length": 150, "min_length": 30}
            },
        )

        if isinstance(response, dict) and "error" in response:
            logger.error(f"Erreur de l'API HF: {response['error']}")
            return {
                "filename": file.filename,
                "summary": f"Erreur lors de la génération du résumé: {response['error']}",
                "text_length": len(text),
                "warning": True
            }

        # Accept the response shapes the API may return: a list of dicts,
        # a list of strings, or a single dict.
        summary_text = ""
        if isinstance(response, list) and len(response) > 0:
            if isinstance(response[0], dict) and "summary_text" in response[0]:
                summary_text = response[0]["summary_text"]
            elif isinstance(response[0], str):
                summary_text = response[0]
        elif isinstance(response, dict) and "summary_text" in response:
            summary_text = response["summary_text"]

        if not summary_text:
            summary_text = "Le modèle n'a pas pu générer de résumé. Essayez avec un texte plus court ou plus clair."

        return {
            "filename": file.filename,
            "summary": summary_text,
            "text_length": len(text),
            "warning": False
        }
    except HTTPException:
        # Deliberate HTTP errors (e.g. the 400 above) pass through unchanged.
        raise
    except Exception as e:
        logger.error(f"Erreur dans summarize: {str(e)}")
        raise HTTPException(500, f"Erreur interne: {str(e)}")
# NOTE(review): no route decorator was visible on this handler, so it was never
# registered with the app. The "/answer-question" path is inferred from this
# function's log message — confirm against the frontend.
@app.post("/answer-question")
async def answer_question(
    question: str = Form(...),
    file: Optional[UploadFile] = File(None)
):
    """Answer a question, optionally using an uploaded document as context.

    Args:
        question: The question text submitted as a form field.
        file: Optional document; when present it is converted to text and the
            first 3000 characters are used as context.

    Returns:
        A dict with ``question``, ``answer`` and ``warning``. ``warning`` is
        True when ``answer`` carries an error message rather than a real answer.

    Raises:
        HTTPException: 500 on any unexpected error.
    """
    # Local import keeps this block self-contained; fastapi is already a dependency.
    from fastapi.concurrency import run_in_threadpool

    # Prefixes of every failure message convert_to_text can return.
    conversion_failure_prefixes = (
        "Aucun fichier fourni",
        "Erreur de conversion",
        "Erreur lors de la conversion du fichier",
        "Conversion PDF non disponible",
        "Conversion DOCX non disponible",
        "Timeout lors de la conversion",
        "Format de fichier non supporté",
    )

    try:
        logger.info(f"Question reçue: {question}")
        context = ""
        # An upload part without a filename is treated as "no file" so the
        # question is still answered without context.
        if file is not None and file.filename:
            logger.info(f"Traitement du fichier: {file.filename}")
            context = await convert_to_text(file)
            # The conversion failed: report the problem instead of querying the model.
            if context.startswith(conversion_failure_prefixes):
                return {
                    "question": question,
                    "answer": f"Problème avec le document: {context}",
                    "warning": True
                }

        # No usable document: fall back to a placeholder context.
        if not context or not context.strip():
            context = "Pas de contexte disponible."

        # Cap the context size sent to the API (kept short for reliability).
        context_to_process = context[:3000]
        # NOTE(review): this question/context payload is the shape used by
        # extractive QA models; the configured "qa" model may expect a plain
        # text prompt instead — confirm against the model's documentation.
        # query_huggingface performs a blocking HTTP call (up to 60 s), so it
        # runs in a worker thread instead of stalling the event loop.
        response = await run_in_threadpool(
            query_huggingface,
            HF_MODELS["qa"],
            {
                "inputs": {
                    "question": question,
                    "context": context_to_process
                }
            },
        )

        if isinstance(response, dict) and "error" in response:
            logger.error(f"Erreur de l'API HF: {response['error']}")
            return {
                "question": question,
                "answer": f"Erreur lors de l'analyse: {response['error']}",
                "warning": True
            }

        # Accept the response shapes the API may return: a single dict, or a
        # list whose first element is a dict or a string.
        candidate = response[0] if isinstance(response, list) and response else response
        answer = ""
        if isinstance(candidate, dict):
            if "answer" in candidate:
                answer = candidate["answer"]
            elif "answer_text" in candidate:
                answer = candidate["answer_text"]
            elif "generated_text" in candidate:
                answer = candidate["generated_text"]
            elif "answers" in candidate and len(candidate["answers"]) > 0:
                answer = candidate["answers"][0]["text"]
        elif isinstance(candidate, str):
            answer = candidate

        if not answer:
            answer = "Je n'ai pas trouvé de réponse précise à cette question dans le document fourni."

        return {
            "question": question,
            "answer": answer,
            "warning": False
        }
    except HTTPException:
        # Same policy as summarize_document: deliberate HTTP errors pass through.
        raise
    except Exception as e:
        logger.error(f"Erreur dans answer-question: {str(e)}")
        raise HTTPException(500, f"Erreur interne: {str(e)}")
if __name__ == "__main__":
    # uvicorn is only needed when this file is executed directly as a script.
    from uvicorn import run as run_server

    # Listen on every interface, port 7860.
    listen_on = {"host": "0.0.0.0", "port": 7860}
    run_server(app, **listen_on)