"docs/vscode:/vscode.git/clone" did not exist on "99b5ef5ca0c3a48d8b703d0e1260aafe3334e844"
Unverified Commit e51f19ae authored by Patrick von Platen's avatar Patrick von Platen Committed by GitHub
Browse files

add model (#3230)



* add

* clean

* up

* clean up more

* fix more tests

* Improve docs further

* improve

* more fixes docs

* Improve docs more

* Update src/diffusers/models/unet_2d_condition.py

* fix

* up

* update doc links

* make fix-copies

* add safety checker and watermarker to stage 3 doc page code snippets

* speed optimizations docs

* memory optimization docs

* make style

* add watermarking snippets to doc string examples

* make style

* use pt_to_pil helper functions in doc strings

* skip mps tests

* Improve safety

* make style

* new logic

* fix

* fix bad onnx design

* make new stable diffusion upscale pipeline model arguments optional

* define has_nsfw_concept when non-pil output type

* lowercase linked to notebook name

---------
Co-authored-by: default avatarWilliam Berman <WLBberman@gmail.com>
parent 1ffcc924
from typing import List
import PIL
import torch
from PIL import Image
from ...configuration_utils import ConfigMixin
from ...models.modeling_utils import ModelMixin
from ...utils import PIL_INTERPOLATION
class IFWatermarker(ModelMixin, ConfigMixin):
def __init__(self):
super().__init__()
self.register_buffer("watermark_image", torch.zeros((62, 62, 4)))
self.watermark_image_as_pil = None
def apply_watermark(self, images: List[PIL.Image.Image], sample_size=None):
# copied from https://github.com/deep-floyd/IF/blob/b77482e36ca2031cb94dbca1001fc1e6400bf4ab/deepfloyd_if/modules/base.py#L287
h = images[0].height
w = images[0].width
sample_size = sample_size or h
coef = min(h / sample_size, w / sample_size)
img_h, img_w = (int(h / coef), int(w / coef)) if coef < 1 else (h, w)
S1, S2 = 1024**2, img_w * img_h
K = (S2 / S1) ** 0.5
wm_size, wm_x, wm_y = int(K * 62), img_w - int(14 * K), img_h - int(14 * K)
if self.watermark_image_as_pil is None:
watermark_image = self.watermark_image.to(torch.uint8).cpu().numpy()
watermark_image = Image.fromarray(watermark_image, mode="RGBA")
self.watermark_image_as_pil = watermark_image
wm_img = self.watermark_image_as_pil.resize(
(wm_size, wm_size), PIL_INTERPOLATION["bicubic"], reducing_gap=None
)
for pil_img in images:
pil_img.paste(wm_img, box=(wm_x - wm_size, wm_y - wm_size, wm_x, wm_y), mask=wm_img.split()[-1])
return images
......@@ -30,7 +30,6 @@ import PIL
import torch
from huggingface_hub import hf_hub_download, model_info, snapshot_download
from packaging import version
from PIL import Image
from tqdm.auto import tqdm
import diffusers
......@@ -56,6 +55,7 @@ from ..utils import (
is_torch_version,
is_transformers_available,
logging,
numpy_to_pil,
)
......@@ -623,7 +623,9 @@ class DiffusionPipeline(ConfigMixin):
if not is_accelerate_available() or is_accelerate_version("<", "0.14.0"):
return False
return hasattr(module, "_hf_hook") and not isinstance(module._hf_hook, accelerate.hooks.CpuOffload)
return hasattr(module, "_hf_hook") and not isinstance(
module._hf_hook, (accelerate.hooks.CpuOffload, accelerate.hooks.AlignDevicesHook)
)
def module_is_offloaded(module):
if not is_accelerate_available() or is_accelerate_version("<", "0.17.0.dev0"):
......@@ -653,7 +655,20 @@ class DiffusionPipeline(ConfigMixin):
is_offloaded = pipeline_is_offloaded or pipeline_is_sequentially_offloaded
for module in modules:
module.to(torch_device, torch_dtype)
is_loaded_in_8bit = hasattr(module, "is_loaded_in_8bit") and module.is_loaded_in_8bit
if is_loaded_in_8bit and torch_dtype is not None:
logger.warning(
f"The module '{module.__class__.__name__}' has been loaded in 8bit and conversion to {torch_dtype} is not yet supported. Module is still in 8bit precision."
)
if is_loaded_in_8bit and torch_device is not None:
logger.warning(
f"The module '{module.__class__.__name__}' has been loaded in 8bit and moving it to {torch_dtype} via `.to()` is not yet supported. Module is still on {module.device}."
)
else:
module.to(torch_device, torch_dtype)
if (
module.dtype == torch.float16
and str(torch_device) in ["cpu"]
......@@ -887,6 +902,9 @@ class DiffusionPipeline(ConfigMixin):
config_dict = cls.load_config(cached_folder)
# pop out "_ignore_files" as it is only needed for download
config_dict.pop("_ignore_files", None)
# 2. Define which model components should load variants
# We retrieve the information by matching whether variant
# model checkpoints exist in the subfolders
......@@ -1204,12 +1222,19 @@ class DiffusionPipeline(ConfigMixin):
)
config_dict = cls._dict_from_json_file(config_file)
ignore_filenames = config_dict.pop("_ignore_files", [])
# retrieve all folder_names that contain relevant files
folder_names = [k for k, v in config_dict.items() if isinstance(v, list)]
filenames = {sibling.rfilename for sibling in info.siblings}
model_filenames, variant_filenames = variant_compatible_siblings(filenames, variant=variant)
# remove ignored filenames
model_filenames = set(model_filenames) - set(ignore_filenames)
variant_filenames = set(variant_filenames) - set(ignore_filenames)
# if the whole pipeline is cached we don't have to ping the Hub
if revision in DEPRECATED_REVISION_ARGS and version.parse(
version.parse(__version__).base_version
......@@ -1370,16 +1395,7 @@ class DiffusionPipeline(ConfigMixin):
"""
Convert a numpy image or a batch of images to a PIL image.
"""
if images.ndim == 3:
images = images[None, ...]
images = (images * 255).round().astype("uint8")
if images.shape[-1] == 1:
# special case for grayscale (single channel) images
pil_images = [Image.fromarray(image.squeeze(), mode="L") for image in images]
else:
pil_images = [Image.fromarray(image) for image in images]
return pil_images
return numpy_to_pil(images)
def progress_bar(self, iterable=None, total=None):
if not hasattr(self, "_progress_bar_config"):
......
......@@ -56,7 +56,18 @@ class OnnxStableDiffusionUpscalePipeline(StableDiffusionUpscalePipeline):
scheduler: Any,
max_noise_level: int = 350,
):
super().__init__(vae, text_encoder, tokenizer, unet, low_res_scheduler, scheduler, max_noise_level)
super().__init__(
vae=vae,
text_encoder=text_encoder,
tokenizer=tokenizer,
unet=unet,
low_res_scheduler=low_res_scheduler,
scheduler=scheduler,
safety_checker=None,
feature_extractor=None,
watermarker=None,
max_noise_level=max_noise_level,
)
def __call__(
self,
......
......@@ -13,18 +13,19 @@
# limitations under the License.
import inspect
from typing import Callable, List, Optional, Union
from typing import Any, Callable, List, Optional, Union
import numpy as np
import PIL
import torch
from transformers import CLIPTextModel, CLIPTokenizer
from transformers import CLIPImageProcessor, CLIPTextModel, CLIPTokenizer
from ...loaders import TextualInversionLoaderMixin
from ...models import AutoencoderKL, UNet2DConditionModel
from ...schedulers import DDPMScheduler, KarrasDiffusionSchedulers
from ...utils import deprecate, is_accelerate_available, is_accelerate_version, logging, randn_tensor
from ..pipeline_utils import DiffusionPipeline, ImagePipelineOutput
from ..pipeline_utils import DiffusionPipeline
from . import StableDiffusionPipelineOutput
logger = logging.get_logger(__name__) # pylint: disable=invalid-name
......@@ -76,6 +77,7 @@ class StableDiffusionUpscalePipeline(DiffusionPipeline, TextualInversionLoaderMi
A scheduler to be used in combination with `unet` to denoise the encoded image latents. Can be one of
[`DDIMScheduler`], [`LMSDiscreteScheduler`], or [`PNDMScheduler`].
"""
_optional_components = ["watermarker", "safety_checker", "feature_extractor"]
def __init__(
self,
......@@ -85,12 +87,16 @@ class StableDiffusionUpscalePipeline(DiffusionPipeline, TextualInversionLoaderMi
unet: UNet2DConditionModel,
low_res_scheduler: DDPMScheduler,
scheduler: KarrasDiffusionSchedulers,
safety_checker: Optional[Any] = None,
feature_extractor: Optional[CLIPImageProcessor] = None,
watermarker: Optional[Any] = None,
max_noise_level: int = 350,
):
super().__init__()
if hasattr(vae, "config"):
# check if vae has a config attribute `scaling_factor` and if it is set to 0.08333, else set it to 0.08333 and deprecate
if hasattr(
vae, "config"
): # check if vae has a config attribute `scaling_factor` and if it is set to 0.08333, else set it to 0.08333 and deprecate
is_vae_scaling_factor_set_to_0_08333 = (
hasattr(vae.config, "scaling_factor") and vae.config.scaling_factor == 0.08333
)
......@@ -113,6 +119,9 @@ class StableDiffusionUpscalePipeline(DiffusionPipeline, TextualInversionLoaderMi
unet=unet,
low_res_scheduler=low_res_scheduler,
scheduler=scheduler,
safety_checker=safety_checker,
watermarker=watermarker,
feature_extractor=feature_extractor,
)
self.register_to_config(max_noise_level=max_noise_level)
......@@ -178,6 +187,23 @@ class StableDiffusionUpscalePipeline(DiffusionPipeline, TextualInversionLoaderMi
return torch.device(module._hf_hook.execution_device)
return self.device
# Copied from diffusers.pipelines.deepfloyd_if.pipeline_if.IFPipeline.run_safety_checker
def run_safety_checker(self, image, device, dtype):
if self.safety_checker is not None:
safety_checker_input = self.feature_extractor(self.numpy_to_pil(image), return_tensors="pt").to(device)
image, nsfw_detected, watermark_detected = self.safety_checker(
images=image,
clip_input=safety_checker_input.pixel_values.to(dtype=dtype),
)
else:
nsfw_detected = None
watermark_detected = None
if hasattr(self, "unet_offload_hook") and self.unet_offload_hook is not None:
self.unet_offload_hook.offload()
return image, nsfw_detected, watermark_detected
# Copied from diffusers.pipelines.stable_diffusion.pipeline_stable_diffusion.StableDiffusionPipeline._encode_prompt
def _encode_prompt(
self,
......@@ -678,10 +704,19 @@ class StableDiffusionUpscalePipeline(DiffusionPipeline, TextualInversionLoaderMi
self.final_offload_hook.offload()
# 11. Convert to PIL
# has_nsfw_concept = False
if output_type == "pil":
image, has_nsfw_concept, _ = self.run_safety_checker(image, device, prompt_embeds.dtype)
image = self.numpy_to_pil(image)
# 11. Apply watermark
if self.watermarker is not None:
image = self.watermarker.apply_watermark(image)
else:
has_nsfw_concept = None
if not return_dict:
return (image,)
return (image, has_nsfw_concept)
return ImagePipelineOutput(images=image)
return StableDiffusionPipelineOutput(images=image, nsfw_content_detected=has_nsfw_concept)
......@@ -15,7 +15,7 @@ from ...models.attention_processor import (
AttnProcessor,
)
from ...models.dual_transformer_2d import DualTransformer2DModel
from ...models.embeddings import GaussianFourierProjection, TimestepEmbedding, Timesteps
from ...models.embeddings import GaussianFourierProjection, TextTimeEmbedding, TimestepEmbedding, Timesteps
from ...models.transformer_2d import Transformer2DModel
from ...models.unet_2d_condition import UNet2DConditionOutput
from ...utils import logging
......@@ -183,11 +183,16 @@ class UNetFlatConditionModel(ModelMixin, ConfigMixin):
class_embed_type (`str`, *optional*, defaults to None):
The type of class embedding to use which is ultimately summed with the time embeddings. Choose from `None`,
`"timestep"`, `"identity"`, `"projection"`, or `"simple_projection"`.
addition_embed_type (`str`, *optional*, defaults to None):
Configures an optional embedding which will be summed with the time embeddings. Choose from `None` or
"text". "text" will use the `TextTimeEmbedding` layer.
num_class_embeds (`int`, *optional*, defaults to None):
Input dimension of the learnable embedding matrix to be projected to `time_embed_dim`, when performing
class conditioning with `class_embed_type` equal to `None`.
time_embedding_type (`str`, *optional*, default to `positional`):
The type of position embedding to use for timesteps. Choose from `positional` or `fourier`.
time_embedding_dim (`int`, *optional*, default to `None`):
An optional override for the dimension of the projected time embedding.
time_embedding_act_fn (`str`, *optional*, default to `None`):
Optional activation function to use on the time embeddings only one time before they as passed to the rest
of the unet. Choose from `silu`, `mish`, `gelu`, and `swish`.
......@@ -246,12 +251,14 @@ class UNetFlatConditionModel(ModelMixin, ConfigMixin):
dual_cross_attention: bool = False,
use_linear_projection: bool = False,
class_embed_type: Optional[str] = None,
addition_embed_type: Optional[str] = None,
num_class_embeds: Optional[int] = None,
upcast_attention: bool = False,
resnet_time_scale_shift: str = "default",
resnet_skip_time_act: bool = False,
resnet_out_scale_factor: int = 1.0,
time_embedding_type: str = "positional",
time_embedding_dim: Optional[int] = None,
time_embedding_act_fn: Optional[str] = None,
timestep_post_act: Optional[str] = None,
time_cond_proj_dim: Optional[int] = None,
......@@ -261,6 +268,7 @@ class UNetFlatConditionModel(ModelMixin, ConfigMixin):
class_embeddings_concat: bool = False,
mid_block_only_cross_attention: Optional[bool] = None,
cross_attention_norm: Optional[str] = None,
addition_embed_type_num_heads=64,
):
super().__init__()
......@@ -311,7 +319,7 @@ class UNetFlatConditionModel(ModelMixin, ConfigMixin):
# time
if time_embedding_type == "fourier":
time_embed_dim = block_out_channels[0] * 2
time_embed_dim = time_embedding_dim or block_out_channels[0] * 2
if time_embed_dim % 2 != 0:
raise ValueError(f"`time_embed_dim` should be divisible by 2, but is {time_embed_dim}.")
self.time_proj = GaussianFourierProjection(
......@@ -319,7 +327,7 @@ class UNetFlatConditionModel(ModelMixin, ConfigMixin):
)
timestep_input_dim = time_embed_dim
elif time_embedding_type == "positional":
time_embed_dim = block_out_channels[0] * 4
time_embed_dim = time_embedding_dim or block_out_channels[0] * 4
self.time_proj = Timesteps(block_out_channels[0], flip_sin_to_cos, freq_shift)
timestep_input_dim = block_out_channels[0]
......@@ -370,6 +378,18 @@ class UNetFlatConditionModel(ModelMixin, ConfigMixin):
else:
self.class_embedding = None
if addition_embed_type == "text":
if encoder_hid_dim is not None:
text_time_embedding_from_dim = encoder_hid_dim
else:
text_time_embedding_from_dim = cross_attention_dim
self.add_embedding = TextTimeEmbedding(
text_time_embedding_from_dim, time_embed_dim, num_heads=addition_embed_type_num_heads
)
elif addition_embed_type is not None:
raise ValueError(f"addition_embed_type: {addition_embed_type} must be None or 'text'.")
if time_embedding_act_fn is None:
self.time_embed_act = None
elif time_embedding_act_fn == "swish":
......@@ -781,6 +801,10 @@ class UNetFlatConditionModel(ModelMixin, ConfigMixin):
else:
emb = emb + class_emb
if self.config.addition_embed_type == "text":
aug_emb = self.add_embedding(encoder_hidden_states)
emb = emb + aug_emb
if self.time_embed_act is not None:
emb = self.time_embed_act(emb)
......
......@@ -44,6 +44,7 @@ from .hub_utils import (
http_user_agent,
)
from .import_utils import (
BACKENDS_MAPPING,
ENV_VARS_TRUE_AND_AUTO_VALUES,
ENV_VARS_TRUE_VALUES,
USE_JAX,
......@@ -53,7 +54,9 @@ from .import_utils import (
OptionalDependencyNotAvailable,
is_accelerate_available,
is_accelerate_version,
is_bs4_available,
is_flax_available,
is_ftfy_available,
is_inflect_available,
is_k_diffusion_available,
is_k_diffusion_version,
......@@ -76,7 +79,7 @@ from .import_utils import (
)
from .logging import get_logger
from .outputs import BaseOutput
from .pil_utils import PIL_INTERPOLATION
from .pil_utils import PIL_INTERPOLATION, numpy_to_pil, pt_to_pil
from .torch_utils import is_compiled_module, randn_tensor
......
......@@ -62,6 +62,96 @@ class CycleDiffusionPipeline(metaclass=DummyObject):
requires_backends(cls, ["torch", "transformers"])
class IFImg2ImgPipeline(metaclass=DummyObject):
_backends = ["torch", "transformers"]
def __init__(self, *args, **kwargs):
requires_backends(self, ["torch", "transformers"])
@classmethod
def from_config(cls, *args, **kwargs):
requires_backends(cls, ["torch", "transformers"])
@classmethod
def from_pretrained(cls, *args, **kwargs):
requires_backends(cls, ["torch", "transformers"])
class IFImg2ImgSuperResolutionPipeline(metaclass=DummyObject):
_backends = ["torch", "transformers"]
def __init__(self, *args, **kwargs):
requires_backends(self, ["torch", "transformers"])
@classmethod
def from_config(cls, *args, **kwargs):
requires_backends(cls, ["torch", "transformers"])
@classmethod
def from_pretrained(cls, *args, **kwargs):
requires_backends(cls, ["torch", "transformers"])
class IFInpaintingPipeline(metaclass=DummyObject):
_backends = ["torch", "transformers"]
def __init__(self, *args, **kwargs):
requires_backends(self, ["torch", "transformers"])
@classmethod
def from_config(cls, *args, **kwargs):
requires_backends(cls, ["torch", "transformers"])
@classmethod
def from_pretrained(cls, *args, **kwargs):
requires_backends(cls, ["torch", "transformers"])
class IFInpaintingSuperResolutionPipeline(metaclass=DummyObject):
_backends = ["torch", "transformers"]
def __init__(self, *args, **kwargs):
requires_backends(self, ["torch", "transformers"])
@classmethod
def from_config(cls, *args, **kwargs):
requires_backends(cls, ["torch", "transformers"])
@classmethod
def from_pretrained(cls, *args, **kwargs):
requires_backends(cls, ["torch", "transformers"])
class IFPipeline(metaclass=DummyObject):
_backends = ["torch", "transformers"]
def __init__(self, *args, **kwargs):
requires_backends(self, ["torch", "transformers"])
@classmethod
def from_config(cls, *args, **kwargs):
requires_backends(cls, ["torch", "transformers"])
@classmethod
def from_pretrained(cls, *args, **kwargs):
requires_backends(cls, ["torch", "transformers"])
class IFSuperResolutionPipeline(metaclass=DummyObject):
_backends = ["torch", "transformers"]
def __init__(self, *args, **kwargs):
requires_backends(self, ["torch", "transformers"])
@classmethod
def from_config(cls, *args, **kwargs):
requires_backends(cls, ["torch", "transformers"])
@classmethod
def from_pretrained(cls, *args, **kwargs):
requires_backends(cls, ["torch", "transformers"])
class LDMTextToImagePipeline(metaclass=DummyObject):
_backends = ["torch", "transformers"]
......
......@@ -271,6 +271,23 @@ except importlib_metadata.PackageNotFoundError:
_compel_available = False
_ftfy_available = importlib.util.find_spec("ftfy") is not None
try:
_ftfy_version = importlib_metadata.version("ftfy")
logger.debug(f"Successfully imported ftfy version {_ftfy_version}")
except importlib_metadata.PackageNotFoundError:
_ftfy_available = False
_bs4_available = importlib.util.find_spec("bs4") is not None
try:
# importlib metadata under different name
_bs4_version = importlib_metadata.version("beautifulsoup4")
logger.debug(f"Successfully imported ftfy version {_bs4_version}")
except importlib_metadata.PackageNotFoundError:
_bs4_available = False
def is_torch_available():
return _torch_available
......@@ -347,6 +364,14 @@ def is_compel_available():
return _compel_available
def is_ftfy_available():
return _ftfy_available
def is_bs4_available():
return _bs4_available
# docstyle-ignore
FLAX_IMPORT_ERROR = """
{0} requires the FLAX library but it was not found in your environment. Checkout the instructions on the
......@@ -437,8 +462,23 @@ COMPEL_IMPORT_ERROR = """
{0} requires the compel library but it was not found in your environment. You can install it with pip: `pip install compel`
"""
# docstyle-ignore
BS4_IMPORT_ERROR = """
{0} requires the Beautiful Soup library but it was not found in your environment. You can install it with pip:
`pip install beautifulsoup4`. Please note that you may need to restart your runtime after installation.
"""
# docstyle-ignore
FTFY_IMPORT_ERROR = """
{0} requires the ftfy library but it was not found in your environment. Checkout the instructions on the
installation section: https://github.com/rspeer/python-ftfy/tree/master#installing and follow the ones
that match your environment. Please note that you may need to restart your runtime after installation.
"""
BACKENDS_MAPPING = OrderedDict(
[
("bs4", (is_bs4_available, BS4_IMPORT_ERROR)),
("flax", (is_flax_available, FLAX_IMPORT_ERROR)),
("inflect", (is_inflect_available, INFLECT_IMPORT_ERROR)),
("onnx", (is_onnx_available, ONNX_IMPORT_ERROR)),
......@@ -454,6 +494,7 @@ BACKENDS_MAPPING = OrderedDict(
("omegaconf", (is_omegaconf_available, OMEGACONF_IMPORT_ERROR)),
("tensorboard", (_tensorboard_available, TENSORBOARD_IMPORT_ERROR)),
("compel", (_compel_available, COMPEL_IMPORT_ERROR)),
("ftfy", (is_ftfy_available, FTFY_IMPORT_ERROR)),
]
)
......
import PIL.Image
import PIL.ImageOps
from packaging import version
from PIL import Image
if version.parse(version.parse(PIL.__version__).base_version) >= version.parse("9.1.0"):
......@@ -19,3 +20,26 @@ else:
"lanczos": PIL.Image.LANCZOS,
"nearest": PIL.Image.NEAREST,
}
def pt_to_pil(images):
images = (images / 2 + 0.5).clamp(0, 1)
images = images.cpu().permute(0, 2, 3, 1).float().numpy()
images = numpy_to_pil(images)
return images
def numpy_to_pil(images):
"""
Convert a numpy image or a batch of images to a PIL image.
"""
if images.ndim == 3:
images = images[None, ...]
images = (images * 255).round().astype("uint8")
if images.shape[-1] == 1:
# special case for grayscale (single channel) images
pil_images = [Image.fromarray(image.squeeze(), mode="L") for image in images]
else:
pil_images = [Image.fromarray(image) for image in images]
return pil_images
import tempfile
import numpy as np
import torch
from transformers import AutoTokenizer, T5EncoderModel
from diffusers import DDPMScheduler, UNet2DConditionModel
from diffusers.models.attention_processor import AttnAddedKVProcessor
from diffusers.pipelines.deepfloyd_if import IFWatermarker
from diffusers.utils.testing_utils import torch_device
from ..test_pipelines_common import to_np
# WARN: the hf-internal-testing/tiny-random-t5 text encoder has some non-determinism in the `save_load` tests.
class IFPipelineTesterMixin:
def _get_dummy_components(self):
torch.manual_seed(0)
text_encoder = T5EncoderModel.from_pretrained("hf-internal-testing/tiny-random-t5")
torch.manual_seed(0)
tokenizer = AutoTokenizer.from_pretrained("hf-internal-testing/tiny-random-t5")
torch.manual_seed(0)
unet = UNet2DConditionModel(
sample_size=32,
layers_per_block=1,
block_out_channels=[32, 64],
down_block_types=[
"ResnetDownsampleBlock2D",
"SimpleCrossAttnDownBlock2D",
],
mid_block_type="UNetMidBlock2DSimpleCrossAttn",
up_block_types=["SimpleCrossAttnUpBlock2D", "ResnetUpsampleBlock2D"],
in_channels=3,
out_channels=6,
cross_attention_dim=32,
encoder_hid_dim=32,
attention_head_dim=8,
addition_embed_type="text",
addition_embed_type_num_heads=2,
cross_attention_norm="group_norm",
resnet_time_scale_shift="scale_shift",
act_fn="gelu",
)
unet.set_attn_processor(AttnAddedKVProcessor()) # For reproducibility tests
torch.manual_seed(0)
scheduler = DDPMScheduler(
num_train_timesteps=1000,
beta_schedule="squaredcos_cap_v2",
beta_start=0.0001,
beta_end=0.02,
thresholding=True,
dynamic_thresholding_ratio=0.95,
sample_max_value=1.0,
prediction_type="epsilon",
variance_type="learned_range",
)
torch.manual_seed(0)
watermarker = IFWatermarker()
return {
"text_encoder": text_encoder,
"tokenizer": tokenizer,
"unet": unet,
"scheduler": scheduler,
"watermarker": watermarker,
"safety_checker": None,
"feature_extractor": None,
}
def _get_superresolution_dummy_components(self):
torch.manual_seed(0)
text_encoder = T5EncoderModel.from_pretrained("hf-internal-testing/tiny-random-t5")
torch.manual_seed(0)
tokenizer = AutoTokenizer.from_pretrained("hf-internal-testing/tiny-random-t5")
torch.manual_seed(0)
unet = UNet2DConditionModel(
sample_size=32,
layers_per_block=[1, 2],
block_out_channels=[32, 64],
down_block_types=[
"ResnetDownsampleBlock2D",
"SimpleCrossAttnDownBlock2D",
],
mid_block_type="UNetMidBlock2DSimpleCrossAttn",
up_block_types=["SimpleCrossAttnUpBlock2D", "ResnetUpsampleBlock2D"],
in_channels=6,
out_channels=6,
cross_attention_dim=32,
encoder_hid_dim=32,
attention_head_dim=8,
addition_embed_type="text",
addition_embed_type_num_heads=2,
cross_attention_norm="group_norm",
resnet_time_scale_shift="scale_shift",
act_fn="gelu",
class_embed_type="timestep",
mid_block_scale_factor=1.414,
time_embedding_act_fn="gelu",
time_embedding_dim=32,
)
unet.set_attn_processor(AttnAddedKVProcessor()) # For reproducibility tests
torch.manual_seed(0)
scheduler = DDPMScheduler(
num_train_timesteps=1000,
beta_schedule="squaredcos_cap_v2",
beta_start=0.0001,
beta_end=0.02,
thresholding=True,
dynamic_thresholding_ratio=0.95,
sample_max_value=1.0,
prediction_type="epsilon",
variance_type="learned_range",
)
torch.manual_seed(0)
image_noising_scheduler = DDPMScheduler(
num_train_timesteps=1000,
beta_schedule="squaredcos_cap_v2",
beta_start=0.0001,
beta_end=0.02,
)
torch.manual_seed(0)
watermarker = IFWatermarker()
return {
"text_encoder": text_encoder,
"tokenizer": tokenizer,
"unet": unet,
"scheduler": scheduler,
"image_noising_scheduler": image_noising_scheduler,
"watermarker": watermarker,
"safety_checker": None,
"feature_extractor": None,
}
# this test is modified from the base class because if pipelines set the text encoder
# as optional with the intention that the user is allowed to encode the prompt once
# and then pass the embeddings directly to the pipeline. The base class test uses
# the unmodified arguments from `self.get_dummy_inputs` which will pass the unencoded
# prompt to the pipeline when the text encoder is set to None, throwing an error.
# So we make the test reflect the intended usage of setting the text encoder to None.
def _test_save_load_optional_components(self):
components = self.get_dummy_components()
pipe = self.pipeline_class(**components)
pipe.to(torch_device)
pipe.set_progress_bar_config(disable=None)
inputs = self.get_dummy_inputs(torch_device)
prompt = inputs["prompt"]
generator = inputs["generator"]
num_inference_steps = inputs["num_inference_steps"]
output_type = inputs["output_type"]
if "image" in inputs:
image = inputs["image"]
else:
image = None
if "mask_image" in inputs:
mask_image = inputs["mask_image"]
else:
mask_image = None
if "original_image" in inputs:
original_image = inputs["original_image"]
else:
original_image = None
prompt_embeds, negative_prompt_embeds = pipe.encode_prompt(prompt)
# inputs with prompt converted to embeddings
inputs = {
"prompt_embeds": prompt_embeds,
"negative_prompt_embeds": negative_prompt_embeds,
"generator": generator,
"num_inference_steps": num_inference_steps,
"output_type": output_type,
}
if image is not None:
inputs["image"] = image
if mask_image is not None:
inputs["mask_image"] = mask_image
if original_image is not None:
inputs["original_image"] = original_image
# set all optional components to None
for optional_component in pipe._optional_components:
setattr(pipe, optional_component, None)
output = pipe(**inputs)[0]
with tempfile.TemporaryDirectory() as tmpdir:
pipe.save_pretrained(tmpdir)
pipe_loaded = self.pipeline_class.from_pretrained(tmpdir)
pipe_loaded.to(torch_device)
pipe_loaded.set_progress_bar_config(disable=None)
pipe_loaded.unet.set_attn_processor(AttnAddedKVProcessor()) # For reproducibility tests
for optional_component in pipe._optional_components:
self.assertTrue(
getattr(pipe_loaded, optional_component) is None,
f"`{optional_component}` did not stay set to None after loading.",
)
inputs = self.get_dummy_inputs(torch_device)
generator = inputs["generator"]
num_inference_steps = inputs["num_inference_steps"]
output_type = inputs["output_type"]
# inputs with prompt converted to embeddings
inputs = {
"prompt_embeds": prompt_embeds,
"negative_prompt_embeds": negative_prompt_embeds,
"generator": generator,
"num_inference_steps": num_inference_steps,
"output_type": output_type,
}
if image is not None:
inputs["image"] = image
if mask_image is not None:
inputs["mask_image"] = mask_image
if original_image is not None:
inputs["original_image"] = original_image
output_loaded = pipe_loaded(**inputs)[0]
max_diff = np.abs(to_np(output) - to_np(output_loaded)).max()
self.assertLess(max_diff, 1e-4)
# Modified from `PipelineTesterMixin` to set the attn processor as it's not serialized.
# This should be handled in the base test and then this method can be removed.
def _test_save_load_local(self):
components = self.get_dummy_components()
pipe = self.pipeline_class(**components)
pipe.to(torch_device)
pipe.set_progress_bar_config(disable=None)
inputs = self.get_dummy_inputs(torch_device)
output = pipe(**inputs)[0]
with tempfile.TemporaryDirectory() as tmpdir:
pipe.save_pretrained(tmpdir)
pipe_loaded = self.pipeline_class.from_pretrained(tmpdir)
pipe_loaded.to(torch_device)
pipe_loaded.set_progress_bar_config(disable=None)
pipe_loaded.unet.set_attn_processor(AttnAddedKVProcessor()) # For reproducibility tests
inputs = self.get_dummy_inputs(torch_device)
output_loaded = pipe_loaded(**inputs)[0]
max_diff = np.abs(to_np(output) - to_np(output_loaded)).max()
self.assertLess(max_diff, 1e-4)
# coding=utf-8
# Copyright 2023 HuggingFace Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import gc
import random
import unittest
import torch
from diffusers import (
IFImg2ImgPipeline,
IFImg2ImgSuperResolutionPipeline,
IFInpaintingPipeline,
IFInpaintingSuperResolutionPipeline,
IFPipeline,
IFSuperResolutionPipeline,
)
from diffusers.models.attention_processor import AttnAddedKVProcessor
from diffusers.utils.testing_utils import floats_tensor, load_numpy, require_torch_gpu, skip_mps, slow, torch_device
from ..pipeline_params import TEXT_TO_IMAGE_BATCH_PARAMS, TEXT_TO_IMAGE_PARAMS
from ..test_pipelines_common import PipelineTesterMixin, assert_mean_pixel_difference
from . import IFPipelineTesterMixin
@skip_mps
class IFPipelineFastTests(PipelineTesterMixin, IFPipelineTesterMixin, unittest.TestCase):
pipeline_class = IFPipeline
params = TEXT_TO_IMAGE_PARAMS - {"width", "height", "latents"}
batch_params = TEXT_TO_IMAGE_BATCH_PARAMS
required_optional_params = PipelineTesterMixin.required_optional_params - {"latents"}
test_xformers_attention = False
def get_dummy_components(self):
return self._get_dummy_components()
def get_dummy_inputs(self, device, seed=0):
if str(device).startswith("mps"):
generator = torch.manual_seed(seed)
else:
generator = torch.Generator(device=device).manual_seed(seed)
inputs = {
"prompt": "A painting of a squirrel eating a burger",
"generator": generator,
"num_inference_steps": 2,
"output_type": "numpy",
}
return inputs
def test_save_load_optional_components(self):
self._test_save_load_optional_components()
@unittest.skipIf(torch_device != "cuda", reason="float16 requires CUDA")
def test_save_load_float16(self):
# Due to non-determinism in save load of the hf-internal-testing/tiny-random-t5 text encoder
self._test_save_load_float16(expected_max_diff=1e-1)
def test_attention_slicing_forward_pass(self):
self._test_attention_slicing_forward_pass(expected_max_diff=1e-2)
def test_save_load_local(self):
self._test_save_load_local()
def test_inference_batch_single_identical(self):
self._test_inference_batch_single_identical(
expected_max_diff=1e-2,
)
@slow
@require_torch_gpu
class IFPipelineSlowTests(unittest.TestCase):
def tearDown(self):
# clean up the VRAM after each test
super().tearDown()
gc.collect()
torch.cuda.empty_cache()
def test_all(self):
# if
pipe_1 = IFPipeline.from_pretrained("DeepFloyd/IF-I-IF-v1.0", variant="fp16", torch_dtype=torch.float16)
pipe_2 = IFSuperResolutionPipeline.from_pretrained(
"DeepFloyd/IF-II-L-v1.0", variant="fp16", torch_dtype=torch.float16, text_encoder=None, tokenizer=None
)
# pre compute text embeddings and remove T5 to save memory
pipe_1.text_encoder.to("cuda")
prompt_embeds, negative_prompt_embeds = pipe_1.encode_prompt("anime turtle", device="cuda")
del pipe_1.tokenizer
del pipe_1.text_encoder
gc.collect()
pipe_1.tokenizer = None
pipe_1.text_encoder = None
pipe_1.enable_model_cpu_offload()
pipe_2.enable_model_cpu_offload()
pipe_1.unet.set_attn_processor(AttnAddedKVProcessor())
pipe_2.unet.set_attn_processor(AttnAddedKVProcessor())
self._test_if(pipe_1, pipe_2, prompt_embeds, negative_prompt_embeds)
pipe_1.remove_all_hooks()
pipe_2.remove_all_hooks()
# img2img
pipe_1 = IFImg2ImgPipeline(**pipe_1.components)
pipe_2 = IFImg2ImgSuperResolutionPipeline(**pipe_2.components)
pipe_1.enable_model_cpu_offload()
pipe_2.enable_model_cpu_offload()
pipe_1.unet.set_attn_processor(AttnAddedKVProcessor())
pipe_2.unet.set_attn_processor(AttnAddedKVProcessor())
self._test_if_img2img(pipe_1, pipe_2, prompt_embeds, negative_prompt_embeds)
pipe_1.remove_all_hooks()
pipe_2.remove_all_hooks()
# inpainting
pipe_1 = IFInpaintingPipeline(**pipe_1.components)
pipe_2 = IFInpaintingSuperResolutionPipeline(**pipe_2.components)
pipe_1.enable_model_cpu_offload()
pipe_2.enable_model_cpu_offload()
pipe_1.unet.set_attn_processor(AttnAddedKVProcessor())
pipe_2.unet.set_attn_processor(AttnAddedKVProcessor())
self._test_if_inpainting(pipe_1, pipe_2, prompt_embeds, negative_prompt_embeds)
def _test_if(self, pipe_1, pipe_2, prompt_embeds, negative_prompt_embeds):
# pipeline 1
_start_torch_memory_measurement()
generator = torch.Generator(device="cpu").manual_seed(0)
output = pipe_1(
prompt_embeds=prompt_embeds,
negative_prompt_embeds=negative_prompt_embeds,
num_inference_steps=2,
generator=generator,
output_type="np",
)
image = output.images[0]
assert image.shape == (64, 64, 3)
mem_bytes = torch.cuda.max_memory_allocated()
assert mem_bytes < 13 * 10**9
expected_image = load_numpy(
"https://huggingface.co/datasets/hf-internal-testing/diffusers-images/resolve/main/if/test_if.npy"
)
assert_mean_pixel_difference(image, expected_image)
# pipeline 2
_start_torch_memory_measurement()
generator = torch.Generator(device="cpu").manual_seed(0)
image = floats_tensor((1, 3, 64, 64), rng=random.Random(0)).to(torch_device)
output = pipe_2(
prompt_embeds=prompt_embeds,
negative_prompt_embeds=negative_prompt_embeds,
image=image,
generator=generator,
num_inference_steps=2,
output_type="np",
)
image = output.images[0]
assert image.shape == (256, 256, 3)
mem_bytes = torch.cuda.max_memory_allocated()
assert mem_bytes < 4 * 10**9
expected_image = load_numpy(
"https://huggingface.co/datasets/hf-internal-testing/diffusers-images/resolve/main/if/test_if_superresolution_stage_II.npy"
)
assert_mean_pixel_difference(image, expected_image)
def _test_if_img2img(self, pipe_1, pipe_2, prompt_embeds, negative_prompt_embeds):
# pipeline 1
_start_torch_memory_measurement()
image = floats_tensor((1, 3, 64, 64), rng=random.Random(0)).to(torch_device)
generator = torch.Generator(device="cpu").manual_seed(0)
output = pipe_1(
prompt_embeds=prompt_embeds,
negative_prompt_embeds=negative_prompt_embeds,
image=image,
num_inference_steps=2,
generator=generator,
output_type="np",
)
image = output.images[0]
assert image.shape == (64, 64, 3)
mem_bytes = torch.cuda.max_memory_allocated()
assert mem_bytes < 10 * 10**9
expected_image = load_numpy(
"https://huggingface.co/datasets/hf-internal-testing/diffusers-images/resolve/main/if/test_if_img2img.npy"
)
assert_mean_pixel_difference(image, expected_image)
# pipeline 2
_start_torch_memory_measurement()
generator = torch.Generator(device="cpu").manual_seed(0)
original_image = floats_tensor((1, 3, 256, 256), rng=random.Random(0)).to(torch_device)
image = floats_tensor((1, 3, 64, 64), rng=random.Random(0)).to(torch_device)
output = pipe_2(
prompt_embeds=prompt_embeds,
negative_prompt_embeds=negative_prompt_embeds,
image=image,
original_image=original_image,
generator=generator,
num_inference_steps=2,
output_type="np",
)
image = output.images[0]
assert image.shape == (256, 256, 3)
mem_bytes = torch.cuda.max_memory_allocated()
assert mem_bytes < 4 * 10**9
expected_image = load_numpy(
"https://huggingface.co/datasets/hf-internal-testing/diffusers-images/resolve/main/if/test_if_img2img_superresolution_stage_II.npy"
)
assert_mean_pixel_difference(image, expected_image)
def _test_if_inpainting(self, pipe_1, pipe_2, prompt_embeds, negative_prompt_embeds):
# pipeline 1
_start_torch_memory_measurement()
image = floats_tensor((1, 3, 64, 64), rng=random.Random(0)).to(torch_device)
mask_image = floats_tensor((1, 3, 64, 64), rng=random.Random(1)).to(torch_device)
generator = torch.Generator(device="cpu").manual_seed(0)
output = pipe_1(
prompt_embeds=prompt_embeds,
negative_prompt_embeds=negative_prompt_embeds,
image=image,
mask_image=mask_image,
num_inference_steps=2,
generator=generator,
output_type="np",
)
image = output.images[0]
assert image.shape == (64, 64, 3)
mem_bytes = torch.cuda.max_memory_allocated()
assert mem_bytes < 10 * 10**9
expected_image = load_numpy(
"https://huggingface.co/datasets/hf-internal-testing/diffusers-images/resolve/main/if/test_if_inpainting.npy"
)
assert_mean_pixel_difference(image, expected_image)
# pipeline 2
_start_torch_memory_measurement()
generator = torch.Generator(device="cpu").manual_seed(0)
image = floats_tensor((1, 3, 64, 64), rng=random.Random(0)).to(torch_device)
original_image = floats_tensor((1, 3, 256, 256), rng=random.Random(0)).to(torch_device)
mask_image = floats_tensor((1, 3, 256, 256), rng=random.Random(1)).to(torch_device)
output = pipe_2(
prompt_embeds=prompt_embeds,
negative_prompt_embeds=negative_prompt_embeds,
image=image,
mask_image=mask_image,
original_image=original_image,
generator=generator,
num_inference_steps=2,
output_type="np",
)
image = output.images[0]
assert image.shape == (256, 256, 3)
mem_bytes = torch.cuda.max_memory_allocated()
assert mem_bytes < 4 * 10**9
expected_image = load_numpy(
"https://huggingface.co/datasets/hf-internal-testing/diffusers-images/resolve/main/if/test_if_inpainting_superresolution_stage_II.npy"
)
assert_mean_pixel_difference(image, expected_image)
def _start_torch_memory_measurement():
torch.cuda.empty_cache()
torch.cuda.reset_max_memory_allocated()
torch.cuda.reset_peak_memory_stats()
# coding=utf-8
# Copyright 2023 HuggingFace Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import random
import unittest
import torch
from diffusers import IFImg2ImgPipeline
from diffusers.utils import floats_tensor
from diffusers.utils.testing_utils import skip_mps, torch_device
from ..pipeline_params import (
TEXT_GUIDED_IMAGE_VARIATION_BATCH_PARAMS,
TEXT_GUIDED_IMAGE_VARIATION_PARAMS,
)
from ..test_pipelines_common import PipelineTesterMixin
from . import IFPipelineTesterMixin
@skip_mps
class IFImg2ImgPipelineFastTests(PipelineTesterMixin, IFPipelineTesterMixin, unittest.TestCase):
pipeline_class = IFImg2ImgPipeline
params = TEXT_GUIDED_IMAGE_VARIATION_PARAMS - {"width", "height"}
batch_params = TEXT_GUIDED_IMAGE_VARIATION_BATCH_PARAMS
required_optional_params = PipelineTesterMixin.required_optional_params - {"latents"}
test_xformers_attention = False
def get_dummy_components(self):
return self._get_dummy_components()
def get_dummy_inputs(self, device, seed=0):
if str(device).startswith("mps"):
generator = torch.manual_seed(seed)
else:
generator = torch.Generator(device=device).manual_seed(seed)
image = floats_tensor((1, 3, 32, 32), rng=random.Random(seed)).to(device)
inputs = {
"prompt": "A painting of a squirrel eating a burger",
"image": image,
"generator": generator,
"num_inference_steps": 2,
"output_type": "numpy",
}
return inputs
def test_save_load_optional_components(self):
self._test_save_load_optional_components()
@unittest.skipIf(torch_device != "cuda", reason="float16 requires CUDA")
def test_save_load_float16(self):
# Due to non-determinism in save load of the hf-internal-testing/tiny-random-t5 text encoder
self._test_save_load_float16(expected_max_diff=1e-1)
@unittest.skipIf(torch_device != "cuda", reason="float16 requires CUDA")
def test_float16_inference(self):
self._test_float16_inference(expected_max_diff=1e-1)
def test_attention_slicing_forward_pass(self):
self._test_attention_slicing_forward_pass(expected_max_diff=1e-2)
def test_save_load_local(self):
self._test_save_load_local()
def test_inference_batch_single_identical(self):
self._test_inference_batch_single_identical(
expected_max_diff=1e-2,
)
# coding=utf-8
# Copyright 2023 HuggingFace Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import random
import unittest
import torch
from diffusers import IFImg2ImgSuperResolutionPipeline
from diffusers.utils import floats_tensor
from diffusers.utils.testing_utils import skip_mps, torch_device
from ..pipeline_params import TEXT_GUIDED_IMAGE_VARIATION_BATCH_PARAMS, TEXT_GUIDED_IMAGE_VARIATION_PARAMS
from ..test_pipelines_common import PipelineTesterMixin
from . import IFPipelineTesterMixin
@skip_mps
class IFImg2ImgSuperResolutionPipelineFastTests(PipelineTesterMixin, IFPipelineTesterMixin, unittest.TestCase):
pipeline_class = IFImg2ImgSuperResolutionPipeline
params = TEXT_GUIDED_IMAGE_VARIATION_PARAMS - {"width", "height"}
batch_params = TEXT_GUIDED_IMAGE_VARIATION_BATCH_PARAMS.union({"original_image"})
required_optional_params = PipelineTesterMixin.required_optional_params - {"latents"}
test_xformers_attention = False
def get_dummy_components(self):
return self._get_superresolution_dummy_components()
def get_dummy_inputs(self, device, seed=0):
if str(device).startswith("mps"):
generator = torch.manual_seed(seed)
else:
generator = torch.Generator(device=device).manual_seed(seed)
original_image = floats_tensor((1, 3, 32, 32), rng=random.Random(seed)).to(device)
image = floats_tensor((1, 3, 16, 16), rng=random.Random(seed)).to(device)
inputs = {
"prompt": "A painting of a squirrel eating a burger",
"image": image,
"original_image": original_image,
"generator": generator,
"num_inference_steps": 2,
"output_type": "numpy",
}
return inputs
def test_save_load_optional_components(self):
self._test_save_load_optional_components()
@unittest.skipIf(torch_device != "cuda", reason="float16 requires CUDA")
def test_save_load_float16(self):
# Due to non-determinism in save load of the hf-internal-testing/tiny-random-t5 text encoder
self._test_save_load_float16(expected_max_diff=1e-1)
def test_attention_slicing_forward_pass(self):
self._test_attention_slicing_forward_pass(expected_max_diff=1e-2)
def test_save_load_local(self):
self._test_save_load_local()
def test_inference_batch_single_identical(self):
self._test_inference_batch_single_identical(
expected_max_diff=1e-2,
)
# coding=utf-8
# Copyright 2023 HuggingFace Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import random
import unittest
import torch
from diffusers import IFInpaintingPipeline
from diffusers.utils import floats_tensor
from diffusers.utils.testing_utils import skip_mps, torch_device
from ..pipeline_params import (
TEXT_GUIDED_IMAGE_INPAINTING_BATCH_PARAMS,
TEXT_GUIDED_IMAGE_INPAINTING_PARAMS,
)
from ..test_pipelines_common import PipelineTesterMixin
from . import IFPipelineTesterMixin
@skip_mps
class IFInpaintingPipelineFastTests(PipelineTesterMixin, IFPipelineTesterMixin, unittest.TestCase):
pipeline_class = IFInpaintingPipeline
params = TEXT_GUIDED_IMAGE_INPAINTING_PARAMS - {"width", "height"}
batch_params = TEXT_GUIDED_IMAGE_INPAINTING_BATCH_PARAMS
required_optional_params = PipelineTesterMixin.required_optional_params - {"latents"}
test_xformers_attention = False
def get_dummy_components(self):
return self._get_dummy_components()
def get_dummy_inputs(self, device, seed=0):
if str(device).startswith("mps"):
generator = torch.manual_seed(seed)
else:
generator = torch.Generator(device=device).manual_seed(seed)
image = floats_tensor((1, 3, 32, 32), rng=random.Random(seed)).to(device)
mask_image = floats_tensor((1, 3, 32, 32), rng=random.Random(seed)).to(device)
inputs = {
"prompt": "A painting of a squirrel eating a burger",
"image": image,
"mask_image": mask_image,
"generator": generator,
"num_inference_steps": 2,
"output_type": "numpy",
}
return inputs
def test_save_load_optional_components(self):
self._test_save_load_optional_components()
@unittest.skipIf(torch_device != "cuda", reason="float16 requires CUDA")
def test_save_load_float16(self):
# Due to non-determinism in save load of the hf-internal-testing/tiny-random-t5 text encoder
self._test_save_load_float16(expected_max_diff=1e-1)
def test_attention_slicing_forward_pass(self):
self._test_attention_slicing_forward_pass(expected_max_diff=1e-2)
def test_save_load_local(self):
self._test_save_load_local()
def test_inference_batch_single_identical(self):
self._test_inference_batch_single_identical(
expected_max_diff=1e-2,
)
# coding=utf-8
# Copyright 2023 HuggingFace Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import random
import unittest
import torch
from diffusers import IFInpaintingSuperResolutionPipeline
from diffusers.utils import floats_tensor
from diffusers.utils.testing_utils import skip_mps, torch_device
from ..pipeline_params import (
TEXT_GUIDED_IMAGE_INPAINTING_BATCH_PARAMS,
TEXT_GUIDED_IMAGE_INPAINTING_PARAMS,
)
from ..test_pipelines_common import PipelineTesterMixin
from . import IFPipelineTesterMixin
@skip_mps
class IFInpaintingSuperResolutionPipelineFastTests(PipelineTesterMixin, IFPipelineTesterMixin, unittest.TestCase):
pipeline_class = IFInpaintingSuperResolutionPipeline
params = TEXT_GUIDED_IMAGE_INPAINTING_PARAMS - {"width", "height"}
batch_params = TEXT_GUIDED_IMAGE_INPAINTING_BATCH_PARAMS.union({"original_image"})
required_optional_params = PipelineTesterMixin.required_optional_params - {"latents"}
test_xformers_attention = False
def get_dummy_components(self):
return self._get_superresolution_dummy_components()
def get_dummy_inputs(self, device, seed=0):
if str(device).startswith("mps"):
generator = torch.manual_seed(seed)
else:
generator = torch.Generator(device=device).manual_seed(seed)
image = floats_tensor((1, 3, 16, 16), rng=random.Random(seed)).to(device)
original_image = floats_tensor((1, 3, 32, 32), rng=random.Random(seed)).to(device)
mask_image = floats_tensor((1, 3, 32, 32), rng=random.Random(seed)).to(device)
inputs = {
"prompt": "A painting of a squirrel eating a burger",
"image": image,
"original_image": original_image,
"mask_image": mask_image,
"generator": generator,
"num_inference_steps": 2,
"output_type": "numpy",
}
return inputs
def test_save_load_optional_components(self):
self._test_save_load_optional_components()
@unittest.skipIf(torch_device != "cuda", reason="float16 requires CUDA")
def test_save_load_float16(self):
# Due to non-determinism in save load of the hf-internal-testing/tiny-random-t5 text encoder
self._test_save_load_float16(expected_max_diff=1e-1)
def test_attention_slicing_forward_pass(self):
self._test_attention_slicing_forward_pass(expected_max_diff=1e-2)
def test_save_load_local(self):
self._test_save_load_local()
def test_inference_batch_single_identical(self):
self._test_inference_batch_single_identical(
expected_max_diff=1e-2,
)
# coding=utf-8
# Copyright 2023 HuggingFace Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import random
import unittest
import torch
from diffusers import IFSuperResolutionPipeline
from diffusers.utils import floats_tensor
from diffusers.utils.testing_utils import skip_mps, torch_device
from ..pipeline_params import TEXT_GUIDED_IMAGE_VARIATION_BATCH_PARAMS, TEXT_GUIDED_IMAGE_VARIATION_PARAMS
from ..test_pipelines_common import PipelineTesterMixin
from . import IFPipelineTesterMixin
@skip_mps
class IFSuperResolutionPipelineFastTests(PipelineTesterMixin, IFPipelineTesterMixin, unittest.TestCase):
pipeline_class = IFSuperResolutionPipeline
params = TEXT_GUIDED_IMAGE_VARIATION_PARAMS - {"width", "height"}
batch_params = TEXT_GUIDED_IMAGE_VARIATION_BATCH_PARAMS
required_optional_params = PipelineTesterMixin.required_optional_params - {"latents"}
test_xformers_attention = False
def get_dummy_components(self):
return self._get_superresolution_dummy_components()
def get_dummy_inputs(self, device, seed=0):
if str(device).startswith("mps"):
generator = torch.manual_seed(seed)
else:
generator = torch.Generator(device=device).manual_seed(seed)
image = floats_tensor((1, 3, 32, 32), rng=random.Random(seed)).to(device)
inputs = {
"prompt": "A painting of a squirrel eating a burger",
"image": image,
"generator": generator,
"num_inference_steps": 2,
"output_type": "numpy",
}
return inputs
def test_save_load_optional_components(self):
self._test_save_load_optional_components()
@unittest.skipIf(torch_device != "cuda", reason="float16 requires CUDA")
def test_save_load_float16(self):
# Due to non-determinism in save load of the hf-internal-testing/tiny-random-t5 text encoder
self._test_save_load_float16(expected_max_diff=1e-1)
def test_attention_slicing_forward_pass(self):
self._test_attention_slicing_forward_pass(expected_max_diff=1e-2)
def test_save_load_local(self):
self._test_save_load_local()
def test_inference_batch_single_identical(self):
self._test_inference_batch_single_identical(
expected_max_diff=1e-2,
)
......@@ -575,6 +575,19 @@ class DownloadTests(unittest.TestCase):
out = pipe(prompt, num_inference_steps=1, output_type="numpy").images
assert out.shape == (1, 128, 128, 3)
def test_download_ignore_files(self):
# Check https://huggingface.co/hf-internal-testing/tiny-stable-diffusion-pipe-ignore-files/blob/72f58636e5508a218c6b3f60550dc96445547817/model_index.json#L4
with tempfile.TemporaryDirectory() as tmpdirname:
# pipeline has Flax weights
tmpdirname = DiffusionPipeline.download("hf-internal-testing/tiny-stable-diffusion-pipe-ignore-files")
all_root_files = [t[-1] for t in os.walk(os.path.join(tmpdirname))]
files = [item for sublist in all_root_files for item in sublist]
# None of the downloaded files should be a pytorch file even if we have some here:
# https://huggingface.co/hf-internal-testing/tiny-stable-diffusion-pipe/blob/main/unet/diffusion_flax_model.msgpack
assert not any(f in ["vae/diffusion_pytorch_model.bin", "text_encoder/config.json"] for f in files)
assert len(files) == 14
class CustomPipelineTests(unittest.TestCase):
def test_load_custom_pipeline(self):
......
......@@ -339,6 +339,9 @@ class PipelineTesterMixin:
@unittest.skipIf(torch_device != "cuda", reason="float16 requires CUDA")
def test_float16_inference(self):
self._test_float16_inference()
def _test_float16_inference(self, expected_max_diff=1e-2):
components = self.get_dummy_components()
pipe = self.pipeline_class(**components)
pipe.to(torch_device)
......@@ -352,10 +355,13 @@ class PipelineTesterMixin:
output_fp16 = pipe_fp16(**self.get_dummy_inputs(torch_device))[0]
max_diff = np.abs(to_np(output) - to_np(output_fp16)).max()
self.assertLess(max_diff, 1e-2, "The outputs of the fp16 and fp32 pipelines are too different.")
self.assertLess(max_diff, expected_max_diff, "The outputs of the fp16 and fp32 pipelines are too different.")
@unittest.skipIf(torch_device != "cuda", reason="float16 requires CUDA")
def test_save_load_float16(self):
self._test_save_load_float16()
def _test_save_load_float16(self, expected_max_diff=1e-2):
components = self.get_dummy_components()
for name, module in components.items():
if hasattr(module, "half"):
......@@ -384,7 +390,9 @@ class PipelineTesterMixin:
output_loaded = pipe_loaded(**inputs)[0]
max_diff = np.abs(to_np(output) - to_np(output_loaded)).max()
self.assertLess(max_diff, 1e-2, "The output of the fp16 pipeline changed after saving and loading.")
self.assertLess(
max_diff, expected_max_diff, "The output of the fp16 pipeline changed after saving and loading."
)
def test_save_load_optional_components(self):
if not hasattr(self.pipeline_class, "_optional_components"):
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment