Spaces:
Sleeping
Sleeping
| import os | |
| import shutil | |
| import tempfile | |
| import logging | |
| from typing import Optional, List | |
| from fastapi import UploadFile, HTTPException | |
| from app.embeddings import add_to_vector_store | |
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Directory that receives uploaded files. It is a relative path, so it is
# resolved against the process working directory; it is created at import
# time if it does not exist yet.
UPLOAD_DIR = "uploaded_files"
os.makedirs(UPLOAD_DIR, exist_ok=True)

# Largest accepted upload, in bytes (10MB).
MAX_FILE_SIZE = 10 * 1024 ** 2

# File extensions accepted for upload (lower-case, leading dot included).
# Customize as needed.
ALLOWED_EXTENSIONS = {
    ".txt",
    ".md",
    ".pdf",
}
def is_extension_allowed(
    filename: Optional[str],
    allowed_extensions: Optional[set] = None,
) -> bool:
    """
    Tell whether a filename carries one of the accepted extensions.

    The extension is taken with os.path.splitext and lower-cased before the
    lookup, so the comparison is case-insensitive as long as the allowed
    entries are lower-case and include the leading dot (e.g. ".txt").

    Args:
        filename: Name to check. A missing (None) or empty name is rejected
            instead of raising TypeError, because an upload may arrive
            without a filename.
        allowed_extensions: Extensions to accept. Defaults to the module-level
            ALLOWED_EXTENSIONS when omitted.

    Returns:
        True if the extension is allowed, False otherwise (including names
        with no extension at all).
    """
    if not filename:
        return False
    allowed = ALLOWED_EXTENSIONS if allowed_extensions is None else allowed_extensions
    ext = os.path.splitext(filename)[1].lower()
    return ext in allowed
def save_upload_to_disk(file: UploadFile, upload_dir: Optional[str] = None) -> str:
    """
    Save UploadFile to disk under upload_dir (defaults to UPLOAD_DIR).

    Existing files are never overwritten: if the name is taken, a numeric
    suffix ("name_1.ext", "name_2.ext", ...) is appended until a free name
    is found.

    Args:
        file: The upload to persist. Its stream is rewound to the start
            before copying.
        upload_dir: Destination directory; created if missing. Falls back to
            UPLOAD_DIR when None or empty.

    Returns:
        The path of the file that was written.

    Raises:
        HTTPException: 400 if the upload has no usable filename.
    """
    target_dir = upload_dir or UPLOAD_DIR
    os.makedirs(target_dir, exist_ok=True)

    # The filename is supplied by the client. Keep only its last path
    # component so names such as "../../x" or "/etc/x" cannot escape
    # target_dir. Backslashes are normalised first so Windows-style paths
    # are reduced the same way on every platform.
    filename = os.path.basename((file.filename or "").replace("\\", "/"))
    if filename in ("", ".", ".."):
        raise HTTPException(status_code=400, detail="Invalid filename")

    base, ext = os.path.splitext(filename)
    path = os.path.join(target_dir, filename)
    counter = 1
    while True:
        try:
            # "x" creates the file atomically and fails if it already exists,
            # which closes the race between an exists() check and open().
            out_file = open(path, "xb")
        except FileExistsError:
            path = os.path.join(target_dir, f"{base}_{counter}{ext}")
            counter += 1
        else:
            break

    with out_file:
        file.file.seek(0)
        shutil.copyfileobj(file.file, out_file)
    logger.info(f"π File saved to {path}")
    return path
async def save_upload(file: UploadFile) -> str:
    """
    Saves uploaded file to disk and returns the saved file path.

    The file is written to UPLOAD_DIR under the upload's own name; an
    existing file with the same name is overwritten. The upload is read from
    its current position and copied in fixed-size chunks so that large
    uploads are not held in memory all at once.

    Args:
        file: The upload to persist.

    Returns:
        The path of the file that was written.

    Raises:
        HTTPException: 400 if the upload has no usable filename.
    """
    # The filename is supplied by the client. Keep only its last path
    # component so it cannot point outside UPLOAD_DIR.
    filename = os.path.basename((file.filename or "").replace("\\", "/"))
    if filename in ("", ".", ".."):
        raise HTTPException(status_code=400, detail="Invalid filename")

    os.makedirs(UPLOAD_DIR, exist_ok=True)
    file_path = os.path.join(UPLOAD_DIR, filename)

    chunk_size = 1024 * 1024
    with open(file_path, "wb") as out_file:
        while True:
            chunk = await file.read(chunk_size)
            if not chunk:
                break
            out_file.write(chunk)
    logger.info(f"β Uploaded file saved: {file_path}")
    return file_path
