# backend.py — FINAL VERSION import sqlite3 import threading import time import torch import torch.nn as nn from torch.utils.data import DataLoader, Dataset from huggingface_hub import whoami, HfApi, create_repo from datasets import load_dataset from transformers import AutoTokenizer import psutil import os import shutil from werkzeug.security import generate_password_hash, check_password_hash DB_PATH = "llm_kitchen.db" training_queue = [] active_runs = set() active_users = set() scheduler_lock = threading.Lock() RUN_TIMEOUT = 48 * 3600 MAX_RAM_PER_RUN_GB = 1.5 # ------------------------------ DATABASE ------------------------------ def init_db(): conn = sqlite3.connect(DB_PATH, check_same_thread=False) cursor = conn.cursor() cursor.executescript(""" CREATE TABLE IF NOT EXISTS users ( id INTEGER PRIMARY KEY AUTOINCREMENT, username TEXT UNIQUE NOT NULL, password_hash TEXT NOT NULL, created_at DATETIME DEFAULT CURRENT_TIMESTAMP ); CREATE TABLE IF NOT EXISTS training_runs ( id INTEGER PRIMARY KEY AUTOINCREMENT, user_id INTEGER NOT NULL, arch_type TEXT NOT NULL, num_layers INTEGER NOT NULL, learning_rate REAL NOT NULL, epochs INTEGER NOT NULL, batch_size INTEGER NOT NULL, status TEXT DEFAULT 'queued', logs TEXT DEFAULT '', started_at DATETIME, completed_at DATETIME, FOREIGN KEY (user_id) REFERENCES users(id) ); """) conn.close() init_db() def db_query(query, params=()): conn = sqlite3.connect(DB_PATH, check_same_thread=False) cursor = conn.cursor() cursor.execute(query, params) res = cursor.fetchall() conn.commit() last_id = cursor.lastrowid conn.close() return res, last_id def get_user_by_username(username): rows, _ = db_query("SELECT id, password_hash FROM users WHERE username = ?", (username,)) return rows[0] if rows else None # ------------------------------ AUTHENTICATION ------------------------------ def signup_user(username, password): if not username or not password: return None, "Username and password cannot be empty." if get_user_by_username(username): return None, "Username already exists. Please choose another." password_hash = generate_password_hash(password) _, user_id = db_query("INSERT INTO users (username, password_hash) VALUES (?, ?)", (username, password_hash)) return user_id, f"Welcome, {username}! Your account is ready. Please log in." def login_user(username, password): user = get_user_by_username(username) if user and check_password_hash(user[1], password): return user[0], f"Welcome back, {username}!" return None, "Invalid username or password." # ------------------------------ TRAINING QUEUE & SCHEDULER ------------------------------ def ram_available(): return (psutil.virtual_memory().available / (1024**3)) >= MAX_RAM_PER_RUN_GB def queue_training_run(user_id, config): _, run_id = db_query("INSERT INTO training_runs (user_id, arch_type, num_layers, learning_rate, epochs, batch_size) VALUES (?, ?, ?, ?, ?, ?)", (user_id, config['arch_type'], config['num_layers'], config['learning_rate'], config['epochs'], config['batch_size'])) training_queue.append({"run_id": run_id, "user_id": user_id, **config}) start_training_if_free() return run_id def start_training_if_free(): with scheduler_lock: for job in list(training_queue): if not ram_available(): log_update("MemoryWarning: Not enough RAM for new runs. Waiting.", -1) break if job["user_id"] in active_users: continue log_update(f"Scheduler: Starting run #{job['run_id']} for user #{job['user_id']}", -1) active_runs.add(job["run_id"]) active_users.add(job["user_id"]) training_queue.remove(job) update_run_status(job["run_id"], "running") log_update("🍳 Starting kitchen process...", job["run_id"]) thread = threading.Thread(target=run_training_job, args=(job,)) thread.start() threading.Timer(RUN_TIMEOUT, kill_run_timeout, args=[job]).start() def kill_run_timeout(job): run_id, user_id = job["run_id"], job["user_id"] with scheduler_lock: if run_id in active_runs: log_update(f"Run {run_id}: 💥 48-HOUR TIMEOUT. Terminating.", run_id) update_run_status(run_id, "timeout") active_runs.discard(run_id) active_users.discard(user_id) start_training_if_free() def get_user_runs(user_id): rows, _ = db_query("SELECT id, arch_type, num_layers, status, started_at FROM training_runs WHERE user_id = ? ORDER BY id DESC", (user_id,)) return rows def get_run_logs(user_id, run_id): """Securely fetches logs by checking ownership (user_id).""" rows, _ = db_query("SELECT logs, status FROM training_runs WHERE id = ? AND user_id = ?", (run_id, user_id)) return rows[0] if rows else ("", "unknown") def update_run_status(run_id, status): if status == 'running': db_query("UPDATE training_runs SET status = ?, started_at = CURRENT_TIMESTAMP WHERE id = ?", (status, run_id)) elif status in ['completed', 'failed', 'timeout']: db_query("UPDATE training_runs SET status = ?, completed_at = CURRENT_TIMESTAMP WHERE id = ?", (status, run_id)) else: db_query("UPDATE training_runs SET status = ? WHERE id = ?", (status, run_id)) def log_update(message, run_id): timestamp = time.strftime("%H:%M:%S") full_msg = f"[{timestamp}] {message}" print(full_msg) if run_id > 0: db_query("UPDATE training_runs SET logs = logs || ? || ? WHERE id = ?", ('\n', full_msg, run_id)) # ------------------------------ MODELS & TRAINING ------------------------------ class CNNLanguageModel(nn.Module): def __init__(self, vocab_size, embed_dim=128, num_layers=4): super().__init__() self.embedding = nn.Embedding(vocab_size, embed_dim) layers, in_ch = [], embed_dim for _ in range(num_layers): layers.extend([nn.Conv1d(in_ch, in_ch * 2, kernel_size=3, padding=1), nn.ReLU()]) in_ch *= 2 self.convs, self.fc = nn.Sequential(*layers), nn.Linear(in_ch, vocab_size) def forward(self, x, labels=None): x = self.embedding(x).transpose(1, 2) x = self.convs(x).transpose(1, 2) logits = self.fc(x) loss = nn.CrossEntropyLoss()(logits.view(-1, logits.size(-1)), labels.view(-1)) if labels is not None else None return {"loss": loss, "logits": logits} class RNNLanguageModel(nn.Module): def __init__(self, vocab_size, embed_dim=128, hidden_dim=256, num_layers=2): super().__init__() self.embedding = nn.Embedding(vocab_size, embed_dim) self.rnn = nn.LSTM(embed_dim, hidden_dim, num_layers, batch_first=True) self.fc = nn.Linear(hidden_dim, vocab_size) def forward(self, x, labels=None): x = self.embedding(x) output, _ = self.rnn(x) logits = self.fc(output) loss = nn.CrossEntropyLoss()(logits.view(-1, logits.size(-1)), labels.view(-1)) if labels is not None else None return {"loss": loss, "logits": logits} class TransformerLanguageModel(nn.Module): def __init__(self, vocab_size, embed_dim=128, num_heads=4, num_layers=3): super().__init__() self.embedding = nn.Embedding(vocab_size, embed_dim) encoder_layer = nn.TransformerEncoderLayer(d_model=embed_dim, nhead=num_heads, batch_first=True) self.transformer = nn.TransformerEncoder(encoder_layer, num_layers=num_layers) self.fc = nn.Linear(embed_dim, vocab_size) def forward(self, x, labels=None): x = self.embedding(x) x = self.transformer(x) logits = self.fc(x) loss = nn.CrossEntropyLoss()(logits.view(-1, logits.size(-1)), labels.view(-1)) if labels is not None else None return {"loss": loss, "logits": logits} def get_model(arch_type, vocab_size, num_layers): models = {"cnn": CNNLanguageModel, "rnn": RNNLanguageModel, "transformer": TransformerLanguageModel} return models[arch_type](vocab_size, num_layers=num_layers) class TextDataset(Dataset): def __init__(self, tokenized_data): self.data = tokenized_data["input_ids"] def __len__(self): return len(self.data) def __getitem__(self, idx): return {"input_ids": torch.tensor(self.data[idx]), "labels": torch.tensor(self.data[idx])} def run_training_job(job): run_id, user_id = job["run_id"], job["user_id"] try: device = "cuda" if torch.cuda.is_available() else "cpu" log_update(f"🚀 Device = {device}", run_id) tokenizer = AutoTokenizer.from_pretrained("gpt2") tokenizer.pad_token = tokenizer.eos_token tokenizer_save_path = f"./runs/{run_id}/tokenizer" os.makedirs(tokenizer_save_path, exist_ok=True) tokenizer.save_pretrained(tokenizer_save_path) model = get_model(job["arch_type"], len(tokenizer), job["num_layers"]).to(device) log_update(f"🧱 Model: {job['arch_type']} x{job['num_layers']} layers", run_id) dataset = load_dataset("voidful/reasoning_gemini_300k", split="train[:5000]") tokenized_dataset = dataset.map(lambda ex: tokenizer([q + " " + a for q, a in zip(ex["message"], ex["answer"])], truncation=True, padding="max_length", max_length=128), batched=True, remove_columns=dataset.column_names) train_loader = DataLoader(TextDataset(tokenized_dataset), batch_size=job["batch_size"], shuffle=True) optimizer = torch.optim.AdamW(model.parameters(), lr=job["learning_rate"]) model.train() log_update(f"▶️ Starting training for {job['epochs']} epochs...", run_id) for epoch in range(job["epochs"]): for step, batch in enumerate(train_loader): input_ids = batch["input_ids"].to(device) labels = batch["labels"].to(device) optimizer.zero_grad() outputs = model(input_ids, labels=labels) loss = outputs["loss"] loss.backward() optimizer.step() if step % 50 == 0: log_update(f"Epoch {epoch+1} | Step {step} | Loss: {loss.item():.4f}", run_id) log_update(f"✅ Epoch {epoch+1} completed.", run_id) model_path = f"./runs/{run_id}" os.makedirs(model_path, exist_ok=True) torch.save(model.state_dict(), f"{model_path}/pytorch_model.bin") except Exception as e: log_update(f"💥 FAILED - {str(e)}", run_id) update_run_status(run_id, "failed") else: log_update("🎉 Cooking complete!", run_id) update_run_status(run_id, "completed") finally: with scheduler_lock: active_runs.discard(run_id) active_users.discard(user_id) start_training_if_free() def run_inference(run_id, prompt): model_path = f"./runs/{run_id}/pytorch_model.bin" tokenizer_path = f"./runs/{run_id}/tokenizer" if not (os.path.exists(model_path) and os.path.exists(tokenizer_path)): return "ModelError: Files not found." tokenizer = AutoTokenizer.from_pretrained(tokenizer_path) rows, _ = db_query("SELECT arch_type, num_layers FROM training_runs WHERE id = ?", (run_id,)) if not rows: return "ModelError: Run not found." arch_type, num_layers = rows[0] model = get_model(arch_type, len(tokenizer), num_layers) model.load_state_dict(torch.load(model_path, map_location="cpu")) model.eval() inputs = tokenizer(prompt, return_tensors="pt") input_ids = inputs.input_ids with torch.no_grad(): outputs = model(input_ids) logits = outputs["logits"] generated_ids = torch.argmax(logits, dim=-1) return f"🧑‍🍳 Model says:\n{tokenizer.decode(generated_ids[0], skip_special_tokens=True)}" def publish_run_to_hub(run_id, hf_token, repo_name, user_description=""): try: user_info = whoami(token=hf_token) hf_username = user_info['name'] except Exception as e: raise ValueError(f"Invalid Hugging Face Token. Error: {e}") final_repo_name = f"{hf_username}/{repo_name}" local_dir = f"./runs/{run_id}/hub_upload" shutil.rmtree(local_dir, ignore_errors=True) os.makedirs(local_dir, exist_ok=True) shutil.copy(f"./runs/{run_id}/pytorch_model.bin", f"{local_dir}/pytorch_model.bin") shutil.copytree(f"./runs/{run_id}/tokenizer", f"{local_dir}/tokenizer", dirs_exist_ok=True) readme_content = user_description.strip() or f"# Model from LLM Kitchen - Run #{run_id}" with open(f"{local_dir}/README.md", "w") as f: f.write(readme_content) api = HfApi() repo_url = api.create_repo(repo_id=final_repo_name, token=hf_token, exist_ok=True).repo_id api.upload_folder(folder_path=local_dir, repo_id=repo_url, token=hf_token) return f"https://huggingface.co/{repo_url}"