"""Example usage of CNN ONNX voicemail detector.

This script demonstrates how to use the fast CNN voicemail detection model
for both single file inference and real-time streaming scenarios.
"""
|
|
|
|
|
from pathlib import Path |
|
|
from typing import Tuple |
|
|
|
|
|
import librosa |
|
|
import numpy as np |
|
|
import onnxruntime as ort |
|
|
|
|
|
|
|
|
class CNNVoicemailDetector:
    """Fast CNN-based voicemail detector using ONNX.

    Audio is cut or zero-padded to a fixed 4-second window at 16 kHz, turned
    into a min-max normalized log-mel spectrogram and fed to the ONNX model.
    The model output is interpreted as two logits: index 0 is ``"live_human"``
    and index 1 is ``"voicemail"``.
    """

    def __init__(self, model_path: str = "model.onnx"):
        """Initialize the detector.

        Args:
            model_path: Path to the ONNX model file
        """
        sess_options = ort.SessionOptions()
        sess_options.graph_optimization_level = (
            ort.GraphOptimizationLevel.ORT_ENABLE_ALL
        )
        self.session = ort.InferenceSession(model_path, sess_options)
        # Read the input tensor name from the model itself instead of
        # assuming it was exported with the name "input".
        self.input_name = self.session.get_inputs()[0].name

        # Fixed analysis window: 4 s at 16 kHz = 64,000 samples.
        self.sample_rate = 16000
        self.duration = 4.0
        self.expected_samples = int(self.sample_rate * self.duration)

        # Mel-spectrogram parameters handed to librosa.
        self.n_fft = 512
        self.hop_length = 256
        self.n_mels = 128
        self.fmin = 0
        self.fmax = 8000

        # Output index -> human-readable label.
        self.id2label = {0: "live_human", 1: "voicemail"}

    @staticmethod
    def _softmax(logits: np.ndarray) -> np.ndarray:
        """Return softmax over the last axis without overflowing.

        Subtracting the per-row maximum leaves the result mathematically
        unchanged but keeps ``np.exp`` from producing ``inf`` (and the
        division from producing ``nan``) when the logits are large.
        """
        shifted = logits - np.max(logits, axis=-1, keepdims=True)
        exponentials = np.exp(shifted)
        return exponentials / np.sum(exponentials, axis=-1, keepdims=True)

    def _fit_to_window(self, audio: np.ndarray) -> np.ndarray:
        """Zero-pad at the end or truncate so audio has ``expected_samples``."""
        missing = self.expected_samples - len(audio)
        if missing > 0:
            return np.pad(audio, (0, missing))
        return audio[: self.expected_samples]

    def _classify(self, mel_spec: np.ndarray) -> Tuple[str, float, dict]:
        """Run the model on a prepared spectrogram and decode its output.

        Args:
            mel_spec: Model input as produced by ``extract_mel_spectrogram``

        Returns:
            Tuple of (prediction, confidence, probabilities_dict)
        """
        outputs = self.session.run(None, {self.input_name: mel_spec})
        logits = outputs[0]

        # Only the first item of the batch is decoded.
        prediction_idx = int(np.argmax(logits, axis=-1)[0])
        probabilities = self._softmax(logits)[0]

        probs_dict = {
            label: float(probabilities[idx])
            for idx, label in self.id2label.items()
        }
        return (
            self.id2label[prediction_idx],
            float(probabilities[prediction_idx]),
            probs_dict,
        )

    def extract_mel_spectrogram(self, audio: np.ndarray) -> np.ndarray:
        """Extract mel-spectrogram features from audio.

        Args:
            audio: Audio array of shape (64000,) - 4 seconds at 16kHz

        Returns:
            Mel-spectrogram of shape (1, 1, 128, 251)
        """
        mel_spec = librosa.feature.melspectrogram(
            y=audio,
            sr=self.sample_rate,
            n_fft=self.n_fft,
            hop_length=self.hop_length,
            n_mels=self.n_mels,
            fmin=self.fmin,
            fmax=self.fmax,
        )

        # Power -> decibels, relative to the loudest bin of this clip.
        mel_spec_db = librosa.power_to_db(mel_spec, ref=np.max)

        # Min-max scale; the epsilon avoids 0/0 on a constant spectrogram.
        lowest = mel_spec_db.min()
        mel_spec_normalized = (mel_spec_db - lowest) / (
            mel_spec_db.max() - lowest + 1e-8
        )

        # Add leading batch and channel axes.
        return mel_spec_normalized.reshape(1, 1, self.n_mels, -1).astype(
            np.float32
        )

    def preprocess_audio(self, audio_path: str) -> np.ndarray:
        """Load and preprocess audio file.

        Args:
            audio_path: Path to audio file

        Returns:
            Preprocessed mel-spectrogram ready for inference
        """
        # librosa resamples to the detector's rate and downmixes to mono.
        audio, _ = librosa.load(audio_path, sr=self.sample_rate, mono=True)
        return self.extract_mel_spectrogram(self._fit_to_window(audio))

    def predict(self, audio_path: str) -> Tuple[str, float, dict]:
        """Detect voicemail from audio file.

        Only the first 4 seconds of the file are analysed; shorter files are
        zero-padded.

        Args:
            audio_path: Path to audio file

        Returns:
            Tuple of (prediction, confidence, probabilities_dict)
        """
        return self._classify(self.preprocess_audio(audio_path))

    def predict_from_array(self, audio_array: np.ndarray) -> Tuple[str, float, dict]:
        """Detect voicemail from audio array.

        Args:
            audio_array: Audio array (4 seconds @ 16kHz = 64,000 samples);
                shorter input is zero-padded, longer input is truncated

        Returns:
            Tuple of (prediction, confidence, probabilities_dict)
        """
        window = self._fit_to_window(audio_array)
        return self._classify(self.extract_mel_spectrogram(window))
