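"""Streamlit app visualizing word-level neural acoustic distances between two recordings
of the same word, based on wav2vec 2.0 features aligned with dynamic time warping.

Companion tool to "Neural representations for modeling variation in speech"
(https://doi.org/10.1016/j.wocn.2022.101137).
"""
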
import os.path
from typing import Optional

import matplotlib.pyplot as plt
import numpy as np
import soundfile as sf
import streamlit as st
import torch
import transformers
from dtw import dtw
from scipy import signal
from transformers import AutoConfig
from transformers.models.wav2vec2 import Wav2Vec2Model
from datetime import datetime
import os
import psutil
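
# Read a wave file from disk and render an audio player widget in the Streamlit page.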
def play_audio(filename):
    with open(filename, "rb") as audio_file:
        audio_bytes = audio_file.read()
    st.audio(audio_bytes, format="audio/wav")
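
# Align two sequences of frame-level feature vectors with dynamic time warping,
# keeping the internal cost matrices so per-frame costs can be read out later.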
def aligner(x, y):
    return dtw(x, y, keep_internals=True)
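
# For every frame of the x-axis recording, average the local DTW costs of all y-axis
# frames aligned to it; also return how many frames were aligned (used for bullet sizes).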
def compute_costs(gcm):
    res = [[] for _ in range(gcm.N)]
    for i in range(gcm.index1.shape[0]):
        d = gcm.localCostMatrix[gcm.index1[i], gcm.index2[i]]
        res[gcm.index1[i]].append(d)
    n = [len(x) for x in res]
    res = [np.mean(x) for x in res]
    return res, n
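
# Load a wav2vec 2.0 model from the Hugging Face Hub; when a positive layer is given,
# the model is truncated so that the chosen layer becomes its last hidden state.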
# @st.cache(show_spinner=False, hash_funcs={torch.nn.parameter.Parameter: lambda _: None}, max_entries=1)
def load_wav2vec2_featurizer(model_id: str, layer: Optional[int] = None):
    transformers.logging.set_verbosity(transformers.logging.ERROR)
    model_kwargs = {}
    if layer is not None:
        model_kwargs["num_hidden_layers"] = int(layer) if layer > 0 else 0
    with st.spinner("Loading model..."):
        model = Wav2Vec2Model.from_pretrained(model_id, **model_kwargs)
        model.eval()
        if torch.cuda.is_available():
            model.cuda()
        # st.success("Done!")
    return model
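
# Featurize both recordings with the selected model/layer, align them with DTW, and
# return the global normalized distance plus per-frame costs and alignment counts.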
# @st.cache(persist=True, show_spinner=False, max_entries=3)
def run(model_id, layer, filename_x, filename_y):
    model = load_wav2vec2_featurizer(model_id, layer)

    def _featurize(path):
        # Load audio, convert to mono and resample to the 16 kHz expected by wav2vec 2.0
        input_values, rate = sf.read(path, dtype=np.float32)
        if len(input_values.shape) == 2:
            input_values = input_values.mean(1)
        if rate != 16_000:
            new_length = int(input_values.shape[0] / rate * 16_000)
            input_values = signal.resample(input_values, new_length)
        input_values = torch.from_numpy(input_values).unsqueeze(0)
        if torch.cuda.is_available():
            input_values = input_values.cuda()

        # Run the forward pass without autograd; gradients are not needed here and
        # calling .numpy() on tensors that track gradients would fail.
        with torch.no_grad():
            if layer is None:
                hidden_states = model(input_values, output_hidden_states=True).hidden_states
                hidden_states = [s.squeeze(0).cpu().numpy() for s in hidden_states]
                return hidden_states

            if layer >= 0:
                hidden_state = model(input_values).last_hidden_state.squeeze(0).cpu().numpy()
            else:
                hidden_state = model.feature_extractor(input_values)
                hidden_state = hidden_state.transpose(1, 2)
                if layer == -1:
                    hidden_state = model.feature_projection(hidden_state)
                hidden_state = hidden_state.squeeze(0).cpu().numpy()
        return hidden_state

    with st.spinner("Measuring distance..."):
        feats_x = _featurize(filename_x)
        feats_y = _featurize(filename_y)
        print('3. Features computed', datetime.now().strftime('%d-%m-%Y %H:%M:%S'))  # test
        gcm = aligner(feats_x, feats_y)
        print('4. Alignments computed', datetime.now().strftime('%d-%m-%Y %H:%M:%S'))  # test
        d = gcm.normalizedDistance
        print("Distance:", d)
        c, n = compute_costs(gcm)
        print('5. Costs computed', datetime.now().strftime('%d-%m-%Y %H:%M:%S'))  # test
    del model
    return d, c, n
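
# Build the Streamlit interface: model and layer selection, audio file selection,
# distance computation and the per-frame distance plot.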
def main():
    st.title("Word-level Neural Acoustic Distance Visualizer")

    st.write(
        "This tool visualizes pronunciation differences between two recordings of the same word. "
        "Both recordings have to be wave files containing a single spoken word.\n\n"
        "Choose any wav2vec 2.0 compatible model identifier on the "
        "[Hugging Face Model Hub](https://huggingface.co/models?filter=wav2vec2) and select the output layer you want to use.\n\n"
        "To upload your own recordings, select 'custom upload' in the audio file selection step. "
        "The first recording is placed on the x-axis of the plot and the second recording serves as the reference for computing distances. "
        "You should already see an example plot of two sample recordings.\n\n"
        "This visualization tool is part of [neural representations for modeling variation in speech](https://doi.org/10.1016/j.wocn.2022.101137). "
        "Please see our paper for further details.")

    st.subheader("Model selection:")
    model_id = st.selectbox("Select the wav2vec 2.0 model you want to use:",
                            ("facebook/wav2vec2-large-960h", "facebook/wav2vec2-large", "facebook/wav2vec2-large-xlsr-53",
                             "facebook/wav2vec2-xls-r-300m", "other"),
                            index=0)
    if model_id == "other":
        model_id = st.text_input("Enter the wav2vec 2.0 model you want to use:",
                                 value="facebook/wav2vec2-large-960h",
                                 key="model")

    print("\n### Start new run\n")  # test
    try:
        cfg = AutoConfig.from_pretrained(model_id)
        layer = st.number_input("Select the layer you want to use:",
                                min_value=1, max_value=cfg.num_hidden_layers, value=10)
    except OSError:
        st.error(
            "Please select a wav2vec 2.0 compatible model identifier on the [Hugging Face Model Hub](https://huggingface.co/models?filter=wav2vec2)."
        )
        layer = None
    print('1. Model selected', datetime.now().strftime('%d-%m-%Y %H:%M:%S'))  # test

    st.subheader("Audio file selection:")
    filename_x = st.selectbox("Filename (x-axis):",
                              ("falling_huud_mobiel_201145.wav", "falling_hood_mobiel_203936.wav", "custom upload"))
    if filename_x == "falling_huud_mobiel_201145.wav":
        filename_x = "./examples/falling_huud_mobiel_201145.wav"
        play_audio(filename_x)
    if filename_x == "falling_hood_mobiel_203936.wav":
        filename_x = "./examples/falling_hood_mobiel_203936.wav"
        play_audio(filename_x)

    filename_y = st.selectbox("Filename (y-axis):",
                              ("falling_hood_mobiel_203936.wav", "falling_huud_mobiel_201145.wav", "custom upload"))
    if filename_y == "falling_huud_mobiel_201145.wav":
        filename_y = "./examples/falling_huud_mobiel_201145.wav"
        play_audio(filename_y)
    if filename_y == "falling_hood_mobiel_203936.wav":
        filename_y = "./examples/falling_hood_mobiel_203936.wav"
        play_audio(filename_y)

    if filename_x == "custom upload":
        filename_x = st.file_uploader("Choose a file (x-axis)", key="f_x")
    if filename_y == "custom upload":
        filename_y = st.file_uploader("Choose a file (y-axis)", key="f_y")
    print('2. Files selected', datetime.now().strftime('%d-%m-%Y %H:%M:%S'))  # test

    if filename_x is not None and filename_y is not None and layer is not None:
        print(f"\nX: {filename_x}\nY: {filename_y}")
        d, c, n = run(model_id, layer, filename_x, filename_y)
        # d_b, c_b, n_b = run(featurizer_b)
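
        # One wav2vec 2.0 frame covers roughly 20 ms, so frame indices are converted to
        # milliseconds below; the 9-frame moving average therefore spans about 180 ms.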
        fig, axes = plt.subplots(figsize=(4, 2.5))
        print('6. Plot init', datetime.now().strftime('%d-%m-%Y %H:%M:%S'))  # test

        window_size = 9
        rate = 20
        x = np.arange(0, len(c) * rate, rate)
        offset = (window_size - 1) // 2
        x_ = x[offset:-offset]

        # Target layer
        axes.plot(x, c, alpha=0.5, color="gray", linestyle="--")
        axes.scatter(x, c, np.array(n) * 10, color="gray")
        c_ = np.convolve(c, np.ones(window_size) / window_size, mode="valid")
        axes.plot(x_, c_)

        # Last layer
        # axes.plot(x, c_b, alpha=0.5, color="gray", linestyle="--")
        # axes.scatter(x, c_b, np.array(n_b) * 10, color="gray")
        # c_b_ = np.convolve(c_b, np.ones(window_size) / window_size, mode="valid")
        # axes.plot(x_, c_b_, linestyle="--")

        axes.set_xlabel("time (ms)")
        axes.set_ylabel("distance per frame")
        axes.hlines(y=d, xmin=0, xmax=np.max(x), linestyles="dashdot")
        plt.tight_layout(pad=0)
        # Save to the fixed path expected by the download section below
        # (a randomized filename here would never be found by that check).
        plt.savefig("./output/plot.pdf")
        st.pyplot(fig)

main()
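
# Module-level code below runs after main() on every Streamlit rerun: if a plot was
# saved, show its caption and offer the PDF for download, then print debug info.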
print('7. Plot filled', datetime.now().strftime('%d-%m-%Y %H:%M:%S'))  # test

if os.path.isfile("./output/plot.pdf"):
    st.caption("Visualization of neural acoustic distances per frame (based on wav2vec 2.0) "
               "with the pronunciation of the first filename on the x-axis and distances to the "
               "pronunciation of the second filename on the y-axis. The horizontal line represents "
               "the global distance value (i.e., the average over all individual frames). "
               "The continuous blue line represents the moving average distance based on 9 frames, "
               "corresponding to 180 ms. As a result of the moving average, the blue line does not "
               "cover the entire duration of the sample. Larger bullet sizes indicate that multiple "
               "frames in the pronunciation on the y-axis are aligned to a single frame in the "
               "pronunciation on the x-axis.")
    with open("./output/plot.pdf", "rb") as file:
        btn = st.download_button(label="Download plot", data=file, file_name="plot.pdf", mime="application/pdf")

print('8. End', datetime.now().strftime('%d-%m-%Y %H:%M:%S'))  # test
print(f"9. RAM used: {psutil.Process().memory_info().rss / (1024 * 1024):.2f} MB")  # test
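
# Clear module globals and force garbage collection; this appears to be a workaround
# to keep memory from growing across Streamlit reruns (see the RAM usage print above).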
for name in dir():
    if not name.startswith('_'):
        del globals()[name]

import gc
gc.collect()