Unverified Commit bc55b631 authored by Sayak Paul's avatar Sayak Paul Committed by GitHub
Browse files

[tests] remove tests for deprecated pipelines. (#11879)

* remove tests for deprecated pipelines.

* remove folders

* test_pipelines_common
parent 15d50f16
# coding=utf-8
# Copyright 2025 HuggingFace Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import gc
import unittest
import torch
from diffusers import DDIMScheduler, TextToVideoZeroPipeline
from diffusers.utils.testing_utils import (
backend_empty_cache,
load_pt,
nightly,
require_torch_accelerator,
torch_device,
)
from ..test_pipelines_common import assert_mean_pixel_difference
@nightly
@require_torch_accelerator
class TextToVideoZeroPipelineSlowTests(unittest.TestCase):
def setUp(self):
# clean up the VRAM before each test
super().setUp()
gc.collect()
backend_empty_cache(torch_device)
def tearDown(self):
# clean up the VRAM after each test
super().tearDown()
gc.collect()
backend_empty_cache(torch_device)
def test_full_model(self):
model_id = "stable-diffusion-v1-5/stable-diffusion-v1-5"
pipe = TextToVideoZeroPipeline.from_pretrained(model_id, torch_dtype=torch.float16).to(torch_device)
pipe.scheduler = DDIMScheduler.from_config(pipe.scheduler.config)
generator = torch.Generator(device="cpu").manual_seed(0)
prompt = "A bear is playing a guitar on Times Square"
result = pipe(prompt=prompt, generator=generator).images
expected_result = load_pt(
"https://huggingface.co/datasets/hf-internal-testing/diffusers-images/resolve/main/text-to-video/A bear is playing a guitar on Times Square.pt",
weights_only=False,
)
assert_mean_pixel_difference(result, expected_result)
# coding=utf-8
# Copyright 2025 HuggingFace Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import gc
import inspect
import tempfile
import unittest
import numpy as np
import torch
from transformers import CLIPTextConfig, CLIPTextModel, CLIPTextModelWithProjection, CLIPTokenizer
from diffusers import AutoencoderKL, DDIMScheduler, TextToVideoZeroSDXLPipeline, UNet2DConditionModel
from diffusers.utils.testing_utils import (
backend_empty_cache,
enable_full_determinism,
nightly,
require_accelerate_version_greater,
require_torch_accelerator,
torch_device,
)
from ..pipeline_params import TEXT_TO_IMAGE_BATCH_PARAMS, TEXT_TO_IMAGE_IMAGE_PARAMS, TEXT_TO_IMAGE_PARAMS
from ..test_pipelines_common import PipelineFromPipeTesterMixin, PipelineTesterMixin
enable_full_determinism()
def to_np(tensor):
if isinstance(tensor, torch.Tensor):
tensor = tensor.detach().cpu().numpy()
return tensor
class TextToVideoZeroSDXLPipelineFastTests(PipelineTesterMixin, PipelineFromPipeTesterMixin, unittest.TestCase):
pipeline_class = TextToVideoZeroSDXLPipeline
params = TEXT_TO_IMAGE_PARAMS
batch_params = TEXT_TO_IMAGE_BATCH_PARAMS
image_params = TEXT_TO_IMAGE_IMAGE_PARAMS
image_latents_params = TEXT_TO_IMAGE_IMAGE_PARAMS
generator_device = "cpu"
def get_dummy_components(self, seed=0):
torch.manual_seed(seed)
unet = UNet2DConditionModel(
block_out_channels=(2, 4),
layers_per_block=2,
sample_size=2,
norm_num_groups=2,
in_channels=4,
out_channels=4,
down_block_types=("DownBlock2D", "CrossAttnDownBlock2D"),
up_block_types=("CrossAttnUpBlock2D", "UpBlock2D"),
# SD2-specific config below
attention_head_dim=(2, 4),
use_linear_projection=True,
addition_embed_type="text_time",
addition_time_embed_dim=8,
transformer_layers_per_block=(1, 2),
projection_class_embeddings_input_dim=80, # 6 * 8 + 32
cross_attention_dim=64,
)
scheduler = DDIMScheduler(
num_train_timesteps=1000,
beta_start=0.0001,
beta_end=0.02,
beta_schedule="linear",
trained_betas=None,
clip_sample=True,
set_alpha_to_one=True,
steps_offset=0,
prediction_type="epsilon",
thresholding=False,
dynamic_thresholding_ratio=0.995,
clip_sample_range=1.0,
sample_max_value=1.0,
timestep_spacing="leading",
rescale_betas_zero_snr=False,
)
torch.manual_seed(seed)
vae = AutoencoderKL(
block_out_channels=[32, 64],
in_channels=3,
out_channels=3,
down_block_types=["DownEncoderBlock2D", "DownEncoderBlock2D"],
up_block_types=["UpDecoderBlock2D", "UpDecoderBlock2D"],
latent_channels=4,
sample_size=128,
)
torch.manual_seed(seed)
text_encoder_config = CLIPTextConfig(
bos_token_id=0,
eos_token_id=2,
hidden_size=32,
intermediate_size=37,
layer_norm_eps=1e-05,
num_attention_heads=4,
num_hidden_layers=5,
pad_token_id=1,
vocab_size=1000,
# SD2-specific config below
hidden_act="gelu",
projection_dim=32,
)
text_encoder = CLIPTextModel(text_encoder_config)
tokenizer = CLIPTokenizer.from_pretrained("hf-internal-testing/tiny-random-clip")
text_encoder_2 = CLIPTextModelWithProjection(text_encoder_config)
tokenizer_2 = CLIPTokenizer.from_pretrained("hf-internal-testing/tiny-random-clip")
components = {
"unet": unet,
"scheduler": scheduler,
"vae": vae,
"text_encoder": text_encoder,
"tokenizer": tokenizer,
"text_encoder_2": text_encoder_2,
"tokenizer_2": tokenizer_2,
"image_encoder": None,
"feature_extractor": None,
}
return components
def get_dummy_inputs(self, device, seed=0):
if str(device).startswith("mps"):
generator = torch.manual_seed(seed)
else:
generator = torch.Generator(device=device).manual_seed(seed)
inputs = {
"prompt": "A panda dancing in Antarctica",
"generator": generator,
"num_inference_steps": 5,
"t0": 1,
"t1": 3,
"height": 64,
"width": 64,
"video_length": 3,
"output_type": "np",
}
return inputs
def get_generator(self, device, seed=0):
if str(device).startswith("mps"):
generator = torch.manual_seed(seed)
else:
generator = torch.Generator(device=device).manual_seed(seed)
return generator
def test_text_to_video_zero_sdxl(self):
components = self.get_dummy_components()
pipe = self.pipeline_class(**components)
pipe = pipe.to(torch_device)
inputs = self.get_dummy_inputs(self.generator_device)
result = pipe(**inputs).images
first_frame_slice = result[0, -3:, -3:, -1]
last_frame_slice = result[-1, -3:, -3:, 0]
expected_slice1 = np.array(
[0.6008109, 0.73051643, 0.51778656, 0.55817354, 0.45222935, 0.45998418, 0.57017255, 0.54874814, 0.47078788]
)
expected_slice2 = np.array(
[0.6011751, 0.47420046, 0.41660714, 0.6472957, 0.41261768, 0.5438129, 0.7401535, 0.6756011, 0.53652245]
)
assert np.abs(first_frame_slice.flatten() - expected_slice1).max() < 1e-2
assert np.abs(last_frame_slice.flatten() - expected_slice2).max() < 1e-2
@unittest.skip(
reason="Cannot call `set_default_attn_processor` as this pipeline uses a specific attention processor."
)
def test_attention_slicing_forward_pass(self):
pass
def test_cfg(self):
sig = inspect.signature(self.pipeline_class.__call__)
if "guidance_scale" not in sig.parameters:
return
components = self.get_dummy_components()
pipe = self.pipeline_class(**components)
pipe = pipe.to(torch_device)
pipe.set_progress_bar_config(disable=None)
inputs = self.get_dummy_inputs(self.generator_device)
inputs["guidance_scale"] = 1.0
out_no_cfg = pipe(**inputs)[0]
inputs["guidance_scale"] = 7.5
out_cfg = pipe(**inputs)[0]
assert out_cfg.shape == out_no_cfg.shape
def test_dict_tuple_outputs_equivalent(self, expected_max_difference=1e-4):
components = self.get_dummy_components()
pipe = self.pipeline_class(**components)
pipe.to(torch_device)
pipe.set_progress_bar_config(disable=None)
output = pipe(**self.get_dummy_inputs(self.generator_device))[0]
output_tuple = pipe(**self.get_dummy_inputs(self.generator_device), return_dict=False)[0]
max_diff = np.abs(to_np(output) - to_np(output_tuple)).max()
self.assertLess(max_diff, expected_max_difference)
@unittest.skipIf(torch_device not in ["cuda", "xpu"], reason="float16 requires CUDA or XPU")
@require_torch_accelerator
def test_float16_inference(self, expected_max_diff=5e-2):
components = self.get_dummy_components()
for name, module in components.items():
if hasattr(module, "half"):
components[name] = module.to(torch_device).half()
pipe = self.pipeline_class(**components)
pipe.to(torch_device)
pipe.set_progress_bar_config(disable=None)
components = self.get_dummy_components()
pipe_fp16 = self.pipeline_class(**components)
pipe_fp16.to(torch_device, torch.float16)
pipe_fp16.set_progress_bar_config(disable=None)
inputs = self.get_dummy_inputs(self.generator_device)
# # Reset generator in case it is used inside dummy inputs
if "generator" in inputs:
inputs["generator"] = self.get_generator(self.generator_device)
output = pipe(**inputs)[0]
fp16_inputs = self.get_dummy_inputs(self.generator_device)
# Reset generator in case it is used inside dummy inputs
if "generator" in fp16_inputs:
fp16_inputs["generator"] = self.get_generator(self.generator_device)
output_fp16 = pipe_fp16(**fp16_inputs)[0]
max_diff = np.abs(to_np(output) - to_np(output_fp16)).max()
self.assertLess(max_diff, expected_max_diff, "The outputs of the fp16 and fp32 pipelines are too different.")
@unittest.skip(reason="Batching needs to be properly figured out first for this pipeline.")
def test_inference_batch_consistent(self):
pass
@unittest.skip(
reason="Cannot call `set_default_attn_processor` as this pipeline uses a specific attention processor."
)
def test_inference_batch_single_identical(self):
pass
@require_torch_accelerator
@require_accelerate_version_greater("0.17.0")
def test_model_cpu_offload_forward_pass(self, expected_max_diff=2e-4):
components = self.get_dummy_components()
pipe = self.pipeline_class(**components)
pipe = pipe.to(torch_device)
pipe.set_progress_bar_config(disable=None)
inputs = self.get_dummy_inputs(self.generator_device)
output_without_offload = pipe(**inputs)[0]
pipe.enable_model_cpu_offload(device=torch_device)
inputs = self.get_dummy_inputs(self.generator_device)
output_with_offload = pipe(**inputs)[0]
max_diff = np.abs(to_np(output_with_offload) - to_np(output_without_offload)).max()
self.assertLess(max_diff, expected_max_diff, "CPU offloading should not affect the inference results")
@unittest.skip(reason="`num_images_per_prompt` argument is not supported for this pipeline.")
def test_pipeline_call_signature(self):
pass
@unittest.skipIf(torch_device not in ["cuda", "xpu"], reason="float16 requires CUDA or XPU")
@require_torch_accelerator
def test_save_load_float16(self, expected_max_diff=1e-2):
components = self.get_dummy_components()
for name, module in components.items():
if hasattr(module, "half"):
components[name] = module.to(torch_device).half()
pipe = self.pipeline_class(**components)
pipe.to(torch_device)
pipe.set_progress_bar_config(disable=None)
inputs = self.get_dummy_inputs(self.generator_device)
output = pipe(**inputs)[0]
with tempfile.TemporaryDirectory() as tmpdir:
pipe.save_pretrained(tmpdir)
pipe_loaded = self.pipeline_class.from_pretrained(tmpdir, torch_dtype=torch.float16)
pipe_loaded.to(torch_device)
pipe_loaded.set_progress_bar_config(disable=None)
for name, component in pipe_loaded.components.items():
if hasattr(component, "dtype"):
self.assertTrue(
component.dtype == torch.float16,
f"`{name}.dtype` switched from `float16` to {component.dtype} after loading.",
)
inputs = self.get_dummy_inputs(self.generator_device)
output_loaded = pipe_loaded(**inputs)[0]
max_diff = np.abs(to_np(output) - to_np(output_loaded)).max()
self.assertLess(
max_diff, expected_max_diff, "The output of the fp16 pipeline changed after saving and loading."
)
@unittest.skip(
reason="Cannot call `set_default_attn_processor` as this pipeline uses a specific attention processor."
)
def test_save_load_local(self):
pass
@unittest.skip(
reason="Cannot call `set_default_attn_processor` as this pipeline uses a specific attention processor."
)
def test_save_load_optional_components(self):
pass
@unittest.skip(
reason="Cannot call `set_default_attn_processor` as this pipeline uses a specific attention processor."
)
def test_sequential_cpu_offload_forward_pass(self):
pass
@require_torch_accelerator
def test_to_device(self):
components = self.get_dummy_components()
pipe = self.pipeline_class(**components)
pipe.set_progress_bar_config(disable=None)
pipe.to("cpu")
model_devices = [component.device.type for component in components.values() if hasattr(component, "device")]
self.assertTrue(all(device == "cpu" for device in model_devices))
output_cpu = pipe(**self.get_dummy_inputs("cpu"))[0] # generator set to cpu
self.assertTrue(np.isnan(output_cpu).sum() == 0)
pipe.to(torch_device)
model_devices = [component.device.type for component in components.values() if hasattr(component, "device")]
self.assertTrue(all(device == torch_device for device in model_devices))
output_device = pipe(**self.get_dummy_inputs("cpu"))[0] # generator set to cpu
self.assertTrue(np.isnan(to_np(output_device)).sum() == 0)
@unittest.skip(
reason="Cannot call `set_default_attn_processor` as this pipeline uses a specific attention processor."
)
def test_xformers_attention_forwardGenerator_pass(self):
pass
@nightly
@require_torch_accelerator
class TextToVideoZeroSDXLPipelineSlowTests(unittest.TestCase):
def setUp(self):
# clean up the VRAM before each test
super().setUp()
gc.collect()
backend_empty_cache(torch_device)
def tearDown(self):
# clean up the VRAM after each test
super().tearDown()
gc.collect()
backend_empty_cache(torch_device)
def test_full_model(self):
model_id = "stabilityai/stable-diffusion-xl-base-1.0"
pipe = TextToVideoZeroSDXLPipeline.from_pretrained(
model_id, torch_dtype=torch.float16, variant="fp16", use_safetensors=True
)
pipe.enable_model_cpu_offload()
pipe.enable_vae_slicing()
pipe.scheduler = DDIMScheduler.from_config(pipe.scheduler.config)
generator = torch.Generator(device="cpu").manual_seed(0)
prompt = "A panda dancing in Antarctica"
result = pipe(prompt=prompt, generator=generator).images
first_frame_slice = result[0, -3:, -3:, -1]
last_frame_slice = result[-1, -3:, -3:, 0]
expected_slice1 = np.array([0.57, 0.57, 0.57, 0.57, 0.57, 0.56, 0.55, 0.56, 0.56])
expected_slice2 = np.array([0.54, 0.53, 0.53, 0.53, 0.53, 0.52, 0.53, 0.53, 0.53])
assert np.abs(first_frame_slice.flatten() - expected_slice1).max() < 1e-2
assert np.abs(last_frame_slice.flatten() - expected_slice2).max() < 1e-2
# coding=utf-8
# Copyright 2025 HuggingFace Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import random
import unittest
import numpy as np
import torch
from transformers import CLIPTextConfig, CLIPTextModel, CLIPTokenizer
from diffusers import (
AutoencoderKL,
DDIMScheduler,
UNet3DConditionModel,
VideoToVideoSDPipeline,
)
from diffusers.utils import is_xformers_available
from diffusers.utils.testing_utils import (
enable_full_determinism,
floats_tensor,
is_flaky,
nightly,
numpy_cosine_similarity_distance,
skip_mps,
torch_device,
)
from ..pipeline_params import (
TEXT_GUIDED_IMAGE_VARIATION_BATCH_PARAMS,
TEXT_GUIDED_IMAGE_VARIATION_PARAMS,
)
from ..test_pipelines_common import PipelineTesterMixin
enable_full_determinism()
@skip_mps
class VideoToVideoSDPipelineFastTests(PipelineTesterMixin, unittest.TestCase):
pipeline_class = VideoToVideoSDPipeline
params = TEXT_GUIDED_IMAGE_VARIATION_PARAMS.union({"video"}) - {"image", "width", "height"}
batch_params = TEXT_GUIDED_IMAGE_VARIATION_BATCH_PARAMS.union({"video"}) - {"image"}
required_optional_params = PipelineTesterMixin.required_optional_params - {"latents"}
test_attention_slicing = False
# No `output_type`.
required_optional_params = frozenset(
[
"num_inference_steps",
"generator",
"latents",
"return_dict",
"callback",
"callback_steps",
]
)
def get_dummy_components(self):
torch.manual_seed(0)
unet = UNet3DConditionModel(
block_out_channels=(4, 8),
layers_per_block=1,
sample_size=32,
in_channels=4,
out_channels=4,
down_block_types=("CrossAttnDownBlock3D", "DownBlock3D"),
up_block_types=("UpBlock3D", "CrossAttnUpBlock3D"),
cross_attention_dim=32,
attention_head_dim=4,
norm_num_groups=2,
)
scheduler = DDIMScheduler(
beta_start=0.00085,
beta_end=0.012,
beta_schedule="scaled_linear",
clip_sample=True,
set_alpha_to_one=False,
)
torch.manual_seed(0)
vae = AutoencoderKL(
block_out_channels=[
8,
],
in_channels=3,
out_channels=3,
down_block_types=[
"DownEncoderBlock2D",
],
up_block_types=["UpDecoderBlock2D"],
latent_channels=4,
sample_size=32,
norm_num_groups=2,
)
torch.manual_seed(0)
text_encoder_config = CLIPTextConfig(
bos_token_id=0,
eos_token_id=2,
hidden_size=32,
intermediate_size=37,
layer_norm_eps=1e-05,
num_attention_heads=4,
num_hidden_layers=5,
pad_token_id=1,
vocab_size=1000,
hidden_act="gelu",
projection_dim=512,
)
text_encoder = CLIPTextModel(text_encoder_config)
tokenizer = CLIPTokenizer.from_pretrained("hf-internal-testing/tiny-random-clip")
components = {
"unet": unet,
"scheduler": scheduler,
"vae": vae,
"text_encoder": text_encoder,
"tokenizer": tokenizer,
}
return components
def get_dummy_inputs(self, device, seed=0):
# 3 frames
video = floats_tensor((1, 3, 3, 32, 32), rng=random.Random(seed)).to(device)
if str(device).startswith("mps"):
generator = torch.manual_seed(seed)
else:
generator = torch.Generator(device=device).manual_seed(seed)
inputs = {
"prompt": "A painting of a squirrel eating a burger",
"video": video,
"generator": generator,
"num_inference_steps": 2,
"guidance_scale": 6.0,
"output_type": "pt",
}
return inputs
def test_text_to_video_default_case(self):
device = "cpu" # ensure determinism for the device-dependent torch.Generator
components = self.get_dummy_components()
sd_pipe = VideoToVideoSDPipeline(**components)
sd_pipe = sd_pipe.to(device)
sd_pipe.set_progress_bar_config(disable=None)
inputs = self.get_dummy_inputs(device)
inputs["output_type"] = "np"
frames = sd_pipe(**inputs).frames
image_slice = frames[0][0][-3:, -3:, -1]
assert frames[0][0].shape == (32, 32, 3)
expected_slice = np.array([0.6391, 0.5350, 0.5202, 0.5521, 0.5453, 0.5393, 0.6652, 0.5270, 0.5185])
assert np.abs(image_slice.flatten() - expected_slice).max() < 1e-2
@is_flaky()
def test_save_load_optional_components(self):
super().test_save_load_optional_components(expected_max_difference=0.001)
@is_flaky()
def test_dict_tuple_outputs_equivalent(self):
super().test_dict_tuple_outputs_equivalent()
@is_flaky()
def test_save_load_local(self):
super().test_save_load_local()
@unittest.skipIf(
torch_device != "cuda" or not is_xformers_available(),
reason="XFormers attention is only available with CUDA and `xformers` installed",
)
def test_xformers_attention_forwardGenerator_pass(self):
self._test_xformers_attention_forwardGenerator_pass(test_mean_pixel_difference=False, expected_max_diff=5e-3)
# (todo): sayakpaul
@unittest.skip(reason="Batching needs to be properly figured out first for this pipeline.")
def test_inference_batch_consistent(self):
pass
# (todo): sayakpaul
@unittest.skip(reason="Batching needs to be properly figured out first for this pipeline.")
def test_inference_batch_single_identical(self):
pass
@unittest.skip(reason="`num_images_per_prompt` argument is not supported for this pipeline.")
def test_num_images_per_prompt(self):
pass
def test_encode_prompt_works_in_isolation(self):
extra_required_param_value_dict = {
"device": torch.device(torch_device).type,
"num_images_per_prompt": 1,
"do_classifier_free_guidance": self.get_dummy_inputs(device=torch_device).get("guidance_scale", 1.0) > 1.0,
}
return super().test_encode_prompt_works_in_isolation(extra_required_param_value_dict)
@nightly
@skip_mps
class VideoToVideoSDPipelineSlowTests(unittest.TestCase):
def test_two_step_model(self):
pipe = VideoToVideoSDPipeline.from_pretrained("cerspense/zeroscope_v2_576w", torch_dtype=torch.float16)
pipe.enable_model_cpu_offload()
# 10 frames
generator = torch.Generator(device="cpu").manual_seed(0)
video = torch.randn((1, 10, 3, 320, 576), generator=generator)
prompt = "Spiderman is surfing"
generator = torch.Generator(device="cpu").manual_seed(0)
video_frames = pipe(prompt, video=video, generator=generator, num_inference_steps=3, output_type="np").frames
expected_array = np.array(
[0.17114258, 0.13720703, 0.08886719, 0.14819336, 0.1730957, 0.24584961, 0.22021484, 0.35180664, 0.2607422]
)
output_array = video_frames[0, 0, :3, :3, 0].flatten()
assert numpy_cosine_similarity_distance(expected_array, output_array) < 1e-3
# coding=utf-8
# Copyright 2025 HuggingFace Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import gc
import unittest
import numpy as np
import torch
from transformers import CLIPTextConfig, CLIPTextModelWithProjection, CLIPTokenizer
from diffusers import PriorTransformer, UnCLIPPipeline, UnCLIPScheduler, UNet2DConditionModel, UNet2DModel
from diffusers.pipelines.unclip.text_proj import UnCLIPTextProjModel
from diffusers.utils.testing_utils import (
backend_empty_cache,
backend_max_memory_allocated,
backend_reset_max_memory_allocated,
backend_reset_peak_memory_stats,
enable_full_determinism,
load_numpy,
nightly,
require_torch_accelerator,
skip_mps,
torch_device,
)
from ..pipeline_params import TEXT_TO_IMAGE_BATCH_PARAMS, TEXT_TO_IMAGE_PARAMS
from ..test_pipelines_common import PipelineTesterMixin, assert_mean_pixel_difference
enable_full_determinism()
class UnCLIPPipelineFastTests(PipelineTesterMixin, unittest.TestCase):
pipeline_class = UnCLIPPipeline
params = TEXT_TO_IMAGE_PARAMS - {
"negative_prompt",
"height",
"width",
"negative_prompt_embeds",
"guidance_scale",
"prompt_embeds",
"cross_attention_kwargs",
}
batch_params = TEXT_TO_IMAGE_BATCH_PARAMS
required_optional_params = [
"generator",
"return_dict",
"prior_num_inference_steps",
"decoder_num_inference_steps",
"super_res_num_inference_steps",
]
test_xformers_attention = False
@property
def text_embedder_hidden_size(self):
return 32
@property
def time_input_dim(self):
return 32
@property
def block_out_channels_0(self):
return self.time_input_dim
@property
def time_embed_dim(self):
return self.time_input_dim * 4
@property
def cross_attention_dim(self):
return 100
@property
def dummy_tokenizer(self):
tokenizer = CLIPTokenizer.from_pretrained("hf-internal-testing/tiny-random-clip")
return tokenizer
@property
def dummy_text_encoder(self):
torch.manual_seed(0)
config = CLIPTextConfig(
bos_token_id=0,
eos_token_id=2,
hidden_size=self.text_embedder_hidden_size,
projection_dim=self.text_embedder_hidden_size,
intermediate_size=37,
layer_norm_eps=1e-05,
num_attention_heads=4,
num_hidden_layers=5,
pad_token_id=1,
vocab_size=1000,
)
return CLIPTextModelWithProjection(config)
@property
def dummy_prior(self):
torch.manual_seed(0)
model_kwargs = {
"num_attention_heads": 2,
"attention_head_dim": 12,
"embedding_dim": self.text_embedder_hidden_size,
"num_layers": 1,
}
model = PriorTransformer(**model_kwargs)
return model
@property
def dummy_text_proj(self):
torch.manual_seed(0)
model_kwargs = {
"clip_embeddings_dim": self.text_embedder_hidden_size,
"time_embed_dim": self.time_embed_dim,
"cross_attention_dim": self.cross_attention_dim,
}
model = UnCLIPTextProjModel(**model_kwargs)
return model
@property
def dummy_decoder(self):
torch.manual_seed(0)
model_kwargs = {
"sample_size": 32,
# RGB in channels
"in_channels": 3,
# Out channels is double in channels because predicts mean and variance
"out_channels": 6,
"down_block_types": ("ResnetDownsampleBlock2D", "SimpleCrossAttnDownBlock2D"),
"up_block_types": ("SimpleCrossAttnUpBlock2D", "ResnetUpsampleBlock2D"),
"mid_block_type": "UNetMidBlock2DSimpleCrossAttn",
"block_out_channels": (self.block_out_channels_0, self.block_out_channels_0 * 2),
"layers_per_block": 1,
"cross_attention_dim": self.cross_attention_dim,
"attention_head_dim": 4,
"resnet_time_scale_shift": "scale_shift",
"class_embed_type": "identity",
}
model = UNet2DConditionModel(**model_kwargs)
return model
@property
def dummy_super_res_kwargs(self):
return {
"sample_size": 64,
"layers_per_block": 1,
"down_block_types": ("ResnetDownsampleBlock2D", "ResnetDownsampleBlock2D"),
"up_block_types": ("ResnetUpsampleBlock2D", "ResnetUpsampleBlock2D"),
"block_out_channels": (self.block_out_channels_0, self.block_out_channels_0 * 2),
"in_channels": 6,
"out_channels": 3,
}
@property
def dummy_super_res_first(self):
torch.manual_seed(0)
model = UNet2DModel(**self.dummy_super_res_kwargs)
return model
@property
def dummy_super_res_last(self):
# seeded differently to get different unet than `self.dummy_super_res_first`
torch.manual_seed(1)
model = UNet2DModel(**self.dummy_super_res_kwargs)
return model
def get_dummy_components(self):
prior = self.dummy_prior
decoder = self.dummy_decoder
text_proj = self.dummy_text_proj
text_encoder = self.dummy_text_encoder
tokenizer = self.dummy_tokenizer
super_res_first = self.dummy_super_res_first
super_res_last = self.dummy_super_res_last
prior_scheduler = UnCLIPScheduler(
variance_type="fixed_small_log",
prediction_type="sample",
num_train_timesteps=1000,
clip_sample_range=5.0,
)
decoder_scheduler = UnCLIPScheduler(
variance_type="learned_range",
prediction_type="epsilon",
num_train_timesteps=1000,
)
super_res_scheduler = UnCLIPScheduler(
variance_type="fixed_small_log",
prediction_type="epsilon",
num_train_timesteps=1000,
)
components = {
"prior": prior,
"decoder": decoder,
"text_proj": text_proj,
"text_encoder": text_encoder,
"tokenizer": tokenizer,
"super_res_first": super_res_first,
"super_res_last": super_res_last,
"prior_scheduler": prior_scheduler,
"decoder_scheduler": decoder_scheduler,
"super_res_scheduler": super_res_scheduler,
}
return components
def get_dummy_inputs(self, device, seed=0):
if str(device).startswith("mps"):
generator = torch.manual_seed(seed)
else:
generator = torch.Generator(device=device).manual_seed(seed)
inputs = {
"prompt": "horse",
"generator": generator,
"prior_num_inference_steps": 2,
"decoder_num_inference_steps": 2,
"super_res_num_inference_steps": 2,
"output_type": "np",
}
return inputs
def test_unclip(self):
device = "cpu"
components = self.get_dummy_components()
pipe = self.pipeline_class(**components)
pipe = pipe.to(device)
pipe.set_progress_bar_config(disable=None)
output = pipe(**self.get_dummy_inputs(device))
image = output.images
image_from_tuple = pipe(
**self.get_dummy_inputs(device),
return_dict=False,
)[0]
image_slice = image[0, -3:, -3:, -1]
image_from_tuple_slice = image_from_tuple[0, -3:, -3:, -1]
assert image.shape == (1, 64, 64, 3)
expected_slice = np.array(
[
0.9997,
0.9988,
0.0028,
0.9997,
0.9984,
0.9965,
0.0029,
0.9986,
0.0025,
]
)
assert np.abs(image_slice.flatten() - expected_slice).max() < 1e-2
assert np.abs(image_from_tuple_slice.flatten() - expected_slice).max() < 1e-2
def test_unclip_passed_text_embed(self):
device = torch.device("cpu")
class DummyScheduler:
init_noise_sigma = 1
components = self.get_dummy_components()
pipe = self.pipeline_class(**components)
pipe = pipe.to(device)
prior = components["prior"]
decoder = components["decoder"]
super_res_first = components["super_res_first"]
tokenizer = components["tokenizer"]
text_encoder = components["text_encoder"]
generator = torch.Generator(device=device).manual_seed(0)
dtype = prior.dtype
batch_size = 1
shape = (batch_size, prior.config.embedding_dim)
prior_latents = pipe.prepare_latents(
shape, dtype=dtype, device=device, generator=generator, latents=None, scheduler=DummyScheduler()
)
shape = (batch_size, decoder.config.in_channels, decoder.config.sample_size, decoder.config.sample_size)
generator = torch.Generator(device=device).manual_seed(0)
decoder_latents = pipe.prepare_latents(
shape, dtype=dtype, device=device, generator=generator, latents=None, scheduler=DummyScheduler()
)
shape = (
batch_size,
super_res_first.config.in_channels // 2,
super_res_first.config.sample_size,
super_res_first.config.sample_size,
)
super_res_latents = pipe.prepare_latents(
shape, dtype=dtype, device=device, generator=generator, latents=None, scheduler=DummyScheduler()
)
pipe.set_progress_bar_config(disable=None)
prompt = "this is a prompt example"
generator = torch.Generator(device=device).manual_seed(0)
output = pipe(
[prompt],
generator=generator,
prior_num_inference_steps=2,
decoder_num_inference_steps=2,
super_res_num_inference_steps=2,
prior_latents=prior_latents,
decoder_latents=decoder_latents,
super_res_latents=super_res_latents,
output_type="np",
)
image = output.images
text_inputs = tokenizer(
prompt,
padding="max_length",
max_length=tokenizer.model_max_length,
return_tensors="pt",
)
text_model_output = text_encoder(text_inputs.input_ids)
text_attention_mask = text_inputs.attention_mask
generator = torch.Generator(device=device).manual_seed(0)
image_from_text = pipe(
generator=generator,
prior_num_inference_steps=2,
decoder_num_inference_steps=2,
super_res_num_inference_steps=2,
prior_latents=prior_latents,
decoder_latents=decoder_latents,
super_res_latents=super_res_latents,
text_model_output=text_model_output,
text_attention_mask=text_attention_mask,
output_type="np",
)[0]
# make sure passing text embeddings manually is identical
assert np.abs(image - image_from_text).max() < 1e-4
# Overriding PipelineTesterMixin::test_attention_slicing_forward_pass
# because UnCLIP GPU undeterminism requires a looser check.
@skip_mps
def test_attention_slicing_forward_pass(self):
test_max_difference = torch_device == "cpu"
self._test_attention_slicing_forward_pass(test_max_difference=test_max_difference, expected_max_diff=0.01)
# Overriding PipelineTesterMixin::test_inference_batch_single_identical
# because UnCLIP undeterminism requires a looser check.
@skip_mps
def test_inference_batch_single_identical(self):
additional_params_copy_to_batched_inputs = [
"prior_num_inference_steps",
"decoder_num_inference_steps",
"super_res_num_inference_steps",
]
self._test_inference_batch_single_identical(
additional_params_copy_to_batched_inputs=additional_params_copy_to_batched_inputs, expected_max_diff=9.8e-3
)
def test_inference_batch_consistent(self):
additional_params_copy_to_batched_inputs = [
"prior_num_inference_steps",
"decoder_num_inference_steps",
"super_res_num_inference_steps",
]
if torch_device == "mps":
# TODO: MPS errors with larger batch sizes
batch_sizes = [2, 3]
self._test_inference_batch_consistent(
batch_sizes=batch_sizes,
additional_params_copy_to_batched_inputs=additional_params_copy_to_batched_inputs,
)
else:
self._test_inference_batch_consistent(
additional_params_copy_to_batched_inputs=additional_params_copy_to_batched_inputs
)
@skip_mps
def test_dict_tuple_outputs_equivalent(self):
return super().test_dict_tuple_outputs_equivalent()
@skip_mps
def test_save_load_local(self):
return super().test_save_load_local(expected_max_difference=5e-3)
@skip_mps
def test_save_load_optional_components(self):
return super().test_save_load_optional_components()
@unittest.skip("UnCLIP produces very large differences in fp16 vs fp32. Test is not useful.")
def test_float16_inference(self):
super().test_float16_inference(expected_max_diff=1.0)
@nightly
class UnCLIPPipelineCPUIntegrationTests(unittest.TestCase):
def setUp(self):
# clean up the VRAM before each test
super().setUp()
gc.collect()
backend_empty_cache(torch_device)
def tearDown(self):
# clean up the VRAM after each test
super().tearDown()
gc.collect()
backend_empty_cache(torch_device)
def test_unclip_karlo_cpu_fp32(self):
expected_image = load_numpy(
"https://huggingface.co/datasets/hf-internal-testing/diffusers-images/resolve/main"
"/unclip/karlo_v1_alpha_horse_cpu.npy"
)
pipeline = UnCLIPPipeline.from_pretrained("kakaobrain/karlo-v1-alpha")
pipeline.set_progress_bar_config(disable=None)
generator = torch.manual_seed(0)
output = pipeline(
"horse",
num_images_per_prompt=1,
generator=generator,
output_type="np",
)
image = output.images[0]
assert image.shape == (256, 256, 3)
assert np.abs(expected_image - image).max() < 1e-1
@nightly
@require_torch_accelerator
class UnCLIPPipelineIntegrationTests(unittest.TestCase):
def setUp(self):
# clean up the VRAM before each test
super().setUp()
gc.collect()
backend_empty_cache(torch_device)
def tearDown(self):
# clean up the VRAM after each test
super().tearDown()
gc.collect()
backend_empty_cache(torch_device)
def test_unclip_karlo(self):
expected_image = load_numpy(
"https://huggingface.co/datasets/hf-internal-testing/diffusers-images/resolve/main"
"/unclip/karlo_v1_alpha_horse_fp16.npy"
)
pipeline = UnCLIPPipeline.from_pretrained("kakaobrain/karlo-v1-alpha", torch_dtype=torch.float16)
pipeline = pipeline.to(torch_device)
pipeline.set_progress_bar_config(disable=None)
generator = torch.Generator(device="cpu").manual_seed(0)
output = pipeline(
"horse",
generator=generator,
output_type="np",
)
image = output.images[0]
assert image.shape == (256, 256, 3)
assert_mean_pixel_difference(image, expected_image)
def test_unclip_pipeline_with_sequential_cpu_offloading(self):
backend_empty_cache(torch_device)
backend_reset_max_memory_allocated(torch_device)
backend_reset_peak_memory_stats(torch_device)
pipe = UnCLIPPipeline.from_pretrained("kakaobrain/karlo-v1-alpha", torch_dtype=torch.float16)
pipe.set_progress_bar_config(disable=None)
pipe.enable_attention_slicing()
pipe.enable_sequential_cpu_offload()
_ = pipe(
"horse",
num_images_per_prompt=1,
prior_num_inference_steps=2,
decoder_num_inference_steps=2,
super_res_num_inference_steps=2,
output_type="np",
)
mem_bytes = backend_max_memory_allocated(torch_device)
# make sure that less than 7 GB is allocated
assert mem_bytes < 7 * 10**9
# coding=utf-8
# Copyright 2025 HuggingFace Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import gc
import random
import unittest
import numpy as np
import torch
from transformers import (
CLIPImageProcessor,
CLIPTextConfig,
CLIPTextModelWithProjection,
CLIPTokenizer,
CLIPVisionConfig,
CLIPVisionModelWithProjection,
)
from diffusers import (
DiffusionPipeline,
UnCLIPImageVariationPipeline,
UnCLIPScheduler,
UNet2DConditionModel,
UNet2DModel,
)
from diffusers.pipelines.unclip.text_proj import UnCLIPTextProjModel
from diffusers.utils.testing_utils import (
backend_empty_cache,
enable_full_determinism,
floats_tensor,
load_image,
load_numpy,
nightly,
require_torch_accelerator,
skip_mps,
torch_device,
)
from ..pipeline_params import IMAGE_VARIATION_BATCH_PARAMS, IMAGE_VARIATION_PARAMS
from ..test_pipelines_common import PipelineTesterMixin, assert_mean_pixel_difference
enable_full_determinism()
class UnCLIPImageVariationPipelineFastTests(PipelineTesterMixin, unittest.TestCase):
pipeline_class = UnCLIPImageVariationPipeline
params = IMAGE_VARIATION_PARAMS - {"height", "width", "guidance_scale"}
batch_params = IMAGE_VARIATION_BATCH_PARAMS
required_optional_params = [
"generator",
"return_dict",
"decoder_num_inference_steps",
"super_res_num_inference_steps",
]
test_xformers_attention = False
supports_dduf = False
@property
def text_embedder_hidden_size(self):
return 32
@property
def time_input_dim(self):
return 32
@property
def block_out_channels_0(self):
return self.time_input_dim
@property
def time_embed_dim(self):
return self.time_input_dim * 4
@property
def cross_attention_dim(self):
return 100
@property
def dummy_tokenizer(self):
tokenizer = CLIPTokenizer.from_pretrained("hf-internal-testing/tiny-random-clip")
return tokenizer
@property
def dummy_text_encoder(self):
torch.manual_seed(0)
config = CLIPTextConfig(
bos_token_id=0,
eos_token_id=2,
hidden_size=self.text_embedder_hidden_size,
projection_dim=self.text_embedder_hidden_size,
intermediate_size=37,
layer_norm_eps=1e-05,
num_attention_heads=4,
num_hidden_layers=5,
pad_token_id=1,
vocab_size=1000,
)
return CLIPTextModelWithProjection(config)
@property
def dummy_image_encoder(self):
torch.manual_seed(0)
config = CLIPVisionConfig(
hidden_size=self.text_embedder_hidden_size,
projection_dim=self.text_embedder_hidden_size,
num_hidden_layers=5,
num_attention_heads=4,
image_size=32,
intermediate_size=37,
patch_size=1,
)
return CLIPVisionModelWithProjection(config)
@property
def dummy_text_proj(self):
torch.manual_seed(0)
model_kwargs = {
"clip_embeddings_dim": self.text_embedder_hidden_size,
"time_embed_dim": self.time_embed_dim,
"cross_attention_dim": self.cross_attention_dim,
}
model = UnCLIPTextProjModel(**model_kwargs)
return model
@property
def dummy_decoder(self):
torch.manual_seed(0)
model_kwargs = {
"sample_size": 32,
# RGB in channels
"in_channels": 3,
# Out channels is double in channels because predicts mean and variance
"out_channels": 6,
"down_block_types": ("ResnetDownsampleBlock2D", "SimpleCrossAttnDownBlock2D"),
"up_block_types": ("SimpleCrossAttnUpBlock2D", "ResnetUpsampleBlock2D"),
"mid_block_type": "UNetMidBlock2DSimpleCrossAttn",
"block_out_channels": (self.block_out_channels_0, self.block_out_channels_0 * 2),
"layers_per_block": 1,
"cross_attention_dim": self.cross_attention_dim,
"attention_head_dim": 4,
"resnet_time_scale_shift": "scale_shift",
"class_embed_type": "identity",
}
model = UNet2DConditionModel(**model_kwargs)
return model
@property
def dummy_super_res_kwargs(self):
return {
"sample_size": 64,
"layers_per_block": 1,
"down_block_types": ("ResnetDownsampleBlock2D", "ResnetDownsampleBlock2D"),
"up_block_types": ("ResnetUpsampleBlock2D", "ResnetUpsampleBlock2D"),
"block_out_channels": (self.block_out_channels_0, self.block_out_channels_0 * 2),
"in_channels": 6,
"out_channels": 3,
}
@property
def dummy_super_res_first(self):
torch.manual_seed(0)
model = UNet2DModel(**self.dummy_super_res_kwargs)
return model
@property
def dummy_super_res_last(self):
# seeded differently to get different unet than `self.dummy_super_res_first`
torch.manual_seed(1)
model = UNet2DModel(**self.dummy_super_res_kwargs)
return model
def get_dummy_components(self):
decoder = self.dummy_decoder
text_proj = self.dummy_text_proj
text_encoder = self.dummy_text_encoder
tokenizer = self.dummy_tokenizer
super_res_first = self.dummy_super_res_first
super_res_last = self.dummy_super_res_last
decoder_scheduler = UnCLIPScheduler(
variance_type="learned_range",
prediction_type="epsilon",
num_train_timesteps=1000,
)
super_res_scheduler = UnCLIPScheduler(
variance_type="fixed_small_log",
prediction_type="epsilon",
num_train_timesteps=1000,
)
feature_extractor = CLIPImageProcessor(crop_size=32, size=32)
image_encoder = self.dummy_image_encoder
return {
"decoder": decoder,
"text_encoder": text_encoder,
"tokenizer": tokenizer,
"text_proj": text_proj,
"feature_extractor": feature_extractor,
"image_encoder": image_encoder,
"super_res_first": super_res_first,
"super_res_last": super_res_last,
"decoder_scheduler": decoder_scheduler,
"super_res_scheduler": super_res_scheduler,
}
def get_dummy_inputs(self, device, seed=0, pil_image=True):
input_image = floats_tensor((1, 3, 32, 32), rng=random.Random(seed)).to(device)
if str(device).startswith("mps"):
generator = torch.manual_seed(seed)
else:
generator = torch.Generator(device=device).manual_seed(seed)
if pil_image:
input_image = input_image * 0.5 + 0.5
input_image = input_image.clamp(0, 1)
input_image = input_image.cpu().permute(0, 2, 3, 1).float().numpy()
input_image = DiffusionPipeline.numpy_to_pil(input_image)[0]
return {
"image": input_image,
"generator": generator,
"decoder_num_inference_steps": 2,
"super_res_num_inference_steps": 2,
"output_type": "np",
}
def test_unclip_image_variation_input_tensor(self):
device = "cpu"
components = self.get_dummy_components()
pipe = self.pipeline_class(**components)
pipe = pipe.to(device)
pipe.set_progress_bar_config(disable=None)
pipeline_inputs = self.get_dummy_inputs(device, pil_image=False)
output = pipe(**pipeline_inputs)
image = output.images
tuple_pipeline_inputs = self.get_dummy_inputs(device, pil_image=False)
image_from_tuple = pipe(
**tuple_pipeline_inputs,
return_dict=False,
)[0]
image_slice = image[0, -3:, -3:, -1]
image_from_tuple_slice = image_from_tuple[0, -3:, -3:, -1]
assert image.shape == (1, 64, 64, 3)
expected_slice = np.array(
[
0.9997,
0.0002,
0.9997,
0.9997,
0.9969,
0.0023,
0.9997,
0.9969,
0.9970,
]
)
assert np.abs(image_slice.flatten() - expected_slice).max() < 1e-2
assert np.abs(image_from_tuple_slice.flatten() - expected_slice).max() < 1e-2
def test_unclip_image_variation_input_image(self):
device = "cpu"
components = self.get_dummy_components()
pipe = self.pipeline_class(**components)
pipe = pipe.to(device)
pipe.set_progress_bar_config(disable=None)
pipeline_inputs = self.get_dummy_inputs(device, pil_image=True)
output = pipe(**pipeline_inputs)
image = output.images
tuple_pipeline_inputs = self.get_dummy_inputs(device, pil_image=True)
image_from_tuple = pipe(
**tuple_pipeline_inputs,
return_dict=False,
)[0]
image_slice = image[0, -3:, -3:, -1]
image_from_tuple_slice = image_from_tuple[0, -3:, -3:, -1]
assert image.shape == (1, 64, 64, 3)
expected_slice = np.array([0.9997, 0.0003, 0.9997, 0.9997, 0.9970, 0.0024, 0.9997, 0.9971, 0.9971])
assert np.abs(image_slice.flatten() - expected_slice).max() < 1e-2
assert np.abs(image_from_tuple_slice.flatten() - expected_slice).max() < 1e-2
def test_unclip_image_variation_input_list_images(self):
device = "cpu"
components = self.get_dummy_components()
pipe = self.pipeline_class(**components)
pipe = pipe.to(device)
pipe.set_progress_bar_config(disable=None)
pipeline_inputs = self.get_dummy_inputs(device, pil_image=True)
pipeline_inputs["image"] = [
pipeline_inputs["image"],
pipeline_inputs["image"],
]
output = pipe(**pipeline_inputs)
image = output.images
tuple_pipeline_inputs = self.get_dummy_inputs(device, pil_image=True)
tuple_pipeline_inputs["image"] = [
tuple_pipeline_inputs["image"],
tuple_pipeline_inputs["image"],
]
image_from_tuple = pipe(
**tuple_pipeline_inputs,
return_dict=False,
)[0]
image_slice = image[0, -3:, -3:, -1]
image_from_tuple_slice = image_from_tuple[0, -3:, -3:, -1]
assert image.shape == (2, 64, 64, 3)
expected_slice = np.array(
[
0.9997,
0.9989,
0.0008,
0.0021,
0.9960,
0.0018,
0.0014,
0.0002,
0.9933,
]
)
assert np.abs(image_slice.flatten() - expected_slice).max() < 1e-2
assert np.abs(image_from_tuple_slice.flatten() - expected_slice).max() < 1e-2
def test_unclip_passed_image_embed(self):
device = torch.device("cpu")
class DummyScheduler:
init_noise_sigma = 1
components = self.get_dummy_components()
pipe = self.pipeline_class(**components)
pipe = pipe.to(device)
pipe.set_progress_bar_config(disable=None)
generator = torch.Generator(device=device).manual_seed(0)
dtype = pipe.decoder.dtype
batch_size = 1
shape = (
batch_size,
pipe.decoder.config.in_channels,
pipe.decoder.config.sample_size,
pipe.decoder.config.sample_size,
)
decoder_latents = pipe.prepare_latents(
shape, dtype=dtype, device=device, generator=generator, latents=None, scheduler=DummyScheduler()
)
shape = (
batch_size,
pipe.super_res_first.config.in_channels // 2,
pipe.super_res_first.config.sample_size,
pipe.super_res_first.config.sample_size,
)
generator = torch.Generator(device=device).manual_seed(0)
super_res_latents = pipe.prepare_latents(
shape, dtype=dtype, device=device, generator=generator, latents=None, scheduler=DummyScheduler()
)
pipeline_inputs = self.get_dummy_inputs(device, pil_image=False)
img_out_1 = pipe(
**pipeline_inputs, decoder_latents=decoder_latents, super_res_latents=super_res_latents
).images
pipeline_inputs = self.get_dummy_inputs(device, pil_image=False)
# Don't pass image, instead pass embedding
image = pipeline_inputs.pop("image")
image_embeddings = pipe.image_encoder(image).image_embeds
img_out_2 = pipe(
**pipeline_inputs,
decoder_latents=decoder_latents,
super_res_latents=super_res_latents,
image_embeddings=image_embeddings,
).images
# make sure passing text embeddings manually is identical
assert np.abs(img_out_1 - img_out_2).max() < 1e-4
# Overriding PipelineTesterMixin::test_attention_slicing_forward_pass
# because UnCLIP GPU undeterminism requires a looser check.
@skip_mps
def test_attention_slicing_forward_pass(self):
test_max_difference = torch_device == "cpu"
# Check is relaxed because there is not a torch 2.0 sliced attention added kv processor
expected_max_diff = 1e-2
self._test_attention_slicing_forward_pass(
test_max_difference=test_max_difference, expected_max_diff=expected_max_diff
)
# Overriding PipelineTesterMixin::test_inference_batch_single_identical
# because UnCLIP undeterminism requires a looser check.
@unittest.skip("UnCLIP produces very large differences. Test is not useful.")
@skip_mps
def test_inference_batch_single_identical(self):
additional_params_copy_to_batched_inputs = [
"decoder_num_inference_steps",
"super_res_num_inference_steps",
]
self._test_inference_batch_single_identical(
additional_params_copy_to_batched_inputs=additional_params_copy_to_batched_inputs, expected_max_diff=5e-3
)
def test_inference_batch_consistent(self):
additional_params_copy_to_batched_inputs = [
"decoder_num_inference_steps",
"super_res_num_inference_steps",
]
if torch_device == "mps":
# TODO: MPS errors with larger batch sizes
batch_sizes = [2, 3]
self._test_inference_batch_consistent(
batch_sizes=batch_sizes,
additional_params_copy_to_batched_inputs=additional_params_copy_to_batched_inputs,
)
else:
self._test_inference_batch_consistent(
additional_params_copy_to_batched_inputs=additional_params_copy_to_batched_inputs
)
@skip_mps
def test_dict_tuple_outputs_equivalent(self):
return super().test_dict_tuple_outputs_equivalent()
@unittest.skip("UnCLIP produces very large difference. Test is not useful.")
@skip_mps
def test_save_load_local(self):
return super().test_save_load_local(expected_max_difference=4e-3)
@skip_mps
def test_save_load_optional_components(self):
return super().test_save_load_optional_components()
@unittest.skip("UnCLIP produces very large difference in fp16 vs fp32. Test is not useful.")
def test_float16_inference(self):
super().test_float16_inference(expected_max_diff=1.0)
@nightly
@require_torch_accelerator
class UnCLIPImageVariationPipelineIntegrationTests(unittest.TestCase):
def setUp(self):
# clean up the VRAM before each test
super().setUp()
gc.collect()
backend_empty_cache(torch_device)
def tearDown(self):
# clean up the VRAM after each test
super().tearDown()
gc.collect()
backend_empty_cache(torch_device)
def test_unclip_image_variation_karlo(self):
input_image = load_image(
"https://huggingface.co/datasets/hf-internal-testing/diffusers-images/resolve/main/unclip/cat.png"
)
expected_image = load_numpy(
"https://huggingface.co/datasets/hf-internal-testing/diffusers-images/resolve/main"
"/unclip/karlo_v1_alpha_cat_variation_fp16.npy"
)
pipeline = UnCLIPImageVariationPipeline.from_pretrained(
"kakaobrain/karlo-v1-alpha-image-variations", torch_dtype=torch.float16
)
pipeline = pipeline.to(torch_device)
pipeline.set_progress_bar_config(disable=None)
generator = torch.Generator(device="cpu").manual_seed(0)
output = pipeline(
input_image,
generator=generator,
output_type="np",
)
image = output.images[0]
assert image.shape == (256, 256, 3)
assert_mean_pixel_difference(image, expected_image, 15)
import gc
import random
import unittest
import numpy as np
import torch
from PIL import Image
from transformers import (
CLIPImageProcessor,
CLIPTextModel,
CLIPTokenizer,
CLIPVisionModelWithProjection,
GPT2Tokenizer,
)
from diffusers import (
AutoencoderKL,
DPMSolverMultistepScheduler,
UniDiffuserModel,
UniDiffuserPipeline,
UniDiffuserTextDecoder,
)
from diffusers.utils.testing_utils import (
backend_empty_cache,
enable_full_determinism,
floats_tensor,
load_image,
nightly,
require_torch_accelerator,
torch_device,
)
from diffusers.utils.torch_utils import randn_tensor
from ..pipeline_params import (
IMAGE_TO_IMAGE_IMAGE_PARAMS,
TEXT_GUIDED_IMAGE_VARIATION_BATCH_PARAMS,
TEXT_GUIDED_IMAGE_VARIATION_PARAMS,
)
from ..test_pipelines_common import PipelineKarrasSchedulerTesterMixin, PipelineLatentTesterMixin, PipelineTesterMixin
enable_full_determinism()
class UniDiffuserPipelineFastTests(
PipelineTesterMixin, PipelineLatentTesterMixin, PipelineKarrasSchedulerTesterMixin, unittest.TestCase
):
pipeline_class = UniDiffuserPipeline
params = TEXT_GUIDED_IMAGE_VARIATION_PARAMS
batch_params = TEXT_GUIDED_IMAGE_VARIATION_BATCH_PARAMS
image_params = IMAGE_TO_IMAGE_IMAGE_PARAMS
# vae_latents, not latents, is the argument that corresponds to VAE latent inputs
image_latents_params = frozenset(["vae_latents"])
supports_dduf = False
def get_dummy_components(self):
unet = UniDiffuserModel.from_pretrained(
"hf-internal-testing/unidiffuser-diffusers-test",
subfolder="unet",
)
scheduler = DPMSolverMultistepScheduler(
beta_start=0.00085,
beta_end=0.012,
beta_schedule="scaled_linear",
solver_order=3,
)
vae = AutoencoderKL.from_pretrained(
"hf-internal-testing/unidiffuser-diffusers-test",
subfolder="vae",
)
text_encoder = CLIPTextModel.from_pretrained(
"hf-internal-testing/unidiffuser-diffusers-test",
subfolder="text_encoder",
)
clip_tokenizer = CLIPTokenizer.from_pretrained(
"hf-internal-testing/unidiffuser-diffusers-test",
subfolder="clip_tokenizer",
)
image_encoder = CLIPVisionModelWithProjection.from_pretrained(
"hf-internal-testing/unidiffuser-diffusers-test",
subfolder="image_encoder",
)
# From the Stable Diffusion Image Variation pipeline tests
clip_image_processor = CLIPImageProcessor(crop_size=32, size=32)
# image_processor = CLIPImageProcessor.from_pretrained("hf-internal-testing/tiny-random-clip")
text_tokenizer = GPT2Tokenizer.from_pretrained(
"hf-internal-testing/unidiffuser-diffusers-test",
subfolder="text_tokenizer",
)
text_decoder = UniDiffuserTextDecoder.from_pretrained(
"hf-internal-testing/unidiffuser-diffusers-test",
subfolder="text_decoder",
)
components = {
"vae": vae,
"text_encoder": text_encoder,
"image_encoder": image_encoder,
"clip_image_processor": clip_image_processor,
"clip_tokenizer": clip_tokenizer,
"text_decoder": text_decoder,
"text_tokenizer": text_tokenizer,
"unet": unet,
"scheduler": scheduler,
}
return components
def get_dummy_inputs(self, device, seed=0):
image = floats_tensor((1, 3, 32, 32), rng=random.Random(seed)).to(device)
image = image.cpu().permute(0, 2, 3, 1)[0]
image = Image.fromarray(np.uint8(image)).convert("RGB")
if str(device).startswith("mps"):
generator = torch.manual_seed(seed)
else:
generator = torch.Generator(device=device).manual_seed(seed)
inputs = {
"prompt": "an elephant under the sea",
"image": image,
"generator": generator,
"num_inference_steps": 2,
"guidance_scale": 6.0,
"output_type": "np",
}
return inputs
def get_fixed_latents(self, device, seed=0):
if isinstance(device, str):
device = torch.device(device)
generator = torch.Generator(device=device).manual_seed(seed)
# Hardcode the shapes for now.
prompt_latents = randn_tensor((1, 77, 32), generator=generator, device=device, dtype=torch.float32)
vae_latents = randn_tensor((1, 4, 16, 16), generator=generator, device=device, dtype=torch.float32)
clip_latents = randn_tensor((1, 1, 32), generator=generator, device=device, dtype=torch.float32)
latents = {
"prompt_latents": prompt_latents,
"vae_latents": vae_latents,
"clip_latents": clip_latents,
}
return latents
def get_dummy_inputs_with_latents(self, device, seed=0):
# image = floats_tensor((1, 3, 32, 32), rng=random.Random(seed)).to(device)
# image = image.cpu().permute(0, 2, 3, 1)[0]
# image = Image.fromarray(np.uint8(image)).convert("RGB")
image = load_image(
"https://huggingface.co/datasets/hf-internal-testing/diffusers-images/resolve/main/unidiffuser/unidiffuser_example_image.jpg",
)
image = image.resize((32, 32))
latents = self.get_fixed_latents(device, seed=seed)
if str(device).startswith("mps"):
generator = torch.manual_seed(seed)
else:
generator = torch.Generator(device=device).manual_seed(seed)
inputs = {
"prompt": "an elephant under the sea",
"image": image,
"generator": generator,
"num_inference_steps": 2,
"guidance_scale": 6.0,
"output_type": "np",
"prompt_latents": latents.get("prompt_latents"),
"vae_latents": latents.get("vae_latents"),
"clip_latents": latents.get("clip_latents"),
}
return inputs
def test_dict_tuple_outputs_equivalent(self):
expected_slice = None
if torch_device == "cpu":
expected_slice = np.array([0.7489, 0.3722, 0.4475, 0.5630, 0.5923, 0.4992, 0.3936, 0.5844, 0.4975])
super().test_dict_tuple_outputs_equivalent(expected_slice=expected_slice)
def test_unidiffuser_default_joint_v0(self):
device = "cpu" # ensure determinism for the device-dependent torch.Generator
components = self.get_dummy_components()
unidiffuser_pipe = UniDiffuserPipeline(**components)
unidiffuser_pipe = unidiffuser_pipe.to(device)
unidiffuser_pipe.set_progress_bar_config(disable=None)
# Set mode to 'joint'
unidiffuser_pipe.set_joint_mode()
assert unidiffuser_pipe.mode == "joint"
# inputs = self.get_dummy_inputs(device)
inputs = self.get_dummy_inputs_with_latents(device)
# Delete prompt and image for joint inference.
del inputs["prompt"]
del inputs["image"]
sample = unidiffuser_pipe(**inputs)
image = sample.images
text = sample.text
assert image.shape == (1, 32, 32, 3)
image_slice = image[0, -3:, -3:, -1]
expected_img_slice = np.array([0.5760, 0.6270, 0.6571, 0.4965, 0.4638, 0.5663, 0.5254, 0.5068, 0.5716])
assert np.abs(image_slice.flatten() - expected_img_slice).max() < 1e-3
expected_text_prefix = " no no no "
assert text[0][:10] == expected_text_prefix
def test_unidiffuser_default_joint_no_cfg_v0(self):
device = "cpu" # ensure determinism for the device-dependent torch.Generator
components = self.get_dummy_components()
unidiffuser_pipe = UniDiffuserPipeline(**components)
unidiffuser_pipe = unidiffuser_pipe.to(device)
unidiffuser_pipe.set_progress_bar_config(disable=None)
# Set mode to 'joint'
unidiffuser_pipe.set_joint_mode()
assert unidiffuser_pipe.mode == "joint"
# inputs = self.get_dummy_inputs(device)
inputs = self.get_dummy_inputs_with_latents(device)
# Delete prompt and image for joint inference.
del inputs["prompt"]
del inputs["image"]
# Set guidance scale to 1.0 to turn off CFG
inputs["guidance_scale"] = 1.0
sample = unidiffuser_pipe(**inputs)
image = sample.images
text = sample.text
assert image.shape == (1, 32, 32, 3)
image_slice = image[0, -3:, -3:, -1]
expected_img_slice = np.array([0.5760, 0.6270, 0.6571, 0.4965, 0.4638, 0.5663, 0.5254, 0.5068, 0.5716])
assert np.abs(image_slice.flatten() - expected_img_slice).max() < 1e-3
expected_text_prefix = " no no no "
assert text[0][:10] == expected_text_prefix
def test_unidiffuser_default_text2img_v0(self):
device = "cpu" # ensure determinism for the device-dependent torch.Generator
components = self.get_dummy_components()
unidiffuser_pipe = UniDiffuserPipeline(**components)
unidiffuser_pipe = unidiffuser_pipe.to(device)
unidiffuser_pipe.set_progress_bar_config(disable=None)
# Set mode to 'text2img'
unidiffuser_pipe.set_text_to_image_mode()
assert unidiffuser_pipe.mode == "text2img"
inputs = self.get_dummy_inputs_with_latents(device)
# Delete image for text-conditioned image generation
del inputs["image"]
image = unidiffuser_pipe(**inputs).images
assert image.shape == (1, 32, 32, 3)
image_slice = image[0, -3:, -3:, -1]
expected_slice = np.array([0.5758, 0.6269, 0.6570, 0.4967, 0.4639, 0.5664, 0.5257, 0.5067, 0.5715])
assert np.abs(image_slice.flatten() - expected_slice).max() < 1e-3
def test_unidiffuser_default_image_0(self):
device = "cpu" # ensure determinism for the device-dependent torch.Generator
components = self.get_dummy_components()
unidiffuser_pipe = UniDiffuserPipeline(**components)
unidiffuser_pipe = unidiffuser_pipe.to(device)
unidiffuser_pipe.set_progress_bar_config(disable=None)
# Set mode to 'img'
unidiffuser_pipe.set_image_mode()
assert unidiffuser_pipe.mode == "img"
inputs = self.get_dummy_inputs(device)
# Delete prompt and image for unconditional ("marginal") text generation.
del inputs["prompt"]
del inputs["image"]
image = unidiffuser_pipe(**inputs).images
assert image.shape == (1, 32, 32, 3)
image_slice = image[0, -3:, -3:, -1]
expected_slice = np.array([0.5760, 0.6270, 0.6571, 0.4966, 0.4638, 0.5663, 0.5254, 0.5068, 0.5715])
assert np.abs(image_slice.flatten() - expected_slice).max() < 1e-3
def test_unidiffuser_default_text_v0(self):
device = "cpu" # ensure determinism for the device-dependent torch.Generator
components = self.get_dummy_components()
unidiffuser_pipe = UniDiffuserPipeline(**components)
unidiffuser_pipe = unidiffuser_pipe.to(device)
unidiffuser_pipe.set_progress_bar_config(disable=None)
# Set mode to 'img'
unidiffuser_pipe.set_text_mode()
assert unidiffuser_pipe.mode == "text"
inputs = self.get_dummy_inputs(device)
# Delete prompt and image for unconditional ("marginal") text generation.
del inputs["prompt"]
del inputs["image"]
text = unidiffuser_pipe(**inputs).text
expected_text_prefix = " no no no "
assert text[0][:10] == expected_text_prefix
def test_unidiffuser_default_img2text_v0(self):
device = "cpu" # ensure determinism for the device-dependent torch.Generator
components = self.get_dummy_components()
unidiffuser_pipe = UniDiffuserPipeline(**components)
unidiffuser_pipe = unidiffuser_pipe.to(device)
unidiffuser_pipe.set_progress_bar_config(disable=None)
# Set mode to 'img2text'
unidiffuser_pipe.set_image_to_text_mode()
assert unidiffuser_pipe.mode == "img2text"
inputs = self.get_dummy_inputs_with_latents(device)
# Delete text for image-conditioned text generation
del inputs["prompt"]
text = unidiffuser_pipe(**inputs).text
expected_text_prefix = " no no no "
assert text[0][:10] == expected_text_prefix
def test_unidiffuser_default_joint_v1(self):
device = "cpu" # ensure determinism for the device-dependent torch.Generator
unidiffuser_pipe = UniDiffuserPipeline.from_pretrained("hf-internal-testing/unidiffuser-test-v1")
unidiffuser_pipe = unidiffuser_pipe.to(device)
unidiffuser_pipe.set_progress_bar_config(disable=None)
# Set mode to 'joint'
unidiffuser_pipe.set_joint_mode()
assert unidiffuser_pipe.mode == "joint"
# inputs = self.get_dummy_inputs(device)
inputs = self.get_dummy_inputs_with_latents(device)
# Delete prompt and image for joint inference.
del inputs["prompt"]
del inputs["image"]
inputs["data_type"] = 1
sample = unidiffuser_pipe(**inputs)
image = sample.images
text = sample.text
assert image.shape == (1, 32, 32, 3)
image_slice = image[0, -3:, -3:, -1]
expected_img_slice = np.array([0.5760, 0.6270, 0.6571, 0.4965, 0.4638, 0.5663, 0.5254, 0.5068, 0.5716])
assert np.abs(image_slice.flatten() - expected_img_slice).max() < 1e-3
expected_text_prefix = " no no no "
assert text[0][:10] == expected_text_prefix
def test_unidiffuser_default_text2img_v1(self):
device = "cpu" # ensure determinism for the device-dependent torch.Generator
unidiffuser_pipe = UniDiffuserPipeline.from_pretrained("hf-internal-testing/unidiffuser-test-v1")
unidiffuser_pipe = unidiffuser_pipe.to(device)
unidiffuser_pipe.set_progress_bar_config(disable=None)
# Set mode to 'text2img'
unidiffuser_pipe.set_text_to_image_mode()
assert unidiffuser_pipe.mode == "text2img"
inputs = self.get_dummy_inputs_with_latents(device)
# Delete image for text-conditioned image generation
del inputs["image"]
image = unidiffuser_pipe(**inputs).images
assert image.shape == (1, 32, 32, 3)
image_slice = image[0, -3:, -3:, -1]
expected_slice = np.array([0.5758, 0.6269, 0.6570, 0.4967, 0.4639, 0.5664, 0.5257, 0.5067, 0.5715])
assert np.abs(image_slice.flatten() - expected_slice).max() < 1e-3
def test_unidiffuser_default_img2text_v1(self):
device = "cpu" # ensure determinism for the device-dependent torch.Generator
unidiffuser_pipe = UniDiffuserPipeline.from_pretrained("hf-internal-testing/unidiffuser-test-v1")
unidiffuser_pipe = unidiffuser_pipe.to(device)
unidiffuser_pipe.set_progress_bar_config(disable=None)
# Set mode to 'img2text'
unidiffuser_pipe.set_image_to_text_mode()
assert unidiffuser_pipe.mode == "img2text"
inputs = self.get_dummy_inputs_with_latents(device)
# Delete text for image-conditioned text generation
del inputs["prompt"]
text = unidiffuser_pipe(**inputs).text
expected_text_prefix = " no no no "
assert text[0][:10] == expected_text_prefix
def test_unidiffuser_text2img_multiple_images(self):
device = "cpu" # ensure determinism for the device-dependent torch.Generator
components = self.get_dummy_components()
unidiffuser_pipe = UniDiffuserPipeline(**components)
unidiffuser_pipe = unidiffuser_pipe.to(device)
unidiffuser_pipe.set_progress_bar_config(disable=None)
# Set mode to 'text2img'
unidiffuser_pipe.set_text_to_image_mode()
assert unidiffuser_pipe.mode == "text2img"
inputs = self.get_dummy_inputs(device)
# Delete image for text-conditioned image generation
del inputs["image"]
inputs["num_images_per_prompt"] = 2
inputs["num_prompts_per_image"] = 3
image = unidiffuser_pipe(**inputs).images
assert image.shape == (2, 32, 32, 3)
def test_unidiffuser_img2text_multiple_prompts(self):
device = "cpu" # ensure determinism for the device-dependent torch.Generator
components = self.get_dummy_components()
unidiffuser_pipe = UniDiffuserPipeline(**components)
unidiffuser_pipe = unidiffuser_pipe.to(device)
unidiffuser_pipe.set_progress_bar_config(disable=None)
# Set mode to 'img2text'
unidiffuser_pipe.set_image_to_text_mode()
assert unidiffuser_pipe.mode == "img2text"
inputs = self.get_dummy_inputs(device)
# Delete text for image-conditioned text generation
del inputs["prompt"]
inputs["num_images_per_prompt"] = 2
inputs["num_prompts_per_image"] = 3
text = unidiffuser_pipe(**inputs).text
assert len(text) == 3
def test_unidiffuser_text2img_multiple_images_with_latents(self):
device = "cpu" # ensure determinism for the device-dependent torch.Generator
components = self.get_dummy_components()
unidiffuser_pipe = UniDiffuserPipeline(**components)
unidiffuser_pipe = unidiffuser_pipe.to(device)
unidiffuser_pipe.set_progress_bar_config(disable=None)
# Set mode to 'text2img'
unidiffuser_pipe.set_text_to_image_mode()
assert unidiffuser_pipe.mode == "text2img"
inputs = self.get_dummy_inputs_with_latents(device)
# Delete image for text-conditioned image generation
del inputs["image"]
inputs["num_images_per_prompt"] = 2
inputs["num_prompts_per_image"] = 3
image = unidiffuser_pipe(**inputs).images
assert image.shape == (2, 32, 32, 3)
def test_unidiffuser_img2text_multiple_prompts_with_latents(self):
device = "cpu" # ensure determinism for the device-dependent torch.Generator
components = self.get_dummy_components()
unidiffuser_pipe = UniDiffuserPipeline(**components)
unidiffuser_pipe = unidiffuser_pipe.to(device)
unidiffuser_pipe.set_progress_bar_config(disable=None)
# Set mode to 'img2text'
unidiffuser_pipe.set_image_to_text_mode()
assert unidiffuser_pipe.mode == "img2text"
inputs = self.get_dummy_inputs_with_latents(device)
# Delete text for image-conditioned text generation
del inputs["prompt"]
inputs["num_images_per_prompt"] = 2
inputs["num_prompts_per_image"] = 3
text = unidiffuser_pipe(**inputs).text
assert len(text) == 3
def test_inference_batch_single_identical(self):
super().test_inference_batch_single_identical(expected_max_diff=2e-4)
@require_torch_accelerator
def test_unidiffuser_default_joint_v1_fp16(self):
unidiffuser_pipe = UniDiffuserPipeline.from_pretrained(
"hf-internal-testing/unidiffuser-test-v1", torch_dtype=torch.float16
)
unidiffuser_pipe = unidiffuser_pipe.to(torch_device)
unidiffuser_pipe.set_progress_bar_config(disable=None)
# Set mode to 'joint'
unidiffuser_pipe.set_joint_mode()
assert unidiffuser_pipe.mode == "joint"
inputs = self.get_dummy_inputs_with_latents(torch_device)
# Delete prompt and image for joint inference.
del inputs["prompt"]
del inputs["image"]
inputs["data_type"] = 1
sample = unidiffuser_pipe(**inputs)
image = sample.images
text = sample.text
assert image.shape == (1, 32, 32, 3)
image_slice = image[0, -3:, -3:, -1]
expected_img_slice = np.array([0.5049, 0.5498, 0.5854, 0.3052, 0.4460, 0.6489, 0.5122, 0.4810, 0.6138])
assert np.abs(image_slice.flatten() - expected_img_slice).max() < 1e-3
expected_text_prefix = '" This This'
assert text[0][: len(expected_text_prefix)] == expected_text_prefix
@require_torch_accelerator
def test_unidiffuser_default_text2img_v1_fp16(self):
unidiffuser_pipe = UniDiffuserPipeline.from_pretrained(
"hf-internal-testing/unidiffuser-test-v1", torch_dtype=torch.float16
)
unidiffuser_pipe = unidiffuser_pipe.to(torch_device)
unidiffuser_pipe.set_progress_bar_config(disable=None)
# Set mode to 'text2img'
unidiffuser_pipe.set_text_to_image_mode()
assert unidiffuser_pipe.mode == "text2img"
inputs = self.get_dummy_inputs_with_latents(torch_device)
# Delete prompt and image for joint inference.
del inputs["image"]
inputs["data_type"] = 1
sample = unidiffuser_pipe(**inputs)
image = sample.images
assert image.shape == (1, 32, 32, 3)
image_slice = image[0, -3:, -3:, -1]
expected_img_slice = np.array([0.5054, 0.5498, 0.5854, 0.3052, 0.4458, 0.6489, 0.5122, 0.4810, 0.6138])
assert np.abs(image_slice.flatten() - expected_img_slice).max() < 1e-3
@require_torch_accelerator
def test_unidiffuser_default_img2text_v1_fp16(self):
unidiffuser_pipe = UniDiffuserPipeline.from_pretrained(
"hf-internal-testing/unidiffuser-test-v1", torch_dtype=torch.float16
)
unidiffuser_pipe = unidiffuser_pipe.to(torch_device)
unidiffuser_pipe.set_progress_bar_config(disable=None)
# Set mode to 'img2text'
unidiffuser_pipe.set_image_to_text_mode()
assert unidiffuser_pipe.mode == "img2text"
inputs = self.get_dummy_inputs_with_latents(torch_device)
# Delete prompt and image for joint inference.
del inputs["prompt"]
inputs["data_type"] = 1
text = unidiffuser_pipe(**inputs).text
expected_text_prefix = '" This This'
assert text[0][: len(expected_text_prefix)] == expected_text_prefix
@unittest.skip(
"Test not supported because it has a bunch of direct configs at init and also, this pipeline isn't used that much now."
)
def test_encode_prompt_works_in_isolation():
pass
@nightly
@require_torch_accelerator
class UniDiffuserPipelineSlowTests(unittest.TestCase):
def setUp(self):
super().setUp()
gc.collect()
backend_empty_cache(torch_device)
def tearDown(self):
super().tearDown()
gc.collect()
backend_empty_cache(torch_device)
def get_inputs(self, device, seed=0, generate_latents=False):
generator = torch.manual_seed(seed)
image = load_image(
"https://huggingface.co/datasets/hf-internal-testing/diffusers-images/resolve/main/unidiffuser/unidiffuser_example_image.jpg"
)
inputs = {
"prompt": "an elephant under the sea",
"image": image,
"generator": generator,
"num_inference_steps": 3,
"guidance_scale": 8.0,
"output_type": "np",
}
if generate_latents:
latents = self.get_fixed_latents(device, seed=seed)
for latent_name, latent_tensor in latents.items():
inputs[latent_name] = latent_tensor
return inputs
def get_fixed_latents(self, device, seed=0):
if isinstance(device, str):
device = torch.device(device)
latent_device = torch.device("cpu")
generator = torch.Generator(device=latent_device).manual_seed(seed)
# Hardcode the shapes for now.
prompt_latents = randn_tensor((1, 77, 768), generator=generator, device=device, dtype=torch.float32)
vae_latents = randn_tensor((1, 4, 64, 64), generator=generator, device=device, dtype=torch.float32)
clip_latents = randn_tensor((1, 1, 512), generator=generator, device=device, dtype=torch.float32)
# Move latents onto desired device.
prompt_latents = prompt_latents.to(device)
vae_latents = vae_latents.to(device)
clip_latents = clip_latents.to(device)
latents = {
"prompt_latents": prompt_latents,
"vae_latents": vae_latents,
"clip_latents": clip_latents,
}
return latents
def test_unidiffuser_default_joint_v1(self):
pipe = UniDiffuserPipeline.from_pretrained("thu-ml/unidiffuser-v1")
pipe.to(torch_device)
pipe.set_progress_bar_config(disable=None)
pipe.enable_attention_slicing()
# inputs = self.get_dummy_inputs(device)
inputs = self.get_inputs(device=torch_device, generate_latents=True)
# Delete prompt and image for joint inference.
del inputs["prompt"]
del inputs["image"]
sample = pipe(**inputs)
image = sample.images
text = sample.text
assert image.shape == (1, 512, 512, 3)
image_slice = image[0, -3:, -3:, -1]
expected_img_slice = np.array([0.2402, 0.2375, 0.2285, 0.2378, 0.2407, 0.2263, 0.2354, 0.2307, 0.2520])
assert np.abs(image_slice.flatten() - expected_img_slice).max() < 1e-1
expected_text_prefix = "a living room"
assert text[0][: len(expected_text_prefix)] == expected_text_prefix
def test_unidiffuser_default_text2img_v1(self):
pipe = UniDiffuserPipeline.from_pretrained("thu-ml/unidiffuser-v1")
pipe.to(torch_device)
pipe.set_progress_bar_config(disable=None)
pipe.enable_attention_slicing()
inputs = self.get_inputs(device=torch_device, generate_latents=True)
del inputs["image"]
sample = pipe(**inputs)
image = sample.images
assert image.shape == (1, 512, 512, 3)
image_slice = image[0, -3:, -3:, -1]
expected_slice = np.array([0.0242, 0.0103, 0.0022, 0.0129, 0.0000, 0.0090, 0.0376, 0.0508, 0.0005])
assert np.abs(image_slice.flatten() - expected_slice).max() < 1e-1
def test_unidiffuser_default_img2text_v1(self):
pipe = UniDiffuserPipeline.from_pretrained("thu-ml/unidiffuser-v1")
pipe.to(torch_device)
pipe.set_progress_bar_config(disable=None)
pipe.enable_attention_slicing()
inputs = self.get_inputs(device=torch_device, generate_latents=True)
del inputs["prompt"]
sample = pipe(**inputs)
text = sample.text
expected_text_prefix = "An astronaut"
assert text[0][: len(expected_text_prefix)] == expected_text_prefix
@nightly
@require_torch_accelerator
class UniDiffuserPipelineNightlyTests(unittest.TestCase):
def setUp(self):
super().setUp()
gc.collect()
backend_empty_cache(torch_device)
def tearDown(self):
super().tearDown()
gc.collect()
backend_empty_cache(torch_device)
def get_inputs(self, device, seed=0, generate_latents=False):
generator = torch.manual_seed(seed)
image = load_image(
"https://huggingface.co/datasets/hf-internal-testing/diffusers-images/resolve/main/unidiffuser/unidiffuser_example_image.jpg"
)
inputs = {
"prompt": "an elephant under the sea",
"image": image,
"generator": generator,
"num_inference_steps": 3,
"guidance_scale": 8.0,
"output_type": "np",
}
if generate_latents:
latents = self.get_fixed_latents(device, seed=seed)
for latent_name, latent_tensor in latents.items():
inputs[latent_name] = latent_tensor
return inputs
def get_fixed_latents(self, device, seed=0):
if isinstance(device, str):
device = torch.device(device)
latent_device = torch.device("cpu")
generator = torch.Generator(device=latent_device).manual_seed(seed)
# Hardcode the shapes for now.
prompt_latents = randn_tensor((1, 77, 768), generator=generator, device=device, dtype=torch.float32)
vae_latents = randn_tensor((1, 4, 64, 64), generator=generator, device=device, dtype=torch.float32)
clip_latents = randn_tensor((1, 1, 512), generator=generator, device=device, dtype=torch.float32)
# Move latents onto desired device.
prompt_latents = prompt_latents.to(device)
vae_latents = vae_latents.to(device)
clip_latents = clip_latents.to(device)
latents = {
"prompt_latents": prompt_latents,
"vae_latents": vae_latents,
"clip_latents": clip_latents,
}
return latents
def test_unidiffuser_default_joint_v1_fp16(self):
pipe = UniDiffuserPipeline.from_pretrained("thu-ml/unidiffuser-v1", torch_dtype=torch.float16)
pipe.to(torch_device)
pipe.set_progress_bar_config(disable=None)
pipe.enable_attention_slicing()
# inputs = self.get_dummy_inputs(device)
inputs = self.get_inputs(device=torch_device, generate_latents=True)
# Delete prompt and image for joint inference.
del inputs["prompt"]
del inputs["image"]
sample = pipe(**inputs)
image = sample.images
text = sample.text
assert image.shape == (1, 512, 512, 3)
image_slice = image[0, -3:, -3:, -1]
expected_img_slice = np.array([0.2402, 0.2375, 0.2285, 0.2378, 0.2407, 0.2263, 0.2354, 0.2307, 0.2520])
assert np.abs(image_slice.flatten() - expected_img_slice).max() < 2e-1
expected_text_prefix = "a living room"
assert text[0][: len(expected_text_prefix)] == expected_text_prefix
def test_unidiffuser_default_text2img_v1_fp16(self):
pipe = UniDiffuserPipeline.from_pretrained("thu-ml/unidiffuser-v1", torch_dtype=torch.float16)
pipe.to(torch_device)
pipe.set_progress_bar_config(disable=None)
pipe.enable_attention_slicing()
inputs = self.get_inputs(device=torch_device, generate_latents=True)
del inputs["image"]
sample = pipe(**inputs)
image = sample.images
assert image.shape == (1, 512, 512, 3)
image_slice = image[0, -3:, -3:, -1]
expected_slice = np.array([0.0242, 0.0103, 0.0022, 0.0129, 0.0000, 0.0090, 0.0376, 0.0508, 0.0005])
assert np.abs(image_slice.flatten() - expected_slice).max() < 1e-1
def test_unidiffuser_default_img2text_v1_fp16(self):
pipe = UniDiffuserPipeline.from_pretrained("thu-ml/unidiffuser-v1", torch_dtype=torch.float16)
pipe.to(torch_device)
pipe.set_progress_bar_config(disable=None)
pipe.enable_attention_slicing()
inputs = self.get_inputs(device=torch_device, generate_latents=True)
del inputs["prompt"]
sample = pipe(**inputs)
text = sample.text
expected_text_prefix = "An astronaut"
assert text[0][: len(expected_text_prefix)] == expected_text_prefix
# coding=utf-8
# Copyright 2025 HuggingFace Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import unittest
import numpy as np
import torch
from transformers import CLIPTextConfig, CLIPTextModel, CLIPTokenizer
from diffusers import DDPMWuerstchenScheduler, WuerstchenCombinedPipeline
from diffusers.pipelines.wuerstchen import PaellaVQModel, WuerstchenDiffNeXt, WuerstchenPrior
from diffusers.utils.testing_utils import enable_full_determinism, require_torch_accelerator, torch_device
from ..test_pipelines_common import PipelineTesterMixin
enable_full_determinism()
class WuerstchenCombinedPipelineFastTests(PipelineTesterMixin, unittest.TestCase):
pipeline_class = WuerstchenCombinedPipeline
params = ["prompt"]
batch_params = ["prompt", "negative_prompt"]
required_optional_params = [
"generator",
"height",
"width",
"latents",
"prior_guidance_scale",
"decoder_guidance_scale",
"negative_prompt",
"num_inference_steps",
"return_dict",
"prior_num_inference_steps",
"output_type",
]
test_xformers_attention = True
@property
def text_embedder_hidden_size(self):
return 32
@property
def dummy_prior(self):
torch.manual_seed(0)
model_kwargs = {"c_in": 2, "c": 8, "depth": 2, "c_cond": 32, "c_r": 8, "nhead": 2}
model = WuerstchenPrior(**model_kwargs)
return model.eval()
@property
def dummy_tokenizer(self):
tokenizer = CLIPTokenizer.from_pretrained("hf-internal-testing/tiny-random-clip")
return tokenizer
@property
def dummy_prior_text_encoder(self):
torch.manual_seed(0)
config = CLIPTextConfig(
bos_token_id=0,
eos_token_id=2,
hidden_size=self.text_embedder_hidden_size,
intermediate_size=37,
layer_norm_eps=1e-05,
num_attention_heads=4,
num_hidden_layers=5,
pad_token_id=1,
vocab_size=1000,
)
return CLIPTextModel(config).eval()
@property
def dummy_text_encoder(self):
torch.manual_seed(0)
config = CLIPTextConfig(
bos_token_id=0,
eos_token_id=2,
projection_dim=self.text_embedder_hidden_size,
hidden_size=self.text_embedder_hidden_size,
intermediate_size=37,
layer_norm_eps=1e-05,
num_attention_heads=4,
num_hidden_layers=5,
pad_token_id=1,
vocab_size=1000,
)
return CLIPTextModel(config).eval()
@property
def dummy_vqgan(self):
torch.manual_seed(0)
model_kwargs = {
"bottleneck_blocks": 1,
"num_vq_embeddings": 2,
}
model = PaellaVQModel(**model_kwargs)
return model.eval()
@property
def dummy_decoder(self):
torch.manual_seed(0)
model_kwargs = {
"c_cond": self.text_embedder_hidden_size,
"c_hidden": [320],
"nhead": [-1],
"blocks": [4],
"level_config": ["CT"],
"clip_embd": self.text_embedder_hidden_size,
"inject_effnet": [False],
}
model = WuerstchenDiffNeXt(**model_kwargs)
return model.eval()
def get_dummy_components(self):
prior = self.dummy_prior
prior_text_encoder = self.dummy_prior_text_encoder
scheduler = DDPMWuerstchenScheduler()
tokenizer = self.dummy_tokenizer
text_encoder = self.dummy_text_encoder
decoder = self.dummy_decoder
vqgan = self.dummy_vqgan
components = {
"tokenizer": tokenizer,
"text_encoder": text_encoder,
"decoder": decoder,
"vqgan": vqgan,
"scheduler": scheduler,
"prior_prior": prior,
"prior_text_encoder": prior_text_encoder,
"prior_tokenizer": tokenizer,
"prior_scheduler": scheduler,
}
return components
def get_dummy_inputs(self, device, seed=0):
if str(device).startswith("mps"):
generator = torch.manual_seed(seed)
else:
generator = torch.Generator(device=device).manual_seed(seed)
inputs = {
"prompt": "horse",
"generator": generator,
"prior_guidance_scale": 4.0,
"decoder_guidance_scale": 4.0,
"num_inference_steps": 2,
"prior_num_inference_steps": 2,
"output_type": "np",
"height": 128,
"width": 128,
}
return inputs
def test_wuerstchen(self):
device = "cpu"
components = self.get_dummy_components()
pipe = self.pipeline_class(**components)
pipe = pipe.to(device)
pipe.set_progress_bar_config(disable=None)
output = pipe(**self.get_dummy_inputs(device))
image = output.images
image_from_tuple = pipe(**self.get_dummy_inputs(device), return_dict=False)[0]
image_slice = image[0, -3:, -3:, -1]
image_from_tuple_slice = image_from_tuple[-3:, -3:, -1]
assert image.shape == (1, 128, 128, 3)
expected_slice = np.array([0.7616304, 0.0, 1.0, 0.0, 1.0, 0.0, 0.05925313, 0.0, 0.951898])
assert np.abs(image_slice.flatten() - expected_slice).max() < 1e-2, (
f" expected_slice {expected_slice}, but got {image_slice.flatten()}"
)
assert np.abs(image_from_tuple_slice.flatten() - expected_slice).max() < 1e-2, (
f" expected_slice {expected_slice}, but got {image_from_tuple_slice.flatten()}"
)
@require_torch_accelerator
def test_offloads(self):
pipes = []
components = self.get_dummy_components()
sd_pipe = self.pipeline_class(**components).to(torch_device)
pipes.append(sd_pipe)
components = self.get_dummy_components()
sd_pipe = self.pipeline_class(**components)
sd_pipe.enable_sequential_cpu_offload(device=torch_device)
pipes.append(sd_pipe)
components = self.get_dummy_components()
sd_pipe = self.pipeline_class(**components)
sd_pipe.enable_model_cpu_offload(device=torch_device)
pipes.append(sd_pipe)
image_slices = []
for pipe in pipes:
inputs = self.get_dummy_inputs(torch_device)
image = pipe(**inputs).images
image_slices.append(image[0, -3:, -3:, -1].flatten())
assert np.abs(image_slices[0] - image_slices[1]).max() < 1e-3
assert np.abs(image_slices[0] - image_slices[2]).max() < 1e-3
def test_inference_batch_single_identical(self):
super().test_inference_batch_single_identical(expected_max_diff=1e-2)
@unittest.skip(reason="flakey and float16 requires CUDA")
def test_float16_inference(self):
super().test_float16_inference()
@unittest.skip(reason="Test not supported.")
def test_callback_inputs(self):
pass
@unittest.skip(reason="Test not supported.")
def test_callback_cfg(self):
pass
# coding=utf-8
# Copyright 2025 HuggingFace Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import unittest
import numpy as np
import torch
from transformers import CLIPTextConfig, CLIPTextModel, CLIPTokenizer
from diffusers import DDPMWuerstchenScheduler, WuerstchenDecoderPipeline
from diffusers.pipelines.wuerstchen import PaellaVQModel, WuerstchenDiffNeXt
from diffusers.utils.testing_utils import enable_full_determinism, skip_mps, torch_device
from ..test_pipelines_common import PipelineTesterMixin
enable_full_determinism()
class WuerstchenDecoderPipelineFastTests(PipelineTesterMixin, unittest.TestCase):
pipeline_class = WuerstchenDecoderPipeline
params = ["prompt"]
batch_params = ["image_embeddings", "prompt", "negative_prompt"]
required_optional_params = [
"num_images_per_prompt",
"num_inference_steps",
"latents",
"negative_prompt",
"guidance_scale",
"output_type",
"return_dict",
]
test_xformers_attention = False
callback_cfg_params = ["image_embeddings", "text_encoder_hidden_states"]
@property
def text_embedder_hidden_size(self):
return 32
@property
def time_input_dim(self):
return 32
@property
def block_out_channels_0(self):
return self.time_input_dim
@property
def time_embed_dim(self):
return self.time_input_dim * 4
@property
def dummy_tokenizer(self):
tokenizer = CLIPTokenizer.from_pretrained("hf-internal-testing/tiny-random-clip")
return tokenizer
@property
def dummy_text_encoder(self):
torch.manual_seed(0)
config = CLIPTextConfig(
bos_token_id=0,
eos_token_id=2,
projection_dim=self.text_embedder_hidden_size,
hidden_size=self.text_embedder_hidden_size,
intermediate_size=37,
layer_norm_eps=1e-05,
num_attention_heads=4,
num_hidden_layers=5,
pad_token_id=1,
vocab_size=1000,
)
return CLIPTextModel(config).eval()
@property
def dummy_vqgan(self):
torch.manual_seed(0)
model_kwargs = {
"bottleneck_blocks": 1,
"num_vq_embeddings": 2,
}
model = PaellaVQModel(**model_kwargs)
return model.eval()
@property
def dummy_decoder(self):
torch.manual_seed(0)
model_kwargs = {
"c_cond": self.text_embedder_hidden_size,
"c_hidden": [320],
"nhead": [-1],
"blocks": [4],
"level_config": ["CT"],
"clip_embd": self.text_embedder_hidden_size,
"inject_effnet": [False],
}
model = WuerstchenDiffNeXt(**model_kwargs)
return model.eval()
def get_dummy_components(self):
decoder = self.dummy_decoder
text_encoder = self.dummy_text_encoder
tokenizer = self.dummy_tokenizer
vqgan = self.dummy_vqgan
scheduler = DDPMWuerstchenScheduler()
components = {
"decoder": decoder,
"vqgan": vqgan,
"text_encoder": text_encoder,
"tokenizer": tokenizer,
"scheduler": scheduler,
"latent_dim_scale": 4.0,
}
return components
def get_dummy_inputs(self, device, seed=0):
if str(device).startswith("mps"):
generator = torch.manual_seed(seed)
else:
generator = torch.Generator(device=device).manual_seed(seed)
inputs = {
"image_embeddings": torch.ones((1, 4, 4, 4), device=device),
"prompt": "horse",
"generator": generator,
"guidance_scale": 1.0,
"num_inference_steps": 2,
"output_type": "np",
}
return inputs
def test_wuerstchen_decoder(self):
device = "cpu"
components = self.get_dummy_components()
pipe = self.pipeline_class(**components)
pipe = pipe.to(device)
pipe.set_progress_bar_config(disable=None)
output = pipe(**self.get_dummy_inputs(device))
image = output.images
image_from_tuple = pipe(**self.get_dummy_inputs(device), return_dict=False)
image_slice = image[0, -3:, -3:, -1]
image_from_tuple_slice = image_from_tuple[0, -3:, -3:, -1]
assert image.shape == (1, 64, 64, 3)
expected_slice = np.array([0.0000, 0.0000, 0.0089, 1.0000, 1.0000, 0.3927, 1.0000, 1.0000, 1.0000])
assert np.abs(image_slice.flatten() - expected_slice).max() < 1e-2
assert np.abs(image_from_tuple_slice.flatten() - expected_slice).max() < 1e-2
@skip_mps
def test_inference_batch_single_identical(self):
self._test_inference_batch_single_identical(expected_max_diff=1e-5)
@skip_mps
def test_attention_slicing_forward_pass(self):
test_max_difference = torch_device == "cpu"
test_mean_pixel_difference = False
self._test_attention_slicing_forward_pass(
test_max_difference=test_max_difference,
test_mean_pixel_difference=test_mean_pixel_difference,
)
@unittest.skip(reason="bf16 not supported and requires CUDA")
def test_float16_inference(self):
super().test_float16_inference()
@unittest.skip("Test not supported.")
def test_encode_prompt_works_in_isolation(self):
super().test_encode_prompt_works_in_isolation()
# coding=utf-8
# Copyright 2025 HuggingFace Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import unittest
import numpy as np
import torch
from transformers import CLIPTextConfig, CLIPTextModel, CLIPTokenizer
from diffusers import DDPMWuerstchenScheduler, WuerstchenPriorPipeline
from diffusers.pipelines.wuerstchen import WuerstchenPrior
from diffusers.utils.import_utils import is_peft_available
from diffusers.utils.testing_utils import enable_full_determinism, require_peft_backend, skip_mps, torch_device
if is_peft_available():
from peft import LoraConfig
from peft.tuners.tuners_utils import BaseTunerLayer
from ..test_pipelines_common import PipelineTesterMixin
enable_full_determinism()
class WuerstchenPriorPipelineFastTests(PipelineTesterMixin, unittest.TestCase):
pipeline_class = WuerstchenPriorPipeline
params = ["prompt"]
batch_params = ["prompt", "negative_prompt"]
required_optional_params = [
"num_images_per_prompt",
"generator",
"num_inference_steps",
"latents",
"negative_prompt",
"guidance_scale",
"output_type",
"return_dict",
]
test_xformers_attention = False
callback_cfg_params = ["text_encoder_hidden_states"]
@property
def text_embedder_hidden_size(self):
return 32
@property
def time_input_dim(self):
return 32
@property
def block_out_channels_0(self):
return self.time_input_dim
@property
def time_embed_dim(self):
return self.time_input_dim * 4
@property
def dummy_tokenizer(self):
tokenizer = CLIPTokenizer.from_pretrained("hf-internal-testing/tiny-random-clip")
return tokenizer
@property
def dummy_text_encoder(self):
torch.manual_seed(0)
config = CLIPTextConfig(
bos_token_id=0,
eos_token_id=2,
hidden_size=self.text_embedder_hidden_size,
intermediate_size=37,
layer_norm_eps=1e-05,
num_attention_heads=4,
num_hidden_layers=5,
pad_token_id=1,
vocab_size=1000,
)
return CLIPTextModel(config).eval()
@property
def dummy_prior(self):
torch.manual_seed(0)
model_kwargs = {
"c_in": 2,
"c": 8,
"depth": 2,
"c_cond": 32,
"c_r": 8,
"nhead": 2,
}
model = WuerstchenPrior(**model_kwargs)
return model.eval()
def get_dummy_components(self):
prior = self.dummy_prior
text_encoder = self.dummy_text_encoder
tokenizer = self.dummy_tokenizer
scheduler = DDPMWuerstchenScheduler()
components = {
"prior": prior,
"text_encoder": text_encoder,
"tokenizer": tokenizer,
"scheduler": scheduler,
}
return components
def get_dummy_inputs(self, device, seed=0):
if str(device).startswith("mps"):
generator = torch.manual_seed(seed)
else:
generator = torch.Generator(device=device).manual_seed(seed)
inputs = {
"prompt": "horse",
"generator": generator,
"guidance_scale": 4.0,
"num_inference_steps": 2,
"output_type": "np",
}
return inputs
def test_wuerstchen_prior(self):
device = "cpu"
components = self.get_dummy_components()
pipe = self.pipeline_class(**components)
pipe = pipe.to(device)
pipe.set_progress_bar_config(disable=None)
output = pipe(**self.get_dummy_inputs(device))
image = output.image_embeddings
image_from_tuple = pipe(**self.get_dummy_inputs(device), return_dict=False)[0]
image_slice = image[0, 0, 0, -10:]
image_from_tuple_slice = image_from_tuple[0, 0, 0, -10:]
assert image.shape == (1, 2, 24, 24)
expected_slice = np.array(
[
-7172.837,
-3438.855,
-1093.312,
388.8835,
-7471.467,
-7998.1206,
-5328.259,
218.00089,
-2731.5745,
-8056.734,
]
)
assert np.abs(image_slice.flatten() - expected_slice).max() < 5e-2
assert np.abs(image_from_tuple_slice.flatten() - expected_slice).max() < 5e-2
@skip_mps
def test_inference_batch_single_identical(self):
self._test_inference_batch_single_identical(
expected_max_diff=3e-1,
)
@skip_mps
def test_attention_slicing_forward_pass(self):
test_max_difference = torch_device == "cpu"
test_mean_pixel_difference = False
self._test_attention_slicing_forward_pass(
test_max_difference=test_max_difference,
test_mean_pixel_difference=test_mean_pixel_difference,
)
@unittest.skip(reason="flaky for now")
def test_float16_inference(self):
super().test_float16_inference()
# override because we need to make sure latent_mean and latent_std to be 0
def test_callback_inputs(self):
components = self.get_dummy_components()
components["latent_mean"] = 0
components["latent_std"] = 0
pipe = self.pipeline_class(**components)
pipe = pipe.to(torch_device)
pipe.set_progress_bar_config(disable=None)
self.assertTrue(
hasattr(pipe, "_callback_tensor_inputs"),
f" {self.pipeline_class} should have `_callback_tensor_inputs` that defines a list of tensor variables its callback function can use as inputs",
)
def callback_inputs_test(pipe, i, t, callback_kwargs):
missing_callback_inputs = set()
for v in pipe._callback_tensor_inputs:
if v not in callback_kwargs:
missing_callback_inputs.add(v)
self.assertTrue(
len(missing_callback_inputs) == 0, f"Missing callback tensor inputs: {missing_callback_inputs}"
)
last_i = pipe.num_timesteps - 1
if i == last_i:
callback_kwargs["latents"] = torch.zeros_like(callback_kwargs["latents"])
return callback_kwargs
inputs = self.get_dummy_inputs(torch_device)
inputs["callback_on_step_end"] = callback_inputs_test
inputs["callback_on_step_end_tensor_inputs"] = pipe._callback_tensor_inputs
inputs["output_type"] = "latent"
output = pipe(**inputs)[0]
assert output.abs().sum() == 0
def check_if_lora_correctly_set(self, model) -> bool:
"""
Checks if the LoRA layers are correctly set with peft
"""
for module in model.modules():
if isinstance(module, BaseTunerLayer):
return True
return False
def get_lora_components(self):
prior = self.dummy_prior
prior_lora_config = LoraConfig(
r=4, lora_alpha=4, target_modules=["to_q", "to_k", "to_v", "to_out.0"], init_lora_weights=False
)
return prior, prior_lora_config
@require_peft_backend
def test_inference_with_prior_lora(self):
_, prior_lora_config = self.get_lora_components()
device = "cpu"
components = self.get_dummy_components()
pipe = self.pipeline_class(**components)
pipe = pipe.to(device)
pipe.set_progress_bar_config(disable=None)
output_no_lora = pipe(**self.get_dummy_inputs(device))
image_embed = output_no_lora.image_embeddings
self.assertTrue(image_embed.shape == (1, 2, 24, 24))
pipe.prior.add_adapter(prior_lora_config)
self.assertTrue(self.check_if_lora_correctly_set(pipe.prior), "Lora not correctly set in prior")
output_lora = pipe(**self.get_dummy_inputs(device))
lora_image_embed = output_lora.image_embeddings
self.assertTrue(image_embed.shape == lora_image_embed.shape)
@unittest.skip("Test not supported as dtype cannot be inferred without the text encoder otherwise.")
def test_encode_prompt_works_in_isolation(self):
pass
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment