import sys
import io, os, stat
import subprocess
import random
from zipfile import ZipFile
import uuid
import time
import torch
import torchaudio
# Download the UniDic dictionary needed by MeCab. Run it with the same
# interpreter that is executing this script (sys.executable) rather than
# whatever "python" resolves to on PATH, and without going through a shell.
# The download stays best-effort: a non-zero exit status is not fatal here.
subprocess.run([sys.executable, "-m", "unidic", "download"], check=False)
# By using XTTS you agree to CPML license https://coqui.ai/cpml
# NOTE: this must be set before the TTS package is imported below.
os.environ["COQUI_TOS_AGREED"] = "1"
import langid
import base64
import csv
from io import StringIO
import datetime
import re
import gradio as gr
from scipy.io.wavfile import write
from pydub import AudioSegment
from TTS.api import TTS
from TTS.tts.configs.xtts_config import XttsConfig
from TTS.tts.models.xtts import Xtts
from TTS.utils.generic_utils import get_user_data_dir
# --- Hugging Face API client -------------------------------------------------
# The token is optional; `api` is used by predict() to query and restart the
# Space identified by `repo_id`.
HF_TOKEN = os.environ.get("HF_TOKEN")
from huggingface_hub import HfApi

api = HfApi(token=HF_TOKEN)
repo_id = "coqui/xtts"

# --- Bundled ffmpeg ----------------------------------------------------------
print("Export newer ffmpeg binary for denoise filter")
# Use a context manager so the archive's file handle is closed once the
# extraction is done instead of being left to the garbage collector.
with ZipFile("ffmpeg.zip") as ffmpeg_archive:
    ffmpeg_archive.extractall()

print("Make ffmpeg binary executable")
st = os.stat("ffmpeg")
os.chmod("ffmpeg", st.st_mode | stat.S_IEXEC)

# --- XTTS model download and load -------------------------------------------
print("Downloading if not downloaded Coqui XTTS V2")
from TTS.utils.manage import ModelManager

model_name = "tts_models/multilingual/multi-dataset/xtts_v2"
ModelManager().download_model(model_name)
# The checkpoint directory is the model name with "/" replaced by "--",
# located under the TTS user data directory.
model_path = os.path.join(get_user_data_dir("tts"), model_name.replace("/", "--"))
print("XTTS downloaded")

config = XttsConfig()
config.load_json(os.path.join(model_path, "config.json"))

model = Xtts.init_from_config(config)
model.load_checkpoint(
    config,
    checkpoint_path=os.path.join(model_path, "model.pth"),
    vocab_path=os.path.join(model_path, "vocab.json"),
    eval=True,
    use_deepspeed=True,
)
model.cuda()

# --- State shared with predict() --------------------------------------------
# Set by predict() when a CUDA device-side assert is seen, together with the
# prompt and language that triggered it.
DEVICE_ASSERT_DETECTED = 0
DEVICE_ASSERT_PROMPT = None
DEVICE_ASSERT_LANG = None

# Language codes accepted by predict(), taken from the model configuration.
supported_languages = config.languages
# Result handed back to Gradio whenever a request is rejected or fails:
# one None per output component wired to the button.
_NO_OUTPUT = (None, None, None, None)

# File the synthesised audio is written to, and its sample rate in Hz.
_OUTPUT_WAV = "output.wav"
_OUTPUT_SAMPLE_RATE = 24000

# ffmpeg audio filters applied to the reference clip when cleanup is requested.
_LOWPASS_HIGHPASS = "lowpass=8000,highpass=75,"
_TRIM_SILENCE = "areverse,silenceremove=start_periods=1:start_silence=0:start_threshold=0.02,areverse,silenceremove=start_periods=1:start_silence=0:start_threshold=0.02,"

# A word or non-ASCII character followed by ".", "。" or "?". Written as a raw
# string so the backslashes reach the regex engine without relying on
# invalid string escape sequences.
_SENTENCE_END_RE = re.compile(r"([^\x00-\x7F]|\w)(\.|\。|\?)")


def _restart_space():
    """Restart the hosting Space unless it is currently building."""
    space = api.get_space_runtime(repo_id=repo_id)
    if space.stage != "BUILDING":
        api.restart_space(repo_id=repo_id)
    else:
        print("TRIED TO RESTART but space is building")


