# Install dependencies

In [3]:
!pip install ai-edge-litert-nightly

Collecting ai-edge-litert-nightly
  Downloading ai_edge_litert_nightly-1.1.2.dev20250213-cp311-cp311-manylinux_2_17_x86_64.whl.metadata (1.6 kB)
Downloading ai_edge_litert_nightly-1.1.2.dev20250213-cp311-cp311-manylinux_2_17_x86_64.whl (3.5 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m3.5/3.5 MB[0m [31m29.1 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: ai-edge-litert-nightly
Successfully installed ai-edge-litert-nightly-1.1.2.dev20250213


In [4]:
from ai_edge_litert import interpreter as interpreter_lib
from transformers import AutoTokenizer
import numpy as np
from collections.abc import Sequence
import sys

The cache for model files in Transformers v4.22.0 has been updated. Migrating your old cache. This is a one-time only operation. You can interrupt this and resume the migration later on by calling `transformers.utils.move_cache()`.


0it [00:00, ?it/s]

# Download model files

In [1]:
from huggingface_hub import hf_hub_download
# Fetch the LiteRT model file from the Hugging Face Hub and keep the value
# returned by hf_hub_download so it can be handed to the interpreter.
model_path = hf_hub_download(
    filename="deepseek_q8_seq128_ekv1280.tflite",
    repo_id="litert-community/DeepSeek-R1-Distill-Qwen-1.5B",
)

deepseek_q8_seq128_ekv1280.tflite:   0%|          | 0.00/1.81G [00:00<?, ?B/s]

# Create LiteRT interpreter and tokenizer

In [5]:
# LiteRT interpreter for the downloaded model, created with the GenAI custom-op
# registerer and two threads.
interpreter = interpreter_lib.InterpreterWithCustomOps(
    model_path=model_path,
    custom_op_registerers=["pywrap_genai_ops.GenAIOpsRegisterer"],
    experimental_default_delegate_latest_features=True,
    num_threads=2,
)

# Tokenizer loaded from the upstream DeepSeek-R1-Distill-Qwen-1.5B repository.
tokenizer = AutoTokenizer.from_pretrained(
    "deepseek-ai/DeepSeek-R1-Distill-Qwen-1.5B"
)

tokenizer_config.json:   0%|          | 0.00/3.07k [00:00<?, ?B/s]

tokenizer.json:   0%|          | 0.00/7.03M [00:00<?, ?B/s]

# Create pipeline with LiteRT models

In [6]:

class LiteRTLlmPipeline:

  def __init__(self, interpreter, tokenizer):
    """Initializes the pipeline."""
    self._interpreter = interpreter
    self._tokenizer = tokenizer

    self._prefill_runner = None
    self._decode_runner = self._interpreter.get_signature_runner("decode")


  def _init_prefill_runner(self, num_input_tokens: int):
    """Initializes all the variables related to the prefill runner.

    This method initializes the following variables:
      - self._prefill_runner: The prefill runner based on the input size.
      - self._max_seq_len: The maximum sequence length supported by the model.
      - self._max_kv_cache_seq_len: The maximum sequence length supported by the
        KV cache.
      - self._num_layers: The number of layers in the model.

    Args:
      num_input_tokens: The number of input tokens.
    """

    self._prefill_runner = self._get_prefill_runner(num_input_tokens)
    # input_token_shape has shape (batch, max_seq_len)
    input_token_shape = self._prefill_runner.get_input_details()["tokens"][
        "shape"
    ]
    if len(input_token_shape) == 1:
      self._max_seq_len = input_token_shape[0]
    else:
      self._max_seq_len = input_token_shape[1]

    # kv cache input has shape [batch=1, seq_len, num_heads, dim].
    kv_cache_shape = self._prefill_runner.get_input_details()["kv_cache_k_0"][
        "shape"
    ]
    self._max_kv_cache_seq_len = kv_cache_shape[1]

    # The two arguments excluded are `tokens` and `input_pos`. Dividing by 2
    # because each layer has key and value caches.
    self._num_layers = (
        len(self._prefill_runner.get_input_details().keys()) - 2
    ) // 2


  def _init_kv_cache(self) -> dict[str, np.ndarray]:
    if self._prefill_runner is None:
      raise ValueError("Prefill runner is not initialized.")
    kv_cache = {}
    for i in range(self._num_layers):
      kv_cache[f"kv_cache_k_{i}"] = np.zeros(
          self._prefill_runner.get_input_details()[f"kv_cache_k_{i}"]["shape"],
          dtype=np.float32,
      )
      kv_cache[f"kv_cache_v_{i}"] = np.zeros(
          self._prefill_runner.get_input_details()[f"kv_cache_v_{i}"]["shape"],
          dtype=np.float32,
      )
    return kv_cache

  def _get_prefill_runner(self, num_input_tokens: int) :
    """Gets the prefill runner with the best suitable input size.

    Args:
      num_input_tokens: The number of input tokens.

    Returns:
      The prefill runner with the smallest input size.
    """
    best_signature = None
    delta = sys.maxsize
    max_prefill_len = -1
    for key in self._interpreter.get_signature_list().keys():
      if "prefill" not in key:
        continue
      input_pos = self._interpreter.get_signature_runner(key).get_input_details()[
          "input_pos"
      ]
      # input_pos["shape"] has shape (max_seq_len, )
      seq_size = input_pos["shape"][0]
      max_prefill_len = max(max_prefill_len, seq_size)
      if num_input_tokens <= seq_size and seq_size - num_input_tokens < delta:
        delta = seq_size - num_input_tokens
        best_signature = key
    if best_signature is None:
      raise ValueError(
          "The largest prefill length supported is %d, but we have %d number of input tokens"
          %(max_prefill_len, num_input_tokens)
      )
    return self._interpreter.get_signature_runner(best_signature)

  def _greedy_sampler(self, logits: np.ndarray) -> int:
    return int(np.argmax(logits))

  def generate(self, prompt: str, max_decode_steps: int | None = None) -> str:
    messages=[{ 'role': 'user', 'content': prompt}]
    token_ids = self._tokenizer.apply_chat_template(messages, tokenize=True, add_generation_prompt=True)
    # Initialize the prefill runner with the suitable input size.
    self._init_prefill_runner(len(token_ids))

    actual_max_decode_steps = self._max_kv_cache_seq_len - len(token_ids)
    if max_decode_steps is not None:
      actual_max_decode_steps = min(actual_max_decode_steps, max_decode_steps)

    input_token_ids = [0] * self._max_seq_len
    input_token_ids[:len(token_ids)] = token_ids
    model_inputs = self._init_kv_cache()
    model_inputs.update({
        "tokens": np.asarray([input_token_ids], dtype=np.int32),
        "input_pos": np.arange(self._max_seq_len, dtype=np.int32),
    })
    decode_text = []
    decode_step = 0
    print('Running prefill')
    for step in range(actual_max_decode_steps+1):
      signature_runner = self._prefill_runner if step == 0 else self._decode_runner
      model_outputs = signature_runner(**model_inputs)
      # At prefill stage, output logits has shape (batch=1, seq_size, vocab_size)
      # At decode stage, output logits has shape (batch=1, 1, vocab_size).
      selected_logit = len(token_ids)-1 if step == 0 else 0
      logits = model_outputs.pop("logits")[0][selected_logit]

      if step == 0:
        print('Running decode')

      # Decode text output.
      next_token = self._greedy_sampler(logits)
      if next_token == self._tokenizer.eos_token_id:
        break
      decode_text.append(self._tokenizer.decode(next_token, skip_special_tokens=False))
      print(decode_text[-1], end='', flush=True)
      # The rest of the outputs is the updated kv cache.
      model_inputs = model_outputs
      model_inputs.update({
          "tokens": np.array([[next_token]], dtype=np.int32),
          "input_pos": np.array([decode_step + len(token_ids)], dtype=np.int32),})
      decode_step += 1



    print() # print a new line at the end.
    return ''.join(decode_text)


# Generate text from model

In [7]:
# Wire the interpreter and tokenizer created above into the generation pipeline.
pipeline = LiteRTLlmPipeline(interpreter=interpreter, tokenizer=tokenizer)

In [9]:
prompt = "What is the capital of France?"
# Limit generation to 20 decode steps so the demo finishes quickly.
output = pipeline.generate(prompt=prompt, max_decode_steps=20)

Running prefill
Running decode
Okay, so I need to figure out the capital of France. I remember that France is a country in
