nsa-117m-byte-sft / modeling_nsa.py
seconds-0's picture
Upload NSAForCausalLM
9a7a74a verified
# Remote code: configuration and modeling for NSA
import math
from typing import Optional
import torch
from torch import nn
from transformers import PreTrainedModel
from transformers.generation.utils import GenerationMixin
from transformers.modeling_outputs import CausalLMOutput
from .configuration_nsa import NSAConfig
_HAS_NSA = False # Do not attempt nested vendor import in HF dynamic loader
class RMSNorm(nn.Module):
def __init__(self, dim: int, eps: float = 1e-6) -> None:
super().__init__()
self.weight = nn.Parameter(torch.ones(dim))
self.eps = eps
def forward(self, x: torch.Tensor) -> torch.Tensor:
rms = x.pow(2).mean(dim=-1, keepdim=True).add(self.eps).rsqrt()
return (x * rms) * self.weight
class MLP(nn.Module):
def __init__(self, dim: int, hidden_mult: int = 4) -> None:
super().__init__()
h = hidden_mult * dim
self.fc1 = nn.Linear(dim, h, bias=False)
self.fc2 = nn.Linear(h, dim, bias=False)
def forward(self, x: torch.Tensor) -> torch.Tensor:
return self.fc2(torch.nn.functional.silu(self.fc1(x)))
def _rope(q: torch.Tensor) -> torch.Tensor:
B, S, D = q.shape[0], q.shape[2], q.shape[-1]
if D % 2 != 0:
return q
device = q.device
half = D // 2
pos = torch.arange(S, device=device).float().unsqueeze(-1)
inv_freq = 1.0 / (10000 ** (torch.arange(0, half, device=device).float() / half))
angles = pos * inv_freq
cos = angles.cos().view(1, 1, S, half)
sin = angles.sin().view(1, 1, S, half)
q1, q2 = q[..., :half], q[..., half:]
return torch.cat([q1 * cos - q2 * sin, q1 * sin + q2 * cos], dim=-1)
def _avg_pool_time(x: torch.Tensor, kernel: int, stride: int) -> torch.Tensor:
if x.shape[2] < kernel:
return x[..., :0, :]
xt = x.permute(0, 3, 1, 2).contiguous()
y = torch.nn.functional.avg_pool2d(xt, kernel_size=(1, kernel), stride=(1, stride))
return y.permute(0, 2, 3, 1).contiguous()
def _window_mask(q: torch.Tensor, S: int, w: int) -> torch.Tensor:
B, h = q.shape[0], q.shape[1]
device = q.device
row = torch.arange(S, device=device).view(S, 1)
col = torch.arange(S, device=device).view(1, S)
allowed = (col <= row) & (col >= (row - (w - 1)))
M = torch.full((S, S), float('-inf'), device=device, dtype=q.dtype)
M.masked_fill_(allowed, 0.0)
return M.view(1, 1, S, S).expand(B, h, S, S)
def _selection_blocks(scores: torch.Tensor, l_sel: int, n_sel: int) -> torch.Tensor:
B, h, S = scores.shape
n_blocks = max(1, (S + l_sel - 1) // l_sel)
# Pad to multiple of l_sel
pad = n_blocks * l_sel - S
if pad > 0:
scores = torch.nn.functional.pad(scores, (0, pad), value=-1e9)
blk_scores = scores.view(B, h, n_blocks, l_sel).max(dim=-1).values
k = min(n_sel, n_blocks)
return torch.topk(blk_scores, k=k, dim=-1).indices
class EmbeddedNSAAttention(nn.Module):
def __init__(self, dim: int, n_heads: int, n_kv_groups: int, d_k: int, d_v: int,
l: int, d: int, l_sel: int, n_sel: int, w: int) -> None:
super().__init__()
self.n_heads = n_heads
self.n_kv_groups = n_kv_groups
self.d_k = d_k
self.d_v = d_v
self.l = l
self.stride = d
self.l_sel = l_sel
self.n_sel = n_sel
self.w = w
self.W_Q = nn.Linear(dim, n_heads * d_k, bias=False)
self.W_K_cmp = nn.Linear(dim, n_kv_groups * d_k, bias=False)
self.W_V_cmp = nn.Linear(dim, n_kv_groups * d_v, bias=False)
self.W_K_sel = nn.Linear(dim, n_kv_groups * d_k, bias=False)
self.W_V_sel = nn.Linear(dim, n_kv_groups * d_v, bias=False)
self.W_K_win = nn.Linear(dim, n_kv_groups * d_k, bias=False)
self.W_V_win = nn.Linear(dim, n_kv_groups * d_v, bias=False)
# Gate MLP operates on per-group pooled Q with width d_k (matches training)
gate_hidden = max(1, d_k // 2)
self.gate_fc1 = nn.Linear(d_k, gate_hidden, bias=True)
self.gate_fc2 = nn.Linear(gate_hidden, 3, bias=True)
nn.init.xavier_uniform_(self.gate_fc2.weight, gain=0.1)
nn.init.zeros_(self.gate_fc2.bias)
self.out = nn.Linear(n_heads * d_v, dim, bias=False)
def forward(self, x: torch.Tensor) -> torch.Tensor:
B, S, D = x.shape
h, dk, dv = self.n_heads, self.d_k, self.d_v
Q = self.W_Q(x).view(B, S, h, dk).transpose(1, 2) # [B,h,S,dk]
g = max(1, self.n_kv_groups)
r = max(1, h // g)
# Project per-group K/V then broadcast to heads
Kc_g = self.W_K_cmp(x).view(B, S, g, dk).permute(0, 2, 1, 3) # [B,g,S,dk]
Vc_g = self.W_V_cmp(x).view(B, S, g, dv).permute(0, 2, 1, 3)
Ks_g = self.W_K_sel(x).view(B, S, g, dk).permute(0, 2, 1, 3)
Vs_g = self.W_V_sel(x).view(B, S, g, dv).permute(0, 2, 1, 3)
Kw_g = self.W_K_win(x).view(B, S, g, dk).permute(0, 2, 1, 3)
Vw_g = self.W_V_win(x).view(B, S, g, dv).permute(0, 2, 1, 3)
# Broadcast groups to heads
def _bcast_to_heads(T):
return T.unsqueeze(1).expand(B, r, g, S, T.shape[-1]).reshape(B, h, S, T.shape[-1])
Kc = _bcast_to_heads(Kc_g)
Vc = _bcast_to_heads(Vc_g)
Ks = _bcast_to_heads(Ks_g)
Vs = _bcast_to_heads(Vs_g)
Kw = _bcast_to_heads(Kw_g)
Vw = _bcast_to_heads(Vw_g)
# RoPE
Qr = _rope(Q.transpose(1, 2)).transpose(1, 2)
Kc_r = _rope(Kc.transpose(1, 2)).transpose(1, 2)
Ks_r = _rope(Ks.transpose(1, 2)).transpose(1, 2)
Kw_r = _rope(Kw.transpose(1, 2)).transpose(1, 2)
# Compressed: average-pool along time
Kc_p = _avg_pool_time(Kc_r, kernel=max(1, self.stride), stride=max(1, self.stride))
Vc_p = _avg_pool_time(Vc, kernel=max(1, self.stride), stride=max(1, self.stride))
O_cmp = torch.nn.functional.scaled_dot_product_attention(Qr, Kc_p, Vc_p, is_causal=True)
# Selection: naive top-n blocks (global), enforce causal via triangular mask
scores = (Qr * Ks_r).mean(dim=-1) # [B,h,S]
blk_idx = _selection_blocks(scores, self.l_sel, self.n_sel) # [B,h,n]
n_blocks = max(1, (S + self.l_sel - 1) // self.l_sel)
keep = torch.zeros((B, h, n_blocks), device=x.device, dtype=torch.bool)
keep.scatter_(2, blk_idx, True)
keep = keep.unsqueeze(-1).expand(B, h, n_blocks, self.l_sel).reshape(B, h, -1)[:, :, :S]
logits = torch.matmul(Qr / math.sqrt(dk), Ks_r.transpose(-2, -1)) # [B,h,S,S]
tri = torch.triu(torch.ones((S, S), device=x.device, dtype=torch.bool), diagonal=1)
logits = logits.masked_fill(tri, float('-inf'))
sel_mask = torch.where(keep.unsqueeze(2).expand(B, h, S, S), torch.zeros((), device=x.device, dtype=Qr.dtype), torch.full((), float('-inf'), device=x.device, dtype=Qr.dtype))
P = torch.nn.functional.softmax(logits + sel_mask, dim=-1)
O_sel = torch.matmul(P, Vs)
# Sliding window
M = _window_mask(Qr, S, max(1, self.w))
logits_w = torch.matmul(Qr / math.sqrt(dk), Kw_r.transpose(-2, -1)) + M
P_w = torch.nn.functional.softmax(logits_w, dim=-1)
O_win = torch.matmul(P_w, Vw)
# Gate & mix: compute per-token, per-group gate from pooled Q
# Pool Q across heads within each kv-group
# Qr: [B,h,S,dk] -> reshape to [B,G,h_per_group,S,dk] then mean over h_per_group
G = max(1, self.n_kv_groups)
h_per_group = max(1, h // G)
Qg = Qr.view(B, G, h_per_group, S, dk).mean(dim=2) # [B,G,S,dk]
Qg = Qg.permute(0, 2, 1, 3) # [B,S,G,dk]
g1 = torch.nn.functional.silu(self.gate_fc1(Qg))
gate = torch.nn.functional.softmax(self.gate_fc2(g1), dim=-1) # [B,S,G,3]
gc = gate[..., 0:1].unsqueeze(-1) # [B,S,G,1,1]
gs = gate[..., 1:2].unsqueeze(-1)
gw = gate[..., 2:3].unsqueeze(-1)
# Broadcast group gates to heads within the group
# Reshape branch outputs to [B,S,G,h_per_group,dv]
Oc = O_cmp.permute(0,2,1,3).view(B, S, G, h_per_group, dv)
Os = O_sel.permute(0,2,1,3).view(B, S, G, h_per_group, dv)
Ow = O_win.permute(0,2,1,3).view(B, S, G, h_per_group, dv)
O = gc * Oc + gs * Os + gw * Ow
O = O.reshape(B, S, h, dv).permute(0, 2, 1, 3)
O = O.transpose(1, 2).reshape(B, S, h * dv)
return self.out(O)
class SimpleAttention(nn.Module):
def __init__(self, dim: int, n_heads: int, d_k: int, d_v: int) -> None:
super().__init__()
self.n_heads = n_heads
self.d_k = d_k
self.d_v = d_v
self.q_proj = nn.Linear(dim, n_heads * d_k, bias=False)
self.k_proj = nn.Linear(dim, n_heads * d_k, bias=False)
self.v_proj = nn.Linear(dim, n_heads * d_v, bias=False)
self.out = nn.Linear(n_heads * d_v, dim, bias=False)
def forward(self, x: torch.Tensor) -> torch.Tensor:
B, S, D = x.shape
h, dk, dv = self.n_heads, self.d_k, self.d_v
q = self.q_proj(x).view(B, S, h, dk).transpose(1, 2) # [B,h,S,dk]
k = self.k_proj(x).view(B, S, h, dk).transpose(1, 2) # [B,h,S,dk]
v = self.v_proj(x).view(B, S, h, dv).transpose(1, 2) # [B,h,S,dv]
attn = torch.nn.functional.scaled_dot_product_attention(q, k, v, is_causal=True)
attn = attn.transpose(1, 2).contiguous().view(B, S, h * dv)
return self.out(attn)
class SimpleBlock(nn.Module):
def __init__(self, dim: int, n_heads: int, d_k: int, d_v: int) -> None:
super().__init__()
self.norm1 = RMSNorm(dim)
self.attn = SimpleAttention(dim, n_heads, d_k, d_v)
self.norm2 = RMSNorm(dim)
self.mlp = MLP(dim)
def forward(self, x: torch.Tensor) -> torch.Tensor:
x = x + self.attn(self.norm1(x))
x = x + self.mlp(self.norm2(x))
return x
class NSABlockRemote(nn.Module):
"""Transformer block with embedded NSA attention, pre/post RMSNorm, and MLP."""
def __init__(self, dim: int, n_heads: int, n_kv_groups: int, d_k: int, d_v: int,
l: int, d: int, l_sel: int, n_sel: int, w: int) -> None:
super().__init__()
self.norm1 = RMSNorm(dim)
self.attn = EmbeddedNSAAttention(dim, n_heads, n_kv_groups, d_k, d_v, l, d, l_sel, n_sel, w)
self.norm2 = RMSNorm(dim)
self.mlp = MLP(dim)
def forward(self, x: torch.Tensor) -> torch.Tensor:
x = x + self.attn(self.norm1(x))
x = x + self.mlp(self.norm2(x))
return x
class NSATinyLM(nn.Module):
def __init__(self, config: NSAConfig):
super().__init__()
self.config = config
self.vocab_size = int(config.vocab_size)
self.hidden_size = int(config.hidden_size)
self.num_hidden_layers = int(config.num_hidden_layers)
self.num_attention_heads = int(config.num_attention_heads)
self.n_kv_groups = int(getattr(config, "n_kv_groups", 1))
self.d_k = int(getattr(config, "d_k", self.hidden_size // self.num_attention_heads))
self.d_v = int(getattr(config, "d_v", self.hidden_size // self.num_attention_heads))
nsa = config.nsa or {}
self.l = int(nsa.get("block", 32))
self.d = int(nsa.get("stride", 16))
self.l_sel = int(nsa.get("sel_block", 64))
self.n_sel = int(nsa.get("sel_top_n", 16))
self.w = int(nsa.get("window", 512))
self.embed = nn.Embedding(self.vocab_size, self.hidden_size)
import os as _os
# Allow forcing simple fallback via env for integration tests
_force_simple = _os.getenv('NSA_REMOTE_FORCE_SIMPLE', '0').lower() in ('1','true','yes')
if not _force_simple:
# Fallback to embedded minimal NSA if vendor import failed
self.blocks = nn.ModuleList([
NSABlockRemote(
self.hidden_size,
self.num_attention_heads,
self.n_kv_groups,
self.d_k,
self.d_v,
self.l,
self.d,
self.l_sel,
self.n_sel,
self.w,
) for _ in range(self.num_hidden_layers)
])
else:
self.blocks = nn.ModuleList([
SimpleBlock(self.hidden_size, self.num_attention_heads, self.d_k, self.d_v)
for _ in range(self.num_hidden_layers)
])
self.norm = nn.LayerNorm(self.hidden_size)
self.lm_head = nn.Linear(self.hidden_size, self.vocab_size, bias=False)
def forward(self, input_ids: torch.Tensor) -> torch.Tensor:
x = self.embed(input_ids)
for blk in self.blocks:
x = blk(x)
x = self.norm(x)
logits = self.lm_head(x)
return logits
class NSAForCausalLM(PreTrainedModel, GenerationMixin):
config_class = NSAConfig
_no_split_modules = ["EmbeddedNSAAttention", "SimpleBlock"]
def __init__(self, config: NSAConfig):
super().__init__(config)
self.model = NSATinyLM(config)
self.post_init()
def get_input_embeddings(self):
return self.model.embed
def set_input_embeddings(self, new_emb):
self.model.embed = new_emb
def forward(
self,
input_ids: Optional[torch.LongTensor] = None,
attention_mask: Optional[torch.Tensor] = None,
labels: Optional[torch.LongTensor] = None,
**kwargs,
):
if input_ids is None:
raise ValueError("input_ids is required")
logits = self.model(input_ids)
loss = None
if labels is not None:
# Shift for causal LM loss
shift_logits = logits[:, :-1, :].contiguous()
shift_labels = labels[:, 1:].contiguous()
loss_fct = torch.nn.CrossEntropyLoss()
loss = loss_fct(shift_logits.view(-1, shift_logits.size(-1)), shift_labels.view(-1))
return CausalLMOutput(loss=loss, logits=logits)
def prepare_inputs_for_generation(self, input_ids, **kwargs):
# No past_key_values cache: rerun full sequence. Works everywhere, slower at decode.
return {"input_ids": input_ids, "attention_mask": kwargs.get("attention_mask", None)}