import os
import re
import tempfile
from typing import Iterable, List, Optional, Tuple

import numpy as np
import soundfile as sf
from scipy.signal import resample_poly

# Optional dependencies: fall back to None when missing so importing this
# module never fails; availability is checked inside transcribe_file().
try:
    from silero_vad import load_silero_vad, VADIterator
except Exception:
    load_silero_vad = None
    VADIterator = None

try:
    from moonshine_onnx import MoonshineOnnxModel, load_tokenizer
except Exception:
    MoonshineOnnxModel = None
    load_tokenizer = None

from .utils import load_sensevoice_model, s2tw_converter

SAMPLING_RATE = 16000
CHUNK_SIZE = 512  # Silero VAD expects fixed 512-sample frames at 16 kHz (~32 ms)

# The Moonshine tokenizer is loaded once and cached at module level so repeated
# transcribe_file() calls do not pay the load cost again.
tokenizer = None


def clean_transcript(text: str) -> str:
    """Remove decoder artifacts and stray spacing from a CJK transcript."""
    # Drop Unicode replacement characters (U+FFFD) left by failed decodes.
    text = re.sub(r'\uFFFD', '', text)
    # Collapse a CJK character repeated three or more times (ASR stutter) to one.
    text = re.sub(r'([\u4e00-\u9fa5])\1{2,}', r'\1', text)
    # Remove spaces between adjacent CJK characters; lookarounds catch runs
    # like "一 二 三" that a consuming match would skip.
    text = re.sub(r'(?<=[\u4e00-\u9fa5]) +(?=[\u4e00-\u9fa5])', '', text)
    return text
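# For example, clean_transcript("好好好 嗎") returns "好嗎": the repeated
# character collapses to one and the space between CJK characters is removed.

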
def transcribe_file(
    audio_path: str,
    vad_threshold: float,
    model_name: str,
    backend: str = "moonshine",
    language: str = "auto",
    textnorm: str = "withitn",
) -> Iterable[Tuple[Optional[Tuple[float, float, str]], List[Tuple[float, float, str]], float]]:
""" |
|
|
Transcribe audio file using specified backend. |
|
|
|
|
|
Args: |
|
|
audio_path: Path to audio file |
|
|
vad_threshold: VAD threshold (0-1) |
|
|
model_name: Model name (backend-specific) |
|
|
backend: Either "moonshine" or "sensevoice" |
|
|
language: Language for sensevoice (auto or specific language code) |
|
|
textnorm: Text normalization for sensevoice ("withitn" or "noitn") |
|
|
|
|
|
Yields: |
|
|
Tuple of (current_utterance, all_utterances) |
|
|
""" |
|
|
if load_silero_vad is None or VADIterator is None: |
|
|
raise RuntimeError("silero_vad is not available. Please install 'silero-vad' package.") |
|
|
|
|
|
vad_model = load_silero_vad(onnx=True) |
|
|
vad_iterator = VADIterator(model=vad_model, sampling_rate=SAMPLING_RATE, threshold=vad_threshold) |
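    # VADIterator consumes fixed-size frames and returns None for most frames;
    # at a detected boundary it returns a dict such as {"start": sample_index}
    # or {"end": sample_index} (silero-vad's streaming iterator API).
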
|
    if backend == "moonshine":
        if MoonshineOnnxModel is None or load_tokenizer is None:
            raise RuntimeError("moonshine_onnx is not available. Install the dependency or choose 'sensevoice' backend.")
        model = MoonshineOnnxModel(model_name=f"moonshine/{model_name}")
        global tokenizer
        if tokenizer is None:
            tokenizer = load_tokenizer()
    elif backend == "sensevoice":
        model = load_sensevoice_model(model_name)
    else:
        raise ValueError(f"Unknown backend: {backend}")
|
    wav, orig_sr = sf.read(audio_path, dtype='float32')
    # Downmix to mono before resampling: sf.read returns shape (frames, channels)
    # for multichannel audio, while resample_poly operates on the last axis by
    # default, so resampling must happen on the already-mono signal.
    if wav.ndim > 1:
        wav = wav.mean(axis=1)
    if orig_sr != SAMPLING_RATE:
        # Polyphase resampling with the smallest integer up/down factors.
        gcd = np.gcd(int(orig_sr), SAMPLING_RATE)
        wav = resample_poly(wav, SAMPLING_RATE // gcd, int(orig_sr) // gcd)
|
    utterances = []
    speech_chunks = []
    segment_start = 0.0
    i = 0

    while i < len(wav):
        # Feed fixed-size frames to the VAD; zero-pad the final partial frame.
        chunk = wav[i:i + CHUNK_SIZE]
        if len(chunk) < CHUNK_SIZE:
            chunk = np.pad(chunk, (0, CHUNK_SIZE - len(chunk)), mode='constant')
        i += CHUNK_SIZE

        speech_dict = vad_iterator(chunk)
        speech_chunks.append(chunk)

        # An "end" event marks an utterance boundary: transcribe everything
        # buffered since the previous boundary.
        if speech_dict and "end" in speech_dict:
            segment_end = i / SAMPLING_RATE

            speech_buffer = np.concatenate(speech_chunks)

            if backend == "moonshine":
                tokens = model.generate(speech_buffer[np.newaxis, :].astype(np.float32))
                text = tokenizer.decode_batch(tokens)[0].strip()
            else:  # sensevoice
                stream = model.create_stream()
                stream.accept_waveform(SAMPLING_RATE, speech_buffer)
                model.decode_stream(stream)
                text = stream.result.text

            if text:
                cleaned_text = clean_transcript(s2tw_converter.convert(text))
                utterances.append((segment_start, segment_end, cleaned_text))
                progress = min(100, (i / len(wav)) * 100)
                yield utterances[-1], utterances.copy(), progress

            # Reset for the next segment.
            speech_chunks = []
            segment_start = i / SAMPLING_RATE
            vad_iterator.reset_states()

    # Flush trailing speech that never produced an "end" event.
    if speech_chunks:
        speech_buffer = np.concatenate(speech_chunks)
        # Ignore trailing audio shorter than half a second.
        if len(speech_buffer) > SAMPLING_RATE * 0.5:
            segment_end = len(wav) / SAMPLING_RATE

            if backend == "moonshine":
                tokens = model.generate(speech_buffer[np.newaxis, :].astype(np.float32))
                text = tokenizer.decode_batch(tokens)[0].strip()
            else:  # sensevoice
                stream = model.create_stream()
                stream.accept_waveform(SAMPLING_RATE, speech_buffer)
                model.decode_stream(stream)
                text = stream.result.text

            if text:
                cleaned_text = clean_transcript(s2tw_converter.convert(text))
                utterances.append((segment_start, segment_end, cleaned_text))
                yield utterances[-1], utterances.copy(), 100.0

    # Final yield: a current_utterance of None signals completion.
    if utterances:
        yield None, utterances, 100.0
    else:
        yield None, [(-1, -1, "No speech detected")], 100.0
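
# Minimal usage sketch. The file name is a placeholder and "base" assumes the
# moonshine/base model is installed; run from a context where this package is
# importable (the module uses a relative import):
#
#     for current, all_utterances, progress in transcribe_file(
#         "example.wav", vad_threshold=0.5, model_name="base", backend="moonshine"
#     ):
#         if current is not None:
#             start, end, text = current
#             print(f"[{start:7.2f}s - {end:7.2f}s] {text} ({progress:.0f}%)")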