pipeline_pndm.py 4.33 KB
Newer Older
Patrick von Platen's avatar
Patrick von Platen committed
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
# Copyright 2022 The HuggingFace Team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


Pedro Cuenca's avatar
Pedro Cuenca committed
17
import warnings
18
from typing import Optional, Tuple, Union
Pedro Cuenca's avatar
Pedro Cuenca committed
19

Patrick von Platen's avatar
Patrick von Platen committed
20
21
import torch

Partho's avatar
Partho committed
22
from ...models import UNet2DModel
23
from ...pipeline_utils import DiffusionPipeline, ImagePipelineOutput
Partho's avatar
Partho committed
24
from ...schedulers import PNDMScheduler
Patrick von Platen's avatar
Patrick von Platen committed
25
26


Patrick von Platen's avatar
Patrick von Platen committed
27
class PNDMPipeline(DiffusionPipeline):
    r"""
    Pipeline that turns Gaussian noise into images by repeatedly calling a `UNet2DModel` and a `PNDMScheduler`.

    This model inherits from [`DiffusionPipeline`]. Check the superclass documentation for the generic methods the
    library implements for all the pipelines (such as downloading or saving, running on a particular device, etc.)

    Parameters:
        unet (:obj:`UNet2DModel`):
            U-Net called once per timestep; its `.sample` output is passed on to the scheduler.
        scheduler (:obj:`PNDMScheduler`):
            The `PNDMScheduler` used in combination with `unet`; its `step` method produces the next sample.
    """

    unet: UNet2DModel
    scheduler: PNDMScheduler

    def __init__(self, unet: UNet2DModel, scheduler: PNDMScheduler):
        r"""
        Register the two components of the pipeline.

        Args:
            unet (:obj:`UNet2DModel`): The model to register under the name `unet`.
            scheduler (:obj:`PNDMScheduler`): The scheduler to register under the name `scheduler`.
        """
        super().__init__()
        # The scheduler is registered in whatever form `set_format("pt")` returns
        # ("pt" presumably selects PyTorch tensors - confirm against the scheduler implementation).
        self.register_modules(unet=unet, scheduler=scheduler.set_format("pt"))

    @torch.no_grad()
    def __call__(
        self,
        batch_size: int = 1,
        num_inference_steps: int = 50,
        generator: Optional[torch.Generator] = None,
        output_type: Optional[str] = "pil",
        return_dict: bool = True,
        **kwargs,
    ) -> Union[ImagePipelineOutput, Tuple]:
        r"""
        Generate a batch of images, starting from random noise.

        Args:
            batch_size (:obj:`int`, `optional`, defaults to 1): The number of images to generate.
            num_inference_steps (:obj:`int`, `optional`, defaults to 50):
                The number of denoising steps handed to the scheduler. More denoising steps usually lead to a higher
                quality image at the expense of slower inference.
            generator (:obj:`torch.Generator`, `optional`):
                A [torch generator](https://pytorch.org/docs/stable/generated/torch.Generator.html) to make
                generation deterministic.
            output_type (:obj:`str`, `optional`, defaults to :obj:`"pil"`):
                The output format of the generated image. `"pil"` converts the result with `numpy_to_pil`
                ([PIL](https://pillow.readthedocs.io/en/stable/) images); any other value leaves it as an
                `np.array`.
            return_dict (:obj:`bool`, `optional`, defaults to :obj:`True`):
                Whether or not to return a [`~pipeline_utils.ImagePipelineOutput`] instead of a plain tuple.
            **kwargs:
                Only the deprecated `torch_device` key is read; when present, the pipeline is moved to that device
                (or to CUDA if available, else CPU, when the value is `None`). Other keys are ignored.

        Returns:
            [`~pipeline_utils.ImagePipelineOutput`] or `tuple`: the output object with the generated images in its
            `images` field if `return_dict` is truthy, otherwise a one-element tuple holding the images.
        """
        # For more information on the sampling method you can take a look at Algorithm 2 of
        # the official paper: https://arxiv.org/pdf/2202.09778.pdf

        if "torch_device" in kwargs:
            requested_device = kwargs.pop("torch_device")
            warnings.warn(
                "`torch_device` is deprecated as an input argument to `__call__` and will be removed in v0.3.0."
                " Consider using `pipe.to(torch_device)` instead."
            )

            # Set device as before (to be removed in 0.3.0)
            if requested_device is not None:
                target_device = requested_device
            else:
                target_device = "cuda" if torch.cuda.is_available() else "cpu"
            self.to(target_device)

        # Sample gaussian noise to begin loop; it is drawn first and only then moved to the pipeline device.
        denoiser = self.unet
        noise_shape = (batch_size, denoiser.in_channels, denoiser.sample_size, denoiser.sample_size)
        sample = torch.randn(noise_shape, generator=generator).to(self.device)

        sampler = self.scheduler
        sampler.set_timesteps(num_inference_steps)
        for timestep in self.progress_bar(sampler.timesteps):
            # One model call per timestep, then the scheduler derives the previous (less noisy) sample from it.
            noise_pred = denoiser(sample, timestep).sample
            sample = sampler.step(noise_pred, timestep, sample).prev_sample

        # x / 2 + 0.5 sends [-1, 1] onto [0, 1]; anything outside is clamped to that range.
        rescaled = (sample / 2 + 0.5).clamp(0, 1)
        # Move the channel axis last and convert to NumPy on the CPU.
        images = rescaled.cpu().permute(0, 2, 3, 1).numpy()
        if output_type == "pil":
            images = self.numpy_to_pil(images)

        if return_dict:
            return ImagePipelineOutput(images=images)
        return (images,)