Skip to content
GitLab
Menu
Projects
Groups
Snippets
Loading...
Help
Help
Support
Community forum
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in / Register
Toggle navigation
Menu
Open sidebar
renzhc
diffusers_dcu
Commits
76845183
Commit
76845183
authored
Nov 25, 2022
by
Patrick von Platen
Browse files
Merge branch 'main' of
https://github.com/huggingface/diffusers
into main
parents
520bb082
9ec5084a
Changes
10
Hide whitespace changes
Inline
Side-by-side
Showing
10 changed files
with
1391 additions
and
31 deletions
+1391
-31
docs/source/api/pipelines/stable_diffusion.mdx
docs/source/api/pipelines/stable_diffusion.mdx
+7
-0
src/diffusers/__init__.py
src/diffusers/__init__.py
+1
-0
src/diffusers/pipeline_utils.py
src/diffusers/pipeline_utils.py
+5
-3
src/diffusers/pipelines/__init__.py
src/diffusers/pipelines/__init__.py
+1
-0
src/diffusers/pipelines/stable_diffusion/__init__.py
src/diffusers/pipelines/stable_diffusion/__init__.py
+1
-0
src/diffusers/pipelines/stable_diffusion/pipeline_stable_diffusion_upscale.py
...nes/stable_diffusion/pipeline_stable_diffusion_upscale.py
+551
-0
src/diffusers/utils/dummy_torch_and_transformers_objects.py
src/diffusers/utils/dummy_torch_and_transformers_objects.py
+15
-0
tests/pipelines/stable_diffusion_2/test_stable_diffusion.py
tests/pipelines/stable_diffusion_2/test_stable_diffusion.py
+21
-28
tests/pipelines/stable_diffusion_2/test_stable_diffusion_upscale.py
...lines/stable_diffusion_2/test_stable_diffusion_upscale.py
+315
-0
tests/pipelines/stable_diffusion_2/test_stable_diffusion_v_pred.py
...elines/stable_diffusion_2/test_stable_diffusion_v_pred.py
+474
-0
No files found.
docs/source/api/pipelines/stable_diffusion.mdx
View file @
76845183
...
...
@@ -95,3 +95,10 @@ If you want to use all possible use cases in a single `DiffusionPipeline` you ca
- __call__
- enable_attention_slicing
- disable_attention_slicing
## StableDiffusionUpscalePipeline
[[autodoc]] StableDiffusionUpscalePipeline
- __call__
- enable_attention_slicing
- disable_attention_slicing
src/diffusers/__init__.py
View file @
76845183
...
...
@@ -75,6 +75,7 @@ if is_torch_available() and is_transformers_available():
StableDiffusionInpaintPipelineLegacy
,
StableDiffusionPipeline
,
StableDiffusionPipelineSafe
,
StableDiffusionUpscalePipeline
,
VersatileDiffusionDualGuidedPipeline
,
VersatileDiffusionImageVariationPipeline
,
VersatileDiffusionPipeline
,
...
...
src/diffusers/pipeline_utils.py
View file @
76845183
...
...
@@ -554,7 +554,9 @@ class DiffusionPipeline(ConfigMixin):
init_dict
=
{
k
:
v
for
k
,
v
in
init_dict
.
items
()
if
load_module
(
k
,
v
)}
if
len
(
unused_kwargs
)
>
0
:
logger
.
warning
(
f
"Keyword arguments
{
unused_kwargs
}
not recognized."
)
logger
.
warning
(
f
"Keyword arguments
{
unused_kwargs
}
are not expected by
{
pipeline_class
.
__name__
}
and will be ignored."
)
# import it here to avoid circular import
from
diffusers
import
pipelines
...
...
@@ -680,8 +682,8 @@ class DiffusionPipeline(ConfigMixin):
@
staticmethod
def
_get_signature_keys
(
obj
):
parameters
=
inspect
.
signature
(
obj
.
__init__
).
parameters
required_parameters
=
{
k
:
v
for
k
,
v
in
parameters
.
items
()
if
v
.
default
is
not
True
}
optional_parameters
=
set
({
k
for
k
,
v
in
parameters
.
items
()
if
v
.
default
is
True
})
required_parameters
=
{
k
:
v
for
k
,
v
in
parameters
.
items
()
if
v
.
default
==
inspect
.
_empty
}
optional_parameters
=
set
({
k
for
k
,
v
in
parameters
.
items
()
if
v
.
default
!=
inspect
.
_empty
})
expected_modules
=
set
(
required_parameters
.
keys
())
-
set
([
"self"
])
return
expected_modules
,
optional_parameters
...
...
src/diffusers/pipelines/__init__.py
View file @
76845183
...
...
@@ -24,6 +24,7 @@ if is_torch_available() and is_transformers_available():
StableDiffusionInpaintPipeline
,
StableDiffusionInpaintPipelineLegacy
,
StableDiffusionPipeline
,
StableDiffusionUpscalePipeline
,
)
from
.stable_diffusion_safe
import
StableDiffusionPipelineSafe
from
.versatile_diffusion
import
(
...
...
src/diffusers/pipelines/stable_diffusion/__init__.py
View file @
76845183
...
...
@@ -40,6 +40,7 @@ if is_transformers_available() and is_torch_available():
from
.pipeline_stable_diffusion_img2img
import
StableDiffusionImg2ImgPipeline
from
.pipeline_stable_diffusion_inpaint
import
StableDiffusionInpaintPipeline
from
.pipeline_stable_diffusion_inpaint_legacy
import
StableDiffusionInpaintPipelineLegacy
from
.pipeline_stable_diffusion_upscale
import
StableDiffusionUpscalePipeline
from
.safety_checker
import
StableDiffusionSafetyChecker
if
is_transformers_available
()
and
is_torch_available
()
and
is_transformers_version
(
">="
,
"4.25.0.dev0"
):
...
...
src/diffusers/pipelines/stable_diffusion/pipeline_stable_diffusion_upscale.py
0 → 100644
View file @
76845183
# Copyright 2022 The HuggingFace Team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import
inspect
from
typing
import
Callable
,
List
,
Optional
,
Union
import
numpy
as
np
import
torch
import
PIL
from
diffusers.utils
import
is_accelerate_available
from
transformers
import
CLIPTextModel
,
CLIPTokenizer
from
...models
import
AutoencoderKL
,
UNet2DConditionModel
from
...pipeline_utils
import
DiffusionPipeline
,
ImagePipelineOutput
from
...schedulers
import
DDIMScheduler
,
DDPMScheduler
,
LMSDiscreteScheduler
,
PNDMScheduler
from
...utils
import
logging
logger
=
logging
.
get_logger
(
__name__
)
# pylint: disable=invalid-name
def preprocess(image):
    """Turn a PIL-style image into a normalized float tensor with a leading batch axis.

    The image is first resized so that both sides are rounded *down* to a multiple of 64, then converted to RGB,
    rearranged from (height, width, channel) to (1, channel, height, width) and rescaled from [0, 255] to [-1, 1].
    """
    w, h = image.size
    # drop the remainder so each side becomes a multiple of 64
    w -= w % 64
    h -= h % 64
    resized = image.resize((w, h))

    pixels = np.array(resized.convert("RGB"))
    # add the batch axis, then move channels in front of the spatial axes
    batched = pixels[None].transpose(0, 3, 1, 2)
    tensor = torch.from_numpy(batched).to(dtype=torch.float32)
    return tensor / 127.5 - 1.0
class
StableDiffusionUpscalePipeline
(
DiffusionPipeline
):
r
"""
Pipeline for text-guided image super-resolution using Stable Diffusion 2.
This model inherits from [`DiffusionPipeline`]. Check the superclass documentation for the generic methods the
library implements for all the pipelines (such as downloading or saving, running on a particular device, etc.)
Args:
vae ([`AutoencoderKL`]):
Variational Auto-Encoder (VAE) Model to encode and decode images to and from latent representations.
text_encoder ([`CLIPTextModel`]):
Frozen text-encoder. Stable Diffusion uses the text portion of
[CLIP](https://huggingface.co/docs/transformers/model_doc/clip#transformers.CLIPTextModel), specifically
the [clip-vit-large-patch14](https://huggingface.co/openai/clip-vit-large-patch14) variant.
tokenizer (`CLIPTokenizer`):
Tokenizer of class
[CLIPTokenizer](https://huggingface.co/docs/transformers/v4.21.0/en/model_doc/clip#transformers.CLIPTokenizer).
unet ([`UNet2DConditionModel`]): Conditional U-Net architecture to denoise the encoded image latents.
low_res_scheduler ([`SchedulerMixin`]):
A scheduler used to add initial noise to the low res conditioning image. It must be an instance of
[`DDPMScheduler`].
scheduler ([`SchedulerMixin`]):
A scheduler to be used in combination with `unet` to denoise the encoded image latents. Can be one of
[`DDIMScheduler`], [`LMSDiscreteScheduler`], or [`PNDMScheduler`].
"""
def __init__(
    self,
    vae: AutoencoderKL,
    text_encoder: CLIPTextModel,
    tokenizer: CLIPTokenizer,
    unet: UNet2DConditionModel,
    low_res_scheduler: DDPMScheduler,
    scheduler: Union[DDIMScheduler, PNDMScheduler, LMSDiscreteScheduler],
    max_noise_level: int = 350,
):
    """Register the pipeline components and store `max_noise_level` in the pipeline config."""
    super().__init__()

    # every sub-model / scheduler is registered under the name of its constructor argument
    components = {
        "vae": vae,
        "text_encoder": text_encoder,
        "tokenizer": tokenizer,
        "unet": unet,
        "low_res_scheduler": low_res_scheduler,
        "scheduler": scheduler,
    }
    self.register_modules(**components)

    # `check_inputs` later reads this value back from `self.config`
    self.register_to_config(max_noise_level=max_noise_level)
# Copied from diffusers.pipelines.stable_diffusion.pipeline_stable_diffusion.StableDiffusionPipeline.enable_attention_slicing
def enable_attention_slicing(self, slice_size: Optional[Union[str, int]] = "auto"):
    r"""
    Turn on sliced attention computation in the UNet.

    With slicing enabled the attention module splits its input into slices and computes attention in several
    steps, trading a small amount of speed for lower memory usage.

    Args:
        slice_size (`str` or `int`, *optional*, defaults to `"auto"`):
            When `"auto"`, halves the input to the attention heads, so attention will be computed in two steps. If
            a number is provided, uses as many slices as `attention_head_dim // slice_size`. In this case,
            `attention_head_dim` must be a multiple of `slice_size`.
    """
    if slice_size == "auto":
        head_dim = self.unet.config.attention_head_dim
        # int: half the head size is usually a good speed/memory trade-off
        # list: fall back to the smallest configured head size
        slice_size = head_dim // 2 if isinstance(head_dim, int) else min(head_dim)

    self.unet.set_attention_slice(slice_size)
# Copied from diffusers.pipelines.stable_diffusion.pipeline_stable_diffusion.StableDiffusionPipeline.disable_attention_slicing
def disable_attention_slicing(self):
    r"""
    Turn sliced attention off again.

    Implemented by calling `enable_attention_slicing` with a slice size of `None`, so attention goes back to being
    computed in a single step.
    """
    no_slicing = None  # `None` is the "slicing disabled" value
    self.enable_attention_slicing(no_slicing)
def enable_sequential_cpu_offload(self, gpu_id=0):
    r"""
    Offload the `unet` and `text_encoder` to CPU through accelerate's `cpu_offload`, using `cuda:{gpu_id}` as the
    device handed to it.

    Only these two models are passed to `cpu_offload`; the `vae` is left untouched by this method.

    Raises:
        ImportError: if `accelerate` is not installed.
    """
    if not is_accelerate_available():
        raise ImportError("Please install accelerate via `pip install accelerate`")

    # imported lazily so that accelerate stays an optional dependency
    from accelerate import cpu_offload

    target = torch.device(f"cuda:{gpu_id}")

    for model in (self.unet, self.text_encoder):
        if model is None:
            continue
        cpu_offload(model, target)
# Copied from diffusers.pipelines.stable_diffusion.pipeline_stable_diffusion.StableDiffusionPipeline.enable_xformers_memory_efficient_attention
def enable_xformers_memory_efficient_attention(self):
    r"""
    Switch the UNet to the memory efficient attention implemented in xformers.

    With this option on you should see lower GPU memory usage and possibly faster inference; a speed up at
    training time is not guaranteed.

    Warning: if both memory efficient attention and sliced attention are enabled, memory efficient attention is
    the one that is used.
    """
    unet = self.unet
    unet.set_use_memory_efficient_attention_xformers(True)
# Copied from diffusers.pipelines.stable_diffusion.pipeline_stable_diffusion.StableDiffusionPipeline.disable_xformers_memory_efficient_attention
def disable_xformers_memory_efficient_attention(self):
    r"""
    Switch the UNet's xformers memory efficient attention off.
    """
    unet = self.unet
    unet.set_use_memory_efficient_attention_xformers(False)
@
property
# Copied from diffusers.pipelines.stable_diffusion.pipeline_stable_diffusion.StableDiffusionPipeline._execution_device
def
_execution_device
(
self
):
r
"""
Returns the device on which the pipeline's models will be executed. After calling
`pipeline.enable_sequential_cpu_offload()` the execution device can only be inferred from Accelerate's module
hooks.
"""
if
self
.
device
!=
torch
.
device
(
"meta"
)
or
not
hasattr
(
self
.
unet
,
"_hf_hook"
):
return
self
.
device
for
module
in
self
.
unet
.
modules
():
if
(
hasattr
(
module
,
"_hf_hook"
)
and
hasattr
(
module
.
_hf_hook
,
"execution_device"
)
and
module
.
_hf_hook
.
execution_device
is
not
None
):
return
torch
.
device
(
module
.
_hf_hook
.
execution_device
)
return
self
.
device
# Copied from diffusers.pipelines.stable_diffusion.pipeline_stable_diffusion.StableDiffusionPipeline._encode_prompt
def
_encode_prompt
(
self
,
prompt
,
device
,
num_images_per_prompt
,
do_classifier_free_guidance
,
negative_prompt
):
r
"""
Encodes the prompt into text encoder hidden states.
Args:
prompt (`str` or `list(int)`):
prompt to be encoded
device: (`torch.device`):
torch device
num_images_per_prompt (`int`):
number of images that should be generated per prompt
do_classifier_free_guidance (`bool`):
whether to use classifier free guidance or not
negative_prompt (`str` or `List[str]`):
The prompt or prompts not to guide the image generation. Ignored when not using guidance (i.e., ignored
if `guidance_scale` is less than `1`).
"""
batch_size
=
len
(
prompt
)
if
isinstance
(
prompt
,
list
)
else
1
text_inputs
=
self
.
tokenizer
(
prompt
,
padding
=
"max_length"
,
max_length
=
self
.
tokenizer
.
model_max_length
,
truncation
=
True
,
return_tensors
=
"pt"
,
)
text_input_ids
=
text_inputs
.
input_ids
untruncated_ids
=
self
.
tokenizer
(
prompt
,
padding
=
"max_length"
,
return_tensors
=
"pt"
).
input_ids
if
not
torch
.
equal
(
text_input_ids
,
untruncated_ids
):
removed_text
=
self
.
tokenizer
.
batch_decode
(
untruncated_ids
[:,
self
.
tokenizer
.
model_max_length
-
1
:
-
1
])
logger
.
warning
(
"The following part of your input was truncated because CLIP can only handle sequences up to"
f
"
{
self
.
tokenizer
.
model_max_length
}
tokens:
{
removed_text
}
"
)
if
hasattr
(
self
.
text_encoder
.
config
,
"use_attention_mask"
)
and
self
.
text_encoder
.
config
.
use_attention_mask
:
attention_mask
=
text_inputs
.
attention_mask
.
to
(
device
)
else
:
attention_mask
=
None
text_embeddings
=
self
.
text_encoder
(
text_input_ids
.
to
(
device
),
attention_mask
=
attention_mask
,
)
text_embeddings
=
text_embeddings
[
0
]
# duplicate text embeddings for each generation per prompt, using mps friendly method
bs_embed
,
seq_len
,
_
=
text_embeddings
.
shape
text_embeddings
=
text_embeddings
.
repeat
(
1
,
num_images_per_prompt
,
1
)
text_embeddings
=
text_embeddings
.
view
(
bs_embed
*
num_images_per_prompt
,
seq_len
,
-
1
)
# get unconditional embeddings for classifier free guidance
if
do_classifier_free_guidance
:
uncond_tokens
:
List
[
str
]
if
negative_prompt
is
None
:
uncond_tokens
=
[
""
]
*
batch_size
elif
type
(
prompt
)
is
not
type
(
negative_prompt
):
raise
TypeError
(
f
"`negative_prompt` should be the same type to `prompt`, but got
{
type
(
negative_prompt
)
}
!="
f
"
{
type
(
prompt
)
}
."
)
elif
isinstance
(
negative_prompt
,
str
):
uncond_tokens
=
[
negative_prompt
]
elif
batch_size
!=
len
(
negative_prompt
):
raise
ValueError
(
f
"`negative_prompt`:
{
negative_prompt
}
has batch size
{
len
(
negative_prompt
)
}
, but `prompt`:"
f
"
{
prompt
}
has batch size
{
batch_size
}
. Please make sure that passed `negative_prompt` matches"
" the batch size of `prompt`."
)
else
:
uncond_tokens
=
negative_prompt
max_length
=
text_input_ids
.
shape
[
-
1
]
uncond_input
=
self
.
tokenizer
(
uncond_tokens
,
padding
=
"max_length"
,
max_length
=
max_length
,
truncation
=
True
,
return_tensors
=
"pt"
,
)
if
hasattr
(
self
.
text_encoder
.
config
,
"use_attention_mask"
)
and
self
.
text_encoder
.
config
.
use_attention_mask
:
attention_mask
=
uncond_input
.
attention_mask
.
to
(
device
)
else
:
attention_mask
=
None
uncond_embeddings
=
self
.
text_encoder
(
uncond_input
.
input_ids
.
to
(
device
),
attention_mask
=
attention_mask
,
)
uncond_embeddings
=
uncond_embeddings
[
0
]
# duplicate unconditional embeddings for each generation per prompt, using mps friendly method
seq_len
=
uncond_embeddings
.
shape
[
1
]
uncond_embeddings
=
uncond_embeddings
.
repeat
(
1
,
num_images_per_prompt
,
1
)
uncond_embeddings
=
uncond_embeddings
.
view
(
batch_size
*
num_images_per_prompt
,
seq_len
,
-
1
)
# For classifier free guidance, we need to do two forward passes.
# Here we concatenate the unconditional and text embeddings into a single batch
# to avoid doing two forward passes
text_embeddings
=
torch
.
cat
([
uncond_embeddings
,
text_embeddings
])
return
text_embeddings
# Copied from diffusers.pipelines.stable_diffusion.pipeline_stable_diffusion.StableDiffusionPipeline.prepare_extra_step_kwargs
def prepare_extra_step_kwargs(self, generator, eta):
    """Collect the optional keyword arguments that `self.scheduler.step` is able to accept.

    Not all schedulers share the same `step` signature, so `eta` and `generator` are only forwarded when the
    scheduler's `step` declares a parameter of that name.

    eta (η) is only used with the DDIMScheduler, it will be ignored for other schedulers. It corresponds to η in
    the DDIM paper (https://arxiv.org/abs/2010.02502) and should be between [0, 1].
    """
    step_parameters = inspect.signature(self.scheduler.step).parameters

    candidates = (("eta", eta), ("generator", generator))
    return {name: value for name, value in candidates if name in step_parameters}
# Copied from diffusers.pipelines.stable_diffusion.pipeline_stable_diffusion.StableDiffusionPipeline.decode_latents with 0.18215->0.08333
def decode_latents(self, latents):
    """Decode latents with the VAE and return the images as a float32 numpy array laid out channels-last.

    The latents are multiplied by `1 / 0.08333` before decoding; the decoded sample is mapped with `x / 2 + 0.5`
    and clamped to [0, 1].
    """
    scaling = 1 / 0.08333
    decoded = self.vae.decode(scaling * latents).sample

    image = (decoded / 2 + 0.5).clamp(0, 1)

    # we always cast to float32 as this does not cause significant overhead and is compatible with bfloat16
    image = image.cpu().permute(0, 2, 3, 1).float()
    return image.numpy()
def check_inputs(self, prompt, image, noise_level, callback_steps):
    """Validate the arguments of `__call__`, raising `ValueError` on the first problem found.

    Checks, in order: the type of `prompt`, the type of `image`, that `prompt` and `image` have the same batch
    size (only when `image` is a list or tensor), that `noise_level` does not exceed
    `self.config.max_noise_level`, and that `callback_steps` is a positive integer.
    """
    prompt_ok = isinstance(prompt, str) or isinstance(prompt, list)
    if not prompt_ok:
        raise ValueError(f"`prompt` has to be of type `str` or `list` but is {type(prompt)}")

    image_ok = (
        isinstance(image, torch.Tensor) or isinstance(image, PIL.Image.Image) or isinstance(image, list)
    )
    if not image_ok:
        raise ValueError(
            f"`image` has to be of type `torch.Tensor`, `PIL.Image.Image` or `list` but is {type(image)}"
        )

    # verify batch size of prompt and image are same if image is a list or tensor
    if isinstance(image, list):
        image_batch_size = len(image)
    elif isinstance(image, torch.Tensor):
        image_batch_size = image.shape[0]
    else:
        # a single PIL image carries no batch dimension to compare against
        image_batch_size = None

    if image_batch_size is not None:
        batch_size = 1 if isinstance(prompt, str) else len(prompt)
        if batch_size != image_batch_size:
            raise ValueError(
                f"`prompt` has batch size {batch_size} and `image` has batch size {image_batch_size}."
                " Please make sure that passed `prompt` matches the batch size of `image`."
            )

    # check noise level
    max_noise_level = self.config.max_noise_level
    if noise_level > max_noise_level:
        raise ValueError(f"`noise_level` has to be <= {max_noise_level} but is {noise_level}")

    # `None` fails the isinstance test, so it is rejected here as well
    if not isinstance(callback_steps, int) or callback_steps <= 0:
        raise ValueError(
            f"`callback_steps` has to be a positive integer but is {callback_steps} of type"
            f" {type(callback_steps)}."
        )
def prepare_latents(self, batch_size, num_channels_latents, height, width, dtype, device, generator, latents=None):
    """Return the starting latents, scaled by the scheduler's `init_noise_sigma`.

    When `latents` is `None`, fresh Gaussian noise of shape `(batch_size, num_channels_latents, height, width)`
    is sampled with `generator`. Otherwise the supplied tensor is moved to `device`.

    Raises:
        ValueError: if supplied `latents` do not have the expected shape.
    """
    shape = (batch_size, num_channels_latents, height, width)

    if latents is not None:
        if latents.shape != shape:
            raise ValueError(f"Unexpected latents shape, got {latents.shape}, expected {shape}")
        latents = latents.to(device)
    elif device.type == "mps":
        # randn does not work reproducibly on mps
        cpu_noise = torch.randn(shape, generator=generator, device="cpu", dtype=dtype)
        latents = cpu_noise.to(device)
    else:
        latents = torch.randn(shape, generator=generator, device=device, dtype=dtype)

    # scale the initial noise by the standard deviation required by the scheduler
    return latents * self.scheduler.init_noise_sigma
@torch.no_grad()
def __call__(
    self,
    prompt: Union[str, List[str]],
    image: Union[torch.FloatTensor, PIL.Image.Image, List[PIL.Image.Image]],
    num_inference_steps: int = 75,
    guidance_scale: float = 9.0,
    noise_level: int = 20,
    negative_prompt: Optional[Union[str, List[str]]] = None,
    num_images_per_prompt: Optional[int] = 1,
    eta: float = 0.0,
    generator: Optional[torch.Generator] = None,
    latents: Optional[torch.FloatTensor] = None,
    output_type: Optional[str] = "pil",
    return_dict: bool = True,
    callback: Optional[Callable[[int, int, torch.FloatTensor], None]] = None,
    callback_steps: Optional[int] = 1,
):
    r"""
    Function invoked when calling the pipeline for generation.

    Args:
        prompt (`str` or `List[str]`):
            The prompt or prompts to guide the image generation.
        image (`PIL.Image.Image` or List[`PIL.Image.Image`] or `torch.FloatTensor`):
            `Image`, or tensor representing an image batch which will be upscaled. PIL images are run through
            `preprocess` first; a tensor is used as given. A list or tensor must have the same batch size as
            `prompt`.
        num_inference_steps (`int`, *optional*, defaults to 75):
            The number of denoising steps. More denoising steps usually lead to a higher quality image at the
            expense of slower inference.
        guidance_scale (`float`, *optional*, defaults to 9.0):
            Guidance scale as defined in [Classifier-Free Diffusion Guidance](https://arxiv.org/abs/2207.12598).
            `guidance_scale` is defined as `w` of equation 2. of [Imagen
            Paper](https://arxiv.org/pdf/2205.11487.pdf). Guidance scale is enabled by setting `guidance_scale >
            1`. Higher guidance scale encourages to generate images that are closely linked to the text `prompt`,
            usually at the expense of lower image quality.
        noise_level (`int`, *optional*, defaults to 20):
            Timestep handed to `low_res_scheduler.add_noise` to noise the low resolution image before it is used
            as conditioning. The same value is passed to the UNet as `class_labels`. Must not exceed the
            pipeline's configured `max_noise_level`.
        negative_prompt (`str` or `List[str]`, *optional*):
            The prompt or prompts not to guide the image generation. Ignored when not using guidance (i.e.,
            ignored unless `guidance_scale` is greater than `1`).
        num_images_per_prompt (`int`, *optional*, defaults to 1):
            The number of images to generate per prompt.
        eta (`float`, *optional*, defaults to 0.0):
            Corresponds to parameter eta (η) in the DDIM paper: https://arxiv.org/abs/2010.02502. Only applies to
            [`schedulers.DDIMScheduler`], will be ignored for others.
        generator (`torch.Generator`, *optional*):
            A [torch generator](https://pytorch.org/docs/stable/generated/torch.Generator.html) to make generation
            deterministic.
        latents (`torch.FloatTensor`, *optional*):
            Pre-generated noisy latents, sampled from a Gaussian distribution, to be used as inputs for image
            generation. Can be used to tweak the same generation with different prompts. If not provided, a
            latents tensor will be generated by sampling using the supplied random `generator`.
        output_type (`str`, *optional*, defaults to `"pil"`):
            The output format of the generated image. Choose between
            [PIL](https://pillow.readthedocs.io/en/stable/): `PIL.Image.Image` or `np.array`.
        return_dict (`bool`, *optional*, defaults to `True`):
            Whether or not to return an [`ImagePipelineOutput`] instead of a plain tuple.
        callback (`Callable`, *optional*):
            A function that will be called every `callback_steps` steps during inference. The function will be
            called with the following arguments: `callback(step: int, timestep: int, latents: torch.FloatTensor)`.
        callback_steps (`int`, *optional*, defaults to 1):
            The frequency at which the `callback` function will be called. Must be a positive integer.

    Returns:
        [`ImagePipelineOutput`] or `tuple`: an [`ImagePipelineOutput`] holding the generated images if
        `return_dict` is True, otherwise a one-element `tuple` whose only entry is the generated images.
    """
    # 1. Check inputs
    self.check_inputs(prompt, image, noise_level, callback_steps)

    # 2. Define call parameters
    batch_size = 1 if isinstance(prompt, str) else len(prompt)
    device = self._execution_device
    # here `guidance_scale` is defined analog to the guidance weight `w` of equation (2)
    # of the Imagen paper: https://arxiv.org/pdf/2205.11487.pdf . `guidance_scale = 1`
    # corresponds to doing no classifier free guidance.
    do_classifier_free_guidance = guidance_scale > 1.0

    # 3. Encode input prompt
    text_embeddings = self._encode_prompt(
        prompt, device, num_images_per_prompt, do_classifier_free_guidance, negative_prompt
    )

    # 4. Preprocess image
    image = [image] if isinstance(image, PIL.Image.Image) else image
    if isinstance(image, list):
        image = [preprocess(img) for img in image]
        image = torch.cat(image, dim=0)
    image = image.to(dtype=text_embeddings.dtype, device=device)

    # 5. Set timesteps
    self.scheduler.set_timesteps(num_inference_steps, device=device)
    timesteps_tensor = self.scheduler.timesteps

    # 6. Add noise to image
    noise_level = torch.tensor([noise_level], dtype=torch.long, device=device)
    if device.type == "mps":
        # randn does not work reproducibly on mps
        noise = torch.randn(image.shape, generator=generator, device="cpu", dtype=text_embeddings.dtype).to(device)
    else:
        noise = torch.randn(image.shape, generator=generator, device=device, dtype=text_embeddings.dtype)
    image = self.low_res_scheduler.add_noise(image, noise, noise_level)

    # The conditioning image is concatenated with the latents along the channel axis in the denoising loop, so
    # it needs exactly one row per latent sample. Repeat each image `num_images_per_prompt` times (same mps
    # friendly repeat/view pattern and per-prompt ordering as the text embeddings) ...
    image_batch, image_channels, image_height, image_width = image.shape
    image = image.repeat(1, num_images_per_prompt, 1, 1)
    image = image.view(image_batch * num_images_per_prompt, image_channels, image_height, image_width)
    # ... and duplicate the whole batch for the unconditional half of classifier free guidance.
    image = torch.cat([image] * 2) if do_classifier_free_guidance else image
    # one noise-level label per row of the UNet input
    noise_level = torch.cat([noise_level] * image.shape[0])

    # 7. Prepare latent variables
    height, width = image.shape[2:]
    num_channels_latents = self.vae.config.latent_channels
    latents = self.prepare_latents(
        batch_size * num_images_per_prompt,
        num_channels_latents,
        height,
        width,
        text_embeddings.dtype,
        device,
        generator,
        latents,
    )

    # 8. Check that sizes of image and latents match
    num_channels_image = image.shape[1]
    if num_channels_latents + num_channels_image != self.unet.config.in_channels:
        raise ValueError(
            f"Incorrect configuration settings! The config of `pipeline.unet`: {self.unet.config} expects"
            f" {self.unet.config.in_channels} but received `num_channels_latents`: {num_channels_latents} +"
            f" `num_channels_image`: {num_channels_image} "
            f" = {num_channels_latents+num_channels_image}. Please verify the config of"
            " `pipeline.unet` or your `image` input."
        )

    # 9. Prepare extra step kwargs. TODO: Logic should ideally just be moved out of the pipeline
    extra_step_kwargs = self.prepare_extra_step_kwargs(generator, eta)

    # 10. Denoising loop
    for i, t in enumerate(self.progress_bar(timesteps_tensor)):
        # expand the latents if we are doing classifier free guidance
        latent_model_input = torch.cat([latents] * 2) if do_classifier_free_guidance else latents

        # scale the latents for the scheduler, then append the noised low resolution image as extra channels
        latent_model_input = self.scheduler.scale_model_input(latent_model_input, t)
        latent_model_input = torch.cat([latent_model_input, image], dim=1)

        # predict the noise residual
        noise_pred = self.unet(
            latent_model_input, t, encoder_hidden_states=text_embeddings, class_labels=noise_level
        ).sample

        # perform guidance
        if do_classifier_free_guidance:
            noise_pred_uncond, noise_pred_text = noise_pred.chunk(2)
            noise_pred = noise_pred_uncond + guidance_scale * (noise_pred_text - noise_pred_uncond)

        # compute the previous noisy sample x_t -> x_t-1
        latents = self.scheduler.step(noise_pred, t, latents, **extra_step_kwargs).prev_sample

        # call the callback, if provided
        if callback is not None and i % callback_steps == 0:
            callback(i, t, latents)

    # 11. Post-processing
    # make sure the VAE is in float32 mode, as it overflows in float16
    self.vae.to(dtype=torch.float32)
    image = self.decode_latents(latents.float())

    # 12. Convert to PIL
    if output_type == "pil":
        image = self.numpy_to_pil(image)

    if not return_dict:
        return (image,)

    return ImagePipelineOutput(images=image)
src/diffusers/utils/dummy_torch_and_transformers_objects.py
View file @
76845183
...
...
@@ -154,6 +154,21 @@ class StableDiffusionPipelineSafe(metaclass=DummyObject):
requires_backends
(
cls
,
[
"torch"
,
"transformers"
])
class StableDiffusionUpscalePipeline(metaclass=DummyObject):
    # Placeholder class with the same name as the real pipeline. Every entry point below only calls
    # `requires_backends` with the "torch" and "transformers" backends.
    # NOTE(review): presumably `requires_backends` raises when one of those backends is missing, so that using
    # this placeholder fails with a helpful message — confirm against its definition.
    # NOTE(review): this follows the same pattern as the neighbouring dummy classes and looks generated; confirm
    # before editing by hand.
    _backends = ["torch", "transformers"]

    def __init__(self, *args, **kwargs):
        # arguments are accepted but ignored; only the backend check runs
        requires_backends(self, ["torch", "transformers"])

    @classmethod
    def from_config(cls, *args, **kwargs):
        # same backend check for the classmethod constructor
        requires_backends(cls, ["torch", "transformers"])

    @classmethod
    def from_pretrained(cls, *args, **kwargs):
        # same backend check for the classmethod constructor
        requires_backends(cls, ["torch", "transformers"])
class
VersatileDiffusionDualGuidedPipeline
(
metaclass
=
DummyObject
):
_backends
=
[
"torch"
,
"transformers"
]
...
...
tests/pipelines/stable_diffusion_2/test_stable_diffusion.py
View file @
76845183
...
...
@@ -34,7 +34,7 @@ from diffusers import (
)
from
diffusers.utils
import
load_numpy
,
slow
,
torch_device
from
diffusers.utils.testing_utils
import
CaptureLogger
,
require_torch_gpu
from
transformers
import
CLIPFeatureExtractor
,
CLIPTextConfig
,
CLIPTextModel
,
CLIPTokenizer
from
transformers
import
CLIPTextConfig
,
CLIPTextModel
,
CLIPTokenizer
from
...test_pipelines_common
import
PipelineTesterMixin
...
...
@@ -100,21 +100,6 @@ class StableDiffusion2PipelineFastTests(PipelineTesterMixin, unittest.TestCase):
)
return
CLIPTextModel
(
config
)
@
property
def
dummy_extractor
(
self
):
def
extract
(
*
args
,
**
kwargs
):
class
Out
:
def
__init__
(
self
):
self
.
pixel_values
=
torch
.
ones
([
0
])
def
to
(
self
,
device
):
self
.
pixel_values
.
to
(
device
)
return
self
return
Out
()
return
extract
def
test_save_pretrained_from_pretrained
(
self
):
device
=
"cpu"
# ensure determinism for the device-dependent torch.Generator
unet
=
self
.
dummy_cond_unet
...
...
@@ -129,7 +114,6 @@ class StableDiffusion2PipelineFastTests(PipelineTesterMixin, unittest.TestCase):
vae
=
self
.
dummy_vae
bert
=
self
.
dummy_text_encoder
tokenizer
=
CLIPTokenizer
.
from_pretrained
(
"hf-internal-testing/tiny-random-clip"
)
feature_extractor
=
CLIPFeatureExtractor
.
from_pretrained
(
"hf-internal-testing/tiny-random-clip"
)
# make sure here that pndm scheduler skips prk
sd_pipe
=
StableDiffusionPipeline
(
...
...
@@ -139,7 +123,8 @@ class StableDiffusion2PipelineFastTests(PipelineTesterMixin, unittest.TestCase):
text_encoder
=
bert
,
tokenizer
=
tokenizer
,
safety_checker
=
None
,
feature_extractor
=
feature_extractor
,
feature_extractor
=
None
,
requires_safety_checker
=
False
,
)
sd_pipe
=
sd_pipe
.
to
(
device
)
sd_pipe
.
set_progress_bar_config
(
disable
=
None
)
...
...
@@ -185,7 +170,8 @@ class StableDiffusion2PipelineFastTests(PipelineTesterMixin, unittest.TestCase):
text_encoder
=
bert
,
tokenizer
=
tokenizer
,
safety_checker
=
None
,
feature_extractor
=
self
.
dummy_extractor
,
feature_extractor
=
None
,
requires_safety_checker
=
False
,
)
sd_pipe
=
sd_pipe
.
to
(
device
)
sd_pipe
.
set_progress_bar_config
(
disable
=
None
)
...
...
@@ -231,7 +217,8 @@ class StableDiffusion2PipelineFastTests(PipelineTesterMixin, unittest.TestCase):
text_encoder
=
bert
,
tokenizer
=
tokenizer
,
safety_checker
=
None
,
feature_extractor
=
self
.
dummy_extractor
,
feature_extractor
=
None
,
requires_safety_checker
=
False
,
)
sd_pipe
=
sd_pipe
.
to
(
device
)
sd_pipe
.
set_progress_bar_config
(
disable
=
None
)
...
...
@@ -276,7 +263,8 @@ class StableDiffusion2PipelineFastTests(PipelineTesterMixin, unittest.TestCase):
text_encoder
=
bert
,
tokenizer
=
tokenizer
,
safety_checker
=
None
,
feature_extractor
=
self
.
dummy_extractor
,
feature_extractor
=
None
,
requires_safety_checker
=
False
,
)
sd_pipe
=
sd_pipe
.
to
(
device
)
sd_pipe
.
set_progress_bar_config
(
disable
=
None
)
...
...
@@ -321,7 +309,8 @@ class StableDiffusion2PipelineFastTests(PipelineTesterMixin, unittest.TestCase):
text_encoder
=
bert
,
tokenizer
=
tokenizer
,
safety_checker
=
None
,
feature_extractor
=
self
.
dummy_extractor
,
feature_extractor
=
None
,
requires_safety_checker
=
False
,
)
sd_pipe
=
sd_pipe
.
to
(
device
)
sd_pipe
.
set_progress_bar_config
(
disable
=
None
)
...
...
@@ -366,7 +355,8 @@ class StableDiffusion2PipelineFastTests(PipelineTesterMixin, unittest.TestCase):
text_encoder
=
bert
,
tokenizer
=
tokenizer
,
safety_checker
=
None
,
feature_extractor
=
self
.
dummy_extractor
,
feature_extractor
=
None
,
requires_safety_checker
=
False
,
)
sd_pipe
=
sd_pipe
.
to
(
device
)
sd_pipe
.
set_progress_bar_config
(
disable
=
None
)
...
...
@@ -411,7 +401,8 @@ class StableDiffusion2PipelineFastTests(PipelineTesterMixin, unittest.TestCase):
text_encoder
=
bert
,
tokenizer
=
tokenizer
,
safety_checker
=
None
,
feature_extractor
=
self
.
dummy_extractor
,
feature_extractor
=
None
,
requires_safety_checker
=
False
,
)
sd_pipe
=
sd_pipe
.
to
(
device
)
sd_pipe
.
set_progress_bar_config
(
disable
=
None
)
...
...
@@ -449,7 +440,8 @@ class StableDiffusion2PipelineFastTests(PipelineTesterMixin, unittest.TestCase):
text_encoder
=
bert
,
tokenizer
=
tokenizer
,
safety_checker
=
None
,
feature_extractor
=
self
.
dummy_extractor
,
feature_extractor
=
None
,
requires_safety_checker
=
False
,
)
sd_pipe
=
sd_pipe
.
to
(
torch_device
)
sd_pipe
.
set_progress_bar_config
(
disable
=
None
)
...
...
@@ -475,7 +467,8 @@ class StableDiffusion2PipelineFastTests(PipelineTesterMixin, unittest.TestCase):
text_encoder
=
bert
,
tokenizer
=
tokenizer
,
safety_checker
=
None
,
feature_extractor
=
self
.
dummy_extractor
,
feature_extractor
=
None
,
requires_safety_checker
=
False
,
)
sd_pipe
=
sd_pipe
.
to
(
torch_device
)
sd_pipe
.
set_progress_bar_config
(
disable
=
None
)
...
...
@@ -572,7 +565,7 @@ class StableDiffusion2PipelineIntegrationTests(unittest.TestCase):
expected_slice
=
np
.
array
([
0.0548
,
0.0626
,
0.0612
,
0.0611
,
0.0706
,
0.0586
,
0.0843
,
0.0333
,
0.1197
])
assert
np
.
abs
(
image_slice
.
flatten
()
-
expected_slice
).
max
()
<
1e-2
def
test_stable_diffusion_
memory_chunk
ing
(
self
):
def
test_stable_diffusion_
attention_slic
ing
(
self
):
torch
.
cuda
.
reset_peak_memory_stats
()
model_id
=
"stabilityai/stable-diffusion-2-base"
pipe
=
StableDiffusionPipeline
.
from_pretrained
(
model_id
,
revision
=
"fp16"
,
torch_dtype
=
torch
.
float16
)
...
...
@@ -651,7 +644,7 @@ class StableDiffusion2PipelineIntegrationTests(unittest.TestCase):
prompt
=
"astronaut riding a horse"
generator
=
torch
.
Generator
(
device
=
torch_device
).
manual_seed
(
0
)
output
=
pipe
(
prompt
=
prompt
,
strength
=
0.75
,
guidance_scale
=
7.5
,
generator
=
generator
,
output_type
=
"np"
)
output
=
pipe
(
prompt
=
prompt
,
guidance_scale
=
7.5
,
generator
=
generator
,
output_type
=
"np"
)
image
=
output
.
images
[
0
]
assert
image
.
shape
==
(
512
,
512
,
3
)
...
...
tests/pipelines/stable_diffusion_2/test_stable_diffusion_upscale.py
0 → 100644
View file @
76845183
# coding=utf-8
# Copyright 2022 HuggingFace Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import
gc
import
random
import
unittest
import
numpy
as
np
import
torch
from
diffusers
import
AutoencoderKL
,
DDIMScheduler
,
DDPMScheduler
,
StableDiffusionUpscalePipeline
,
UNet2DConditionModel
from
diffusers.utils
import
floats_tensor
,
load_image
,
load_numpy
,
slow
,
torch_device
from
diffusers.utils.testing_utils
import
require_torch_gpu
from
PIL
import
Image
from
transformers
import
CLIPTextConfig
,
CLIPTextModel
,
CLIPTokenizer
from
...test_pipelines_common
import
PipelineTesterMixin
# Forbid TF32 in CUDA matmuls via PyTorch's global backend flag.
# NOTE(review): presumably this keeps the hard-coded expected values in the
# tests below reproducible on GPUs that support TF32 -- confirm.
torch
.
backends
.
cuda
.
matmul
.
allow_tf32
=
False
class StableDiffusionUpscalePipelineFastTests(PipelineTesterMixin, unittest.TestCase):
    """Fast checks of ``StableDiffusionUpscalePipeline`` assembled from tiny, seeded dummy components."""

    def tearDown(self):
        # clean up the VRAM after each test
        super().tearDown()
        gc.collect()
        torch.cuda.empty_cache()

    @property
    def dummy_image(self):
        """Return a float tensor requested with shape (1, 3, 32, 32), drawn from ``random.Random(0)``."""
        batch_size = 1
        num_channels = 3
        sizes = (32, 32)

        image = floats_tensor((batch_size, num_channels) + sizes, rng=random.Random(0)).to(torch_device)
        return image

    @property
    def dummy_cond_unet_upscale(self):
        """Return a small ``UNet2DConditionModel`` (7 input / 4 output channels) built after seeding torch with 0."""
        torch.manual_seed(0)
        model = UNet2DConditionModel(
            block_out_channels=(32, 32, 64),
            layers_per_block=2,
            sample_size=32,
            in_channels=7,
            out_channels=4,
            down_block_types=("DownBlock2D", "CrossAttnDownBlock2D", "CrossAttnDownBlock2D"),
            up_block_types=("CrossAttnUpBlock2D", "CrossAttnUpBlock2D", "UpBlock2D"),
            cross_attention_dim=32,
            # SD2-specific config below
            attention_head_dim=8,
            use_linear_projection=True,
            only_cross_attention=(True, True, False),
            num_class_embeds=100,
        )
        return model

    @property
    def dummy_vae(self):
        """Return a small three-block ``AutoencoderKL`` built after seeding torch with 0."""
        torch.manual_seed(0)
        model = AutoencoderKL(
            block_out_channels=[32, 32, 64],
            in_channels=3,
            out_channels=3,
            down_block_types=["DownEncoderBlock2D", "DownEncoderBlock2D", "DownEncoderBlock2D"],
            up_block_types=["UpDecoderBlock2D", "UpDecoderBlock2D", "UpDecoderBlock2D"],
            latent_channels=4,
        )
        return model

    @property
    def dummy_text_encoder(self):
        """Return a small ``CLIPTextModel`` built after seeding torch with 0."""
        torch.manual_seed(0)
        config = CLIPTextConfig(
            bos_token_id=0,
            eos_token_id=2,
            hidden_size=32,
            intermediate_size=37,
            layer_norm_eps=1e-05,
            num_attention_heads=4,
            num_hidden_layers=5,
            pad_token_id=1,
            vocab_size=1000,
            # SD2-specific config below
            hidden_act="gelu",
            projection_dim=512,
        )
        return CLIPTextModel(config)

    def _build_low_res_image(self):
        """Convert ``dummy_image`` into the 64x64 RGB PIL image that is fed to the pipeline."""
        # channels-first batch -> channels-last single image
        image = self.dummy_image.cpu().permute(0, 2, 3, 1)[0]
        # NOTE(review): `np.uint8` is applied to a float tensor; if `floats_tensor` yields values
        # in [0, 1) they all truncate to 0. The expected slice in the CPU test was recorded with
        # exactly this conversion, so it is kept as is -- confirm whether that is intended.
        return Image.fromarray(np.uint8(image)).convert("RGB").resize((64, 64))

    def _build_pipeline(self, device, half_precision=False):
        """Assemble the upscale pipeline from the dummy components and move it to ``device``.

        Args:
            device: device string passed to ``pipeline.to``.
            half_precision: when True, the UNet and the text encoder are converted with ``.half()``
                before the pipeline is created.

        Returns:
            The ``StableDiffusionUpscalePipeline`` with its progress bar configured.
        """
        unet = self.dummy_cond_unet_upscale
        low_res_scheduler = DDPMScheduler()
        scheduler = DDIMScheduler(prediction_type="v_prediction")
        vae = self.dummy_vae
        text_encoder = self.dummy_text_encoder
        tokenizer = CLIPTokenizer.from_pretrained("hf-internal-testing/tiny-random-clip")

        if half_precision:
            # put models in fp16, except vae as it overflows in fp16
            unet = unet.half()
            text_encoder = text_encoder.half()

        sd_pipe = StableDiffusionUpscalePipeline(
            unet=unet,
            low_res_scheduler=low_res_scheduler,
            scheduler=scheduler,
            vae=vae,
            text_encoder=text_encoder,
            tokenizer=tokenizer,
            max_noise_level=350,
        )
        sd_pipe = sd_pipe.to(device)
        sd_pipe.set_progress_bar_config(disable=None)
        return sd_pipe

    def test_stable_diffusion_upscale(self):
        """Check output shape and a reference slice on CPU, for both the dict and the tuple return form."""
        device = "cpu"  # ensure determinism for the device-dependent torch.Generator
        sd_pipe = self._build_pipeline(device)
        low_res_image = self._build_low_res_image()

        prompt = "A painting of a squirrel eating a burger"
        generator = torch.Generator(device=device).manual_seed(0)
        output = sd_pipe(
            [prompt],
            image=low_res_image,
            generator=generator,
            guidance_scale=6.0,
            noise_level=20,
            num_inference_steps=2,
            output_type="np",
        )
        image = output.images

        # same call with a freshly seeded generator, this time asking for the tuple return form
        generator = torch.Generator(device=device).manual_seed(0)
        image_from_tuple = sd_pipe(
            [prompt],
            image=low_res_image,
            generator=generator,
            guidance_scale=6.0,
            noise_level=20,
            num_inference_steps=2,
            output_type="np",
            return_dict=False,
        )[0]

        image_slice = image[0, -3:, -3:, -1]
        image_from_tuple_slice = image_from_tuple[0, -3:, -3:, -1]

        # the test expects an output four times the input edge length
        expected_height_width = low_res_image.size[0] * 4
        assert image.shape == (1, expected_height_width, expected_height_width, 3)
        expected_slice = np.array([0.2562, 0.3606, 0.4204, 0.4469, 0.4822, 0.4647, 0.5315, 0.5748, 0.5606])
        assert np.abs(image_slice.flatten() - expected_slice).max() < 1e-2
        assert np.abs(image_from_tuple_slice.flatten() - expected_slice).max() < 1e-2

    @unittest.skipIf(torch_device != "cuda", "This test requires a GPU")
    def test_stable_diffusion_upscale_fp16(self):
        """Test that stable diffusion upscale works with fp16"""
        sd_pipe = self._build_pipeline(torch_device, half_precision=True)
        low_res_image = self._build_low_res_image()

        prompt = "A painting of a squirrel eating a burger"
        generator = torch.Generator(device=torch_device).manual_seed(0)
        image = sd_pipe(
            [prompt],
            image=low_res_image,
            generator=generator,
            num_inference_steps=2,
            output_type="np",
        ).images

        expected_height_width = low_res_image.size[0] * 4
        assert image.shape == (1, expected_height_width, expected_height_width, 3)