def _clean_reference_audio(speaker_wav):
    """Filter the reference clip with the bundled ffmpeg.

    Returns the path of the filtered file, or the original path when the
    filtering fails (cleanup is best-effort).
    """
    out_filename = speaker_wav + str(uuid.uuid4()) + ".wav"
    # Pass the arguments as a list: building one string and splitting it on
    # spaces breaks as soon as a path contains a space.
    command = [
        "./ffmpeg",
        "-y",
        "-i",
        speaker_wav,
        "-af",
        _LOWPASS_HIGHPASS + _TRIM_SILENCE,
        out_filename,
    ]
    try:
        subprocess.run(command, capture_output=False, text=True, check=True)
    except (subprocess.CalledProcessError, OSError):
        # Non-zero exit status, or the ffmpeg binary could not be started.
        print("Error: failed filtering, use original microphone input")
        return speaker_wav
    print("Filtered microphone input")
    return out_filename


def _upload_error_report(request_fields, speaker_wav):
    """Upload the failing request's inputs and reference audio to the flagged dataset."""
    error_time = datetime.datetime.now().strftime("%d-%m-%Y-%H:%M:%S")
    error_data = [error_time] + [
        field if isinstance(field, str) else str(field) for field in request_fields
    ]
    print(error_data)
    print(speaker_wav)

    write_io = StringIO()
    csv.writer(write_io).writerow(error_data)
    csv_upload = write_io.getvalue().encode()

    error_api = HfApi()
    print("Writing error csv")
    error_api.upload_file(
        path_or_fileobj=csv_upload,
        path_in_repo=error_time + "_" + str(uuid.uuid4()) + ".csv",
        repo_id="coqui/xtts-flagged-dataset",
        repo_type="dataset",
    )
    error_api.upload_file(
        path_or_fileobj=speaker_wav,
        path_in_repo=error_time + "_reference_" + str(uuid.uuid4()) + ".wav",
        repo_id="coqui/xtts-flagged-dataset",
        repo_type="dataset",
    )


