Spaces:
Running
Running
| from flask import Flask, request, Response | |
| from io import BytesIO | |
| import torch | |
| from av import open as avopen | |
| from typing import Dict, List | |
| import re_matching | |
| import utils | |
| from infer import infer, get_net_g, latest_version | |
| from scipy.io import wavfile | |
| import gradio as gr | |
| from config import config | |
| # Flask Init | |
| app = Flask(__name__) | |
| app.config["JSON_AS_ASCII"] = False | |
| def replace_punctuation(text, i=2): | |
| punctuation = ",。?!" | |
| for char in punctuation: | |
| text = text.replace(char, char * i) | |
| return text | |
| def wav2(i, o, format): | |
| inp = avopen(i, "rb") | |
| out = avopen(o, "wb", format=format) | |
| if format == "ogg": | |
| format = "libvorbis" | |
| ostream = out.add_stream(format) | |
| for frame in inp.decode(audio=0): | |
| for p in ostream.encode(frame): | |
| out.mux(p) | |
| for p in ostream.encode(None): | |
| out.mux(p) | |
| out.close() | |
| inp.close() | |
| net_g_List = [] | |
| hps_List = [] | |
| # 模型角色字典 | |
| # 使用方法 chr_name = chrsMap[model_id][chr_id] | |
| chrsMap: List[Dict[int, str]] = list() | |
| # 加载模型 | |
| models = config.server_config.models | |
| for model in models: | |
| hps_List.append(utils.get_hparams_from_file(model["config"])) | |
| # 添加角色字典 | |
| chrsMap.append(dict()) | |
| for name, cid in hps_List[-1].data.spk2id.items(): | |
| chrsMap[-1][cid] = name | |
| version = ( | |
| hps_List[-1].version if hasattr(hps_List[-1], "version") else latest_version | |
| ) | |
| net_g_List.append( | |
| get_net_g( | |
| model_path=model["model"], | |
| version=version, | |
| device=model["device"], | |
| hps=hps_List[-1], | |
| ) | |
| ) | |
| def generate_audio( | |
| slices, | |
| sdp_ratio, | |
| noise_scale, | |
| noise_scale_w, | |
| length_scale, | |
| speaker, | |
| language, | |
| ): | |
| audio_list = [] | |
| silence = np.zeros(hps.data.sampling_rate // 2, dtype=np.int16) | |
| with torch.no_grad(): | |
| for piece in slices: | |
| audio = infer( | |
| piece, | |
| sdp_ratio=sdp_ratio, | |
| noise_scale=noise_scale, | |
| noise_scale_w=noise_scale_w, | |
| length_scale=length_scale, | |
| sid=speaker, | |
| language=language, | |
| hps=hps, | |
| net_g=net_g, | |
| device=device, | |
| ) | |
| audio16bit = gr.processing_utils.convert_to_16_bit_wav(audio) | |
| audio_list.append(audio16bit) | |
| audio_list.append(silence) # 将静音添加到列表中 | |
| return audio_list | |
| def main(): | |
| try: | |
| model = int(request.args.get("model")) | |
| speaker = request.args.get("speaker", "") # 指定人物名 | |
| speaker_id = request.args.get("speaker_id", None) # 直接指定id | |
| text = request.args.get("text").replace("/n", "") | |
| sdp_ratio = float(request.args.get("sdp_ratio", 0.2)) | |
| noise = float(request.args.get("noise", 0.5)) | |
| noisew = float(request.args.get("noisew", 0.6)) | |
| length = float(request.args.get("length", 1.2)) | |
| language = request.args.get("language") | |
| if length >= 2: | |
| return "Too big length" | |
| if len(text) >= 250: | |
| return "Too long text" | |
| fmt = request.args.get("format", "wav") | |
| if None in (speaker, text): | |
| return "Missing Parameter" | |
| if fmt not in ("mp3", "wav", "ogg"): | |
| return "Invalid Format" | |
| if language not in ("JP", "ZH", "EN", "mix"): | |
| return "Invalid language" | |
| except: | |
| return "Invalid Parameter" | |
| if speaker_id is not None: | |
| if speaker_id.isdigit(): | |
| speaker = chrsMap[model][int(speaker_id)] | |
| audio_list = [] | |
| if language == "mix": | |
| bool_valid, str_valid = re_matching.validate_text(text) | |
| if not bool_valid: | |
| return str_valid, ( | |
| hps.data.sampling_rate, | |
| np.concatenate([np.zeros(hps.data.sampling_rate // 2)]), | |
| ) | |
| result = re_matching.text_matching(text) | |
| for one in result: | |
| _speaker = one.pop() | |
| for lang, content in one: | |
| audio_list.extend( | |
| generate_audio( | |
| content.split("|"), | |
| sdp_ratio, | |
| noise_scale, | |
| noise_scale_w, | |
| length_scale, | |
| _speaker, | |
| lang, | |
| ) | |
| ) | |
| else: | |
| audio_list.extend( | |
| generate_audio( | |
| text.split("|"), | |
| sdp_ratio, | |
| noise_scale, | |
| noise_scale_w, | |
| length_scale, | |
| speaker, | |
| language, | |
| ) | |
| ) | |
| audio_concat = np.concatenate(audio_list) | |
| with BytesIO() as wav: | |
| wavfile.write(wav, hps_List[model].data.sampling_rate, audio_concat) | |
| torch.cuda.empty_cache() | |
| if fmt == "wav": | |
| return Response(wav.getvalue(), mimetype="audio/wav") | |
| wav.seek(0, 0) | |
| with BytesIO() as ofp: | |
| wav2(wav, ofp, fmt) | |
| return Response( | |
| ofp.getvalue(), mimetype="audio/mpeg" if fmt == "mp3" else "audio/ogg" | |
| ) | |
| if __name__ == "__main__": | |
| app.run(port=config.server_config.port, server_name="0.0.0.0") | |