Unverified Commit a0c54828 authored by Dhruv Nair's avatar Dhruv Nair Committed by GitHub
Browse files

Deprecate Pipelines (#6169)



* deprecate pipe

* make style

* update

* add deprecation message

* format

* remove tests for deprecated pipelines

* remove deprecation message

* make style

* fix copies

* clean up

* clean

* clean

* clean

* clean up

* clean up

* clean up toctree

* clean up

---------
Co-authored-by: default avatarPatrick von Platen <patrick.v.platen@gmail.com>
parent 8d891e6e
# coding=utf-8
# Copyright 2023 HuggingFace Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import gc
import random
import unittest
import numpy as np
import torch
from transformers import XLMRobertaTokenizer
from diffusers import (
AltDiffusionImg2ImgPipeline,
AutoencoderKL,
PNDMScheduler,
UNet2DConditionModel,
)
from diffusers.image_processor import VaeImageProcessor
from diffusers.pipelines.alt_diffusion.modeling_roberta_series import (
RobertaSeriesConfig,
RobertaSeriesModelWithTransformation,
)
from diffusers.utils import load_image
from diffusers.utils.testing_utils import (
enable_full_determinism,
floats_tensor,
load_numpy,
nightly,
require_torch_gpu,
torch_device,
)
enable_full_determinism()
class AltDiffusionImg2ImgPipelineFastTests(unittest.TestCase):
def tearDown(self):
# clean up the VRAM after each test
super().tearDown()
gc.collect()
torch.cuda.empty_cache()
@property
def dummy_image(self):
batch_size = 1
num_channels = 3
sizes = (32, 32)
image = floats_tensor((batch_size, num_channels) + sizes, rng=random.Random(0)).to(torch_device)
return image
@property
def dummy_cond_unet(self):
torch.manual_seed(0)
model = UNet2DConditionModel(
block_out_channels=(32, 64),
layers_per_block=2,
sample_size=32,
in_channels=4,
out_channels=4,
down_block_types=("DownBlock2D", "CrossAttnDownBlock2D"),
up_block_types=("CrossAttnUpBlock2D", "UpBlock2D"),
cross_attention_dim=32,
)
return model
@property
def dummy_vae(self):
torch.manual_seed(0)
model = AutoencoderKL(
block_out_channels=[32, 64],
in_channels=3,
out_channels=3,
down_block_types=["DownEncoderBlock2D", "DownEncoderBlock2D"],
up_block_types=["UpDecoderBlock2D", "UpDecoderBlock2D"],
latent_channels=4,
)
return model
@property
def dummy_text_encoder(self):
torch.manual_seed(0)
config = RobertaSeriesConfig(
hidden_size=32,
project_dim=32,
intermediate_size=37,
layer_norm_eps=1e-05,
num_attention_heads=4,
num_hidden_layers=5,
pad_token_id=1,
vocab_size=5006,
)
return RobertaSeriesModelWithTransformation(config)
@property
def dummy_extractor(self):
def extract(*args, **kwargs):
class Out:
def __init__(self):
self.pixel_values = torch.ones([0])
def to(self, device):
self.pixel_values.to(device)
return self
return Out()
return extract
def test_stable_diffusion_img2img_default_case(self):
device = "cpu" # ensure determinism for the device-dependent torch.Generator
unet = self.dummy_cond_unet
scheduler = PNDMScheduler(skip_prk_steps=True)
vae = self.dummy_vae
bert = self.dummy_text_encoder
tokenizer = XLMRobertaTokenizer.from_pretrained("hf-internal-testing/tiny-xlm-roberta")
tokenizer.model_max_length = 77
init_image = self.dummy_image.to(device)
init_image = init_image / 2 + 0.5
# make sure here that pndm scheduler skips prk
alt_pipe = AltDiffusionImg2ImgPipeline(
unet=unet,
scheduler=scheduler,
vae=vae,
text_encoder=bert,
tokenizer=tokenizer,
safety_checker=None,
feature_extractor=self.dummy_extractor,
image_encoder=None,
)
alt_pipe.image_processor = VaeImageProcessor(vae_scale_factor=alt_pipe.vae_scale_factor, do_normalize=True)
alt_pipe = alt_pipe.to(device)
alt_pipe.set_progress_bar_config(disable=None)
prompt = "A painting of a squirrel eating a burger"
generator = torch.Generator(device=device).manual_seed(0)
output = alt_pipe(
[prompt],
generator=generator,
guidance_scale=6.0,
num_inference_steps=2,
output_type="np",
image=init_image,
)
image = output.images
generator = torch.Generator(device=device).manual_seed(0)
image_from_tuple = alt_pipe(
[prompt],
generator=generator,
guidance_scale=6.0,
num_inference_steps=2,
output_type="np",
image=init_image,
return_dict=False,
)[0]
image_slice = image[0, -3:, -3:, -1]
image_from_tuple_slice = image_from_tuple[0, -3:, -3:, -1]
assert image.shape == (1, 32, 32, 3)
expected_slice = np.array([0.4427, 0.3731, 0.4249, 0.4941, 0.4546, 0.4148, 0.4193, 0.4666, 0.4499])
assert np.abs(image_slice.flatten() - expected_slice).max() < 5e-3
assert np.abs(image_from_tuple_slice.flatten() - expected_slice).max() < 5e-3
@unittest.skipIf(torch_device != "cuda", "This test requires a GPU")
def test_stable_diffusion_img2img_fp16(self):
"""Test that stable diffusion img2img works with fp16"""
unet = self.dummy_cond_unet
scheduler = PNDMScheduler(skip_prk_steps=True)
vae = self.dummy_vae
bert = self.dummy_text_encoder
tokenizer = XLMRobertaTokenizer.from_pretrained("hf-internal-testing/tiny-xlm-roberta")
tokenizer.model_max_length = 77
init_image = self.dummy_image.to(torch_device)
# put models in fp16
unet = unet.half()
vae = vae.half()
bert = bert.half()
# make sure here that pndm scheduler skips prk
alt_pipe = AltDiffusionImg2ImgPipeline(
unet=unet,
scheduler=scheduler,
vae=vae,
text_encoder=bert,
tokenizer=tokenizer,
safety_checker=None,
feature_extractor=self.dummy_extractor,
image_encoder=None,
)
alt_pipe.image_processor = VaeImageProcessor(vae_scale_factor=alt_pipe.vae_scale_factor, do_normalize=False)
alt_pipe = alt_pipe.to(torch_device)
alt_pipe.set_progress_bar_config(disable=None)
prompt = "A painting of a squirrel eating a burger"
generator = torch.manual_seed(0)
image = alt_pipe(
[prompt],
generator=generator,
num_inference_steps=2,
output_type="np",
image=init_image,
).images
assert image.shape == (1, 32, 32, 3)
@unittest.skipIf(torch_device != "cuda", "This test requires a GPU")
def test_stable_diffusion_img2img_pipeline_multiple_of_8(self):
init_image = load_image(
"https://huggingface.co/datasets/hf-internal-testing/diffusers-images/resolve/main"
"/img2img/sketch-mountains-input.jpg"
)
# resize to resolution that is divisible by 8 but not 16 or 32
init_image = init_image.resize((760, 504))
model_id = "BAAI/AltDiffusion"
pipe = AltDiffusionImg2ImgPipeline.from_pretrained(
model_id,
safety_checker=None,
)
pipe.to(torch_device)
pipe.set_progress_bar_config(disable=None)
pipe.enable_attention_slicing()
prompt = "A fantasy landscape, trending on artstation"
generator = torch.manual_seed(0)
output = pipe(
prompt=prompt,
image=init_image,
strength=0.75,
guidance_scale=7.5,
generator=generator,
output_type="np",
)
image = output.images[0]
image_slice = image[255:258, 383:386, -1]
assert image.shape == (504, 760, 3)
expected_slice = np.array([0.9358, 0.9397, 0.9599, 0.9901, 1.0000, 1.0000, 0.9882, 1.0000, 1.0000])
assert np.abs(image_slice.flatten() - expected_slice).max() < 1e-2
@nightly
@require_torch_gpu
class AltDiffusionImg2ImgPipelineIntegrationTests(unittest.TestCase):
def tearDown(self):
# clean up the VRAM after each test
super().tearDown()
gc.collect()
torch.cuda.empty_cache()
def test_stable_diffusion_img2img_pipeline_default(self):
init_image = load_image(
"https://huggingface.co/datasets/hf-internal-testing/diffusers-images/resolve/main"
"/img2img/sketch-mountains-input.jpg"
)
init_image = init_image.resize((768, 512))
expected_image = load_numpy(
"https://huggingface.co/datasets/hf-internal-testing/diffusers-images/resolve/main/img2img/fantasy_landscape_alt.npy"
)
model_id = "BAAI/AltDiffusion"
pipe = AltDiffusionImg2ImgPipeline.from_pretrained(
model_id,
safety_checker=None,
)
pipe.to(torch_device)
pipe.set_progress_bar_config(disable=None)
pipe.enable_attention_slicing()
prompt = "A fantasy landscape, trending on artstation"
generator = torch.manual_seed(0)
output = pipe(
prompt=prompt,
image=init_image,
strength=0.75,
guidance_scale=7.5,
generator=generator,
output_type="np",
)
image = output.images[0]
assert image.shape == (512, 768, 3)
# img2img is flaky across GPUs even in fp32, so using MAE here
assert np.abs(expected_image - image).max() < 1e-2
# coding=utf-8
# Copyright 2023 HuggingFace Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import gc
import unittest
import numpy as np
import torch
from diffusers import (
AudioDiffusionPipeline,
AutoencoderKL,
DDIMScheduler,
DDPMScheduler,
DiffusionPipeline,
Mel,
UNet2DConditionModel,
UNet2DModel,
)
from diffusers.utils.testing_utils import enable_full_determinism, nightly, require_torch_gpu, torch_device
enable_full_determinism()
class PipelineFastTests(unittest.TestCase):
def tearDown(self):
# clean up the VRAM after each test
super().tearDown()
gc.collect()
torch.cuda.empty_cache()
@property
def dummy_unet(self):
torch.manual_seed(0)
model = UNet2DModel(
sample_size=(32, 64),
in_channels=1,
out_channels=1,
layers_per_block=2,
block_out_channels=(128, 128),
down_block_types=("AttnDownBlock2D", "DownBlock2D"),
up_block_types=("UpBlock2D", "AttnUpBlock2D"),
)
return model
@property
def dummy_unet_condition(self):
torch.manual_seed(0)
model = UNet2DConditionModel(
sample_size=(64, 32),
in_channels=1,
out_channels=1,
layers_per_block=2,
block_out_channels=(128, 128),
down_block_types=("CrossAttnDownBlock2D", "DownBlock2D"),
up_block_types=("UpBlock2D", "CrossAttnUpBlock2D"),
cross_attention_dim=10,
)
return model
@property
def dummy_vqvae_and_unet(self):
torch.manual_seed(0)
vqvae = AutoencoderKL(
sample_size=(128, 64),
in_channels=1,
out_channels=1,
latent_channels=1,
layers_per_block=2,
block_out_channels=(128, 128),
down_block_types=("DownEncoderBlock2D", "DownEncoderBlock2D"),
up_block_types=("UpDecoderBlock2D", "UpDecoderBlock2D"),
)
unet = UNet2DModel(
sample_size=(64, 32),
in_channels=1,
out_channels=1,
layers_per_block=2,
block_out_channels=(128, 128),
down_block_types=("AttnDownBlock2D", "DownBlock2D"),
up_block_types=("UpBlock2D", "AttnUpBlock2D"),
)
return vqvae, unet
@nightly
def test_audio_diffusion(self):
device = "cpu" # ensure determinism for the device-dependent torch.Generator
mel = Mel(
x_res=self.dummy_unet.config.sample_size[1],
y_res=self.dummy_unet.config.sample_size[0],
)
scheduler = DDPMScheduler()
pipe = AudioDiffusionPipeline(vqvae=None, unet=self.dummy_unet, mel=mel, scheduler=scheduler)
pipe = pipe.to(device)
pipe.set_progress_bar_config(disable=None)
generator = torch.Generator(device=device).manual_seed(42)
output = pipe(generator=generator, steps=4)
audio = output.audios[0]
image = output.images[0]
generator = torch.Generator(device=device).manual_seed(42)
output = pipe(generator=generator, steps=4, return_dict=False)
image_from_tuple = output[0][0]
assert audio.shape == (1, (self.dummy_unet.config.sample_size[1] - 1) * mel.hop_length)
assert (
image.height == self.dummy_unet.config.sample_size[0]
and image.width == self.dummy_unet.config.sample_size[1]
)
image_slice = np.frombuffer(image.tobytes(), dtype="uint8")[:10]
image_from_tuple_slice = np.frombuffer(image_from_tuple.tobytes(), dtype="uint8")[:10]
expected_slice = np.array([69, 255, 255, 255, 0, 0, 77, 181, 12, 127])
assert np.abs(image_slice.flatten() - expected_slice).max() == 0
assert np.abs(image_from_tuple_slice.flatten() - expected_slice).max() == 0
mel = Mel(
x_res=self.dummy_vqvae_and_unet[0].config.sample_size[1],
y_res=self.dummy_vqvae_and_unet[0].config.sample_size[0],
)
scheduler = DDIMScheduler()
dummy_vqvae_and_unet = self.dummy_vqvae_and_unet
pipe = AudioDiffusionPipeline(
vqvae=self.dummy_vqvae_and_unet[0], unet=dummy_vqvae_and_unet[1], mel=mel, scheduler=scheduler
)
pipe = pipe.to(device)
pipe.set_progress_bar_config(disable=None)
np.random.seed(0)
raw_audio = np.random.uniform(-1, 1, ((dummy_vqvae_and_unet[0].config.sample_size[1] - 1) * mel.hop_length,))
generator = torch.Generator(device=device).manual_seed(42)
output = pipe(raw_audio=raw_audio, generator=generator, start_step=5, steps=10)
image = output.images[0]
assert (
image.height == self.dummy_vqvae_and_unet[0].config.sample_size[0]
and image.width == self.dummy_vqvae_and_unet[0].config.sample_size[1]
)
image_slice = np.frombuffer(image.tobytes(), dtype="uint8")[:10]
expected_slice = np.array([120, 117, 110, 109, 138, 167, 138, 148, 132, 121])
assert np.abs(image_slice.flatten() - expected_slice).max() == 0
dummy_unet_condition = self.dummy_unet_condition
pipe = AudioDiffusionPipeline(
vqvae=self.dummy_vqvae_and_unet[0], unet=dummy_unet_condition, mel=mel, scheduler=scheduler
)
pipe = pipe.to(device)
pipe.set_progress_bar_config(disable=None)
np.random.seed(0)
encoding = torch.rand((1, 1, 10))
output = pipe(generator=generator, encoding=encoding)
image = output.images[0]
image_slice = np.frombuffer(image.tobytes(), dtype="uint8")[:10]
expected_slice = np.array([107, 103, 120, 127, 142, 122, 113, 122, 97, 111])
assert np.abs(image_slice.flatten() - expected_slice).max() == 0
@nightly
@require_torch_gpu
class PipelineIntegrationTests(unittest.TestCase):
def tearDown(self):
# clean up the VRAM after each test
super().tearDown()
gc.collect()
torch.cuda.empty_cache()
def test_audio_diffusion(self):
device = torch_device
pipe = DiffusionPipeline.from_pretrained("teticio/audio-diffusion-ddim-256")
pipe = pipe.to(device)
pipe.set_progress_bar_config(disable=None)
generator = torch.Generator(device=device).manual_seed(42)
output = pipe(generator=generator)
audio = output.audios[0]
image = output.images[0]
assert audio.shape == (1, (pipe.unet.config.sample_size[1] - 1) * pipe.mel.hop_length)
assert image.height == pipe.unet.config.sample_size[0] and image.width == pipe.unet.config.sample_size[1]
image_slice = np.frombuffer(image.tobytes(), dtype="uint8")[:10]
expected_slice = np.array([151, 167, 154, 144, 122, 134, 121, 105, 70, 26])
assert np.abs(image_slice.flatten() - expected_slice).max() == 0
# coding=utf-8
# Copyright 2023 HuggingFace Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import unittest
import numpy as np
import torch
from transformers import CLIPTextConfig, CLIPTextModel
from diffusers import DDIMScheduler, LDMPipeline, UNet2DModel, VQModel
from diffusers.utils.testing_utils import enable_full_determinism, nightly, require_torch, torch_device
enable_full_determinism()
class LDMPipelineFastTests(unittest.TestCase):
@property
def dummy_uncond_unet(self):
torch.manual_seed(0)
model = UNet2DModel(
block_out_channels=(32, 64),
layers_per_block=2,
sample_size=32,
in_channels=3,
out_channels=3,
down_block_types=("DownBlock2D", "AttnDownBlock2D"),
up_block_types=("AttnUpBlock2D", "UpBlock2D"),
)
return model
@property
def dummy_vq_model(self):
torch.manual_seed(0)
model = VQModel(
block_out_channels=[32, 64],
in_channels=3,
out_channels=3,
down_block_types=["DownEncoderBlock2D", "DownEncoderBlock2D"],
up_block_types=["UpDecoderBlock2D", "UpDecoderBlock2D"],
latent_channels=3,
)
return model
@property
def dummy_text_encoder(self):
torch.manual_seed(0)
config = CLIPTextConfig(
bos_token_id=0,
eos_token_id=2,
hidden_size=32,
intermediate_size=37,
layer_norm_eps=1e-05,
num_attention_heads=4,
num_hidden_layers=5,
pad_token_id=1,
vocab_size=1000,
)
return CLIPTextModel(config)
def test_inference_uncond(self):
unet = self.dummy_uncond_unet
scheduler = DDIMScheduler()
vae = self.dummy_vq_model
ldm = LDMPipeline(unet=unet, vqvae=vae, scheduler=scheduler)
ldm.to(torch_device)
ldm.set_progress_bar_config(disable=None)
generator = torch.manual_seed(0)
image = ldm(generator=generator, num_inference_steps=2, output_type="numpy").images
generator = torch.manual_seed(0)
image_from_tuple = ldm(generator=generator, num_inference_steps=2, output_type="numpy", return_dict=False)[0]
image_slice = image[0, -3:, -3:, -1]
image_from_tuple_slice = image_from_tuple[0, -3:, -3:, -1]
assert image.shape == (1, 64, 64, 3)
expected_slice = np.array([0.8512, 0.818, 0.6411, 0.6808, 0.4465, 0.5618, 0.46, 0.6231, 0.5172])
tolerance = 1e-2 if torch_device != "mps" else 3e-2
assert np.abs(image_slice.flatten() - expected_slice).max() < tolerance
assert np.abs(image_from_tuple_slice.flatten() - expected_slice).max() < tolerance
@nightly
@require_torch
class LDMPipelineIntegrationTests(unittest.TestCase):
def test_inference_uncond(self):
ldm = LDMPipeline.from_pretrained("CompVis/ldm-celebahq-256")
ldm.to(torch_device)
ldm.set_progress_bar_config(disable=None)
generator = torch.manual_seed(0)
image = ldm(generator=generator, num_inference_steps=5, output_type="numpy").images
image_slice = image[0, -3:, -3:, -1]
assert image.shape == (1, 256, 256, 3)
expected_slice = np.array([0.4399, 0.44975, 0.46825, 0.474, 0.4359, 0.4581, 0.45095, 0.4341, 0.4447])
tolerance = 1e-2 if torch_device != "mps" else 3e-2
assert np.abs(image_slice.flatten() - expected_slice).max() < tolerance
# coding=utf-8
# Copyright 2023 HuggingFace Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import gc
import unittest
import numpy as np
import torch
from diffusers import RePaintPipeline, RePaintScheduler, UNet2DModel
from diffusers.utils.testing_utils import (
enable_full_determinism,
load_image,
load_numpy,
nightly,
require_torch_gpu,
skip_mps,
torch_device,
)
from ..pipeline_params import IMAGE_INPAINTING_BATCH_PARAMS, IMAGE_INPAINTING_PARAMS
from ..test_pipelines_common import PipelineTesterMixin
enable_full_determinism()
class RepaintPipelineFastTests(PipelineTesterMixin, unittest.TestCase):
pipeline_class = RePaintPipeline
params = IMAGE_INPAINTING_PARAMS - {"width", "height", "guidance_scale"}
required_optional_params = PipelineTesterMixin.required_optional_params - {
"latents",
"num_images_per_prompt",
"callback",
"callback_steps",
}
batch_params = IMAGE_INPAINTING_BATCH_PARAMS
def get_dummy_components(self):
torch.manual_seed(0)
torch.manual_seed(0)
unet = UNet2DModel(
block_out_channels=(32, 64),
layers_per_block=2,
sample_size=32,
in_channels=3,
out_channels=3,
down_block_types=("DownBlock2D", "AttnDownBlock2D"),
up_block_types=("AttnUpBlock2D", "UpBlock2D"),
)
scheduler = RePaintScheduler()
components = {"unet": unet, "scheduler": scheduler}
return components
def get_dummy_inputs(self, device, seed=0):
if str(device).startswith("mps"):
generator = torch.manual_seed(seed)
else:
generator = torch.Generator(device=device).manual_seed(seed)
image = np.random.RandomState(seed).standard_normal((1, 3, 32, 32))
image = torch.from_numpy(image).to(device=device, dtype=torch.float32)
mask = (image > 0).to(device=device, dtype=torch.float32)
inputs = {
"image": image,
"mask_image": mask,
"generator": generator,
"num_inference_steps": 5,
"eta": 0.0,
"jump_length": 2,
"jump_n_sample": 2,
"output_type": "numpy",
}
return inputs
def test_repaint(self):
device = "cpu" # ensure determinism for the device-dependent torch.Generator
components = self.get_dummy_components()
sd_pipe = RePaintPipeline(**components)
sd_pipe = sd_pipe.to(device)
sd_pipe.set_progress_bar_config(disable=None)
inputs = self.get_dummy_inputs(device)
image = sd_pipe(**inputs).images
image_slice = image[0, -3:, -3:, -1]
assert image.shape == (1, 32, 32, 3)
expected_slice = np.array([1.0000, 0.5426, 0.5497, 0.2200, 1.0000, 1.0000, 0.5623, 1.0000, 0.6274])
assert np.abs(image_slice.flatten() - expected_slice).max() < 1e-3
@skip_mps
def test_save_load_local(self):
return super().test_save_load_local()
# RePaint can hardly be made deterministic since the scheduler is currently always
# nondeterministic
@unittest.skip("non-deterministic pipeline")
def test_inference_batch_single_identical(self):
return super().test_inference_batch_single_identical()
@skip_mps
def test_dict_tuple_outputs_equivalent(self):
return super().test_dict_tuple_outputs_equivalent()
@skip_mps
def test_save_load_optional_components(self):
return super().test_save_load_optional_components()
@skip_mps
def test_attention_slicing_forward_pass(self):
return super().test_attention_slicing_forward_pass()
@nightly
@require_torch_gpu
class RepaintPipelineNightlyTests(unittest.TestCase):
def tearDown(self):
super().tearDown()
gc.collect()
torch.cuda.empty_cache()
def test_celebahq(self):
original_image = load_image(
"https://huggingface.co/datasets/hf-internal-testing/diffusers-images/resolve/main/"
"repaint/celeba_hq_256.png"
)
mask_image = load_image(
"https://huggingface.co/datasets/hf-internal-testing/diffusers-images/resolve/main/repaint/mask_256.png"
)
expected_image = load_numpy(
"https://huggingface.co/datasets/hf-internal-testing/diffusers-images/resolve/main/"
"repaint/celeba_hq_256_result.npy"
)
model_id = "google/ddpm-ema-celebahq-256"
unet = UNet2DModel.from_pretrained(model_id)
scheduler = RePaintScheduler.from_pretrained(model_id)
repaint = RePaintPipeline(unet=unet, scheduler=scheduler).to(torch_device)
repaint.set_progress_bar_config(disable=None)
repaint.enable_attention_slicing()
generator = torch.manual_seed(0)
output = repaint(
original_image,
mask_image,
num_inference_steps=250,
eta=0.0,
jump_length=10,
jump_n_sample=10,
generator=generator,
output_type="np",
)
image = output.images[0]
assert image.shape == (256, 256, 3)
assert np.abs(expected_image - image).mean() < 1e-2
# coding=utf-8
# Copyright 2023 HuggingFace Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import unittest
import numpy as np
import torch
from diffusers import ScoreSdeVePipeline, ScoreSdeVeScheduler, UNet2DModel
from diffusers.utils.testing_utils import enable_full_determinism, nightly, require_torch, torch_device
enable_full_determinism()
class ScoreSdeVeipelineFastTests(unittest.TestCase):
@property
def dummy_uncond_unet(self):
torch.manual_seed(0)
model = UNet2DModel(
block_out_channels=(32, 64),
layers_per_block=2,
sample_size=32,
in_channels=3,
out_channels=3,
down_block_types=("DownBlock2D", "AttnDownBlock2D"),
up_block_types=("AttnUpBlock2D", "UpBlock2D"),
)
return model
def test_inference(self):
unet = self.dummy_uncond_unet
scheduler = ScoreSdeVeScheduler()
sde_ve = ScoreSdeVePipeline(unet=unet, scheduler=scheduler)
sde_ve.to(torch_device)
sde_ve.set_progress_bar_config(disable=None)
generator = torch.manual_seed(0)
image = sde_ve(num_inference_steps=2, output_type="numpy", generator=generator).images
generator = torch.manual_seed(0)
image_from_tuple = sde_ve(num_inference_steps=2, output_type="numpy", generator=generator, return_dict=False)[
0
]
image_slice = image[0, -3:, -3:, -1]
image_from_tuple_slice = image_from_tuple[0, -3:, -3:, -1]
assert image.shape == (1, 32, 32, 3)
expected_slice = np.array([0.0, 1.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0])
assert np.abs(image_slice.flatten() - expected_slice).max() < 1e-2
assert np.abs(image_from_tuple_slice.flatten() - expected_slice).max() < 1e-2
@nightly
@require_torch
class ScoreSdeVePipelineIntegrationTests(unittest.TestCase):
def test_inference(self):
model_id = "google/ncsnpp-church-256"
model = UNet2DModel.from_pretrained(model_id)
scheduler = ScoreSdeVeScheduler.from_pretrained(model_id)
sde_ve = ScoreSdeVePipeline(unet=model, scheduler=scheduler)
sde_ve.to(torch_device)
sde_ve.set_progress_bar_config(disable=None)
generator = torch.manual_seed(0)
image = sde_ve(num_inference_steps=10, output_type="numpy", generator=generator).images
image_slice = image[0, -3:, -3:, -1]
assert image.shape == (1, 256, 256, 3)
expected_slice = np.array([0.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 0.0, 0.0])
assert np.abs(image_slice.flatten() - expected_slice).max() < 1e-2
# coding=utf-8
# Copyright 2022 HuggingFace Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import gc
import unittest
import numpy as np
import torch
from diffusers import DDPMScheduler, MidiProcessor, SpectrogramDiffusionPipeline
from diffusers.pipelines.spectrogram_diffusion import SpectrogramContEncoder, SpectrogramNotesEncoder, T5FilmDecoder
from diffusers.utils.testing_utils import (
enable_full_determinism,
nightly,
require_note_seq,
require_onnxruntime,
require_torch_gpu,
skip_mps,
torch_device,
)
from ..pipeline_params import TOKENS_TO_AUDIO_GENERATION_BATCH_PARAMS, TOKENS_TO_AUDIO_GENERATION_PARAMS
from ..test_pipelines_common import PipelineTesterMixin
enable_full_determinism()
MIDI_FILE = "./tests/fixtures/elise_format0.mid"
# The note-seq package throws an error on import because the default installed version of Ipython
# is not compatible with python 3.8 which we run in the CI.
# https://github.com/huggingface/diffusers/actions/runs/4830121056/jobs/8605954838#step:7:98
@unittest.skip("The note-seq package currently throws an error on import")
class SpectrogramDiffusionPipelineFastTests(PipelineTesterMixin, unittest.TestCase):
pipeline_class = SpectrogramDiffusionPipeline
required_optional_params = PipelineTesterMixin.required_optional_params - {
"callback",
"latents",
"callback_steps",
"output_type",
"num_images_per_prompt",
}
test_attention_slicing = False
batch_params = TOKENS_TO_AUDIO_GENERATION_PARAMS
params = TOKENS_TO_AUDIO_GENERATION_BATCH_PARAMS
def get_dummy_components(self):
torch.manual_seed(0)
notes_encoder = SpectrogramNotesEncoder(
max_length=2048,
vocab_size=1536,
d_model=768,
dropout_rate=0.1,
num_layers=1,
num_heads=1,
d_kv=4,
d_ff=2048,
feed_forward_proj="gated-gelu",
)
continuous_encoder = SpectrogramContEncoder(
input_dims=128,
targets_context_length=256,
d_model=768,
dropout_rate=0.1,
num_layers=1,
num_heads=1,
d_kv=4,
d_ff=2048,
feed_forward_proj="gated-gelu",
)
decoder = T5FilmDecoder(
input_dims=128,
targets_length=256,
max_decoder_noise_time=20000.0,
d_model=768,
num_layers=1,
num_heads=1,
d_kv=4,
d_ff=2048,
dropout_rate=0.1,
)
scheduler = DDPMScheduler()
components = {
"notes_encoder": notes_encoder.eval(),
"continuous_encoder": continuous_encoder.eval(),
"decoder": decoder.eval(),
"scheduler": scheduler,
"melgan": None,
}
return components
def get_dummy_inputs(self, device, seed=0):
if str(device).startswith("mps"):
generator = torch.manual_seed(seed)
else:
generator = torch.Generator(device=device).manual_seed(seed)
inputs = {
"input_tokens": [
[1134, 90, 1135, 1133, 1080, 112, 1132, 1080, 1133, 1079, 133, 1132, 1079, 1133, 1] + [0] * 2033
],
"generator": generator,
"num_inference_steps": 4,
"output_type": "mel",
}
return inputs
def test_spectrogram_diffusion(self):
device = "cpu" # ensure determinism for the device-dependent torch.Generator
components = self.get_dummy_components()
pipe = SpectrogramDiffusionPipeline(**components)
pipe = pipe.to(device)
pipe.set_progress_bar_config(disable=None)
inputs = self.get_dummy_inputs(device)
output = pipe(**inputs)
mel = output.audios
mel_slice = mel[0, -3:, -3:]
assert mel_slice.shape == (3, 3)
expected_slice = np.array(
[-11.512925, -4.788215, -0.46172905, -2.051715, -10.539147, -10.970963, -9.091634, 4.0, 4.0]
)
assert np.abs(mel_slice.flatten() - expected_slice).max() < 1e-2
@skip_mps
def test_save_load_local(self):
return super().test_save_load_local()
@skip_mps
def test_dict_tuple_outputs_equivalent(self):
return super().test_dict_tuple_outputs_equivalent()
@skip_mps
def test_save_load_optional_components(self):
return super().test_save_load_optional_components()
@skip_mps
def test_attention_slicing_forward_pass(self):
return super().test_attention_slicing_forward_pass()
def test_inference_batch_single_identical(self):
pass
def test_inference_batch_consistent(self):
pass
@skip_mps
def test_progress_bar(self):
return super().test_progress_bar()
@nightly
@require_torch_gpu
@require_onnxruntime
@require_note_seq
class PipelineIntegrationTests(unittest.TestCase):
def tearDown(self):
# clean up the VRAM after each test
super().tearDown()
gc.collect()
torch.cuda.empty_cache()
def test_callback(self):
# TODO - test that pipeline can decode tokens in a callback
# so that music can be played live
device = torch_device
pipe = SpectrogramDiffusionPipeline.from_pretrained("google/music-spectrogram-diffusion")
melgan = pipe.melgan
pipe.melgan = None
pipe = pipe.to(device)
pipe.set_progress_bar_config(disable=None)
def callback(step, mel_output):
# decode mel to audio
audio = melgan(input_features=mel_output.astype(np.float32))[0]
assert len(audio[0]) == 81920 * (step + 1)
# simulate that audio is played
return audio
processor = MidiProcessor()
input_tokens = processor(MIDI_FILE)
input_tokens = input_tokens[:3]
generator = torch.manual_seed(0)
pipe(input_tokens, num_inference_steps=5, generator=generator, callback=callback, output_type="mel")
def test_spectrogram_fast(self):
device = torch_device
pipe = SpectrogramDiffusionPipeline.from_pretrained("google/music-spectrogram-diffusion")
pipe = pipe.to(device)
pipe.set_progress_bar_config(disable=None)
processor = MidiProcessor()
input_tokens = processor(MIDI_FILE)
# just run two denoising loops
input_tokens = input_tokens[:2]
generator = torch.manual_seed(0)
output = pipe(input_tokens, num_inference_steps=2, generator=generator)
audio = output.audios[0]
assert abs(np.abs(audio).sum() - 3612.841) < 1e-1
def test_spectrogram(self):
device = torch_device
pipe = SpectrogramDiffusionPipeline.from_pretrained("google/music-spectrogram-diffusion")
pipe = pipe.to(device)
pipe.set_progress_bar_config(disable=None)
processor = MidiProcessor()
input_tokens = processor(MIDI_FILE)
# just run 4 denoising loops
input_tokens = input_tokens[:4]
generator = torch.manual_seed(0)
output = pipe(input_tokens, num_inference_steps=100, generator=generator)
audio = output.audios[0]
assert abs(np.abs(audio).sum() - 9389.1111) < 5e-2
# coding=utf-8
# Copyright 2023 HuggingFace Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import gc
import tempfile
import unittest
import numpy as np
import torch
from diffusers import VersatileDiffusionDualGuidedPipeline
from diffusers.utils.testing_utils import load_image, nightly, require_torch_gpu, torch_device
torch.backends.cuda.matmul.allow_tf32 = False
@nightly
@require_torch_gpu
class VersatileDiffusionDualGuidedPipelineIntegrationTests(unittest.TestCase):
def tearDown(self):
# clean up the VRAM after each test
super().tearDown()
gc.collect()
torch.cuda.empty_cache()
def test_remove_unused_weights_save_load(self):
pipe = VersatileDiffusionDualGuidedPipeline.from_pretrained("shi-labs/versatile-diffusion")
# remove text_unet
pipe.remove_unused_weights()
pipe.to(torch_device)
pipe.set_progress_bar_config(disable=None)
second_prompt = load_image(
"https://huggingface.co/datasets/hf-internal-testing/diffusers-images/resolve/main/versatile_diffusion/benz.jpg"
)
generator = torch.manual_seed(0)
image = pipe(
prompt="first prompt",
image=second_prompt,
text_to_image_strength=0.75,
generator=generator,
guidance_scale=7.5,
num_inference_steps=2,
output_type="numpy",
).images
with tempfile.TemporaryDirectory() as tmpdirname:
pipe.save_pretrained(tmpdirname)
pipe = VersatileDiffusionDualGuidedPipeline.from_pretrained(tmpdirname)
pipe.to(torch_device)
pipe.set_progress_bar_config(disable=None)
generator = generator.manual_seed(0)
new_image = pipe(
prompt="first prompt",
image=second_prompt,
text_to_image_strength=0.75,
generator=generator,
guidance_scale=7.5,
num_inference_steps=2,
output_type="numpy",
).images
assert np.abs(image - new_image).max() < 1e-5, "Models don't have the same forward pass"
def test_inference_dual_guided(self):
pipe = VersatileDiffusionDualGuidedPipeline.from_pretrained("shi-labs/versatile-diffusion")
pipe.remove_unused_weights()
pipe.to(torch_device)
pipe.set_progress_bar_config(disable=None)
first_prompt = "cyberpunk 2077"
second_prompt = load_image(
"https://huggingface.co/datasets/hf-internal-testing/diffusers-images/resolve/main/versatile_diffusion/benz.jpg"
)
generator = torch.manual_seed(0)
image = pipe(
prompt=first_prompt,
image=second_prompt,
text_to_image_strength=0.75,
generator=generator,
guidance_scale=7.5,
num_inference_steps=50,
output_type="numpy",
).images
image_slice = image[0, 253:256, 253:256, -1]
assert image.shape == (1, 512, 512, 3)
expected_slice = np.array([0.0787, 0.0849, 0.0826, 0.0812, 0.0807, 0.0795, 0.0818, 0.0798, 0.0779])
assert np.abs(image_slice.flatten() - expected_slice).max() < 1e-2
# coding=utf-8
# Copyright 2023 HuggingFace Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import unittest
import numpy as np
import torch
from diffusers import VersatileDiffusionImageVariationPipeline
from diffusers.utils.testing_utils import load_image, nightly, require_torch_gpu, torch_device
torch.backends.cuda.matmul.allow_tf32 = False
class VersatileDiffusionImageVariationPipelineFastTests(unittest.TestCase):
pass
@nightly
@require_torch_gpu
class VersatileDiffusionImageVariationPipelineIntegrationTests(unittest.TestCase):
def test_inference_image_variations(self):
pipe = VersatileDiffusionImageVariationPipeline.from_pretrained("shi-labs/versatile-diffusion")
pipe.to(torch_device)
pipe.set_progress_bar_config(disable=None)
image_prompt = load_image(
"https://huggingface.co/datasets/hf-internal-testing/diffusers-images/resolve/main/versatile_diffusion/benz.jpg"
)
generator = torch.manual_seed(0)
image = pipe(
image=image_prompt,
generator=generator,
guidance_scale=7.5,
num_inference_steps=50,
output_type="numpy",
).images
image_slice = image[0, 253:256, 253:256, -1]
assert image.shape == (1, 512, 512, 3)
expected_slice = np.array([0.0441, 0.0469, 0.0507, 0.0575, 0.0632, 0.0650, 0.0865, 0.0909, 0.0945])
assert np.abs(image_slice.flatten() - expected_slice).max() < 1e-2
# coding=utf-8
# Copyright 2023 HuggingFace Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import gc
import tempfile
import unittest
import numpy as np
import torch
from diffusers import VersatileDiffusionPipeline
from diffusers.utils.testing_utils import load_image, nightly, require_torch_gpu, torch_device
torch.backends.cuda.matmul.allow_tf32 = False
class VersatileDiffusionMegaPipelineFastTests(unittest.TestCase):
pass
@nightly
@require_torch_gpu
class VersatileDiffusionMegaPipelineIntegrationTests(unittest.TestCase):
def tearDown(self):
# clean up the VRAM after each test
super().tearDown()
gc.collect()
torch.cuda.empty_cache()
def test_from_save_pretrained(self):
pipe = VersatileDiffusionPipeline.from_pretrained("shi-labs/versatile-diffusion", torch_dtype=torch.float16)
pipe.to(torch_device)
pipe.set_progress_bar_config(disable=None)
prompt_image = load_image(
"https://huggingface.co/datasets/hf-internal-testing/diffusers-images/resolve/main/versatile_diffusion/benz.jpg"
)
generator = torch.manual_seed(0)
image = pipe.dual_guided(
prompt="first prompt",
image=prompt_image,
text_to_image_strength=0.75,
generator=generator,
guidance_scale=7.5,
num_inference_steps=2,
output_type="numpy",
).images
with tempfile.TemporaryDirectory() as tmpdirname:
pipe.save_pretrained(tmpdirname)
pipe = VersatileDiffusionPipeline.from_pretrained(tmpdirname, torch_dtype=torch.float16)
pipe.to(torch_device)
pipe.set_progress_bar_config(disable=None)
generator = generator.manual_seed(0)
new_image = pipe.dual_guided(
prompt="first prompt",
image=prompt_image,
text_to_image_strength=0.75,
generator=generator,
guidance_scale=7.5,
num_inference_steps=2,
output_type="numpy",
).images
assert np.abs(image - new_image).max() < 1e-5, "Models don't have the same forward pass"
def test_inference_dual_guided_then_text_to_image(self):
pipe = VersatileDiffusionPipeline.from_pretrained("shi-labs/versatile-diffusion", torch_dtype=torch.float16)
pipe.to(torch_device)
pipe.set_progress_bar_config(disable=None)
prompt = "cyberpunk 2077"
init_image = load_image(
"https://huggingface.co/datasets/hf-internal-testing/diffusers-images/resolve/main/versatile_diffusion/benz.jpg"
)
generator = torch.manual_seed(0)
image = pipe.dual_guided(
prompt=prompt,
image=init_image,
text_to_image_strength=0.75,
generator=generator,
guidance_scale=7.5,
num_inference_steps=50,
output_type="numpy",
).images
image_slice = image[0, 253:256, 253:256, -1]
assert image.shape == (1, 512, 512, 3)
expected_slice = np.array([0.1448, 0.1619, 0.1741, 0.1086, 0.1147, 0.1128, 0.1199, 0.1165, 0.1001])
assert np.abs(image_slice.flatten() - expected_slice).max() < 1e-1
prompt = "A painting of a squirrel eating a burger "
generator = torch.manual_seed(0)
image = pipe.text_to_image(
prompt=prompt, generator=generator, guidance_scale=7.5, num_inference_steps=50, output_type="numpy"
).images
image_slice = image[0, 253:256, 253:256, -1]
assert image.shape == (1, 512, 512, 3)
expected_slice = np.array([0.3367, 0.3169, 0.2656, 0.3870, 0.4790, 0.3796, 0.4009, 0.4878, 0.4778])
assert np.abs(image_slice.flatten() - expected_slice).max() < 1e-1
image = pipe.image_variation(init_image, generator=generator, output_type="numpy").images
image_slice = image[0, 253:256, 253:256, -1]
assert image.shape == (1, 512, 512, 3)
expected_slice = np.array([0.3076, 0.3123, 0.3284, 0.3782, 0.3770, 0.3894, 0.4297, 0.4331, 0.4456])
assert np.abs(image_slice.flatten() - expected_slice).max() < 1e-1
# coding=utf-8
# Copyright 2023 HuggingFace Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import gc
import tempfile
import unittest
import numpy as np
import torch
from diffusers import VersatileDiffusionTextToImagePipeline
from diffusers.utils.testing_utils import nightly, require_torch_gpu, torch_device
torch.backends.cuda.matmul.allow_tf32 = False
class VersatileDiffusionTextToImagePipelineFastTests(unittest.TestCase):
pass
@nightly
@require_torch_gpu
class VersatileDiffusionTextToImagePipelineIntegrationTests(unittest.TestCase):
def tearDown(self):
# clean up the VRAM after each test
super().tearDown()
gc.collect()
torch.cuda.empty_cache()
def test_remove_unused_weights_save_load(self):
pipe = VersatileDiffusionTextToImagePipeline.from_pretrained("shi-labs/versatile-diffusion")
# remove text_unet
pipe.remove_unused_weights()
pipe.to(torch_device)
pipe.set_progress_bar_config(disable=None)
prompt = "A painting of a squirrel eating a burger "
generator = torch.manual_seed(0)
image = pipe(
prompt=prompt, generator=generator, guidance_scale=7.5, num_inference_steps=2, output_type="numpy"
).images
with tempfile.TemporaryDirectory() as tmpdirname:
pipe.save_pretrained(tmpdirname)
pipe = VersatileDiffusionTextToImagePipeline.from_pretrained(tmpdirname)
pipe.to(torch_device)
pipe.set_progress_bar_config(disable=None)
generator = generator.manual_seed(0)
new_image = pipe(
prompt=prompt, generator=generator, guidance_scale=7.5, num_inference_steps=2, output_type="numpy"
).images
assert np.abs(image - new_image).max() < 1e-5, "Models don't have the same forward pass"
def test_inference_text2img(self):
pipe = VersatileDiffusionTextToImagePipeline.from_pretrained(
"shi-labs/versatile-diffusion", torch_dtype=torch.float16
)
pipe.to(torch_device)
pipe.set_progress_bar_config(disable=None)
prompt = "A painting of a squirrel eating a burger "
generator = torch.manual_seed(0)
image = pipe(
prompt=prompt, generator=generator, guidance_scale=7.5, num_inference_steps=50, output_type="numpy"
).images
image_slice = image[0, 253:256, 253:256, -1]
assert image.shape == (1, 512, 512, 3)
expected_slice = np.array([0.3367, 0.3169, 0.2656, 0.3870, 0.4790, 0.3796, 0.4009, 0.4878, 0.4778])
assert np.abs(image_slice.flatten() - expected_slice).max() < 1e-2
# coding=utf-8
# Copyright 2023 HuggingFace Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import gc
import unittest
import numpy as np
import torch
from transformers import CLIPTextConfig, CLIPTextModel, CLIPTokenizer
from diffusers import Transformer2DModel, VQDiffusionPipeline, VQDiffusionScheduler, VQModel
from diffusers.pipelines.vq_diffusion.pipeline_vq_diffusion import LearnedClassifierFreeSamplingEmbeddings
from diffusers.utils.testing_utils import load_numpy, nightly, require_torch_gpu, torch_device
torch.backends.cuda.matmul.allow_tf32 = False
class VQDiffusionPipelineFastTests(unittest.TestCase):
def tearDown(self):
# clean up the VRAM after each test
super().tearDown()
gc.collect()
torch.cuda.empty_cache()
@property
def num_embed(self):
return 12
@property
def num_embeds_ada_norm(self):
return 12
@property
def text_embedder_hidden_size(self):
return 32
@property
def dummy_vqvae(self):
torch.manual_seed(0)
model = VQModel(
block_out_channels=[32, 64],
in_channels=3,
out_channels=3,
down_block_types=["DownEncoderBlock2D", "DownEncoderBlock2D"],
up_block_types=["UpDecoderBlock2D", "UpDecoderBlock2D"],
latent_channels=3,
num_vq_embeddings=self.num_embed,
vq_embed_dim=3,
)
return model
@property
def dummy_tokenizer(self):
tokenizer = CLIPTokenizer.from_pretrained("hf-internal-testing/tiny-random-clip")
return tokenizer
@property
def dummy_text_encoder(self):
torch.manual_seed(0)
config = CLIPTextConfig(
bos_token_id=0,
eos_token_id=2,
hidden_size=self.text_embedder_hidden_size,
intermediate_size=37,
layer_norm_eps=1e-05,
num_attention_heads=4,
num_hidden_layers=5,
pad_token_id=1,
vocab_size=1000,
)
return CLIPTextModel(config)
@property
def dummy_transformer(self):
torch.manual_seed(0)
height = 12
width = 12
model_kwargs = {
"attention_bias": True,
"cross_attention_dim": 32,
"attention_head_dim": height * width,
"num_attention_heads": 1,
"num_vector_embeds": self.num_embed,
"num_embeds_ada_norm": self.num_embeds_ada_norm,
"norm_num_groups": 32,
"sample_size": width,
"activation_fn": "geglu-approximate",
}
model = Transformer2DModel(**model_kwargs)
return model
def test_vq_diffusion(self):
device = "cpu"
vqvae = self.dummy_vqvae
text_encoder = self.dummy_text_encoder
tokenizer = self.dummy_tokenizer
transformer = self.dummy_transformer
scheduler = VQDiffusionScheduler(self.num_embed)
learned_classifier_free_sampling_embeddings = LearnedClassifierFreeSamplingEmbeddings(learnable=False)
pipe = VQDiffusionPipeline(
vqvae=vqvae,
text_encoder=text_encoder,
tokenizer=tokenizer,
transformer=transformer,
scheduler=scheduler,
learned_classifier_free_sampling_embeddings=learned_classifier_free_sampling_embeddings,
)
pipe = pipe.to(device)
pipe.set_progress_bar_config(disable=None)
prompt = "teddy bear playing in the pool"
generator = torch.Generator(device=device).manual_seed(0)
output = pipe([prompt], generator=generator, num_inference_steps=2, output_type="np")
image = output.images
generator = torch.Generator(device=device).manual_seed(0)
image_from_tuple = pipe(
[prompt], generator=generator, output_type="np", return_dict=False, num_inference_steps=2
)[0]
image_slice = image[0, -3:, -3:, -1]
image_from_tuple_slice = image_from_tuple[0, -3:, -3:, -1]
assert image.shape == (1, 24, 24, 3)
expected_slice = np.array([0.6551, 0.6168, 0.5008, 0.5676, 0.5659, 0.4295, 0.6073, 0.5599, 0.4992])
assert np.abs(image_slice.flatten() - expected_slice).max() < 1e-2
assert np.abs(image_from_tuple_slice.flatten() - expected_slice).max() < 1e-2
def test_vq_diffusion_classifier_free_sampling(self):
device = "cpu"
vqvae = self.dummy_vqvae
text_encoder = self.dummy_text_encoder
tokenizer = self.dummy_tokenizer
transformer = self.dummy_transformer
scheduler = VQDiffusionScheduler(self.num_embed)
learned_classifier_free_sampling_embeddings = LearnedClassifierFreeSamplingEmbeddings(
learnable=True, hidden_size=self.text_embedder_hidden_size, length=tokenizer.model_max_length
)
pipe = VQDiffusionPipeline(
vqvae=vqvae,
text_encoder=text_encoder,
tokenizer=tokenizer,
transformer=transformer,
scheduler=scheduler,
learned_classifier_free_sampling_embeddings=learned_classifier_free_sampling_embeddings,
)
pipe = pipe.to(device)
pipe.set_progress_bar_config(disable=None)
prompt = "teddy bear playing in the pool"
generator = torch.Generator(device=device).manual_seed(0)
output = pipe([prompt], generator=generator, num_inference_steps=2, output_type="np")
image = output.images
generator = torch.Generator(device=device).manual_seed(0)
image_from_tuple = pipe(
[prompt], generator=generator, output_type="np", return_dict=False, num_inference_steps=2
)[0]
image_slice = image[0, -3:, -3:, -1]
image_from_tuple_slice = image_from_tuple[0, -3:, -3:, -1]
assert image.shape == (1, 24, 24, 3)
expected_slice = np.array([0.6693, 0.6075, 0.4959, 0.5701, 0.5583, 0.4333, 0.6171, 0.5684, 0.4988])
assert np.abs(image_slice.flatten() - expected_slice).max() < 2.0
assert np.abs(image_from_tuple_slice.flatten() - expected_slice).max() < 1e-2
@nightly
@require_torch_gpu
class VQDiffusionPipelineIntegrationTests(unittest.TestCase):
def tearDown(self):
# clean up the VRAM after each test
super().tearDown()
gc.collect()
torch.cuda.empty_cache()
def test_vq_diffusion_classifier_free_sampling(self):
expected_image = load_numpy(
"https://huggingface.co/datasets/hf-internal-testing/diffusers-images/resolve/main"
"/vq_diffusion/teddy_bear_pool_classifier_free_sampling.npy"
)
pipeline = VQDiffusionPipeline.from_pretrained("microsoft/vq-diffusion-ithq")
pipeline = pipeline.to(torch_device)
pipeline.set_progress_bar_config(disable=None)
# requires GPU generator for gumbel softmax
# don't use GPU generator in tests though
generator = torch.Generator(device=torch_device).manual_seed(0)
output = pipeline(
"teddy bear playing in the pool",
num_images_per_prompt=1,
generator=generator,
output_type="np",
)
image = output.images[0]
assert image.shape == (256, 256, 3)
assert np.abs(expected_image - image).max() < 2.0
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment