"""Gradio demo for Coqui XTTS v2 multilingual voice cloning.

At import time this script extracts a bundled ffmpeg binary, downloads the
XTTS v2 checkpoint if needed, loads it onto the GPU, and then builds and
launches the Gradio UI whose single action is :func:`predict`.
"""
import sys
import io
import os
import stat
import subprocess
import random
from zipfile import ZipFile
import uuid
import time
import torch
import torchaudio

# download for mecab
os.system('python -m unidic download')

# By using XTTS you agree to CPML license https://coqui.ai/cpml
# NOTE(review): deliberately left ahead of the TTS imports, matching the
# original ordering; presumably TTS reads it on import/download - confirm
# before reordering.
os.environ["COQUI_TOS_AGREED"] = "1"

import langid
import base64
import csv
from io import StringIO
import datetime
import re

import gradio as gr
from scipy.io.wavfile import write
from pydub import AudioSegment

from TTS.api import TTS
from TTS.tts.configs.xtts_config import XttsConfig
from TTS.tts.models.xtts import Xtts
from TTS.utils.generic_utils import get_user_data_dir

HF_TOKEN = os.environ.get("HF_TOKEN")

from huggingface_hub import HfApi

api = HfApi(token=HF_TOKEN)
repo_id = "coqui/xtts"

print("Export newer ffmpeg binary for denoise filter")
ZipFile("ffmpeg.zip").extractall()
print("Make ffmpeg binary executable")
st = os.stat("ffmpeg")
os.chmod("ffmpeg", st.st_mode | stat.S_IEXEC)

print("Downloading if not downloaded Coqui XTTS V2")
from TTS.utils.manage import ModelManager

model_name = "tts_models/multilingual/multi-dataset/xtts_v2"
ModelManager().download_model(model_name)
model_path = os.path.join(get_user_data_dir("tts"), model_name.replace("/", "--"))
print("XTTS downloaded")

config = XttsConfig()
config.load_json(os.path.join(model_path, "config.json"))

model = Xtts.init_from_config(config)
model.load_checkpoint(
    config,
    checkpoint_path=os.path.join(model_path, "model.pth"),
    vocab_path=os.path.join(model_path, "vocab.json"),
    eval=True,
    use_deepspeed=True,
)
model.cuda()

# Set once a CUDA device-side assert has been seen; the triggering prompt and
# language are remembered so later requests can log what caused it.
DEVICE_ASSERT_DETECTED = 0
DEVICE_ASSERT_PROMPT = None
DEVICE_ASSERT_LANG = None

supported_languages = config.languages

# Result handed back to the four Gradio outputs whenever a request is rejected
# or fails.
_NO_OUTPUT = (None, None, None, None)

# Sample rate used both for saving the generated audio and for the RTF metric.
_OUTPUT_SAMPLE_RATE = 24000

# ffmpeg audio filters applied when "Cleanup Reference Voice" is ticked.
_LOWPASS_HIGHPASS_FILTER = "lowpass=8000,highpass=75,"
_TRIM_SILENCE_FILTER = "areverse,silenceremove=start_periods=1:start_silence=0:start_threshold=0.02,areverse,silenceremove=start_periods=1:start_silence=0:start_threshold=0.02,"

# A word character or non-ASCII character immediately followed by ".", "。"
# or "?"; predict() rewrites the match as "<char> <punct><punct>".
_SENTENCE_END_RE = re.compile(r"([^\x00-\x7F]|\w)([.。?])")


def _restart_space_unless_building():
    """Ask the Hub to restart this Space unless it is currently building."""
    space = api.get_space_runtime(repo_id=repo_id)
    if space.stage != "BUILDING":
        api.restart_space(repo_id=repo_id)
    else:
        print("TRIED TO RESTART but space is building")


def _clean_reference_audio(speaker_wav):
    """Run the reference clip through ffmpeg filters; return the path to use.

    Returns the filtered file's path on success, or ``speaker_wav`` unchanged
    when ffmpeg exits with a non-zero status.
    """
    out_filename = speaker_wav + str(uuid.uuid4()) + ".wav"
    # Explicit argv list: splitting a formatted string on spaces would break
    # any reference path that itself contains a space.
    command = [
        "./ffmpeg",
        "-y",
        "-i",
        speaker_wav,
        "-af",
        _LOWPASS_HIGHPASS_FILTER + _TRIM_SILENCE_FILTER,
        out_filename,
    ]
    try:
        subprocess.run(command, capture_output=False, text=True, check=True)
    except subprocess.CalledProcessError:
        print("Error: failed filtering, use original microphone input")
        return speaker_wav
    print("Filtered microphone input")
    return out_filename


def _upload_failure_report(error_time, error_data, speaker_wav):
    """Upload the failing request (CSV row + reference wav) to the flagged dataset."""
    write_io = StringIO()
    csv.writer(write_io).writerows([error_data])
    csv_upload = write_io.getvalue().encode()

    filename = error_time + "_" + str(uuid.uuid4()) + ".csv"
    print("Writing error csv")
    error_api = HfApi()
    error_api.upload_file(
        path_or_fileobj=csv_upload,
        path_in_repo=filename,
        repo_id="coqui/xtts-flagged-dataset",
        repo_type="dataset",
    )

    speaker_filename = error_time + "_reference_" + str(uuid.uuid4()) + ".wav"
    error_api = HfApi()
    error_api.upload_file(
        path_or_fileobj=speaker_wav,
        path_in_repo=speaker_filename,
        repo_id="coqui/xtts-flagged-dataset",
        repo_type="dataset",
    )


