from typing import Dict, Any

import base64

import cv2
import insightface
import numpy as np
import torch


class EndpointHandler:
    def __init__(self, path=""):
        self.face_app = None
        self.use_gpu = False
        self._init_model()

    def _init_model(self):
        """Initialize the InsightFace detection/recognition pipeline."""
        self.use_gpu = torch.cuda.is_available()
        if self.use_gpu:
            providers = ['CUDAExecutionProvider', 'CPUExecutionProvider']
            ctx_id = 0
        else:
            providers = ['CPUExecutionProvider']
            ctx_id = -1  # ctx_id=-1 tells InsightFace to run on CPU
        self.face_app = insightface.app.FaceAnalysis(
            providers=providers,
            allowed_modules=['detection', 'recognition']
        )
        self.face_app.prepare(ctx_id=ctx_id, det_size=(640, 640))
        print(f"Face model loaded: {'GPU' if self.use_gpu else 'CPU'}")

    def __call__(self, data: Dict[str, Any]) -> Dict[str, Any]:
        """Handle the actual inference request."""
        try:
            # Handle health check
            if data.get("inputs") == "test":
                return {
                    "status": "healthy",
                    "gpu_available": self.use_gpu,
                    "model_loaded": self.face_app is not None,
                }

            # Handle batch embedding extraction
            if "images" in data:
                return self._extract_embeddings_batch(data)

            return {"error": "Unknown request format"}
        except Exception as e:
            return {"error": str(e)}

    def _extract_embeddings_batch(self, data):
        """Extract one embedding per image from a batch of base64-encoded images."""
        images = data.get("images", [])
        enhance_quality = data.get("enhance_quality", True)
        aggressive = data.get("aggressive_enhancement", False)

        embeddings = []
        extraction_info = []

        for idx, img_b64 in enumerate(images):
            try:
                # Decode base64 -> bytes -> BGR image
                img_data = base64.b64decode(img_b64)
                img_array = np.frombuffer(img_data, dtype=np.uint8)
                img = cv2.imdecode(img_array, cv2.IMREAD_COLOR)

                if img is None:
                    embeddings.append(None)
                    extraction_info.append({"error": "Failed to decode", "index": idx})
                    continue

                # Optional pre-processing for low-quality input
                if enhance_quality:
                    img = self._enhance_image(img, aggressive)

                # Detect faces and compute recognition embeddings
                faces = self.face_app.get(img)
                if len(faces) == 0:
                    embeddings.append(None)
                    extraction_info.append({
                        "face_count": 0,
                        "strategy_used": "gpu_batch" if self.use_gpu else "cpu_batch",
                        "enhancement_used": enhance_quality,
                        "index": idx,
                    })
                    continue

                # Keep the largest face by bounding-box area
                face = max(
                    faces,
                    key=lambda x: (x.bbox[2] - x.bbox[0]) * (x.bbox[3] - x.bbox[1]),
                )

                # L2-normalize so downstream cosine similarity reduces to a dot product
                embedding = face.embedding / np.linalg.norm(face.embedding)
                embeddings.append(embedding.tolist())

                # Crude confidence heuristic: relative face size, capped at 1.0
                bbox_area = (face.bbox[2] - face.bbox[0]) * (face.bbox[3] - face.bbox[1])
                img_area = img.shape[0] * img.shape[1]
                confidence = min((bbox_area / img_area) * 2.0, 1.0)

                extraction_info.append({
                    "face_count": len(faces),
                    "confidence": float(confidence),
                    "strategy_used": "gpu_batch" if self.use_gpu else "cpu_batch",
                    "enhancement_used": enhance_quality,
                    "quality_score": float(confidence),
                    "index": idx,
                })
            except Exception as e:
                embeddings.append(None)
                extraction_info.append({"error": str(e), "index": idx})

        successful = len([e for e in embeddings if e is not None])
        return {
            "embeddings": embeddings,
            "extraction_info": extraction_info,
            "total_processed": len(images),
            "successful": successful,
            "processing_mode": "gpu" if self.use_gpu else "cpu",
        }

    def _enhance_image(self, img, aggressive=False):
        """Denoise and boost contrast/sharpness; falls back to the input on failure."""
        try:
            if aggressive:
                # Heavier denoise, then CLAHE on the L channel in LAB space
                img = cv2.bilateralFilter(img, 15, 90, 90)
                lab = cv2.cvtColor(img, cv2.COLOR_BGR2LAB)
                l, a, b = cv2.split(lab)
                clahe = cv2.createCLAHE(clipLimit=4.0, tileGridSize=(8, 8))
                l = clahe.apply(l)
                img = cv2.merge([l, a, b])
                img = cv2.cvtColor(img, cv2.COLOR_LAB2BGR)
            else:
                # Light denoise plus a 3x3 sharpening kernel
                img = cv2.bilateralFilter(img, 9, 75, 75)
                kernel = np.array([[-1, -1, -1],
                                   [-1, 9, -1],
                                   [-1, -1, -1]])
                img = cv2.filter2D(img, -1, kernel)
            return img
        except Exception:
            return img
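

# Minimal local smoke test (a sketch only, not part of the Inference Endpoints
# contract). "sample.jpg" is an illustrative filename, not a file shipped with
# this handler; substitute any local image containing a face.
if __name__ == "__main__":
    handler = EndpointHandler()

    # Health check, mirroring the {"inputs": "test"} probe handled in __call__
    print(handler({"inputs": "test"}))

    # Single-image batch request with a base64-encoded payload
    with open("sample.jpg", "rb") as fh:
        img_b64 = base64.b64encode(fh.read()).decode("utf-8")

    result = handler({"images": [img_b64], "enhance_quality": True})
    print(f"{result['successful']} of {result['total_processed']} images embedded")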