def save_upload_temp(file: UploadFile) -> str:
    """
    Save UploadFile to a temp file and return path.

    The temp file gets a unique, randomly generated name in the system temp
    directory. Only the extension of the uploaded filename is carried over,
    so the client-supplied name can neither escape the temp directory nor
    collide with another upload of the same name. The caller is responsible
    for deleting the file when done.

    Args:
        file: The upload to copy. Its stream is rewound to the start before
            copying.

    Returns:
        The path of the temp file that was written.
    """
    # Reduce the client-supplied name to its last component before taking the
    # extension, so the suffix can never contain a path separator.
    original_name = os.path.basename((file.filename or "").replace("\\", "/"))
    suffix = os.path.splitext(original_name)[1]

    fd, temp_path = tempfile.mkstemp(prefix="upload_", suffix=suffix)
    try:
        with os.fdopen(fd, "wb") as f:
            file.file.seek(0)
            shutil.copyfileobj(file.file, f)
    except Exception:
        # Do not leave a partial temp file behind if the copy fails.
        os.remove(temp_path)
        raise
    logger.debug(f"π¦ File saved temporarily at {temp_path}")
    return temp_path
def read_file_content(file: UploadFile, max_size: int = MAX_FILE_SIZE) -> str:
    """
    Read file content as decoded string.

    Args:
        file: The upload to read. Its stream is rewound to the start first.
        max_size: Maximum accepted size in bytes.

    Returns:
        The content decoded with the default codec (UTF-8); bytes that cannot
        be decoded are dropped silently.

    Raises:
        HTTPException: 413 if the file is larger than max_size.
    """
    file.file.seek(0)
    # Read at most one byte past the limit: that is enough to detect an
    # oversized file without loading an arbitrarily large upload into memory.
    content_bytes = file.file.read(max(max_size, 0) + 1)
    if len(content_bytes) > max_size:
        raise HTTPException(status_code=413, detail="File too large")
    # NOTE(review): binary formats are decoded as text here as well, which
    # presumably yields mostly noise for them -- confirm against callers.
    return content_bytes.decode(errors="ignore")
def summarize_file_content(content: str, max_lines: int = 3) -> str:
    """
    Return first max_lines lines of content, add "..." if truncated.

    Leading and trailing whitespace is stripped from content before it is
    split into lines.

    Args:
        content: Text to summarize.
        max_lines: Maximum number of lines to keep. Values below zero are
            treated as zero, so a negative value no longer slices lines off
            the end of the content.

    Returns:
        The kept lines joined with newlines, followed by a final "..." line
        when content had more lines than were kept. With max_lines of zero
        the summary of non-empty content is just "...".
    """
    limit = max(max_lines, 0)
    lines = content.strip().splitlines()
    kept = lines[:limit]
    if len(lines) > limit:
        kept.append("...")
    return "\n".join(kept)
def process_uploaded_file(file: UploadFile) -> str:
    """
    Read uploaded file content, validate type, add to vector store, return summary.

    Args:
        file: The upload to process.

    Returns:
        A short summary of the file content, as produced by
        summarize_file_content.

    Raises:
        HTTPException: 415 if the filename is missing or its extension is not
            allowed; any HTTPException raised while reading the file (for
            example 413 for an oversized file) is passed through unchanged;
            500 for any other failure.
    """
    # A missing filename cannot be validated, so treat it as unsupported
    # rather than letting the extension check fail on None.
    if not file.filename or not is_extension_allowed(file.filename):
        raise HTTPException(status_code=415, detail="Unsupported file type")
    try:
        content = read_file_content(file)
        doc = {"page_content": content, "metadata": {"source": file.filename}}
        success = add_to_vector_store([doc], vector_store=None)  # Provide actual vector_store if required
        logger.info(f"Vector store add success: {success}")
        return summarize_file_content(content)
    except HTTPException:
        # Deliberate HTTP errors (e.g. 413 from read_file_content) must reach
        # the client as-is instead of being rewritten into a 500 below.
        raise
    except Exception as e:
        logger.error(f"β Error processing uploaded file: {e}", exc_info=True)
        raise HTTPException(status_code=500, detail="Failed to process uploaded file") from e