#!/usr/bin/env python3
"""
streetsoundtext.py - A pipeline that downloads Google Street View panoramas,
extracts perspective views, and analyzes them for sound information.
"""
import os
import requests
import argparse
import numpy as np
import torch
import time
from PIL import Image
from io import BytesIO
from config import LOGS_DIR
import torchvision.transforms as T
from torchvision.transforms.functional import InterpolationMode
from transformers import AutoModel, AutoTokenizer
from utils import sample_perspective_img
import cv2

log_dir = LOGS_DIR
os.makedirs(log_dir, exist_ok=True)  # Creates the directory if it doesn't exist
# soundscape_query = "<image>\nWhat can we expect to hear from the location captured in this image? Name the around five nouns. Avoid speculation and provide a concise response including sound sources visible in the image."
soundscape_query = """<image>
Identify 5 potential sound sources visible in this image. For each source, provide both the noun and a brief description of its typical sound.
Format your response exactly like these examples (do not include the word "Noun:" in your response):
Car: engine humming with occasional honking.
River: gentle flowing water with subtle splashing sounds.
Trees: rustling leaves moved by the wind.
"""

# Constants
IMAGENET_MEAN = (0.485, 0.456, 0.406)
IMAGENET_STD = (0.229, 0.224, 0.225)

# Model Leaderboard Paths
MODEL_LEADERBOARD = {
    "intern_2_5-8B": "OpenGVLab/InternVL2_5-8B-MPO",
    "intern_2_5-4B": "OpenGVLab/InternVL2_5-4B-MPO",
}

class StreetViewDownloader:
    """Downloads panoramic images from Google Street View"""

    def __init__(self):
        # URLs for API requests
        # https://www.google.ca/maps/rpc/photo/listentityphotos?authuser=0&hl=en&gl=us&pb=!1e3!5m45!2m2!1i203!2i100!3m3!2i4!3sCAEIBAgFCAYgAQ!5b1!7m33!1m3!1e1!2b0!3e3!1m3!1e2!2b1!3e2!1m3!1e2!2b0!3e3!1m3!1e8!2b0!3e3!1m3!1e10!2b0!3e3!1m3!1e10!2b1!3e2!1m3!1e10!2b0!3e4!1m3!1e9!2b1!3e2!2b1!8m0!9b0!11m1!4b1!6m3!1sI63QZ8b4BcSli-gPvPHf-Qc!7e81!15i11021!9m2!2d-90.30324219145255!3d38.636242944711036!10d91.37627840655999
        #self.panoid_req = 'https://www.google.com/maps/preview/reveal?authuser=0&hl=en&gl=us&pb=!2m9!1m3!1d82597.14038230096!2d{}!3d{}!2m0!3m2!1i1523!2i1272!4f13.1!3m2!2d{}!3d{}!4m2!1syPETZOjwLvCIptQPiJum-AQ!7e81!5m5!2m4!1i96!2i64!3i1!4i8'
        self.panoid_req = 'https://www.google.ca/maps/rpc/photo/listentityphotos?authuser=0&hl=en&gl=us&pb=!1e3!5m45!2m2!1i203!2i100!3m3!2i4!3sCAEIBAgFCAYgAQ!5b1!7m33!1m3!1e1!2b0!3e3!1m3!1e2!2b1!3e2!1m3!1e2!2b0!3e3!1m3!1e8!2b0!3e3!1m3!1e10!2b0!3e3!1m3!1e10!2b1!3e2!1m3!1e10!2b0!3e4!1m3!1e9!2b1!3e2!2b1!8m0!9b0!11m1!4b1!6m3!1sI63QZ8b4BcSli-gPvPHf-Qc!7e81!15i11021!9m2!2d{}!3d{}!10d25'
        # https://www.google.com/maps/photometa/v1?authuser=0&hl=en&gl=us&pb=!1m4!1smaps_sv.tactile!11m2!2m1!1b1!2m2!1sen!2sus!3m3!1m2!1e2!2s{}!4m61!1e1!1e2!1e3!1e4!1e5!1e6!1e8!1e12!1e17!2m1!1e1!4m1!1i48!5m1!1e1!5m1!1e2!6m1!1e1!6m1!1e2!9m36!1m3!1e2!2b1!3e2!1m3!1e2!2b0!3e3!1m3!1e3!2b1!3e2!1m3!1e3!2b0!3e3!1m3!1e8!2b0!3e3!1m3!1e1!2b0!3e3!1m3!1e4!2b0!3e3!1m3!1e10!2b1!3e2!1m3!1e10!2b0!3e3!11m2!3m1!4b1 # vmSzE7zkK2eETwAP_r8UdQ
        # https://www.google.ca/maps/photometa/v1?authuser=0&hl=en&gl=us&pb=!1m4!1smaps_sv.tactile!11m2!2m1!1b1!2m2!1sen!2sus!3m3!1m2!1e2!2s{}!4m61!1e1!1e2!1e3!1e4!1e5!1e6!1e8!1e12!1e17!2m1!1e1!4m1!1i48!5m1!1e1!5m1!1e2!6m1!1e1!6m1!1e2!9m36!1m3!1e2!2b1!3e2!1m3!1e2!2b0!3e3!1m3!1e3!2b1!3e2!1m3!1e3!2b0!3e3!1m3!1e8!2b0!3e3!1m3!1e1!2b0!3e3!1m3!1e4!2b0!3e3!1m3!1e10!2b1!3e2!1m3!1e10!2b0!3e3!11m2!3m1!4b1 # -9HfuNFUDOw_IP5SA5IspA
        self.photometa_req = 'https://www.google.com/maps/photometa/v1?authuser=0&hl=en&gl=us&pb=!1m4!1smaps_sv.tactile!11m2!2m1!1b1!2m2!1sen!2sus!3m5!1m2!1e2!2s{}!2m1!5s0x87d8b49f53fc92e9:0x6ecb6e520c6f4d9f!4m57!1e1!1e2!1e3!1e4!1e5!1e6!1e8!1e12!2m1!1e1!4m1!1i48!5m1!1e1!5m1!1e2!6m1!1e1!6m1!1e2!9m36!1m3!1e2!2b1!3e2!1m3!1e2!2b0!3e3!1m3!1e3!2b1!3e2!1m3!1e3!2b0!3e3!1m3!1e8!2b0!3e3!1m3!1e1!2b0!3e3!1m3!1e4!2b0!3e3!1m3!1e10!2b1!3e2!1m3!1e10!2b0!3e3'
        self.panimg_req = 'https://streetviewpixels-pa.googleapis.com/v1/tile?cb_client=maps_sv.tactile&panoid={}&x={}&y={}&zoom={}'
    def get_image_id(self, lat, lon):
        """Get Street View panorama ID for given coordinates"""
        null = None  # defined so eval() below can handle the JavaScript-style null in the payload
        pr_response = requests.get(self.panoid_req.format(lon, lat, lon, lat))
        if pr_response.status_code != 200:
            error_message = f"Error fetching panorama ID: HTTP {pr_response.status_code}"
            if pr_response.status_code == 400:
                error_message += " - Bad request. Check coordinates format."
            elif pr_response.status_code == 401 or pr_response.status_code == 403:
                error_message += " - Authentication error. Check API key and permissions."
            elif pr_response.status_code == 404:
                error_message += " - No panorama found at these coordinates."
            elif pr_response.status_code == 429:
                error_message += " - Rate limit exceeded. Try again later."
            elif pr_response.status_code >= 500:
                error_message += " - Server error. Try again later."
            print(error_message)
            return None
        pr = BytesIO(pr_response.content).getvalue().decode('utf-8')
        # Drop the first (anti-hijacking) line, then evaluate the remaining JSON-like payload
        pr = eval(pr[pr.index('\n'):])
        try:
            panoid = pr[0][0][0]
        except (IndexError, TypeError):
            return None
        return panoid
    def download_image(self, lat, lon, zoom=1):
        """Download Street View panorama and metadata"""
        null = None  # defined so eval() below can handle the JavaScript-style null in the payload
        panoid = self.get_image_id(lat, lon)
        if panoid is None:
            raise ValueError(f"get_image_id() failed at coordinates: {lat}, {lon}")

        # Get metadata
        pm_response = requests.get(self.photometa_req.format(panoid))
        pm = BytesIO(pm_response.content).getvalue().decode('utf-8')
        pm = eval(pm[pm.index('\n'):])
        pan_list = pm[1][0][5][0][3][0]

        # Extract relevant info
        pid = pan_list[0][0][1]
        plat = pan_list[0][2][0][2]
        plon = pan_list[0][2][0][3]
        p_orient = pan_list[0][2][2][0]

        # Download image tiles and assemble panorama
        img_part_inds = [(x, y) for x in range(2**zoom) for y in range(2**(zoom-1))]
        img = np.zeros((512*(2**(zoom-1)), 512*(2**zoom), 3), dtype=np.uint8)
        for x, y in img_part_inds:
            sub_img_response = requests.get(self.panimg_req.format(pid, x, y, zoom))
            sub_img = np.array(Image.open(BytesIO(sub_img_response.content)))
            img[512*y:512*(y+1), 512*x:512*(x+1)] = sub_img
        if (img[-1] == 0).all():
            # raise ValueError("Failed to download complete panorama")
            print("Failed to download complete panorama")
        return img, pid, plat, plon, p_orient

class PerspectiveExtractor:
    """Extracts perspective views from panoramic images"""

    def __init__(self, output_shape=(256, 256), fov=(90, 90)):
        self.output_shape = output_shape
        self.fov = fov
    def extract_views(self, pano_img, face_size=512):
        """
        Extract front, back, left, and right perspective views by converting the
        equirectangular (ERP) panorama into four cubic faces.

        Args:
            pano_img (numpy.ndarray): The input equirectangular image.
            face_size (int): The size of each cubic face.

        Returns:
            dict: A dictionary with the four cube faces.
        """
        # orientations = {
        #     "front": (0, p_orient, 0),       # Align front with real orientation
        #     "back": (0, p_orient + 180, 0),  # Behind
        #     "left": (0, p_orient - 90, 0),   # Left side
        #     "right": (0, p_orient + 90, 0),  # Right side
        # }
        # cutouts = {}
        # for view, rot in orientations.items():
        #     cutout, fov, applied_rot = sample_perspective_img(
        #         pano_img, self.output_shape, fov=self.fov, rot=rot
        #     )
        #     cutouts[view] = cutout
        # return cutouts
        # Get ERP dimensions
        h_erp, w_erp, _ = pano_img.shape

        # Define cube face directions (yaw, pitch)
        cube_faces = {
            "front": (0, 0),
            "left": (90, 0),
            "back": (180, 0),
            "right": (-90, 0),
        }

        # Output faces
        faces = {}

        # Generate each face
        for face_name, (yaw, pitch) in cube_faces.items():
            # Perspective camera intrinsics for a 90-degree field of view
            # (kept for reference; the [-1, 1] grid with z = 1 below already encodes this FOV)
            fov = 90  # Field of view
            K = np.array([
                [face_size / (2 * np.tan(np.radians(fov / 2))), 0, face_size / 2],
                [0, face_size / (2 * np.tan(np.radians(fov / 2))), face_size / 2],
                [0, 0, 1]
            ])
            # Generate 3D world coordinates for the cube face
            x, y = np.meshgrid(np.linspace(-1, 1, face_size), np.linspace(-1, 1, face_size))
            z = np.ones_like(x)

            # Normalize 3D points
            points_3d = np.stack((x, y, z), axis=-1)  # Shape: (H, W, 3)
            points_3d /= np.linalg.norm(points_3d, axis=-1, keepdims=True)

            # Apply rotation to align with the cube face
            yaw_rad, pitch_rad = np.radians(yaw), np.radians(pitch)
            Ry = np.array([[np.cos(yaw_rad), 0, np.sin(yaw_rad)],
                           [0, 1, 0],
                           [-np.sin(yaw_rad), 0, np.cos(yaw_rad)]])
            Rx = np.array([[1, 0, 0],
                           [0, np.cos(pitch_rad), -np.sin(pitch_rad)],
                           [0, np.sin(pitch_rad), np.cos(pitch_rad)]])
            R = Ry @ Rx

            # Rotate points
            points_3d_rot = np.einsum('ij,hwj->hwi', R, points_3d)

            # Convert 3D to spherical coordinates
            lon = np.arctan2(points_3d_rot[..., 0], points_3d_rot[..., 2])
            lat = np.arcsin(points_3d_rot[..., 1])

            # Map spherical coordinates to ERP image coordinates
            x_erp = (w_erp * (lon / (2 * np.pi) + 0.5)).astype(np.float32)
            y_erp = (h_erp * (0.5 - lat / np.pi)).astype(np.float32)

            # Sample pixels from ERP image
            face_img = cv2.remap(pano_img, x_erp, y_erp, interpolation=cv2.INTER_LINEAR, borderMode=cv2.BORDER_WRAP)
            face_img = cv2.rotate(face_img, cv2.ROTATE_180)
            faces[face_name] = face_img

        return faces

class ImageAnalyzer:
    """Analyzes images using Vision-Language Models"""

    def __init__(self, model_name="intern_2_5-4B", use_cuda=True):
        self.model_name = model_name
        self.use_cuda = use_cuda and torch.cuda.is_available()
        self.model, self.tokenizer, self.device = self._load_model()

    def _load_model(self):
        """Load selected Vision-Language Model"""
        if self.model_name not in MODEL_LEADERBOARD:
            raise ValueError(f"Model '{self.model_name}' not found. Choose from: {list(MODEL_LEADERBOARD.keys())}")
        model_path = MODEL_LEADERBOARD[self.model_name]

        # Configure device and parameters
        if self.use_cuda:
            device = torch.device("cuda")
            torch_dtype = torch.bfloat16
            use_flash_attn = True
        else:
            device = torch.device("cpu")
            torch_dtype = torch.float32
            use_flash_attn = False

        # Load model and tokenizer
        model = AutoModel.from_pretrained(
            model_path,
            torch_dtype=torch_dtype,
            load_in_8bit=False,
            low_cpu_mem_usage=True,
            use_flash_attn=use_flash_attn,
            trust_remote_code=True,
        ).eval().to(device)
        tokenizer = AutoTokenizer.from_pretrained(
            model_path,
            trust_remote_code=True,
            use_fast=False
        )
        return model, tokenizer, device

    def _build_transform(self, input_size=448):
        """Create image transformation pipeline"""
        transform = T.Compose([
            T.Lambda(lambda img: img.convert('RGB') if img.mode != 'RGB' else img),
            T.Resize((input_size, input_size), interpolation=InterpolationMode.BICUBIC),
            T.ToTensor(),
            T.Normalize(mean=IMAGENET_MEAN, std=IMAGENET_STD)
        ])
        return transform

    def _find_closest_aspect_ratio(self, aspect_ratio, target_ratios, width, height, image_size):
        """Find closest aspect ratio for image tiling"""
        best_ratio_diff = float('inf')
        best_ratio = (1, 1)
        area = width * height
        for ratio in target_ratios:
            target_aspect_ratio = ratio[0] / ratio[1]
            ratio_diff = abs(aspect_ratio - target_aspect_ratio)
            if ratio_diff < best_ratio_diff:
                best_ratio_diff = ratio_diff
                best_ratio = ratio
            elif ratio_diff == best_ratio_diff:
                if area > 0.5 * image_size * image_size * ratio[0] * ratio[1]:
                    best_ratio = ratio
        return best_ratio
    def _preprocess_image(self, image, min_num=1, max_num=12, image_size=448, use_thumbnail=False):
        """Preprocess image for model input"""
        orig_width, orig_height = image.size
        aspect_ratio = orig_width / orig_height

        # Calculate possible image aspect ratios
        target_ratios = set(
            (i, j) for n in range(min_num, max_num + 1)
            for i in range(1, n + 1)
            for j in range(1, n + 1)
            if i * j <= max_num and i * j >= min_num
        )
        target_ratios = sorted(target_ratios, key=lambda x: x[0] * x[1])

        # Find closest aspect ratio
        target_aspect_ratio = self._find_closest_aspect_ratio(
            aspect_ratio, target_ratios, orig_width, orig_height, image_size
        )

        # Calculate target dimensions
        target_width = image_size * target_aspect_ratio[0]
        target_height = image_size * target_aspect_ratio[1]
        blocks = target_aspect_ratio[0] * target_aspect_ratio[1]

        # Resize and split image
        resized_img = image.resize((target_width, target_height))
        processed_images = []
        for i in range(blocks):
            box = (
                (i % (target_width // image_size)) * image_size,
                (i // (target_width // image_size)) * image_size,
                ((i % (target_width // image_size)) + 1) * image_size,
                ((i // (target_width // image_size)) + 1) * image_size
            )
            split_img = resized_img.crop(box)
            processed_images.append(split_img)
        assert len(processed_images) == blocks
        if use_thumbnail and len(processed_images) != 1:
            thumbnail_img = image.resize((image_size, image_size))
            processed_images.append(thumbnail_img)
        return processed_images

    def load_image(self, image_path, input_size=448, max_num=12):
        """Load and process image for analysis"""
        image = Image.open(image_path).convert('RGB')
        transform = self._build_transform(input_size)
        images = self._preprocess_image(image, image_size=input_size, use_thumbnail=True, max_num=max_num)
        pixel_values = [transform(image) for image in images]
        pixel_values = torch.stack(pixel_values)
        return pixel_values
    def analyze_image(self, image_path, max_num=12):
        """Analyze image for expected sounds"""
        # Load and process image
        pixel_values = self.load_image(image_path, max_num=max_num)

        # Move to device with appropriate dtype
        if self.device.type == "cuda":
            pixel_values = pixel_values.to(torch.bfloat16).to(self.device)
        else:
            pixel_values = pixel_values.to(torch.float32).to(self.device)

        # Create sound-focused query
        query = soundscape_query

        # Generate response
        generation_config = dict(max_new_tokens=1024, do_sample=True)
        response = self.model.chat(self.tokenizer, pixel_values, query, generation_config)
        return response

class StreetSoundTextPipeline:
    """Complete pipeline for Street View sound analysis"""

    def __init__(self, log_dir="logs", model_name="intern_2_5-4B", use_cuda=True):
        # Create log directory if it doesn't exist
        self.log_dir = log_dir
        os.makedirs(log_dir, exist_ok=True)

        # Initialize components
        self.downloader = StreetViewDownloader()
        self.extractor = PerspectiveExtractor()
        # self.analyzer = ImageAnalyzer(model_name=model_name, use_cuda=use_cuda)
        self.analyzer = None
        self.model_name = model_name
        self.use_cuda = use_cuda

    def _load_analyzer(self):
        if self.analyzer is None:
            self.analyzer = ImageAnalyzer(model_name=self.model_name, use_cuda=self.use_cuda)

    def _unload_analyzer(self):
        if self.analyzer is not None:
            if hasattr(self.analyzer, 'model') and self.analyzer.model is not None:
                self.analyzer.model = self.analyzer.model.to("cpu")
                del self.analyzer.model
                self.analyzer.model = None
            torch.cuda.empty_cache()
            self.analyzer = None

    def process(self, lat, lon, view, panoramic=False):
        """
        Process a location to generate sound description for specified view or all views

        Args:
            lat (float): Latitude
            lon (float): Longitude
            view (str): Perspective view ('front', 'back', 'left', 'right')
            panoramic (bool): If True, process all views instead of just the specified one

        Returns:
            dict or list: Results including panorama info and sound description(s)
        """
        if view not in ["front", "back", "left", "right"]:
            raise ValueError(f"Invalid view: {view}. Choose from: front, back, left, right")

        # Step 1: Download panoramic image
        print(f"Downloading Street View panorama for coordinates: {lat}, {lon}")
        pano_path = os.path.join(self.log_dir, "panorama.jpg")
        pano_img, pid, plat, plon, p_orient = self.downloader.download_image(lat, lon)
        Image.fromarray(pano_img).save(pano_path)

        # Step 2: Extract perspective views
        print(f"Extracting perspective views with orientation: {p_orient}°")
        cutouts = self.extractor.extract_views(pano_img, 512)

        # Save all views
        for v, img in cutouts.items():
            view_path = os.path.join(self.log_dir, f"{v}.jpg")
            Image.fromarray(img).save(view_path)

        self._load_analyzer()
        print("\n[DEBUG] Current soundscape query:")
        print(soundscape_query)
        print("-" * 50)
        if panoramic:
            # Process all views
            print("Analyzing all views for sound information")
            results = []
            for current_view in ["front", "back", "left", "right"]:
                view_path = os.path.join(self.log_dir, f"{current_view}.jpg")
                sound_description = self.analyzer.analyze_image(view_path)
                view_result = {
                    "panorama_id": pid,
                    "coordinates": {"lat": plat, "lon": plon},
                    "orientation": p_orient,
                    "view": current_view,
                    "sound_description": sound_description,
                    "files": {
                        "panorama": pano_path,
                        "view_path": view_path
                    }
                }
                results.append(view_result)
            self._unload_analyzer()
            return results
        else:
            # Process only the selected view
            view_path = os.path.join(self.log_dir, f"{view}.jpg")
            print(f"Analyzing {view} view for sound information")
            sound_description = self.analyzer.analyze_image(view_path)
            self._unload_analyzer()

            # Prepare results
            results = {
                "panorama_id": pid,
                "coordinates": {"lat": plat, "lon": plon},
                "orientation": p_orient,
                "view": view,
                "sound_description": sound_description,
                "files": {
                    "panorama": pano_path,
                    "views": {v: os.path.join(self.log_dir, f"{v}.jpg") for v in cutouts.keys()}
                }
            }
            return results

def parse_location(location_str):
    """Parse location string in format 'lat,lon' into float tuple"""
    try:
        lat, lon = map(float, location_str.split(','))
        return lat, lon
    except ValueError:
        raise argparse.ArgumentTypeError("Location must be in format 'latitude,longitude'")

def generate_caption(lat, lon, view="front", model="intern_2_5-4B", cpu_only=False, panoramic=False):
    """
    Generate sound captions for one or all views of a street view location

    Args:
        lat (float/str): Latitude
        lon (float/str): Longitude
        view (str): Perspective view ('front', 'back', 'left', 'right')
        model (str): Model name to use for analysis
        cpu_only (bool): Whether to force CPU usage
        panoramic (bool): If True, process all views instead of just the specified one

    Returns:
        dict or list: Results with sound descriptions
    """
    pipeline = StreetSoundTextPipeline(
        log_dir=log_dir,
        model_name=model,
        use_cuda=not cpu_only
    )
    try:
        results = pipeline.process(lat, lon, view, panoramic=panoramic)
        if panoramic:
            # Report results for all views
            print(f"Generated captions for all views at location: {lat}, {lon}")
        else:
            print(f"Generated caption for {view} view at location: {lat}, {lon}")
        return results
    except Exception as e:
        print(f"Error: {str(e)}")
        return None
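
# --- Example usage (illustrative) ------------------------------------------
# A minimal sketch of a command-line entry point, assuming the module is run as
# a script: it only wires together parse_location() and generate_caption()
# defined above. The flag names below are hypothetical and not an established
# interface of this file.
if __name__ == "__main__":
    parser = argparse.ArgumentParser(
        description="Generate sound descriptions for a Google Street View location"
    )
    parser.add_argument("--location", type=parse_location, required=True,
                        help="Coordinates in 'latitude,longitude' format")
    parser.add_argument("--view", default="front",
                        choices=["front", "back", "left", "right"],
                        help="Perspective view to analyze")
    parser.add_argument("--model", default="intern_2_5-4B",
                        choices=list(MODEL_LEADERBOARD.keys()),
                        help="Vision-Language Model to use")
    parser.add_argument("--cpu-only", action="store_true",
                        help="Force CPU inference")
    parser.add_argument("--panoramic", action="store_true",
                        help="Analyze all four views instead of a single one")
    args = parser.parse_args()

    lat, lon = args.location
    output = generate_caption(lat, lon, view=args.view, model=args.model,
                              cpu_only=args.cpu_only, panoramic=args.panoramic)
    print(output)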