import os import io import uuid import ssl from datetime import datetime from urllib.parse import urlparse, urlunparse from fastapi import FastAPI, File, Form, UploadFile, Query, Depends, Request from fastapi.responses import FileResponse, JSONResponse from huggingface_hub import InferenceClient from PIL import Image from pymongo import MongoClient, ReturnDocument from pymongo.errors import DuplicateKeyError from bson import ObjectId from typing import Optional, Union from datetime import timedelta try: from firebase_app_check import verify_app_check_token except ImportError: # Fallback if firebase_app_check is not available from fastapi import Request async def verify_app_check_token(request: Request) -> Optional[dict]: """Fallback: Skip App Check verification if module not available""" print("[WARNING] firebase_app_check module not found. App Check verification disabled.") return None HF_TOKEN = os.getenv("HF_TOKEN") QWEN_MODEL_ID = os.getenv("QWEN_MODEL_ID", "Qwen/Qwen-Image-Edit") QWEN_PROVIDER = os.getenv("QWEN_PROVIDER", "fal-ai") # or "auto" # MongoDB connection strings (can be set as environment variables in Hugging Face Space Secrets) MONGODB_NANOBANANA = os.getenv( "MONGODB_NANOBANANA", "mongodb+srv://itishalogicgo_db_user:uiCMMk0qeHwo2pGt@nanobanana.u83frx3.mongodb.net/?appName=nanobanana" ) MONGODB_DESCRATCH = os.getenv( "MONGODB_DESCRATCH", "mongodb+srv://itishalogicgo_db_user:10SkvRu4v0o05T9L@cluster0.k5j6xro.mongodb.net/?appName=Cluster0" ) MONGODB_ADMIN = os.getenv("MONGODB_admin") # Default prompts for each database DEFAULT_PROMPT_NANOBANANA = "Using the provided photo of my parents, create a multi-stage creative setup. Show their grayscale 3D model displayed on a computer screen in the background. In the center, place a realistic 3D figurine of them in color, standing on a round base. To the side, include a colorful digital illustration of them printed on a poster or box. The scene should look like a modern studio environment, blending 3D modeling, realism, and stylized artwork in one frame" DEFAULT_PROMPT_DESCRATCH = "Remove all scratches, dust, cracks, and surface imperfections from the image. Smoothly restore damaged areas with realistic texture and lighting while preserving original details, colors, and natural look" app = FastAPI(title="Qwen Image Edit API (hosted inference)") # Initialize InferenceClient try: client = InferenceClient(provider=QWEN_PROVIDER, api_key=HF_TOKEN) MODEL_READY = True except Exception as e: print(f"Warning: could not initialize InferenceClient: {e}") client = None MODEL_READY = False # MongoDB client cache mongodb_clients = {} def _normalize_date(dt: datetime) -> datetime: """Return UTC date at midnight for consistent daily counters.""" return datetime(dt.year, dt.month, dt.day) def _build_ai_edit_daily_count(existing: Optional[list], now: datetime) -> list: """ Ensure ai_edit_daily_count is present and includes all days up to today. - If today's date exists: return list unchanged. - If missing dates: fill gaps with count=0 and add today with count=1. """ today = _normalize_date(now) if not existing: return [{"date": today, "count": 1}] # Normalize dates and keep order normalized = [] seen_dates = set() for entry in existing: date_val = entry.get("date") if isinstance(date_val, datetime): d = _normalize_date(date_val) elif isinstance(date_val, str): try: d = _normalize_date(datetime.fromisoformat(date_val)) except Exception: continue else: continue seen_dates.add(d) normalized.append({"date": d, "count": entry.get("count", 0)}) if today in seen_dates: return normalized # already recorded today; do not change counts # Fill missing days from last recorded date to today normalized.sort(key=lambda x: x["date"]) last_date = normalized[-1]["date"] cursor = last_date + timedelta(days=1) while cursor < today: normalized.append({"date": cursor, "count": 0}) cursor += timedelta(days=1) # Add today with default count=1 normalized.append({"date": today, "count": 1}) return normalized def get_mongodb_client(db_name: str): """Get MongoDB client for the specified database""" if db_name in mongodb_clients: try: # Test if cached client is still alive mongodb_clients[db_name].admin.command('ping') return mongodb_clients[db_name] except: # Client is dead, remove from cache del mongodb_clients[db_name] try: if db_name == "nanobanana": # Use relaxed SSL settings (works in Hugging Face Spaces) try: client = MongoClient( MONGODB_NANOBANANA, tlsAllowInvalidCertificates=True, serverSelectionTimeoutMS=15000, connectTimeoutMS=20000, socketTimeoutMS=30000 ) client.admin.command('ping') mongodb_clients[db_name] = client print(f"[MongoDB] Successfully connected to nanobanana") return client except Exception as e: print(f"[MongoDB] Connection failed: {str(e)[:200]}") raise elif db_name == "Descratch_logicgo": # Use relaxed SSL settings (works in Hugging Face Spaces) try: client = MongoClient( MONGODB_DESCRATCH, tlsAllowInvalidCertificates=True, serverSelectionTimeoutMS=15000, connectTimeoutMS=20000, socketTimeoutMS=30000 ) client.admin.command('ping') mongodb_clients[db_name] = client print(f"[MongoDB] Successfully connected to Descratch_logicgo") return client except Exception as e: print(f"[MongoDB] Connection failed: {str(e)[:200]}") raise else: return None except Exception as e: print(f"[MongoDB] Error connecting to MongoDB ({db_name}): {str(e)[:200]}") import traceback print(f"[MongoDB] Traceback: {traceback.format_exc()[:500]}") return None def save_to_mongodb(db_name: str, data: dict): """Save data to the specified MongoDB database""" try: print(f"[MongoDB] Attempting to save to database: {db_name}") mongo_client = get_mongodb_client(db_name) if mongo_client is None: print(f"[MongoDB] ERROR: Could not connect to MongoDB database: {db_name}") # Clear cached client so next attempt will retry if db_name in mongodb_clients: del mongodb_clients[db_name] return False # Map database name to actual MongoDB database name # Based on MongoDB Atlas, the actual database names might be different if db_name == "nanobanana": database_name = "nano_banana" # Use underscore to match Atlas elif db_name == "Descratch_logicgo": database_name = "Descratch_logicgo" else: database_name = db_name print(f"[MongoDB] Using database name: {database_name}") db = mongo_client.get_database(database_name) collection = db.get_collection("image_edits") # Add database name to the data data["database"] = db_name # Insert document print(f"[MongoDB] Inserting document with task_id: {data.get('task_id')}") try: result = collection.insert_one(data) print(f"[MongoDB] SUCCESS: Saved to MongoDB ({db_name}) -> Database: {database_name}, Collection: image_edits, ID: {result.inserted_id}") return True except Exception as insert_error: # If insert fails due to connection issues, clear cache and retry once if "SSL" in str(insert_error) or "TLS" in str(insert_error) or "connection" in str(insert_error).lower(): print(f"[MongoDB] Insert failed due to connection issue, clearing cache and retrying...") if db_name in mongodb_clients: del mongodb_clients[db_name] # Retry getting client and inserting mongo_client = get_mongodb_client(db_name) if mongo_client: db = mongo_client.get_database(database_name) collection = db.get_collection("image_edits") result = collection.insert_one(data) print(f"[MongoDB] SUCCESS: Saved to MongoDB ({db_name}) after retry -> ID: {result.inserted_id}") return True raise # Re-raise if not a connection error except Exception as e: import traceback print(f"[MongoDB] ERROR saving to MongoDB ({db_name}): {str(e)}") print(f"[MongoDB] Traceback: {traceback.format_exc()}") # Clear cached client on error so next attempt will retry if db_name in mongodb_clients: del mongodb_clients[db_name] return False def get_mongodb_admin_client(): """Get MongoDB client for the admin database""" if "admin" in mongodb_clients: try: # Test if cached client is still alive mongodb_clients["admin"].admin.command('ping') return mongodb_clients["admin"] except: # Client is dead, remove from cache del mongodb_clients["admin"] if not MONGODB_ADMIN: print("[MongoDB] MONGODB_admin connection string not set") return None try: # Use relaxed SSL settings (works in Hugging Face Spaces) try: client = MongoClient( MONGODB_ADMIN, tlsAllowInvalidCertificates=True, serverSelectionTimeoutMS=15000, connectTimeoutMS=20000, socketTimeoutMS=30000 ) client.admin.command('ping') mongodb_clients["admin"] = client print(f"[MongoDB] Successfully connected to admin database") return client except Exception as e: print(f"[MongoDB] Connection failed: {str(e)[:200]}") raise except Exception as e: print(f"[MongoDB] Error connecting to admin MongoDB: {str(e)[:200]}") import traceback print(f"[MongoDB] Traceback: {traceback.format_exc()[:500]}") return None def save_media_click(user_id: Optional[Union[int, str]], category_id: str): """ Save or update media click in admin database media_clicks collection. Args: user_id: Optional integer or ObjectId string. If None, a new ObjectId will be generated automatically. category_id: Required MongoDB ObjectId string for category. Structure: { userId: ObjectId, categories: [ { categoryId: ObjectId, click_count: int, lastClickedAt: Date } ], ai_edit_complete: int, # Count of times user used any model (default: 0 for new users) ai_edit_last_date: Date, # Last date user used any model (stored as Date object) createdAt: Date, updatedAt: Date } """ if not MONGODB_ADMIN: print("[MongoDB] MONGODB_admin not configured, skipping media_clicks logging") return False if not category_id: print("[MongoDB] category_id not provided, skipping media_clicks logging") return False try: mongo_client = get_mongodb_admin_client() if mongo_client is None: print(f"[MongoDB] ERROR: Could not connect to admin MongoDB") if "admin" in mongodb_clients: del mongodb_clients["admin"] return False # Get the database name from connection string or use 'admin' as default db_name = "admin" # default try: parsed = urlparse(MONGODB_ADMIN) if parsed.path and len(parsed.path) > 1: # Extract database name from path (remove leading /) db_name = parsed.path.lstrip('/').split('/')[0] or "admin" except: pass # Use default "admin" db = mongo_client.get_database(db_name) collection = db.get_collection("media_clicks") # Handle user_id: use provided ObjectId string directly or convert integer deterministically; otherwise generate if user_id is None: user_object_id = ObjectId() print(f"[MongoDB] user_id not provided, generated new ObjectId: {user_object_id}") else: try: user_id_str = str(user_id).strip() if len(user_id_str) == 24: user_object_id = ObjectId(user_id_str) print(f"[MongoDB] Using provided user_id as ObjectId: {user_object_id}") else: # Try integer -> deterministic ObjectId based on hex padded to 24 chars user_id_int = int(user_id_str) user_id_hex = hex(user_id_int)[2:].ljust(24, "0")[:24] user_object_id = ObjectId(user_id_hex) print(f"[MongoDB] Converted integer user_id '{user_id_str}' -> ObjectId: {user_object_id}") except Exception as e: print(f"[MongoDB] Error converting user_id to ObjectId: {e}, user_id: {user_id}, type: {type(user_id)}") user_object_id = ObjectId() print(f"[MongoDB] Generated new ObjectId due to conversion error: {user_object_id}") # Convert category_id to ObjectId try: category_object_id = ObjectId(category_id) except Exception as e: print(f"[MongoDB] Error converting category_id to ObjectId: {e}, category_id: {category_id}") return False now = datetime.utcnow() # ai_edit_last_date will be stored as Date object (not string) # Check if document with userId exists existing_doc = collection.find_one({"userId": user_object_id}) if existing_doc: today_daily_counts = _build_ai_edit_daily_count(existing_doc.get("ai_edit_daily_count"), now) print(f"[MongoDB] Found existing document for userId: {user_object_id}, categories count: {len(existing_doc.get('categories', []))}") # Document exists, check if category exists in categories array category_exists = False for cat in existing_doc.get("categories", []): cat_id = cat.get("categoryId") if isinstance(cat_id, ObjectId): if cat_id == category_object_id: category_exists = True current_count = cat.get("click_count", 0) print(f"[MongoDB] Category found - categoryId: {category_object_id}, current click_count: {current_count}") break elif str(cat_id) == str(category_object_id): category_exists = True current_count = cat.get("click_count", 0) print(f"[MongoDB] Category found (string match) - categoryId: {category_object_id}, current click_count: {current_count}") break if category_exists: # Category exists, increment click_count and update lastClickedAt # For existing users: increment ai_edit_complete by 1 and update ai_edit_last_date to current date result = collection.update_one( { "userId": user_object_id, "categories.categoryId": category_object_id }, { "$inc": { "categories.$.click_count": 1, "ai_edit_complete": 1 # Increment, default to 0 if field doesn't exist }, "$set": { "categories.$.lastClickedAt": now, "ai_edit_last_date": now, "updatedAt": now, "ai_edit_daily_count": today_daily_counts } } ) if result.modified_count > 0: print(f"[MongoDB] Updated category click_count - userId: {user_object_id}, categoryId: {category_object_id}, matched: {result.matched_count}, modified: {result.modified_count}") else: print(f"[MongoDB] WARNING: Update matched but did not modify - userId: {user_object_id}, categoryId: {category_object_id}, matched: {result.matched_count}, modified: {result.modified_count}") else: # Category doesn't exist, add new category to array # For existing users: increment ai_edit_complete by 1 and update ai_edit_last_date to current date print(f"[MongoDB] Category not found in existing document, adding new category - userId: {user_object_id}, categoryId: {category_object_id}") result = collection.update_one( {"userId": user_object_id}, { "$push": { "categories": { "categoryId": category_object_id, "click_count": 1, "lastClickedAt": now } }, "$inc": { "ai_edit_complete": 1 # Increment, default to 0 if field doesn't exist }, "$set": { "ai_edit_last_date": now, "updatedAt": now, "ai_edit_daily_count": today_daily_counts } } ) if result.modified_count > 0: print(f"[MongoDB] Added new category - userId: {user_object_id}, categoryId: {category_object_id}, matched: {result.matched_count}, modified: {result.modified_count}") else: print(f"[MongoDB] WARNING: Add category matched but did not modify - userId: {user_object_id}, categoryId: {category_object_id}, matched: {result.matched_count}, modified: {result.modified_count}") else: # Document doesn't exist, try to create new document # Use find_one_and_update with upsert which handles conflicts better # For new users: set ai_edit_complete = 1 directly (first model use) try: today_daily_counts_new = _build_ai_edit_daily_count(None, now) result = collection.find_one_and_update( {"userId": user_object_id}, { "$setOnInsert": { "userId": user_object_id, "createdAt": now, "ai_edit_complete": 1, # Set to 1 for first model use (new user) "ai_edit_daily_count": today_daily_counts_new }, "$push": { "categories": { "categoryId": category_object_id, "click_count": 1, "lastClickedAt": now } }, "$set": { "ai_edit_last_date": now, "updatedAt": now } }, upsert=True, return_document=ReturnDocument.AFTER ) if result: print(f"[MongoDB] Inserted/Updated document - userId: {user_object_id}, categoryId: {category_object_id}, ID: {result.get('_id')}") except DuplicateKeyError as dke: # If duplicate key error, try to find existing document and update it print(f"[MongoDB] Duplicate key error on insert: {str(dke)[:200]}") print(f"[MongoDB] Attempting to find and update existing document...") try: existing = collection.find_one({"userId": user_object_id}) if existing: # Document exists, add category existing_daily_counts = _build_ai_edit_daily_count(existing.get("ai_edit_daily_count"), now) result = collection.update_one( {"userId": user_object_id}, { "$push": { "categories": { "categoryId": category_object_id, "click_count": 1, "lastClickedAt": now } }, "$inc": { "ai_edit_complete": 1 # Increment, default to 0 if field doesn't exist }, "$set": { "ai_edit_last_date": now, "updatedAt": now, "ai_edit_daily_count": existing_daily_counts } } ) print(f"[MongoDB] Added category to existing document - userId: {user_object_id}, categoryId: {category_object_id}") else: # Document doesn't exist but index conflict - try to drop old index programmatically print(f"[MongoDB] WARNING: Old index conflict detected. Attempting to handle...") try: # Try to drop the problematic index collection.drop_index("user_id_1_header_1_media_id_1") print(f"[MongoDB] Successfully dropped old index. Retrying insert...") # Retry the insert today_daily_counts_new_retry = _build_ai_edit_daily_count(None, now) result = collection.find_one_and_update( {"userId": user_object_id}, { "$setOnInsert": { "userId": user_object_id, "createdAt": now, "ai_edit_complete": 1, # Set to 1 for first model use (new user) "ai_edit_daily_count": today_daily_counts_new_retry }, "$push": { "categories": { "categoryId": category_object_id, "click_count": 1, "lastClickedAt": now } }, "$set": { "ai_edit_last_date": now, "updatedAt": now } }, upsert=True, return_document=ReturnDocument.AFTER ) print(f"[MongoDB] Successfully inserted after dropping old index - userId: {user_object_id}, categoryId: {category_object_id}") except Exception as drop_error: print(f"[MongoDB] Could not drop index (may not have permissions): {str(drop_error)[:200]}") print(f"[MongoDB] ERROR: Cannot create document due to old index. Please drop index 'user_id_1_header_1_media_id_1' manually in MongoDB.") return False except Exception as find_error: print(f"[MongoDB] Error finding existing document: {str(find_error)[:200]}") return False return True except Exception as e: import traceback # Handle duplicate key errors gracefully (might be due to old index) if isinstance(e, DuplicateKeyError): print(f"[MongoDB] Duplicate key error (likely due to old index). Trying to update existing document...") try: # Try to find and update existing document existing = collection.find_one({"userId": user_object_id}) if existing: # Document exists, try to add category now_retry = datetime.utcnow() existing_daily_counts_retry = _build_ai_edit_daily_count(existing.get("ai_edit_daily_count"), now_retry) result = collection.update_one( {"userId": user_object_id}, { "$push": { "categories": { "categoryId": category_object_id, "click_count": 1, "lastClickedAt": now_retry } }, "$inc": { "ai_edit_complete": 1 # Increment, default to 0 if field doesn't exist }, "$set": { "ai_edit_last_date": now_retry, "updatedAt": now_retry, "ai_edit_daily_count": existing_daily_counts_retry } } ) print(f"[MongoDB] Recovered from duplicate key error - updated document") return True except Exception as retry_error: print(f"[MongoDB] Failed to recover from duplicate key error: {retry_error}") print(f"[MongoDB] ERROR saving media_clicks: {str(e)}") print(f"[MongoDB] Traceback: {traceback.format_exc()}") # Clear cached client on error so next attempt will retry if "admin" in mongodb_clients: del mongodb_clients["admin"] return False @app.get("/") async def root(): return {"message": "Qwen Image Edit API (hosted inference) running."} @app.get("/images/{task_id}") async def get_image(task_id: str): """ Get the edited image by task_id. Returns the image file. NOTE: This endpoint is PUBLIC and does NOT require authentication. Images can be accessed directly via URL without Firebase App Check or any other auth. """ # Use results directory which persists in Hugging Face Spaces os.makedirs("results", exist_ok=True) image_path = f"results/{task_id}.png" if not os.path.exists(image_path): return JSONResponse({"error": "Image not found"}, status_code=404) return FileResponse(image_path, media_type="image/png") @app.get("/logs/file/{db_name}") async def get_file_logs(db_name: str): """Get file-based logs when MongoDB is unavailable""" if db_name not in ["nanobanana", "Descratch_logicgo"]: return JSONResponse({"error": "Invalid database name"}, 400) try: import json log_dir = "/tmp/mongodb_logs" today = datetime.utcnow().strftime('%Y%m%d') filename = f"{log_dir}/{db_name}_{today}.jsonl" if not os.path.exists(filename): return JSONResponse({ "message": f"No log file found for {db_name} today", "filename": filename }, 404) # Read and parse log file logs = [] with open(filename, "r") as f: for line in f: if line.strip(): try: logs.append(json.loads(line)) except: pass return { "database": db_name, "filename": filename, "count": len(logs), "logs": logs[-50:] # Last 50 entries } except Exception as e: return JSONResponse({"error": str(e)}, 500) @app.get("/health") async def health(): """Health check endpoint with MongoDB connection status""" mongodb_status = {} for db_name in ["nanobanana", "Descratch_logicgo"]: try: client = get_mongodb_client(db_name) if client: # Try to ping the database database_name = "nano_banana" if db_name == "nanobanana" else "Descratch_logicgo" db = client.get_database(database_name) db.command('ping') # Try to count documents to verify write access collection = db.get_collection("image_edits") count = collection.count_documents({}) mongodb_status[db_name] = f"connected (documents: {count})" else: mongodb_status[db_name] = "disconnected" except Exception as e: mongodb_status[db_name] = f"error: {str(e)[:100]}" return { "status": "healthy", "proxy_mode": True, "model_loaded": MODEL_READY, "mongodb": mongodb_status } @app.post("/test-mongodb") async def test_mongodb( db: str = Form(...), app_check_claims: dict = Depends(verify_app_check_token), ): """Test endpoint to verify MongoDB connection and save a test document""" if db not in ["nanobanana", "Descratch_logicgo"]: return JSONResponse({"error": "Invalid database name"}, 400) # First, test connection connection_error = None try: mongo_client = get_mongodb_client(db) if mongo_client is None: connection_error = "get_mongodb_client returned None" else: # Test ping mongo_client.admin.command('ping') except Exception as e: connection_error = f"Connection failed: {str(e)[:300]}" import traceback connection_error += f"\nTraceback: {traceback.format_exc()[:500]}" if connection_error: return JSONResponse({ "status": "failed", "message": f"MongoDB connection failed", "error": connection_error }, 500) # Try to save test document test_data = { "task_id": str(uuid.uuid4()), "timestamp": datetime.utcnow().isoformat(), "test": True, "status": "test", "message": "This is a test document" } save_error = None try: success = save_to_mongodb(db, test_data) if not success: save_error = "save_to_mongodb returned False (check logs for details)" except Exception as e: save_error = f"Save exception: {str(e)[:300]}" import traceback save_error += f"\nTraceback: {traceback.format_exc()[:500]}" if save_error: return JSONResponse({ "status": "failed", "message": f"Failed to save test document to {db}", "error": save_error, "connection": "OK" if not connection_error else "FAILED" }, 500) return { "status": "success", "message": f"Test document saved to {db}", "task_id": test_data["task_id"], "connection": "OK" } async def process_image_edit( image: UploadFile, prompt: str, db: Optional[str] = None, task_id: Optional[str] = None, start_time: Optional[datetime] = None, request: Optional[Request] = None, user_id: Optional[Union[int, str]] = None, category_id: Optional[str] = None, ): """ Shared function to process image editing requests. Returns: (response, mongo_data) """ if task_id is None: task_id = str(uuid.uuid4()) if start_time is None: start_time = datetime.utcnow() # Prepare MongoDB data mongo_data = { "task_id": task_id, "timestamp": start_time.isoformat(), "prompt": prompt, "status": "processing", "error": None } if user_id: mongo_data["user_id"] = user_id if client is None: error_msg = "Inference client not initialized" mongo_data["status"] = "failed" mongo_data["error"] = error_msg if db: save_to_mongodb(db, mongo_data) return JSONResponse({"error": error_msg}, 500), mongo_data # Validate database parameter if db and db not in ["nanobanana", "Descratch_logicgo"]: error_msg = f"Invalid database name. Must be 'nanobanana' or 'Descratch_logicgo'" mongo_data["status"] = "failed" mongo_data["error"] = error_msg if db: save_to_mongodb(db, mongo_data) return JSONResponse({"error": error_msg}, 400), mongo_data try: # Validate image format if image.content_type and not image.content_type.startswith('image/'): error_msg = f"File must be an image. Got: {image.content_type}" mongo_data["status"] = "failed" mongo_data["error"] = error_msg if db: save_to_mongodb(db, mongo_data) return JSONResponse({"error": error_msg}, 400), mongo_data # Validate that the file can be opened as an image image_bytes = await image.read() image_size = len(image_bytes) mongo_data["image_size_bytes"] = image_size try: test_img = Image.open(io.BytesIO(image_bytes)) test_img.verify() # Verify it's a valid image # Reopen image after verify (verify() closes the image) test_img = Image.open(io.BytesIO(image_bytes)) image_format = test_img.format image_dimensions = test_img.size width, height = image_dimensions mongo_data["image_format"] = image_format mongo_data["image_dimensions"] = {"width": width, "height": height} # Validate minimum image dimensions (250x250) MIN_DIMENSION = 250 if width < MIN_DIMENSION or height < MIN_DIMENSION: error_msg = f"Image size too small. Minimum dimensions required: {MIN_DIMENSION}x{MIN_DIMENSION} pixels. Your image: {width}x{height} pixels" mongo_data["status"] = "failed" mongo_data["error"] = error_msg if db: save_to_mongodb(db, mongo_data) return JSONResponse({"error": error_msg}, 400), mongo_data except Exception as img_error: error_msg = f"Invalid image format. Supported: JPEG, PNG, WEBP, etc. Error: {str(img_error)}" mongo_data["status"] = "failed" mongo_data["error"] = error_msg if db: save_to_mongodb(db, mongo_data) return JSONResponse({"error": error_msg}, 400), mongo_data # Process with InferenceClient (supports JPEG, PNG, WEBP, and other common formats) out = client.image_to_image( image_bytes, prompt=prompt, model=QWEN_MODEL_ID, return_single_image=True, ) if isinstance(out, Image.Image): # Save image with task_id as filename for easy retrieval # Use results directory which persists in Hugging Face Spaces (not /tmp which is ephemeral) os.makedirs("results", exist_ok=True) path = f"results/{task_id}.png" out.save(path) # Update MongoDB data with success end_time = datetime.utcnow() processing_time = (end_time - start_time).total_seconds() mongo_data["status"] = "completed" mongo_data["processing_time_seconds"] = processing_time mongo_data["output_path"] = path mongo_data["output_format"] = "PNG" # Get base URL from request or environment if request: base_url = str(request.base_url).rstrip('/') else: base_url = os.getenv("BASE_URL", "https://logicgoinfotechspaces-nano-banana-experimental-2.hf.space") # Ensure HTTPS in the returned URL (Hugging Face may report http internally) parsed = urlparse(base_url) if parsed.scheme == "http": parsed = parsed._replace(scheme="https") base_url = urlunparse(parsed).rstrip('/') image_url = f"{base_url}/images/{task_id}" if image_url.startswith("http://"): image_url = "https://" + image_url[len("http://"):] mongo_data["image_url"] = image_url # Save to MongoDB if db parameter is provided (MANDATORY - always save regardless of user_id) if db: print(f"[API] Saving to MongoDB - db: {db}, task_id: {task_id}") save_success = save_to_mongodb(db, mongo_data) print(f"[API] MongoDB save result: {save_success}") else: print(f"[API] No db parameter provided, skipping MongoDB save") # Save media click to admin database ONLY if user_id is provided if category_id and user_id is not None: print(f"[API] Saving media click - category_id: {category_id}, user_id: {user_id}") save_media_click(user_id=user_id, category_id=category_id) elif category_id and user_id is None: print(f"[API] Skipping media_clicks logging - user_id not provided (but saved to {db} MongoDB)") # Return JSON response with image URL return JSONResponse({ "status": "success", "task_id": task_id, "image_url": image_url, "processing_time_seconds": processing_time, "message": "Image edited successfully" }), mongo_data # Unexpected provider response error_msg = "Unexpected provider response" mongo_data["status"] = "failed" mongo_data["error"] = error_msg if db: save_to_mongodb(db, mongo_data) return JSONResponse({"error": error_msg}, 500), mongo_data except Exception as e: error_msg = str(e) mongo_data["status"] = "failed" mongo_data["error"] = error_msg if db: save_to_mongodb(db, mongo_data) return JSONResponse({"error": error_msg}, 500), mongo_data @app.post("/edit/nanobanana") async def edit_image_nanobanana( request: Request, image: UploadFile = File(...), user_id: Optional[str] = Form(None), category_id: Optional[str] = Form(None), app_check_claims: dict = Depends(verify_app_check_token), ): """ Edit an image using the default nanobanana prompt. Automatically saves to nanobanana MongoDB database. Uses categoryId: 69368d62b95a6c2a75920505 for media_clicks logging (default, can be overridden). Parameters: - image: Image file to edit - user_id: Optional integer user ID - category_id: Optional MongoDB ObjectId string for category (defaults to 69368d62b95a6c2a75920505) """ task_id = str(uuid.uuid4()) start_time = datetime.utcnow() print(f"[API] Nanobanana edit request - task_id: {task_id}") # Process user_id: accept ObjectId string (24 hex) or integer; fall back to auto-generate user_id_processed = None if user_id: user_id_str = str(user_id).strip() # Case 1: valid ObjectId format if len(user_id_str) == 24: try: ObjectId(user_id_str) user_id_processed = user_id_str print(f"[API] Using user_id as ObjectId string: {user_id_str}") except Exception as e: print(f"[API] WARNING: Invalid ObjectId format for user_id '{user_id_str}': {e}. Will auto-generate user_id.") user_id_processed = None else: # Case 2: try integer -> deterministic ObjectId try: user_id_int = int(user_id_str) user_id_hex = hex(user_id_int)[2:].ljust(24, "0")[:24] ObjectId(user_id_hex) # validate user_id_processed = user_id_hex print(f"[API] Using user_id from integer -> ObjectId hex: {user_id_hex}") except Exception as e: print(f"[API] WARNING: user_id '{user_id_str}' is not valid int or ObjectId. Auto-generating. Detail: {e}") user_id_processed = None # Will auto-generate in save_media_click # Set default categoryId for nanobanana if not provided if not category_id: category_id = "69368d62b95a6c2a75920505" response, mongo_data = await process_image_edit( image=image, prompt=DEFAULT_PROMPT_NANOBANANA, db="nanobanana", task_id=task_id, start_time=start_time, request=request, user_id=user_id_processed, category_id=category_id, ) return response @app.post("/edit/descratch") async def edit_image_descratch( request: Request, image: UploadFile = File(...), user_id: Optional[str] = Form(None), category_id: Optional[str] = Form(None), app_check_claims: dict = Depends(verify_app_check_token), ): """ Edit an image using the default Descratch prompt. Automatically saves to Descratch_logicgo MongoDB database. Uses categoryId: 69368fbb2e46bd68ae18899e for media_clicks logging (default, can be overridden). Parameters: - image: Image file to edit - user_id: Optional integer user ID - category_id: Optional MongoDB ObjectId string for category (defaults to 69368fbb2e46bd68ae18899e) """ task_id = str(uuid.uuid4()) start_time = datetime.utcnow() print(f"[API] Descratch edit request - task_id: {task_id}") # Process user_id: accept ObjectId string (24 hex) or integer; fall back to auto-generate user_id_processed = None if user_id: user_id_str = str(user_id).strip() # Case 1: valid ObjectId format if len(user_id_str) == 24: try: ObjectId(user_id_str) user_id_processed = user_id_str print(f"[API] Using user_id as ObjectId string: {user_id_str}") except Exception as e: print(f"[API] WARNING: Invalid ObjectId format for user_id '{user_id_str}': {e}. Will auto-generate user_id.") user_id_processed = None else: # Case 2: try integer -> deterministic ObjectId try: user_id_int = int(user_id_str) user_id_hex = hex(user_id_int)[2:].ljust(24, "0")[:24] ObjectId(user_id_hex) # validate user_id_processed = user_id_hex print(f"[API] Using user_id from integer -> ObjectId hex: {user_id_hex}") except Exception as e: print(f"[API] WARNING: user_id '{user_id_str}' is not valid int or ObjectId. Auto-generating. Detail: {e}") user_id_processed = None # Will auto-generate in save_media_click # Set default categoryId for descratch if not provided if not category_id: category_id = "69368fbb2e46bd68ae18899e" response, mongo_data = await process_image_edit( image=image, prompt=DEFAULT_PROMPT_DESCRATCH, db="Descratch_logicgo", task_id=task_id, start_time=start_time, request=request, user_id=user_id_processed, category_id=category_id, ) return response @app.post("/edit") async def edit_image( request: Request, prompt: str = Form(...), image: UploadFile = File(...), db: Optional[str] = Form(None), user_id: Optional[str] = Form(None), category_id: Optional[str] = Form(None), app_check_claims: dict = Depends(verify_app_check_token), ): """ Edit an image using a custom text prompt. Parameters: - prompt: Text description of the desired edit (required) - image: Image file to edit - db: Database to store the request (optional): "nanobanana" or "Descratch_logicgo" If not provided, request is not saved to MongoDB - user_id: Optional integer user ID - category_id: Optional MongoDB ObjectId string for category (e.g., "69368fbb2e46bd68ae18899e") """ task_id = str(uuid.uuid4()) start_time = datetime.utcnow() # Log the request print(f"[API] Custom edit request - task_id: {task_id}, db: {db}, prompt: {prompt[:50]}...") # Validate prompt is provided if not prompt or prompt.strip() == "": return JSONResponse({ "error": "Prompt parameter is required for custom edits" }, 400) # Process user_id: accept ObjectId string (24 hex) or integer; fall back to auto-generate user_id_processed = None if user_id: user_id_str = str(user_id).strip() # Case 1: valid ObjectId format if len(user_id_str) == 24: try: ObjectId(user_id_str) user_id_processed = user_id_str print(f"[API] Using user_id as ObjectId string: {user_id_str}") except Exception as e: print(f"[API] WARNING: Invalid ObjectId format for user_id '{user_id_str}': {e}. Will auto-generate user_id.") user_id_processed = None else: # Case 2: try integer -> deterministic ObjectId try: user_id_int = int(user_id_str) user_id_hex = hex(user_id_int)[2:].ljust(24, "0")[:24] ObjectId(user_id_hex) # validate user_id_processed = user_id_hex print(f"[API] Using user_id from integer -> ObjectId hex: {user_id_hex}") except Exception as e: print(f"[API] WARNING: user_id '{user_id_str}' is not valid int or ObjectId. Auto-generating. Detail: {e}") user_id_processed = None # Will auto-generate in save_media_click response, mongo_data = await process_image_edit( image=image, prompt=prompt, db=db, task_id=task_id, start_time=start_time, request=request, user_id=user_id_processed, category_id=category_id, ) return response