# Copyright 2022 The HuggingFace Team. All rights reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. import warnings from typing import Tuple, Union import torch from ...pipeline_utils import DiffusionPipeline, ImagePipelineOutput class DDIMPipeline(DiffusionPipeline): def __init__(self, unet, scheduler): super().__init__() scheduler = scheduler.set_format("pt") self.register_modules(unet=unet, scheduler=scheduler) @torch.no_grad() def __call__( self, batch_size=1, generator=None, eta=0.0, num_inference_steps=50, output_type="pil", return_dict: bool = True, **kwargs, ) -> Union[ImagePipelineOutput, Tuple]: if "torch_device" in kwargs: device = kwargs.pop("torch_device") warnings.warn( "`torch_device` is deprecated as an input argument to `__call__` and will be removed in v0.3.0." " Consider using `pipe.to(torch_device)` instead." ) # Set device as before (to be removed in 0.3.0) if device is None: device = "cuda" if torch.cuda.is_available() else "cpu" self.to(device) # eta corresponds to η in paper and should be between [0, 1] # Sample gaussian noise to begin loop image = torch.randn( (batch_size, self.unet.in_channels, self.unet.sample_size, self.unet.sample_size), generator=generator, ) image = image.to(self.device) # set step values self.scheduler.set_timesteps(num_inference_steps) for t in self.progress_bar(self.scheduler.timesteps): # 1. predict noise model_output model_output = self.unet(image, t).sample # 2. predict previous mean of image x_t-1 and add variance depending on eta # do x_t -> x_t-1 image = self.scheduler.step(model_output, t, image, eta).prev_sample image = (image / 2 + 0.5).clamp(0, 1) image = image.cpu().permute(0, 2, 3, 1).numpy() if output_type == "pil": image = self.numpy_to_pil(image) if not return_dict: return (image,) return ImagePipelineOutput(images=image)