|
|
|
|
|
|
|
|
class StreamingVoicemailDetector: |
|
|
"""Real-time streaming voicemail detector with rolling buffer.""" |
|
|
|
|
|
def __init__(self, model_path: str = "model.onnx", sample_rate: int = 16000): |
|
|
"""Initialize streaming detector. |
|
|
|
|
|
Args: |
|
|
model_path: Path to the ONNX model file |
|
|
sample_rate: Audio sample rate (default: 16000) |
|
|
""" |
|
|
self.detector = CNNVoicemailDetector(model_path) |
|
|
self.sample_rate = sample_rate |
|
|
self.buffer_duration = 4.0 |
|
|
self.buffer_size = int(sample_rate * self.buffer_duration) |
|
|
self.audio_buffer = np.zeros(self.buffer_size, dtype=np.float32) |
|
|
self.is_ready = False |
|
|
self.samples_received = 0 |
|
|
|
|
|
def add_audio(self, audio_chunk: np.ndarray) -> None: |
|
|
"""Add audio chunk to rolling buffer. |
|
|
|
|
|
Args: |
|
|
audio_chunk: New audio samples to add |
|
|
""" |
|
|
chunk_size = len(audio_chunk) |
|
|
|
|
|
|
|
|
self.audio_buffer = np.roll(self.audio_buffer, -chunk_size) |
|
|
self.audio_buffer[-chunk_size:] = audio_chunk |
|
|
|
|
|
|
|
|
self.samples_received += chunk_size |
|
|
|
|
|
|
|
|
if self.samples_received >= self.buffer_size: |
|
|
self.is_ready = True |
|
|
|
|
|
def detect(self) -> Tuple[str, float, dict] | None: |
|
|
"""Detect voicemail from current buffer. |
|
|
|
|
|
Returns: |
|
|
Tuple of (prediction, confidence, probabilities) or None if not ready |
|
|
""" |
|
|
if not self.is_ready: |
|
|
return None |
|
|
|
|
|
return self.detector.predict_from_array(self.audio_buffer) |
|
|
|
|
|
def reset(self) -> None: |
|
|
"""Reset the detector buffer.""" |
|
|
self.audio_buffer = np.zeros(self.buffer_size, dtype=np.float32) |
|
|
self.is_ready = False |
|
|
self.samples_received = 0 |
|
|
|
|
|
|
|
|
def example_single_file():
    """Example: classify one audio file and print the verdict.

    Loads ``model.onnx``, runs the detector on ``test_audio.wav`` and prints
    the predicted label, its confidence and both class probabilities. A
    missing audio file is reported instead of raising.
    """
    rule = "=" * 60
    for heading_line in (rule, "Example 1: Single File Detection", rule):
        print(heading_line)

    detector = CNNVoicemailDetector("model.onnx")
    audio_path = "test_audio.wav"

    try:
        prediction, confidence, probs = detector.predict(audio_path)
    except FileNotFoundError:
        print(f"\n⚠️ File not found: {audio_path}")
        print("Please provide a valid audio file path.")
    else:
        report = (
            f"\nAudio: {audio_path}",
            f"Prediction: {prediction}",
            f"Confidence: {confidence:.2%}",
            "\nProbabilities:",
            f" Live Human: {probs['live_human']:.2%}",
            f" Voicemail: {probs['voicemail']:.2%}",
        )
        for report_line in report:
            print(report_line)
|
|
|
|
|
|
|
|
def example_batch_processing():
    """Example: classify every .wav/.mp3 file in ``test_audios``.

    Prints one table row per successfully processed file followed by a
    summary. Files that raise during prediction are reported and left out of
    both the table and the summary figures.
    """
    import time

    rule = "=" * 60
    print("\n" + rule)
    print("Example 2: Batch Processing")
    print(rule)

    detector = CNNVoicemailDetector("model.onnx")
    audio_dir = Path("test_audios")

    if not audio_dir.exists():
        print(f"\n⚠️ Directory not found: {audio_dir}")
        print("Please create a directory with audio files.")
        return

    audio_files = [*audio_dir.glob("*.wav"), *audio_dir.glob("*.mp3")]
    if not audio_files:
        print(f"\n⚠️ No audio files found in {audio_dir}")
        return

    print(f"\nProcessing {len(audio_files)} files...\n")

    results = []
    for audio_path in audio_files:
        try:
            started = time.perf_counter()
            prediction, confidence, probs = detector.predict(str(audio_path))
            elapsed_ms = (time.perf_counter() - started) * 1000
        except Exception as e:
            # Keep going: one unreadable file should not abort the batch.
            print(f"❌ Error processing {audio_path.name}: {e}")
            continue

        results.append(
            {
                "file": audio_path.name,
                "prediction": prediction,
                "confidence": confidence,
                "time_ms": elapsed_ms,
                "probs": probs,
            }
        )

    # Per-file table.
    print(f"{'File':<30} {'Prediction':<15} {'Confidence':<12} {'Time (ms)':<10}")
    print("-" * 70)
    for row in results:
        print(
            f"{row['file']:<30} "
            f"{row['prediction']:<15} "
            f"{row['confidence']:<12.2%} "
            f"{row['time_ms']:<10.2f}"
        )

    # Aggregate figures over the successfully processed files only.
    total_time = sum(row["time_ms"] for row in results)
    voicemail_count = sum(row["prediction"] == "voicemail" for row in results)
    live_human_count = sum(row["prediction"] == "live_human" for row in results)
    avg_time = total_time / len(results) if results else 0

    print("\nSummary:")
    print(f" Total files: {len(results)}")
    print(f" Voicemail: {voicemail_count}")
    print(f" Live Human: {live_human_count}")
    print(f" Average inference time: {avg_time:.2f}ms")
    print(f" Total processing time: {total_time:.2f}ms")
|
|
|
|
|
|
|
|
def example_streaming():
    """Example: feed a simulated audio stream through the rolling buffer.

    Ten half-second chunks of random noise are pushed into a
    StreamingVoicemailDetector. After each chunk either the current
    prediction or the number of samples still missing is printed.
    """
    rule = "=" * 60
    print("\n" + rule)
    print("Example 3: Real-time Streaming Detection")
    print(rule)

    detector = StreamingVoicemailDetector("model.onnx")

    sample_rate = 16000
    chunk_duration = 0.5
    chunk_size = int(sample_rate * chunk_duration)

    print(f"\nBuffer duration: {detector.buffer_duration}s")
    print(f"Chunk duration: {chunk_duration}s")
    print(f"Chunk size: {chunk_size} samples")
    print("\nSimulating audio stream...\n")

    for chunk_number in range(1, 11):
        # Random noise stands in for real audio samples.
        detector.add_audio(np.random.randn(chunk_size).astype(np.float32))
        result = detector.detect()

        if not result:
            samples_needed = detector.buffer_size - detector.samples_received
            print(
                f"Chunk {chunk_number:2d}: ⏳ Buffering... ({samples_needed} samples needed)"
            )
            continue

        prediction, confidence, _ = result
        if prediction == "voicemail":
            status = "✅"
        else:
            status = "👤"
        print(
            f"Chunk {chunk_number:2d}: {status} {prediction:<12} "
            f"(confidence: {confidence:.2%})"
        )
