import modal

import os
import io
import re  # For parsing search results
import json
import hashlib
import asyncio  # For concurrent video processing
import tempfile
from typing import Optional, List, Dict, Any

import httpx  # For downloading videos from URLs
import yt_dlp
import gradio as gr
from fastapi import FastAPI, UploadFile, File, Body, Query
from fastapi.responses import JSONResponse
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel, Field

web_app = FastAPI(title="MCP Video Analysis API")
# --- Global Configuration ---
WHISPER_MODEL_NAME = "openai/whisper-large-v3"
CAPTION_MODEL_NAME = "Neleac/SpaceTimeGPT"
CAPTION_PROCESSOR_NAME = "Neleac/SpaceTimeGPT"  # SpaceTimeGPT ships its own processor (tokenizer included)
ACTION_MODEL_NAME = "MCG-NJU/videomae-base-finetuned-kinetics"
ACTION_PROCESSOR_NAME = "MCG-NJU/videomae-base"  # Or VideoMAEImageProcessor.from_pretrained(ACTION_MODEL_NAME)
OBJECT_DETECTION_MODEL_NAME = "facebook/detr-resnet-50"
OBJECT_DETECTION_PROCESSOR_NAME = "facebook/detr-resnet-50"
# --- Modal Image Definition ---
video_analysis_image_v2 = (
    modal.Image.debian_slim(python_version="3.10")
    .apt_install("ffmpeg")
    .pip_install(
        "gradio==3.50.2",       # Pin Gradio version for stability
        "transformers[torch]",  # Hugging Face models and PyTorch
        "soundfile",            # Audio I/O for Whisper
        "av",                   # Video/audio frame extraction
        "Pillow",               # Image processing
        "timm",                 # Common dependency for vision models
        "torchvision",
        "torchaudio",
        "fastapi[standard]",    # For web endpoints
        "pydantic",             # For request body validation
        "yt-dlp",               # For YouTube search and downloads
        "httpx",                # For downloading videos from URLs
        "cowsay==6.1",          # Cache-busting package
    )
)
# --- Modal App Definition ---
app = modal.App(name="video-analysis-gradio-pipeline")

# --- Pydantic models for web endpoint requests ---
class VideoAnalysisRequestPayload(BaseModel):
    video_url: Optional[str] = None

class TopicAnalysisRequest(BaseModel):
    topic: str
    max_videos: int = Field(3, ge=1, le=10)  # Default 3, between 1 and 10 videos

# --- Modal Distributed Dictionary for Caching ---
video_analysis_cache = modal.Dict.from_name("video_analysis_cache", create_if_missing=True)

# --- Hugging Face Token Secret ---
HF_TOKEN_SECRET = modal.Secret.from_name("my-huggingface-secret")

# --- Helper: Hugging Face Login ---
def _login_to_hf():
    import os
    from huggingface_hub import login

    hf_token = os.environ.get("HF_TOKEN")
    if hf_token:
        try:
            login(token=hf_token)
            print("Successfully logged into Hugging Face Hub.")
            return True
        except Exception as e:
            print(f"Hugging Face Hub login failed: {e}")
            return False
    else:
        print("HF_TOKEN secret not found. Some models might fail to load.")
        return False
# === 1. Transcription with Whisper ===
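# Assumed Modal wiring: this listing omits the function decorators, but the .remote()
# calls elsewhere imply each analysis function is a Modal function. The gpu/timeout
# values below are illustrative, not the original configuration.
@app.function(image=video_analysis_image_v2, secrets=[HF_TOKEN_SECRET], gpu="any", timeout=900)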
def transcribe_video_with_whisper(video_bytes: bytes) -> str:
    _login_to_hf()
    import torch
    from transformers import pipeline
    import av  # For robust audio extraction

    print("[Whisper] Starting transcription.")
    temp_audio_path = None
    video_path = None
    try:
        # Write the uploaded bytes to a temporary video file for PyAV.
        with tempfile.NamedTemporaryFile(delete=False, suffix=".mp4") as tmp_video_file:
            tmp_video_file.write(video_bytes)
            video_path = tmp_video_file.name

        container = av.open(video_path)
        audio_stream = next((s for s in container.streams if s.type == 'audio'), None)
        if audio_stream is None:
            return "Whisper Error: No audio stream found in video."

        # Decode and re-encode the audio as 16 kHz mono PCM in a temporary WAV file.
        with tempfile.NamedTemporaryFile(delete=False, suffix=".wav") as tmp_audio_file:
            temp_audio_path = tmp_audio_file.name
        output_container = av.open(temp_audio_path, mode='w')
        output_stream = output_container.add_stream('pcm_s16le', rate=16000, layout='mono')
        for frame in container.decode(audio_stream):
            for packet in output_stream.encode(frame):
                output_container.mux(packet)
        # Flush the encoder.
        for packet in output_stream.encode():
            output_container.mux(packet)
        output_container.close()
        container.close()
        os.remove(video_path)  # Clean up the temporary video file early.
        video_path = None

        pipe = pipeline(
            "automatic-speech-recognition",
            model=WHISPER_MODEL_NAME,
            torch_dtype=torch.float16,
            device="cuda:0" if torch.cuda.is_available() else "cpu",
        )
        print(f"[Whisper] Pipeline loaded. Transcribing {temp_audio_path}...")
        try:
            outputs = pipe(temp_audio_path, chunk_length_s=30, stride_length_s=5, batch_size=8,
                           generate_kwargs={"language": "english"}, return_timestamps=False)
        except Exception as whisper_err:
            print(f"[Whisper] Error during transcription: {whisper_err}")
            # Retry once with smaller chunks if the first attempt failed.
            print("[Whisper] Attempting fallback transcription with smaller chunk size...")
            outputs = pipe(temp_audio_path, chunk_length_s=10, stride_length_s=2, batch_size=4,
                           generate_kwargs={"language": "english"}, return_timestamps=False)
        transcription = outputs["text"]
        print(f"[Whisper] Transcription successful: {transcription[:100]}...")
        return transcription
    except Exception as e:
        print(f"[Whisper] Error: {e}")
        import traceback
        traceback.print_exc()
        return f"Whisper Error: {str(e)}"
    finally:
        if temp_audio_path and os.path.exists(temp_audio_path):
            os.remove(temp_audio_path)
        # Ensure the temp video is cleaned up if audio extraction failed early.
        if video_path and os.path.exists(video_path):
            os.remove(video_path)
# === 2. Captioning with SpaceTimeGPT ===
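# Assumed decorator (omitted in the listing); parameters are illustrative.
@app.function(image=video_analysis_image_v2, secrets=[HF_TOKEN_SECRET], gpu="any", timeout=900)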
def generate_captions_with_spacetimegpt(video_bytes: bytes) -> str:
    _login_to_hf()
    import torch
    from transformers import AutoProcessor, AutoModelForVision2Seq
    import av
    import numpy as np

    print("[SpaceTimeGPT] Starting captioning.")
    video_path = None
    try:
        with tempfile.NamedTemporaryFile(delete=False, suffix=".mp4") as tmp_video_file:
            tmp_video_file.write(video_bytes)
            video_path = tmp_video_file.name

        container = av.open(video_path)
        video_stream = next((s for s in container.streams if s.type == 'video'), None)
        if video_stream is None:
            return "SpaceTimeGPT Error: No video stream found."

        # Sample 16 frames evenly across the video.
        num_frames_to_sample = 16
        total_frames = video_stream.frames
        if total_frames == 0:
            return "SpaceTimeGPT Error: Video has no frames."
        indices = np.linspace(0, total_frames - 1, num_frames_to_sample, dtype=int)
        frames = []
        for i in indices:
            container.seek(int(i), stream=video_stream)
            frame = next(container.decode(video_stream))
            frames.append(frame.to_rgb().to_ndarray())
        container.close()
        video_frames_np = np.stack(frames)

        processor = AutoProcessor.from_pretrained(CAPTION_PROCESSOR_NAME, trust_remote_code=True)
        print(f"[SpaceTimeGPT] DEBUG: CAPTION_MODEL_NAME is {CAPTION_MODEL_NAME}")
        print(f"[SpaceTimeGPT] DEBUG: Intending to use model class: {AutoModelForVision2Seq.__name__}")
        print(f"[SpaceTimeGPT] DEBUG: Type of model class object: {type(AutoModelForVision2Seq)}")
        model = AutoModelForVision2Seq.from_pretrained(CAPTION_MODEL_NAME, trust_remote_code=True)
        device = "cuda:0" if torch.cuda.is_available() else "cpu"
        model.to(device)
        if hasattr(processor, 'tokenizer'):  # Guard: not every processor bundles a tokenizer
            processor.tokenizer.padding_side = "right"

        print("[SpaceTimeGPT] Model and processor loaded. Generating captions...")
        inputs = processor(text=None, videos=list(video_frames_np), return_tensors="pt", padding=True).to(device)
        generated_ids = model.generate(**inputs, max_new_tokens=128)
        captions = processor.batch_decode(generated_ids, skip_special_tokens=True)[0].strip()
        print(f"[SpaceTimeGPT] Captioning successful: {captions}")
        return captions
    except Exception as e:
        print(f"[SpaceTimeGPT] Error: {e}")
        import traceback
        traceback.print_exc()
        return f"SpaceTimeGPT Error: {str(e)}"
    finally:
        if video_path and os.path.exists(video_path):
            os.remove(video_path)
# === 3. Action Recognition with VideoMAE ===
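# Assumed decorator (omitted in the listing); parameters are illustrative.
@app.function(image=video_analysis_image_v2, secrets=[HF_TOKEN_SECRET], gpu="any", timeout=900)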
def generate_action_labels(video_bytes: bytes) -> List[Dict[str, Any]]:
    _login_to_hf()
    import torch
    from transformers import VideoMAEImageProcessor, VideoMAEForVideoClassification
    import av
    import numpy as np

    print("[VideoMAE] Starting action recognition.")
    video_path = None
    try:
        with tempfile.NamedTemporaryFile(delete=False, suffix=".mp4") as tmp_video_file:
            tmp_video_file.write(video_bytes)
            video_path = tmp_video_file.name

        container = av.open(video_path)
        video_stream = next((s for s in container.streams if s.type == 'video'), None)
        if video_stream is None:
            return [{"error": "VideoMAE Error: No video stream found."}]

        num_frames_to_sample = 16
        total_frames = video_stream.frames
        if total_frames == 0:
            return [{"error": "VideoMAE Error: Video has no frames."}]
        indices = np.linspace(0, total_frames - 1, num_frames_to_sample, dtype=int)
        video_frames_list = []
        for i in indices:
            container.seek(int(i), stream=video_stream)
            frame = next(container.decode(video_stream))
            video_frames_list.append(frame.to_rgb().to_ndarray())
        container.close()

        processor = VideoMAEImageProcessor.from_pretrained(ACTION_PROCESSOR_NAME)
        model = VideoMAEForVideoClassification.from_pretrained(ACTION_MODEL_NAME)
        device = "cuda:0" if torch.cuda.is_available() else "cpu"
        model.to(device)
        print("[VideoMAE] Model and processor loaded. Classifying actions...")

        inputs = processor(video_frames_list, return_tensors="pt").to(device)
        with torch.no_grad():
            outputs = model(**inputs)
            logits = outputs.logits

        # Report the top-5 predicted actions with confidence scores.
        top_k = 5
        probabilities = torch.softmax(logits, dim=-1)
        top_probs, top_indices = torch.topk(probabilities, top_k)
        results = []
        for i in range(top_k):
            label = model.config.id2label[top_indices[0, i].item()]
            score = top_probs[0, i].item()
            results.append({"action": label, "confidence": round(score, 4)})
        print(f"[VideoMAE] Action recognition successful: {results}")
        return results
    except Exception as e:
        print(f"[VideoMAE] Error: {e}")
        import traceback
        traceback.print_exc()
        return [{"error": f"VideoMAE Error: {str(e)}"}]
    finally:
        if video_path and os.path.exists(video_path):
            os.remove(video_path)
# === 4. Object Detection with DETR ===
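# Assumed decorator (omitted in the listing); parameters are illustrative.
@app.function(image=video_analysis_image_v2, secrets=[HF_TOKEN_SECRET], gpu="any", timeout=900)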
def generate_object_detection(video_bytes: bytes) -> List[Dict[str, Any]]:
    _login_to_hf()
    import torch
    from transformers import DetrImageProcessor, DetrForObjectDetection
    import av
    import numpy as np

    print("[DETR] Starting object detection.")
    video_path = None
    try:
        with tempfile.NamedTemporaryFile(delete=False, suffix=".mp4") as tmp_video_file:
            tmp_video_file.write(video_bytes)
            video_path = tmp_video_file.name

        container = av.open(video_path)
        video_stream = next((s for s in container.streams if s.type == 'video'), None)
        if video_stream is None:
            return [{"error": "DETR Error: No video stream found."}]

        num_frames_to_extract = 3
        total_frames = video_stream.frames
        if total_frames == 0:
            return [{"error": "DETR Error: Video has no frames."}]
        frame_indices = np.linspace(0, total_frames - 1, num_frames_to_extract, dtype=int)

        processor = DetrImageProcessor.from_pretrained(OBJECT_DETECTION_PROCESSOR_NAME)
        model = DetrForObjectDetection.from_pretrained(OBJECT_DETECTION_MODEL_NAME)
        device = "cuda:0" if torch.cuda.is_available() else "cpu"
        model.to(device)
        print("[DETR] Model and processor loaded.")

        all_frame_detections = []
        for frame_num, target_frame_index in enumerate(frame_indices):
            container.seek(int(target_frame_index), stream=video_stream)
            frame = next(container.decode(video_stream))
            pil_image = frame.to_image()  # av.VideoFrame -> PIL.Image
            print(f"[DETR] Processing frame {frame_num + 1}/{num_frames_to_extract} (original index {target_frame_index})...")
            inputs = processor(images=pil_image, return_tensors="pt").to(device)
            outputs = model(**inputs)
            target_sizes = torch.tensor([pil_image.size[::-1]], device=device)
            results = processor.post_process_object_detection(outputs, target_sizes=target_sizes, threshold=0.7)[0]
            frame_detections = []
            for score, label, box in zip(results["scores"], results["labels"], results["boxes"]):
                frame_detections.append({
                    "label": model.config.id2label[label.item()],
                    "confidence": round(score.item(), 3),
                    "box": [round(coord) for coord in box.tolist()]
                })
            if frame_detections:  # Only record frames that produced detections
                all_frame_detections.append({
                    "frame_number": frame_num + 1,
                    "original_frame_index": int(target_frame_index),
                    "detections": frame_detections
                })
        container.close()
        print(f"[DETR] Object detection successful: {all_frame_detections if all_frame_detections else 'No objects detected with threshold.'}")
        return all_frame_detections if all_frame_detections else [{"info": "No objects detected with current threshold."}]
    except Exception as e:
        print(f"[DETR] Error: {e}")
        import traceback
        traceback.print_exc()
        return [{"error": f"DETR Error: {str(e)}"}]
    finally:
        if video_path and os.path.exists(video_path):
            os.remove(video_path)
# === 5. Comprehensive Video Analysis (Orchestrator) ===
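# Assumed decorator (omitted in the listing). The orchestrator only dispatches
# .remote() calls, so it needs no GPU of its own; the timeout is illustrative.
@app.function(image=video_analysis_image_v2, timeout=1800)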
async def analyze_video_comprehensive(video_bytes: bytes) -> Dict[str, Any]:
    print("[Orchestrator] Starting comprehensive video analysis.")
    cache_key = hashlib.sha256(video_bytes).hexdigest()
    try:
        cached_result = video_analysis_cache.get(cache_key)
        if cached_result:
            print(f"[Orchestrator] Cache hit for key: {cache_key}")
            return cached_result
    except Exception as e:
        # Log the error but proceed with a fresh analysis if the cache read fails.
        print(f"[Orchestrator] Cache GET error: {e}. Proceeding with fresh analysis.")

    print(f"[Orchestrator] Cache miss for key: {cache_key}. Performing full analysis.")
    results = {}

    print("[Orchestrator] Calling transcription...")
    try:
        # .remote() blocks until the result is ready when invoked from another Modal function.
        results["transcription"] = transcribe_video_with_whisper.remote(video_bytes)
    except Exception as e:
        print(f"[Orchestrator] Error in transcription: {e}")
        results["transcription"] = f"Transcription Error: {str(e)}"

    print("[Orchestrator] Calling captioning...")
    try:
        results["caption"] = generate_captions_with_spacetimegpt.remote(video_bytes)
    except Exception as e:
        print(f"[Orchestrator] Error in captioning: {e}")
        results["caption"] = f"Captioning Error: {str(e)}"

    print("[Orchestrator] Calling action recognition...")
    try:
        results["actions"] = generate_action_labels.remote(video_bytes)
    except Exception as e:
        print(f"[Orchestrator] Error in action recognition: {e}")
        results["actions"] = [{"error": f"Action Recognition Error: {str(e)}"}]  # Keep list type on error

    print("[Orchestrator] Calling object detection...")
    try:
        results["objects"] = generate_object_detection.remote(video_bytes)
    except Exception as e:
        print(f"[Orchestrator] Error in object detection: {e}")
        results["objects"] = [{"error": f"Object Detection Error: {str(e)}"}]  # Keep list type on error

    print("[Orchestrator] All analyses attempted. Storing results in cache.")
    try:
        video_analysis_cache.put(cache_key, results)
        print(f"[Orchestrator] Successfully cached results for key: {cache_key}")
    except Exception as e:
        print(f"[Orchestrator] Cache PUT error: {e}")
    return results
# === FastAPI Endpoint for Video Analysis ===
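# Assumed endpoint wiring: the listing omits the decorators. A Modal web endpoint of
# the form below would match the usage (on older Modal versions this decorator was
# spelled @modal.web_endpoint); alternatively the route may have been registered on
# web_app directly. The configuration here is illustrative.
@app.function(image=video_analysis_image_v2)
@modal.fastapi_endpoint(method="POST")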
def process_video_analysis(payload: VideoAnalysisRequestPayload):
    """FastAPI endpoint for comprehensive video analysis."""
    print("[FastAPI Endpoint] Received request for video analysis")
    video_url = payload.video_url
    if not video_url:
        return JSONResponse(status_code=400, content={"error": "video_url must be provided in JSON payload."})
    print(f"[FastAPI Endpoint] Processing video_url: {video_url}")
    try:
        # Download the video using yt-dlp, with fallbacks for robustness.
        import yt_dlp
        import subprocess

        video_bytes = None
        with tempfile.TemporaryDirectory() as tmpdir:
            output_base = os.path.join(tmpdir, 'video')
            # yt-dlp options tuned for reliable MP4 downloads.
            ydl_opts = {
                # Request specific formats in priority order.
                'format': 'bestvideo[ext=mp4]+bestaudio[ext=m4a]/best[ext=mp4]/best',
                'outtmpl': output_base,
                'quiet': False,        # Temporarily enable output for debugging
                'verbose': True,       # More verbose output to diagnose issues
                'no_warnings': False,  # Show warnings for debugging
                'noplaylist': True,
                # Force remux to ensure a valid container.
                'merge_output_format': 'mp4',
                # Postprocess to ensure a valid MP4 with the moov atom at the start.
                'postprocessors': [{
                    'key': 'FFmpegVideoConvertor',
                    'preferedformat': 'mp4',
                    'postprocessor_args': ['-movflags', '+faststart'],
                }],
                'prefer_ffmpeg': True,
                'http_headers': {
                    'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/108.0.0.0 Safari/537.36'
                },
            }
            try:
                print(f"[FastAPI Endpoint] Downloading video with enhanced yt-dlp options from {video_url}")
                download_success = False
                actual_file = None
                # Try yt-dlp first.
                try:
                    with yt_dlp.YoutubeDL(ydl_opts) as ydl:
                        ydl.download([video_url])
                    # Find the actual output file (it may have a different extension).
                    downloaded_files = [f for f in os.listdir(tmpdir) if f.startswith('video')]
                    if downloaded_files:
                        actual_file = os.path.join(tmpdir, downloaded_files[0])
                        print(f"[FastAPI Endpoint] Found downloaded file: {actual_file}")
                        download_success = True
                except Exception as e:
                    print(f"[FastAPI Endpoint] yt-dlp download failed: {e}. Trying direct download...")
                # Fall back to a direct download for plain video URLs.
                if not download_success and (video_url.endswith('.mp4') or 'commondatastorage.googleapis.com' in video_url):
                    import requests
                    try:
                        print(f"[FastAPI Endpoint] Attempting direct download for {video_url}")
                        actual_file = os.path.join(tmpdir, 'direct_video.mp4')
                        with requests.get(video_url, stream=True) as r:
                            r.raise_for_status()
                            with open(actual_file, 'wb') as f:
                                for chunk in r.iter_content(chunk_size=8192):
                                    f.write(chunk)
                        print(f"[FastAPI Endpoint] Direct download successful: {actual_file}")
                        download_success = True
                    except Exception as e:
                        print(f"[FastAPI Endpoint] Direct download failed: {e}")
                # For testing: fall back to a sample video (Big Buck Bunny) if all downloads failed.
                if not download_success:
                    test_url = "http://commondatastorage.googleapis.com/gtv-videos-bucket/sample/BigBuckBunny.mp4"
                    print(f"[FastAPI Endpoint] All downloads failed. Falling back to sample video: {test_url}")
                    import requests
                    try:
                        actual_file = os.path.join(tmpdir, 'fallback_video.mp4')
                        with requests.get(test_url, stream=True) as r:
                            r.raise_for_status()
                            with open(actual_file, 'wb') as f:
                                for chunk in r.iter_content(chunk_size=8192):
                                    f.write(chunk)
                        print("[FastAPI Endpoint] Fallback download successful")
                        download_success = True
                    except Exception as e:
                        print(f"[FastAPI Endpoint] Even fallback download failed: {e}")
                        raise Exception("All download methods failed")
                # Re-mux with ffmpeg to guarantee a valid MP4 (moov atom at the beginning).
                final_output = os.path.join(tmpdir, 'final_video.mp4')
                try:
                    print("[FastAPI Endpoint] Reprocessing with ffmpeg to ensure valid MP4 format")
                    subprocess.run(
                        ["ffmpeg", "-i", actual_file, "-c:v", "copy", "-c:a", "copy", "-movflags", "faststart", final_output],
                        check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE
                    )
                    if os.path.exists(final_output) and os.path.getsize(final_output) > 0:
                        with open(final_output, 'rb') as f:
                            video_bytes = f.read()
                        print(f"[FastAPI Endpoint] Successfully reprocessed video, size: {len(video_bytes)} bytes")
                    else:
                        print("[FastAPI Endpoint] ffmpeg reprocessing failed to produce valid output")
                except subprocess.SubprocessError as se:
                    print(f"[FastAPI Endpoint] ffmpeg reprocessing failed: {se}")
                    # If ffmpeg fails, fall back to the original download.
                    if os.path.exists(actual_file) and os.path.getsize(actual_file) > 0:
                        with open(actual_file, 'rb') as f:
                            video_bytes = f.read()
                        print(f"[FastAPI Endpoint] Using original download, size: {len(video_bytes)} bytes")
            except yt_dlp.utils.DownloadError:
                # Fall back to httpx for direct links if yt-dlp fails outright.
                print(f"[FastAPI Endpoint] yt-dlp failed, falling back to httpx for {video_url}")
                try:
                    with httpx.Client() as client:
                        response = client.get(video_url, follow_redirects=True, timeout=60.0)
                        response.raise_for_status()
                        video_bytes = response.content
                except httpx.RequestError as he:
                    return JSONResponse(status_code=400, content={"error": f"Failed to download video from URL using both yt-dlp and httpx. Details: {he}"})
        if not video_bytes:
            return JSONResponse(status_code=400, content={"error": f"Downloaded video from URL {video_url} is empty or download failed."})
        print(f"[FastAPI Endpoint] Successfully downloaded and validated {len(video_bytes)} bytes from {video_url}.")
        # Run the comprehensive analysis.
        analysis_results = analyze_video_comprehensive.remote(video_bytes)
        print("[FastAPI Endpoint] Comprehensive analysis finished.")
        return JSONResponse(status_code=200, content=analysis_results)
    except httpx.RequestError as e:
        print(f"[FastAPI Endpoint] httpx.RequestError downloading video: {e}")
        return JSONResponse(status_code=400, content={"error": f"Error downloading video from URL: {video_url}. Details: {str(e)}"})
    except Exception as e:
        print(f"[FastAPI Endpoint] Unexpected Exception during analysis: {e}")
        return JSONResponse(status_code=500, content={"error": f"Unexpected server error during analysis: {str(e)}"})
# === FastAPI Endpoint for Topic Analysis ===
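# Assumed endpoint wiring (omitted in the listing); illustrative configuration, see
# the note on the video-analysis endpoint above.
@app.function(image=video_analysis_image_v2)
@modal.fastapi_endpoint(method="POST")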
async def handle_analyze_topic_request(request: TopicAnalysisRequest):
    """
    Handles a request to analyze videos based on a topic.
    1. Finds video URLs for the topic using YouTube search.
    2. Concurrently analyzes these videos.
    3. Returns aggregated results.
    """
    print(f"[TopicAPI] Received request to analyze topic: '{request.topic}', max_videos: {request.max_videos}")
    try:
        # find_video_urls_for_topic is a sync Modal function; .remote.aio() awaits its
        # result without blocking the event loop.
        video_urls = await find_video_urls_for_topic.remote.aio(request.topic, request.max_videos)
        if not video_urls:
            print(f"[TopicAPI] No video URLs found for topic: '{request.topic}'")
            return JSONResponse(
                status_code=404,
                content={
                    "status": "error",
                    "message": "No videos found for the specified topic.",
                    "topic": request.topic,
                    "details": "The YouTube search did not return any relevant video URLs."
                }
            )
        print(f"[TopicAPI] Found {len(video_urls)} URLs for topic '{request.topic}', proceeding to analysis.")
        # analyze_videos_by_topic is an async Modal function, so use .remote.aio() as well.
        analysis_results = await analyze_videos_by_topic.remote.aio(video_urls, request.topic)
        print(f"[TopicAPI] Successfully analyzed videos for topic: '{request.topic}'")
        return analysis_results
    except Exception as e:
        print(f"[TopicAPI] Error during topic analysis for '{request.topic}': {e}")
        import traceback
        traceback.print_exc()
        return JSONResponse(
            status_code=500,
            content={
                "status": "error",
                "message": "An internal server error occurred during topic analysis.",
                "topic": request.topic,
                "error_details_str": str(e)  # Keep the payload JSON-serializable
            }
        )
# === 6. Topic-Based Video Search ===
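# Assumed decorator (omitted in the listing) — the .remote.aio() call above implies a
# Modal function; no GPU is needed for search.
@app.function(image=video_analysis_image_v2)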
def find_video_urls_for_topic(topic: str, max_results: int = 3) -> List[str]:
    """Finds video URLs (YouTube) for a given topic using yt-dlp."""
    print(f"[TopicSearch] Finding video URLs for topic: '{topic}', max_results={max_results}")
    video_urls = []
    try:
        # Use a common browser user-agent to avoid being blocked, and let yt-dlp
        # find ffmpeg on the PATH rather than hardcoding its location.
        ydl_opts = {
            'quiet': True,
            'extract_flat': 'discard_in_playlist',
            'force_generic_extractor': False,
            'default_search': f"ytsearch{max_results}",
            'noplaylist': True,
            'prefer_ffmpeg': True,
            'http_headers': {
                'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/108.0.0.0 Safari/537.36'
            }
        }
        with yt_dlp.YoutubeDL(ydl_opts) as ydl:
            # With a 'ytsearchN:query' search, extract_info returns a playlist dict.
            search_result = ydl.extract_info(topic, download=False)
            if search_result and 'entries' in search_result:
                for entry in search_result['entries']:
                    # Ensure the entry is a dict and carries a 'webpage_url'.
                    if isinstance(entry, dict) and entry.get('webpage_url'):
                        video_urls.append(entry['webpage_url'])
                        # The search may return more than max_results, so cap it here.
                        if len(video_urls) >= max_results:
                            break
            # A single video result may not be wrapped in 'entries'.
            elif isinstance(search_result, dict) and search_result.get('webpage_url'):
                video_urls.append(search_result['webpage_url'])
        # Ensure we don't exceed max_results if the loop didn't break early enough.
        video_urls = video_urls[:max_results]
        print(f"[TopicSearch] Found {len(video_urls)} video URLs for topic '{topic}': {video_urls}")
    except Exception as e:
        print(f"[TopicSearch] Error finding videos for topic '{topic}': {e}")
        import traceback
        traceback.print_exc()
    return video_urls
# Helper function (not a Modal function) to extract video URLs from search results.
def extract_video_urls_from_search(search_results: List[Dict[str, str]], max_urls: int = 3) -> List[str]:
    """Extracts video URLs from a list of search result dictionaries."""
    video_urls = []
    seen_urls = set()
    # Regexes for YouTube (including shorts and youtu.be links), Vimeo, and
    # direct links to common video file extensions.
    youtube_regex = r"(?:https?://)?(?:www\.)?(?:youtube\.com/(?:watch\?v=|embed/|shorts/)|youtu\.be/)([a-zA-Z0-9_-]{11})"
    vimeo_regex = r"(?:https?://)?(?:www\.)?vimeo\.com/(\d+)"
    direct_video_regex = r"https?://[^\s]+\.(mp4|mov|avi|webm|mkv)(\?[^\s]*)?"
    patterns = [
        re.compile(youtube_regex),
        re.compile(vimeo_regex),
        re.compile(direct_video_regex)
    ]
    for item in search_results:
        url = item.get("link") or item.get("url")  # Common keys for URLs in search results
        if not url:
            continue
        for pattern in patterns:
            match = pattern.search(url)
            if match:
                # Reconstruct the canonical YouTube URL for short links and embeds.
                if pattern.pattern == youtube_regex and match.group(1):
                    normalized_url = f"https://www.youtube.com/watch?v={match.group(1)}"
                else:
                    normalized_url = url
                if normalized_url not in seen_urls:
                    video_urls.append(normalized_url)
                    seen_urls.add(normalized_url)
                if len(video_urls) >= max_urls:
                    break
        if len(video_urls) >= max_urls:
            break
    return video_urls
# === 7. Topic-Based Video Analysis Orchestrator ===
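# Assumed decorator (omitted in the listing) — the .map() fan-out below implies the
# worker is itself a Modal function; the timeout is illustrative.
@app.function(image=video_analysis_image_v2, timeout=1200)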
async def _analyze_video_worker(video_url: str) -> dict:
    """
    Worker that downloads a video from a URL and runs the comprehensive analysis.
    Designed to be fanned out concurrently via .map().
    """
    print(f"[Worker] Starting analysis for {video_url}")
    try:
        async with httpx.AsyncClient() as client:
            print(f"[Worker] Downloading video from {video_url}")
            response = await client.get(video_url, follow_redirects=True, timeout=60.0)
            response.raise_for_status()
            video_bytes = await response.aread()
            print(f"[Worker] Downloaded {len(video_bytes)} bytes from {video_url}")
        if not video_bytes:
            raise ValueError("Downloaded video content is empty.")
        # Invoke the async Modal function; .remote.aio() awaits the remote result,
        # matching the idiom used by the endpoints above.
        analysis_result = await analyze_video_comprehensive.remote.aio(video_bytes)
        if isinstance(analysis_result, dict) and any("error" in str(v).lower() for v in analysis_result.values()):
            print(f"[Worker] Comprehensive analysis for {video_url} reported errors: {analysis_result}")
            return {"url": video_url, "status": "error", "error_type": "analysis_error", "details": analysis_result}
        else:
            return {"url": video_url, "status": "success", "analysis": analysis_result}
    except httpx.HTTPStatusError as e:
        print(f"[Worker] HTTP error downloading {video_url}: {e}")
        return {"url": video_url, "status": "error", "error_type": "download_error", "details": f"HTTP {e.response.status_code}"}
    except httpx.RequestError as e:
        print(f"[Worker] Request error downloading {video_url}: {e}")
        return {"url": video_url, "status": "error", "error_type": "download_error", "details": f"Failed to download: {str(e)}"}
    except Exception as e:
        print(f"[Worker] Error processing video {video_url}: {e}")
        import traceback
        return {"url": video_url, "status": "error", "error_type": "processing_error", "details": str(e), "traceback": traceback.format_exc()[:1000]}
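# Assumed decorator (omitted in the listing); illustrative configuration.
@app.function(image=video_analysis_image_v2, timeout=1800)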
async def analyze_videos_by_topic(video_urls: List[str], topic: str) -> Dict[str, Any]:
    """Analyzes a list of videos (by URL) concurrently and aggregates results for a topic."""
    print(f"[TopicAnalysis] Starting concurrent analysis for topic: '{topic}' with {len(video_urls)} video(s).")
    results_aggregator = {
        "topic": topic,
        "analyzed_videos": [],
        "errors": []
    }
    if not video_urls:
        results_aggregator["errors"].append({"topic_error": "No video URLs provided or found for the topic."})
        return results_aggregator
    # Fan the worker out over all URLs with .map(); list() drains the generator so
    # that every result is collected before aggregation.
    individual_results = list(_analyze_video_worker.map(video_urls))
    for result in individual_results:
        if isinstance(result, dict):
            if result.get("status") == "error":
                results_aggregator["errors"].append(result)
            else:
                results_aggregator["analyzed_videos"].append(result)
        else:
            # Handles unexpected return types from the worker, e.g. raised exceptions.
            print(f"[TopicAnalysis] Received an unexpected result type from worker: {type(result)}")
            results_aggregator["errors"].append({"url": "unknown", "error_type": "unexpected_result", "details": str(result)})
    print(f"[TopicAnalysis] Finished concurrent analysis for topic '{topic}'.")
    return results_aggregator
# === Gradio Interface ===
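# Assumed Modal wiring for serving the Gradio UI (omitted in the listing): the
# function returns an ASGI app, which matches Modal's @modal.asgi_app() pattern.
@app.function(image=video_analysis_image_v2)
@modal.asgi_app()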
def video_analyzer_gradio_ui():
    print("[Gradio] UI function called to define interface.")

    def analyze_video_all_models(video_filepath):
        print(f"[Gradio] Received video filepath for analysis: {video_filepath}")
        if not video_filepath or not os.path.exists(video_filepath):
            return "Error: Video file path is invalid or does not exist.", "", "[]", "[]"
        with open(video_filepath, "rb") as f:
            video_bytes_content = f.read()
        print(f"[Gradio] Read {len(video_bytes_content)} bytes from video path: {video_filepath}")
        if not video_bytes_content:
            return "Error: Could not read video bytes.", "", "[]", "[]"
        # Dispatch each model as a blocking .remote() call on its Modal function.
        print("[Gradio] Calling Whisper...")
        transcription = transcribe_video_with_whisper.remote(video_bytes_content)
        print(f"[Gradio] Whisper result length: {len(transcription)}")
        print("[Gradio] Calling SpaceTimeGPT...")
        captions = generate_captions_with_spacetimegpt.remote(video_bytes_content)
        print(f"[Gradio] SpaceTimeGPT result: {captions}")
        print("[Gradio] Calling VideoMAE...")
        action_labels = generate_action_labels.remote(video_bytes_content)
        print(f"[Gradio] VideoMAE result: {action_labels}")
        print("[Gradio] Calling DETR...")
        object_detections = generate_object_detection.remote(video_bytes_content)
        print(f"[Gradio] DETR result: {object_detections}")
        return transcription, captions, str(action_labels), str(object_detections)

    with gr.Blocks(title="Comprehensive Video Analyzer", theme=gr.themes.Soft()) as demo:
        gr.Markdown("# Comprehensive Video Analyzer")
        gr.Markdown("Upload a video to get transcription, captions, action labels, and object detections.")
        with gr.Row():
            video_input = gr.Video(label="Upload Video", sources=["upload"], type="filepath")
            submit_button = gr.Button("Analyze Video", variant="primary")
        with gr.Tabs():
            with gr.TabItem("Transcription (Whisper)"):
                transcription_output = gr.Textbox(label="Transcription", lines=10, interactive=False)
            with gr.TabItem("Dense Captions (SpaceTimeGPT)"):
                caption_output = gr.Textbox(label="Captions", lines=10, interactive=False)
            with gr.TabItem("Action Recognition (VideoMAE)"):
                action_output = gr.Textbox(label="Predicted Actions (JSON format)", lines=10, interactive=False)
            with gr.TabItem("Object Detection (DETR)"):
                object_output = gr.Textbox(label="Detected Objects (JSON format)", lines=10, interactive=False)
        submit_button.click(
            fn=analyze_video_all_models,
            inputs=[video_input],
            outputs=[transcription_output, caption_output, action_output, object_output]
        )
        gr.Markdown("### Example Video")
        gr.Markdown("You can test with a short video. Processing may take a few minutes depending on video length and model inference times.")
    print("[Gradio] UI definition complete.")
    return gr.routes.App.create_app(demo)