# itsbava's picture
# Upload 4 files
# 1897004 verified
import os
import base64
import numpy as np
import cv2
import faiss
import torch
import insightface
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from typing import List, Dict, Any, Optional
from PIL import Image, ImageOps
import io
import logging
from datetime import datetime
# FastAPI application object; the route decorators in this module register on it.
app = FastAPI(
    title="Orcan VisionTrace Hybrid GPU Service",
    version="1.0.0",
)

# Module-level state, filled in by the startup hook.
face_app = None  # InsightFace FaceAnalysis instance; None until startup has run
use_gpu_face_recognition = False  # set from torch.cuda.is_available() at startup
class BatchEmbeddingRequest(BaseModel):
    """Request body for the ``/extract_embeddings_batch`` endpoint."""

    # Base64 encoded images, one string per image.
    images: List[str]
    # When true, each decoded image is passed through ``enhance_image``
    # before face detection.
    enhance_quality: bool = True
    # Forwarded to ``enhance_image`` as its ``aggressive`` argument.
    aggressive_enhancement: bool = False
class IndexCreationRequest(BaseModel):
    """Request body for the ``/create_faiss_index`` endpoint."""

    # Vectors to index, one inner list per vector.
    embeddings: List[List[float]]
    # Declared dataset size; the endpoint uses it to pick the FAISS index type.
    dataset_size: int
    # Vector dimensionality passed to the FAISS index constructors.
    dimension: int = 512
@app.on_event("startup")
async def startup_event():
    """Initialise the InsightFace model when the application starts.

    Chooses ONNX execution providers from ``torch.cuda.is_available()``,
    builds a ``FaceAnalysis`` instance with the detection and recognition
    modules, and stores it in the module-level ``face_app``. If that first
    attempt raises, a CPU-only instance is built instead and
    ``use_gpu_face_recognition`` is reset to ``False``.
    """
    global face_app, use_gpu_face_recognition

    print("Starting Orcan VisionTrace Hybrid Service...")

    # Check GPU availability
    use_gpu_face_recognition = torch.cuda.is_available()
    print(f"CUDA Available: {use_gpu_face_recognition}")

    if use_gpu_face_recognition:
        print("GPU detected - Using CUDA for face recognition")
        providers, ctx_id = ['CUDAExecutionProvider', 'CPUExecutionProvider'], 0
    else:
        print("No GPU detected - Using CPU for face recognition")
        providers, ctx_id = ['CPUExecutionProvider'], -1

    wanted_modules = ('detection', 'recognition')
    detection_size = (640, 640)

    try:
        # Initialize InsightFace with the providers selected above
        face_app = insightface.app.FaceAnalysis(
            providers=providers,
            allowed_modules=list(wanted_modules),
        )
        face_app.prepare(ctx_id=ctx_id, det_size=detection_size)
        print("InsightFace initialized successfully")
    except Exception as init_error:
        print(f"Error initializing InsightFace: {init_error}")
        # Fallback to CPU; a failure here propagates and aborts startup
        face_app = insightface.app.FaceAnalysis(
            providers=['CPUExecutionProvider'],
            allowed_modules=list(wanted_modules),
        )
        face_app.prepare(ctx_id=-1, det_size=detection_size)
        use_gpu_face_recognition = False
        print("Fallback to CPU face recognition")

    print(f"Service ready - Face Recognition: {'GPU' if use_gpu_face_recognition else 'CPU'}, FAISS: CPU")
@app.get("/")
async def root():
    """Return a static service descriptor plus the current face-recognition mode."""
    face_mode = "GPU" if use_gpu_face_recognition else "CPU"
    service_info = {
        "service": "Orcan VisionTrace Hybrid GPU Service",
        "status": "running",
        "face_recognition": face_mode,
        "faiss_indexing": "CPU",
        "version": "1.0.0",
    }
    return service_info
@app.get("/health")
async def health_check():
    """Report GPU availability, model load state and a UTC timestamp."""
    model_loaded = face_app is not None
    cuda_now = torch.cuda.is_available()
    # Naive UTC timestamp in ISO-8601 form (no timezone suffix).
    stamp = datetime.utcnow().isoformat()

    report = {
        "status": "healthy",
        "gpu_available": cuda_now,
        "face_model_loaded": model_loaded,
        "using_gpu_face_recognition": use_gpu_face_recognition,
        "faiss_mode": "CPU",
        "timestamp": stamp,
    }
    return report
