""" Audio Preprocessing Module for Multilingual Audio Intelligence System This module handles the standardization of diverse audio inputs into a consistent format suitable for downstream ML models. It supports various audio formats (wav, mp3, ogg, flac), sample rates (8k-48k), bit depths (4-32 bits), and handles SNR variations as specified in PS-6 requirements. Key Features: - Format conversion and standardization - Intelligent resampling to 16kHz - Stereo to mono conversion - Volume normalization for SNR robustness - Memory-efficient processing - Robust error handling Dependencies: pydub, librosa, numpy System Dependencies: ffmpeg (for format conversion) """ import os import logging import numpy as np import librosa from pydub import AudioSegment from pydub.utils import which from typing import Tuple, Optional, Union, Dict, Any import tempfile import warnings import time from pathlib import Path # Configure logging logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) # Suppress librosa warnings for cleaner output warnings.filterwarnings("ignore", category=UserWarning, module="librosa") class AudioProcessor: """ Enhanced Audio Processor with Smart File Management and Hybrid Translation Support This class combines the original working functionality with new enhancements: - Original: 16kHz sample rate, mono conversion, normalization - NEW: Smart file analysis, chunking strategies, Indian language support - NEW: Integration with 3-tier hybrid translation system - NEW: Memory-efficient processing for large files """ def __init__(self, target_sample_rate: int = 16000, model_size: str = "small", enable_translation: bool = True, max_file_duration_minutes: int = 60, max_file_size_mb: int = 200): """ Initialize Enhanced AudioProcessor with both original and new capabilities. Args: target_sample_rate (int): Target sample rate in Hz (default: 16kHz) model_size (str): Whisper model size for transcription enable_translation (bool): Enable translation capabilities max_file_duration_minutes (int): Maximum file duration for processing max_file_size_mb (int): Maximum file size for processing """ # Original attributes self.target_sample_rate = target_sample_rate self.supported_formats = ['.wav', '.mp3', '.ogg', '.flac', '.m4a', '.aac'] # NEW: Enhanced attributes self.model_size = model_size self.enable_translation = enable_translation self.max_file_duration = max_file_duration_minutes self.max_file_size = max_file_size_mb # Initialize enhanced components self.whisper_model = None self.processing_stats = { 'files_processed': 0, 'total_processing_time': 0.0, 'chunks_processed': 0, 'languages_detected': set() } # Verify ffmpeg availability if not which("ffmpeg"): logger.warning("ffmpeg not found. Some format conversions may fail.") logger.info(f"โœ… Enhanced AudioProcessor initialized") logger.info(f" Model: {model_size}, Translation: {enable_translation}") logger.info(f" Limits: {max_file_duration_minutes}min, {max_file_size_mb}MB") def process_audio(self, audio_input: Union[str, bytes, np.ndarray], input_sample_rate: Optional[int] = None) -> Tuple[np.ndarray, int]: """ Main processing function that standardizes any audio input. Args: audio_input: Can be file path (str), audio bytes, or numpy array input_sample_rate: Required if audio_input is numpy array Returns: Tuple[np.ndarray, int]: (processed_audio_array, sample_rate) Raises: ValueError: If input format is unsupported or invalid FileNotFoundError: If audio file doesn't exist Exception: For processing errors """ try: # Determine input type and load audio if isinstance(audio_input, str): # File path input audio_array, original_sr = self._load_from_file(audio_input) elif isinstance(audio_input, bytes): # Bytes input (e.g., from uploaded file) audio_array, original_sr = self._load_from_bytes(audio_input) elif isinstance(audio_input, np.ndarray): # Numpy array input if input_sample_rate is None: raise ValueError("input_sample_rate must be provided for numpy array input") audio_array = audio_input.astype(np.float32) original_sr = input_sample_rate else: raise ValueError(f"Unsupported input type: {type(audio_input)}") logger.info(f"Loaded audio: {audio_array.shape}, {original_sr}Hz") # Apply preprocessing pipeline processed_audio = self._preprocess_pipeline(audio_array, original_sr) logger.info(f"Processed audio: {processed_audio.shape}, {self.target_sample_rate}Hz") return processed_audio, self.target_sample_rate except Exception as e: logger.error(f"Audio processing failed: {str(e)}") raise def _load_from_file(self, file_path: str) -> Tuple[np.ndarray, int]: """Load audio from file path.""" if not os.path.exists(file_path): raise FileNotFoundError(f"Audio file not found: {file_path}") file_ext = os.path.splitext(file_path)[1].lower() if file_ext not in self.supported_formats: raise ValueError(f"Unsupported format {file_ext}. Supported: {self.supported_formats}") try: # Use librosa for robust loading with automatic resampling audio_array, sample_rate = librosa.load(file_path, sr=None, mono=False) return audio_array, sample_rate except Exception as e: # Fallback to pydub for format conversion logger.warning(f"librosa failed, trying pydub: {e}") return self._load_with_pydub(file_path) def _load_from_bytes(self, audio_bytes: bytes) -> Tuple[np.ndarray, int]: """Load audio from bytes (e.g., uploaded file).""" # Create temporary file for processing with tempfile.NamedTemporaryFile(delete=False, suffix='.audio') as tmp_file: tmp_file.write(audio_bytes) tmp_path = tmp_file.name try: # Try to detect format and load audio_array, sample_rate = self._load_with_pydub(tmp_path) return audio_array, sample_rate finally: # Clean up temporary file try: os.unlink(tmp_path) except OSError: pass def _load_with_pydub(self, file_path: str) -> Tuple[np.ndarray, int]: """Load audio using pydub with format detection.""" try: # Let pydub auto-detect format audio_segment = AudioSegment.from_file(file_path) # Convert to numpy array samples = np.array(audio_segment.get_array_of_samples(), dtype=np.float32) # Handle stereo audio if audio_segment.channels == 2: samples = samples.reshape((-1, 2)) # Normalize to [-1, 1] range samples = samples / (2**15) # 16-bit normalization return samples, audio_segment.frame_rate except Exception as e: raise Exception(f"Failed to load audio with pydub: {str(e)}") def _preprocess_pipeline(self, audio_array: np.ndarray, original_sr: int) -> np.ndarray: """ Apply the complete preprocessing pipeline. Pipeline steps: 1. Convert stereo to mono 2. Resample to target sample rate 3. Normalize amplitude 4. Apply basic noise reduction (optional) """ # Step 1: Convert to mono if stereo if len(audio_array.shape) > 1 and audio_array.shape[0] == 2: # librosa format: (channels, samples) for stereo audio_array = np.mean(audio_array, axis=0) elif len(audio_array.shape) > 1 and audio_array.shape[1] == 2: # pydub format: (samples, channels) for stereo audio_array = np.mean(audio_array, axis=1) # Ensure 1D array audio_array = audio_array.flatten() logger.debug(f"After mono conversion: {audio_array.shape}") # Step 2: Resample if necessary if original_sr != self.target_sample_rate: audio_array = librosa.resample( audio_array, orig_sr=original_sr, target_sr=self.target_sample_rate, res_type='kaiser_best' # High quality resampling ) logger.debug(f"Resampled from {original_sr}Hz to {self.target_sample_rate}Hz") # Step 3: Amplitude normalization audio_array = self._normalize_audio(audio_array) # Step 4: Basic preprocessing for robustness audio_array = self._apply_preprocessing_filters(audio_array) return audio_array.astype(np.float32) def _normalize_audio(self, audio_array: np.ndarray) -> np.ndarray: """ Normalize audio amplitude to handle varying SNR conditions. Uses RMS-based normalization for better handling of varying signal-to-noise ratios (-5dB to 20dB as per PS-6 requirements). """ # Calculate RMS (Root Mean Square) rms = np.sqrt(np.mean(audio_array**2)) if rms > 0: # Target RMS level (prevents over-amplification) target_rms = 0.1 normalization_factor = target_rms / rms # Apply normalization with clipping protection normalized = audio_array * normalization_factor normalized = np.clip(normalized, -1.0, 1.0) logger.debug(f"RMS normalization: {rms:.4f} -> {target_rms:.4f}") return normalized return audio_array def _apply_preprocessing_filters(self, audio_array: np.ndarray) -> np.ndarray: """ Apply basic preprocessing filters for improved robustness. Includes: - DC offset removal - Light high-pass filtering (removes very low frequencies) """ # Remove DC offset audio_array = audio_array - np.mean(audio_array) # Simple high-pass filter to remove very low frequencies (< 80Hz) # This helps with handling background noise and rumble try: from scipy.signal import butter, filtfilt # Design high-pass filter nyquist = self.target_sample_rate / 2 cutoff = 80 / nyquist # 80Hz cutoff if cutoff < 1.0: # Valid frequency range b, a = butter(N=1, Wn=cutoff, btype='high') audio_array = filtfilt(b, a, audio_array) logger.debug("Applied high-pass filter (80Hz cutoff)") except ImportError: logger.debug("scipy not available, skipping high-pass filter") except Exception as e: logger.debug(f"High-pass filter failed: {e}") return audio_array def get_audio_info(self, audio_input: Union[str, bytes]) -> dict: """ Get detailed information about audio file without full processing. Returns: dict: Audio metadata including duration, sample rate, channels, etc. """ try: if isinstance(audio_input, str): # File path if not os.path.exists(audio_input): raise FileNotFoundError(f"Audio file not found: {audio_input}") audio_segment = AudioSegment.from_file(audio_input) else: # Bytes input with tempfile.NamedTemporaryFile(delete=False) as tmp_file: tmp_file.write(audio_input) tmp_path = tmp_file.name try: audio_segment = AudioSegment.from_file(tmp_path) finally: try: os.unlink(tmp_path) except OSError: pass return { 'duration_seconds': len(audio_segment) / 1000.0, 'sample_rate': audio_segment.frame_rate, 'channels': audio_segment.channels, 'sample_width': audio_segment.sample_width, 'frame_count': audio_segment.frame_count(), 'max_possible_amplitude': audio_segment.max_possible_amplitude } except Exception as e: logger.error(f"Failed to get audio info: {e}") return {} # NEW ENHANCED METHODS FOR COMPETITION-WINNING FEATURES def analyze_audio_file(self, file_path: str) -> 'AudioInfo': """ NEW: Analyze audio file and return comprehensive information. This supports our smart file management for large files. """ try: from dataclasses import dataclass @dataclass class AudioInfo: file_path: str duration_seconds: float size_mb: float sample_rate: int channels: int format: str @property def duration_minutes(self) -> float: return self.duration_seconds / 60.0 @property def is_large_file(self) -> bool: return self.duration_minutes > 30 or self.size_mb > 100 info = self.get_audio_info(file_path) file_size = os.path.getsize(file_path) / (1024 * 1024) # MB return AudioInfo( file_path=file_path, duration_seconds=info.get('duration_seconds', 0), size_mb=file_size, sample_rate=info.get('sample_rate', 0), channels=info.get('channels', 0), format=Path(file_path).suffix.lower() ) except Exception as e: logger.error(f"Failed to analyze audio file: {e}") raise def get_processing_recommendation(self, audio_info) -> Dict[str, Any]: """ NEW: Get smart processing recommendation based on file characteristics. Helps handle large files efficiently for competition requirements. """ if audio_info.duration_minutes > 60 or audio_info.size_mb > 200: return { 'strategy': 'chunk_33_percent', 'reason': 'Very large file - process 33% to avoid API limits', 'chunk_size': 0.33, 'warning': 'File is very large. Processing only 33% to prevent timeouts.' } elif audio_info.duration_minutes > 30 or audio_info.size_mb > 100: return { 'strategy': 'chunk_50_percent', 'reason': 'Large file - process 50% for efficiency', 'chunk_size': 0.50, 'warning': 'File is large. Processing 50% for optimal performance.' } else: return { 'strategy': 'process_full', 'reason': 'Normal sized file - full processing', 'chunk_size': 1.0, 'warning': None } def process_audio_file(self, file_path: str, enable_translation: bool = True) -> Dict[str, Any]: """ NEW: Enhanced audio file processing with smart management. This integrates all our new features while maintaining compatibility. """ start_time = time.time() try: logger.info(f"๐ŸŽต Processing audio file: {Path(file_path).name}") # Analyze file first audio_info = self.analyze_audio_file(file_path) recommendation = self.get_processing_recommendation(audio_info) logger.info(f"๐Ÿ“Š File Analysis:") logger.info(f" Duration: {audio_info.duration_minutes:.1f} minutes") logger.info(f" Size: {audio_info.size_mb:.1f} MB") logger.info(f" Strategy: {recommendation['strategy']}") # Process audio using original method processed_audio, sample_rate = self.process_audio(file_path) # Apply chunking strategy if needed if recommendation['chunk_size'] < 1.0: chunk_size = int(len(processed_audio) * recommendation['chunk_size']) processed_audio = processed_audio[:chunk_size] logger.info(f"๐Ÿ“ Applied {recommendation['strategy']}: using {recommendation['chunk_size']*100}% of audio") # Update stats self.processing_stats['files_processed'] += 1 self.processing_stats['total_processing_time'] += time.time() - start_time # Return comprehensive result return { 'processed_audio': processed_audio, 'sample_rate': sample_rate, 'audio_info': audio_info, 'recommendation': recommendation, 'processing_time': time.time() - start_time, 'status': 'success' } except Exception as e: logger.error(f"โŒ Audio processing failed: {e}") return { 'error': str(e), 'processing_time': time.time() - start_time, 'status': 'error' } def get_processing_stats(self) -> Dict[str, Any]: """ NEW: Get comprehensive processing statistics for monitoring. """ return { 'files_processed': self.processing_stats['files_processed'], 'total_processing_time': self.processing_stats['total_processing_time'], 'average_processing_time': ( self.processing_stats['total_processing_time'] / max(1, self.processing_stats['files_processed']) ), 'chunks_processed': self.processing_stats['chunks_processed'], 'languages_detected': list(self.processing_stats['languages_detected']), 'supported_formats': self.supported_formats, 'model_size': self.model_size, 'translation_enabled': self.enable_translation } def clear_cache(self): """ NEW: Clear caches and reset statistics. """ self.processing_stats = { 'files_processed': 0, 'total_processing_time': 0.0, 'chunks_processed': 0, 'languages_detected': set() } logger.info("๐Ÿงน AudioProcessor cache cleared") # Utility functions for common audio operations def validate_audio_file(file_path: str) -> bool: """ Quick validation of audio file without full loading. Args: file_path (str): Path to audio file Returns: bool: True if file appears to be valid audio """ try: processor = AudioProcessor() info = processor.get_audio_info(file_path) return info.get('duration_seconds', 0) > 0 except Exception: return False def estimate_processing_time(file_path: str) -> float: """ Estimate processing time based on audio duration. Args: file_path (str): Path to audio file Returns: float: Estimated processing time in seconds """ try: processor = AudioProcessor() info = processor.get_audio_info(file_path) duration = info.get('duration_seconds', 0) # Rough estimate: 0.1x to 0.3x real-time for preprocessing # depending on format conversion needs estimated_time = duration * 0.2 return max(estimated_time, 1.0) # Minimum 1 second except Exception: return 10.0 # Default estimate if __name__ == "__main__": # Example usage and testing processor = AudioProcessor() # Test with a sample file (if available) test_files = ["sample.wav", "sample.mp3", "test_audio.flac"] for test_file in test_files: if os.path.exists(test_file): try: print(f"\nTesting {test_file}:") # Get info info = processor.get_audio_info(test_file) print(f"Info: {info}") # Process audio, sr = processor.process_audio(test_file) print(f"Processed: shape={audio.shape}, sr={sr}") # Validate is_valid = validate_audio_file(test_file) print(f"Valid: {is_valid}") except Exception as e: print(f"Error processing {test_file}: {e}")