Unverified Commit 02d83c9f authored by Anton Lozhkov, committed by GitHub

Standardize fast pipeline tests with PipelineTestMixin (#1526)



* [WIP] Standardize fast pipeline tests with PipelineTestMixin

* refactor the sd tests a bit

* add more common tests

* add xformers

* add progressbar test

* cleanup

* upd fp16

* CycleDiffusionPipelineFastTests

* DanceDiffusionPipelineFastTests

* AltDiffusionPipelineFastTests

* StableDiffusion2PipelineFastTests

* StableDiffusion2InpaintPipelineFastTests

* StableDiffusionImageVariationPipelineFastTests

* StableDiffusionImg2ImgPipelineFastTests

* StableDiffusionInpaintPipelineFastTests

* remove unused mixins

* quality

* add missing inits

* try to fix mps tests

* fix mps tests

* add mps warmups

* skip for some pipelines

* style

* Update tests/test_pipelines_common.py
Co-authored-by: Patrick von Platen <patrick.v.platen@gmail.com>
parent 9e110299
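The standardized pattern is easiest to see in one place before the per-file diffs. The condensed sketch below mirrors the test_ddim.py hunk further down; the relative import path is the one the tests use, and `PipelineTesterMixin` derives the shared checks (save/load round-trips, fp16, attention slicing, xformers, progress-bar config) from the `pipeline_class` attribute and the two `get_dummy_*` hooks:

```python
import unittest

import torch
from diffusers import DDIMPipeline, DDIMScheduler, UNet2DModel

from ...test_pipelines_common import PipelineTesterMixin  # relative path as in the tests


class DDIMPipelineFastTests(PipelineTesterMixin, unittest.TestCase):
    # The pipeline the mixin instantiates for every shared check.
    pipeline_class = DDIMPipeline

    def get_dummy_components(self):
        # Tiny, deterministically seeded components keep the fast tests cheap.
        torch.manual_seed(0)
        unet = UNet2DModel(
            block_out_channels=(32, 64),
            layers_per_block=2,
            sample_size=32,
            in_channels=3,
            out_channels=3,
            down_block_types=("DownBlock2D", "AttnDownBlock2D"),
            up_block_types=("AttnUpBlock2D", "UpBlock2D"),
        )
        return {"unet": unet, "scheduler": DDIMScheduler()}

    def get_dummy_inputs(self, device, seed=0):
        # mps generators are not seed-reproducible, so seed the global one there.
        if str(device).startswith("mps"):
            generator = torch.manual_seed(seed)
        else:
            generator = torch.Generator(device=device).manual_seed(seed)
        return {"generator": generator, "num_inference_steps": 2, "output_type": "numpy"}
```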
@@ -96,10 +96,10 @@ class DDIMPipeline(DiffusionPipeline):
         if self.device.type == "mps":
             # randn does not work reproducibly on mps
-            image = torch.randn(image_shape, generator=generator)
+            image = torch.randn(image_shape, generator=generator, dtype=self.unet.dtype)
             image = image.to(self.device)
         else:
-            image = torch.randn(image_shape, generator=generator, device=self.device)
+            image = torch.randn(image_shape, generator=generator, device=self.device, dtype=self.unet.dtype)

         # set step values
         self.scheduler.set_timesteps(num_inference_steps)
...
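The hunk above threads the UNet's dtype into the initial noise so fp16 pipelines no longer start from fp32 latents, while keeping the existing mps workaround. As a standalone sketch (`randn_pipeline_latents` is a hypothetical helper, not diffusers API):

```python
import torch


def randn_pipeline_latents(shape, generator, device, dtype):
    # torch.randn is not reproducible on mps, so sample with the CPU generator
    # and move the result over; on other devices, sample in place.
    if device.type == "mps":
        latents = torch.randn(shape, generator=generator, dtype=dtype)
        return latents.to(device)
    return torch.randn(shape, generator=generator, device=device, dtype=dtype)
```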
@@ -14,7 +14,6 @@
 # limitations under the License.
 import gc
-import random
 import unittest

 import numpy as np
@@ -25,9 +24,9 @@ from diffusers.pipelines.alt_diffusion.modeling_roberta_series import (
     RobertaSeriesConfig,
     RobertaSeriesModelWithTransformation,
 )
-from diffusers.utils import floats_tensor, slow, torch_device
+from diffusers.utils import slow, torch_device
 from diffusers.utils.testing_utils import require_torch_gpu
-from transformers import XLMRobertaTokenizer
+from transformers import CLIPTextConfig, CLIPTextModel, XLMRobertaTokenizer

 from ...test_pipelines_common import PipelineTesterMixin
@@ -36,25 +35,11 @@ torch.backends.cuda.matmul.allow_tf32 = False

 class AltDiffusionPipelineFastTests(PipelineTesterMixin, unittest.TestCase):
-    def tearDown(self):
-        # clean up the VRAM after each test
-        super().tearDown()
-        gc.collect()
-        torch.cuda.empty_cache()
-
-    @property
-    def dummy_image(self):
-        batch_size = 1
-        num_channels = 3
-        sizes = (32, 32)
-
-        image = floats_tensor((batch_size, num_channels) + sizes, rng=random.Random(0)).to(torch_device)
-        return image
+    pipeline_class = AltDiffusionPipeline

-    @property
-    def dummy_cond_unet(self):
+    def get_dummy_components(self):
         torch.manual_seed(0)
-        model = UNet2DConditionModel(
+        unet = UNet2DConditionModel(
             block_out_channels=(32, 64),
             layers_per_block=2,
             sample_size=32,
@@ -64,27 +49,15 @@ class AltDiffusionPipelineFastTests(PipelineTesterMixin, unittest.TestCase):
             up_block_types=("CrossAttnUpBlock2D", "UpBlock2D"),
             cross_attention_dim=32,
         )
-        return model
-
-    @property
-    def dummy_cond_unet_inpaint(self):
-        torch.manual_seed(0)
-        model = UNet2DConditionModel(
-            block_out_channels=(32, 64),
-            layers_per_block=2,
-            sample_size=32,
-            in_channels=9,
-            out_channels=4,
-            down_block_types=("DownBlock2D", "CrossAttnDownBlock2D"),
-            up_block_types=("CrossAttnUpBlock2D", "UpBlock2D"),
-            cross_attention_dim=32,
+        scheduler = DDIMScheduler(
+            beta_start=0.00085,
+            beta_end=0.012,
+            beta_schedule="scaled_linear",
+            clip_sample=False,
+            set_alpha_to_one=False,
         )
-        return model
-
-    @property
-    def dummy_vae(self):
         torch.manual_seed(0)
-        model = AutoencoderKL(
+        vae = AutoencoderKL(
             block_out_channels=[32, 64],
             in_channels=3,
             out_channels=3,
@@ -92,84 +65,90 @@ class AltDiffusionPipelineFastTests(PipelineTesterMixin, unittest.TestCase):
             up_block_types=["UpDecoderBlock2D", "UpDecoderBlock2D"],
             latent_channels=4,
         )
-        return model

-    @property
-    def dummy_text_encoder(self):
+        # TODO: address the non-deterministic text encoder (fails for save-load tests)
+        # torch.manual_seed(0)
+        # text_encoder_config = RobertaSeriesConfig(
+        #     hidden_size=32,
+        #     project_dim=32,
+        #     intermediate_size=37,
+        #     layer_norm_eps=1e-05,
+        #     num_attention_heads=4,
+        #     num_hidden_layers=5,
+        #     vocab_size=5002,
+        # )
+        # text_encoder = RobertaSeriesModelWithTransformation(text_encoder_config)
+
         torch.manual_seed(0)
-        config = RobertaSeriesConfig(
+        text_encoder_config = CLIPTextConfig(
+            bos_token_id=0,
+            eos_token_id=2,
             hidden_size=32,
-            project_dim=32,
+            projection_dim=32,
             intermediate_size=37,
             layer_norm_eps=1e-05,
             num_attention_heads=4,
             num_hidden_layers=5,
+            pad_token_id=1,
             vocab_size=5002,
         )
-        return RobertaSeriesModelWithTransformation(config)
+        text_encoder = CLIPTextModel(text_encoder_config)

-    @property
-    def dummy_extractor(self):
-        def extract(*args, **kwargs):
-            class Out:
-                def __init__(self):
-                    self.pixel_values = torch.ones([0])
-
-                def to(self, device):
-                    self.pixel_values.to(device)
-                    return self
-
-            return Out()
-
-        return extract
+        tokenizer = XLMRobertaTokenizer.from_pretrained("hf-internal-testing/tiny-xlm-roberta")
+        tokenizer.model_max_length = 77
+
+        components = {
+            "unet": unet,
+            "scheduler": scheduler,
+            "vae": vae,
+            "text_encoder": text_encoder,
+            "tokenizer": tokenizer,
+            "safety_checker": None,
+            "feature_extractor": None,
+        }
+        return components
+
+    def get_dummy_inputs(self, device, seed=0):
+        if str(device).startswith("mps"):
+            generator = torch.manual_seed(seed)
+        else:
+            generator = torch.Generator(device=device).manual_seed(seed)
+        inputs = {
+            "prompt": "A painting of a squirrel eating a burger",
+            "generator": generator,
+            "num_inference_steps": 2,
+            "guidance_scale": 6.0,
+            "output_type": "numpy",
+        }
+        return inputs

     def test_alt_diffusion_ddim(self):
         device = "cpu"  # ensure determinism for the device-dependent torch.Generator
-        unet = self.dummy_cond_unet
-        scheduler = DDIMScheduler(
-            beta_start=0.00085,
-            beta_end=0.012,
-            beta_schedule="scaled_linear",
-            clip_sample=False,
-            set_alpha_to_one=False,
-        )
-        vae = self.dummy_vae
-        bert = self.dummy_text_encoder
-        tokenizer = XLMRobertaTokenizer.from_pretrained("hf-internal-testing/tiny-xlm-roberta")
-        tokenizer.model_max_length = 77

-        # make sure here that pndm scheduler skips prk
-        alt_pipe = AltDiffusionPipeline(
-            unet=unet,
-            scheduler=scheduler,
-            vae=vae,
-            text_encoder=bert,
-            tokenizer=tokenizer,
-            safety_checker=None,
-            feature_extractor=self.dummy_extractor,
+        components = self.get_dummy_components()
+        torch.manual_seed(0)
+        text_encoder_config = RobertaSeriesConfig(
+            hidden_size=32,
+            project_dim=32,
+            intermediate_size=37,
+            layer_norm_eps=1e-05,
+            num_attention_heads=4,
+            num_hidden_layers=5,
+            vocab_size=5002,
         )
+        # TODO: remove after fixing the non-deterministic text encoder
+        text_encoder = RobertaSeriesModelWithTransformation(text_encoder_config)
+        components["text_encoder"] = text_encoder
+
+        alt_pipe = AltDiffusionPipeline(**components)
         alt_pipe = alt_pipe.to(device)
         alt_pipe.set_progress_bar_config(disable=None)

-        prompt = "A photo of an astronaut"
-
-        generator = torch.Generator(device=device).manual_seed(0)
-        output = alt_pipe([prompt], generator=generator, guidance_scale=6.0, num_inference_steps=2, output_type="np")
+        inputs = self.get_dummy_inputs(device)
+        inputs["prompt"] = "A photo of an astronaut"
+        output = alt_pipe(**inputs)
         image = output.images
-
-        generator = torch.Generator(device=device).manual_seed(0)
-        image_from_tuple = alt_pipe(
-            [prompt],
-            generator=generator,
-            guidance_scale=6.0,
-            num_inference_steps=2,
-            output_type="np",
-            return_dict=False,
-        )[0]

         image_slice = image[0, -3:, -3:, -1]
-        image_from_tuple_slice = image_from_tuple[0, -3:, -3:, -1]

         assert image.shape == (1, 64, 64, 3)
         expected_slice = np.array(
@@ -177,89 +156,39 @@ class AltDiffusionPipelineFastTests(PipelineTesterMixin, unittest.TestCase):
         )

         assert np.abs(image_slice.flatten() - expected_slice).max() < 1e-2
-        assert np.abs(image_from_tuple_slice.flatten() - expected_slice).max() < 1e-2

     def test_alt_diffusion_pndm(self):
         device = "cpu"  # ensure determinism for the device-dependent torch.Generator
-        unet = self.dummy_cond_unet
-        scheduler = PNDMScheduler(skip_prk_steps=True)
-        vae = self.dummy_vae
-        bert = self.dummy_text_encoder
-        tokenizer = XLMRobertaTokenizer.from_pretrained("hf-internal-testing/tiny-xlm-roberta")
-        tokenizer.model_max_length = 77

-        # make sure here that pndm scheduler skips prk
-        alt_pipe = AltDiffusionPipeline(
-            unet=unet,
-            scheduler=scheduler,
-            vae=vae,
-            text_encoder=bert,
-            tokenizer=tokenizer,
-            safety_checker=None,
-            feature_extractor=self.dummy_extractor,
+        components = self.get_dummy_components()
+        components["scheduler"] = PNDMScheduler(skip_prk_steps=True)
+        torch.manual_seed(0)
+        text_encoder_config = RobertaSeriesConfig(
+            hidden_size=32,
+            project_dim=32,
+            intermediate_size=37,
+            layer_norm_eps=1e-05,
+            num_attention_heads=4,
+            num_hidden_layers=5,
+            vocab_size=5002,
         )
+        # TODO: remove after fixing the non-deterministic text encoder
+        text_encoder = RobertaSeriesModelWithTransformation(text_encoder_config)
+        components["text_encoder"] = text_encoder
+
+        alt_pipe = AltDiffusionPipeline(**components)
         alt_pipe = alt_pipe.to(device)
         alt_pipe.set_progress_bar_config(disable=None)

-        prompt = "A painting of a squirrel eating a burger"
-        generator = torch.Generator(device=device).manual_seed(0)
-        output = alt_pipe([prompt], generator=generator, guidance_scale=6.0, num_inference_steps=2, output_type="np")
+        inputs = self.get_dummy_inputs(device)
+        output = alt_pipe(**inputs)
         image = output.images
-
-        generator = torch.Generator(device=device).manual_seed(0)
-        image_from_tuple = alt_pipe(
-            [prompt],
-            generator=generator,
-            guidance_scale=6.0,
-            num_inference_steps=2,
-            output_type="np",
-            return_dict=False,
-        )[0]

         image_slice = image[0, -3:, -3:, -1]
-        image_from_tuple_slice = image_from_tuple[0, -3:, -3:, -1]

         assert image.shape == (1, 64, 64, 3)
         expected_slice = np.array(
             [0.51605093, 0.5707241, 0.47365507, 0.50578886, 0.5633877, 0.4642503, 0.5182081, 0.48763484, 0.49084237]
         )

         assert np.abs(image_slice.flatten() - expected_slice).max() < 1e-2
-        assert np.abs(image_from_tuple_slice.flatten() - expected_slice).max() < 1e-2
-
-    @unittest.skipIf(torch_device != "cuda", "This test requires a GPU")
-    def test_alt_diffusion_fp16(self):
-        """Test that stable diffusion works with fp16"""
-        unet = self.dummy_cond_unet
-        scheduler = PNDMScheduler(skip_prk_steps=True)
-        vae = self.dummy_vae
-        bert = self.dummy_text_encoder
-        tokenizer = XLMRobertaTokenizer.from_pretrained("hf-internal-testing/tiny-xlm-roberta")
-        tokenizer.model_max_length = 77
-
-        # put models in fp16
-        unet = unet.half()
-        vae = vae.half()
-        bert = bert.half()
-
-        # make sure here that pndm scheduler skips prk
-        alt_pipe = AltDiffusionPipeline(
-            unet=unet,
-            scheduler=scheduler,
-            vae=vae,
-            text_encoder=bert,
-            tokenizer=tokenizer,
-            safety_checker=None,
-            feature_extractor=self.dummy_extractor,
-        )
-        alt_pipe = alt_pipe.to(torch_device)
-        alt_pipe.set_progress_bar_config(disable=None)
-
-        prompt = "A painting of a squirrel eating a burger"
-        generator = torch.Generator(device=torch_device).manual_seed(0)
-        image = alt_pipe([prompt], generator=generator, num_inference_steps=2, output_type="np").images
-
-        assert image.shape == (1, 64, 64, 3)

 @slow
...
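Every `get_dummy_inputs` added in this commit repeats the same device-dependent branch when building its generator. Distilled into one hypothetical helper (not part of the PR) for reference:

```python
import torch


def make_generator(device, seed=0):
    # torch.Generator(device="mps") does not reproduce seeds reliably, so the
    # tests seed the global CPU generator on mps and a device generator elsewhere.
    if str(device).startswith("mps"):
        return torch.manual_seed(seed)
    return torch.Generator(device=device).manual_seed(seed)
```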
@@ -29,13 +29,11 @@ from diffusers.utils import floats_tensor, load_image, load_numpy, slow, torch_d
 from diffusers.utils.testing_utils import require_torch_gpu
 from transformers import XLMRobertaTokenizer

-from ...test_pipelines_common import PipelineTesterMixin
-
 torch.backends.cuda.matmul.allow_tf32 = False


-class AltDiffusionImg2ImgPipelineFastTests(PipelineTesterMixin, unittest.TestCase):
+class AltDiffusionImg2ImgPipelineFastTests(unittest.TestCase):
     def tearDown(self):
         # clean up the VRAM after each test
         super().tearDown()
...
@@ -23,21 +23,20 @@ from diffusers import DanceDiffusionPipeline, IPNDMScheduler, UNet1DModel
 from diffusers.utils import slow, torch_device
 from diffusers.utils.testing_utils import require_torch_gpu

+from ...test_pipelines_common import PipelineTesterMixin
+
 torch.backends.cuda.matmul.allow_tf32 = False


-class PipelineFastTests(unittest.TestCase):
-    def tearDown(self):
-        # clean up the VRAM after each test
-        super().tearDown()
-        gc.collect()
-        torch.cuda.empty_cache()
+class DanceDiffusionPipelineFastTests(PipelineTesterMixin, unittest.TestCase):
+    pipeline_class = DanceDiffusionPipeline
+    test_attention_slicing = False
+    test_cpu_offload = False

-    @property
-    def dummy_unet(self):
+    def get_dummy_components(self):
         torch.manual_seed(0)
-        model = UNet1DModel(
+        unet = UNet1DModel(
             block_out_channels=(32, 32, 64),
             extra_in_channels=16,
             sample_size=512,
@@ -48,34 +47,44 @@ class PipelineFastTests(unittest.TestCase):
             use_timestep_embedding=False,
             time_embedding_type="fourier",
             mid_block_type="UNetMidBlock1D",
-            down_block_types=["DownBlock1DNoSkip"] + ["DownBlock1D"] + ["AttnDownBlock1D"],
-            up_block_types=["AttnUpBlock1D"] + ["UpBlock1D"] + ["UpBlock1DNoSkip"],
+            down_block_types=("DownBlock1DNoSkip", "DownBlock1D", "AttnDownBlock1D"),
+            up_block_types=("AttnUpBlock1D", "UpBlock1D", "UpBlock1DNoSkip"),
         )
-        return model
+        scheduler = IPNDMScheduler()
+
+        components = {
+            "unet": unet,
+            "scheduler": scheduler,
+        }
+        return components
+
+    def get_dummy_inputs(self, device, seed=0):
+        if str(device).startswith("mps"):
+            generator = torch.manual_seed(seed)
+        else:
+            generator = torch.Generator(device=device).manual_seed(seed)
+        inputs = {
+            "generator": generator,
+            "num_inference_steps": 4,
+        }
+        return inputs

     def test_dance_diffusion(self):
         device = "cpu"  # ensure determinism for the device-dependent torch.Generator

-        scheduler = IPNDMScheduler()
-
-        pipe = DanceDiffusionPipeline(unet=self.dummy_unet, scheduler=scheduler)
+        components = self.get_dummy_components()
+        pipe = DanceDiffusionPipeline(**components)
         pipe = pipe.to(device)
         pipe.set_progress_bar_config(disable=None)

-        generator = torch.Generator(device=device).manual_seed(0)
-        output = pipe(generator=generator, num_inference_steps=4)
+        inputs = self.get_dummy_inputs(device)
+        output = pipe(**inputs)
         audio = output.audios

-        generator = torch.Generator(device=device).manual_seed(0)
-        output = pipe(generator=generator, num_inference_steps=4, return_dict=False)
-        audio_from_tuple = output[0]
-
         audio_slice = audio[0, -3:, -3:]
-        audio_from_tuple_slice = audio_from_tuple[0, -3:, -3:]

-        assert audio.shape == (1, 2, self.dummy_unet.sample_size)
+        assert audio.shape == (1, 2, components["unet"].sample_size)
         expected_slice = np.array([-0.7265, 1.0000, -0.8388, 0.1175, 0.9498, -1.0000])
         assert np.abs(audio_slice.flatten() - expected_slice).max() < 1e-2
-        assert np.abs(audio_from_tuple_slice.flatten() - expected_slice).max() < 1e-2

 @slow
...
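The Dance Diffusion class also shows the opt-out mechanism: `test_attention_slicing` and `test_cpu_offload` are class-level flags the mixin consults, so a pipeline with no attention layers and nothing to offload can skip those shared tests. A minimal sketch, with the `get_dummy_*` hooks omitted for brevity:

```python
import unittest

from diffusers import DanceDiffusionPipeline

from ...test_pipelines_common import PipelineTesterMixin  # relative path as in the tests


class DanceDiffusionPipelineFastTests(PipelineTesterMixin, unittest.TestCase):
    pipeline_class = DanceDiffusionPipeline
    # Flag names taken from the hunk above; setting them to False tells the
    # mixin to skip the corresponding common tests for this pipeline.
    test_attention_slicing = False
    test_cpu_offload = False
```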
@@ -28,10 +28,11 @@ torch.backends.cuda.matmul.allow_tf32 = False

 class DDIMPipelineFastTests(PipelineTesterMixin, unittest.TestCase):
-    @property
-    def dummy_uncond_unet(self):
+    pipeline_class = DDIMPipeline
+
+    def get_dummy_components(self):
         torch.manual_seed(0)
-        model = UNet2DModel(
+        unet = UNet2DModel(
             block_out_channels=(32, 64),
             layers_per_block=2,
             sample_size=32,
@@ -40,32 +41,40 @@ class DDIMPipelineFastTests(PipelineTesterMixin, unittest.TestCase):
             down_block_types=("DownBlock2D", "AttnDownBlock2D"),
             up_block_types=("AttnUpBlock2D", "UpBlock2D"),
         )
-        return model
+        scheduler = DDIMScheduler()
+        components = {"unet": unet, "scheduler": scheduler}
+        return components
+
+    def get_dummy_inputs(self, device, seed=0):
+        if str(device).startswith("mps"):
+            generator = torch.manual_seed(seed)
+        else:
+            generator = torch.Generator(device=device).manual_seed(seed)
+        inputs = {
+            "generator": generator,
+            "num_inference_steps": 2,
+            "output_type": "numpy",
+        }
+        return inputs

     def test_inference(self):
         device = "cpu"
-        unet = self.dummy_uncond_unet
-        scheduler = DDIMScheduler()

-        ddpm = DDIMPipeline(unet=unet, scheduler=scheduler)
-        ddpm.to(device)
-        ddpm.set_progress_bar_config(disable=None)
+        components = self.get_dummy_components()
+        pipe = self.pipeline_class(**components)
+        pipe.to(device)
+        pipe.set_progress_bar_config(disable=None)

-        generator = torch.Generator(device=device).manual_seed(0)
-        image = ddpm(generator=generator, num_inference_steps=2, output_type="numpy").images
-
-        generator = torch.Generator(device=device).manual_seed(0)
-        image_from_tuple = ddpm(generator=generator, num_inference_steps=2, output_type="numpy", return_dict=False)[0]
+        inputs = self.get_dummy_inputs(device)
+        image = pipe(**inputs).images

         image_slice = image[0, -3:, -3:, -1]
-        image_from_tuple_slice = image_from_tuple[0, -3:, -3:, -1]

-        assert image.shape == (1, 32, 32, 3)
+        self.assertEqual(image.shape, (1, 32, 32, 3))
         expected_slice = np.array(
             [1.000e00, 5.717e-01, 4.717e-01, 1.000e00, 0.000e00, 1.000e00, 3.000e-04, 0.000e00, 9.000e-04]
         )
-        assert np.abs(image_slice.flatten() - expected_slice).max() < 1e-2
-        assert np.abs(image_from_tuple_slice.flatten() - expected_slice).max() < 1e-2
+        max_diff = np.abs(image_slice.flatten() - expected_slice).max()
+        self.assertLessEqual(max_diff, 1e-3)

 @slow
...
@@ -22,13 +22,11 @@ from diffusers import DDPMPipeline, DDPMScheduler, UNet2DModel
 from diffusers.utils import deprecate
 from diffusers.utils.testing_utils import require_torch_gpu, slow, torch_device

-from ...test_pipelines_common import PipelineTesterMixin
-
 torch.backends.cuda.matmul.allow_tf32 = False


-class DDPMPipelineFastTests(PipelineTesterMixin, unittest.TestCase):
+class DDPMPipelineFastTests(unittest.TestCase):
     @property
     def dummy_uncond_unet(self):
         torch.manual_seed(0)
...
@@ -21,13 +21,11 @@ import torch
 from diffusers import KarrasVePipeline, KarrasVeScheduler, UNet2DModel
 from diffusers.utils.testing_utils import require_torch, slow, torch_device

-from ...test_pipelines_common import PipelineTesterMixin
-
 torch.backends.cuda.matmul.allow_tf32 = False


-class KarrasVePipelineFastTests(PipelineTesterMixin, unittest.TestCase):
+class KarrasVePipelineFastTests(unittest.TestCase):
     @property
     def dummy_uncond_unet(self):
         torch.manual_seed(0)
...
@@ -22,13 +22,11 @@ from diffusers import AutoencoderKL, DDIMScheduler, LDMTextToImagePipeline, UNet
 from diffusers.utils.testing_utils import require_torch, slow, torch_device
 from transformers import CLIPTextConfig, CLIPTextModel, CLIPTokenizer

-from ...test_pipelines_common import PipelineTesterMixin
-
 torch.backends.cuda.matmul.allow_tf32 = False


-class LDMTextToImagePipelineFastTests(PipelineTesterMixin, unittest.TestCase):
+class LDMTextToImagePipelineFastTests(unittest.TestCase):
     @property
     def dummy_cond_unet(self):
         torch.manual_seed(0)
...
@@ -23,13 +23,11 @@ from diffusers import DDIMScheduler, LDMSuperResolutionPipeline, UNet2DModel, VQ
 from diffusers.utils import PIL_INTERPOLATION, floats_tensor, load_image, slow, torch_device
 from diffusers.utils.testing_utils import require_torch

-from ...test_pipelines_common import PipelineTesterMixin
-
 torch.backends.cuda.matmul.allow_tf32 = False


-class LDMSuperResolutionPipelineFastTests(PipelineTesterMixin, unittest.TestCase):
+class LDMSuperResolutionPipelineFastTests(unittest.TestCase):
     @property
     def dummy_image(self):
         batch_size = 1
...
@@ -22,13 +22,11 @@ from diffusers import DDIMScheduler, LDMPipeline, UNet2DModel, VQModel
 from diffusers.utils.testing_utils import require_torch, slow, torch_device
 from transformers import CLIPTextConfig, CLIPTextModel

-from ...test_pipelines_common import PipelineTesterMixin
-
 torch.backends.cuda.matmul.allow_tf32 = False


-class LDMPipelineFastTests(PipelineTesterMixin, unittest.TestCase):
+class LDMPipelineFastTests(unittest.TestCase):
     @property
     def dummy_uncond_unet(self):
         torch.manual_seed(0)
...
@@ -21,13 +21,11 @@ import torch
 from diffusers import PNDMPipeline, PNDMScheduler, UNet2DModel
 from diffusers.utils.testing_utils import require_torch, slow, torch_device

-from ...test_pipelines_common import PipelineTesterMixin
-
 torch.backends.cuda.matmul.allow_tf32 = False


-class PNDMPipelineFastTests(PipelineTesterMixin, unittest.TestCase):
+class PNDMPipelineFastTests(unittest.TestCase):
     @property
     def dummy_uncond_unet(self):
         torch.manual_seed(0)
...
@@ -21,13 +21,11 @@ import torch
 from diffusers import ScoreSdeVePipeline, ScoreSdeVeScheduler, UNet2DModel
 from diffusers.utils.testing_utils import require_torch, slow, torch_device

-from ...test_pipelines_common import PipelineTesterMixin
-
 torch.backends.cuda.matmul.allow_tf32 = False


-class ScoreSdeVeipelineFastTests(PipelineTesterMixin, unittest.TestCase):
+class ScoreSdeVeipelineFastTests(unittest.TestCase):
     @property
     def dummy_uncond_unet(self):
         torch.manual_seed(0)
...
@@ -20,7 +20,7 @@ import unittest
 import numpy as np
 import torch

-from diffusers import AutoencoderKL, CycleDiffusionPipeline, DDIMScheduler, UNet2DConditionModel, UNet2DModel, VQModel
+from diffusers import AutoencoderKL, CycleDiffusionPipeline, DDIMScheduler, UNet2DConditionModel
 from diffusers.utils import floats_tensor, load_image, load_numpy, slow, torch_device
 from diffusers.utils.testing_utils import require_torch_gpu
 from transformers import CLIPTextConfig, CLIPTextModel, CLIPTokenizer
@@ -32,39 +32,11 @@ torch.backends.cuda.matmul.allow_tf32 = False

 class CycleDiffusionPipelineFastTests(PipelineTesterMixin, unittest.TestCase):
-    def tearDown(self):
-        # clean up the VRAM after each test
-        super().tearDown()
-        gc.collect()
-        torch.cuda.empty_cache()
-
-    @property
-    def dummy_image(self):
-        batch_size = 1
-        num_channels = 3
-        sizes = (32, 32)
-
-        image = floats_tensor((batch_size, num_channels) + sizes, rng=random.Random(0)).to(torch_device)
-        return image
-
-    @property
-    def dummy_uncond_unet(self):
-        torch.manual_seed(0)
-        model = UNet2DModel(
-            block_out_channels=(32, 64),
-            layers_per_block=2,
-            sample_size=32,
-            in_channels=3,
-            out_channels=3,
-            down_block_types=("DownBlock2D", "AttnDownBlock2D"),
-            up_block_types=("AttnUpBlock2D", "UpBlock2D"),
-        )
-        return model
+    pipeline_class = CycleDiffusionPipeline

-    @property
-    def dummy_cond_unet(self):
+    def get_dummy_components(self):
         torch.manual_seed(0)
-        model = UNet2DConditionModel(
+        unet = UNet2DConditionModel(
             block_out_channels=(32, 64),
             layers_per_block=2,
             sample_size=32,
@@ -74,40 +46,16 @@ class CycleDiffusionPipelineFastTests(PipelineTesterMixin, unittest.TestCase):
             up_block_types=("CrossAttnUpBlock2D", "UpBlock2D"),
             cross_attention_dim=32,
         )
-        return model
-
-    @property
-    def dummy_cond_unet_inpaint(self):
-        torch.manual_seed(0)
-        model = UNet2DConditionModel(
-            block_out_channels=(32, 64),
-            layers_per_block=2,
-            sample_size=32,
-            in_channels=9,
-            out_channels=4,
-            down_block_types=("DownBlock2D", "CrossAttnDownBlock2D"),
-            up_block_types=("CrossAttnUpBlock2D", "UpBlock2D"),
-            cross_attention_dim=32,
-        )
-        return model
-
-    @property
-    def dummy_vq_model(self):
-        torch.manual_seed(0)
-        model = VQModel(
-            block_out_channels=[32, 64],
-            in_channels=3,
-            out_channels=3,
-            down_block_types=["DownEncoderBlock2D", "DownEncoderBlock2D"],
-            up_block_types=["UpDecoderBlock2D", "UpDecoderBlock2D"],
-            latent_channels=3,
+        scheduler = DDIMScheduler(
+            beta_start=0.00085,
+            beta_end=0.012,
+            beta_schedule="scaled_linear",
+            num_train_timesteps=1000,
+            clip_sample=False,
+            set_alpha_to_one=False,
         )
-        return model
-
-    @property
-    def dummy_vae(self):
         torch.manual_seed(0)
-        model = AutoencoderKL(
+        vae = AutoencoderKL(
             block_out_channels=[32, 64],
             in_channels=3,
             out_channels=3,
@@ -115,12 +63,8 @@ class CycleDiffusionPipelineFastTests(PipelineTesterMixin, unittest.TestCase):
             up_block_types=["UpDecoderBlock2D", "UpDecoderBlock2D"],
             latent_channels=4,
         )
-        return model
-
-    @property
-    def dummy_text_encoder(self):
         torch.manual_seed(0)
-        config = CLIPTextConfig(
+        text_encoder_config = CLIPTextConfig(
             bos_token_id=0,
             eos_token_id=2,
             hidden_size=32,
@@ -131,68 +75,50 @@ class CycleDiffusionPipelineFastTests(PipelineTesterMixin, unittest.TestCase):
             pad_token_id=1,
             vocab_size=1000,
         )
-        return CLIPTextModel(config)
-
-    @property
-    def dummy_extractor(self):
-        def extract(*args, **kwargs):
-            class Out:
-                def __init__(self):
-                    self.pixel_values = torch.ones([0])
-
-                def to(self, device):
-                    self.pixel_values.to(device)
-                    return self
-
-            return Out()
-
-        return extract
+        text_encoder = CLIPTextModel(text_encoder_config)
+        tokenizer = CLIPTokenizer.from_pretrained("hf-internal-testing/tiny-random-clip")
+
+        components = {
+            "unet": unet,
+            "scheduler": scheduler,
+            "vae": vae,
+            "text_encoder": text_encoder,
+            "tokenizer": tokenizer,
+            "safety_checker": None,
+            "feature_extractor": None,
+        }
+        return components
+
+    def get_dummy_inputs(self, device, seed=0):
+        image = floats_tensor((1, 3, 32, 32), rng=random.Random(seed)).to(device)
+        if str(device).startswith("mps"):
+            generator = torch.manual_seed(seed)
+        else:
+            generator = torch.Generator(device=device).manual_seed(seed)
+        inputs = {
+            "prompt": "An astronaut riding an elephant",
+            "source_prompt": "An astronaut riding a horse",
+            "image": image,
+            "generator": generator,
+            "num_inference_steps": 2,
+            "eta": 0.1,
+            "strength": 0.8,
+            "guidance_scale": 3,
+            "source_guidance_scale": 1,
+            "output_type": "numpy",
+        }
+        return inputs

     def test_stable_diffusion_cycle(self):
         device = "cpu"  # ensure determinism for the device-dependent torch.Generator
-        unet = self.dummy_cond_unet
-        scheduler = DDIMScheduler(
-            beta_start=0.00085,
-            beta_end=0.012,
-            beta_schedule="scaled_linear",
-            num_train_timesteps=1000,
-            clip_sample=False,
-            set_alpha_to_one=False,
-        )
-        vae = self.dummy_vae
-        bert = self.dummy_text_encoder
-        tokenizer = CLIPTokenizer.from_pretrained("hf-internal-testing/tiny-random-clip")

-        # make sure here that pndm scheduler skips prk
-        sd_pipe = CycleDiffusionPipeline(
-            unet=unet,
-            scheduler=scheduler,
-            vae=vae,
-            text_encoder=bert,
-            tokenizer=tokenizer,
-            safety_checker=None,
-            feature_extractor=self.dummy_extractor,
-        )
-        sd_pipe = sd_pipe.to(device)
-        sd_pipe.set_progress_bar_config(disable=None)
-
-        source_prompt = "An astronaut riding a horse"
-        prompt = "An astronaut riding an elephant"
-        init_image = self.dummy_image.to(device)
-
-        generator = torch.Generator(device=device).manual_seed(0)
-        output = sd_pipe(
-            prompt=prompt,
-            source_prompt=source_prompt,
-            generator=generator,
-            num_inference_steps=2,
-            image=init_image,
-            eta=0.1,
-            strength=0.8,
-            guidance_scale=3,
-            source_guidance_scale=1,
-            output_type="np",
-        )
+        components = self.get_dummy_components()
+        pipe = CycleDiffusionPipeline(**components)
+        pipe = pipe.to(device)
+        pipe.set_progress_bar_config(disable=None)
+
+        inputs = self.get_dummy_inputs(device)
+        output = pipe(**inputs)
         images = output.images

         image_slice = images[0, -3:, -3:, -1]
@@ -204,53 +130,16 @@ class CycleDiffusionPipelineFastTests(PipelineTesterMixin, unittest.TestCase):

     @unittest.skipIf(torch_device != "cuda", "This test requires a GPU")
     def test_stable_diffusion_cycle_fp16(self):
-        unet = self.dummy_cond_unet
-        scheduler = DDIMScheduler(
-            beta_start=0.00085,
-            beta_end=0.012,
-            beta_schedule="scaled_linear",
-            num_train_timesteps=1000,
-            clip_sample=False,
-            set_alpha_to_one=False,
-        )
-        vae = self.dummy_vae
-        bert = self.dummy_text_encoder
-        tokenizer = CLIPTokenizer.from_pretrained("hf-internal-testing/tiny-random-clip")
-
-        unet = unet.half()
-        vae = vae.half()
-        bert = bert.half()
-
-        # make sure here that pndm scheduler skips prk
-        sd_pipe = CycleDiffusionPipeline(
-            unet=unet,
-            scheduler=scheduler,
-            vae=vae,
-            text_encoder=bert,
-            tokenizer=tokenizer,
-            safety_checker=None,
-            feature_extractor=self.dummy_extractor,
-        )
-        sd_pipe = sd_pipe.to(torch_device)
-        sd_pipe.set_progress_bar_config(disable=None)
-
-        source_prompt = "An astronaut riding a horse"
-        prompt = "An astronaut riding an elephant"
-        init_image = self.dummy_image.to(torch_device)
-
-        generator = torch.Generator(device=torch_device).manual_seed(0)
-        output = sd_pipe(
-            prompt=prompt,
-            source_prompt=source_prompt,
-            generator=generator,
-            num_inference_steps=2,
-            image=init_image,
-            eta=0.1,
-            strength=0.8,
-            guidance_scale=3,
-            source_guidance_scale=1,
-            output_type="np",
-        )
+        components = self.get_dummy_components()
+        for name, module in components.items():
+            if hasattr(module, "half"):
+                components[name] = module.half()
+        pipe = CycleDiffusionPipeline(**components)
+        pipe = pipe.to(torch_device)
+        pipe.set_progress_bar_config(disable=None)
+
+        inputs = self.get_dummy_inputs(torch_device)
+        output = pipe(**inputs)
         images = output.images

         image_slice = images[0, -3:, -3:, -1]
...
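The fp16 variant above no longer halves each model by hand: it walks the components dict and converts whatever exposes `.half()`, letting schedulers, tokenizers, and the `None` placeholders pass through. The same idea as a reusable sketch (a hypothetical helper, not diffusers API):

```python
def components_to_fp16(components):
    # Mirrors the loop in test_stable_diffusion_cycle_fp16: torch modules
    # expose .half(); schedulers, tokenizers, and None entries do not and
    # are returned unchanged.
    return {
        name: module.half() if hasattr(module, "half") else module
        for name, module in components.items()
    }
```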
@@ -38,25 +38,11 @@ torch.backends.cuda.matmul.allow_tf32 = False

 class StableDiffusionImageVariationPipelineFastTests(PipelineTesterMixin, unittest.TestCase):
-    def tearDown(self):
-        # clean up the VRAM after each test
-        super().tearDown()
-        gc.collect()
-        torch.cuda.empty_cache()
-
-    @property
-    def dummy_image(self):
-        batch_size = 1
-        num_channels = 3
-        sizes = (32, 32)
-
-        image = floats_tensor((batch_size, num_channels) + sizes, rng=random.Random(0)).to(torch_device)
-        return image
+    pipeline_class = StableDiffusionImageVariationPipeline

-    @property
-    def dummy_cond_unet(self):
+    def get_dummy_components(self):
         torch.manual_seed(0)
-        model = UNet2DConditionModel(
+        unet = UNet2DConditionModel(
             block_out_channels=(32, 64),
             layers_per_block=2,
             sample_size=32,
@@ -66,12 +52,9 @@ class StableDiffusionImageVariationPipelineFastTests(PipelineTesterMixin, unitte
             up_block_types=("CrossAttnUpBlock2D", "UpBlock2D"),
             cross_attention_dim=32,
         )
-        return model
+        scheduler = PNDMScheduler(skip_prk_steps=True)

-    @property
-    def dummy_vae(self):
         torch.manual_seed(0)
-        model = AutoencoderKL(
+        vae = AutoencoderKL(
             block_out_channels=[32, 64],
             in_channels=3,
             out_channels=3,
@@ -79,12 +62,8 @@ class StableDiffusionImageVariationPipelineFastTests(PipelineTesterMixin, unitte
             up_block_types=["UpDecoderBlock2D", "UpDecoderBlock2D"],
             latent_channels=4,
         )
-        return model
-
-    @property
-    def dummy_image_encoder(self):
         torch.manual_seed(0)
-        config = CLIPVisionConfig(
+        image_encoder_config = CLIPVisionConfig(
             hidden_size=32,
             projection_dim=32,
             intermediate_size=37,
@@ -94,102 +73,58 @@ class StableDiffusionImageVariationPipelineFastTests(PipelineTesterMixin, unitte
             image_size=32,
             patch_size=4,
         )
-        return CLIPVisionModelWithProjection(config)
+        image_encoder = CLIPVisionModelWithProjection(image_encoder_config)

-    @property
-    def dummy_extractor(self):
-        def extract(*args, **kwargs):
-            class Out:
-                def __init__(self):
-                    self.pixel_values = torch.ones([0])
-
-                def to(self, device):
-                    self.pixel_values.to(device)
-                    return self
-
-            return Out()
-
-        return extract
+        components = {
+            "unet": unet,
+            "scheduler": scheduler,
+            "vae": vae,
+            "image_encoder": image_encoder,
+            "safety_checker": None,
+            "feature_extractor": None,
+        }
+        return components
+
+    def get_dummy_inputs(self, device, seed=0):
+        image = floats_tensor((1, 3, 32, 32), rng=random.Random(seed)).to(device)
+        if str(device).startswith("mps"):
+            generator = torch.manual_seed(seed)
+        else:
+            generator = torch.Generator(device=device).manual_seed(seed)
+        inputs = {
+            "image": image,
+            "generator": generator,
+            "num_inference_steps": 2,
+            "guidance_scale": 6.0,
+            "output_type": "numpy",
+        }
+        return inputs

     def test_stable_diffusion_img_variation_default_case(self):
         device = "cpu"  # ensure determinism for the device-dependent torch.Generator
-        unet = self.dummy_cond_unet
-        scheduler = PNDMScheduler(skip_prk_steps=True)
-        vae = self.dummy_vae
-        image_encoder = self.dummy_image_encoder
-        init_image = self.dummy_image.to(device)
-
-        # make sure here that pndm scheduler skips prk
-        sd_pipe = StableDiffusionImageVariationPipeline(
-            unet=unet,
-            scheduler=scheduler,
-            vae=vae,
-            image_encoder=image_encoder,
-            safety_checker=None,
-            feature_extractor=self.dummy_extractor,
-        )
+        components = self.get_dummy_components()
+        sd_pipe = StableDiffusionImageVariationPipeline(**components)
         sd_pipe = sd_pipe.to(device)
         sd_pipe.set_progress_bar_config(disable=None)

-        generator = torch.Generator(device=device).manual_seed(0)
-        output = sd_pipe(
-            init_image,
-            generator=generator,
-            guidance_scale=6.0,
-            num_inference_steps=2,
-            output_type="np",
-        )
-        image = output.images
-
-        generator = torch.Generator(device=device).manual_seed(0)
-        image_from_tuple = sd_pipe(
-            init_image,
-            generator=generator,
-            guidance_scale=6.0,
-            num_inference_steps=2,
-            output_type="np",
-            return_dict=False,
-        )[0]
+        inputs = self.get_dummy_inputs(device)
+        image = sd_pipe(**inputs).images

         image_slice = image[0, -3:, -3:, -1]
-        image_from_tuple_slice = image_from_tuple[0, -3:, -3:, -1]

         assert image.shape == (1, 64, 64, 3)
         expected_slice = np.array([0.5093, 0.5717, 0.4806, 0.4891, 0.5552, 0.4594, 0.5177, 0.4894, 0.4904])
         assert np.abs(image_slice.flatten() - expected_slice).max() < 1e-3
-        assert np.abs(image_from_tuple_slice.flatten() - expected_slice).max() < 1e-3

     def test_stable_diffusion_img_variation_multiple_images(self):
         device = "cpu"  # ensure determinism for the device-dependent torch.Generator
-        unet = self.dummy_cond_unet
-        scheduler = PNDMScheduler(skip_prk_steps=True)
-        vae = self.dummy_vae
-        image_encoder = self.dummy_image_encoder
-        init_image = self.dummy_image.to(device).repeat(2, 1, 1, 1)
-
-        # make sure here that pndm scheduler skips prk
-        sd_pipe = StableDiffusionImageVariationPipeline(
-            unet=unet,
-            scheduler=scheduler,
-            vae=vae,
-            image_encoder=image_encoder,
-            safety_checker=None,
-            feature_extractor=self.dummy_extractor,
-        )
+        components = self.get_dummy_components()
+        sd_pipe = StableDiffusionImageVariationPipeline(**components)
         sd_pipe = sd_pipe.to(device)
         sd_pipe.set_progress_bar_config(disable=None)

-        generator = torch.Generator(device=device).manual_seed(0)
-        output = sd_pipe(
-            init_image,
-            generator=generator,
-            guidance_scale=6.0,
-            num_inference_steps=2,
-            output_type="np",
-        )
+        inputs = self.get_dummy_inputs(device)
+        inputs["image"] = inputs["image"].repeat(2, 1, 1, 1)
+        output = sd_pipe(**inputs)
         image = output.images
@@ -201,103 +136,40 @@ class StableDiffusionImageVariationPipelineFastTests(PipelineTesterMixin, unitte

     def test_stable_diffusion_img_variation_num_images_per_prompt(self):
         device = "cpu"
-        unet = self.dummy_cond_unet
-        scheduler = PNDMScheduler(skip_prk_steps=True)
-        vae = self.dummy_vae
-        image_encoder = self.dummy_image_encoder
-        init_image = self.dummy_image.to(device)
-
-        # make sure here that pndm scheduler skips prk
-        sd_pipe = StableDiffusionImageVariationPipeline(
-            unet=unet,
-            scheduler=scheduler,
-            vae=vae,
-            image_encoder=image_encoder,
-            safety_checker=None,
-            feature_extractor=self.dummy_extractor,
-        )
+        components = self.get_dummy_components()
+        sd_pipe = StableDiffusionImageVariationPipeline(**components)
         sd_pipe = sd_pipe.to(device)
         sd_pipe.set_progress_bar_config(disable=None)

         # test num_images_per_prompt=1 (default)
-        images = sd_pipe(
-            init_image,
-            num_inference_steps=2,
-            output_type="np",
-        ).images
+        inputs = self.get_dummy_inputs(device)
+        images = sd_pipe(**inputs).images

         assert images.shape == (1, 64, 64, 3)

         # test num_images_per_prompt=1 (default) for batch of images
         batch_size = 2
-        images = sd_pipe(
-            init_image.repeat(batch_size, 1, 1, 1),
-            num_inference_steps=2,
-            output_type="np",
-        ).images
+        inputs = self.get_dummy_inputs(device)
+        inputs["image"] = inputs["image"].repeat(batch_size, 1, 1, 1)
+        images = sd_pipe(**inputs).images

         assert images.shape == (batch_size, 64, 64, 3)

         # test num_images_per_prompt for single prompt
         num_images_per_prompt = 2
-        images = sd_pipe(
-            init_image,
-            num_inference_steps=2,
-            output_type="np",
-            num_images_per_prompt=num_images_per_prompt,
-        ).images
+        inputs = self.get_dummy_inputs(device)
+        images = sd_pipe(**inputs, num_images_per_prompt=num_images_per_prompt).images

         assert images.shape == (num_images_per_prompt, 64, 64, 3)

         # test num_images_per_prompt for batch of prompts
         batch_size = 2
-        images = sd_pipe(
-            init_image.repeat(batch_size, 1, 1, 1),
-            num_inference_steps=2,
-            output_type="np",
-            num_images_per_prompt=num_images_per_prompt,
-        ).images
+        inputs = self.get_dummy_inputs(device)
+        inputs["image"] = inputs["image"].repeat(batch_size, 1, 1, 1)
+        images = sd_pipe(**inputs, num_images_per_prompt=num_images_per_prompt).images

         assert images.shape == (batch_size * num_images_per_prompt, 64, 64, 3)

-    @unittest.skipIf(torch_device != "cuda", "This test requires a GPU")
-    def test_stable_diffusion_img_variation_fp16(self):
-        """Test that stable diffusion img2img works with fp16"""
-        unet = self.dummy_cond_unet
-        scheduler = PNDMScheduler(skip_prk_steps=True)
-        vae = self.dummy_vae
-        image_encoder = self.dummy_image_encoder
-        init_image = self.dummy_image.to(torch_device).float()
-
-        # put models in fp16
-        unet = unet.half()
-        vae = vae.half()
-        image_encoder = image_encoder.half()
-
-        # make sure here that pndm scheduler skips prk
-        sd_pipe = StableDiffusionImageVariationPipeline(
-            unet=unet,
-            scheduler=scheduler,
-            vae=vae,
-            image_encoder=image_encoder,
-            safety_checker=None,
-            feature_extractor=self.dummy_extractor,
-        )
-        sd_pipe = sd_pipe.to(torch_device)
-        sd_pipe.set_progress_bar_config(disable=None)
-
-        generator = torch.Generator(device=torch_device).manual_seed(0)
-        image = sd_pipe(
-            init_image,
-            generator=generator,
-            num_inference_steps=2,
-            output_type="np",
-        ).images
-
-        assert image.shape == (1, 64, 64, 3)

 @slow
 @require_torch_gpu
...
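Note that `test_stable_diffusion_img_variation_num_images_per_prompt` calls `get_dummy_inputs` again before every invocation. The generator inside the inputs dict is stateful and is consumed by a run, so only a freshly seeded one keeps successive calls deterministic; the behaviour in isolation:

```python
import torch

generator = torch.Generator(device="cpu").manual_seed(0)
a = torch.randn(3, generator=generator)
b = torch.randn(3, generator=generator)  # the generator advanced: b differs from a

generator = torch.Generator(device="cpu").manual_seed(0)  # fresh seed
c = torch.randn(3, generator=generator)

assert torch.equal(a, c)      # same seed, same first draw
assert not torch.equal(a, b)  # consumed generator, different draw
```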
...@@ -27,8 +27,6 @@ from diffusers import ( ...@@ -27,8 +27,6 @@ from diffusers import (
PNDMScheduler, PNDMScheduler,
StableDiffusionImg2ImgPipeline, StableDiffusionImg2ImgPipeline,
UNet2DConditionModel, UNet2DConditionModel,
UNet2DModel,
VQModel,
) )
from diffusers.utils import floats_tensor, load_image, load_numpy, slow, torch_device from diffusers.utils import floats_tensor, load_image, load_numpy, slow, torch_device
from diffusers.utils.testing_utils import require_torch_gpu from diffusers.utils.testing_utils import require_torch_gpu
...@@ -41,39 +39,11 @@ torch.backends.cuda.matmul.allow_tf32 = False ...@@ -41,39 +39,11 @@ torch.backends.cuda.matmul.allow_tf32 = False
class StableDiffusionImg2ImgPipelineFastTests(PipelineTesterMixin, unittest.TestCase): class StableDiffusionImg2ImgPipelineFastTests(PipelineTesterMixin, unittest.TestCase):
def tearDown(self): pipeline_class = StableDiffusionImg2ImgPipeline
# clean up the VRAM after each test
super().tearDown()
gc.collect()
torch.cuda.empty_cache()
@property
def dummy_image(self):
batch_size = 1
num_channels = 3
sizes = (32, 32)
image = floats_tensor((batch_size, num_channels) + sizes, rng=random.Random(0)).to(torch_device)
return image
@property
def dummy_uncond_unet(self):
torch.manual_seed(0)
model = UNet2DModel(
block_out_channels=(32, 64),
layers_per_block=2,
sample_size=32,
in_channels=3,
out_channels=3,
down_block_types=("DownBlock2D", "AttnDownBlock2D"),
up_block_types=("AttnUpBlock2D", "UpBlock2D"),
)
return model
@property def get_dummy_components(self):
def dummy_cond_unet(self):
torch.manual_seed(0) torch.manual_seed(0)
model = UNet2DConditionModel( unet = UNet2DConditionModel(
block_out_channels=(32, 64), block_out_channels=(32, 64),
layers_per_block=2, layers_per_block=2,
sample_size=32, sample_size=32,
...@@ -83,40 +53,9 @@ class StableDiffusionImg2ImgPipelineFastTests(PipelineTesterMixin, unittest.Test ...@@ -83,40 +53,9 @@ class StableDiffusionImg2ImgPipelineFastTests(PipelineTesterMixin, unittest.Test
up_block_types=("CrossAttnUpBlock2D", "UpBlock2D"), up_block_types=("CrossAttnUpBlock2D", "UpBlock2D"),
cross_attention_dim=32, cross_attention_dim=32,
) )
return model scheduler = PNDMScheduler(skip_prk_steps=True)
@property
def dummy_cond_unet_inpaint(self):
torch.manual_seed(0)
model = UNet2DConditionModel(
block_out_channels=(32, 64),
layers_per_block=2,
sample_size=32,
in_channels=9,
out_channels=4,
down_block_types=("DownBlock2D", "CrossAttnDownBlock2D"),
up_block_types=("CrossAttnUpBlock2D", "UpBlock2D"),
cross_attention_dim=32,
)
return model
@property
def dummy_vq_model(self):
torch.manual_seed(0)
model = VQModel(
block_out_channels=[32, 64],
in_channels=3,
out_channels=3,
down_block_types=["DownEncoderBlock2D", "DownEncoderBlock2D"],
up_block_types=["UpDecoderBlock2D", "UpDecoderBlock2D"],
latent_channels=3,
)
return model
@property
def dummy_vae(self):
torch.manual_seed(0) torch.manual_seed(0)
model = AutoencoderKL( vae = AutoencoderKL(
block_out_channels=[32, 64], block_out_channels=[32, 64],
in_channels=3, in_channels=3,
out_channels=3, out_channels=3,
...@@ -124,12 +63,8 @@ class StableDiffusionImg2ImgPipelineFastTests(PipelineTesterMixin, unittest.Test ...@@ -124,12 +63,8 @@ class StableDiffusionImg2ImgPipelineFastTests(PipelineTesterMixin, unittest.Test
up_block_types=["UpDecoderBlock2D", "UpDecoderBlock2D"], up_block_types=["UpDecoderBlock2D", "UpDecoderBlock2D"],
latent_channels=4, latent_channels=4,
) )
return model
@property
def dummy_text_encoder(self):
torch.manual_seed(0) torch.manual_seed(0)
config = CLIPTextConfig( text_encoder_config = CLIPTextConfig(
bos_token_id=0, bos_token_id=0,
eos_token_id=2, eos_token_id=2,
hidden_size=32, hidden_size=32,
...@@ -140,113 +75,61 @@ class StableDiffusionImg2ImgPipelineFastTests(PipelineTesterMixin, unittest.Test ...@@ -140,113 +75,61 @@ class StableDiffusionImg2ImgPipelineFastTests(PipelineTesterMixin, unittest.Test
pad_token_id=1, pad_token_id=1,
vocab_size=1000, vocab_size=1000,
) )
return CLIPTextModel(config) text_encoder = CLIPTextModel(text_encoder_config)
tokenizer = CLIPTokenizer.from_pretrained("hf-internal-testing/tiny-random-clip")
@property
def dummy_extractor(self):
def extract(*args, **kwargs):
class Out:
def __init__(self):
self.pixel_values = torch.ones([0])
def to(self, device):
self.pixel_values.to(device)
return self
return Out()
return extract components = {
"unet": unet,
"scheduler": scheduler,
"vae": vae,
"text_encoder": text_encoder,
"tokenizer": tokenizer,
"safety_checker": None,
"feature_extractor": None,
}
return components
def get_dummy_inputs(self, device, seed=0):
image = floats_tensor((1, 3, 32, 32), rng=random.Random(seed)).to(device)
if str(device).startswith("mps"):
generator = torch.manual_seed(seed)
else:
generator = torch.Generator(device=device).manual_seed(seed)
inputs = {
"prompt": "A painting of a squirrel eating a burger",
"image": image,
"generator": generator,
"num_inference_steps": 2,
"guidance_scale": 6.0,
"output_type": "numpy",
}
return inputs
def test_stable_diffusion_img2img_default_case(self): def test_stable_diffusion_img2img_default_case(self):
device = "cpu" # ensure determinism for the device-dependent torch.Generator device = "cpu" # ensure determinism for the device-dependent torch.Generator
unet = self.dummy_cond_unet components = self.get_dummy_components()
scheduler = PNDMScheduler(skip_prk_steps=True) sd_pipe = StableDiffusionImg2ImgPipeline(**components)
vae = self.dummy_vae
bert = self.dummy_text_encoder
tokenizer = CLIPTokenizer.from_pretrained("hf-internal-testing/tiny-random-clip")
init_image = self.dummy_image.to(device)
# make sure here that pndm scheduler skips prk
sd_pipe = StableDiffusionImg2ImgPipeline(
unet=unet,
scheduler=scheduler,
vae=vae,
text_encoder=bert,
tokenizer=tokenizer,
safety_checker=None,
feature_extractor=self.dummy_extractor,
)
sd_pipe = sd_pipe.to(device) sd_pipe = sd_pipe.to(device)
sd_pipe.set_progress_bar_config(disable=None) sd_pipe.set_progress_bar_config(disable=None)
prompt = "A painting of a squirrel eating a burger" inputs = self.get_dummy_inputs(device)
generator = torch.Generator(device=device).manual_seed(0) image = sd_pipe(**inputs).images
output = sd_pipe(
[prompt],
generator=generator,
guidance_scale=6.0,
num_inference_steps=2,
output_type="np",
image=init_image,
)
image = output.images
generator = torch.Generator(device=device).manual_seed(0)
image_from_tuple = sd_pipe(
[prompt],
generator=generator,
guidance_scale=6.0,
num_inference_steps=2,
output_type="np",
image=init_image,
return_dict=False,
)[0]
image_slice = image[0, -3:, -3:, -1] image_slice = image[0, -3:, -3:, -1]
image_from_tuple_slice = image_from_tuple[0, -3:, -3:, -1]
assert image.shape == (1, 32, 32, 3) assert image.shape == (1, 32, 32, 3)
expected_slice = np.array([0.4492, 0.3865, 0.4222, 0.5854, 0.5139, 0.4379, 0.4193, 0.48, 0.4218]) expected_slice = np.array([0.4492, 0.3865, 0.4222, 0.5854, 0.5139, 0.4379, 0.4193, 0.48, 0.4218])
assert np.abs(image_slice.flatten() - expected_slice).max() < 1e-3 assert np.abs(image_slice.flatten() - expected_slice).max() < 1e-3
assert np.abs(image_from_tuple_slice.flatten() - expected_slice).max() < 1e-3
def test_stable_diffusion_img2img_negative_prompt(self): def test_stable_diffusion_img2img_negative_prompt(self):
device = "cpu" # ensure determinism for the device-dependent torch.Generator device = "cpu" # ensure determinism for the device-dependent torch.Generator
unet = self.dummy_cond_unet components = self.get_dummy_components()
scheduler = PNDMScheduler(skip_prk_steps=True) sd_pipe = StableDiffusionImg2ImgPipeline(**components)
vae = self.dummy_vae
bert = self.dummy_text_encoder
tokenizer = CLIPTokenizer.from_pretrained("hf-internal-testing/tiny-random-clip")
init_image = self.dummy_image.to(device)
# make sure here that pndm scheduler skips prk
sd_pipe = StableDiffusionImg2ImgPipeline(
unet=unet,
scheduler=scheduler,
vae=vae,
text_encoder=bert,
tokenizer=tokenizer,
safety_checker=None,
feature_extractor=self.dummy_extractor,
)
sd_pipe = sd_pipe.to(device) sd_pipe = sd_pipe.to(device)
sd_pipe.set_progress_bar_config(disable=None) sd_pipe.set_progress_bar_config(disable=None)
prompt = "A painting of a squirrel eating a burger" inputs = self.get_dummy_inputs(device)
negative_prompt = "french fries" negative_prompt = "french fries"
generator = torch.Generator(device=device).manual_seed(0) output = sd_pipe(**inputs, negative_prompt=negative_prompt)
output = sd_pipe(
prompt,
negative_prompt=negative_prompt,
generator=generator,
guidance_scale=6.0,
num_inference_steps=2,
output_type="np",
image=init_image,
)
image = output.images image = output.images
image_slice = image[0, -3:, -3:, -1] image_slice = image[0, -3:, -3:, -1]
...@@ -256,40 +139,15 @@ class StableDiffusionImg2ImgPipelineFastTests(PipelineTesterMixin, unittest.Test ...@@ -256,40 +139,15 @@ class StableDiffusionImg2ImgPipelineFastTests(PipelineTesterMixin, unittest.Test
def test_stable_diffusion_img2img_multiple_init_images(self): def test_stable_diffusion_img2img_multiple_init_images(self):
device = "cpu" # ensure determinism for the device-dependent torch.Generator device = "cpu" # ensure determinism for the device-dependent torch.Generator
unet = self.dummy_cond_unet components = self.get_dummy_components()
scheduler = PNDMScheduler(skip_prk_steps=True) sd_pipe = StableDiffusionImg2ImgPipeline(**components)
vae = self.dummy_vae
bert = self.dummy_text_encoder
tokenizer = CLIPTokenizer.from_pretrained("hf-internal-testing/tiny-random-clip")
init_image = self.dummy_image.to(device).repeat(2, 1, 1, 1)
# make sure here that pndm scheduler skips prk
sd_pipe = StableDiffusionImg2ImgPipeline(
unet=unet,
scheduler=scheduler,
vae=vae,
text_encoder=bert,
tokenizer=tokenizer,
safety_checker=None,
feature_extractor=self.dummy_extractor,
)
sd_pipe = sd_pipe.to(device) sd_pipe = sd_pipe.to(device)
sd_pipe.set_progress_bar_config(disable=None) sd_pipe.set_progress_bar_config(disable=None)
prompt = 2 * ["A painting of a squirrel eating a burger"] inputs = self.get_dummy_inputs(device)
generator = torch.Generator(device=device).manual_seed(0) inputs["prompt"] = [inputs["prompt"]] * 2
output = sd_pipe( inputs["image"] = inputs["image"].repeat(2, 1, 1, 1)
prompt, image = sd_pipe(**inputs).images
generator=generator,
guidance_scale=6.0,
num_inference_steps=2,
output_type="np",
image=init_image,
)
image = output.images
image_slice = image[-1, -3:, -3:, -1] image_slice = image[-1, -3:, -3:, -1]
assert image.shape == (2, 32, 32, 3) assert image.shape == (2, 32, 32, 3)
@@ -298,171 +156,58 @@ class StableDiffusionImg2ImgPipelineFastTests(PipelineTesterMixin, unittest.Test
     def test_stable_diffusion_img2img_k_lms(self):
         device = "cpu"  # ensure determinism for the device-dependent torch.Generator
-        unet = self.dummy_cond_unet
-        scheduler = LMSDiscreteScheduler(beta_start=0.00085, beta_end=0.012, beta_schedule="scaled_linear")
-
-        vae = self.dummy_vae
-        bert = self.dummy_text_encoder
-        tokenizer = CLIPTokenizer.from_pretrained("hf-internal-testing/tiny-random-clip")
-        init_image = self.dummy_image.to(device)
-
-        # make sure here that pndm scheduler skips prk
-        sd_pipe = StableDiffusionImg2ImgPipeline(
-            unet=unet,
-            scheduler=scheduler,
-            vae=vae,
-            text_encoder=bert,
-            tokenizer=tokenizer,
-            safety_checker=None,
-            feature_extractor=self.dummy_extractor,
-        )
+        components = self.get_dummy_components()
+        components["scheduler"] = LMSDiscreteScheduler(
+            beta_start=0.00085, beta_end=0.012, beta_schedule="scaled_linear"
+        )
+        sd_pipe = StableDiffusionImg2ImgPipeline(**components)
         sd_pipe = sd_pipe.to(device)
         sd_pipe.set_progress_bar_config(disable=None)

-        prompt = "A painting of a squirrel eating a burger"
-        generator = torch.Generator(device=device).manual_seed(0)
-        output = sd_pipe(
-            [prompt],
-            generator=generator,
-            guidance_scale=6.0,
-            num_inference_steps=2,
-            output_type="np",
-            image=init_image,
-        )
-        image = output.images
-
-        generator = torch.Generator(device=device).manual_seed(0)
-        output = sd_pipe(
-            [prompt],
-            generator=generator,
-            guidance_scale=6.0,
-            num_inference_steps=2,
-            output_type="np",
-            image=init_image,
-            return_dict=False,
-        )
-        image_from_tuple = output[0]
-
+        inputs = self.get_dummy_inputs(device)
+        image = sd_pipe(**inputs).images
         image_slice = image[0, -3:, -3:, -1]
-        image_from_tuple_slice = image_from_tuple[0, -3:, -3:, -1]

         assert image.shape == (1, 32, 32, 3)
         expected_slice = np.array([0.4367, 0.4986, 0.4372, 0.6706, 0.5665, 0.444, 0.5864, 0.6019, 0.5203])
         assert np.abs(image_slice.flatten() - expected_slice).max() < 1e-3
-        assert np.abs(image_from_tuple_slice.flatten() - expected_slice).max() < 1e-3
     def test_stable_diffusion_img2img_num_images_per_prompt(self):
-        device = "cpu"
-        unet = self.dummy_cond_unet
-        scheduler = PNDMScheduler(skip_prk_steps=True)
-        vae = self.dummy_vae
-        bert = self.dummy_text_encoder
-        tokenizer = CLIPTokenizer.from_pretrained("hf-internal-testing/tiny-random-clip")
-        init_image = self.dummy_image.to(device)
-
-        # make sure here that pndm scheduler skips prk
-        sd_pipe = StableDiffusionImg2ImgPipeline(
-            unet=unet,
-            scheduler=scheduler,
-            vae=vae,
-            text_encoder=bert,
-            tokenizer=tokenizer,
-            safety_checker=None,
-            feature_extractor=self.dummy_extractor,
-        )
+        device = "cpu"  # ensure determinism for the device-dependent torch.Generator
+        components = self.get_dummy_components()
+        sd_pipe = StableDiffusionImg2ImgPipeline(**components)
         sd_pipe = sd_pipe.to(device)
         sd_pipe.set_progress_bar_config(disable=None)

-        prompt = "A painting of a squirrel eating a burger"
-
         # test num_images_per_prompt=1 (default)
-        images = sd_pipe(
-            prompt,
-            num_inference_steps=2,
-            output_type="np",
-            image=init_image,
-        ).images
+        inputs = self.get_dummy_inputs(device)
+        images = sd_pipe(**inputs).images

         assert images.shape == (1, 32, 32, 3)

         # test num_images_per_prompt=1 (default) for batch of prompts
         batch_size = 2
-        images = sd_pipe(
-            [prompt] * batch_size,
-            num_inference_steps=2,
-            output_type="np",
-            image=init_image,
-        ).images
+        inputs = self.get_dummy_inputs(device)
+        inputs["prompt"] = [inputs["prompt"]] * batch_size
+        images = sd_pipe(**inputs).images

         assert images.shape == (batch_size, 32, 32, 3)

         # test num_images_per_prompt for single prompt
         num_images_per_prompt = 2
-        images = sd_pipe(
-            prompt,
-            num_inference_steps=2,
-            output_type="np",
-            image=init_image,
-            num_images_per_prompt=num_images_per_prompt,
-        ).images
+        inputs = self.get_dummy_inputs(device)
+        images = sd_pipe(**inputs, num_images_per_prompt=num_images_per_prompt).images

         assert images.shape == (num_images_per_prompt, 32, 32, 3)

         # test num_images_per_prompt for batch of prompts
         batch_size = 2
-        images = sd_pipe(
-            [prompt] * batch_size,
-            num_inference_steps=2,
-            output_type="np",
-            image=init_image,
-            num_images_per_prompt=num_images_per_prompt,
-        ).images
+        inputs = self.get_dummy_inputs(device)
+        inputs["prompt"] = [inputs["prompt"]] * batch_size
+        images = sd_pipe(**inputs, num_images_per_prompt=num_images_per_prompt).images

         assert images.shape == (batch_size * num_images_per_prompt, 32, 32, 3)
-    @unittest.skipIf(torch_device != "cuda", "This test requires a GPU")
-    def test_stable_diffusion_img2img_fp16(self):
-        """Test that stable diffusion img2img works with fp16"""
-        unet = self.dummy_cond_unet
-        scheduler = PNDMScheduler(skip_prk_steps=True)
-        vae = self.dummy_vae
-        bert = self.dummy_text_encoder
-        tokenizer = CLIPTokenizer.from_pretrained("hf-internal-testing/tiny-random-clip")
-        init_image = self.dummy_image.to(torch_device)
-
-        # put models in fp16
-        unet = unet.half()
-        vae = vae.half()
-        bert = bert.half()
-
-        # make sure here that pndm scheduler skips prk
-        sd_pipe = StableDiffusionImg2ImgPipeline(
-            unet=unet,
-            scheduler=scheduler,
-            vae=vae,
-            text_encoder=bert,
-            tokenizer=tokenizer,
-            safety_checker=None,
-            feature_extractor=self.dummy_extractor,
-        )
-        sd_pipe = sd_pipe.to(torch_device)
-        sd_pipe.set_progress_bar_config(disable=None)
-
-        prompt = "A painting of a squirrel eating a burger"
-        generator = torch.Generator(device=torch_device).manual_seed(0)
-        image = sd_pipe(
-            [prompt],
-            generator=generator,
-            num_inference_steps=2,
-            output_type="np",
-            image=init_image,
-        ).images
-
-        assert image.shape == (1, 32, 32, 3)
     @slow
     @require_torch_gpu
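Every rewritten fast test above follows the same three-step shape: build the pipeline from get_dummy_components(), fetch fresh inputs from get_dummy_inputs(device), and mutate only the field under test before calling the pipeline. A minimal sketch of that shape in isolation — the img2img helper bodies sit outside the hunks shown here, so the class name and the exact input keys below are assumptions modeled on the inpaint helpers further down:

import unittest

import torch


class ExamplePipelineFastTests(unittest.TestCase):
    # Illustrative only; the real classes also set pipeline_class and
    # define get_dummy_components() returning the model dict.
    def get_dummy_inputs(self, device, seed=0):
        # A device-bound, seeded generator keeps the CPU runs deterministic.
        generator = torch.Generator(device=device).manual_seed(seed)
        return {
            "prompt": "A painting of a squirrel eating a burger",
            "generator": generator,
            "num_inference_steps": 2,
            "guidance_scale": 6.0,
            "output_type": "numpy",
        }

    def test_batched_prompt_shape(self):
        inputs = self.get_dummy_inputs("cpu")
        # Tests mutate only the field they exercise, e.g. batching the prompt:
        inputs["prompt"] = [inputs["prompt"]] * 2
        # ... then build the pipeline from get_dummy_components() and assert
        # on the output shape, exactly as the rewritten tests above do.
        self.assertEqual(len(inputs["prompt"]), 2)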
...
@@ -26,8 +26,6 @@ from diffusers import (
     PNDMScheduler,
     StableDiffusionInpaintPipeline,
     UNet2DConditionModel,
-    UNet2DModel,
-    VQModel,
 )
 from diffusers.pipelines.stable_diffusion.pipeline_stable_diffusion_inpaint import prepare_mask_and_masked_image
 from diffusers.utils import floats_tensor, load_image, load_numpy, slow, torch_device
@@ -42,54 +40,11 @@ torch.backends.cuda.matmul.allow_tf32 = False
 class StableDiffusionInpaintPipelineFastTests(PipelineTesterMixin, unittest.TestCase):
-    def tearDown(self):
-        # clean up the VRAM after each test
-        super().tearDown()
-        gc.collect()
-        torch.cuda.empty_cache()
-
-    @property
-    def dummy_image(self):
-        batch_size = 1
-        num_channels = 3
-        sizes = (32, 32)
-
-        image = floats_tensor((batch_size, num_channels) + sizes, rng=random.Random(0)).to(torch_device)
-        return image
+    pipeline_class = StableDiffusionInpaintPipeline

-    @property
-    def dummy_uncond_unet(self):
-        torch.manual_seed(0)
-        model = UNet2DModel(
-            block_out_channels=(32, 64),
-            layers_per_block=2,
-            sample_size=32,
-            in_channels=3,
-            out_channels=3,
-            down_block_types=("DownBlock2D", "AttnDownBlock2D"),
-            up_block_types=("AttnUpBlock2D", "UpBlock2D"),
-        )
-        return model
-
-    @property
-    def dummy_cond_unet(self):
+    def get_dummy_components(self):
         torch.manual_seed(0)
-        model = UNet2DConditionModel(
-            block_out_channels=(32, 64),
-            layers_per_block=2,
-            sample_size=32,
-            in_channels=4,
-            out_channels=4,
-            down_block_types=("DownBlock2D", "CrossAttnDownBlock2D"),
-            up_block_types=("CrossAttnUpBlock2D", "UpBlock2D"),
-            cross_attention_dim=32,
-        )
-        return model
-
-    @property
-    def dummy_cond_unet_inpaint(self):
-        torch.manual_seed(0)
-        model = UNet2DConditionModel(
+        unet = UNet2DConditionModel(
             block_out_channels=(32, 64),
             layers_per_block=2,
             sample_size=32,
@@ -99,25 +54,9 @@ class StableDiffusionInpaintPipelineFastTests(PipelineTesterMixin, unittest.Test
up_block_types=("CrossAttnUpBlock2D", "UpBlock2D"), up_block_types=("CrossAttnUpBlock2D", "UpBlock2D"),
cross_attention_dim=32, cross_attention_dim=32,
) )
return model scheduler = PNDMScheduler(skip_prk_steps=True)
@property
def dummy_vq_model(self):
torch.manual_seed(0)
model = VQModel(
block_out_channels=[32, 64],
in_channels=3,
out_channels=3,
down_block_types=["DownEncoderBlock2D", "DownEncoderBlock2D"],
up_block_types=["UpDecoderBlock2D", "UpDecoderBlock2D"],
latent_channels=3,
)
return model
@property
def dummy_vae(self):
torch.manual_seed(0) torch.manual_seed(0)
model = AutoencoderKL( vae = AutoencoderKL(
block_out_channels=[32, 64], block_out_channels=[32, 64],
in_channels=3, in_channels=3,
out_channels=3, out_channels=3,
@@ -125,12 +64,8 @@ class StableDiffusionInpaintPipelineFastTests(PipelineTesterMixin, unittest.Test
up_block_types=["UpDecoderBlock2D", "UpDecoderBlock2D"], up_block_types=["UpDecoderBlock2D", "UpDecoderBlock2D"],
latent_channels=4, latent_channels=4,
) )
return model
@property
def dummy_text_encoder(self):
torch.manual_seed(0) torch.manual_seed(0)
config = CLIPTextConfig( text_encoder_config = CLIPTextConfig(
bos_token_id=0, bos_token_id=0,
eos_token_id=2, eos_token_id=2,
hidden_size=32, hidden_size=32,
@@ -141,226 +76,89 @@ class StableDiffusionInpaintPipelineFastTests(PipelineTesterMixin, unittest.Test
             pad_token_id=1,
             vocab_size=1000,
         )
-        return CLIPTextModel(config)
-
-    @property
-    def dummy_extractor(self):
-        def extract(*args, **kwargs):
-            class Out:
-                def __init__(self):
-                    self.pixel_values = torch.ones([0])
-
-                def to(self, device):
-                    self.pixel_values.to(device)
-                    return self
-
-            return Out()
-
-        return extract
+        text_encoder = CLIPTextModel(text_encoder_config)
+        tokenizer = CLIPTokenizer.from_pretrained("hf-internal-testing/tiny-random-clip")
+
+        components = {
+            "unet": unet,
+            "scheduler": scheduler,
+            "vae": vae,
+            "text_encoder": text_encoder,
+            "tokenizer": tokenizer,
+            "safety_checker": None,
+            "feature_extractor": None,
+        }
+        return components
+
+    def get_dummy_inputs(self, device, seed=0):
+        # TODO: use tensor inputs instead of PIL, this is here just to leave the old expected_slices untouched
+        image = floats_tensor((1, 3, 32, 32), rng=random.Random(seed)).to(device)
+        image = image.cpu().permute(0, 2, 3, 1)[0]
+        init_image = Image.fromarray(np.uint8(image)).convert("RGB").resize((64, 64))
+        mask_image = Image.fromarray(np.uint8(image + 4)).convert("RGB").resize((64, 64))
+        if str(device).startswith("mps"):
+            generator = torch.manual_seed(seed)
+        else:
+            generator = torch.Generator(device=device).manual_seed(seed)
+        inputs = {
+            "prompt": "A painting of a squirrel eating a burger",
+            "image": init_image,
+            "mask_image": mask_image,
+            "generator": generator,
+            "num_inference_steps": 2,
+            "guidance_scale": 6.0,
+            "output_type": "numpy",
+        }
+        return inputs

     def test_stable_diffusion_inpaint(self):
         device = "cpu"  # ensure determinism for the device-dependent torch.Generator
-        unet = self.dummy_cond_unet_inpaint
-        scheduler = PNDMScheduler(skip_prk_steps=True)
-        vae = self.dummy_vae
-        bert = self.dummy_text_encoder
-        tokenizer = CLIPTokenizer.from_pretrained("hf-internal-testing/tiny-random-clip")
-        image = self.dummy_image.cpu().permute(0, 2, 3, 1)[0]
-        init_image = Image.fromarray(np.uint8(image)).convert("RGB").resize((64, 64))
-        mask_image = Image.fromarray(np.uint8(image + 4)).convert("RGB").resize((64, 64))
-
-        # make sure here that pndm scheduler skips prk
-        sd_pipe = StableDiffusionInpaintPipeline(
-            unet=unet,
-            scheduler=scheduler,
-            vae=vae,
-            text_encoder=bert,
-            tokenizer=tokenizer,
-            safety_checker=None,
-            feature_extractor=None,
-        )
+        components = self.get_dummy_components()
+        sd_pipe = StableDiffusionInpaintPipeline(**components)
         sd_pipe = sd_pipe.to(device)
         sd_pipe.set_progress_bar_config(disable=None)

-        prompt = "A painting of a squirrel eating a burger"
-        generator = torch.Generator(device=device).manual_seed(0)
-        output = sd_pipe(
-            [prompt],
-            generator=generator,
-            guidance_scale=6.0,
-            num_inference_steps=2,
-            output_type="np",
-            image=init_image,
-            mask_image=mask_image,
-        )
-        image = output.images
-
-        generator = torch.Generator(device=device).manual_seed(0)
-        image_from_tuple = sd_pipe(
-            [prompt],
-            generator=generator,
-            guidance_scale=6.0,
-            num_inference_steps=2,
-            output_type="np",
-            image=init_image,
-            mask_image=mask_image,
-            return_dict=False,
-        )[0]
-
+        inputs = self.get_dummy_inputs(device)
+        image = sd_pipe(**inputs).images
         image_slice = image[0, -3:, -3:, -1]
-        image_from_tuple_slice = image_from_tuple[0, -3:, -3:, -1]

         assert image.shape == (1, 64, 64, 3)
         expected_slice = np.array([0.4723, 0.5731, 0.3939, 0.5441, 0.5922, 0.4392, 0.5059, 0.4651, 0.4474])
         assert np.abs(image_slice.flatten() - expected_slice).max() < 1e-2
-        assert np.abs(image_from_tuple_slice.flatten() - expected_slice).max() < 1e-2
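The get_dummy_inputs helper above branches on mps because torch.Generator objects are device-bound, and on mps the tests fall back to the globally seeded CPU generator. A standalone sketch of that branch and of why identical seeding matters for the hard-coded expected_slice values (make_generator is an illustrative name, not part of this diff):

import torch


def make_generator(device, seed=0):
    # Mirrors the branch in get_dummy_inputs: on mps, torch.manual_seed(seed)
    # returns the seeded default (CPU) generator; elsewhere a device-bound
    # generator is constructed and seeded.
    if str(device).startswith("mps"):
        return torch.manual_seed(seed)
    return torch.Generator(device=device).manual_seed(seed)


# Identically seeded generators yield identical draws, which is what lets the
# tests compare image slices against hard-coded expected values.
g1 = make_generator("cpu")
g2 = make_generator("cpu")
assert torch.equal(torch.randn(4, generator=g1), torch.randn(4, generator=g2))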
     def test_stable_diffusion_inpaint_image_tensor(self):
         device = "cpu"  # ensure determinism for the device-dependent torch.Generator
-        unet = self.dummy_cond_unet_inpaint
-        scheduler = PNDMScheduler(skip_prk_steps=True)
-        vae = self.dummy_vae
-        bert = self.dummy_text_encoder
-        tokenizer = CLIPTokenizer.from_pretrained("hf-internal-testing/tiny-random-clip")
-        image = self.dummy_image.repeat(1, 1, 2, 2)
-        mask_image = image / 2
-
-        # make sure here that pndm scheduler skips prk
-        sd_pipe = StableDiffusionInpaintPipeline(
-            unet=unet,
-            scheduler=scheduler,
-            vae=vae,
-            text_encoder=bert,
-            tokenizer=tokenizer,
-            safety_checker=None,
-            feature_extractor=None,
-        )
+        components = self.get_dummy_components()
+        sd_pipe = StableDiffusionInpaintPipeline(**components)
         sd_pipe = sd_pipe.to(device)
         sd_pipe.set_progress_bar_config(disable=None)

-        prompt = "A painting of a squirrel eating a burger"
-        generator = torch.Generator(device=device).manual_seed(0)
-        output = sd_pipe(
-            [prompt],
-            generator=generator,
-            guidance_scale=6.0,
-            num_inference_steps=2,
-            output_type="np",
-            image=image,
-            mask_image=mask_image[:, 0],
-        )
-        out_1 = output.images
-
-        image = image.cpu().permute(0, 2, 3, 1)[0]
-        mask_image = mask_image.cpu().permute(0, 2, 3, 1)[0]
-
-        image = Image.fromarray(np.uint8(image)).convert("RGB")
-        mask_image = Image.fromarray(np.uint8(mask_image)).convert("RGB")
-
-        generator = torch.Generator(device=device).manual_seed(0)
-        output = sd_pipe(
-            [prompt],
-            generator=generator,
-            guidance_scale=6.0,
-            num_inference_steps=2,
-            output_type="np",
-            image=image,
-            mask_image=mask_image,
-        )
-        out_2 = output.images
-
-        assert out_1.shape == (1, 64, 64, 3)
-        assert np.abs(out_1.flatten() - out_2.flatten()).max() < 5e-2
+        inputs = self.get_dummy_inputs(device)
+        output = sd_pipe(**inputs)
+        out_pil = output.images
+
+        inputs = self.get_dummy_inputs(device)
+        inputs["image"] = torch.tensor(np.array(inputs["image"]) / 127.5 - 1).permute(2, 0, 1).unsqueeze(0)
+        inputs["mask_image"] = torch.tensor(np.array(inputs["mask_image"]) / 255).permute(2, 0, 1)[:1].unsqueeze(0)
+        output = sd_pipe(**inputs)
+        out_tensor = output.images
+
+        assert out_pil.shape == (1, 64, 64, 3)
+        assert np.abs(out_pil.flatten() - out_tensor.flatten()).max() < 5e-2
     def test_stable_diffusion_inpaint_with_num_images_per_prompt(self):
         device = "cpu"
-        unet = self.dummy_cond_unet_inpaint
-        scheduler = PNDMScheduler(skip_prk_steps=True)
-        vae = self.dummy_vae
-        bert = self.dummy_text_encoder
-        tokenizer = CLIPTokenizer.from_pretrained("hf-internal-testing/tiny-random-clip")
-        image = self.dummy_image.cpu().permute(0, 2, 3, 1)[0]
-        init_image = Image.fromarray(np.uint8(image)).convert("RGB").resize((64, 64))
-        mask_image = Image.fromarray(np.uint8(image + 4)).convert("RGB").resize((64, 64))
-
-        # make sure here that pndm scheduler skips prk
-        sd_pipe = StableDiffusionInpaintPipeline(
-            unet=unet,
-            scheduler=scheduler,
-            vae=vae,
-            text_encoder=bert,
-            tokenizer=tokenizer,
-            safety_checker=None,
-            feature_extractor=None,
-        )
+        components = self.get_dummy_components()
+        sd_pipe = StableDiffusionInpaintPipeline(**components)
         sd_pipe = sd_pipe.to(device)
         sd_pipe.set_progress_bar_config(disable=None)

-        prompt = "A painting of a squirrel eating a burger"
-        generator = torch.Generator(device=device).manual_seed(0)
-        images = sd_pipe(
-            [prompt],
-            generator=generator,
-            guidance_scale=6.0,
-            num_inference_steps=2,
-            output_type="np",
-            image=init_image,
-            mask_image=mask_image,
-            num_images_per_prompt=2,
-        ).images
+        inputs = self.get_dummy_inputs(device)
+        images = sd_pipe(**inputs, num_images_per_prompt=2).images

         # check if the output is a list of 2 images
         assert len(images) == 2
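Across both pipelines the batching contract asserted in these tests is the same: the output batch equals len(prompts) * num_images_per_prompt. Stated as plain arithmetic (expected_num_images is an illustrative helper, not from the diff):

def expected_num_images(prompt, num_images_per_prompt=1):
    # Output batch size contract checked by the shape assertions above.
    batch_size = len(prompt) if isinstance(prompt, list) else 1
    return batch_size * num_images_per_prompt


assert expected_num_images("a prompt") == 1
assert expected_num_images(["a prompt"] * 2) == 2
assert expected_num_images("a prompt", num_images_per_prompt=2) == 2
assert expected_num_images(["a prompt"] * 2, num_images_per_prompt=2) == 4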
-    @unittest.skipIf(torch_device != "cuda", "This test requires a GPU")
-    def test_stable_diffusion_inpaint_fp16(self):
-        """Test that stable diffusion inpaint_legacy works with fp16"""
-        unet = self.dummy_cond_unet_inpaint
-        scheduler = PNDMScheduler(skip_prk_steps=True)
-        vae = self.dummy_vae
-        bert = self.dummy_text_encoder
-        tokenizer = CLIPTokenizer.from_pretrained("hf-internal-testing/tiny-random-clip")
-        image = self.dummy_image.cpu().permute(0, 2, 3, 1)[0]
-        init_image = Image.fromarray(np.uint8(image)).convert("RGB").resize((64, 64))
-        mask_image = Image.fromarray(np.uint8(image + 4)).convert("RGB").resize((64, 64))
-
-        # put models in fp16
-        unet = unet.half()
-        vae = vae.half()
-        bert = bert.half()
-
-        # make sure here that pndm scheduler skips prk
-        sd_pipe = StableDiffusionInpaintPipeline(
-            unet=unet,
-            scheduler=scheduler,
-            vae=vae,
-            text_encoder=bert,
-            tokenizer=tokenizer,
-            safety_checker=None,
-            feature_extractor=None,
-        )
-        sd_pipe = sd_pipe.to(torch_device)
-        sd_pipe.set_progress_bar_config(disable=None)
-
-        prompt = "A painting of a squirrel eating a burger"
-        generator = torch.Generator(device=torch_device).manual_seed(0)
-        image = sd_pipe(
-            [prompt],
-            generator=generator,
-            num_inference_steps=2,
-            output_type="np",
-            image=init_image,
-            mask_image=mask_image,
-        ).images
-
-        assert image.shape == (1, 64, 64, 3)
     @slow
     @require_torch_gpu
...
@@ -35,13 +35,11 @@ from diffusers.utils.testing_utils import load_numpy, require_torch_gpu
 from PIL import Image
 from transformers import CLIPTextConfig, CLIPTextModel, CLIPTokenizer

-from ...test_pipelines_common import PipelineTesterMixin
-
 torch.backends.cuda.matmul.allow_tf32 = False


-class StableDiffusionInpaintLegacyPipelineFastTests(PipelineTesterMixin, unittest.TestCase):
+class StableDiffusionInpaintLegacyPipelineFastTests(unittest.TestCase):
     def tearDown(self):
         # clean up the VRAM after each test
         super().tearDown()
...