@slow
@require_torch_gpu
class StableDiffusionUpscalePipelineIntegrationTests(unittest.TestCase):
    """Slow GPU tests that run the pretrained ``stabilityai/stable-diffusion-x4-upscaler`` checkpoint."""

    # Checkpoint and input image shared by every test in this class.
    _MODEL_ID = "stabilityai/stable-diffusion-x4-upscaler"
    _LOW_RES_CAT_URL = (
        "https://huggingface.co/datasets/hf-internal-testing/diffusers-images/resolve/main"
        "/sd2-upscale/low_res_cat.png"
    )

    def tearDown(self):
        # clean up the VRAM after each test
        super().tearDown()
        gc.collect()
        torch.cuda.empty_cache()

    def test_stable_diffusion_upscale_pipeline(self):
        """Compare the default-precision output against the stored reference array."""
        low_res_image = load_image(self._LOW_RES_CAT_URL)
        expected_image = load_numpy(
            "https://huggingface.co/datasets/hf-internal-testing/diffusers-images/resolve/main/sd2-upscale"
            "/upsampled_cat.npy"
        )

        pipe = StableDiffusionUpscalePipeline.from_pretrained(self._MODEL_ID)
        pipe.to(torch_device)
        pipe.set_progress_bar_config(disable=None)
        pipe.enable_attention_slicing()

        prompt = "a cat sitting on a park bench"

        generator = torch.Generator(device=torch_device).manual_seed(0)
        output = pipe(
            prompt=prompt,
            image=low_res_image,
            generator=generator,
            output_type="np",
        )
        image = output.images[0]

        assert image.shape == (512, 512, 3)
        assert np.abs(expected_image - image).max() < 1e-3

    def test_stable_diffusion_upscale_pipeline_fp16(self):
        """Compare the fp16 output against the stored fp16 reference array."""
        low_res_image = load_image(self._LOW_RES_CAT_URL)
        expected_image = load_numpy(
            "https://huggingface.co/datasets/hf-internal-testing/diffusers-images/resolve/main/sd2-upscale"
            "/upsampled_cat_fp16.npy"
        )

        pipe = StableDiffusionUpscalePipeline.from_pretrained(
            self._MODEL_ID,
            revision="fp16",
            torch_dtype=torch.float16,
        )
        pipe.to(torch_device)
        pipe.set_progress_bar_config(disable=None)
        pipe.enable_attention_slicing()

        prompt = "a cat sitting on a park bench"

        generator = torch.Generator(device=torch_device).manual_seed(0)
        output = pipe(
            prompt=prompt,
            image=low_res_image,
            generator=generator,
            output_type="np",
        )
        image = output.images[0]

        assert image.shape == (512, 512, 3)
        # NOTE(review): this tolerance is far looser than the 1e-3 used in the full-precision test
        # above -- confirm it is deliberate for fp16.
        assert np.abs(expected_image - image).max() < 5e-1

    def test_stable_diffusion_pipeline_with_sequential_cpu_offloading(self):
        """Check peak CUDA memory stays below 2.65 GB with attention slicing and sequential CPU offload."""
        torch.cuda.empty_cache()
        # `reset_peak_memory_stats` alone is enough: the deprecated
        # `torch.cuda.reset_max_memory_allocated` merely forwards to it.
        torch.cuda.reset_peak_memory_stats()

        low_res_image = load_image(self._LOW_RES_CAT_URL)

        pipe = StableDiffusionUpscalePipeline.from_pretrained(
            self._MODEL_ID,
            revision="fp16",
            torch_dtype=torch.float16,
        )
        pipe.to(torch_device)
        pipe.set_progress_bar_config(disable=None)
        pipe.enable_attention_slicing(1)
        pipe.enable_sequential_cpu_offload()

        prompt = "a cat sitting on a park bench"

        generator = torch.Generator(device=torch_device).manual_seed(0)
        _ = pipe(
            prompt=prompt,
            image=low_res_image,
            generator=generator,
            num_inference_steps=5,
            output_type="np",
        )

        mem_bytes = torch.cuda.max_memory_allocated()
        # make sure that less than 2.65 GB is allocated
        assert mem_bytes < 2.65 * 10**9
