Spaces:
Running
on
Zero
Running
on
Zero
| import random | |
| import numpy as np | |
| import torch | |
| from transformers import AutoTokenizer, T5ForConditionalGeneration | |
| import spaces | |
# Selectable models: display label -> model identifier handed to
# ``from_pretrained`` when the model is loaded.
MODEL_OPTIONS_INSTRUCT_TUNED = {
    "EXT1 Model": "alakxender/flan-t5-base-alpaca-dv-ext",
}

# Loaded (tokenizer, model) pairs keyed by model identifier, so each model
# is only loaded once per process.
MODEL_CACHE = {}
def get_model_and_tokenizer(model_dir):
    """Return the ``(tokenizer, model)`` pair for ``model_dir``, loading it on first use.

    On a cache miss the tokenizer and T5 model are loaded with
    ``from_pretrained``, the model is moved to CUDA when torch reports it as
    available (otherwise CPU), and the pair is stored in ``MODEL_CACHE``.

    Args:
        model_dir: Model identifier or path accepted by ``from_pretrained``.

    Returns:
        The cached ``(tokenizer, model)`` tuple.
    """
    # Fast path: this model has already been loaded.
    if model_dir in MODEL_CACHE:
        return MODEL_CACHE[model_dir]

    print(f"Loading model: {model_dir}")
    tokenizer = AutoTokenizer.from_pretrained(model_dir)
    model = T5ForConditionalGeneration.from_pretrained(model_dir)

    target_device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    print(f"Moving model to device: {target_device}")
    model.to(target_device)

    pair = (tokenizer, model)
    MODEL_CACHE[model_dir] = pair
    return pair
def generate_response_tuned(instruction, input_text, seed, model_choice, max_tokens, temperature, num_beams):
    """Generate a reply from the selected instruction-tuned T5 model.

    Args:
        instruction: Task instruction; surrounding whitespace is stripped.
        input_text: Optional extra text appended after the instruction,
            separated by a single space. ``None``, empty, or whitespace-only
            values are ignored.
        seed: Seed applied to ``random``, NumPy and torch before generating.
        model_choice: Key into ``MODEL_OPTIONS_INSTRUCT_TUNED``.
        max_tokens: Forwarded to ``generate`` as ``max_length``.
        temperature: Sampling temperature. Values ``<= 0`` turn sampling off.
        num_beams: Beam count forwarded to ``generate``.

    Returns:
        The decoded output, cut right after its last ``.`` (if it contains
        one) and with every run of whitespace collapsed to a single space.

    Raises:
        KeyError: If ``model_choice`` is not a key of
            ``MODEL_OPTIONS_INSTRUCT_TUNED``.
    """
    # Seed every RNG in play so the same seed reproduces the same sample.
    random.seed(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    if torch.cuda.is_available():
        torch.cuda.manual_seed_all(seed)

    model_dir = MODEL_OPTIONS_INSTRUCT_TUNED[model_choice]
    tokenizer, model = get_model_and_tokenizer(model_dir)

    # Strip before testing for emptiness so a whitespace-only input does not
    # leave a dangling space at the end of the prompt.
    prompt = instruction.strip()
    extra = input_text.strip() if input_text else ""
    if extra:
        prompt = f"{prompt} {extra}"

    inputs = tokenizer(
        prompt,
        return_tensors="pt",
        truncation=True,
    )
    # Follow the model's actual device rather than re-deriving it from
    # torch.cuda.is_available(), which may disagree with where the cached
    # model was placed when it was loaded.
    device = model.device
    inputs = {name: tensor.to(device) for name, tensor in inputs.items()}

    do_sample = temperature > 0.0
    gen_kwargs = {
        **inputs,
        "max_length": max_tokens,
        "num_beams": num_beams,
        "do_sample": do_sample,
        "repetition_penalty": 1.2,
    }
    if do_sample:
        # Sampling-only knobs: they have no effect on greedy/beam decoding,
        # and passing temperature=0 alongside do_sample=False hands generate()
        # an invalid sampling value that its config validation complains about.
        gen_kwargs.update(temperature=temperature, top_p=0.95, top_k=50)

    with torch.no_grad():
        outputs = model.generate(**gen_kwargs)

    decoded_output = tokenizer.decode(outputs[0], skip_special_tokens=True)

    # Drop any trailing partial sentence by cutting after the last period.
    last_period = decoded_output.rfind(".")
    if last_period != -1:
        decoded_output = decoded_output[: last_period + 1]

    return " ".join(decoded_output.split())