scheduling_karras_ve.py 9.75 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
# Copyright 2022 NVIDIA and The HuggingFace Team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


16
from dataclasses import dataclass
17
from typing import Optional, Tuple, Union
18
19
20
21
22

import numpy as np
import torch

from ..configuration_utils import ConfigMixin, register_to_config
23
from ..utils import BaseOutput, deprecate
24
25
26
from .scheduling_utils import SchedulerMixin


27
28
29
30
31
32
33
34
35
36
@dataclass
class KarrasVeOutput(BaseOutput):
    """
    Output class for the scheduler's step function output.

    Args:
        prev_sample (`torch.FloatTensor` of shape `(batch_size, num_channels, height, width)` for images):
            Computed sample (x_{t-1}) of previous timestep. `prev_sample` should be used as next model input in the
            denoising loop.
        derivative (`torch.FloatTensor` of shape `(batch_size, num_channels, height, width)` for images):
37
            Derivative of predicted original image sample (x_0).
38
39
40
        pred_original_sample (`torch.FloatTensor` of shape `(batch_size, num_channels, height, width)` for images):
            The predicted denoised sample (x_{0}) based on the model output from the current timestep.
            `pred_original_sample` can be used to preview progress or for guidance.
41
42
43
44
    """

    prev_sample: torch.FloatTensor
    derivative: torch.FloatTensor
45
    pred_original_sample: Optional[torch.FloatTensor] = None
46
47


48
49
class KarrasVeScheduler(SchedulerMixin, ConfigMixin):
    """
Suraj Patil's avatar
Suraj Patil committed
50
51
    Stochastic sampling from Karras et al. [1] tailored to the Variance-Expanding (VE) models [2]. Use Algorithm 2 and
    the VE column of Table 1 from [1] for reference.
52

Suraj Patil's avatar
Suraj Patil committed
53
54
55
    [1] Karras, Tero, et al. "Elucidating the Design Space of Diffusion-Based Generative Models."
    https://arxiv.org/abs/2206.00364 [2] Song, Yang, et al. "Score-based generative modeling through stochastic
    differential equations." https://arxiv.org/abs/2011.13456
56

57
58
59
    [`~ConfigMixin`] takes care of storing all config attributes that are passed in the scheduler's `__init__`
    function, such as `num_train_timesteps`. They can be accessed via `scheduler.config.num_train_timesteps`.
    [`~ConfigMixin`] also provides general loading and saving functionality via the [`~ConfigMixin.save_config`] and
Nathan Lambert's avatar
Nathan Lambert committed
60
    [`~ConfigMixin.from_config`] functions.
61

62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
    For more details on the parameters, see the original paper's Appendix E.: "Elucidating the Design Space of
    Diffusion-Based Generative Models." https://arxiv.org/abs/2206.00364. The grid search values used to find the
    optimal {s_noise, s_churn, s_min, s_max} for a specific model are described in Table 5 of the paper.

    Args:
        sigma_min (`float`): minimum noise magnitude
        sigma_max (`float`): maximum noise magnitude
        s_noise (`float`): the amount of additional noise to counteract loss of detail during sampling.
            A reasonable range is [1.000, 1.011].
        s_churn (`float`): the parameter controlling the overall amount of stochasticity.
            A reasonable range is [0, 100].
        s_min (`float`): the start value of the sigma range where we add noise (enable stochasticity).
            A reasonable range is [0, 10].
        s_max (`float`): the end value of the sigma range where we add noise.
            A reasonable range is [0.2, 80].

78
79
80
81
82
    """

    @register_to_config
    def __init__(
        self,
83
84
85
86
87
88
        sigma_min: float = 0.02,
        sigma_max: float = 100,
        s_noise: float = 1.007,
        s_churn: float = 80,
        s_min: float = 0.05,
        s_max: float = 50,
89
        **kwargs,
90
    ):
91
92
        deprecate(
            "tensor_format",
Patrick von Platen's avatar
Patrick von Platen committed
93
            "0.6.0",
94
95
96
            "If you're running your code in PyTorch, you can safely remove this argument.",
            take_from=kwargs,
        )
97

98
99
100
        # standard deviation of the initial noise distribution
        self.init_noise_sigma = sigma_max

101
        # setable values
102
        self.num_inference_steps: int = None
103
        self.timesteps: np.IntTensor = None
104
        self.schedule: torch.FloatTensor = None  # sigma(t_i)
105

106
107
108
109
110
111
112
113
114
115
116
117
118
119
    def scale_model_input(self, sample: torch.FloatTensor, timestep: Optional[int] = None) -> torch.FloatTensor:
        """
        Ensures interchangeability with schedulers that need to scale the denoising model input depending on the
        current timestep.

        Args:
            sample (`torch.FloatTensor`): input sample
            timestep (`int`, optional): current timestep

        Returns:
            `torch.FloatTensor`: scaled input sample
        """
        return sample

120
    def set_timesteps(self, num_inference_steps: int, device: Union[str, torch.device] = None):
121
122
123
124
125
126
127
128
        """
        Sets the continuous timesteps used for the diffusion chain. Supporting function to be run before inference.

        Args:
            num_inference_steps (`int`):
                the number of diffusion steps used when generating samples with a pre-trained model.

        """
