import torch import torch.nn as nn import torch.nn.functional as F try: import xformers import xformers.ops xformers_available = True except Exception: xformers_available = False # Region Controller (unchanged) class RegionControler: def __init__(self) -> None: self.prompt_image_conditioning = [] region_control = RegionControler() # Helper function for weight initialization def init_weights(m): if isinstance(m, nn.Linear): nn.init.xavier_uniform_(m.weight) # Base Attention Processor class BaseAttnProcessor(nn.Module): def __init__(self): super().__init__() def _process_input(self, attn, hidden_states, encoder_hidden_states, attention_mask, temb): """Handles preprocessing for both AttnProcessor and IPAttnProcessor""" residual = hidden_states if attn.spatial_norm is not None: hidden_states = attn.spatial_norm(hidden_states, temb) input_ndim = hidden_states.ndim if input_ndim == 4: batch_size, channel, height, width = hidden_states.shape hidden_states = hidden_states.view(batch_size, channel, height * width).transpose(1, 2) else: batch_size, sequence_length, _ = hidden_states.shape if encoder_hidden_states is None: encoder_hidden_states = hidden_states elif attn.norm_cross: encoder_hidden_states = attn.norm_encoder_hidden_states(encoder_hidden_states) attention_mask = attn.prepare_attention_mask(attention_mask, sequence_length, batch_size) if attn.group_norm is not None: hidden_states = attn.group_norm(hidden_states.transpose(1, 2)).transpose(1, 2) return hidden_states, encoder_hidden_states, residual, batch_size, input_ndim, height, width def _apply_attention(self, attn, query, key, value, attention_mask): """Handles the actual attention operation using either xformers or standard PyTorch""" if xformers_available: return xformers.ops.memory_efficient_attention(query, key, value, attn_bias=attention_mask) else: attention_probs = attn.get_attention_scores(query, key, attention_mask) return torch.bmm(attention_probs, value) # Optimized AttnProcessor class AttnProcessor(BaseAttnProcessor): def forward(self, attn, hidden_states, encoder_hidden_states=None, attention_mask=None, temb=None): hidden_states, encoder_hidden_states, residual, batch_size, input_ndim, height, width = \ self._process_input(attn, hidden_states, encoder_hidden_states, attention_mask, temb) query = attn.to_q(hidden_states) key = attn.to_k(encoder_hidden_states) value = attn.to_v(encoder_hidden_states) query, key, value = map(attn.head_to_batch_dim, (query, key, value)) hidden_states = self._apply_attention(attn, query, key, value, attention_mask) hidden_states = attn.batch_to_head_dim(hidden_states) hidden_states = attn.to_out[0](hidden_states) hidden_states = attn.to_out[1](hidden_states) if input_ndim == 4: hidden_states = hidden_states.transpose(-1, -2).reshape(batch_size, -1, height, width) if attn.residual_connection: hidden_states = hidden_states + residual return hidden_states / attn.rescale_output_factor # Optimized IPAttnProcessor class IPAttnProcessor(BaseAttnProcessor): def __init__(self, hidden_size, cross_attention_dim=None, scale=1.0, num_tokens=4): super().__init__() self.hidden_size = hidden_size self.cross_attention_dim = cross_attention_dim self.scale = scale self.num_tokens = num_tokens self.to_k_ip = nn.Linear(cross_attention_dim or hidden_size, hidden_size, bias=False) self.to_v_ip = nn.Linear(cross_attention_dim or hidden_size, hidden_size, bias=False) self.apply(init_weights) def forward(self, attn, hidden_states, encoder_hidden_states=None, attention_mask=None, temb=None): hidden_states, encoder_hidden_states, residual, batch_size, input_ndim, height, width = \ self._process_input(attn, hidden_states, encoder_hidden_states, attention_mask, temb) end_pos = encoder_hidden_states.shape[1] - self.num_tokens encoder_hidden_states, ip_hidden_states = encoder_hidden_states[:, :end_pos, :], encoder_hidden_states[:, end_pos:, :] query = attn.to_q(hidden_states) key = attn.to_k(encoder_hidden_states) value = attn.to_v(encoder_hidden_states) query, key, value = map(attn.head_to_batch_dim, (query, key, value)) hidden_states = self._apply_attention(attn, query, key, value, attention_mask) hidden_states = attn.batch_to_head_dim(hidden_states) # Image Prompt Attention ip_key = attn.head_to_batch_dim(self.to_k_ip(ip_hidden_states)) ip_value = attn.head_to_batch_dim(self.to_v_ip(ip_hidden_states)) ip_hidden_states = self._apply_attention(attn, query, ip_key, ip_value, None) ip_hidden_states = attn.batch_to_head_dim(ip_hidden_states) # Region Control if len(region_control.prompt_image_conditioning) == 1: region_mask = region_control.prompt_image_conditioning[0].get("region_mask", None) if region_mask is not None: mask = F.interpolate(region_mask[None, None], scale_factor=(ip_hidden_states.shape[1] / region_mask.shape[0]), mode="nearest").reshape([1, -1, 1]) else: mask = torch.ones_like(ip_hidden_states) ip_hidden_states *= mask hidden_states = hidden_states + self.scale * ip_hidden_states # Linear projection and dropout hidden_states = attn.to_out[0](hidden_states) hidden_states = attn.to_out[1](hidden_states) if input_ndim == 4: hidden_states = hidden_states.transpose(-1, -2).reshape(batch_size, -1, height, width) if attn.residual_connection: hidden_states = hidden_states + residual return hidden_states / attn.rescale_output_factor