def predict(
    prompt,
    language,
    audio_file_pth,
    mic_file_path,
    use_mic,
    voice_cleanup,
    no_lang_auto_detect,
    agree,
):
    """Synthesise ``prompt`` in the voice of the supplied reference audio.

    Args:
        prompt: Text to speak; must be 2 to 5000 characters long.
        language: Language code; must be one of ``supported_languages``.
        audio_file_pth: Path of the uploaded reference audio.
        mic_file_path: Path of the microphone recording, or ``None``.
        use_mic: Use ``mic_file_path`` instead of ``audio_file_pth``.
        voice_cleanup: Filter the reference audio with ffmpeg first.
        no_lang_auto_detect: Skip the detected-vs-chosen language check.
        agree: Whether the user accepted the terms; nothing runs otherwise.

    Returns:
        ``(waveform_video, "output.wav", metrics_text, speaker_wav)`` on
        success, or four ``None`` values after any warning or failure.
    """
    global DEVICE_ASSERT_DETECTED, DEVICE_ASSERT_PROMPT, DEVICE_ASSERT_LANG

    if not agree:
        gr.Warning("Please accept the Terms & Condition!")
        return _NO_OUTPUT

    if language not in supported_languages:
        gr.Warning(
            f"Language you put {language} in is not in our Supported Languages, please choose from dropdown"
        )
        return _NO_OUTPUT

    language_predicted = langid.classify(prompt)[0].strip()
    if language_predicted == "zh":
        # The language dropdown offers "zh-cn", so compare against that code.
        language_predicted = "zh-cn"
    print(f"Detected language:{language_predicted}, Chosen language:{language}")

    # The detected language is only compared for prompts longer than 15 chars.
    if len(prompt) > 15 and language_predicted != language and not no_lang_auto_detect:
        gr.Warning(
            "It looks like your text isn't the language you chose, if you're sure the text is the same language you chose, please check disable language auto-detection checkbox"
        )
        return _NO_OUTPUT

    if use_mic:
        if mic_file_path is None:
            gr.Warning(
                "Please record your voice with Microphone, or uncheck Use Microphone to use reference audios"
            )
            return _NO_OUTPUT
        speaker_wav = mic_file_path
    else:
        speaker_wav = audio_file_pth

    # Validate the prompt before spawning ffmpeg so a rejected request does
    # not pay for (or leave behind) a filtered reference file.
    if len(prompt) < 2:
        gr.Warning("Please give a longer prompt text")
        return _NO_OUTPUT
    # Changed from 200 to 5000 characters
    if len(prompt) > 5000:
        gr.Warning("Text length limited to 5000 characters for this demo")
        return _NO_OUTPUT

    # With no reference at all, skip filtering; the speaker-encoding step
    # below then reports the problem through its own warning.
    if voice_cleanup and speaker_wav is not None:
        speaker_wav = _clean_reference_audio(speaker_wav)

    if DEVICE_ASSERT_DETECTED:
        # An earlier request hit a CUDA device-side assert: log what caused
        # it and request a restart again.
        print(
            f"Unrecoverable exception caused by language:{DEVICE_ASSERT_LANG} prompt:{DEVICE_ASSERT_PROMPT}"
        )
        _restart_space_unless_building()

    try:
        metrics_text = ""
        try:
            (gpt_cond_latent, speaker_embedding) = model.get_conditioning_latents(
                audio_path=speaker_wav,
                gpt_cond_len=30,
                gpt_cond_chunk_len=4,
                max_ref_length=60,
            )
        except Exception as e:
            print("Speaker encoding error", str(e))
            gr.Warning("It appears something wrong with reference, did you unmute your microphone?")
            return _NO_OUTPUT

        # "word." -> "word .." (same for "。" and "?").
        prompt = _SENTENCE_END_RE.sub(r"\1 \2\2", prompt)

        print("I: Generating new audio...")
        t0 = time.time()
        out = model.inference(
            prompt,
            language,
            gpt_cond_latent,
            speaker_embedding,
            repetition_penalty=5.0,
            temperature=0.75,
        )
        inference_time = time.time() - t0
        print(f"I: Time to generate audio: {round(inference_time*1000)} milliseconds")
        metrics_text += f"Time to generate audio: {round(inference_time*1000)} milliseconds\n"
        # Elapsed seconds per generated sample, scaled by the sample rate.
        real_time_factor = (time.time() - t0) / out['wav'].shape[-1] * _OUTPUT_SAMPLE_RATE
        print(f"Real-time factor (RTF): {real_time_factor}")
        metrics_text += f"Real-time factor (RTF): {real_time_factor:.2f}\n"
        torchaudio.save("output.wav", torch.tensor(out["wav"]).unsqueeze(0), _OUTPUT_SAMPLE_RATE)

    except RuntimeError as e:
        if "device-side assert" in str(e):
            print(
                f"Exit due to: Unrecoverable exception caused by language:{language} prompt:{prompt}",
                flush=True,
            )
            gr.Warning("Unhandled Exception encounter, please retry in a minute")
            print("Cuda device-assert Runtime encountered need restart")
            if not DEVICE_ASSERT_DETECTED:
                DEVICE_ASSERT_DETECTED = 1
                DEVICE_ASSERT_PROMPT = prompt
                DEVICE_ASSERT_LANG = language

            error_time = datetime.datetime.now().strftime("%d-%m-%Y-%H:%M:%S")
            error_data = [
                str(item)
                for item in (
                    error_time,
                    prompt,
                    language,
                    audio_file_pth,
                    mic_file_path,
                    use_mic,
                    voice_cleanup,
                    no_lang_auto_detect,
                    agree,
                )
            ]
            print(error_data)
            print(speaker_wav)
            try:
                _upload_failure_report(error_time, error_data, speaker_wav)
            except Exception as upload_error:
                # Reporting is best-effort; it must not prevent the restart.
                print("Failed to upload error report:", str(upload_error))

            _restart_space_unless_building()
        elif "Failed to decode" in str(e):
            print("Speaker encoding error", str(e))
            gr.Warning("It appears something wrong with reference, did you unmute your microphone?")
        else:
            print("RuntimeError: non device-side assert error:", str(e))
            gr.Warning("Something unexpected happened please retry again.")
        # No audio was produced on any of these paths.
        return _NO_OUTPUT

    return (
        gr.make_waveform(audio="output.wav"),
        "output.wav",
        metrics_text,
        speaker_wav,
    )


title = "Coqui🐸 XTTS (5000 Char Limit)"

description = """
This demo is running **XTTS v2.0.3** with 5000 character limit. XTTS is a multilingual text-to-speech model with voice cloning.
Supported languages: Arabic (ar), Portuguese (pt), Chinese (zh-cn), Czech (cs), Dutch (nl), English (en), French (fr), German (de), Italian (it), Polish (pl), Russian (ru), Spanish (es), Turkish (tr), Japanese (ja), Korean (ko), Hungarian (hu), Hindi (hi)
"""

# Each table row sits on its own line, starting at column 0, so Markdown can
# render it as a table.
_LINKS_TABLE_MD = """
| | |
| ------------------------------- | --------------------------------------- |
| 🐸💬 **CoquiTTS** | |
| 💼 **Documentation** | [ReadTheDocs](https://tts.readthedocs.io/en/latest/) |
"""

with gr.Blocks(analytics_enabled=False) as demo:
    with gr.Row():
        with gr.Column():
            gr.Markdown(""" ## """)
        with gr.Column():
            pass

    with gr.Row():
        with gr.Column():
            gr.Markdown(description)
        with gr.Column():
            gr.Markdown(_LINKS_TABLE_MD)

    with gr.Row():
        # Left column: inputs. Right column: outputs.
        with gr.Column():
            input_text_gr = gr.Textbox(
                label="Text Prompt",
                info="Up to 5000 text characters.",
                value="Hi there, I'm your new voice clone. Try your best to upload quality audio.",
                lines=5,
                max_lines=10,
            )
            language_gr = gr.Dropdown(
                label="Language",
                choices=[
                    "en",
                    "es",
                    "fr",
                    "de",
                    "it",
                    "pt",
                    "pl",
                    "tr",
                    "ru",
                    "nl",
                    "cs",
                    "ar",
                    "zh-cn",
                    "ja",
                    "ko",
                    "hu",
                    "hi",
                ],
                value="en",
            )
            ref_gr = gr.Audio(
                label="Reference Audio",
                type="filepath",
                value="examples/female.wav",
            )
            mic_gr = gr.Audio(
                source="microphone",
                type="filepath",
                label="Use Microphone for Reference",
            )
            use_mic_gr = gr.Checkbox(
                label="Use Microphone",
                value=False,
            )
            clean_ref_gr = gr.Checkbox(
                label="Cleanup Reference Voice",
                value=False,
            )
            auto_det_lang_gr = gr.Checkbox(
                label="Do not use language auto-detect",
                value=False,
            )
            tos_gr = gr.Checkbox(
                label="Agree to CPML terms",
                value=False,
            )
            tts_button = gr.Button("Generate Speech", elem_id="send-btn", visible=True)

        with gr.Column():
            video_gr = gr.Video(label="Waveform Visual")
            audio_gr = gr.Audio(label="Synthesised Audio", autoplay=True)
            out_text_gr = gr.Text(label="Metrics")
            ref_audio_gr = gr.Audio(label="Reference Audio Used")

    # Input order must match predict()'s positional parameters.
    tts_button.click(
        predict,
        [
            input_text_gr,
            language_gr,
            ref_gr,
            mic_gr,
            use_mic_gr,
            clean_ref_gr,
            auto_det_lang_gr,
            tos_gr,
        ],
        outputs=[video_gr, audio_gr, out_text_gr, ref_audio_gr],
    )

demo.queue()
demo.launch(debug=True, show_api=True)