|
|
|
|
|
|
|
|
def example_performance_benchmark():
    """Example: time repeated inference on one 4-second random clip.

    Runs a single untimed prediction, then 100 timed ones, and prints
    latency statistics in milliseconds plus throughput and real-time factor.
    """
    import time

    rule = "=" * 60
    print("\n" + rule)
    print("Example 4: Performance Benchmark")
    print(rule)

    detector = CNNVoicemailDetector("model.onnx")

    sample_rate = 16000
    duration = 4.0
    audio_array = np.random.randn(int(sample_rate * duration)).astype(np.float32)

    # Untimed call before the measured loop (presumably a warm-up run).
    detector.predict_from_array(audio_array)

    num_iterations = 100
    print(f"\nRunning {num_iterations} iterations...\n")

    times = np.empty(num_iterations)
    for slot in range(num_iterations):
        tick = time.perf_counter()
        detector.predict_from_array(audio_array)
        times[slot] = (time.perf_counter() - tick) * 1000

    mean_ms = times.mean()
    print("Performance Statistics:")
    print(f" Iterations: {num_iterations}")
    print(f" Mean: {mean_ms:.2f}ms")
    print(f" Median: {np.median(times):.2f}ms")
    print(f" Min: {times.min():.2f}ms")
    print(f" Max: {times.max():.2f}ms")
    print(f" Std Dev: {times.std():.2f}ms")
    print(f"\n Throughput: {1000 / mean_ms:.1f} inferences/second")
    print(f" Real-time factor: {(duration * 1000) / mean_ms:.1f}x")
|
|
|
|
|
|
|
|
def example_comparison_with_threshold():
    """Example: Using confidence thresholds for decision making.

    Prints the threshold table and, for a handful of simulated confidence
    values, the action that each one maps to. The confidences are hard-coded,
    so no model is loaded and no audio is read.
    """
    print("\n" + "=" * 60)
    print("Example 5: Confidence Threshold Strategy")
    print("=" * 60)

    # (case name, simulated confidence) pairs - not produced by the model.
    test_cases = [
        ("high_confidence_voicemail", 0.95),
        ("medium_confidence_voicemail", 0.75),
        ("low_confidence_voicemail", 0.55),
        ("uncertain", 0.50),
        ("low_confidence_human", 0.45),
    ]

    thresholds = {
        "high": 0.90,
        "medium": 0.70,
        "low": 0.60,
    }

    print("\nConfidence Thresholds:")
    print(f" High confidence: ≥ {thresholds['high']:.0%}")
    print(f" Medium confidence: ≥ {thresholds['medium']:.0%}")
    print(f" Low confidence: ≥ {thresholds['low']:.0%}")
    print(f" Uncertain: < {thresholds['low']:.0%}")
    print()

    print(f"{'Case':<30} {'Confidence':<12} {'Action':<30}")
    print("-" * 75)

    # Checked from strictest to loosest; the first threshold met wins.
    action_ladder = (
        (thresholds["high"], "Immediate hangup (very sure)"),
        (thresholds["medium"], "Hangup (confident)"),
        (thresholds["low"], "Hangup with logging"),
    )
    fallback_action = "Wait for more audio / human verify"

    for case_name, simulated_confidence in test_cases:
        action = next(
            (
                label
                for floor, label in action_ladder
                if simulated_confidence >= floor
            ),
            fallback_action,
        )
        print(f"{case_name:<30} {simulated_confidence:<12.2%} {action:<30}")
|
|
|
|
|
|
|
|
if __name__ == "__main__":
    # Script entry point: run every example in order, then print a footer.
    print("\n⚡ CNN Voicemail Detector - Usage Examples\n")

    for run_example in (
        example_single_file,
        example_batch_processing,
        example_streaming,
        example_performance_benchmark,
        example_comparison_with_threshold,
    ):
        run_example()

    footer_rule = "=" * 60
    print("\n" + footer_rule)
    print("✅ All examples completed!")
    print(footer_rule)
|
|
|