def _embed_single_image(img_b64, idx, enhance_quality, aggressive_enhancement):
    """Decode one base64 image and embed its largest detected face.

    Args:
        img_b64: Base64-encoded image bytes.
        idx: Position of the image in the request, echoed back in ``info``.
        enhance_quality: Whether to run ``enhance_image`` before detection.
        aggressive_enhancement: Forwarded to ``enhance_image``.

    Returns:
        A ``(embedding, info)`` pair. ``embedding`` is a list of Python floats
        (the face embedding divided by its L2 norm) or ``None`` when the image
        could not be decoded, contained no face, or produced a degenerate
        embedding. ``info`` is the matching ``extraction_info`` entry.

    Raises:
        Exception: Anything raised by base64 decoding, OpenCV or InsightFace;
            the caller turns it into a per-image error entry.
    """
    strategy = "gpu_batch" if use_gpu_face_recognition else "cpu_batch"

    def bbox_area_of(candidate):
        x1, y1, x2, y2 = candidate.bbox[:4]
        return (x2 - x1) * (y2 - y1)

    # Decode base64 image
    img_data = base64.b64decode(img_b64)
    img_array = np.frombuffer(img_data, dtype=np.uint8)
    img = cv2.imdecode(img_array, cv2.IMREAD_COLOR)
    if img is None:
        return None, {"error": "Failed to decode image", "index": idx}

    # Apply enhancement if requested
    if enhance_quality:
        img = enhance_image(img, aggressive_enhancement)

    # Extract face embeddings using GPU/CPU
    faces = face_app.get(img)
    if len(faces) == 0:
        return None, {
            "face_count": 0,
            "strategy_used": strategy,
            "enhancement_used": enhance_quality,
            "index": idx,
        }

    # Get best face (largest bounding box)
    face = max(faces, key=bbox_area_of)
    embedding = face.embedding

    # Normalize embedding. A zero or non-finite norm would put NaN/inf into
    # the response, which cannot be JSON-encoded and would fail the whole
    # batch, so report it as a per-image error instead.
    norm = np.linalg.norm(embedding)
    if not np.isfinite(norm) or float(norm) == 0.0:
        return None, {
            "error": "Degenerate face embedding (zero or non-finite norm)",
            "index": idx,
        }
    embedding = embedding / norm

    # Calculate quality metrics. Everything is converted to plain Python
    # floats so the response encodes regardless of the NumPy scalar type
    # the bbox arithmetic yields.
    bbox_area = float(bbox_area_of(face))
    img_area = img.shape[0] * img.shape[1]
    face_size_ratio = float(bbox_area_of(face) / img_area)

    info = {
        "face_count": len(faces),
        # NOTE: "confidence" is the face-area / image-area ratio, not a
        # detector score.
        "confidence": face_size_ratio,
        "strategy_used": strategy,
        "enhancement_used": enhance_quality,
        "quality_score": float(min(face_size_ratio * 2.0, 1.0)),
        "bbox_area": bbox_area,
        "index": idx,
    }
    return embedding.tolist(), info


@app.post("/extract_embeddings_batch")
async def extract_embeddings_batch(request: BatchEmbeddingRequest):
    """Extract one face embedding per image from a batch of base64 images.

    Each image is processed independently: a failure on one image is recorded
    in ``extraction_info`` and yields ``None`` in ``embeddings`` without
    affecting the others. ``embeddings`` and ``extraction_info`` always have
    exactly one entry per input image, in request order.

    Args:
        request: Batch of base64 images plus enhancement flags.

    Returns:
        A dict with ``embeddings``, ``extraction_info``, ``total_processed``,
        ``successful`` and ``processing_mode``.

    Raises:
        HTTPException: 503 if the face model has not been loaded; 500 for an
            unexpected failure outside per-image processing.
    """
    if face_app is None:
        # Without this guard every image would fail with the same opaque
        # "'NoneType' object has no attribute 'get'" error.
        raise HTTPException(status_code=503, detail="Face recognition model is not loaded")

    try:
        embeddings = []
        extraction_info = []
        print(f"Processing batch of {len(request.images)} images")

        for idx, img_b64 in enumerate(request.images):
            try:
                embedding, info = _embed_single_image(
                    img_b64,
                    idx,
                    request.enhance_quality,
                    request.aggressive_enhancement,
                )
            except Exception as e:
                embedding, info = None, {"error": str(e), "index": idx}
            # Append exactly once per image so both lists stay aligned.
            embeddings.append(embedding)
            extraction_info.append(info)

        successful_count = sum(1 for e in embeddings if e is not None)
        print(f"Batch processing complete: {successful_count}/{len(request.images)} successful")

        return {
            "embeddings": embeddings,
            "extraction_info": extraction_info,
            "total_processed": len(request.images),
            "successful": successful_count,
            "processing_mode": "gpu" if use_gpu_face_recognition else "cpu",
        }
    except Exception as e:
        print(f"Batch processing error: {e}")
        raise HTTPException(status_code=500, detail=str(e))
