import modal
import os
import tempfile
import httpx  # For downloading videos from URLs
from typing import List, Dict, Any
import json
import hashlib
from fastapi.responses import JSONResponse
from pydantic import BaseModel
import re  # For parsing search result URLs
import asyncio  # For concurrent video processing
import gradio as gr
# --- Global Configuration ---
WHISPER_MODEL_NAME = "openai/whisper-large-v3"  # Latest large Whisper checkpoint
CAPTION_MODEL_NAME = "Neleac/SpaceTimeGPT"  # Video captioning (VisionEncoderDecoder)
CAPTION_PROCESSOR_NAME = "MCG-NJU/videomae-base"  # Image processor for SpaceTimeGPT's video encoder
CAPTION_TOKENIZER_NAME = "gpt2"  # Tokenizer for SpaceTimeGPT's text decoder
ACTION_MODEL_NAME = "MCG-NJU/videomae-base-finetuned-kinetics"
ACTION_PROCESSOR_NAME = "MCG-NJU/videomae-base"
OBJECT_DETECTION_MODEL_NAME = "facebook/detr-resnet-50"
OBJECT_DETECTION_PROCESSOR_NAME = "facebook/detr-resnet-50"
# --- Modal Image Definition ---
video_analysis_image = (
    modal.Image.debian_slim(python_version="3.10")
    .apt_install("ffmpeg")
    .pip_install(
        "gradio==3.50.2",       # Pin Gradio version for stability
        "transformers[torch]",  # Hugging Face models plus PyTorch
        "soundfile",            # Audio I/O for Whisper
        "av",                   # Video/audio demuxing and frame extraction
        "Pillow",               # Image processing
        "timm",                 # Common dependency for vision models
        "torchvision",
        "torchaudio",
        "fastapi[standard]",    # Web endpoints
        "pydantic",             # Request body validation
        "httpx",                # Downloading videos from URLs
    )
)
# --- Modal App Definition ---
app = modal.App(name="video-analysis-gradio-pipeline")

# --- Pydantic model for web endpoint requests ---
class VideoAnalysisRequestPayload(BaseModel):
    video_url: str
# --- Modal Distributed Dictionary for Caching ---
video_analysis_cache = modal.Dict.from_name("video_analysis_cache", create_if_missing=True)

# --- Hugging Face Token Secret ---
HF_TOKEN_SECRET = modal.Secret.from_name("my-huggingface-secret")
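# The secret is expected to expose an HF_TOKEN environment variable. One way to
# create it (assuming the Modal CLI is installed and you have a Hugging Face token):
#   modal secret create my-huggingface-secret HF_TOKEN=hf_...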
# --- Helper: Hugging Face Login ---
def _login_to_hf() -> bool:
    from huggingface_hub import login
    hf_token = os.environ.get("HF_TOKEN")
    if hf_token:
        try:
            login(token=hf_token)
            print("Successfully logged into Hugging Face Hub.")
            return True
        except Exception as e:
            print(f"Hugging Face Hub login failed: {e}")
            return False
    else:
        print("HF_TOKEN secret not found. Some models might fail to load.")
        return False
# === 1. Transcription with Whisper ===
# The GPU and timeout settings below are assumptions; tune them for your workload.
@app.function(image=video_analysis_image, gpu="any", secrets=[HF_TOKEN_SECRET], timeout=600)
def transcribe_video_with_whisper(video_bytes: bytes) -> str:
    _login_to_hf()
    import torch
    from transformers import pipeline
    import av  # For robust audio extraction
| print("[Whisper] Starting transcription.") | |
| temp_audio_path = None | |
| try: | |
| # Robust audio extraction using PyAV | |
| with tempfile.NamedTemporaryFile(delete=False, suffix=".mp4") as tmp_video_file: | |
| tmp_video_file.write(video_bytes) | |
| video_path = tmp_video_file.name | |
| container = av.open(video_path) | |
| audio_stream = next((s for s in container.streams if s.type == 'audio'), None) | |
| if audio_stream is None: | |
| return "Whisper Error: No audio stream found in video." | |
| # Decode and resample audio to 16kHz mono WAV | |
| # Store resampled audio in a temporary WAV file | |
| with tempfile.NamedTemporaryFile(delete=False, suffix=".wav") as tmp_audio_file_for_sf: | |
| temp_audio_path = tmp_audio_file_for_sf.name | |
| output_container = av.open(temp_audio_path, mode='w') | |
| output_stream = output_container.add_stream('pcm_s16le', rate=16000, layout='mono') | |
| for frame in container.decode(audio_stream): | |
| for packet in output_stream.encode(frame): | |
| output_container.mux(packet) | |
| # Flush stream | |
| for packet in output_stream.encode(): | |
| output_container.mux(packet) | |
| output_container.close() | |
| container.close() | |
| os.remove(video_path) # Clean up temp video file | |
        pipe = pipeline(
            "automatic-speech-recognition",
            model=WHISPER_MODEL_NAME,
            torch_dtype=torch.float16 if torch.cuda.is_available() else torch.float32,
            device="cuda:0" if torch.cuda.is_available() else "cpu",
        )
        print(f"[Whisper] Pipeline loaded. Transcribing {temp_audio_path}...")
        outputs = pipe(temp_audio_path, chunk_length_s=30, batch_size=8, return_timestamps=False)
        transcription = outputs["text"]
        print(f"[Whisper] Transcription successful: {transcription[:100]}...")
        return transcription
    except Exception as e:
        print(f"[Whisper] Error: {e}")
        import traceback
        traceback.print_exc()
        return f"Whisper Error: {str(e)}"
    finally:
        if temp_audio_path and os.path.exists(temp_audio_path):
            os.remove(temp_audio_path)
        if video_path and os.path.exists(video_path):
            os.remove(video_path)  # Clean up if audio extraction failed early
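# Sketch of a direct invocation from client code (assumes the app is deployed or
# running via `modal serve`; the file name is illustrative):
#   with open("sample.mp4", "rb") as f:
#       text = transcribe_video_with_whisper.remote(f.read())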
# === 2. Captioning with SpaceTimeGPT ===
@app.function(image=video_analysis_image, gpu="any", secrets=[HF_TOKEN_SECRET], timeout=600)
def generate_captions_with_spacetimegpt(video_bytes: bytes) -> str:
    _login_to_hf()
    import torch
    from transformers import AutoImageProcessor, AutoTokenizer, VisionEncoderDecoderModel
    import av
    import numpy as np
    print("[SpaceTimeGPT] Starting captioning.")
    video_path = None
    try:
        with tempfile.NamedTemporaryFile(delete=False, suffix=".mp4") as tmp_video_file:
            tmp_video_file.write(video_bytes)
            video_path = tmp_video_file.name
        container = av.open(video_path)
        video_stream = next((s for s in container.streams if s.type == 'video'), None)
        if video_stream is None:
            return "SpaceTimeGPT Error: No video stream found."
        num_frames_to_sample = 16
        total_frames = video_stream.frames
        if total_frames == 0:
            return "SpaceTimeGPT Error: Video has no frames."
        indices = np.linspace(0, total_frames - 1, num_frames_to_sample, dtype=int)
        # Decode sequentially and keep only the sampled frames. (av's seek()
        # expects a timestamp in the stream's time_base, not a frame index, so
        # seeking by frame number is unreliable.)
        index_set = set(int(i) for i in indices)
        decoded = {}
        for i, frame in enumerate(container.decode(video_stream)):
            if i in index_set:
                decoded[i] = frame.to_rgb().to_ndarray()
            if i >= int(indices[-1]):
                break
        container.close()
        # Indexing via `indices` preserves duplicates for very short videos,
        # so the model always receives exactly num_frames_to_sample frames.
        video_frames_np = np.stack([decoded[int(i)] for i in indices])
        # SpaceTimeGPT is a VisionEncoderDecoder model: a VideoMAE image
        # processor feeds the video encoder and a GPT-2 tokenizer decodes text.
        image_processor = AutoImageProcessor.from_pretrained(CAPTION_PROCESSOR_NAME)
        tokenizer = AutoTokenizer.from_pretrained(CAPTION_TOKENIZER_NAME)
        model = VisionEncoderDecoderModel.from_pretrained(CAPTION_MODEL_NAME)
        device = "cuda:0" if torch.cuda.is_available() else "cpu"
        model.to(device)
        print("[SpaceTimeGPT] Model and processor loaded. Generating captions...")
        pixel_values = image_processor(list(video_frames_np), return_tensors="pt").pixel_values.to(device)
        generated_ids = model.generate(pixel_values, max_new_tokens=128)
        captions = tokenizer.batch_decode(generated_ids, skip_special_tokens=True)[0].strip()
        print(f"[SpaceTimeGPT] Captioning successful: {captions}")
        return captions
    except Exception as e:
        print(f"[SpaceTimeGPT] Error: {e}")
        import traceback
        traceback.print_exc()
        return f"SpaceTimeGPT Error: {str(e)}"
    finally:
        if video_path and os.path.exists(video_path):
            os.remove(video_path)
# === 3. Action Recognition with VideoMAE ===
@app.function(image=video_analysis_image, gpu="any", secrets=[HF_TOKEN_SECRET], timeout=600)
def generate_action_labels(video_bytes: bytes) -> List[Dict[str, Any]]:
    _login_to_hf()
    import torch
    from transformers import VideoMAEImageProcessor, VideoMAEForVideoClassification
    import av
    import numpy as np
    print("[VideoMAE] Starting action recognition.")
    video_path = None
    try:
        with tempfile.NamedTemporaryFile(delete=False, suffix=".mp4") as tmp_video_file:
            tmp_video_file.write(video_bytes)
            video_path = tmp_video_file.name
        container = av.open(video_path)
        video_stream = next((s for s in container.streams if s.type == 'video'), None)
        if video_stream is None:
            return [{"error": "VideoMAE Error: No video stream found."}]
        num_frames_to_sample = 16
        total_frames = video_stream.frames
        if total_frames == 0:
            return [{"error": "VideoMAE Error: Video has no frames."}]
        indices = np.linspace(0, total_frames - 1, num_frames_to_sample, dtype=int)
        # Sequential decode instead of frame-index seek (see the captioning
        # function above for why).
        index_set = set(int(i) for i in indices)
        decoded = {}
        for i, frame in enumerate(container.decode(video_stream)):
            if i in index_set:
                decoded[i] = frame.to_rgb().to_ndarray()
            if i >= int(indices[-1]):
                break
        video_frames_list = [decoded[int(i)] for i in indices]
        container.close()
        processor = VideoMAEImageProcessor.from_pretrained(ACTION_PROCESSOR_NAME)
        model = VideoMAEForVideoClassification.from_pretrained(ACTION_MODEL_NAME)
        device = "cuda:0" if torch.cuda.is_available() else "cpu"
        model.to(device)
        print("[VideoMAE] Model and processor loaded. Classifying actions...")
        inputs = processor(video_frames_list, return_tensors="pt").to(device)
        with torch.no_grad():
            outputs = model(**inputs)
            logits = outputs.logits
        top_k = 5
        probabilities = torch.softmax(logits, dim=-1)
        top_probs, top_indices = torch.topk(probabilities, top_k)
        results = []
        for i in range(top_k):
            label = model.config.id2label[top_indices[0, i].item()]
            score = top_probs[0, i].item()
            results.append({"action": label, "confidence": round(score, 4)})
        print(f"[VideoMAE] Action recognition successful: {results}")
        return results
    except Exception as e:
        print(f"[VideoMAE] Error: {e}")
        import traceback
        traceback.print_exc()
        return [{"error": f"VideoMAE Error: {str(e)}"}]
    finally:
        if video_path and os.path.exists(video_path):
            os.remove(video_path)
# === 4. Object Detection with DETR ===
@app.function(image=video_analysis_image, gpu="any", secrets=[HF_TOKEN_SECRET], timeout=600)
def generate_object_detection(video_bytes: bytes) -> List[Dict[str, Any]]:
    _login_to_hf()
    import torch
    from transformers import DetrImageProcessor, DetrForObjectDetection
    import av
    import numpy as np
    print("[DETR] Starting object detection.")
    video_path = None
    try:
        with tempfile.NamedTemporaryFile(delete=False, suffix=".mp4") as tmp_video_file:
            tmp_video_file.write(video_bytes)
            video_path = tmp_video_file.name
        container = av.open(video_path)
        video_stream = next((s for s in container.streams if s.type == 'video'), None)
        if video_stream is None:
            return [{"error": "DETR Error: No video stream found."}]
        num_frames_to_extract = 3
        total_frames = video_stream.frames
        if total_frames == 0:
            return [{"error": "DETR Error: Video has no frames."}]
        frame_indices = np.linspace(0, total_frames - 1, num_frames_to_extract, dtype=int)
        # Decode sequentially and keep the target frames as PIL images
        # (frame.to_image() returns a PIL.Image, so no explicit PIL import is needed).
        index_set = set(int(i) for i in frame_indices)
        sampled_frames = []  # (original frame index, PIL image)
        for i, frame in enumerate(container.decode(video_stream)):
            if i in index_set:
                sampled_frames.append((i, frame.to_image()))
            if i >= int(frame_indices[-1]):
                break
        container.close()
        processor = DetrImageProcessor.from_pretrained(OBJECT_DETECTION_PROCESSOR_NAME)
        model = DetrForObjectDetection.from_pretrained(OBJECT_DETECTION_MODEL_NAME)
        device = "cuda:0" if torch.cuda.is_available() else "cpu"
        model.to(device)
        print("[DETR] Model and processor loaded.")
        all_frame_detections = []
        for frame_num, (original_index, pil_image) in enumerate(sampled_frames):
            print(f"[DETR] Processing frame {frame_num + 1}/{len(sampled_frames)} (original index {original_index})...")
            inputs = processor(images=pil_image, return_tensors="pt").to(device)
            with torch.no_grad():
                outputs = model(**inputs)
            target_sizes = torch.tensor([pil_image.size[::-1]], device=device)
            results = processor.post_process_object_detection(outputs, target_sizes=target_sizes, threshold=0.7)[0]
            frame_detections = []
            for score, label, box in zip(results["scores"], results["labels"], results["boxes"]):
                frame_detections.append({
                    "label": model.config.id2label[label.item()],
                    "confidence": round(score.item(), 3),
                    "box": [round(coord) for coord in box.tolist()],
                })
            if frame_detections:  # Only record frames that have detections
                all_frame_detections.append({
                    "frame_number": frame_num + 1,
                    "original_frame_index": int(original_index),
                    "detections": frame_detections,
                })
        print(f"[DETR] Object detection successful: {all_frame_detections if all_frame_detections else 'No objects detected above threshold.'}")
        return all_frame_detections if all_frame_detections else [{"info": "No objects detected above the current confidence threshold."}]
    except Exception as e:
        print(f"[DETR] Error: {e}")
        import traceback
        traceback.print_exc()
        return [{"error": f"DETR Error: {str(e)}"}]
    finally:
        if video_path and os.path.exists(video_path):
            os.remove(video_path)
# === 5. Comprehensive Video Analysis (Orchestrator) ===
@app.function(image=video_analysis_image, timeout=1800)  # timeout is an assumption
async def analyze_video_comprehensive(video_bytes: bytes) -> Dict[str, Any]:
    print("[Orchestrator] Starting comprehensive video analysis.")
    cache_key = hashlib.sha256(video_bytes).hexdigest()
    try:
        cached_result = video_analysis_cache.get(cache_key)
        if cached_result:
            print(f"[Orchestrator] Cache hit for key: {cache_key}")
            return cached_result
    except Exception as e:
        # Log the error but fall through to a fresh analysis if the cache read fails.
        print(f"[Orchestrator] Cache GET error: {e}. Proceeding with fresh analysis.")
    print(f"[Orchestrator] Cache miss for key: {cache_key}. Performing full analysis.")
    results = {}
    print("[Orchestrator] Calling transcription...")
    try:
        # .remote() blocks until the spawned Modal function returns.
        results["transcription"] = transcribe_video_with_whisper.remote(video_bytes)
    except Exception as e:
        print(f"[Orchestrator] Error in transcription: {e}")
        results["transcription"] = f"Transcription Error: {str(e)}"
    print("[Orchestrator] Calling captioning...")
    try:
        results["caption"] = generate_captions_with_spacetimegpt.remote(video_bytes)
    except Exception as e:
        print(f"[Orchestrator] Error in captioning: {e}")
        results["caption"] = f"Captioning Error: {str(e)}"
    print("[Orchestrator] Calling action recognition...")
    try:
        results["actions"] = generate_action_labels.remote(video_bytes)
    except Exception as e:
        print(f"[Orchestrator] Error in action recognition: {e}")
        results["actions"] = [{"error": f"Action Recognition Error: {str(e)}"}]  # Keep list type on error
    print("[Orchestrator] Calling object detection...")
    try:
        results["objects"] = generate_object_detection.remote(video_bytes)
    except Exception as e:
        print(f"[Orchestrator] Error in object detection: {e}")
        results["objects"] = [{"error": f"Object Detection Error: {str(e)}"}]  # Keep list type on error
    print("[Orchestrator] All analyses attempted. Storing results in cache.")
    try:
        video_analysis_cache.put(cache_key, results)
        print(f"[Orchestrator] Successfully cached results for key: {cache_key}")
    except Exception as e:
        print(f"[Orchestrator] Cache PUT error: {e}")
    return results
# === FastAPI Endpoint for Video Analysis ===
@app.function(image=video_analysis_image)
@modal.fastapi_endpoint(method="POST")  # @modal.web_endpoint on older Modal releases
def process_video_analysis(payload: VideoAnalysisRequestPayload):
    """FastAPI endpoint for comprehensive video analysis."""
    print("[FastAPI Endpoint] Received request for video analysis")
    video_url = payload.video_url
    if not video_url:
        return JSONResponse(status_code=400, content={"error": "video_url must be provided in JSON payload."})
    print(f"[FastAPI Endpoint] Processing video_url: {video_url}")
    try:
        # Download the video
        with httpx.Client() as client:
            response = client.get(video_url, follow_redirects=True, timeout=60.0)
            response.raise_for_status()
            video_bytes = response.content
        if not video_bytes:
            return JSONResponse(status_code=400, content={"error": f"Failed to download video from URL: {video_url}. Content was empty."})
        print(f"[FastAPI Endpoint] Successfully downloaded {len(video_bytes)} bytes from {video_url}")
        # Run the comprehensive analysis
        analysis_results = analyze_video_comprehensive.remote(video_bytes)
        print("[FastAPI Endpoint] Comprehensive analysis finished.")
        return JSONResponse(status_code=200, content=analysis_results)
    except httpx.HTTPStatusError as e:
        # raise_for_status() raises HTTPStatusError, which is not a RequestError subclass
        print(f"[FastAPI Endpoint] HTTP status error downloading video: {e}")
        return JSONResponse(status_code=400, content={"error": f"Error downloading video from URL: {video_url}. HTTP {e.response.status_code}."})
    except httpx.RequestError as e:
        print(f"[FastAPI Endpoint] httpx.RequestError downloading video: {e}")
        return JSONResponse(status_code=400, content={"error": f"Error downloading video from URL: {video_url}. Details: {str(e)}"})
    except Exception as e:
        print(f"[FastAPI Endpoint] Unexpected exception during analysis: {e}")
        return JSONResponse(status_code=500, content={"error": f"Unexpected server error during analysis: {str(e)}"})
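# Example request against the deployed endpoint (the exact URL is printed by
# `modal deploy`; the shape below is only illustrative):
#   curl -X POST "https://<workspace>--video-analysis-gradio-pipeline-process-video-analysis.modal.run" \
#        -H "Content-Type: application/json" \
#        -d '{"video_url": "https://example.com/sample.mp4"}'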
# === 6. Topic-Based Video Search ===
@app.function(image=video_analysis_image)
def find_video_urls_for_topic(topic: str, max_results: int = 3) -> List[str]:
    """Finds video URLs (YouTube, Vimeo, direct links) for a given topic.

    This is deliberately a stub: in the intended workflow, the orchestrating
    agent (Cascade) intercepts calls to this function and supplies results from
    its own `search_web` tool. A standalone Modal deployment would need a real
    search implementation here (e.g. a dedicated search API such as SerpApi or
    the Google Search API, or requests + BeautifulSoup scraping) to populate
    the returned list.
    """
    print(f"[TopicSearch] Finding video URLs for topic: '{topic}', max_results={max_results}")
    print("[TopicSearch] Stub invoked. Expecting the orchestrating agent to perform the web search.")
    return []  # Placeholder: populated via the agent's search_web tool in the intended flow.
# Helper function (not a Modal function) to extract video URLs from search results
def extract_video_urls_from_search(search_results: List[Dict[str, str]], max_urls: int = 3) -> List[str]:
    """Extracts video URLs from a list of search result dictionaries."""
    video_urls = []
    seen_urls = set()
    # Patterns for YouTube (including shorts and youtu.be links), Vimeo,
    # and direct links to common video file formats.
    youtube_regex = r"(?:https?://)?(?:www\.)?(?:youtube\.com/(?:watch\?v=|embed/|shorts/)|youtu\.be/)([a-zA-Z0-9_-]{11})"
    vimeo_regex = r"(?:https?://)?(?:www\.)?vimeo\.com/(\d+)"
    direct_video_regex = r"https?://[^\s]+\.(mp4|mov|avi|webm|mkv)(\?[^\s]*)?"
    patterns = [
        re.compile(youtube_regex),
        re.compile(vimeo_regex),
        re.compile(direct_video_regex),
    ]
    for item in search_results:
        url = item.get("link") or item.get("url")  # Common keys for URLs in search results
        if not url:
            continue
        for pattern in patterns:
            match = pattern.search(url)
            if match:
                # Normalize YouTube short links and embeds to the canonical watch URL
                if pattern.pattern == youtube_regex and match.group(1):
                    normalized_url = f"https://www.youtube.com/watch?v={match.group(1)}"
                else:
                    normalized_url = url
                if normalized_url not in seen_urls:
                    video_urls.append(normalized_url)
                    seen_urls.add(normalized_url)
                if len(video_urls) >= max_urls:
                    break
        if len(video_urls) >= max_urls:
            break
    print(f"[URL Extraction] Extracted {len(video_urls)} video URLs: {video_urls}")
    return video_urls
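# A quick illustration of the extractor (the result dicts below are hypothetical
# examples of the "link"/"url" shape a search tool might return):
#   extract_video_urls_from_search([
#       {"link": "https://youtu.be/dQw4w9WgXcQ"},
#       {"url": "https://example.com/clip.mp4?dl=1"},
#   ])
#   -> ["https://www.youtube.com/watch?v=dQw4w9WgXcQ", "https://example.com/clip.mp4?dl=1"]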
# === 7. Topic-Based Video Analysis Orchestrator ===
async def _download_and_analyze_one_video(client: httpx.AsyncClient, video_url: str, topic: str) -> Dict[str, Any]:
    """Helper to download and analyze a single video. Returns a result or error dict."""
    print(f"[TopicAnalysisWorker] Processing video URL for topic '{topic}': {video_url}")
    try:
        # 1. Download the video
        print(f"[TopicAnalysisWorker] Downloading video from: {video_url}")
        response = await client.get(video_url)
        response.raise_for_status()  # Raise HTTPStatusError for 4XX/5XX responses
        video_bytes = response.content
        print(f"[TopicAnalysisWorker] Downloaded {len(video_bytes)} bytes from {video_url}")
        if not video_bytes:
            raise ValueError("Downloaded video content is empty.")
        # 2. Analyze the video (.remote.aio() is the async variant of .remote(),
        # so several downloads/analyses can overlap)
        analysis_result = await analyze_video_comprehensive.remote.aio(video_bytes)
        # Heuristic check for per-model errors reported by the comprehensive analysis:
        # error strings look like "Whisper Error: ..." and error lists contain {"error": ...}.
        def _is_error(value: Any) -> bool:
            if isinstance(value, str):
                return "Error" in value.split(":")[0]
            if isinstance(value, list):
                return any(isinstance(item, dict) and "error" in item for item in value)
            return False
        if isinstance(analysis_result, dict) and any(_is_error(v) for v in analysis_result.values()):
            print(f"[TopicAnalysisWorker] Comprehensive analysis for {video_url} reported errors: {analysis_result}")
            return {"url": video_url, "error_type": "analysis_error", "error_details": analysis_result}
        return {"url": video_url, "analysis": analysis_result}
    except httpx.HTTPStatusError as e:
        print(f"[TopicAnalysisWorker] HTTP error downloading {video_url}: {e}")
        return {"url": video_url, "error_type": "download_error", "error_details": f"HTTP {e.response.status_code}: {e.response.text[:200]}"}
    except httpx.RequestError as e:
        print(f"[TopicAnalysisWorker] Request error downloading {video_url}: {e}")
        return {"url": video_url, "error_type": "download_error", "error_details": f"Failed to download: {str(e)}"}
    except Exception as e:
        print(f"[TopicAnalysisWorker] Error processing video {video_url}: {e}")
        import traceback
        # traceback.print_exc() can be too verbose for regular Modal logs; keep a truncated copy instead.
        return {"url": video_url, "error_type": "processing_error", "error_details": str(e), "traceback": traceback.format_exc()[:1000]}
@app.function(image=video_analysis_image, timeout=3600)  # timeout is an assumption
async def analyze_videos_by_topic(video_urls: List[str], topic: str) -> Dict[str, Any]:
    """Analyzes a list of videos (by URL) concurrently and aggregates results for a topic."""
    print(f"[TopicAnalysis] Starting concurrent analysis for topic: '{topic}' with {len(video_urls)} video(s).")
    results_aggregator = {
        "topic": topic,
        "analyzed_videos": [],
        "errors": [],
    }
    if not video_urls:
        results_aggregator["errors"].append({"topic_error": "No video URLs provided or found for the topic."})
        return results_aggregator
    async with httpx.AsyncClient(timeout=300.0, follow_redirects=True) as client:  # 5 min timeout per download
        tasks = [_download_and_analyze_one_video(client, url, topic) for url in video_urls]
        # return_exceptions=True lets successful tasks complete even if others fail
        individual_results = await asyncio.gather(*tasks, return_exceptions=True)
    for res_or_exc in individual_results:
        if isinstance(res_or_exc, Exception):
            # Exceptions that escaped _download_and_analyze_one_video itself (should be rare)
            print(f"[TopicAnalysis] An unexpected exception occurred during asyncio.gather: {res_or_exc}")
            results_aggregator["errors"].append({"url": "unknown_url_due_to_gather_exception", "processing_error": str(res_or_exc)})
        elif isinstance(res_or_exc, dict):
            if "error_type" in res_or_exc:
                results_aggregator["errors"].append(res_or_exc)  # Append the error dict directly
            elif "analysis" in res_or_exc:
                results_aggregator["analyzed_videos"].append(res_or_exc)
            else:
                print(f"[TopicAnalysis] Received an unexpected dictionary structure: {res_or_exc}")
                results_aggregator["errors"].append({"url": res_or_exc.get("url", "unknown"), "processing_error": "Unknown result structure"})
        else:
            print(f"[TopicAnalysis] Received an unexpected result type from asyncio.gather: {type(res_or_exc)}")
            results_aggregator["errors"].append({"url": "unknown", "processing_error": f"Unexpected result type: {type(res_or_exc)}"})
    print(f"[TopicAnalysis] Finished concurrent analysis for topic '{topic}'.")
    return results_aggregator
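# A minimal local-entrypoint sketch for driving the topic pipeline end to end with
# `modal run modal_whisper_app.py` (the default topic is just an example; note the
# search stub currently returns no URLs unless the orchestrating agent fills them in):
@app.local_entrypoint()
def main(topic: str = "surfing highlights"):
    urls = find_video_urls_for_topic.remote(topic)
    results = analyze_videos_by_topic.remote(urls, topic)
    print(json.dumps(results, indent=2, default=str))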
# === Gradio Interface ===
@app.function(image=video_analysis_image)
@modal.asgi_app()
def video_analyzer_gradio_ui():
    print("[Gradio] UI function called to define interface.")
    def analyze_video_all_models(video_filepath):
        print(f"[Gradio] Received video filepath for analysis: {video_filepath}")
        if not video_filepath or not os.path.exists(video_filepath):
            return "Error: Video file path is invalid or does not exist.", "", "[]", "[]"
        with open(video_filepath, "rb") as f:
            video_bytes_content = f.read()
        print(f"[Gradio] Read {len(video_bytes_content)} bytes from video path: {video_filepath}")
        if not video_bytes_content:
            return "Error: Could not read video bytes.", "", "[]", "[]"
| print("[Gradio] Calling Whisper...") | |
| transcription = transcribe_video_with_whisper.call(video_bytes_content) | |
| print(f"[Gradio] Whisper result length: {len(transcription)}") | |
| print("[Gradio] Calling SpaceTimeGPT...") | |
| captions = generate_captions_with_spacetimegpt.call(video_bytes_content) | |
| print(f"[Gradio] SpaceTimeGPT result: {captions}") | |
| print("[Gradio] Calling VideoMAE...") | |
| action_labels = generate_action_labels.call(video_bytes_content) | |
| print(f"[Gradio] VideoMAE result: {action_labels}") | |
| print("[Gradio] Calling DETR...") | |
| object_detections = generate_object_detection.call(video_bytes_content) | |
| print(f"[Gradio] DETR result: {object_detections}") | |
| return transcription, captions, str(action_labels), str(object_detections) | |
    with gr.Blocks(title="Comprehensive Video Analyzer", theme=gr.themes.Soft()) as demo:
        gr.Markdown("# Comprehensive Video Analyzer")
        gr.Markdown("Upload a video to get transcription, captions, action labels, and object detections.")
        with gr.Row():
            # Gradio 3.x uses the singular `source` keyword; `sources` arrived in Gradio 4.
            video_input = gr.Video(label="Upload Video", source="upload")
            submit_button = gr.Button("Analyze Video", variant="primary")
        with gr.Tabs():
            with gr.TabItem("Transcription (Whisper)"):
                transcription_output = gr.Textbox(label="Transcription", lines=10, interactive=False)
            with gr.TabItem("Dense Captions (SpaceTimeGPT)"):
                caption_output = gr.Textbox(label="Captions", lines=10, interactive=False)
            with gr.TabItem("Action Recognition (VideoMAE)"):
                action_output = gr.Textbox(label="Predicted Actions (JSON format)", lines=10, interactive=False)
            with gr.TabItem("Object Detection (DETR)"):
                object_output = gr.Textbox(label="Detected Objects (JSON format)", lines=10, interactive=False)
        submit_button.click(
            fn=analyze_video_all_models,
            inputs=[video_input],
            outputs=[transcription_output, caption_output, action_output, object_output],
        )
        gr.Markdown("### Example Video")
        gr.Markdown("You can test with a short video. Processing may take a few minutes depending on video length and model inference times.")
    print("[Gradio] UI definition complete.")
    return gr.routes.App.create_app(demo)
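# Assumed usage: iterate on the app locally with hot reload, or publish it.
#   modal serve modal_whisper_app.py    # serves the Gradio UI and web endpoint for development
#   modal deploy modal_whisper_app.py   # deploys the app under the name defined above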