# -*- coding: utf-8 -*-
"""Username_Transformer

Automatically generated by Colab.

Original file is located at
    https://colab.research.google.com/drive/1iae8ZzCuKYOPmMyTibAh7hVzwjbrW4Pe
"""

# Commented out IPython magic to ensure Python compatibility.
# Install PyTorch
# %pip install torch torchvision torchaudio

# Install other dependencies
# %pip install numpy pandas nltk elevenlabs requests

import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
import numpy as np
import nltk
import re
from collections import Counter
from tqdm import tqdm
import requests
from nltk.corpus import cmudict
import os
import pandas as pd

# Allow cuDNN benchmark mode to pick the fastest kernels
import torch.backends.cudnn as cudnn
cudnn.benchmark = True

nltk.download('cmudict')
cmu_dict = cmudict.dict()

url = "https://raw.githubusercontent.com/danielmiessler/SecLists/master/Usernames/xato-net-10-million-usernames.txt"

try:
    response = requests.get(url)
    response.raise_for_status()  # Raise an exception for bad status codes
    usernames = response.text.splitlines()
    print(f"Downloaded {len(usernames)} usernames.")
except requests.exceptions.RequestException as e:
    print(f"Error downloading usernames: {e}")
    usernames = []

def normalize_username(username):
    # Convert to lowercase
    username = username.lower()
    # Replace digits with their spelled-out words
    num_to_word = {
        '0': ' zero ', '1': ' one ', '2': ' two ', '3': ' three ',
        '4': ' four ', '5': ' five ', '6': ' six ', '7': ' seven ',
        '8': ' eight ', '9': ' nine '
    }
    for num, word in num_to_word.items():
        username = username.replace(num, word)
    # Replace special characters with spaces
    username = re.sub(r'[\W_]+', ' ', username)
    # Collapse repeated whitespace
    username = re.sub(r'\s+', ' ', username).strip()
    return username

def get_phonemes(word):
    phonemes_list = cmu_dict.get(word)
    if phonemes_list:
        return phonemes_list[0]  # Use the first pronunciation
    else:
        return None

# Only keep usernames that have valid phonemes; words with no CMUdict entry
# are skipped here, and usernames with no resolvable words are dropped below.
def username_to_phonemes(username):
    normalized = normalize_username(username)
    words = normalized.split()
    phonemes = []
    for word in words:
        phoneme = get_phonemes(word)
        if phoneme:
            phonemes.extend(phoneme)
        # else:
        #     print(f"Warning: Unable to find phonemes for word: {word}")
    return phonemes

input_sequences = []
target_sequences = []

for username in usernames:
    input_seq = list(normalize_username(username))
    target_seq = username_to_phonemes(username)
    if target_seq:
        input_sequences.append(input_seq)
        target_sequences.append(target_seq)

# Character vocabulary (index 0 reserved for the <pad> token)
char_counter = Counter([char for seq in input_sequences for char in seq])
char_list = ['<pad>'] + sorted(char_counter.keys())
char_vocab = {char: idx for idx, char in enumerate(char_list)}

# Phoneme vocabulary (indices 0-2 reserved for <pad>, <sos>, <eos>)
phoneme_counter = Counter([phoneme for seq in target_sequences for phoneme in seq])
phoneme_list = ['<pad>', '<sos>', '<eos>'] + sorted(phoneme_counter.keys())
phoneme_vocab = {phoneme: idx for idx, phoneme in enumerate(phoneme_list)}

def encode_sequence(seq, vocab, max_len, add_special_tokens=False):
    encoded = [vocab.get(token, vocab['<pad>']) for token in seq]
    if add_special_tokens:
        encoded = [vocab['<sos>']] + encoded + [vocab['<eos>']]
    # Trim or pad the sequence to max_len
    encoded = encoded[:max_len] + [vocab['<pad>']] * max(0, max_len - len(encoded))
    return encoded

max_input_len = max(len(seq) for seq in input_sequences)
max_target_len = max(len(seq) for seq in target_sequences) + 2  # For <sos> and <eos>

encoded_inputs = [encode_sequence(seq, char_vocab, max_input_len) for seq in input_sequences]
encoded_targets = [encode_sequence(seq, phoneme_vocab, max_target_len, True) for seq in target_sequences]
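
# Optional sanity check: a minimal sketch (the sample username is chosen
# arbitrarily) confirming that normalization, phoneme lookup, and encoding
# behave as expected before building the dataset.
sample = 'cool_dude42'
print(normalize_username(sample))    # -> 'cool dude four two'
print(username_to_phonemes(sample))  # -> CMUdict phonemes, e.g. ['K', 'UW1', 'L', ...]
sample_encoded = encode_sequence(list(normalize_username(sample)), char_vocab, max_input_len)
print(sample_encoded[:10])           # first few character indices; rest is padding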
class UsernameDataset(Dataset):
    def __init__(self, inputs, targets):
        self.inputs = torch.tensor(inputs, dtype=torch.long)
        self.targets = torch.tensor(targets, dtype=torch.long)

    def __len__(self):
        return len(self.inputs)

    def __getitem__(self, idx):
        return self.inputs[idx], self.targets[idx]

dataset = UsernameDataset(encoded_inputs, encoded_targets)
data_loader = DataLoader(dataset, batch_size=512, shuffle=True)

# Function to decode index sequences back into tokens, dropping the
# special markers so joined output stays readable
def decode_sequence(encoded_seq, vocab, specials=('<pad>', '<sos>', '<eos>')):
    idx_to_token = {idx: token for token, idx in vocab.items()}
    decoded_seq = [idx_to_token.get(idx, '') for idx in encoded_seq]
    return [token for token in decoded_seq if token not in specials]

# Create lists to store decoded usernames and pronunciations
usernames = []
pronunciations = []

# Iterate through the dataset and decode sequences
for input_seq, target_seq in dataset:
    username = ''.join(decode_sequence(input_seq.tolist(), char_vocab))
    pronunciation = ' '.join(decode_sequence(target_seq.tolist(), phoneme_vocab))
    usernames.append(username)
    pronunciations.append(pronunciation)

# Create a Pandas DataFrame and export it to CSV
df = pd.DataFrame({'username': usernames, 'pronunciation': pronunciations})
df.to_csv('username_pronunciation.csv', index=False)

class Encoder(nn.Module):
    def __init__(self, input_dim, emb_dim, hid_dim):
        super().__init__()
        self.embedding = nn.Embedding(input_dim, emb_dim, padding_idx=char_vocab['<pad>'])
        self.gru = nn.GRU(emb_dim, hid_dim, batch_first=True)

    def forward(self, src):
        embedded = self.embedding(src)
        outputs, hidden = self.gru(embedded)
        return outputs, hidden

class Attention(nn.Module):
    def __init__(self, hid_dim):
        super().__init__()
        self.attn = nn.Linear(hid_dim * 2, hid_dim)
        self.v = nn.Linear(hid_dim, 1, bias=False)

    def forward(self, hidden, encoder_outputs):
        # hidden: [batch, 1, hid_dim]; encoder_outputs: [batch, src_len, hid_dim]
        src_len = encoder_outputs.shape[1]
        hidden = hidden.repeat(1, src_len, 1)
        energy = torch.tanh(self.attn(torch.cat((hidden, encoder_outputs), dim=2)))
        attention = self.v(energy).squeeze(2)
        return torch.softmax(attention, dim=1)

class Decoder(nn.Module):
    def __init__(self, output_dim, emb_dim, hid_dim, attention):
        super().__init__()
        self.output_dim = output_dim
        self.attention = attention
        self.embedding = nn.Embedding(output_dim, emb_dim, padding_idx=phoneme_vocab['<pad>'])
        self.gru = nn.GRU(emb_dim + hid_dim, hid_dim, batch_first=True)
        self.fc_out = nn.Linear(hid_dim * 2, output_dim)

    def forward(self, input, hidden, encoder_outputs):
        input = input.unsqueeze(1)
        embedded = self.embedding(input)
        a = self.attention(hidden.permute(1, 0, 2), encoder_outputs)
        a = a.unsqueeze(1)
        # Attention-weighted sum of the encoder outputs
        weighted = torch.bmm(a, encoder_outputs)
        rnn_input = torch.cat((embedded, weighted), dim=2)
        output, hidden = self.gru(rnn_input, hidden)
        output = torch.cat((output.squeeze(1), weighted.squeeze(1)), dim=1)
        prediction = self.fc_out(output)
        return prediction, hidden

class Seq2Seq(nn.Module):
    def __init__(self, encoder, decoder, device):
        super().__init__()
        self.encoder = encoder
        self.decoder = decoder
        self.device = device

    def forward(self, src, trg, teacher_forcing_ratio=0.5):
        batch_size = src.shape[0]
        trg_len = trg.shape[1]
        trg_vocab_size = self.decoder.output_dim
        outputs = torch.zeros(batch_size, trg_len, trg_vocab_size).to(self.device)
        encoder_outputs, hidden = self.encoder(src)
        input = trg[:, 0]  # first decoder input is the <sos> token
        for t in range(1, trg_len):
            output, hidden = self.decoder(input, hidden, encoder_outputs)
            outputs[:, t] = output
            top1 = output.argmax(1)
            teacher_force = np.random.random() < teacher_forcing_ratio
            input = trg[:, t] if teacher_force else top1
        return outputs
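
# Optional smoke test: a minimal sketch running one untrained forward pass on
# CPU to verify tensor shapes before training (the small dimensions here are
# illustrative, not the training configuration).
_enc = Encoder(input_dim=len(char_vocab), emb_dim=8, hid_dim=16)
_dec = Decoder(output_dim=len(phoneme_vocab), emb_dim=8, hid_dim=16, attention=Attention(16))
_model = Seq2Seq(_enc, _dec, torch.device('cpu'))
_src, _trg = dataset[0]
_out = _model(_src.unsqueeze(0), _trg.unsqueeze(0))
print(_out.shape)  # expected: [1, max_target_len, len(phoneme_vocab)]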
def get_latest_checkpoint(directory):
    # Get a list of all files in the directory
    files = os.listdir(directory)
    # Keep only g2p{n}.pth checkpoint files
    checkpoint_files = [f for f in files if re.match(r'g2p\d+\.pth', f)]
    # Extract the version numbers from the filenames
    checkpoint_numbers = [int(re.search(r'g2p(\d+)\.pth', f).group(1)) for f in checkpoint_files]
    print(checkpoint_numbers)
    # Sort the files by their version numbers
    sorted_files = sorted(zip(checkpoint_numbers, checkpoint_files))
    # Return the latest file (last element in the sorted list), if any
    if sorted_files:
        latest_file = sorted_files[-1][1]
        latest_checkpoint_path = os.path.join(directory, latest_file)
        return latest_checkpoint_path
    else:
        return None

def get_next_version(directory):
    files = os.listdir(directory)
    # Keep only g2p{n}.pth checkpoint files
    checkpoint_files = [f for f in files if re.match(r'g2p\d+\.pth', f)]
    # Extract the version numbers from the filenames
    checkpoint_numbers = [int(re.search(r'g2p(\d+)\.pth', f).group(1)) for f in checkpoint_files]
    print(checkpoint_numbers)
    # Sort the files by their version numbers
    sorted_files = sorted(zip(checkpoint_numbers, checkpoint_files))
    if sorted_files:
        latest_version = sorted_files[-1][0]
        print(f"Latest version: {sorted_files[-1]}")
        return latest_version + 1
    else:
        return 1  # Start with version 1 if no checkpoints exist

def save_checkpoint(model, directory, version):
    filename = f"g2p{version}.pth"
    filepath = os.path.join(directory, filename)
    torch.save(model.state_dict(), filepath)
    print(f"Model saved to {filepath}")

# Get the latest checkpoint file path
directory = '/content/drive/MyDrive/AI/username_g2p/'
latest_checkpoint_file = get_latest_checkpoint(directory)

if latest_checkpoint_file:
    print(f"Latest checkpoint file: {latest_checkpoint_file}")
else:
    print("No checkpoint files found.")

print(get_next_version(directory))

INPUT_DIM = len(char_vocab)
OUTPUT_DIM = len(phoneme_vocab)
ENC_EMB_DIM = 64
DEC_EMB_DIM = 64
HID_DIM = 128

attn = Attention(HID_DIM)
enc = Encoder(INPUT_DIM, ENC_EMB_DIM, HID_DIM)
dec = Decoder(OUTPUT_DIM, DEC_EMB_DIM, HID_DIM, attn)

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model = Seq2Seq(enc, dec, device).to(device)

optimizer = torch.optim.Adam(model.parameters())
criterion = nn.CrossEntropyLoss(ignore_index=phoneme_vocab['<pad>'])

# Path to the checkpoint file to resume from
checkpoint_file = latest_checkpoint_file if latest_checkpoint_file else 'g2p1.pth'

# Load the checkpoint if it exists
if os.path.exists(checkpoint_file):
    print(f"Loading checkpoint from {checkpoint_file}")
    model.load_state_dict(torch.load(checkpoint_file))
else:
    print("Checkpoint file not found. Using default initialization.")
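
# Optional extension (a sketch; these helpers are hypothetical and not used
# elsewhere in this notebook): saving the optimizer state alongside the model
# weights lets a resumed run keep Adam's moment estimates instead of
# restarting them from scratch.
def save_full_checkpoint(model, optimizer, directory, version):
    filepath = os.path.join(directory, f"g2p{version}_full.pth")
    torch.save({
        'model_state_dict': model.state_dict(),
        'optimizer_state_dict': optimizer.state_dict(),
    }, filepath)
    print(f"Full checkpoint saved to {filepath}")

def load_full_checkpoint(model, optimizer, filepath):
    checkpoint = torch.load(filepath, map_location=device)
    model.load_state_dict(checkpoint['model_state_dict'])
    optimizer.load_state_dict(checkpoint['optimizer_state_dict'])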
Using default initialization.") print(device) # Verify input sequences max_input_idx = max([max(seq) for seq in encoded_inputs]) print(f'Max input index: {max_input_idx}, Input vocab size: {INPUT_DIM}') # Verify target sequences max_target_idx = max([max(seq) for seq in encoded_targets]) print(f'Max target index: {max_target_idx}, Output vocab size: {OUTPUT_DIM}') def train(model, loader, optimizer, criterion, clip): model.train() epoch_loss = 0 for src, trg in tqdm(loader, desc="Training Batches"): src, trg = src.to(device), trg.to(device) optimizer.zero_grad() output = model(src, trg) output_dim = output.shape[-1] output = output[:, 1:].reshape(-1, output_dim) trg = trg[:, 1:].reshape(-1) loss = criterion(output, trg) loss.backward() torch.nn.utils.clip_grad_norm_(model.parameters(), clip) optimizer.step() epoch_loss += loss.item() return epoch_loss / len(loader) N_EPOCHS = 1 CLIP = 1 for epoch in range(N_EPOCHS): loss = train(model, data_loader, optimizer, criterion, CLIP) print(f'Epoch: {epoch+1}, Loss: {loss:.4f}') # Get the next version number next_version = get_next_version(directory) # Save the model with the new version number save_checkpoint(model, directory, next_version) def predict(model, username): model.eval() with torch.no_grad(): normalized = normalize_username(username) input_seq = encode_sequence(list(normalized), char_vocab, max_input_len) src = torch.tensor([input_seq], dtype=torch.long).to(device) encoder_outputs, hidden = model.encoder(src) input_token = torch.tensor([phoneme_vocab['']], dtype=torch.long).to(device) outputs = [] for _ in range(max_target_len): output, hidden = model.decoder(input_token, hidden, encoder_outputs) top1 = output.argmax(1) if top1.item() == phoneme_vocab['']: break outputs.append(top1.item()) input_token = top1 idx_to_phoneme = {idx: phoneme for phoneme, idx in phoneme_vocab.items()} predicted_phonemes = [idx_to_phoneme[idx] for idx in outputs] return ' '.join(predicted_phonemes) test_username = 'supercalafragalisticexpialadocous' test_username = 'barnabassacket' pronunciation = predict(model, test_username) print(f'Username: {test_username}') print(f'Pronunciation: {pronunciation}') # from https://github.com/margonaut/CMU-to-IPA-Converter/blob/master/cmu_ipa_mapping.rb CMU_IPA_MAPPING = { "B": "b", "CH": "ʧ", "D": "d", "DH": "ð", "F": "f", "G": "g", "HH": "h", "JH": "ʤ", "K": "k", "L": "l", "M": "m", "N": "n", "NG": "ŋ", "P": "p", "R": "r", "S": "s", "SH": "ʃ", "T": "t", "TH": "θ", "V": "v", "W": "w", "Y": "j", "Z": "z", "ZH": "ʒ", "AA0": "ɑ", "AA1": "ɑ", "AA2": "ɑ", "AE0": "æ", "AE1": "æ", "AE2": "æ", "AH0": "ə", "AH1": "ʌ", "AH2": "ʌ", "AO0": "ɔ", "AO1": "ɔ", "AO2": "ɔ", "EH0": "ɛ", "EH1": "ɛ", "EH2": "ɛ", "ER0": "ɚ", "ER1": "ɝ", "ER2": "ɝ", "IH0": "ɪ", "IH1": "ɪ", "IH2": "ɪ", "IY0": "i", "IY1": "i", "IY2": "i", "UH0": "ʊ", "UH1": "ʊ", "UH2": "ʊ", "UW0": "u", "UW1": "u", "UW2": "u", "AW0": "aʊ", "AW1": "aʊ", "AW2": "aʊ", "AY0": "aɪ", "AY1": "aɪ", "AY2": "aɪ", "EY0": "eɪ", "EY1": "eɪ", "EY2": "eɪ", "OW0": "oʊ", "OW1": "oʊ", "OW2": "oʊ", "OY0": "ɔɪ", "OY1": "ɔɪ", "OY2": "ɔɪ" } pronunciation = predict(model, test_username) ipa_sequence = ''.join([CMU_IPA_MAPPING.get(phoneme, phoneme) for phoneme in pronunciation.split()]) print(f'Username: {test_username}') print(f'Pronunciation: {ipa_sequence}') ssml_template = """{text}""" class Alphabets: IPA = "ipa" CMU = "cmu-arpabet" print(ssml_template.format(alphabet=Alphabets.IPA, phonetics="ˈæktʃuəli", text="actually")) from google.colab import userdata eleven_labs_key = 
from google.colab import userdata
eleven_labs_key = userdata.get('ELEVENLABS')

from elevenlabs import save
from elevenlabs.client import ElevenLabs
from IPython.display import Audio, display

sound_file = 'test.mp3'

def build_eleven_labs_query(username: str):
    client = ElevenLabs(
        api_key=eleven_labs_key,
    )
    audio = client.generate(
        text=ssml_template.format(
            alphabet=Alphabets.CMU,
            phonetics=predict(model, username),
            text=username
        ),
        voice="Rachel",
        model="eleven_flash_v2"
    )
    save(audio, sound_file)

build_eleven_labs_query(test_username)
display(Audio(sound_file, autoplay=True))

# prompt: get the parameters of a pytorch model

import torch

# Assuming 'model' is your Seq2Seq model instance
# Replace with your actual model if named differently

# Method 1: Using model.named_parameters()
for name, param in model.named_parameters():
    print(f"Parameter Name: {name}, Shape: {param.shape}")

# Method 2: Using model.parameters() (without parameter names)
for param in model.parameters():
    print(f"Parameter Shape: {param.shape}")

print(f"Model Parameters: {sum(p.numel() for p in model.parameters())}")

# prompt: visualize the weights

import matplotlib.pyplot as plt
import numpy as np

# Collect the number of weights in each named parameter
parameter_shapes = []
parameter_names = []
for name, param in model.named_parameters():
    parameter_shapes.append(np.prod(param.shape))
    parameter_names.append(name)

# Create a bar chart of weight counts per parameter
plt.figure(figsize=(10, 6))
plt.bar(parameter_names, parameter_shapes)
plt.xlabel("Parameter Name")
plt.ylabel("Number of Weights")
plt.title("Distribution of Weights in the Model")
plt.xticks(rotation=90)  # Rotate x-axis labels for better readability
plt.tight_layout()
plt.show()
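
# Optional follow-up (a sketch, not part of the original workflow): a histogram
# of the raw weight values complements the per-parameter counts above and makes
# it easy to spot dead or exploding weights.
all_weights = np.concatenate(
    [p.detach().cpu().numpy().ravel() for p in model.parameters()]
)
plt.figure(figsize=(8, 4))
plt.hist(all_weights, bins=100)
plt.xlabel("Weight Value")
plt.ylabel("Count")
plt.title("Histogram of All Model Weights")
plt.tight_layout()
plt.show()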