File size: 11,864 Bytes
91585f1
 
 
 
4b30def
91585f1
 
7bf68b4
 
91585f1
 
 
4b30def
 
91585f1
 
 
 
 
4b30def
91585f1
 
 
 
 
 
 
4b30def
 
 
 
 
 
 
 
91585f1
 
5c36702
 
91585f1
 
 
 
 
 
 
 
 
4b30def
91585f1
5c36702
 
4b30def
5c36702
 
 
91585f1
5c36702
 
91585f1
4b30def
 
 
 
 
 
3513671
 
4b30def
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
3513671
 
 
4b30def
3513671
 
972498b
4b30def
972498b
 
4b30def
cafabb9
 
 
 
3513671
4b30def
 
3513671
4b30def
3513671
4b30def
 
3513671
4b30def
cafabb9
 
3513671
 
 
 
 
 
 
 
cafabb9
 
3513671
cafabb9
 
3513671
cafabb9
 
 
4b30def
cafabb9
 
 
 
 
 
 
 
 
 
 
 
4b30def
cafabb9
4b30def
7bf68b4
cafabb9
972498b
 
 
 
 
 
 
 
 
5c36702
91585f1
972498b
91585f1
 
 
972498b
5c36702
 
 
 
 
 
 
 
91585f1
5c36702
91585f1
972498b
 
5c36702
 
 
91585f1
972498b
91585f1
5c36702
 
 
 
 
 
 
 
 
 
 
 
 
 
 
972498b
91585f1
972498b
91585f1
972498b
 
 
 
 
91585f1
 
 
 
c1eba65
 
4cd66e1
0c50b1b
5c36702
4cd66e1
c1eba65
c11aebe
 
0c50b1b
 
c11aebe
0c50b1b
91585f1
 
 
 
eee65e1
 
 
 
 
 
91585f1
 
0c50b1b
91585f1
eee65e1
972498b
91585f1
972498b
91585f1
972498b
 
 
 
 
 
91585f1
 
0c50b1b
 
972498b
0c50b1b
 
 
65d3de6
0c50b1b
 
 
 
65d3de6
91585f1
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
import gradio as gr
import torch
from transformers import pipeline
from pyannote.audio import Pipeline
from pydub import AudioSegment, effects, silence
import os
import datetime
from langdetect import detect
from langdetect.lang_detect_exception import LangDetectException

# --- Configuration ---
HF_TOKEN = os.environ.get("HF_TOKEN")
MODEL_NAME = "openai/whisper-medium"   # modèle principal pour la transcription finale
LANG_MODEL_NAME = "openai/whisper-tiny"  # modèle léger dédié à la détection rapide

device = 0 if torch.cuda.is_available() else "cpu"
torch_dtype = torch.float16 if torch.cuda.is_available() else torch.float32

# --- Initialisation des modèles ---
# Pipeline principal (qualité) pour la transcription complète
pipe = pipeline(
    "automatic-speech-recognition",
    model=MODEL_NAME,
    torch_dtype=torch_dtype,
    device=device,
)

# Pipeline léger (rapide) pour la détection de langue uniquement
lang_pipe = pipeline(
    "automatic-speech-recognition",
    model=LANG_MODEL_NAME,
    torch_dtype=torch_dtype,
    device=device,
)

if HF_TOKEN:
    pyannote_pipeline = Pipeline.from_pretrained(
        "pyannote/speaker-diarization-3.1",
        use_auth_token=HF_TOKEN
    )
    pyannote_pipeline.to(torch.device("cuda" if torch.cuda.is_available() else "cpu"))
else:
    pyannote_pipeline = None
    print("Avertissement : Le token Hugging Face n'est pas défini. La diarisation sera désactivée.")

# --- Fonctions de traitement audio ---

def convert_to_wav(audio_path):
    """Convertit n'importe quel fichier audio en format WAV mono 16 kHz."""
    try:
        audio = AudioSegment.from_file(audio_path)
        audio = audio.set_channels(1)
        audio = audio.set_frame_rate(16000)  # standard Whisper
        wav_path = os.path.splitext(audio_path)[0] + ".wav"
        audio.export(wav_path, format="wav")
        return wav_path
    except Exception as e:
        print(f"Erreur lors de la conversion en WAV : {e}")
        return None

