Spaces:
Runtime error
Runtime error
| import gradio as gr | |
| import torch | |
| from transformers import GPT2LMHeadModel, T5Tokenizer | |
| model_name = "akiFQC/japanese-dialogpt-small-aozora" | |
| tokenizer = T5Tokenizer.from_pretrained(model_name) | |
| tokenizer.do_lower_case = True # due to some bug of tokenizer config loading | |
| model = GPT2LMHeadModel.from_pretrained(model_name) | |
| class DialogGPT: | |
| def __init__(self, tokenizer, model, n_candidate=4, param_lambda=0.10): | |
| self.tokenizer = tokenizer | |
| self.model = model | |
| self.model.eval() | |
| self.n_candidate = n_candidate | |
| self.param_lambda = param_lambda | |
| self.param_gamma: int = 2 | |
| def _calc_single_scores(self, token_ids): | |
| with torch.inference_mode(): | |
| candidate_token_ids = token_ids[:, :-1] | |
| label_token_ids = token_ids[:, 1:] | |
| outputs = self.model(candidate_token_ids, labels=label_token_ids) | |
| _, logits = outputs[:2] | |
| logits = torch.log_softmax(logits, dim=-1) | |
| logit_at_target = logits.gather( | |
| dim=-1, index=candidate_token_ids.unsqueeze(-1) | |
| ).squeeze(-1) | |
| # mask out pad token positio | |
| mask_at_pad = candidate_token_ids == self.tokenizer.pad_token_id | |
| # log_likelihood (b, l) | |
| log_likelihood = logit_at_target | |
| log_likelihood.masked_fill_(mask_at_pad, 0.0) | |
| log_likelihood_per_candidate = log_likelihood[:, self.param_gamma:].sum(dim=1) | |
| # normalize by length | |
| # log_likelihood_per_candidate = log_likelihood_per_candidate / (candidate_token_ids.shape[1] - mask_at_pad.sum(dim=1)) | |
| return log_likelihood_per_candidate | |
| def _calc_scores(self, sequences, scores, input_ids=None): | |
| transition_scores = model.compute_transition_scores( | |
| sequences, scores, normalize_logits=True | |
| ) | |
| if input_ids is None: | |
| input_length = 0 | |
| else: | |
| input_length = input_ids.shape[1] | |
| generated_tokens = sequences[:, input_length:] # n x l | |
| assert ( | |
| generated_tokens.shape[1] == transition_scores.shape[1] | |
| ), f"{generated_tokens.shape[1]} != {transition_scores.shape[1]}" | |
| # print(transition_scores.shape) | |
| # print(generated_tokens) | |
| transition_scores.masked_fill_( | |
| generated_tokens == self.tokenizer.pad_token_id, 0.0 | |
| ) | |
| transition_scores = transition_scores.sum(dim=1) | |
| # print(transition_scores) | |
| return transition_scores | |
| def reply(self, reply, history) -> str: | |
| chat_history_ids = torch.LongTensor(history).unsqueeze(0) | |
| # encode the new user input, add the eos_token and return a tensor in Pytorch | |
| new_user_input_ids = self.tokenizer.encode( | |
| reply + self.tokenizer.eos_token, return_tensors="pt" | |
| ) | |
| # append the new user input tokens to the chat history | |
| bot_input_ids = ( | |
| torch.cat([chat_history_ids, new_user_input_ids], dim=-1) | |
| if chat_history_ids is not None | |
| else new_user_input_ids | |
| ) | |
| # generated a response while limiting the total chat history to 1000 tokens, | |
| with torch.inference_mode(): | |
| output = model.generate( | |
| bot_input_ids, | |
| pad_token_id=self.tokenizer.pad_token_id, | |
| do_sample=True, | |
| top_p=0.93, | |
| temperature=0.5, | |
| repetition_penalty=1.17, | |
| max_time=10, | |
| num_return_sequences=self.n_candidate, | |
| max_length=512, | |
| min_length=4, | |
| forced_eos_token_id=self.tokenizer.pad_token_id, | |
| return_dict_in_generate=True, | |
| output_scores=True, | |
| min_new_tokens=2, | |
| ) | |
| # score of each candidate | |
| scores_condition_s2t = self._calc_scores( | |
| sequences=output.sequences, scores=output.scores, input_ids=bot_input_ids | |
| ) | |
| new_token_ids = output.sequences[:, bot_input_ids.shape[-1] :] | |
| single_scores = self._calc_single_scores(new_token_ids) * self.param_lambda | |
| total_scores = scores_condition_s2t - single_scores | |
| id_selected = torch.argmax(total_scores) | |
| chat_history_ids = output.sequences[id_selected].unsqueeze( | |
| 0 | |
| ) # update chat history | |
| # remove pad token | |
| chat_history_ids = chat_history_ids[ | |
| :, chat_history_ids[0] != self.tokenizer.pad_token_id | |
| ] | |
| replay_string = tokenizer.decode( | |
| chat_history_ids[:, :][0], skip_special_tokens=False | |
| ) | |
| return replay_string, chat_history_ids[0].tolist() | |
| bot = DialogGPT( | |
| tokenizer, | |
| model, | |
| ) | |
| def predict(input, history=[]): | |
| replay_string, history = bot.reply(input, history) | |
| response = replay_string.split(tokenizer.eos_token) | |
| response = [ | |
| (response[i], response[i + 1]) for i in range(0, len(response) - 1, 2) | |
| ] # convert to tuples of list | |
| return response, history | |
| with gr.Blocks() as demo: | |
| chatbot = gr.Chatbot() | |
| state = gr.State([]) | |
| with gr.Row(): | |
| txt = gr.Textbox( | |
| show_label=False, placeholder="Enter text and press enter" | |
| ).style(container=False) | |
| txt.submit(predict, [txt, state], [chatbot, state]) | |
| demo.launch() | |