# Copyright 2023-present the HuggingFace Inc. team.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import copy
from dataclasses import asdict, replace

import numpy as np
import pytest
from diffusers import StableDiffusionPipeline

from peft import (
BOFTConfig,
HRAConfig,
LoHaConfig,
LoKrConfig,
LoraConfig,
OFTConfig,
get_peft_model,
get_peft_model_state_dict,
inject_adapter_in_model,
set_peft_model_state_dict,
)
from peft.tuners.tuners_utils import BaseTunerLayer

from .testing_common import PeftCommonTester
from .testing_utils import set_init_weights_false, temp_seed


PEFT_DIFFUSERS_SD_MODELS_TO_TEST = ["hf-internal-testing/tiny-sd-pipe"]

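# Each entry below pairs a PEFT config class with per-component kwargs: one dict for
# the text encoder (CLIP attention/MLP projections) and one for the UNet (attention
# and feed-forward projections). `init_weights=False` (or `init_lora_weights=False`)
# randomizes the adapter weights so that adapted outputs differ from the base model.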
DIFFUSERS_CONFIGS = [
(
LoraConfig,
{
"text_encoder": {
"r": 8,
"lora_alpha": 32,
"target_modules": ["k_proj", "q_proj", "v_proj", "out_proj", "fc1", "fc2"],
"lora_dropout": 0.0,
"bias": "none",
"init_lora_weights": False,
},
"unet": {
"r": 8,
"lora_alpha": 32,
"target_modules": [
"proj_in",
"proj_out",
"to_k",
"to_q",
"to_v",
"to_out.0",
"ff.net.0.proj",
"ff.net.2",
],
"lora_dropout": 0.0,
"bias": "none",
"init_lora_weights": False,
},
},
),
(
LoHaConfig,
{
"text_encoder": {
"r": 8,
"alpha": 32,
"target_modules": ["k_proj", "q_proj", "v_proj", "out_proj", "fc1", "fc2"],
"rank_dropout": 0.0,
"module_dropout": 0.0,
"init_weights": False,
},
"unet": {
"r": 8,
"alpha": 32,
"target_modules": [
"proj_in",
"proj_out",
"to_k",
"to_q",
"to_v",
"to_out.0",
"ff.net.0.proj",
"ff.net.2",
],
"rank_dropout": 0.0,
"module_dropout": 0.0,
"init_weights": False,
},
},
),
(
LoKrConfig,
{
"text_encoder": {
"r": 8,
"alpha": 32,
"target_modules": ["k_proj", "q_proj", "v_proj", "out_proj", "fc1", "fc2"],
"rank_dropout": 0.0,
"module_dropout": 0.0,
"init_weights": False,
},
"unet": {
"r": 8,
"alpha": 32,
"target_modules": [
"proj_in",
"proj_out",
"to_k",
"to_q",
"to_v",
"to_out.0",
"ff.net.0.proj",
"ff.net.2",
],
"rank_dropout": 0.0,
"module_dropout": 0.0,
"init_weights": False,
},
},
),
(
OFTConfig,
{
"text_encoder": {
"r": 1,
"oft_block_size": 0,
"target_modules": ["k_proj", "q_proj", "v_proj", "out_proj", "fc1", "fc2"],
"module_dropout": 0.0,
"init_weights": False,
"use_cayley_neumann": False,
},
"unet": {
"r": 1,
"oft_block_size": 0,
"target_modules": [
"proj_in",
"proj_out",
"to_k",
"to_q",
"to_v",
"to_out.0",
"ff.net.0.proj",
"ff.net.2",
],
"module_dropout": 0.0,
"init_weights": False,
"use_cayley_neumann": False,
},
},
),
(
BOFTConfig,
{
"text_encoder": {
"boft_block_num": 1,
"boft_block_size": 0,
"target_modules": ["k_proj", "q_proj", "v_proj", "out_proj", "fc1", "fc2"],
"boft_dropout": 0.0,
"init_weights": False,
},
"unet": {
"boft_block_num": 1,
"boft_block_size": 0,
"target_modules": [
"proj_in",
"proj_out",
"to_k",
"to_q",
"to_v",
"to_out.0",
"ff.net.0.proj",
"ff.net.2",
],
"boft_dropout": 0.0,
"init_weights": False,
},
},
),
(
HRAConfig,
{
"text_encoder": {
"r": 8,
"target_modules": ["k_proj", "q_proj", "v_proj", "out_proj", "fc1", "fc2"],
"init_weights": False,
},
"unet": {
"r": 8,
"target_modules": [
"proj_in",
"proj_out",
"to_k",
"to_q",
"to_v",
"to_out.0",
"ff.net.0.proj",
"ff.net.2",
],
"init_weights": False,
},
},
),
]

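# Illustrative sketch (not executed by the tests): an entry of DIFFUSERS_CONFIGS is
# consumed roughly like this, given a loaded StableDiffusionPipeline `pipe`:
#
#     config_cls, kwargs = DIFFUSERS_CONFIGS[0]
#     pipe.text_encoder = get_peft_model(pipe.text_encoder, config_cls(**kwargs["text_encoder"]))
#     pipe.unet = get_peft_model(pipe.unet, config_cls(**kwargs["unet"]))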

def skip_if_not_lora(config_cls):
    if config_cls != LoraConfig:
        pytest.skip("Skipping test because it is only applicable to LoraConfig")


class TestStableDiffusionModel(PeftCommonTester):
r"""
Tests that diffusers StableDiffusion model works with PEFT as expected.
"""
transformers_class = StableDiffusionPipeline
sd_model = StableDiffusionPipeline.from_pretrained("hf-internal-testing/tiny-sd-pipe")
def instantiate_sd_peft(self, model_id, config_cls, config_kwargs):
# Instantiate StableDiffusionPipeline
if model_id == "hf-internal-testing/tiny-sd-pipe":
# in CI, this model often times out on the hub, let's cache it
model = copy.deepcopy(self.sd_model)
else:
model = self.transformers_class.from_pretrained(model_id)
config_kwargs = config_kwargs.copy()
text_encoder_kwargs = config_kwargs.pop("text_encoder")
unet_kwargs = config_kwargs.pop("unet")
# the remaining config kwargs should be applied to both configs
for key, val in config_kwargs.items():
text_encoder_kwargs[key] = val
unet_kwargs[key] = val
# Instantiate text_encoder adapter
config_text_encoder = config_cls(**text_encoder_kwargs)
model.text_encoder = get_peft_model(model.text_encoder, config_text_encoder)
# Instantiate unet adapter
config_unet = config_cls(**unet_kwargs)
model.unet = get_peft_model(model.unet, config_unet)
# Move model to device
model = model.to(self.torch_device)
        return model

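    # A fixed prompt with only 3 denoising steps keeps each pipeline call cheap while
    # still exercising the text encoder and UNet adapters end to end.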
def prepare_inputs_for_testing(self):
return {
"prompt": "a high quality digital photo of a cute corgi",
"num_inference_steps": 3,
        }

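    # The merge tests below render the same prompt twice under identical seeds: once
    # through the adapter forward path and once after merge_and_unload() has folded
    # the adapter weights into the base model; the two images should match closely.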
@pytest.mark.parametrize("model_id", PEFT_DIFFUSERS_SD_MODELS_TO_TEST)
@pytest.mark.parametrize("config_cls,config_kwargs", DIFFUSERS_CONFIGS)
def test_merge_layers(self, model_id, config_cls, config_kwargs):
if (config_cls == LoKrConfig) and (self.torch_device not in ["cuda", "xpu"]):
pytest.skip("Merging test with LoKr fails without GPU")
# Instantiate model & adapters
config_kwargs = set_init_weights_false(config_cls, config_kwargs)
model = self.instantiate_sd_peft(model_id, config_cls, config_kwargs)
# Generate output for peft modified StableDiffusion
dummy_input = self.prepare_inputs_for_testing()
with temp_seed(seed=42):
peft_output = np.array(model(**dummy_input).images[0]).astype(np.float32)
# Merge adapter and model
if config_cls not in [LoHaConfig, OFTConfig, HRAConfig]:
# TODO: Merging the text_encoder is leading to issues on CPU with PyTorch 2.1
model.text_encoder = model.text_encoder.merge_and_unload()
model.unet = model.unet.merge_and_unload()
# Generate output for peft merged StableDiffusion
with temp_seed(seed=42):
merged_output = np.array(model(**dummy_input).images[0]).astype(np.float32)
        # Images are in the uint8 dynamic range, so use a large atol
        assert np.allclose(peft_output, merged_output, atol=1.0)

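    # Same round-trip as above, but merge_and_unload(safe_merge=True) performs the
    # merge on a copy of the weights and checks them (e.g. for NaNs) before committing.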
@pytest.mark.parametrize("model_id", PEFT_DIFFUSERS_SD_MODELS_TO_TEST)
@pytest.mark.parametrize("config_cls,config_kwargs", DIFFUSERS_CONFIGS)
def test_merge_layers_safe_merge(self, model_id, config_cls, config_kwargs):
if (config_cls == LoKrConfig) and (self.torch_device not in ["cuda", "xpu"]):
pytest.skip("Merging test with LoKr fails without GPU")
        # Instantiate model & adapters
        config_kwargs = set_init_weights_false(config_cls, config_kwargs)
        model = self.instantiate_sd_peft(model_id, config_cls, config_kwargs)
# Generate output for peft modified StableDiffusion
dummy_input = self.prepare_inputs_for_testing()
with temp_seed(seed=42):
peft_output = np.array(model(**dummy_input).images[0]).astype(np.float32)
# Merge adapter and model
if config_cls not in [LoHaConfig, OFTConfig, HRAConfig]:
# TODO: Merging the text_encoder is leading to issues on CPU with PyTorch 2.1
model.text_encoder = model.text_encoder.merge_and_unload(safe_merge=True)
model.unet = model.unet.merge_and_unload(safe_merge=True)
# Generate output for peft merged StableDiffusion
with temp_seed(seed=42):
merged_output = np.array(model(**dummy_input).images[0]).astype(np.float32)
        # Images are in the uint8 dynamic range, so use a large atol
        assert np.allclose(peft_output, merged_output, atol=1.0)

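    # add_weighted_adapter builds a new adapter as a weighted combination of existing
    # ones; the test below checks that this leaves the source adapter's config intact.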
@pytest.mark.parametrize("model_id", PEFT_DIFFUSERS_SD_MODELS_TO_TEST)
@pytest.mark.parametrize("config_cls,config_kwargs", DIFFUSERS_CONFIGS)
def test_add_weighted_adapter_base_unchanged(self, model_id, config_cls, config_kwargs):
skip_if_not_lora(config_cls)
# Instantiate model & adapters
config_kwargs = set_init_weights_false(config_cls, config_kwargs)
model = self.instantiate_sd_peft(model_id, config_cls, config_kwargs)
# Get current available adapter config
text_encoder_adapter_name = next(iter(model.text_encoder.peft_config.keys()))
unet_adapter_name = next(iter(model.unet.peft_config.keys()))
        # dataclasses.replace with no overrides returns a copy of the config
        text_encoder_adapter_config = replace(model.text_encoder.peft_config[text_encoder_adapter_name])
        unet_adapter_config = replace(model.unet.peft_config[unet_adapter_name])
        # Create weighted adapters
        model.text_encoder.add_weighted_adapter([text_encoder_adapter_name], [0.5], "weighted_adapter_test")
        model.unet.add_weighted_adapter([unet_adapter_name], [0.5], "weighted_adapter_test")
# Assert that base adapters config did not change
assert asdict(text_encoder_adapter_config) == asdict(model.text_encoder.peft_config[text_encoder_adapter_name])
        assert asdict(unet_adapter_config) == asdict(model.unet.peft_config[unet_adapter_name])

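    # Delegates to the shared PeftCommonTester logic, which compares outputs with the
    # adapter enabled and disabled; randomized init weights make the difference observable.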
@pytest.mark.parametrize("model_id", PEFT_DIFFUSERS_SD_MODELS_TO_TEST)
@pytest.mark.parametrize("config_cls,config_kwargs", DIFFUSERS_CONFIGS)
def test_disable_adapter(self, model_id, config_cls, config_kwargs):
config_kwargs = set_init_weights_false(config_cls, config_kwargs)
        self._test_disable_adapter(model_id, config_cls, config_kwargs)

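    # With low_cpu_mem_usage=True, inject_adapter_in_model creates the adapter weights
    # on the meta device; loading a state dict is what materializes them. The test below
    # round-trips the adapter state dicts and asserts on the parameters' device types.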
@pytest.mark.parametrize("model_id", PEFT_DIFFUSERS_SD_MODELS_TO_TEST)
@pytest.mark.parametrize("config_cls,config_kwargs", DIFFUSERS_CONFIGS)
def test_load_model_low_cpu_mem_usage(self, model_id, config_cls, config_kwargs):
# Instantiate model & adapters
pipe = self.instantiate_sd_peft(model_id, config_cls, config_kwargs)
te_state_dict = get_peft_model_state_dict(pipe.text_encoder)
unet_state_dict = get_peft_model_state_dict(pipe.unet)
del pipe
pipe = self.instantiate_sd_peft(model_id, config_cls, config_kwargs)
config_kwargs = config_kwargs.copy()
text_encoder_kwargs = config_kwargs.pop("text_encoder")
unet_kwargs = config_kwargs.pop("unet")
# the remaining config kwargs should be applied to both configs
for key, val in config_kwargs.items():
text_encoder_kwargs[key] = val
unet_kwargs[key] = val
config_text_encoder = config_cls(**text_encoder_kwargs)
config_unet = config_cls(**unet_kwargs)
# check text encoder
inject_adapter_in_model(config_text_encoder, pipe.text_encoder, low_cpu_mem_usage=True)
# sanity check that the adapter was applied:
assert any(isinstance(module, BaseTunerLayer) for module in pipe.text_encoder.modules())
assert "meta" in {p.device.type for p in pipe.text_encoder.parameters()}
set_peft_model_state_dict(pipe.text_encoder, te_state_dict, low_cpu_mem_usage=True)
assert "meta" not in {p.device.type for p in pipe.text_encoder.parameters()}
# check unet
inject_adapter_in_model(config_unet, pipe.unet, low_cpu_mem_usage=True)
# sanity check that the adapter was applied:
assert any(isinstance(module, BaseTunerLayer) for module in pipe.unet.modules())
assert "meta" in {p.device.type for p in pipe.unet.parameters()}
set_peft_model_state_dict(pipe.unet, unet_state_dict, low_cpu_mem_usage=True)
assert "meta" not in {p.device.type for p in pipe.unet.parameters()}