import datetime
import os

import gradio as gr
import torch
from langdetect import detect
from langdetect.lang_detect_exception import LangDetectException
from pyannote.audio import Pipeline
from pydub import AudioSegment, effects, silence
from transformers import pipeline

HF_TOKEN = os.environ.get("HF_TOKEN")
MODEL_NAME = "openai/whisper-medium"
LANG_MODEL_NAME = "openai/whisper-tiny"
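
# Prefer the GPU when available: transformers pipelines accept an integer CUDA
# device index (0 = first GPU) or the string "cpu". fp16 halves GPU memory use,
# while fp32 is the safe default on CPU.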
device = 0 if torch.cuda.is_available() else "cpu"
torch_dtype = torch.float16 if torch.cuda.is_available() else torch.float32
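
# Two ASR pipelines: whisper-medium performs the actual transcription, while
# the much smaller whisper-tiny is used only for a quick language check on a
# short head clip of the upload.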
pipe = pipeline(
    "automatic-speech-recognition",
    model=MODEL_NAME,
    torch_dtype=torch_dtype,
    device=device,
)

lang_pipe = pipeline(
    "automatic-speech-recognition",
    model=LANG_MODEL_NAME,
    torch_dtype=torch_dtype,
    device=device,
)
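
# pyannote/speaker-diarization-3.1 is a gated model: loading it requires a
# Hugging Face token from an account that has accepted the model's user
# conditions on the Hub.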
if HF_TOKEN:
    pyannote_pipeline = Pipeline.from_pretrained(
        "pyannote/speaker-diarization-3.1",
        use_auth_token=HF_TOKEN
    )
    pyannote_pipeline.to(torch.device("cuda" if torch.cuda.is_available() else "cpu"))
else:
    pyannote_pipeline = None
    print("Warning: the Hugging Face token is not set. Diarization will be disabled.")


def convert_to_wav(audio_path):
    """Convert any audio file to mono 16 kHz WAV."""
    try:
        audio = AudioSegment.from_file(audio_path)
        audio = audio.set_channels(1)
        audio = audio.set_frame_rate(16000)
        wav_path = os.path.splitext(audio_path)[0] + ".wav"
        audio.export(wav_path, format="wav")
        return wav_path
    except Exception as e:
        print(f"Error converting to WAV: {e}")
        return None
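
# Note: Whisper models are trained on 16 kHz mono audio, so converting every
# upload up front keeps transcription, language detection, and diarization
# working from the same representation.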


def make_speech_head_wav(input_wav_path, max_seconds=6, min_silence_len_ms=300, silence_thresh_db=None):
    """
    Build a head clip (up to max_seconds) that actually contains speech.
    - Trims leading silence.
    - If needed, scans forward window by window for the first speech region.
    """
    try:
        audio = AudioSegment.from_wav(input_wav_path)

        # Normalize so the silence threshold is relative to a consistent level.
        normalized = effects.normalize(audio)

        # Default threshold: 16 dB below the clip's average loudness.
        if silence_thresh_db is None:
            silence_thresh_db = normalized.dBFS - 16

        start_trim = silence.detect_leading_silence(
            normalized,
            silence_thresh=silence_thresh_db,
            chunk_size=10
        )
        trimmed = normalized[start_trim:]

        # If almost nothing remains after trimming, fall back to the raw head.
        if len(trimmed) < 500:
            clip = normalized[: max_seconds * 1000]
        else:
            # Scan forward in 6 s windows (3 s step, first 60 s only) until a
            # window containing non-silent audio is found.
            window_ms = 6000
            step_ms = 3000
            pos = 0
            selected = None

            while pos < len(trimmed) and pos < 60000:
                candidate = trimmed[pos: pos + window_ms]
                nonsil = silence.detect_nonsilent(
                    candidate,
                    min_silence_len=min_silence_len_ms,
                    silence_thresh=silence_thresh_db
                )
                if nonsil:
                    selected = candidate
                    break
                pos += step_ms

            clip = selected if selected is not None else trimmed[:window_ms]

        clip = clip[: max_seconds * 1000]
        short_path = os.path.splitext(input_wav_path)[0] + f"_head_speech_{max_seconds}s.wav"
        clip.export(short_path, format="wav")
        return short_path
    except Exception as e:
        print(f"Error creating the speech head clip: {e}")
        return None
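
# Example flow (hypothetical file name, for illustration only):
#   wav = convert_to_wav("voice_note.opus")
#   head = make_speech_head_wav(wav, max_seconds=6)
#   print(lang_pipe(head)["text"])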


def detect_language_on_upload(filepath):
    """Quickly detect the language of an audio file (6 s head clip, leading silence removed) via whisper-tiny + langdetect."""
    if filepath is None:
        return "auto"

    try:
        wav_filepath = convert_to_wav(filepath)
        if not wav_filepath:
            return "auto"

        short_wav = make_speech_head_wav(wav_filepath, max_seconds=6)
        if not short_wav:
            short_wav = wav_filepath

        outputs = lang_pipe(
            short_wav,
            chunk_length_s=6,
            return_timestamps=False
        )

        transcribed_text = outputs.get("text", "").strip()

        # The transformers ASR pipeline does not normally expose the language
        # Whisper detected, so this key is usually missing and we fall back to
        # langdetect on the transcribed text.
        whisper_lang = outputs.get("language")
        if whisper_lang and isinstance(whisper_lang, str) and len(whisper_lang) <= 5:
            return whisper_lang

        # Too little text for a reliable guess.
        if len(transcribed_text) < 10:
            return "auto"

        detected_lang = detect(transcribed_text)

        # Map langdetect codes to the dropdown's codes (zh-cn is the only
        # entry that actually changes).
        lang_mapping = {
            'fr': 'fr',
            'en': 'en',
            'es': 'es',
            'de': 'de',
            'it': 'it',
            'pt': 'pt',
            'nl': 'nl',
            'pl': 'pl',
            'ru': 'ru',
            'ja': 'ja',
            'ko': 'ko',
            'zh-cn': 'zh',
            'zh': 'zh'
        }

        return lang_mapping.get(detected_lang, "auto")

    except LangDetectException:
        # langdetect raises when it cannot produce any guess at all.
        return "auto"
    except Exception as e:
        print(f"Error during language detection: {e}")
        return "auto"
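
# Note: langdetect is non-deterministic by default; setting
# DetectorFactory.seed = 0 (from langdetect import DetectorFactory) makes its
# guesses reproducible.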


def save_txt(content, filename):
    if not content or content.strip() == "":
        return None
    with open(filename, "w", encoding="utf-8") as f:
        f.write(content)
    return filename
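
# Note: recent transformers releases accept an ISO 639-1 code for Whisper's
# forced language (e.g. generate_kwargs={"language": "fr"}) and map it to the
# matching language token; older releases may expect the full language name.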


def transcribe_audio(filepath, diarize, language_choice):
    if filepath is None:
        return "No audio file provided.", "Please upload an audio file.", "", None, None

    wav_filepath = convert_to_wav(filepath)
    if not wav_filepath:
        return "Error: the audio file could not be converted.", "Conversion failed.", "", None, None

    whisper_params = {
        "chunk_length_s": 30,
        "batch_size": 24,
        "return_timestamps": True
    }
    if language_choice != "auto":
        whisper_params["generate_kwargs"] = {"language": language_choice}

    outputs = pipe(wav_filepath, **whisper_params)
    transcription = outputs["text"].strip()

    # As in detect_language_on_upload: the pipeline output rarely carries a
    # "language" key, so this usually shows the fallback value.
    detected_language = outputs.get("language", "Not available")
    language_info = f"Detected language: {detected_language}"
    if language_choice != "auto":
        language_info += f" (Forced language: {language_choice})"

    diarized_transcription = ""
    if diarize and pyannote_pipeline:
        try:
            diarization = pyannote_pipeline(wav_filepath)
            for turn, _, speaker in diarization.itertracks(yield_label=True):
                segment_start = turn.start
                segment_end = turn.end
                segment_text = ""
                for chunk in outputs["chunks"]:
                    chunk_start = chunk['timestamp'][0]
                    chunk_end = chunk['timestamp'][1]
                    if chunk_start is not None and chunk_end is not None:
                        # Standard interval-overlap test: the chunk and the
                        # speaker turn intersect iff max(starts) < min(ends).
                        if max(segment_start, chunk_start) < min(segment_end, chunk_end):
                            segment_text += chunk['text']
                start_time = str(datetime.timedelta(seconds=int(segment_start)))
                diarized_transcription += f"[{start_time}] {speaker}: {segment_text.strip()}\n"
        except Exception as e:
            diarized_transcription = f"Error during diarization: {e}"
    elif diarize:
        diarized_transcription = "Diarization is enabled but the model could not be loaded (missing token?)."
    else:
        diarized_transcription = "Diarization not enabled."

    transcription_file = save_txt(transcription, "transcription.txt")
    diarization_file = save_txt(diarized_transcription, "transcription_diarized.txt")
    return transcription, diarized_transcription, language_info, transcription_file, diarization_file
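
# Gradio interface: uploading a file triggers language detection, "Transcribe"
# runs the full pipeline, and "Reset" clears every field.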
with gr.Blocks() as demo:
    gr.HTML("<div style='text-align:center;'><h1>Audio File Transcription and Diarization</h1></div>")
    gr.Markdown("Automatically transcribe and diarize your audio files (WhatsApp (opus), mp3, wav, m4a, etc.) with Whisper and pyannote, directly in this Space.")

    gr.Markdown("""
## 🚀 How to use the app

1. **Upload** an audio file (opus, wav, mp3, m4a, etc.): the main language is then detected automatically, or the setting stays on "auto" otherwise
2. **Choose** a language or leave "auto": the dropdown lets you override the automatically detected language
3. **Enable** or disable the "Diarization" checkbox
4. **Click** "Transcribe"
5. **Get** the transcription and, if enabled, the diarized version (per speaker)
6. **Reset** the fields before transcribing another audio file
""")

    with gr.Row():
        with gr.Column():
            audio_input = gr.Audio(type="filepath", label="Upload an audio file")
            language_dropdown = gr.Dropdown(
                choices=["auto", "fr", "en", "es", "de", "it", "pt", "nl", "pl", "ru", "ja", "ko", "zh"],
                value="auto",
                label="Language (auto = automatic detection)",
                info="Choose a language or leave 'auto' for automatic detection"
            )
            diarize_checkbox = gr.Checkbox(label="Enable diarization", value=True)
            submit_btn = gr.Button("Transcribe", variant="primary")
            reset_btn = gr.Button("Reset", variant="secondary")
        with gr.Column():
            language_info_output = gr.Textbox(label="Language information", lines=1)
            transcription_file = gr.File(label="Download the transcription (.txt)")
            transcription_output = gr.Textbox(label="Full Transcription", lines=10)
            diarization_file = gr.File(label="Download the diarized transcription (.txt)")
            diarization_output = gr.Textbox(label="Diarized Transcription (per speaker)", lines=15)

    audio_input.change(
        fn=detect_language_on_upload,
        inputs=audio_input,
        outputs=language_dropdown
    )

    submit_btn.click(
        fn=transcribe_audio,
        inputs=[audio_input, diarize_checkbox, language_dropdown],
        outputs=[transcription_output, diarization_output, language_info_output, transcription_file, diarization_file]
    )

    def reset_fields():
        # Order must match the outputs list of reset_btn.click below.
        return "", "", "", None, None, None, "auto", True

    reset_btn.click(
        fn=reset_fields,
        inputs=[],
        outputs=[transcription_output, diarization_output, language_info_output, transcription_file, diarization_file, audio_input, language_dropdown, diarize_checkbox]
    )

demo.launch(share=True)