def make_speech_head_wav(input_wav_path, max_seconds=6, min_silence_len_ms=300, silence_thresh_db=None):
    """
    Crée un extrait de tête (jusqu'à max_seconds) contenant de la parole.
    - Supprime le silence en tête (leading silence).
    - Cherche la première zone avec parole en scannant par fenêtres si besoin.
    """
    try:
        audio = AudioSegment.from_wav(input_wav_path)

        # Normalisation légère pour faciliter la détection du silence
        normalized = effects.normalize(audio)

        # Détermine un seuil de silence si non fourni (ex: -40 dBFS relatif)
        if silence_thresh_db is None:
            silence_thresh_db = normalized.dBFS - 16  # plus permissif que -30 sur audios faibles

        # 1) Enlever le silence au début
        start_trim = silence.detect_leading_silence(
            normalized,
            silence_thresh=silence_thresh_db,
            chunk_size=10
        )
        trimmed = normalized[start_trim:]

        # Si après trim il n'y a presque rien, fallback: on gardera les premières secondes
        if len(trimmed) < 500:  # moins de 0.5 s
            clip = normalized[: max_seconds * 1000]
        else:
            # 2) Assurer qu'on capture de la parole: si les toutes premières secondes
            #    restent silencieuses (cas rares), on scanne par fenêtres.
            window_ms = 6000  # 6 s
            step_ms = 3000    # avance de 3 s si besoin
            pos = 0
            selected = None

            while pos < len(trimmed) and pos < 60000:  # on limite la recherche à 60 s
                candidate = trimmed[pos: pos + window_ms]
                # Détecte les segments non silencieux dans la fenêtre
                nonsil = silence.detect_nonsilent(
                    candidate,
                    min_silence_len=min_silence_len_ms,
                    silence_thresh=silence_thresh_db
                )
                if nonsil:
                    # On a trouvé de la parole dans cette fenêtre
                    selected = candidate
                    break
                pos += step_ms

            clip = selected if selected is not None else trimmed[: window_ms]

        # Tronque à max_seconds
        clip = clip[: max_seconds * 1000]
        short_path = os.path.splitext(input_wav_path)[0] + f"_head_speech_{max_seconds}s.wav"
        clip.export(short_path, format="wav")
        return short_path
    except Exception as e:
        print(f"Erreur lors de la création de l'extrait parole : {e}")
        return None

def detect_language_on_upload(filepath):
    """Détecte la langue d'un fichier audio rapidement (6 s avec silence initial enlevé) via Whisper-tiny + LangDetect."""
    if filepath is None:
        return "auto"
    
    try:
        wav_filepath = convert_to_wav(filepath)
        if not wav_filepath:
            return "auto"

        # Extrait court de 6 s contenant de la parole (si possible)
        short_wav = make_speech_head_wav(wav_filepath, max_seconds=6)
        if not short_wav:
            short_wav = wav_filepath  # fallback

        # Transcription rapide sur l'extrait court avec le modèle tiny
        outputs = lang_pipe(
            short_wav,
            chunk_length_s=6,        # adapté à l'extrait
            return_timestamps=False
        )

        transcribed_text = outputs.get("text", "").strip()

        # Si Whisper renvoie déjà une langue
        whisper_lang = outputs.get("language")
        if whisper_lang and isinstance(whisper_lang, str) and len(whisper_lang) <= 5:
            return whisper_lang

        if len(transcribed_text) < 10:
            return "auto"

        # Utilise LangDetect sur le texte transcrit
        detected_lang = detect(transcribed_text)

        # Mapping des codes de langue LangDetect vers les codes Whisper
        lang_mapping = {
            'fr': 'fr',
            'en': 'en', 
            'es': 'es',
            'de': 'de',
            'it': 'it',
            'pt': 'pt',
            'nl': 'nl',
            'pl': 'pl',
            'ru': 'ru',
            'ja': 'ja',
            'ko': 'ko',
            'zh-cn': 'zh',
            'zh': 'zh'
        }
        
        return lang_mapping.get(detected_lang, "auto")
        
    except (LangDetectException, Exception) as e:
        print(f"Erreur lors de la détection de langue : {e}")
        return "auto"

def save_txt(content, filename):
    if not content or content.strip() == "":
        return None
    with open(filename, "w", encoding="utf-8") as f:
        f.write(content)
    return filename

def transcribe_audio(filepath, diarize, language_choice):
    if filepath is None:
        return "Aucun fichier audio fourni.", "Veuillez télécharger un fichier audio.", "", None, None

    wav_filepath = convert_to_wav(filepath)
    if not wav_filepath:
        return "Erreur : Le fichier audio n'a pas pu être converti.", "Conversion échouée.", "", None, None

    whisper_params = {
        "chunk_length_s": 30,
        "batch_size": 24,
        "return_timestamps": True
    }
    if language_choice != "auto":
        whisper_params["generate_kwargs"] = {"language": language_choice}

    outputs = pipe(wav_filepath, **whisper_params)
    transcription = outputs["text"].strip()

    detected_language = outputs.get("language", "Non disponible")
    language_info = f"Langue détectée: {detected_language}"
    if language_choice != "auto":
        language_info += f" (Langue forcée: {language_choice})"

    diarized_transcription = ""
    if diarize and pyannote_pipeline:
        try:
            diarization = pyannote_pipeline(wav_filepath)
            for turn, _, speaker in diarization.itertracks(yield_label=True):
                segment_start = turn.start
                segment_end = turn.end
                segment_text = ""
                for chunk in outputs["chunks"]:
                    chunk_start = chunk['timestamp'][0]
                    chunk_end = chunk['timestamp'][1]
                    if chunk_start is not None and chunk_end is not None:
                        if max(segment_start, chunk_start) < min(segment_end, chunk_end):
                            segment_text += chunk['text']
                start_time = str(datetime.timedelta(seconds=int(segment_start)))
                diarized_transcription += f"[{start_time}] {speaker}:{segment_text.strip()}\n"
        except Exception as e:
            diarized_transcription = f"Erreur pendant la diarisation : {e}"
    elif diarize:
        diarized_transcription = "Diarisation activée mais le modèle n'a pas pu être chargé (token manquant ?)."
    else:
        diarized_transcription = "Diarisation non activée."

    transcription_file = save_txt(transcription, "transcription.txt")
    diarization_file = save_txt(diarized_transcription, "transcription_diarized.txt")
    return transcription, diarized_transcription, language_info, transcription_file, diarization_file

# --- Interface Gradio ---

with gr.Blocks() as demo:
    gr.HTML("<div style='text-align:center;'><h1>Transcription et Diarisation de fichiers Audio</h1></div>")
    gr.Markdown("Transcrivez et diarisez automatiquement vos fichiers audio (WhatsApp (opus), mp3, wav, m4a, etc.) grâce à Whisper et pyannote, directement dans ce Space.")

    gr.Markdown("""
## 🚀 Comment utiliser l'application

1. **Téléchargez** un fichier audio (opus, wav, mp3, m4a, etc.): la langue principale sera alors automatiquement détectée ou restera en "auto" dans le cas contraire
2. **Choisissez** la langue ou laissez en "auto": via le menu déroulant, vous pouvez changer la langue détectée automatiquement
3. **Activez** ou non la case "Diarisation"
4. **Cliquez** sur "Transcrire"
5. **Obtenez** la transcription et, si activé, la version diarizée (par locuteur)
6. **Reset** les fichiers avant de recommencer une transcription avec un autre fichier audio
""")

    with gr.Row():
        with gr.Column():
            audio_input = gr.Audio(type="filepath", label="Télécharger un fichier audio")
            language_dropdown = gr.Dropdown(
                choices=["auto", "fr", "en", "es", "de", "it", "pt", "nl", "pl", "ru", "ja", "ko", "zh"],
                value="auto",
                label="Langue (auto = détection automatique)",
                info="Choisissez la langue ou laissez en 'auto' pour la détection automatique"
            )
            diarize_checkbox = gr.Checkbox(label="Activer la Diarisation", value=True)
            submit_btn = gr.Button("Transcrire", variant="primary")
            reset_btn = gr.Button("Reset", variant="secondary")
        with gr.Column():
            language_info_output = gr.Textbox(label="Information sur la langue", lines=1)
            transcription_file = gr.File(label="Télécharger la transcription (.txt)")
            transcription_output = gr.Textbox(label="Transcription Complète", lines=10)
            diarization_file = gr.File(label="Télécharger la transcription diarizée (.txt)")
            diarization_output = gr.Textbox(label="Transcription avec Diarisation (par locuteur)", lines=15)

    audio_input.change(
        fn=detect_language_on_upload,
        inputs=audio_input,
        outputs=language_dropdown
    )

    submit_btn.click(
        fn=transcribe_audio,
        inputs=[audio_input, diarize_checkbox, language_dropdown],
        outputs=[transcription_output, diarization_output, language_info_output, transcription_file, diarization_file]
    )

    def reset_fields():
        return "", "", "", None, None, None, "auto", True

    reset_btn.click(
        fn=reset_fields,
        inputs=[],
        outputs=[transcription_output, diarization_output, language_info_output, transcription_file, diarization_file, audio_input, language_dropdown, diarize_checkbox]
    )

demo.launch(share=True)