tests/pipelines/stable_diffusion_2/test_stable_diffusion_v_pred.py
0 → 100644
View file @
76845183
# coding=utf-8
# Copyright 2022 HuggingFace Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import
gc
import
time
import
unittest
import
numpy
as
np
import
torch
from
diffusers
import
(
AutoencoderKL
,
DDIMScheduler
,
DPMSolverMultistepScheduler
,
EulerDiscreteScheduler
,
StableDiffusionPipeline
,
UNet2DConditionModel
,
)
from
diffusers.utils
import
load_numpy
,
slow
,
torch_device
from
diffusers.utils.testing_utils
import
require_torch_gpu
from
transformers
import
CLIPTextConfig
,
CLIPTextModel
,
CLIPTokenizer
from
...test_pipelines_common
import
PipelineTesterMixin
# Forbid TF32 in CUDA matmuls via PyTorch's global backend flag.
# NOTE(review): presumably this keeps the hard-coded expected values in the
# tests below reproducible on GPUs that support TF32 -- confirm.
torch
.
backends
.
cuda
.
matmul
.
allow_tf32
=
False
class StableDiffusion2VPredictionPipelineFastTests(PipelineTesterMixin, unittest.TestCase):
    """Fast v-prediction checks of ``StableDiffusionPipeline`` assembled from tiny, seeded dummy components."""

    def tearDown(self):
        # clean up the VRAM after each test
        super().tearDown()
        gc.collect()
        torch.cuda.empty_cache()

    @property
    def dummy_cond_unet(self):
        """Return a small two-block ``UNet2DConditionModel`` built after seeding torch with 0."""
        torch.manual_seed(0)
        model = UNet2DConditionModel(
            block_out_channels=(32, 64),
            layers_per_block=2,
            sample_size=32,
            in_channels=4,
            out_channels=4,
            down_block_types=("DownBlock2D", "CrossAttnDownBlock2D"),
            up_block_types=("CrossAttnUpBlock2D", "UpBlock2D"),
            cross_attention_dim=32,
            # SD2-specific config below
            attention_head_dim=(2, 4, 8, 8),
            use_linear_projection=True,
        )
        return model

    @property
    def dummy_vae(self):
        """Return a small two-block ``AutoencoderKL`` built after seeding torch with 0."""
        torch.manual_seed(0)
        model = AutoencoderKL(
            block_out_channels=[32, 64],
            in_channels=3,
            out_channels=3,
            down_block_types=["DownEncoderBlock2D", "DownEncoderBlock2D"],
            up_block_types=["UpDecoderBlock2D", "UpDecoderBlock2D"],
            latent_channels=4,
            sample_size=128,
        )
        return model

    @property
    def dummy_text_encoder(self):
        """Return a small ``CLIPTextModel`` built after seeding torch with 0."""
        torch.manual_seed(0)
        config = CLIPTextConfig(
            bos_token_id=0,
            eos_token_id=2,
            hidden_size=32,
            intermediate_size=37,
            layer_norm_eps=1e-05,
            num_attention_heads=4,
            num_hidden_layers=5,
            pad_token_id=1,
            vocab_size=1000,
            # SD2-specific config below
            hidden_act="gelu",
            projection_dim=64,
        )
        return CLIPTextModel(config)

    @staticmethod
    def _v_prediction_ddim_scheduler():
        """Return the ``DDIMScheduler`` configuration shared by the DDIM and fp16 tests."""
        return DDIMScheduler(
            beta_start=0.00085,
            beta_end=0.012,
            beta_schedule="scaled_linear",
            clip_sample=False,
            set_alpha_to_one=False,
            prediction_type="v_prediction",
        )

    def _build_pipeline(self, scheduler, device, half_precision=False):
        """Assemble a safety-checker-free ``StableDiffusionPipeline`` from the dummy components.

        Args:
            scheduler: scheduler instance to plug into the pipeline.
            device: device string passed to ``pipeline.to``.
            half_precision: when True, the UNet, the VAE and the text encoder are converted with
                ``.half()`` before the pipeline is created.

        Returns:
            The pipeline, moved to ``device`` and with its progress bar configured.
        """
        unet = self.dummy_cond_unet
        vae = self.dummy_vae
        bert = self.dummy_text_encoder
        tokenizer = CLIPTokenizer.from_pretrained("hf-internal-testing/tiny-random-clip")

        if half_precision:
            # put models in fp16
            unet = unet.half()
            vae = vae.half()
            bert = bert.half()

        sd_pipe = StableDiffusionPipeline(
            unet=unet,
            scheduler=scheduler,
            vae=vae,
            text_encoder=bert,
            tokenizer=tokenizer,
            safety_checker=None,
            feature_extractor=None,
            requires_safety_checker=False,
        )
        sd_pipe = sd_pipe.to(device)
        sd_pipe.set_progress_bar_config(disable=None)
        return sd_pipe

    def _check_cpu_output(self, scheduler, expected_slice):
        """Run two seeded CPU inferences (dict and tuple return form) and compare both to ``expected_slice``."""
        device = "cpu"  # ensure determinism for the device-dependent torch.Generator
        sd_pipe = self._build_pipeline(scheduler, device)

        prompt = "A painting of a squirrel eating a burger"

        generator = torch.Generator(device=device).manual_seed(0)
        output = sd_pipe([prompt], generator=generator, guidance_scale=6.0, num_inference_steps=2, output_type="np")
        image = output.images

        # same call with a freshly seeded generator, this time asking for the tuple return form
        generator = torch.Generator(device=device).manual_seed(0)
        image_from_tuple = sd_pipe(
            [prompt],
            generator=generator,
            guidance_scale=6.0,
            num_inference_steps=2,
            output_type="np",
            return_dict=False,
        )[0]

        image_slice = image[0, -3:, -3:, -1]
        image_from_tuple_slice = image_from_tuple[0, -3:, -3:, -1]

        assert image.shape == (1, 64, 64, 3)
        assert np.abs(image_slice.flatten() - expected_slice).max() < 1e-2
        assert np.abs(image_from_tuple_slice.flatten() - expected_slice).max() < 1e-2

    def test_stable_diffusion_v_pred_ddim(self):
        """Check the CPU output of the v-prediction pipeline with a DDIM scheduler."""
        expected_slice = np.array([0.6424, 0.6109, 0.494, 0.5088, 0.4984, 0.4525, 0.5059, 0.5068, 0.4474])
        self._check_cpu_output(self._v_prediction_ddim_scheduler(), expected_slice)

    def test_stable_diffusion_v_pred_k_euler(self):
        """Check the CPU output of the v-prediction pipeline with an Euler discrete scheduler."""
        scheduler = EulerDiscreteScheduler(
            beta_start=0.00085, beta_end=0.012, beta_schedule="scaled_linear", prediction_type="v_prediction"
        )
        expected_slice = np.array([0.4616, 0.5184, 0.4887, 0.5111, 0.4839, 0.48, 0.5119, 0.5263, 0.4776])
        self._check_cpu_output(scheduler, expected_slice)

    @unittest.skipIf(torch_device != "cuda", "This test requires a GPU")
    def test_stable_diffusion_v_pred_fp16(self):
        """Test that stable diffusion v-prediction works with fp16"""
        sd_pipe = self._build_pipeline(self._v_prediction_ddim_scheduler(), torch_device, half_precision=True)

        prompt = "A painting of a squirrel eating a burger"
        generator = torch.Generator(device=torch_device).manual_seed(0)
        image = sd_pipe([prompt], generator=generator, num_inference_steps=2, output_type="np").images

        assert image.shape == (1, 64, 64, 3)
