import os
import base64
import numpy as np
import cv2
import faiss
import torch
import insightface
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from typing import List, Dict, Any, Optional
from PIL import Image, ImageOps
import io
import logging
from datetime import datetime

app = FastAPI(title="Orcan VisionTrace Hybrid GPU Service", version="1.0.0")

# Global models
face_app = None
use_gpu_face_recognition = False


class BatchEmbeddingRequest(BaseModel):
    images: List[str]  # Base64 encoded images
    enhance_quality: bool = True
    aggressive_enhancement: bool = False


class IndexCreationRequest(BaseModel):
    embeddings: List[List[float]]
    dataset_size: int
    dimension: int = 512


@app.on_event("startup")
async def startup_event():
    global face_app, use_gpu_face_recognition

    print("Starting Orcan VisionTrace Hybrid Service...")

    # Check GPU availability
    use_gpu_face_recognition = torch.cuda.is_available()
    print(f"CUDA Available: {use_gpu_face_recognition}")

    if use_gpu_face_recognition:
        print("GPU detected - Using CUDA for face recognition")
        providers = ['CUDAExecutionProvider', 'CPUExecutionProvider']
        ctx_id = 0
    else:
        print("No GPU detected - Using CPU for face recognition")
        providers = ['CPUExecutionProvider']
        ctx_id = -1

    try:
        # Initialize InsightFace
        face_app = insightface.app.FaceAnalysis(
            providers=providers,
            allowed_modules=['detection', 'recognition']
        )
        face_app.prepare(ctx_id=ctx_id, det_size=(640, 640))
        print("InsightFace initialized successfully")
    except Exception as e:
        print(f"Error initializing InsightFace: {e}")
        # Fallback to CPU
        face_app = insightface.app.FaceAnalysis(
            providers=['CPUExecutionProvider'],
            allowed_modules=['detection', 'recognition']
        )
        face_app.prepare(ctx_id=-1, det_size=(640, 640))
        use_gpu_face_recognition = False
        print("Fallback to CPU face recognition")

    print(f"Service ready - Face Recognition: {'GPU' if use_gpu_face_recognition else 'CPU'}, FAISS: CPU")


@app.get("/")
async def root():
    return {
        "service": "Orcan VisionTrace Hybrid GPU Service",
        "status": "running",
        "face_recognition": "GPU" if use_gpu_face_recognition else "CPU",
        "faiss_indexing": "CPU",
        "version": "1.0.0"
    }


@app.get("/health")
async def health_check():
    return {
        "status": "healthy",
        "gpu_available": torch.cuda.is_available(),
        "face_model_loaded": face_app is not None,
        "using_gpu_face_recognition": use_gpu_face_recognition,
        "faiss_mode": "CPU",
        "timestamp": datetime.utcnow().isoformat()
    }


@app.post("/extract_embeddings_batch")
async def extract_embeddings_batch(request: BatchEmbeddingRequest):
    """Extract face embeddings from multiple images, using GPU acceleration when available"""
    try:
        embeddings = []
        extraction_info = []

        print(f"Processing batch of {len(request.images)} images")

        for idx, img_b64 in enumerate(request.images):
            try:
                # Decode base64 image
                img_data = base64.b64decode(img_b64)
                img_array = np.frombuffer(img_data, dtype=np.uint8)
                img = cv2.imdecode(img_array, cv2.IMREAD_COLOR)

                if img is None:
                    embeddings.append(None)
                    extraction_info.append({"error": "Failed to decode image", "index": idx})
                    continue

                # Apply enhancement if requested
                if request.enhance_quality:
                    img = enhance_image(img, request.aggressive_enhancement)

                # Extract face embeddings using GPU/CPU
                faces = face_app.get(img)

                if len(faces) == 0:
                    embeddings.append(None)
                    extraction_info.append({
                        "face_count": 0,
                        "strategy_used": "gpu_batch" if use_gpu_face_recognition else "cpu_batch",
                        "enhancement_used": request.enhance_quality,
                        "index": idx
                    })
                    continue

                # Get best face (largest bounding box)
                face = max(faces, key=lambda x: (x.bbox[2] - x.bbox[0]) * (x.bbox[3] - x.bbox[1]))
                embedding = face.embedding

                # Normalize embedding to unit length
                embedding = embedding / np.linalg.norm(embedding)
                embeddings.append(embedding.tolist())

                # Calculate quality metrics (face-size ratio is used as a rough confidence proxy)
                bbox_area = (face.bbox[2] - face.bbox[0]) * (face.bbox[3] - face.bbox[1])
                img_area = img.shape[0] * img.shape[1]
                face_size_ratio = bbox_area / img_area

                extraction_info.append({
                    "face_count": len(faces),
                    "confidence": float(face_size_ratio),
                    "strategy_used": "gpu_batch" if use_gpu_face_recognition else "cpu_batch",
                    "enhancement_used": request.enhance_quality,
                    "quality_score": min(face_size_ratio * 2.0, 1.0),
                    "bbox_area": float(bbox_area),
                    "index": idx
                })

            except Exception as e:
                embeddings.append(None)
                extraction_info.append({"error": str(e), "index": idx})

        successful_count = len([e for e in embeddings if e is not None])
        print(f"Batch processing complete: {successful_count}/{len(request.images)} successful")

        return {
            "embeddings": embeddings,
            "extraction_info": extraction_info,
            "total_processed": len(request.images),
            "successful": successful_count,
            "processing_mode": "gpu" if use_gpu_face_recognition else "cpu"
        }

    except Exception as e:
        print(f"Batch processing error: {e}")
        raise HTTPException(status_code=500, detail=str(e))
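
# Reference helper (not called by any endpoint): the embeddings returned above are
# unit-normalized, and the FAISS indexes below use the L2 metric, which reports
# squared distances. For unit vectors ||a - b||^2 = 2 - 2*cos(a, b), so the
# distances returned by /search_faiss convert to cosine similarity as follows.
def cosine_from_squared_l2(squared_l2_distance: float) -> float:
    """Convert a squared L2 distance between unit-length vectors to cosine similarity."""
    return 1.0 - squared_l2_distance / 2.0
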
def enhance_image(img, aggressive=False):
    """Enhanced image quality improvement"""
    try:
        if aggressive:
            # Aggressive enhancement for very poor quality images
            img = cv2.bilateralFilter(img, 15, 90, 90)

            # Adaptive histogram equalization (CLAHE)
            lab = cv2.cvtColor(img, cv2.COLOR_BGR2LAB)
            l, a, b = cv2.split(lab)
            clahe = cv2.createCLAHE(clipLimit=4.0, tileGridSize=(8, 8))
            l = clahe.apply(l)
            img = cv2.merge([l, a, b])
            img = cv2.cvtColor(img, cv2.COLOR_LAB2BGR)

            # Strong sharpening (kernel coefficients sum to 1 so overall brightness is preserved)
            kernel = np.array([[-1, -1, -1],
                               [-1,  9, -1],
                               [-1, -1, -1]])
            img = cv2.filter2D(img, -1, kernel)

            # Gamma correction
            gamma = 1.4
            inv_gamma = 1.0 / gamma
            table = np.array([((i / 255.0) ** inv_gamma) * 255
                              for i in np.arange(0, 256)]).astype("uint8")
            img = cv2.LUT(img, table)
        else:
            # Standard enhancement
            img = cv2.bilateralFilter(img, 9, 75, 75)

            # Sharpening
            kernel = np.array([[-1, -1, -1],
                               [-1,  9, -1],
                               [-1, -1, -1]])
            img = cv2.filter2D(img, -1, kernel)

            # CLAHE
            lab = cv2.cvtColor(img, cv2.COLOR_BGR2LAB)
            l, a, b = cv2.split(lab)
            clahe = cv2.createCLAHE(clipLimit=3.0, tileGridSize=(8, 8))
            l = clahe.apply(l)
            img = cv2.merge([l, a, b])
            img = cv2.cvtColor(img, cv2.COLOR_LAB2BGR)

        return img
    except Exception as e:
        print(f"Enhancement error: {e}")
        return img


@app.post("/create_faiss_index")
async def create_faiss_index(request: IndexCreationRequest):
    """Create FAISS index using CPU (hybrid approach)"""
    try:
        embeddings_array = np.array(request.embeddings, dtype='float32')

        print(f"Creating FAISS index for {embeddings_array.shape[0]} vectors")

        # Choose optimal index type based on dataset size
        # (the // 39 cap follows FAISS's guideline of roughly 39 training vectors per IVF centroid)
        if request.dataset_size < 1000:
            index = faiss.IndexFlatL2(request.dimension)
            index_type = "IndexFlatL2"
            params = {}
        elif request.dataset_size < 50000:
            nlist = max(4, min(request.dataset_size // 39, 100))
            quantizer = faiss.IndexFlatL2(request.dimension)
            index = faiss.IndexIVFFlat(quantizer, request.dimension, nlist)
            index_type = "IndexIVFFlat"
            params = {"nlist": nlist}
        else:
            nlist = max(100, min(request.dataset_size // 39, 1000))
            quantizer = faiss.IndexFlatL2(request.dimension)
            index = faiss.IndexIVFPQ(quantizer, request.dimension, nlist, 64, 8)
            index_type = "IndexIVFPQ"
            params = {"nlist": nlist, "m": 64, "nbits": 8}

        # Train index if needed
        if hasattr(index, 'train') and not index.is_trained:
            print(f"Training {index_type} index...")
            index.train(embeddings_array)
            print("Index training completed")

        # Add vectors to index
        index.add(embeddings_array)
        print(f"Added {index.ntotal} vectors to index")

        # Serialize index
        index_data = faiss.serialize_index(index)
        index_b64 = base64.b64encode(index_data).decode()

        return {
            "index_data": index_b64,
            "index_type": f"CPU_{index_type}",
            "index_params": params,
            "vectors_added": index.ntotal,
            "dataset_size": request.dataset_size
        }

    except Exception as e:
        print(f"Index creation error: {e}")
        raise HTTPException(status_code=500, detail=str(e))
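
# Round-trip sketch (illustrative only, not invoked by the service): shows how the
# base64 blob returned by /create_faiss_index is reconstructed by /search_faiss.
# The 512-dimension default and the random vectors are placeholder assumptions
# used purely for the sake of the example.
def _index_roundtrip_selfcheck(dim: int = 512, n: int = 10) -> bool:
    """Serialize a small FAISS index to base64 and deserialize it back."""
    vectors = np.random.rand(n, dim).astype("float32")
    index = faiss.IndexFlatL2(dim)
    index.add(vectors)

    # Same handoff format as the endpoints: serialized bytes -> base64 string
    blob_b64 = base64.b64encode(faiss.serialize_index(index)).decode()

    restored = faiss.deserialize_index(
        np.frombuffer(base64.b64decode(blob_b64), dtype=np.uint8))
    return restored.ntotal == index.ntotal
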
@app.post("/search_faiss")
async def search_faiss(request: dict):
    """Perform similarity search using CPU FAISS"""
    try:
        # Deserialize index
        index_data = base64.b64decode(request["index_data"])
        index = faiss.deserialize_index(np.frombuffer(index_data, dtype=np.uint8))

        query_embedding = np.array([request["query_embedding"]], dtype='float32')
        k = request.get("k", 25)

        # IVF indexes only visit `nprobe` clusters per query (FAISS default: 1),
        # which hurts recall; allow the caller to raise it via an optional "nprobe" key.
        if hasattr(index, "nprobe"):
            index.nprobe = int(request.get("nprobe", 16))

        print(f"Searching index with {index.ntotal} vectors for top-{k}")

        # Perform search on CPU
        distances, indices = index.search(query_embedding, k)

        return {
            "distances": distances[0].tolist(),
            "indices": indices[0].tolist(),
            "total_vectors": index.ntotal,
            "search_mode": "cpu"
        }

    except Exception as e:
        print(f"Search error: {e}")
        raise HTTPException(status_code=500, detail=str(e))


if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000)
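
# Example client sketch (assumption: the service is reachable on localhost:8000 and
# the `requests` package is available; neither is required by this module).
# Illustrates the intended call sequence: extract embeddings, build an index, search it.
def _example_client(image_paths, base_url="http://localhost:8000"):
    """Hypothetical end-to-end flow against a running instance of this service."""
    import requests

    # 1. Base64-encode the input images and request embeddings
    images_b64 = []
    for path in image_paths:
        with open(path, "rb") as f:
            images_b64.append(base64.b64encode(f.read()).decode())
    resp = requests.post(f"{base_url}/extract_embeddings_batch",
                         json={"images": images_b64, "enhance_quality": True})
    embeddings = [e for e in resp.json()["embeddings"] if e is not None]

    # 2. Build a CPU FAISS index from the successful embeddings
    resp = requests.post(f"{base_url}/create_faiss_index",
                         json={"embeddings": embeddings,
                               "dataset_size": len(embeddings),
                               "dimension": 512})
    index_b64 = resp.json()["index_data"]

    # 3. Search the serialized index, using the first embedding as the query
    resp = requests.post(f"{base_url}/search_faiss",
                         json={"index_data": index_b64,
                               "query_embedding": embeddings[0],
                               "k": 5})
    return resp.json()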