def enhance_image(img, aggressive=False):
    """Denoise, sharpen and contrast-enhance a BGR image with OpenCV.

    Standard mode: bilateral filter -> 3x3 sharpening -> CLAHE on the L
    channel. Aggressive mode: stronger bilateral filter -> CLAHE on the L
    channel -> 3x3 sharpening -> gamma lookup table (gamma 1.4).

    Best-effort: if any step raises, the error is printed and the image as
    it stood at that point is returned.
    """
    try:
        if not aggressive:
            # Standard enhancement
            img = cv2.bilateralFilter(img, 9, 75, 75)

            # Sharpening
            sharpen = np.array([[-1,-1,-1], [-1, 9,-1], [-1,-1,-1]])
            img = cv2.filter2D(img, -1, sharpen)

            # CLAHE on the lightness channel only
            lab_img = cv2.cvtColor(img, cv2.COLOR_BGR2LAB)
            lightness, chan_a, chan_b = cv2.split(lab_img)
            equalizer = cv2.createCLAHE(clipLimit=3.0, tileGridSize=(8,8))
            lightness = equalizer.apply(lightness)
            img = cv2.merge([lightness, chan_a, chan_b])
            img = cv2.cvtColor(img, cv2.COLOR_LAB2BGR)
            return img

        # Aggressive enhancement for very poor quality images
        img = cv2.bilateralFilter(img, 15, 90, 90)

        # Histogram equalization (CLAHE) on the lightness channel
        lab_img = cv2.cvtColor(img, cv2.COLOR_BGR2LAB)
        lightness, chan_a, chan_b = cv2.split(lab_img)
        equalizer = cv2.createCLAHE(clipLimit=4.0, tileGridSize=(8,8))
        lightness = equalizer.apply(lightness)
        img = cv2.merge([lightness, chan_a, chan_b])
        img = cv2.cvtColor(img, cv2.COLOR_LAB2BGR)

        # Strong sharpening
        # NOTE(review): these weights sum to 4 (12 - 8), not 1, so this step
        # also scales overall intensity -- confirm that is intended.
        sharpen = np.array([[-1,-1,-1], [-1, 12,-1], [-1,-1,-1]])
        img = cv2.filter2D(img, -1, sharpen)

        # Gamma correction via a 256-entry lookup table
        gamma = 1.4
        inv_gamma = 1.0 / gamma
        lut = np.array(
            [((level / 255.0) ** inv_gamma) * 255 for level in np.arange(0, 256)]
        ).astype("uint8")
        img = cv2.LUT(img, lut)
        return img
    except Exception as err:
        print(f"Enhancement error: {err}")
        return img
@app.post("/create_faiss_index")
async def create_faiss_index(request: IndexCreationRequest):
    """Build a CPU FAISS index from the supplied vectors and return it base64-encoded.

    The index type is chosen from ``request.dataset_size``:
    ``IndexFlatL2`` below 1000, ``IndexIVFFlat`` below 50000, otherwise
    ``IndexIVFPQ`` (m=64, nbits=8). Indexes that need training are trained on
    the supplied vectors before they are added.

    Args:
        request: Vectors, declared dataset size and vector dimension.

    Returns:
        A dict with ``index_data`` (base64 of the serialized index),
        ``index_type``, ``index_params``, ``vectors_added`` and
        ``dataset_size``.

    Raises:
        HTTPException: 400 if ``embeddings`` is empty, ragged, or does not
            match ``dimension``; 500 for failures inside FAISS.
    """
    # Validate the payload up front so malformed input is reported as a
    # client error instead of an opaque NumPy/FAISS failure.
    try:
        embeddings_array = np.array(request.embeddings, dtype='float32')
    except ValueError:
        raise HTTPException(
            status_code=400,
            detail="embeddings must be a rectangular list of float vectors",
        )
    if embeddings_array.ndim != 2 or embeddings_array.shape[0] == 0:
        raise HTTPException(
            status_code=400,
            detail="embeddings must be a non-empty list of vectors",
        )
    if embeddings_array.shape[1] != request.dimension:
        raise HTTPException(
            status_code=400,
            detail=(
                f"embedding dimension {embeddings_array.shape[1]} does not match "
                f"requested dimension {request.dimension}"
            ),
        )

    try:
        print(f"Creating FAISS index for {embeddings_array.shape[0]} vectors")

        # Choose index type based on the declared dataset size.
        # NOTE: selection uses request.dataset_size, not the number of vectors
        # actually supplied; FAISS training can fail if far fewer vectors
        # than nlist are provided.
        if request.dataset_size < 1000:
            index = faiss.IndexFlatL2(request.dimension)
            index_type = "IndexFlatL2"
            params = {}
        elif request.dataset_size < 50000:
            nlist = max(4, min(request.dataset_size // 39, 100))
            quantizer = faiss.IndexFlatL2(request.dimension)
            index = faiss.IndexIVFFlat(quantizer, request.dimension, nlist)
            index_type = "IndexIVFFlat"
            params = {"nlist": nlist}
        else:
            nlist = max(100, min(request.dataset_size // 39, 1000))
            quantizer = faiss.IndexFlatL2(request.dimension)
            index = faiss.IndexIVFPQ(quantizer, request.dimension, nlist, 64, 8)
            index_type = "IndexIVFPQ"
            params = {"nlist": nlist, "m": 64, "nbits": 8}

        # Train index if needed (flat indexes report is_trained == True)
        if not index.is_trained:
            print(f"Training {index_type} index...")
            index.train(embeddings_array)
            print("Index training completed")

        # Add vectors to index
        index.add(embeddings_array)
        print(f"Added {index.ntotal} vectors to index")

        # Serialize index to bytes and base64-encode for the JSON response
        index_data = faiss.serialize_index(index)
        index_b64 = base64.b64encode(index_data).decode()

        return {
            "index_data": index_b64,
            "index_type": f"CPU_{index_type}",
            "index_params": params,
            "vectors_added": index.ntotal,
            "dataset_size": request.dataset_size,
        }
    except Exception as e:
        print(f"Index creation error: {e}")
        raise HTTPException(status_code=500, detail=str(e))
@app.post("/search_faiss")
async def search_faiss(request: dict):
    """Run a top-k similarity search against a client-supplied FAISS index.

    Expected request keys:
        index_data: Base64 of a serialized FAISS index.
        query_embedding: A single query vector (list of floats).
        k: Optional number of neighbours to return (default 25).

    Returns:
        A dict with ``distances``, ``indices``, ``total_vectors`` and
        ``search_mode``.

    Raises:
        HTTPException: 400 for missing fields, an invalid ``k`` or a query
            whose dimension does not match the index; 500 for failures while
            decoding, deserializing or searching.
    """
    missing = [key for key in ("index_data", "query_embedding") if key not in request]
    if missing:
        raise HTTPException(
            status_code=400,
            detail=f"Missing required field(s): {', '.join(missing)}",
        )

    k = request.get("k", 25)
    # bool is a subclass of int, so reject it explicitly.
    if isinstance(k, bool) or not isinstance(k, int) or k < 1:
        raise HTTPException(status_code=400, detail="k must be a positive integer")

    try:
        # Deserialize index
        # NOTE(review): index_data comes from the request body and is parsed
        # by FAISS; only expose this endpoint to trusted callers.
        index_data = base64.b64decode(request["index_data"])
        index = faiss.deserialize_index(np.frombuffer(index_data, dtype=np.uint8))

        query_embedding = np.array([request["query_embedding"]], dtype='float32')
        if query_embedding.ndim != 2 or query_embedding.shape[1] != index.d:
            raise HTTPException(
                status_code=400,
                detail=f"query_embedding must be a flat vector of length {index.d}",
            )

        print(f"Searching index with {index.ntotal} vectors for top-{k}")

        # Perform search on CPU
        distances, indices = index.search(query_embedding, k)

        return {
            "distances": distances[0].tolist(),
            "indices": indices[0].tolist(),
            "total_vectors": index.ntotal,
            "search_mode": "cpu",
        }
    except HTTPException:
        # Keep the 400 raised above from being rewrapped as a 500.
        raise
    except Exception as e:
        print(f"Search error: {e}")
        raise HTTPException(status_code=500, detail=str(e))
if __name__ == "__main__":
    # Script entry point: serve the app with uvicorn on all interfaces, port 8000.
    import uvicorn

    uvicorn.run(
        app,
        host="0.0.0.0",
        port=8000,
    )