@slow
@require_torch_gpu
class StableDiffusion2VPredictionPipelineIntegrationTests(unittest.TestCase):
    """Slow GPU tests that run the pretrained ``stabilityai/stable-diffusion-2`` checkpoint."""

    def tearDown(self):
        # clean up the VRAM after each test
        super().tearDown()
        gc.collect()
        torch.cuda.empty_cache()

    def test_stable_diffusion_v_pred_default(self):
        """Check a reference slice of the output produced with the checkpoint's default scheduler."""
        sd_pipe = StableDiffusionPipeline.from_pretrained("stabilityai/stable-diffusion-2")
        sd_pipe = sd_pipe.to(torch_device)
        sd_pipe.enable_attention_slicing()
        sd_pipe.set_progress_bar_config(disable=None)

        prompt = "A painting of a squirrel eating a burger"
        generator = torch.Generator(device=torch_device).manual_seed(0)
        output = sd_pipe([prompt], generator=generator, guidance_scale=7.5, num_inference_steps=20, output_type="np")

        image = output.images
        image_slice = image[0, 253:256, 253:256, -1]

        assert image.shape == (1, 768, 768, 3)
        expected_slice = np.array([0.0567, 0.057, 0.0416, 0.0463, 0.0433, 0.06, 0.0517, 0.0526, 0.0866])
        assert np.abs(image_slice.flatten() - expected_slice).max() < 1e-2

    def test_stable_diffusion_v_pred_euler(self):
        """Check a reference slice of the output produced with the Euler discrete scheduler."""
        scheduler = EulerDiscreteScheduler.from_pretrained("stabilityai/stable-diffusion-2", subfolder="scheduler")
        sd_pipe = StableDiffusionPipeline.from_pretrained("stabilityai/stable-diffusion-2", scheduler=scheduler)
        sd_pipe = sd_pipe.to(torch_device)
        sd_pipe.enable_attention_slicing()
        sd_pipe.set_progress_bar_config(disable=None)

        prompt = "A painting of a squirrel eating a burger"
        generator = torch.Generator(device=torch_device).manual_seed(0)

        output = sd_pipe([prompt], generator=generator, num_inference_steps=5, output_type="numpy")
        image = output.images

        image_slice = image[0, 253:256, 253:256, -1]

        assert image.shape == (1, 768, 768, 3)
        expected_slice = np.array([0.0351, 0.0376, 0.0505, 0.0424, 0.0551, 0.0656, 0.0471, 0.0276, 0.0596])
        assert np.abs(image_slice.flatten() - expected_slice).max() < 1e-2

    def test_stable_diffusion_v_pred_dpm(self):
        """
        TODO: update this test after making DPM compatible with V-prediction!
        """
        scheduler = DPMSolverMultistepScheduler.from_pretrained(
            "stabilityai/stable-diffusion-2", subfolder="scheduler"
        )
        sd_pipe = StableDiffusionPipeline.from_pretrained("stabilityai/stable-diffusion-2", scheduler=scheduler)
        sd_pipe = sd_pipe.to(torch_device)
        sd_pipe.enable_attention_slicing()
        sd_pipe.set_progress_bar_config(disable=None)

        prompt = "a photograph of an astronaut riding a horse"
        generator = torch.Generator(device=torch_device).manual_seed(0)
        image = sd_pipe(
            [prompt], generator=generator, guidance_scale=7.5, num_inference_steps=5, output_type="numpy"
        ).images

        image_slice = image[0, 253:256, 253:256, -1]
        assert image.shape == (1, 768, 768, 3)
        # placeholder reference (all zeros), see the TODO in the docstring
        expected_slice = np.array([0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0])
        assert np.abs(image_slice.flatten() - expected_slice).max() < 1e-2

    def test_stable_diffusion_attention_slicing_v_pred(self):
        """Check that attention slicing lowers peak CUDA memory below 5.5 GB without changing the output."""
        torch.cuda.reset_peak_memory_stats()
        model_id = "stabilityai/stable-diffusion-2"
        pipe = StableDiffusionPipeline.from_pretrained(model_id, revision="fp16", torch_dtype=torch.float16)
        pipe.to(torch_device)
        pipe.set_progress_bar_config(disable=None)

        prompt = "a photograph of an astronaut riding a horse"

        # make attention efficient
        pipe.enable_attention_slicing()
        generator = torch.Generator(device=torch_device).manual_seed(0)
        with torch.autocast(torch_device):
            output_chunked = pipe(
                [prompt], generator=generator, guidance_scale=7.5, num_inference_steps=10, output_type="numpy"
            )
            image_chunked = output_chunked.images

        mem_bytes = torch.cuda.max_memory_allocated()
        # reset so the second measurement only reflects the unsliced run
        torch.cuda.reset_peak_memory_stats()
        # make sure that less than 5.5 GB is allocated
        assert mem_bytes < 5.5 * 10**9

        # disable slicing
        pipe.disable_attention_slicing()
        generator = torch.Generator(device=torch_device).manual_seed(0)
        with torch.autocast(torch_device):
            output = pipe(
                [prompt], generator=generator, guidance_scale=7.5, num_inference_steps=10, output_type="numpy"
            )
            image = output.images

        # make sure that more than 5.5 GB is allocated
        mem_bytes = torch.cuda.max_memory_allocated()
        assert mem_bytes > 5.5 * 10**9
        assert np.abs(image_chunked.flatten() - image.flatten()).max() < 1e-3

    def test_stable_diffusion_text2img_pipeline_v_pred_default(self):
        """Compare the default-precision text-to-image output against the stored reference array."""
        expected_image = load_numpy(
            "https://huggingface.co/datasets/hf-internal-testing/diffusers-images/resolve/main/"
            "sd2-text2img/astronaut_riding_a_horse_v_pred.npy"
        )

        pipe = StableDiffusionPipeline.from_pretrained("stabilityai/stable-diffusion-2")
        pipe.to(torch_device)
        pipe.enable_attention_slicing()
        pipe.set_progress_bar_config(disable=None)

        prompt = "astronaut riding a horse"

        generator = torch.Generator(device=torch_device).manual_seed(0)
        output = pipe(prompt=prompt, guidance_scale=7.5, generator=generator, output_type="np")
        image = output.images[0]

        assert image.shape == (768, 768, 3)
        assert np.abs(expected_image - image).max() < 5e-3

    def test_stable_diffusion_text2img_pipeline_v_pred_fp16(self):
        """Compare the fp16 text-to-image output against the stored fp16 reference array."""
        expected_image = load_numpy(
            "https://huggingface.co/datasets/hf-internal-testing/diffusers-images/resolve/main/"
            "sd2-text2img/astronaut_riding_a_horse_v_pred_fp16.npy"
        )

        pipe = StableDiffusionPipeline.from_pretrained(
            "stabilityai/stable-diffusion-2", revision="fp16", torch_dtype=torch.float16
        )
        pipe.to(torch_device)
        pipe.set_progress_bar_config(disable=None)

        prompt = "astronaut riding a horse"

        generator = torch.Generator(device=torch_device).manual_seed(0)
        output = pipe(prompt=prompt, guidance_scale=7.5, generator=generator, output_type="np")
        image = output.images[0]

        assert image.shape == (768, 768, 3)
        assert np.abs(expected_image - image).max() < 5e-3

    def test_stable_diffusion_text2img_intermediate_state_v_pred(self):
        """Check that the step callback runs on each of the 20 steps and sees the expected latents."""
        number_of_steps = 0

        def test_callback_fn(step: int, timestep: int, latents: torch.FloatTensor) -> None:
            # record the invocation, then compare latents on the first and the last step only
            test_callback_fn.has_been_called = True
            nonlocal number_of_steps
            number_of_steps += 1
            if step == 0:
                latents = latents.detach().cpu().numpy()
                assert latents.shape == (1, 4, 96, 96)
                latents_slice = latents[0, -3:, -3:, -1]
                expected_slice = np.array(
                    [-0.2543, -1.2755, 0.4261, -0.9555, -1.173, -0.5892, 2.4159, 0.1554, -1.2098]
                )
                assert np.abs(latents_slice.flatten() - expected_slice).max() < 5e-3
            elif step == 19:
                latents = latents.detach().cpu().numpy()
                assert latents.shape == (1, 4, 96, 96)
                latents_slice = latents[0, -3:, -3:, -1]
                expected_slice = np.array(
                    [-0.9572, -0.967, -0.6152, 0.0894, -0.699, -0.2344, 1.5465, -0.0357, -0.1141]
                )
                assert np.abs(latents_slice.flatten() - expected_slice).max() < 1e-2

        test_callback_fn.has_been_called = False

        pipe = StableDiffusionPipeline.from_pretrained(
            "stabilityai/stable-diffusion-2", revision="fp16", torch_dtype=torch.float16
        )
        pipe = pipe.to(torch_device)
        pipe.set_progress_bar_config(disable=None)
        pipe.enable_attention_slicing()

        prompt = "Andromeda galaxy in a bottle"

        generator = torch.Generator(device=torch_device).manual_seed(0)
        with torch.autocast(torch_device):
            pipe(
                prompt=prompt,
                num_inference_steps=20,
                guidance_scale=7.5,
                generator=generator,
                callback=test_callback_fn,
                callback_steps=1,
            )
        assert test_callback_fn.has_been_called
        assert number_of_steps == 20

    def test_stable_diffusion_low_cpu_mem_usage_v_pred(self):
        """Check that the default load path is more than twice as fast as ``low_cpu_mem_usage=False``."""
        pipeline_id = "stabilityai/stable-diffusion-2"

        # NOTE(review): this timed window also includes `.to(torch_device)`, while the second
        # window below times `from_pretrained` only -- confirm the asymmetry is intended.
        start_time = time.time()
        pipeline_low_cpu_mem_usage = StableDiffusionPipeline.from_pretrained(
            pipeline_id, revision="fp16", torch_dtype=torch.float16
        )
        pipeline_low_cpu_mem_usage.to(torch_device)
        low_cpu_mem_usage_time = time.time() - start_time

        start_time = time.time()
        _ = StableDiffusionPipeline.from_pretrained(
            pipeline_id, revision="fp16", torch_dtype=torch.float16, low_cpu_mem_usage=False
        )
        normal_load_time = time.time() - start_time

        assert 2 * low_cpu_mem_usage_time < normal_load_time

    def test_stable_diffusion_pipeline_with_sequential_cpu_offloading_v_pred(self):
        """Check peak CUDA memory stays below 2.8 GB with attention slicing and sequential CPU offload."""
        torch.cuda.empty_cache()
        # `reset_peak_memory_stats` alone is enough: the deprecated
        # `torch.cuda.reset_max_memory_allocated` merely forwards to it.
        torch.cuda.reset_peak_memory_stats()

        pipeline_id = "stabilityai/stable-diffusion-2"
        prompt = "Andromeda galaxy in a bottle"

        pipeline = StableDiffusionPipeline.from_pretrained(pipeline_id, revision="fp16", torch_dtype=torch.float16)
        pipeline = pipeline.to(torch_device)
        pipeline.enable_attention_slicing(1)
        pipeline.enable_sequential_cpu_offload()

        generator = torch.Generator(device=torch_device).manual_seed(0)
        _ = pipeline(prompt, generator=generator, num_inference_steps=5)

        mem_bytes = torch.cuda.max_memory_allocated()
        # make sure that less than 2.8 GB is allocated
        assert mem_bytes < 2.8 * 10**9
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
.
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment