Unverified Commit 27062c36 authored by Patrick von Platen's avatar Patrick von Platen Committed by GitHub
Browse files

Refactor execution device & cpu offload (#4114)

* create general cpu offload & execution device

* Remove boiler plate

* finish

* kp

* Correct offload more pipelines

* up

* Update src/diffusers/pipelines/pipeline_utils.py

* make style

* up
parent 6427aa99
......@@ -211,27 +211,6 @@ class VideoToVideoSDPipeline(DiffusionPipeline, TextualInversionLoaderMixin, Lor
"""
self.vae.disable_tiling()
def enable_sequential_cpu_offload(self, gpu_id=0):
r"""
Offloads all models to CPU using accelerate, significantly reducing memory usage. When called, unet,
text_encoder, vae have their state dicts saved to CPU and then are moved to a `torch.device('meta') and loaded
to GPU only when their specific submodule has its `forward` method called. Note that offloading happens on a
submodule basis. Memory savings are higher than with `enable_model_cpu_offload`, but performance is lower.
"""
if is_accelerate_available() and is_accelerate_version(">=", "0.14.0"):
from accelerate import cpu_offload
else:
raise ImportError("`enable_sequential_cpu_offload` requires `accelerate v0.14.0` or higher")
device = torch.device(f"cuda:{gpu_id}")
if self.device.type != "cpu":
self.to("cpu", silence_dtype_warnings=True)
torch.cuda.empty_cache() # otherwise we don't see the memory savings (but they probably exist)
for cpu_offloaded_model in [self.unet, self.text_encoder, self.vae]:
cpu_offload(cpu_offloaded_model, device)
def enable_model_cpu_offload(self, gpu_id=0):
r"""
Offloads all models to CPU using accelerate, reducing memory usage with a low impact on performance. Compared
......@@ -257,25 +236,6 @@ class VideoToVideoSDPipeline(DiffusionPipeline, TextualInversionLoaderMixin, Lor
# We'll offload the last model manually.
self.final_offload_hook = hook
@property
# Copied from diffusers.pipelines.stable_diffusion.pipeline_stable_diffusion.StableDiffusionPipeline._execution_device
def _execution_device(self):
r"""
Returns the device on which the pipeline's models will be executed. After calling
`pipeline.enable_sequential_cpu_offload()` the execution device can only be inferred from Accelerate's module
hooks.
"""
if not hasattr(self.unet, "_hf_hook"):
return self.device
for module in self.unet.modules():
if (
hasattr(module, "_hf_hook")
and hasattr(module._hf_hook, "execution_device")
and module._hf_hook.execution_device is not None
):
return torch.device(module._hf_hook.execution_device)
return self.device
# Copied from diffusers.pipelines.stable_diffusion.pipeline_stable_diffusion.StableDiffusionPipeline._encode_prompt
def _encode_prompt(
self,
......
......@@ -24,7 +24,7 @@ from ...models import PriorTransformer, UNet2DConditionModel, UNet2DModel
from ...pipelines import DiffusionPipeline
from ...pipelines.pipeline_utils import ImagePipelineOutput
from ...schedulers import UnCLIPScheduler
from ...utils import is_accelerate_available, logging, randn_tensor
from ...utils import logging, randn_tensor
from .text_proj import UnCLIPTextProjModel
......@@ -63,6 +63,8 @@ class UnCLIPPipeline(DiffusionPipeline):
"""
_exclude_from_cpu_offload = ["prior"]
prior: PriorTransformer
decoder: UNet2DConditionModel
text_proj: UnCLIPTextProjModel
......@@ -205,49 +207,6 @@ class UnCLIPPipeline(DiffusionPipeline):
return prompt_embeds, text_encoder_hidden_states, text_mask
def enable_sequential_cpu_offload(self, gpu_id=0):
r"""
Offloads all models to CPU using accelerate, significantly reducing memory usage. When called, the pipeline's
models have their state dicts saved to CPU and then are moved to a `torch.device('meta') and loaded to GPU only
when their specific submodule has its `forward` method called.
"""
if is_accelerate_available():
from accelerate import cpu_offload
else:
raise ImportError("Please install accelerate via `pip install accelerate`")
device = torch.device(f"cuda:{gpu_id}")
# TODO: self.prior.post_process_latents is not covered by the offload hooks, so it fails if added to the list
models = [
self.decoder,
self.text_proj,
self.text_encoder,
self.super_res_first,
self.super_res_last,
]
for cpu_offloaded_model in models:
if cpu_offloaded_model is not None:
cpu_offload(cpu_offloaded_model, device)
@property
def _execution_device(self):
r"""
Returns the device on which the pipeline's models will be executed. After calling
`pipeline.enable_sequential_cpu_offload()` the execution device can only be inferred from Accelerate's module
hooks.
"""
if self.device != torch.device("meta") or not hasattr(self.decoder, "_hf_hook"):
return self.device
for module in self.decoder.modules():
if (
hasattr(module, "_hf_hook")
and hasattr(module._hf_hook, "execution_device")
and module._hf_hook.execution_device is not None
):
return torch.device(module._hf_hook.execution_device)
return self.device
@torch.no_grad()
def __call__(
self,
......
......@@ -28,7 +28,7 @@ from transformers import (
from ...models import UNet2DConditionModel, UNet2DModel
from ...pipelines import DiffusionPipeline, ImagePipelineOutput
from ...schedulers import UnCLIPScheduler
from ...utils import is_accelerate_available, logging, randn_tensor
from ...utils import logging, randn_tensor
from .text_proj import UnCLIPTextProjModel
......@@ -198,49 +198,6 @@ class UnCLIPImageVariationPipeline(DiffusionPipeline):
return image_embeddings
def enable_sequential_cpu_offload(self, gpu_id=0):
r"""
Offloads all models to CPU using accelerate, significantly reducing memory usage. When called, the pipeline's
models have their state dicts saved to CPU and then are moved to a `torch.device('meta') and loaded to GPU only
when their specific submodule has its `forward` method called.
"""
if is_accelerate_available():
from accelerate import cpu_offload
else:
raise ImportError("Please install accelerate via `pip install accelerate`")
device = torch.device(f"cuda:{gpu_id}")
models = [
self.decoder,
self.text_proj,
self.text_encoder,
self.super_res_first,
self.super_res_last,
]
for cpu_offloaded_model in models:
if cpu_offloaded_model is not None:
cpu_offload(cpu_offloaded_model, device)
@property
# Copied from diffusers.pipelines.unclip.pipeline_unclip.UnCLIPPipeline._execution_device
def _execution_device(self):
r"""
Returns the device on which the pipeline's models will be executed. After calling
`pipeline.enable_sequential_cpu_offload()` the execution device can only be inferred from Accelerate's module
hooks.
"""
if self.device != torch.device("meta") or not hasattr(self.decoder, "_hf_hook"):
return self.device
for module in self.decoder.modules():
if (
hasattr(module, "_hf_hook")
and hasattr(module._hf_hook, "execution_device")
and module._hf_hook.execution_device is not None
):
return torch.device(module._hf_hook.execution_device)
return self.device
@torch.no_grad()
def __call__(
self,
......
......@@ -172,33 +172,6 @@ class UniDiffuserPipeline(DiffusionPipeline):
# TODO: handle safety checking?
self.safety_checker = None
# Modified from diffusers.pipelines.stable_diffusion.pipeline_stable_diffusion.StableDiffusionPipeline.enable_sequential_cpu_offload
# Add self.image_encoder, self.text_decoder to cpu_offloaded_models list
def enable_sequential_cpu_offload(self, gpu_id=0):
r"""
Offloads all models to CPU using accelerate, significantly reducing memory usage. When called, unet,
text_encoder, vae and safety checker have their state dicts saved to CPU and then are moved to a
`torch.device('meta')` and loaded to GPU only when their specific submodule has its `forward` method called.
Note that offloading happens on a submodule basis. Memory savings are higher than with
`enable_model_cpu_offload`, but performance is lower.
"""
if is_accelerate_available() and is_accelerate_version(">=", "0.14.0"):
from accelerate import cpu_offload
else:
raise ImportError("`enable_sequential_cpu_offload` requires `accelerate v0.14.0` or higher")
device = torch.device(f"cuda:{gpu_id}")
if self.device.type != "cpu":
self.to("cpu", silence_dtype_warnings=True)
torch.cuda.empty_cache() # otherwise we don't see the memory savings (but they probably exist)
for cpu_offloaded_model in [self.unet, self.text_encoder, self.vae, self.image_encoder, self.text_decoder]:
cpu_offload(cpu_offloaded_model, device)
if self.safety_checker is not None:
cpu_offload(self.safety_checker, execution_device=device, offload_buffers=True)
# Modified from diffusers.pipelines.stable_diffusion.pipeline_stable_diffusion.StableDiffusionPipeline.enable_model_cpu_offload
# Add self.image_encoder, self.text_decoder to cpu_offloaded_models list
def enable_model_cpu_offload(self, gpu_id=0):
......@@ -229,25 +202,6 @@ class UniDiffuserPipeline(DiffusionPipeline):
# We'll offload the last model manually.
self.final_offload_hook = hook
@property
# Copied from diffusers.pipelines.stable_diffusion.pipeline_stable_diffusion.StableDiffusionPipeline._execution_device
def _execution_device(self):
r"""
Returns the device on which the pipeline's models will be executed. After calling
`pipeline.enable_sequential_cpu_offload()` the execution device can only be inferred from Accelerate's module
hooks.
"""
if not hasattr(self.unet, "_hf_hook"):
return self.device
for module in self.unet.modules():
if (
hasattr(module, "_hf_hook")
and hasattr(module._hf_hook, "execution_device")
and module._hf_hook.execution_device is not None
):
return torch.device(module._hf_hook.execution_device)
return self.device
# Copied from diffusers.pipelines.stable_diffusion.pipeline_stable_diffusion.StableDiffusionPipeline.prepare_extra_step_kwargs
def prepare_extra_step_kwargs(self, generator, eta):
# prepare extra kwargs for the scheduler step, since not all schedulers have the same signature
......
......@@ -30,7 +30,7 @@ from transformers import (
from ...image_processor import VaeImageProcessor
from ...models import AutoencoderKL, DualTransformer2DModel, Transformer2DModel, UNet2DConditionModel
from ...schedulers import KarrasDiffusionSchedulers
from ...utils import is_accelerate_available, logging, randn_tensor
from ...utils import logging, randn_tensor
from ..pipeline_utils import DiffusionPipeline, ImagePipelineOutput
from .modeling_text_unet import UNetFlatConditionModel
......@@ -148,42 +148,6 @@ class VersatileDiffusionDualGuidedPipeline(DiffusionPipeline):
self.image_unet.register_to_config(dual_cross_attention=False)
def enable_sequential_cpu_offload(self, gpu_id=0):
r"""
Offloads all models to CPU using accelerate, significantly reducing memory usage. When called, unet,
text_encoder, vae and safety checker have their state dicts saved to CPU and then are moved to a
`torch.device('meta') and loaded to GPU only when their specific submodule has its `forward` method called.
"""
if is_accelerate_available():
from accelerate import cpu_offload
else:
raise ImportError("Please install accelerate via `pip install accelerate`")
device = torch.device(f"cuda:{gpu_id}")
for cpu_offloaded_model in [self.image_unet, self.text_unet, self.text_encoder, self.vae]:
if cpu_offloaded_model is not None:
cpu_offload(cpu_offloaded_model, device)
@property
# Copied from diffusers.pipelines.stable_diffusion.pipeline_stable_diffusion.StableDiffusionPipeline._execution_device with unet->image_unet
def _execution_device(self):
r"""
Returns the device on which the pipeline's models will be executed. After calling
`pipeline.enable_sequential_cpu_offload()` the execution device can only be inferred from Accelerate's module
hooks.
"""
if not hasattr(self.image_unet, "_hf_hook"):
return self.device
for module in self.image_unet.modules():
if (
hasattr(module, "_hf_hook")
and hasattr(module._hf_hook, "execution_device")
and module._hf_hook.execution_device is not None
):
return torch.device(module._hf_hook.execution_device)
return self.device
def _encode_text_prompt(self, prompt, device, num_images_per_prompt, do_classifier_free_guidance):
r"""
Encodes the prompt into text encoder hidden states.
......
......@@ -25,7 +25,7 @@ from transformers import CLIPImageProcessor, CLIPVisionModelWithProjection
from ...image_processor import VaeImageProcessor
from ...models import AutoencoderKL, UNet2DConditionModel
from ...schedulers import KarrasDiffusionSchedulers
from ...utils import is_accelerate_available, logging, randn_tensor
from ...utils import logging, randn_tensor
from ..pipeline_utils import DiffusionPipeline, ImagePipelineOutput
......@@ -75,42 +75,6 @@ class VersatileDiffusionImageVariationPipeline(DiffusionPipeline):
self.vae_scale_factor = 2 ** (len(self.vae.config.block_out_channels) - 1)
self.image_processor = VaeImageProcessor(vae_scale_factor=self.vae_scale_factor)
def enable_sequential_cpu_offload(self, gpu_id=0):
r"""
Offloads all models to CPU using accelerate, significantly reducing memory usage. When called, unet,
text_encoder, vae and safety checker have their state dicts saved to CPU and then are moved to a
`torch.device('meta') and loaded to GPU only when their specific submodule has its `forward` method called.
"""
if is_accelerate_available():
from accelerate import cpu_offload
else:
raise ImportError("Please install accelerate via `pip install accelerate`")
device = torch.device(f"cuda:{gpu_id}")
for cpu_offloaded_model in [self.image_unet, self.text_unet, self.text_encoder, self.vae]:
if cpu_offloaded_model is not None:
cpu_offload(cpu_offloaded_model, device)
@property
# Copied from diffusers.pipelines.stable_diffusion.pipeline_stable_diffusion.StableDiffusionPipeline._execution_device with unet->image_unet
def _execution_device(self):
r"""
Returns the device on which the pipeline's models will be executed. After calling
`pipeline.enable_sequential_cpu_offload()` the execution device can only be inferred from Accelerate's module
hooks.
"""
if not hasattr(self.image_unet, "_hf_hook"):
return self.device
for module in self.image_unet.modules():
if (
hasattr(module, "_hf_hook")
and hasattr(module._hf_hook, "execution_device")
and module._hf_hook.execution_device is not None
):
return torch.device(module._hf_hook.execution_device)
return self.device
def _encode_prompt(self, prompt, device, num_images_per_prompt, do_classifier_free_guidance, negative_prompt):
r"""
Encodes the prompt into text encoder hidden states.
......
......@@ -23,7 +23,7 @@ from transformers import CLIPImageProcessor, CLIPTextModelWithProjection, CLIPTo
from ...image_processor import VaeImageProcessor
from ...models import AutoencoderKL, Transformer2DModel, UNet2DConditionModel
from ...schedulers import KarrasDiffusionSchedulers
from ...utils import is_accelerate_available, logging, randn_tensor
from ...utils import logging, randn_tensor
from ..pipeline_utils import DiffusionPipeline, ImagePipelineOutput
from .modeling_text_unet import UNetFlatConditionModel
......@@ -99,42 +99,6 @@ class VersatileDiffusionTextToImagePipeline(DiffusionPipeline):
def remove_unused_weights(self):
self.register_modules(text_unet=None)
def enable_sequential_cpu_offload(self, gpu_id=0):
r"""
Offloads all models to CPU using accelerate, significantly reducing memory usage. When called, unet,
text_encoder, vae and safety checker have their state dicts saved to CPU and then are moved to a
`torch.device('meta') and loaded to GPU only when their specific submodule has its `forward` method called.
"""
if is_accelerate_available():
from accelerate import cpu_offload
else:
raise ImportError("Please install accelerate via `pip install accelerate`")
device = torch.device(f"cuda:{gpu_id}")
for cpu_offloaded_model in [self.image_unet, self.text_unet, self.text_encoder, self.vae]:
if cpu_offloaded_model is not None:
cpu_offload(cpu_offloaded_model, device)
@property
# Copied from diffusers.pipelines.stable_diffusion.pipeline_stable_diffusion.StableDiffusionPipeline._execution_device with unet->image_unet
def _execution_device(self):
r"""
Returns the device on which the pipeline's models will be executed. After calling
`pipeline.enable_sequential_cpu_offload()` the execution device can only be inferred from Accelerate's module
hooks.
"""
if not hasattr(self.image_unet, "_hf_hook"):
return self.device
for module in self.image_unet.modules():
if (
hasattr(module, "_hf_hook")
and hasattr(module._hf_hook, "execution_device")
and module._hf_hook.execution_device is not None
):
return torch.device(module._hf_hook.execution_device)
return self.device
def _encode_prompt(self, prompt, device, num_images_per_prompt, do_classifier_free_guidance, negative_prompt):
r"""
Encodes the prompt into text encoder hidden states.
......
......@@ -42,7 +42,6 @@ class DanceDiffusionPipelineFastTests(PipelineTesterMixin, unittest.TestCase):
}
batch_params = UNCONDITIONAL_AUDIO_GENERATION_BATCH_PARAMS
test_attention_slicing = False
test_cpu_offload = False
def get_dummy_components(self):
torch.manual_seed(0)
......
......@@ -38,7 +38,6 @@ class DDIMPipelineFastTests(PipelineTesterMixin, unittest.TestCase):
"callback_steps",
}
batch_params = UNCONDITIONAL_IMAGE_GENERATION_BATCH_PARAMS
test_cpu_offload = False
def get_dummy_components(self):
torch.manual_seed(0)
......
......@@ -43,7 +43,6 @@ class DiTPipelineFastTests(PipelineTesterMixin, unittest.TestCase):
"callback_steps",
}
batch_params = CLASS_CONDITIONED_IMAGE_GENERATION_BATCH_PARAMS
test_cpu_offload = False
def get_dummy_components(self):
torch.manual_seed(0)
......
......@@ -51,7 +51,6 @@ class LDMTextToImagePipelineFastTests(PipelineTesterMixin, unittest.TestCase):
"callback_steps",
}
batch_params = TEXT_TO_IMAGE_BATCH_PARAMS
test_cpu_offload = False
def get_dummy_components(self):
torch.manual_seed(0)
......
......@@ -47,7 +47,6 @@ class RepaintPipelineFastTests(PipelineTesterMixin, unittest.TestCase):
"callback_steps",
}
batch_params = IMAGE_INPAINTING_BATCH_PARAMS
test_cpu_offload = False
def get_dummy_components(self):
torch.manual_seed(0)
......
......@@ -48,7 +48,7 @@ class SpectrogramDiffusionPipelineFastTests(PipelineTesterMixin, unittest.TestCa
"num_images_per_prompt",
}
test_attention_slicing = False
test_cpu_offload = False
batch_params = TOKENS_TO_AUDIO_GENERATION_PARAMS
params = TOKENS_TO_AUDIO_GENERATION_BATCH_PARAMS
......
......@@ -42,7 +42,6 @@ class StableDiffusionSAGPipelineFastTests(PipelineLatentTesterMixin, PipelineTes
batch_params = TEXT_TO_IMAGE_BATCH_PARAMS
image_params = TEXT_TO_IMAGE_IMAGE_PARAMS
image_latents_params = TEXT_TO_IMAGE_IMAGE_PARAMS
test_cpu_offload = False
def get_dummy_components(self):
torch.manual_seed(0)
......
......@@ -63,8 +63,6 @@ class StableDiffusionLatentUpscalePipelineFastTests(
) # TO-DO: update image_params once pipeline is refactored with VaeImageProcessor.preprocess
image_latents_params = frozenset([])
test_cpu_offload = True
@property
def dummy_image(self):
batch_size = 1
......
......@@ -227,7 +227,7 @@ class PipelineTesterMixin:
# set these parameters to False in the child class if the pipeline does not support the corresponding functionality
test_attention_slicing = True
test_cpu_offload = True
test_xformers_attention = True
def get_generator(self, seed):
......@@ -671,9 +671,6 @@ class PipelineTesterMixin:
reason="CPU offload is only available with CUDA and `accelerate v0.14.0` or higher",
)
def test_cpu_offload_forward_pass(self, expected_max_diff=1e-4):
if not self.test_cpu_offload:
return
components = self.get_dummy_components()
pipe = self.pipeline_class(**components)
pipe.to(torch_device)
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment