| """Extends the internal Whisper classes to support a KenLM. | |
| This code is still used here, but has been recently moved to the following | |
| whisper fork: https://github.com/zuazo-forks/whisper/tree/lm-simple | |
| Example | |
| ------- | |
| Download and convert the model to OpenAI format: | |
| ```shell | |
| # Converts the model from Hugging Face to OpenAI format: | |
| $ ./convert_hf_to_openai.py \ | |
| --checkpoint zuazo/whisper-medium-eu \ | |
| --whisper_dump_path zuazo-whisper-medium-eu.pt | |
| ``` | |
| Transcription example: | |
| ```python | |
| # Converts the model from Hugging Face to OpenAI format: | |
| from convert_hf_to_openai import convert_tfms_to_openai_whisper | |
| convert_tfms_to_openai_whisper( | |
| "zuazo/whisper-medium-eu", "zuazo-whisper-medium-eu.pt" | |
| ) | |
| HF model path: zuazo/whisper-medium-eu | |
| OpenAI model path: zuazo-whisper-medium-eu.pt | |
| # Hack Whisper to support LM and load the options interface to set it up: | |
| from whisper_decoder_with_lm import LMOptions | |
| # Select an audio file: | |
| audio_path = "tests/data/common_voice_eu_18591439.mp3" | |
| # Set original Whisper transcription options: | |
| decode_options = { | |
| "language": "eu", | |
| "without_timestamps": True, | |
| "temperature": 0.0, # this is important | |
| "beam_size": 5, | |
| "patience": None, | |
| } | |
| transcribe_options = {"task": "transcribe", **decode_options} | |
| # Set LM-specific options: | |
| LMOptions().lm_path = "5gram-eu.bin" | |
| LMOptions().lm_alpha = 0.33582368603855817 | |
| LMOptions().lm_beta = 0.6882556478819416 | |
| # Load the model and transcribe the audio: | |
| import whisper | |
| model = whisper.load_model("zuazo-whisper-medium-eu.pt") | |
| result = model.transcribe(audio_path, **transcribe_options) | |
| result["text"] | |
| 'Non demontre dago langraizoka eta non bolikosta?' | |
| ``` | |
| """ | |
import logging
import string
from threading import Lock
from typing import Optional, Tuple

import kenlm
import torch
import torch.nn.functional as F
from torch import Tensor
from transformers import AutoModelForCausalLM, AutoTokenizer
from whisper import Whisper
from whisper.decoding import (
    BeamSearchDecoder,
    DecodingOptions,
    DecodingTask,
    Inference,
)
from whisper.normalizers import BasicTextNormalizer
from whisper.tokenizer import Tokenizer

# Extending the DecodingOptions class to support an LM
# ====================================================


class LMOptions:  # pylint: disable=too-few-public-methods
    """Singleton class to pass the LM options to the Beam Search algorithm.

    I did not find a better way to pass the configuration options to the
    `BeamSearchDecoderWithLM` class.
    """

    _instance = None

    # A KenLM n-gram language model path:
    lm_path: str = None
    # Hugging Face LM model path or URI:
    llm_path: str = None
    # Language model weight (alpha), i.e. the weight of the LM score.
    # Default value obtained from hyperparameter optimization:
    lm_alpha: float = 0.931289039105002
    # End of string character list for the LM:
    lm_eos: str = "!?."
    # Word insertion weight (beta).
    # Default value obtained from hyperparameter optimization:
    lm_beta: float = 1.1834137581510284
    # Whether to normalize text before sending it to the language model:
    lm_normalize: bool = True
    # Minimum number of tokens in a sequence required before applying language
    # model scoring. This prevents premature evaluation on short sequences.
    lm_token_threshold: int = 4

    def __new__(cls):
        """
        Create or return the LMOptions instance.

        This method implements the singleton pattern which ensures that only
        one instance of the LMOptions class exists.

        Returns
        -------
        LMOptions
            The single instance of LMOptions.

        Example
        -------
        >>> options1 = LMOptions()
        >>> LMOptions().lm_path = "5gram-eu.bin"
        >>> options2 = LMOptions()
        >>> options1 is options2
        True
        """
        if not cls._instance:
            cls._instance = super(LMOptions, cls).__new__(cls)
        return cls._instance


# New Beam Search class with LM support (KenLM)
# =============================================


class BeamSearchDecoderWithLM(
    BeamSearchDecoder
):  # pylint: disable=too-many-instance-attributes
    """New Beam Search class with LM support (KenLM)."""

    def __init__(
        self,
        beam_size: int,
        tokenizer: Tokenizer,
        inference: Inference,
        patience: Optional[float] = None,
        lm_path: Optional[str] = None,
        lm_alpha: Optional[float] = None,
        lm_beta: Optional[float] = None,
        lm_eos: Optional[str] = None,
        lm_normalize: Optional[bool] = True,
    ):  # pylint: disable=too-many-arguments
        """
        Initialize the beam search decoder with n-gram language model support.

        Parameters
        ----------
        beam_size : int
            The number of beams to use in the search process.
        tokenizer : Tokenizer
            The tokenizer instance used for tokenizing input text and
            detokenizing output tokens.
        inference : Inference
            The inference model used to predict the next token based on the
            current state.
        patience : Optional[float], default=None
            The patience parameter controls how long the search should wait
            for a better candidate before terminating the search early.
        lm_path : Optional[str], default=None
            The file path to the pre-trained KenLM language model.
        lm_alpha : Optional[float], default=None
            The weight (alpha) of the language model score.
        lm_beta : Optional[float], default=None
            The weight (beta) applied to the word count within the language
            model scoring.
        lm_eos : Optional[str], default=None
            Characters considered as end-of-sentence markers.
        lm_normalize : Optional[bool], default=True
            Indicates whether to normalize the text before scoring with the
            language model.
        """
        super().__init__(beam_size, tokenizer.eot, inference, patience)
        self.tokenizer = tokenizer
        self.special_tokens = list(self.tokenizer.special_tokens.values())
        self.lm_model = (
            kenlm.Model(lm_path) if lm_path is not None else None
        )  # pylint: disable=c-extension-no-member
        self.lm_alpha = lm_alpha or 0.0
        self.lm_beta = lm_beta or 0.0
        self.lm_eos = lm_eos or ""  # end of sentence chars
        self.lm_eow = set(string.punctuation)  # end of word chars
        self.lm_normalize = lm_normalize  # whether to normalize the LM text
        self.lm_normalizer = BasicTextNormalizer()  # normalizer for the KenLM
        self.finished_sequences = None

    def lm_score_and_word_count(self, sequence) -> Tuple[float, int]:
        """Get n-gram language model score and word count for a sequence.

        Parameters
        ----------
        sequence : tuple of int
            A sequence of token IDs.

        Returns
        -------
        float
            The language model score for the decoded text of the sequence.
        int
            The number of words in the decoded text of the sequence.
        """
        if not self.lm_model:
            return None, 0.0

        # Convert sequence of tokens to text
        sequence = tuple(t for t in sequence if t not in self.special_tokens)
        if len(sequence) < LMOptions().lm_token_threshold:
            return None, 0.0
        text = self.tokenizer.decode(sequence)

        # Early return for empty text
        if not text:
            return None, 0.0
        logging.debug('LM text: "%s"', text)

        # Normalize the text
        if self.lm_normalize:
            normalized_text = self.lm_normalizer(text)
        else:
            normalized_text = text
        logging.debug('LM text normalized: "%s"', normalized_text)

        # Check for end of sentence and end of word:
        eos = text[-1] in self.lm_eos
        word_count = len(normalized_text.split())
        logging.debug("Word count: %d", word_count)

        # In KenLM, the most probable sequences have a higher score:
        score = self.lm_model.score(normalized_text, bos=True, eos=eos)
        logging.debug("LM score: %f", score)
        return score, word_count

    def update(  # pylint: disable=too-many-locals,too-many-branches,too-many-statements # noqa: E501
        self, tokens: Tensor, logits: Tensor, sum_logprobs: Tensor
    ) -> Tuple[Tensor, bool]:
        """Update the beam search state with language model scoring.

        This method performs a beam search step and updates internal states,
        such as finished sequences and token caches. The beam search step
        includes LM scoring for ranking beam candidates.

        The method internally:

        1. Calculates the cumulative log probabilities for potential beam
           candidates by considering both the model's predictions and
           optional LM scores.
        2. Ranks the candidates and keeps the top `beam_size` sequences for
           each audio sample.
        3. Checks and keeps track of sequences that have finished decoding.

        This code is based on `BeamSearchDecoder.update()`, but with the
        additional integration of language model scoring.

        Parameters
        ----------
        tokens : Tensor
            Current tokens in the beam. Should have shape
            [n_audio * beam_size, seq_len], where n_audio is the number of
            audio samples and beam_size is the number of beams.
        logits : Tensor
            Raw prediction scores for the next token, of shape
            [n_audio * beam_size, vocab_size].
        sum_logprobs : Tensor
            Cumulative log probabilities of the sequences in the beam so far.
            Should have shape [n_audio * beam_size].

        Returns
        -------
        Tuple[Tensor, bool]
            - A tensor with the updated tokens for each beam, of shape
              [n_audio * beam_size, seq_len].
            - A boolean indicating if the beam search is completed for all
              audio samples.

        Raises
        ------
        ValueError
            If the tokens tensor's shape is not divisible by the beam size.
        """
        if tokens.shape[0] % self.beam_size != 0:
            raise ValueError(f"{tokens.shape}[0] % {self.beam_size} != 0")

        n_audio = tokens.shape[0] // self.beam_size

        if self.finished_sequences is None:  # for the first update
            self.finished_sequences = [{} for _ in range(n_audio)]

        logprobs = F.log_softmax(logits.float(), dim=-1)
        next_tokens, source_indices, finished_sequences = [], [], []
        for i in range(n_audio):
            scores, sources, finished = {}, {}, {}

            # STEP 1: calculate the cumulative log probabilities for possible
            # candidates
            for j in range(self.beam_size):
                idx = i * self.beam_size + j
                prefix = tokens[idx].tolist()
                for logprob, token in zip(
                    *logprobs[idx].topk(self.beam_size + 1)
                ):  # noqa: E501
                    new_logprob = (sum_logprobs[idx] + logprob).item()
                    logging.debug("AC score (new_logprob): %f", new_logprob)
                    sequence = tuple(prefix + [token.item()])
                    # Adjust the score by adding the LM score:
                    lm_score, wordc = self.lm_score_and_word_count(sequence)
                    if lm_score is not None:  # if it is a word boundary
                        lm_adjusted_score = (
                            new_logprob
                            + self.lm_alpha * lm_score
                            + wordc * self.lm_beta
                        )
                        scores[sequence] = lm_adjusted_score
                    else:
                        scores[sequence] = new_logprob
                    sources[sequence] = idx

            # STEP 2: rank the candidates and keep the top beam_size sequences
            # for each audio
            saved = 0
            for sequence in sorted(scores, key=scores.get, reverse=True):
                if sequence[-1] == self.eot:
                    finished[sequence] = scores[sequence]
                else:
                    sum_logprobs[len(next_tokens)] = scores[sequence]
                    next_tokens.append(sequence)
                    source_indices.append(sources[sequence])

                    saved += 1
                    if saved == self.beam_size:
                        break

            finished_sequences.append(finished)

        tokens = torch.tensor(  # pylint: disable=no-member
            next_tokens, device=tokens.device
        )  # pylint: disable=no-member
        self.inference.rearrange_kv_cache(source_indices)

        # add newly finished sequences to self.finished_sequences
        assert len(self.finished_sequences) == len(finished_sequences)
        for previously_finished, newly_finished in zip(
            self.finished_sequences, finished_sequences
        ):
            for seq in sorted(
                newly_finished, key=newly_finished.get, reverse=True
            ):  # noqa: E501
                if len(previously_finished) >= self.max_candidates:
                    break  # the candidate list is full
                previously_finished[seq] = newly_finished[seq]

        # mark as completed if all audio has enough number of samples
        completed = all(
            len(sequences) >= self.max_candidates
            for sequences in self.finished_sequences
        )
        return tokens, completed
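
# A minimal sketch of how a beam candidate's score is adjusted above (not used
# by the decoder itself; the numbers are made up for illustration). For a
# candidate with acoustic log-probability `new_logprob`, KenLM score
# `lm_score` and `wordc` words, the LM-adjusted score computed in `update()`
# is:
#
#     lm_adjusted_score = new_logprob + lm_alpha * lm_score + lm_beta * wordc
#
# For example, with lm_alpha=0.5, lm_beta=1.0, new_logprob=-4.0,
# lm_score=-12.0 and wordc=3:
#
#     -4.0 + 0.5 * (-12.0) + 1.0 * 3 = -7.0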


class LLMSingleton:
    """
    Handle LLM class loading in GPU memory.

    A singleton class to manage the loading and caching of language models and
    tokenizers to ensure that each model and tokenizer is instantiated only
    once throughout the application.

    Attributes
    ----------
    _models : dict
        A dictionary to store model instances indexed by model names.
    _tokenizers : dict
        A dictionary to store tokenizer instances indexed by tokenizer names.
    _models_lock : Lock
        A threading lock to ensure thread-safe access to the `_models`
        dictionary.
    _tokenizers_lock : Lock
        A threading lock to ensure thread-safe access to the `_tokenizers`
        dictionary.

    Methods
    -------
    get_model(model_name)
        Retrieves a model instance for the given model name or loads it if not
        already present.
    get_tokenizer(tokenizer_name)
        Retrieves a tokenizer instance for the given tokenizer name or loads
        it if not already present.
    """

    _models = {}
    _tokenizers = {}
    _models_lock = Lock()
    _tokenizers_lock = Lock()

    @classmethod
    def get_model(cls, model_name):
        """
        Retrieve or load a model by name ensuring singleton instantiation.

        Parameters
        ----------
        model_name : str
            The identifier name of the model to be loaded or retrieved.

        Returns
        -------
        model : PreTrainedModel
            An instance of `AutoModelForCausalLM` corresponding to the
            specified `model_name`.

        Notes
        -----
        If the model is not already loaded, it will fetch the model from
        Hugging Face's repository using the
        `AutoModelForCausalLM.from_pretrained` method, cache it, and return
        the instance. If already loaded, it simply returns the cached
        instance.
        """
        with cls._models_lock:
            if model_name not in cls._models:
                logging.debug("Loading model: %s", model_name)
                model = AutoModelForCausalLM.from_pretrained(model_name)
                cls._models[model_name] = model
            return cls._models[model_name]

    @classmethod
    def get_tokenizer(cls, tokenizer_name):
        """
        Retrieve or load a tokenizer by name ensuring singleton instantiation.

        Parameters
        ----------
        tokenizer_name : str
            The identifier name of the tokenizer to be loaded or retrieved.

        Returns
        -------
        tokenizer : PreTrainedTokenizer
            An instance of `AutoTokenizer` corresponding to the specified
            `tokenizer_name`.

        Notes
        -----
        If the tokenizer is not already loaded, it will fetch the tokenizer
        from Hugging Face's repository using the
        `AutoTokenizer.from_pretrained` method, cache it, and return the
        instance. If already loaded, it simply returns the cached instance.
        """
        with cls._tokenizers_lock:
            if tokenizer_name not in cls._tokenizers:
                logging.debug("Loading tokenizer: %s", tokenizer_name)
                tokenizer = AutoTokenizer.from_pretrained(tokenizer_name)
                cls._tokenizers[tokenizer_name] = tokenizer
            return cls._tokenizers[tokenizer_name]
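
# Illustrative usage of the cache above (a sketch, assuming a valid Hugging
# Face model id such as "gpt2"; not executed by this module):
#
#     model_a = LLMSingleton.get_model("gpt2")
#     model_b = LLMSingleton.get_model("gpt2")
#     assert model_a is model_b  # the second call returns the cached instance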


class BeamSearchDecoderWithLLM(BeamSearchDecoderWithLM):
    """Beam Search class with support for Llama (Hugging Face LLM)."""

    def __init__(
        self,
        beam_size: int,
        tokenizer: Tokenizer,
        inference: Inference,
        patience: Optional[float] = None,
        llm_path: Optional[str] = None,
        lm_alpha: Optional[float] = None,
        lm_beta: Optional[float] = None,
        lm_eos: Optional[str] = None,
        lm_normalize: Optional[bool] = True,
    ):  # pylint: disable=too-many-arguments
        """
        Initialize the beam search decoder with large language model support.

        Parameters
        ----------
        beam_size : int
            The number of beams to use in the search process.
        tokenizer : Tokenizer
            The tokenizer instance used for tokenizing input text and
            detokenizing output tokens.
        inference : Inference
            The inference model used to predict the next token based on the
            current state.
        patience : Optional[float], default=None
            The patience parameter controls how long the search should wait
            for a better candidate before terminating the search early.
        llm_path : Optional[str], default=None
            The HF name or path to the pre-trained LLM.
        lm_alpha : Optional[float], default=None
            The weight (alpha) of the language model score.
        lm_beta : Optional[float], default=None
            The weight (beta) applied to the word count within the language
            model scoring.
        lm_eos : Optional[str], default=None
            Characters considered as end-of-sentence markers.
        lm_normalize : Optional[bool], default=True
            Indicates whether to normalize the text before scoring with the
            language model.
        """
        super().__init__(
            beam_size,
            tokenizer,
            inference,
            patience,
            None,
            lm_alpha,
            lm_beta,
            lm_eos,
            lm_normalize,
        )
        # Check if CUDA is available
        self.device = "cuda" if torch.cuda.is_available() else "cpu"
        # Load the models, only once
        if llm_path:
            self.llm_model = LLMSingleton.get_model(llm_path).to(self.device)
            self.llm_tokenizer = LLMSingleton.get_tokenizer(llm_path)
        else:
            self.llm_model = self.llm_tokenizer = None

    def lm_score_and_word_count(self, sequence) -> Tuple[float, int]:
        """Get large language model score and word count for a sequence.

        Parameters
        ----------
        sequence : tuple of int
            A sequence of token IDs.

        Returns
        -------
        float
            The language model score for the decoded text of the sequence.
        int
            The number of words in the decoded text of the sequence.
        """
        # Similar implementation for LLM
        # Convert sequence of tokens to text
        sequence = tuple(t for t in sequence if t not in self.special_tokens)
        if len(sequence) < LMOptions().lm_token_threshold:
            return None, 0.0
        text = self.tokenizer.decode(sequence)

        # Early return for empty text
        if not text:
            return None, 0.0
        logging.debug('LLM text: "%s"', text)

        # Normalize the text
        if self.lm_normalize:
            normalized_text = self.lm_normalizer(text)
        else:
            normalized_text = text
        logging.debug('LLM text normalized: "%s"', normalized_text)

        word_count = len(normalized_text.split())
        logging.debug("Word count: %d", word_count)

        # Tokenize the input
        tokens = self.llm_tokenizer(normalized_text, return_tensors="pt").to(
            self.device
        )

        # Get input IDs and attention mask
        input_ids = tokens["input_ids"]
        attention_mask = tokens["attention_mask"]

        # outputs = self.llm_model(**tokens)
        # Calculate output from the model
        outputs = self.llm_model(
            input_ids, attention_mask=attention_mask, labels=input_ids
        )

        # Get the output distribution over the last token (note: `softmax`,
        # not `log_softmax`, is applied here):
        log_probs = outputs.logits[:, -1, :].softmax(dim=-1)

        # Use the highest probability as the score
        max_log_prob = log_probs.max().item()

        # Conversion to a log10 scale (like KenLM) is currently disabled:
        score = max_log_prob  # / math.log(10) * -100
        logging.debug("LLM score: %f", score)
        return score, word_count


class BeamSearchDecoderWithLMAndLLM(BeamSearchDecoderWithLM):
    """Beam Search class with support for KenLM and Hugging Face LLM together.

    It uses the word count weight (the beta) as the large language model
    weight.
    """

    def __init__(
        self,
        beam_size: int,
        tokenizer: Tokenizer,
        inference: Inference,
        patience: Optional[float] = None,
        lm_path: Optional[str] = None,
        llm_path: Optional[str] = None,
        lm_alpha: Optional[float] = None,
        lm_beta: Optional[float] = None,
        lm_eos: Optional[str] = None,
        lm_normalize: Optional[bool] = True,
    ):  # pylint: disable=too-many-arguments
        """
        Initialize the beam search decoder with n-gram and large LMs.

        Parameters
        ----------
        beam_size : int
            The number of beams to use in the search process.
        tokenizer : Tokenizer
            The tokenizer instance used for tokenizing input text and
            detokenizing output tokens.
        inference : Inference
            The inference model used to predict the next token based on the
            current state.
        patience : Optional[float], default=None
            The patience parameter controls how long the search should wait
            for a better candidate before terminating the search early.
        lm_path : Optional[str], default=None
            The file path to the pre-trained KenLM language model.
        llm_path : Optional[str], default=None
            The HF name or path to the pre-trained LLM.
        lm_alpha : Optional[float], default=None
            The weight (alpha) of the language model score.
        lm_beta : Optional[float], default=None
            The weight (beta) applied to the word count within the language
            model scoring.
        lm_eos : Optional[str], default=None
            Characters considered as end-of-sentence markers.
        lm_normalize : Optional[bool], default=True
            Indicates whether to normalize the text before scoring with the
            language model.
        """
        super().__init__(
            beam_size,
            tokenizer,
            inference,
            patience,
            None,
            lm_alpha,
            lm_beta,
            lm_eos,
            lm_normalize,
        )
        # Check if CUDA is available
        self.device = "cuda" if torch.cuda.is_available() else "cpu"
        # Load the models, only once
        self.lm_model = (
            kenlm.Model(lm_path) if lm_path is not None else None
        )  # pylint: disable=c-extension-no-member
        if llm_path:
            self.llm_model = LLMSingleton.get_model(llm_path).to(self.device)
            self.llm_tokenizer = LLMSingleton.get_tokenizer(llm_path)
        else:
            self.llm_model = self.llm_tokenizer = None

    def lm_score_and_word_count(self, sequence) -> Tuple[float, float]:
        """Get n-gram and large language model scores.

        Parameters
        ----------
        sequence : tuple of int
            A sequence of token IDs.

        Returns
        -------
        float
            The n-gram language model score for the decoded text of the
            sequence.
        float
            The large language model score for the decoded text of the
            sequence.
        """
        # Convert sequence of tokens to text
        sequence = tuple(t for t in sequence if t not in self.special_tokens)
        if len(sequence) < LMOptions().lm_token_threshold:
            return None, 0.0
        text = self.tokenizer.decode(sequence)

        # Early return for empty text
        if not text:
            return None, 0.0
        logging.debug('LM&LLM text: "%s"', text)

        # Normalize the text
        if self.lm_normalize:
            normalized_text = self.lm_normalizer(text)
        else:
            normalized_text = text
        logging.debug('LM&LLM text normalized: "%s"', normalized_text)

        # Check for end of sentence and end of word:
        eos = text[-1] in self.lm_eos
        # word_count = len(normalized_text.split())
        # logging.debug("Word count: %d", word_count)

        # In KenLM, the most probable sequences have a higher score:
        score_lm = self.lm_model.score(normalized_text, bos=True, eos=eos)
        logging.debug("LM score: %f", score_lm)

        # Tokenize the input
        tokens = self.llm_tokenizer(normalized_text, return_tensors="pt").to(
            self.device
        )

        # Get input IDs and attention mask
        input_ids = tokens["input_ids"]
        attention_mask = tokens["attention_mask"]

        # Calculate output from the model
        outputs = self.llm_model(
            input_ids, attention_mask=attention_mask, labels=input_ids
        )

        # Get the output distribution over the last token (note: `softmax`,
        # not `log_softmax`, is applied here):
        log_probs = outputs.logits[:, -1, :].softmax(dim=-1)

        # Use the highest probability as the score
        max_log_prob = log_probs.max().item()

        # Conversion to a log10 scale (like KenLM) is currently disabled:
        score_llm = max_log_prob  # / math.log(10) * -100
        logging.debug("LLM score: %f", score_llm)
        return score_lm, score_llm
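
# Note on the combined decoder above: `lm_score_and_word_count()` returns the
# pair (KenLM score, LLM score) instead of (score, word count), so inside
# `BeamSearchDecoderWithLM.update()` the candidate score effectively becomes
# (a sketch of the resulting formula, not additional code):
#
#     new_logprob + lm_alpha * score_lm + lm_beta * score_llm
#
# i.e. the word-insertion weight (beta) is re-used as the LLM weight, as
# stated in the class docstring.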


# Extending the DecodingTask class to support a BeamSearchDecoderWithLM
# ======================================================================

# Store a reference to the original __init__
original_decoding_task_init = DecodingTask.__init__


def new_decoding_task_init(self, model: Whisper, options: DecodingOptions):
    """Create the DecodingTask class instance.

    This will replace the original constructor.

    Example
    -------
    >>> DecodingTask.__init__ = new_decoding_task_init
    """
    # Call the original constructor using the stored reference:
    original_decoding_task_init(self, model, options)

    # New logic:
    lm_options = LMOptions()
    if options.beam_size is not None:
        if lm_options.llm_path is not None and lm_options.lm_path is not None:
            logging.debug("Decoder: BeamSearchDecoderWithLMAndLLM")
            self.decoder = BeamSearchDecoderWithLMAndLLM(
                options.beam_size,
                self.tokenizer,
                self.inference,
                options.patience,
                lm_options.lm_path,
                lm_options.llm_path,
                lm_options.lm_alpha,
                lm_options.lm_beta,
                lm_options.lm_eos,
                lm_options.lm_normalize,
            )
        elif lm_options.llm_path is not None:
            logging.debug("Decoder: BeamSearchDecoderWithLLM")
            self.decoder = BeamSearchDecoderWithLLM(
                options.beam_size,
                self.tokenizer,
                self.inference,
                options.patience,
                lm_options.llm_path,
                lm_options.lm_alpha,
                lm_options.lm_beta,
                lm_options.lm_eos,
                lm_options.lm_normalize,
            )
        else:
            logging.debug("Decoder: BeamSearchDecoderWithLM")
            self.decoder = BeamSearchDecoderWithLM(
                options.beam_size,
                self.tokenizer,
                self.inference,
                options.patience,
                lm_options.lm_path,
                lm_options.lm_alpha,
                lm_options.lm_beta,
                lm_options.lm_eos,
                lm_options.lm_normalize,
            )


# Monkey patching the DecodingTask constructor:
DecodingTask.__init__ = new_decoding_task_init
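
# Illustrative sketch (not executed here): besides the KenLM example in the
# module docstring, the LLM-based decoders are selected by setting
# `LMOptions().llm_path` before calling `model.transcribe()`. The model id
# below is only a placeholder, not a real checkpoint:
#
#     from whisper_decoder_with_lm import LMOptions
#     LMOptions().llm_path = "some-org/some-causal-lm"  # hypothetical HF id
#     LMOptions().lm_path = "5gram-eu.bin"  # optional: combine KenLM + LLM
#     LMOptions().lm_alpha = 0.5
#     LMOptions().lm_beta = 0.5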