def predict(
    prompt,
    language,
    audio_file_pth,
    mic_file_path,
    use_mic,
    voice_cleanup,
    no_lang_auto_detect,
    agree,
):
    """Synthesise `prompt` in the voice of a reference clip.

    Args:
        prompt: Text to speak; must be between 2 and 5000 characters.
        language: Language code; must be one of `supported_languages`.
        audio_file_pth: Path of the reference audio file.
        mic_file_path: Path of the microphone recording, or None.
        use_mic: Use `mic_file_path` instead of `audio_file_pth` as reference.
        voice_cleanup: Filter the reference clip with ffmpeg first.
        no_lang_auto_detect: Skip the check that the detected language of
            `prompt` matches `language`.
        agree: Whether the user accepted the terms; nothing runs otherwise.

    Returns:
        A 4-tuple `(waveform, audio_path, metrics_text, reference_path)` on
        success, where `waveform` is the result of `gr.make_waveform` for the
        generated file. On any rejected input or handled failure a Gradio
        warning is shown and `(None, None, None, None)` is returned.
    """
    global DEVICE_ASSERT_DETECTED, DEVICE_ASSERT_PROMPT, DEVICE_ASSERT_LANG

    if not agree:
        gr.Warning("Please accept the Terms & Condition!")
        return _NO_OUTPUT

    if language not in supported_languages:
        gr.Warning(
            f"Language you put {language} in is not in our Supported Languages, please choose from dropdown"
        )
        return _NO_OUTPUT

    language_predicted = langid.classify(prompt)[0].strip()
    # The language list uses "zh-cn" where the detector reports "zh".
    if language_predicted == "zh":
        language_predicted = "zh-cn"
    print(f"Detected language:{language_predicted}, Chosen language:{language}")

    # Language detection is only enforced for prompts longer than 15 characters.
    if (
        len(prompt) > 15
        and language_predicted != language
        and not no_lang_auto_detect
    ):
        gr.Warning(
            "It looks like your text isn't the language you chose, if you're sure the text is the same language you chose, please check disable language auto-detection checkbox"
        )
        return _NO_OUTPUT

    if use_mic:
        if mic_file_path is None:
            gr.Warning(
                "Please record your voice with Microphone, or uncheck Use Microphone to use reference audios"
            )
            return _NO_OUTPUT
        speaker_wav = mic_file_path
    else:
        speaker_wav = audio_file_pth

    # Without a reference clip neither the cleanup nor the speaker encoding
    # can work; reject early instead of failing on a None path.
    if speaker_wav is None:
        gr.Warning(
            "Please provide a reference audio, or check Use Microphone and record your voice"
        )
        return _NO_OUTPUT

    # Validate the prompt before spending time on ffmpeg.
    if len(prompt) < 2:
        gr.Warning("Please give a longer prompt text")
        return _NO_OUTPUT
    if len(prompt) > 5000:
        gr.Warning(
            "Text length limited to 5000 characters for this demo"
        )
        return _NO_OUTPUT

    if voice_cleanup:
        speaker_wav = _clean_reference_audio(speaker_wav)

    if DEVICE_ASSERT_DETECTED:
        # An earlier request already hit a device-side assert; try the
        # restart again before attempting inference.
        print(f"Unrecoverable exception caused by language:{DEVICE_ASSERT_LANG} prompt:{DEVICE_ASSERT_PROMPT}")
        _restart_space()

    try:
        gpt_cond_latent, speaker_embedding = model.get_conditioning_latents(
            audio_path=speaker_wav,
            gpt_cond_len=30,
            gpt_cond_chunk_len=4,
            max_ref_length=60,
        )
    except Exception as e:
        print("Speaker encoding error", str(e))
        gr.Warning("It appears something wrong with reference, did you unmute your microphone?")
        return _NO_OUTPUT

    # Insert a space before sentence-final punctuation and double it.
    prompt = _SENTENCE_END_RE.sub(r"\1 \2\2", prompt)

    try:
        print("I: Generating new audio...")
        t0 = time.time()
        out = model.inference(
            prompt,
            language,
            gpt_cond_latent,
            speaker_embedding,
            repetition_penalty=5.0,
            temperature=0.75,
        )
        inference_time = time.time() - t0
        elapsed_ms = round(inference_time * 1000)
        print(f"I: Time to generate audio: {elapsed_ms} milliseconds")
        # Generation time divided by the duration of the generated audio;
        # based on the measured inference time rather than a second clock read.
        real_time_factor = inference_time / out["wav"].shape[-1] * _OUTPUT_SAMPLE_RATE
        print(f"Real-time factor (RTF): {real_time_factor}")
        metrics_text = (
            f"Time to generate audio: {elapsed_ms} milliseconds\n"
            f"Real-time factor (RTF): {real_time_factor:.2f}\n"
        )
        torchaudio.save(
            _OUTPUT_WAV, torch.tensor(out["wav"]).unsqueeze(0), _OUTPUT_SAMPLE_RATE
        )
    except RuntimeError as e:
        error_message = str(e)
        if "device-side assert" in error_message:
            print(f"Exit due to: Unrecoverable exception caused by language:{language} prompt:{prompt}", flush=True)
            gr.Warning("Unhandled Exception encounter, please retry in a minute")
            print("Cuda device-assert Runtime encountered need restart")
            if not DEVICE_ASSERT_DETECTED:
                DEVICE_ASSERT_DETECTED = 1
                DEVICE_ASSERT_PROMPT = prompt
                DEVICE_ASSERT_LANG = language
            # Record what triggered the failure. The upload is diagnostic
            # only, so a failure here must not prevent the restart below.
            try:
                _upload_error_report(
                    [
                        prompt,
                        language,
                        audio_file_pth,
                        mic_file_path,
                        use_mic,
                        voice_cleanup,
                        no_lang_auto_detect,
                        agree,
                    ],
                    speaker_wav,
                )
            except Exception as upload_error:
                print("Error: failed to upload error report:", str(upload_error))
            _restart_space()
        elif "Failed to decode" in error_message:
            print("Speaker encoding error", error_message)
            gr.Warning("It appears something wrong with reference, did you unmute your microphone?")
        else:
            print("RuntimeError: non device-side assert error:", error_message)
            gr.Warning("Something unexpected happened please retry again.")
        # Nothing was generated for this request, so never hand back an
        # output file left over from an earlier one.
        return _NO_OUTPUT

    return (
        gr.make_waveform(audio=_OUTPUT_WAV),
        _OUTPUT_WAV,
        metrics_text,
        speaker_wav,
    )
# Heading shown at the top of the page.
title = "Coqui🐸 XTTS (5000 Char Limit)"

# Markdown shown in the left column below the heading.
description = """
This demo is running **XTTS v2.0.3** with 5000 character limit. XTTS is a multilingual text-to-speech model with voice cloning.
Supported languages: Arabic (ar), Portuguese (pt), Chinese (zh-cn), Czech (cs), Dutch (nl), English (en), French (fr), German (de), Italian (it), Polish (pl), Russian (ru), Spanish (es), Turkish (tr), Japanese (ja), Korean (ko), Hungarian (hu), Hindi (hi)
"""

# Markdown link table shown next to the description. Every row has two cells
# so the table renders as a table (the previous text had a one-cell row
# followed by a stray "|" line).
# NOTE(review): the GitHub link in the CoquiTTS row is a reconstruction of the
# missing second cell - confirm it is the intended target.
links = """
| | |
| ------------------------------- | --------------------------------------- |
| 🐸💬 **CoquiTTS** | [GitHub](https://github.com/coqui-ai/TTS) |
| 💼 **Documentation** | [ReadTheDocs](https://tts.readthedocs.io/en/latest/) |
"""

with gr.Blocks(analytics_enabled=False) as demo:
    # Header row: page heading on the left, empty column on the right.
    with gr.Row():
        with gr.Column():
            # Render the title; the heading used to be an empty "##".
            gr.Markdown(f"## {title}")
        with gr.Column():
            pass

    # Description and links.
    with gr.Row():
        with gr.Column():
            gr.Markdown(description)
        with gr.Column():
            gr.Markdown(links)

    # Inputs on the left, outputs on the right.
    with gr.Row():
        with gr.Column():
            input_text_gr = gr.Textbox(
                label="Text Prompt",
                info="Up to 5000 text characters.",
                value="Hi there, I'm your new voice clone. Try your best to upload quality audio.",
                lines=5,
                max_lines=10,
            )
            language_gr = gr.Dropdown(
                label="Language",
                choices=[
                    "en",
                    "es",
                    "fr",
                    "de",
                    "it",
                    "pt",
                    "pl",
                    "tr",
                    "ru",
                    "nl",
                    "cs",
                    "ar",
                    "zh-cn",
                    "ja",
                    "ko",
                    "hu",
                    "hi",
                ],
                value="en",
            )
            ref_gr = gr.Audio(
                label="Reference Audio",
                type="filepath",
                value="examples/female.wav",
            )
            mic_gr = gr.Audio(
                source="microphone",
                type="filepath",
                label="Use Microphone for Reference",
            )
            use_mic_gr = gr.Checkbox(
                label="Use Microphone",
                value=False,
            )
            clean_ref_gr = gr.Checkbox(
                label="Cleanup Reference Voice",
                value=False,
            )
            auto_det_lang_gr = gr.Checkbox(
                label="Do not use language auto-detect",
                value=False,
            )
            tos_gr = gr.Checkbox(
                label="Agree to CPML terms",
                value=False,
            )
            tts_button = gr.Button("Generate Speech", elem_id="send-btn", visible=True)

        with gr.Column():
            video_gr = gr.Video(label="Waveform Visual")
            audio_gr = gr.Audio(label="Synthesised Audio", autoplay=True)
            out_text_gr = gr.Text(label="Metrics")
            ref_audio_gr = gr.Audio(label="Reference Audio Used")

    # The input order must match predict()'s parameter order, and the output
    # order must match the 4-tuple it returns.
    tts_button.click(
        predict,
        [
            input_text_gr,
            language_gr,
            ref_gr,
            mic_gr,
            use_mic_gr,
            clean_ref_gr,
            auto_det_lang_gr,
            tos_gr,
        ],
        outputs=[video_gr, audio_gr, out_text_gr, ref_audio_gr],
    )

demo.queue()
demo.launch(debug=True, show_api=True)