129
        self.num_inference_steps = num_inference_steps
130
131
        timesteps = np.arange(0, self.num_inference_steps)[::-1].copy()
        self.timesteps = torch.from_numpy(timesteps).to(device)
132
        schedule = [
133
            (
134
                self.config.sigma_max**2
135
136
                * (self.config.sigma_min**2 / self.config.sigma_max**2) ** (i / (num_inference_steps - 1))
            )
137
138
            for i in self.timesteps
        ]
139
        self.schedule = torch.tensor(schedule, dtype=torch.float32, device=device)
140

141
    def add_noise_to_input(
142
143
        self, sample: torch.FloatTensor, sigma: float, generator: Optional[torch.Generator] = None
    ) -> Tuple[torch.FloatTensor, float]:
144
        """
Suraj Patil's avatar
Suraj Patil committed
145
146
        Explicit Langevin-like "churn" step of adding noise to the sample according to a factor gamma_i ≥ 0 to reach a
        higher noise level sigma_hat = sigma_i + gamma_i*sigma_i.
147
148

        TODO Args:
149
        """
150
151
        if self.config.s_min <= sigma <= self.config.s_max:
            gamma = min(self.config.s_churn / self.num_inference_steps, 2**0.5 - 1)
152
153
154
155
        else:
            gamma = 0

        # sample eps ~ N(0, S_noise^2 * I)
156
        eps = self.config.s_noise * torch.randn(sample.shape, generator=generator).to(sample.device)
157
158
159
160
161
162
163
        sigma_hat = sigma + gamma * sigma
        sample_hat = sample + ((sigma_hat**2 - sigma**2) ** 0.5 * eps)

        return sample_hat, sigma_hat

    def step(
        self,
164
        model_output: torch.FloatTensor,
165
166
        sigma_hat: float,
        sigma_prev: float,
167
        sample_hat: torch.FloatTensor,
168
169
        return_dict: bool = True,
    ) -> Union[KarrasVeOutput, Tuple]:
170
171
172
173
174
        """
        Predict the sample at the previous timestep by reversing the SDE. Core function to propagate the diffusion
        process from the learned model outputs (most often the predicted noise).

        Args:
175
            model_output (`torch.FloatTensor`): direct output from learned diffusion model.
176
177
            sigma_hat (`float`): TODO
            sigma_prev (`float`): TODO
178
            sample_hat (`torch.FloatTensor`): TODO
179
            return_dict (`bool`): option for returning tuple rather than KarrasVeOutput class
180
181

            KarrasVeOutput: updated sample in the diffusion chain and derivative (TODO double check).
182
183
184
185
        Returns:
            [`~schedulers.scheduling_karras_ve.KarrasVeOutput`] or `tuple`:
            [`~schedulers.scheduling_karras_ve.KarrasVeOutput`] if `return_dict` is True, otherwise a `tuple`. When
            returning a tuple, the first element is the sample tensor.
186
187

        """
188

189
190
191
192
        pred_original_sample = sample_hat + sigma_hat * model_output
        derivative = (sample_hat - pred_original_sample) / sigma_hat
        sample_prev = sample_hat + (sigma_prev - sigma_hat) * derivative

193
194
195
        if not return_dict:
            return (sample_prev, derivative)

196
197
198
        return KarrasVeOutput(
            prev_sample=sample_prev, derivative=derivative, pred_original_sample=pred_original_sample
        )
199
200
201

    def step_correct(
        self,
202
        model_output: torch.FloatTensor,
203
204
        sigma_hat: float,
        sigma_prev: float,
205
206
207
        sample_hat: torch.FloatTensor,
        sample_prev: torch.FloatTensor,
        derivative: torch.FloatTensor,
208
209
        return_dict: bool = True,
    ) -> Union[KarrasVeOutput, Tuple]:
210
211
212
213
        """
        Correct the predicted sample based on the output model_output of the network. TODO complete description

        Args:
214
            model_output (`torch.FloatTensor`): direct output from learned diffusion model.
215
216
            sigma_hat (`float`): TODO
            sigma_prev (`float`): TODO
217
218
219
            sample_hat (`torch.FloatTensor`): TODO
            sample_prev (`torch.FloatTensor`): TODO
            derivative (`torch.FloatTensor`): TODO
220
            return_dict (`bool`): option for returning tuple rather than KarrasVeOutput class
221
222
223

        Returns:
            prev_sample (TODO): updated sample in the diffusion chain. derivative (TODO): TODO
224

225
        """
226
227
228
        pred_original_sample = sample_prev + sigma_prev * model_output
        derivative_corr = (sample_prev - pred_original_sample) / sigma_prev
        sample_prev = sample_hat + (sigma_prev - sigma_hat) * (0.5 * derivative + 0.5 * derivative_corr)
229
230
231
232

        if not return_dict:
            return (sample_prev, derivative)

233
234
235
        return KarrasVeOutput(
            prev_sample=sample_prev, derivative=derivative, pred_original_sample=pred_original_sample
        )
236
237
238

    def add_noise(self, original_samples, noise, timesteps):
        raise NotImplementedError()