Commit 4497e78d authored by Nathan Lambert's avatar Nathan Lambert
Browse files

merge unet-rl formatting

parents 49718b47 77aadfee
This diff is collapsed.
import torch
import PIL.Image
from diffusers import DiffusionPipeline
# Seed the random generator with 0 before handing it to the pipeline.
generator = torch.Generator().manual_seed(0)

# Load the pretrained pipeline (model and scheduler) by its hub id.
model_id = "fusing/glide-base"
pipeline = DiffusionPipeline.from_pretrained(model_id)

# Run inference (text-conditioned denoising + upscaling).
img = pipeline("a crayon drawing of a corgi", generator)

# Convert to a uint8 array: drop the leading axis, then map -1 -> 0 and +1 -> 255.
img = img.squeeze(0)
img = (img + 1) * 127.5
img = img.round().clamp(0, 255).to(torch.uint8)
image_pil = PIL.Image.fromarray(img.cpu().numpy())

# Write the image to disk.
image_pil.save("test.png")
# coding=utf-8
# Copyright 2022 The Fairseq Authors and The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
""" LDMBERT model configuration"""
from transformers.configuration_utils import PretrainedConfig
from transformers.utils import logging
logger = logging.get_logger(__name__)
# Maps a pretrained LDMBERT checkpoint name to the URL of its `config.json`.
LDMBERT_PRETRAINED_CONFIG_ARCHIVE_MAP = {
    "ldm-bert": "https://huggingface.co/ldm-bert/resolve/main/config.json",
}
class LDMBertConfig(PretrainedConfig):
    r"""
    This is the configuration class to store the configuration of a [`LDMBertModel`]. It is used to instantiate a
    LDMBERT model according to the specified arguments, defining the model architecture.

    Configuration objects inherit from [`PretrainedConfig`] and can be used to control the model outputs. Read the
    documentation from [`PretrainedConfig`] for more information.

    Args:
        vocab_size (`int`, *optional*, defaults to 30522):
            Vocabulary size of the LDMBERT model. Defines the number of different tokens that can be represented by
            the `input_ids` passed when calling [`LDMBertModel`].
        max_position_embeddings (`int`, *optional*, defaults to 77):
            The maximum sequence length that this model might ever be used with.
        encoder_layers (`int`, *optional*, defaults to 32):
            Number of encoder layers. The same value is also stored as `num_hidden_layers`.
        encoder_ffn_dim (`int`, *optional*, defaults to 5120):
            Dimensionality of the "intermediate" (often named feed-forward) layer in the encoder.
        encoder_attention_heads (`int`, *optional*, defaults to 8):
            Number of attention heads for each attention layer in the Transformer encoder.
        head_dim (`int`, *optional*, defaults to 64):
            Stored as `head_dim`. Presumably the dimensionality of a single attention head -- TODO confirm against
            the model implementation.
        encoder_layerdrop (`float`, *optional*, defaults to 0.0):
            The LayerDrop probability for the encoder. See the [LayerDrop paper](https://arxiv.org/abs/1909.11556)
            for more details.
        activation_function (`str` or `function`, *optional*, defaults to `"gelu"`):
            The non-linear activation function (function or string) in the encoder and pooler. If string, `"gelu"`,
            `"relu"`, `"silu"` and `"gelu_new"` are supported.
        d_model (`int`, *optional*, defaults to 1280):
            Dimensionality of the layers and the pooler layer.
        dropout (`float`, *optional*, defaults to 0.1):
            The dropout probability for all fully connected layers in the embeddings, encoder, and pooler.
        attention_dropout (`float`, *optional*, defaults to 0.0):
            The dropout ratio for the attention probabilities.
        activation_dropout (`float`, *optional*, defaults to 0.0):
            The dropout ratio for activations inside the fully connected layer.
        init_std (`float`, *optional*, defaults to 0.02):
            The standard deviation of the truncated_normal_initializer for initializing all weight matrices.
        classifier_dropout (`float`, *optional*, defaults to 0.0):
            The dropout ratio for classifier.
        scale_embedding (`bool`, *optional*, defaults to `False`):
            If `True`, the embedding scale factor will be sqrt(d_model).
        use_cache (`bool`, *optional*, defaults to `True`):
            Whether or not the model should return the last key/values attentions (not used by all models).
        pad_token_id (`int`, *optional*, defaults to 0):
            Forwarded to `PretrainedConfig.__init__`.
        kwargs:
            Any additional keyword arguments, forwarded unchanged to `PretrainedConfig.__init__`.

    Example:

    ```python
    >>> # Initializing a configuration with the default values
    >>> configuration = LDMBertConfig()

    >>> configuration.d_model
    1280
    >>> configuration.num_hidden_layers
    32
    ```"""
    model_type = "ldmbert"
    keys_to_ignore_at_inference = ["past_key_values"]
    # Generic attribute names mapped onto the names this config actually stores.
    attribute_map = {"num_attention_heads": "encoder_attention_heads", "hidden_size": "d_model"}

    def __init__(
        self,
        vocab_size=30522,
        max_position_embeddings=77,
        encoder_layers=32,
        encoder_ffn_dim=5120,
        encoder_attention_heads=8,
        head_dim=64,
        encoder_layerdrop=0.0,
        activation_function="gelu",
        d_model=1280,
        dropout=0.1,
        attention_dropout=0.0,
        activation_dropout=0.0,
        init_std=0.02,
        classifier_dropout=0.0,
        scale_embedding=False,
        use_cache=True,
        pad_token_id=0,
        **kwargs,
    ):
        self.vocab_size = vocab_size
        self.max_position_embeddings = max_position_embeddings
        self.d_model = d_model
        self.encoder_ffn_dim = encoder_ffn_dim
        self.encoder_layers = encoder_layers
        self.encoder_attention_heads = encoder_attention_heads
        self.head_dim = head_dim
        self.dropout = dropout
        self.attention_dropout = attention_dropout
        self.activation_dropout = activation_dropout
        self.activation_function = activation_function
        self.init_std = init_std
        self.encoder_layerdrop = encoder_layerdrop
        self.classifier_dropout = classifier_dropout
        self.use_cache = use_cache
        # Encoder-only configuration: the hidden-layer count is the encoder depth.
        self.num_hidden_layers = encoder_layers
        self.scale_embedding = scale_embedding  # scale factor will be sqrt(d_model) if True

        super().__init__(pad_token_id=pad_token_id, **kwargs)
import torch
import tqdm
from diffusers import DiffusionPipeline
from .configuration_ldmbert import LDMBertConfig # NOQA
from .modeling_ldmbert import LDMBertModel # NOQA
# add these relative imports here, so we can load from hub
from .modeling_vae import AutoencoderKL # NOQA
class LatentDiffusion(DiffusionPipeline):
    """
    Text-conditioned latent diffusion pipeline.

    The prompt is tokenized with `tokenizer` and encoded with `bert`; a noise sample is then iteratively denoised by
    `unet` using `noise_scheduler`, and the result is decoded with `vqvae`.
    """

    def __init__(self, vqvae, bert, tokenizer, unet, noise_scheduler):
        super().__init__()
        self.register_modules(vqvae=vqvae, bert=bert, tokenizer=tokenizer, unet=unet, noise_scheduler=noise_scheduler)

    @torch.no_grad()
    def __call__(
        self,
        prompt,
        batch_size=1,
        generator=None,
        torch_device=None,
        eta=0.0,
        guidance_scale=1.0,
        num_inference_steps=50,
    ):
        """
        Generate images for `prompt`.

        Args:
            prompt: text handed to the tokenizer. Presumably a string or a list of `batch_size` strings -- TODO
                confirm against callers.
            batch_size: number of samples drawn for the initial noise.
            generator: optional random generator forwarded to `noise_scheduler.sample_noise`.
            torch_device: device to run on; defaults to "cuda" when available, otherwise "cpu".
            eta: corresponds to η in the DDIM paper and should be between [0, 1]. No noise is added when it is 0.
            guidance_scale: classifier-free guidance weight; 1.0 means no guidance.
            num_inference_steps: number of denoising iterations.

        Returns:
            The decoded image tensor, rescaled by `(x + 1) / 2` and clamped to [0, 1].
        """
        # eta corresponds to η in paper and should be between [0, 1]
        if torch_device is None:
            torch_device = "cuda" if torch.cuda.is_available() else "cpu"

        self.unet.to(torch_device)
        self.vqvae.to(torch_device)
        self.bert.to(torch_device)

        use_guidance = guidance_scale != 1.0

        # get unconditional embeddings for classifier free guidance
        if use_guidance:
            # One empty prompt per sample, so the unconditional half of the batch has the same size as the
            # conditional half when both are concatenated below.
            uncond_input = self.tokenizer(
                [""] * batch_size, padding="max_length", max_length=77, return_tensors="pt"
            ).to(torch_device)
            uncond_embeddings = self.bert(uncond_input.input_ids)[0]

        # get text embedding
        text_input = self.tokenizer(prompt, padding="max_length", max_length=77, return_tensors="pt").to(torch_device)
        text_embedding = self.bert(text_input.input_ids)[0]

        num_trained_timesteps = self.noise_scheduler.config.timesteps
        inference_step_times = range(0, num_trained_timesteps, num_trained_timesteps // num_inference_steps)

        image = self.noise_scheduler.sample_noise(
            (batch_size, self.unet.in_channels, self.unet.image_size, self.unet.image_size),
            device=torch_device,
            generator=generator,
        )

        # See formulas (12) and (16) of DDIM paper https://arxiv.org/pdf/2010.02502.pdf
        # Ideally, read the DDIM paper for an in-detail understanding
        # Notation (<variable name> -> <name in paper>)
        # - pred_noise_t -> e_theta(x_t, t)
        # - pred_original_image -> f_theta(x_t, t) or x_0
        # - std_dev_t -> sigma_t
        # - eta -> η
        # - pred_image_direction -> "direction pointing to x_t"
        # - pred_prev_image -> "x_t-1"
        for t in tqdm.tqdm(reversed(range(num_inference_steps)), total=num_inference_steps):
            # guidance_scale of 1 means no guidance
            if not use_guidance:
                image_in = image
                context = text_embedding
            else:
                # for classifier free guidance, we need to do two forward passes
                # here we concatenate the conditioned and unconditioned embeddings in a single batch
                # to avoid doing two forward passes
                image_in = torch.cat([image] * 2)
                context = torch.cat([uncond_embeddings, text_embedding])

            # One timestep entry per row actually fed to the UNet (doubled when guidance is active).
            timesteps = torch.tensor([inference_step_times[t]] * image_in.shape[0], device=torch_device)

            # 1. predict noise residual
            pred_noise_t = self.unet(image_in, timesteps, context=context)

            # perform guidance
            if use_guidance:
                pred_noise_t_uncond, pred_noise_t = pred_noise_t.chunk(2)
                pred_noise_t = pred_noise_t_uncond + guidance_scale * (pred_noise_t - pred_noise_t_uncond)

            # 2. predict previous mean of image x_t-1
            pred_prev_image = self.noise_scheduler.step(pred_noise_t, image, t, num_inference_steps, eta)

            # 3. optionally sample variance
            variance = 0
            if eta > 0:
                noise = self.noise_scheduler.sample_noise(image.shape, device=image.device, generator=generator)
                variance = self.noise_scheduler.get_variance(t, num_inference_steps).sqrt() * eta * noise

            # 4. set current image to prev_image: x_t -> x_t-1
            image = pred_prev_image + variance

        # scale and decode image with vae
        # NOTE(review): 0.18215 is presumably the latent scaling factor of the autoencoder -- confirm.
        image = 1 / 0.18215 * image
        image = self.vqvae.decode(image)
        image = torch.clamp((image + 1.0) / 2.0, min=0.0, max=1.0)

        return image
......@@ -291,7 +291,7 @@ class BDDM(DiffusionPipeline):
# Sample gaussian noise to begin loop
audio = torch.normal(0, 1, size=audio_size, generator=generator).to(torch_device)
timestep_values = self.noise_scheduler.get_timestep_values()
timestep_values = self.noise_scheduler.config.timestep_values
num_prediction_steps = len(self.noise_scheduler)
for t in tqdm.tqdm(reversed(range(num_prediction_steps)), total=num_prediction_steps):
# 1. predict noise residual
......
......@@ -24,17 +24,11 @@ import torch.utils.checkpoint
from torch import nn
import tqdm
try:
from transformers import CLIPConfig, CLIPModel, CLIPTextConfig, CLIPVisionConfig, GPT2Tokenizer
from transformers.activations import ACT2FN
from transformers.modeling_outputs import BaseModelOutput, BaseModelOutputWithPooling
from transformers.modeling_utils import PreTrainedModel
from transformers.utils import ModelOutput, add_start_docstrings_to_model_forward, replace_return_docstrings
except:
print("Transformers is not installed")
pass
from transformers import CLIPConfig, CLIPModel, CLIPTextConfig, CLIPVisionConfig, GPT2Tokenizer
from transformers.activations import ACT2FN
from transformers.modeling_outputs import BaseModelOutput, BaseModelOutputWithPooling
from transformers.modeling_utils import PreTrainedModel
from transformers.utils import ModelOutput, add_start_docstrings_to_model_forward, replace_return_docstrings
from ..models import GLIDESuperResUNetModel, GLIDETextToImageUNetModel
from ..pipeline_utils import DiffusionPipeline
......
......@@ -472,7 +472,7 @@ class GradTTS(DiffusionPipeline):
t = (1.0 - (t + 0.5) * h) * torch.ones(z.shape[0], dtype=z.dtype, device=z.device)
time = t.unsqueeze(-1).unsqueeze(-1)
residual = self.unet(xt, y_mask, mu_y, t, speaker_id)
residual = self.unet(xt, t, mu_y, y_mask, speaker_id)
xt = self.noise_scheduler.step(xt, residual, mu_y, h, time)
xt = xt * y_mask
......
# Copyright 2022 The HuggingFace Team. All rights reserved.
# Copyright 2022 Stanford University Team and The HuggingFace Team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
......@@ -11,12 +11,40 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# DISCLAIMER: This code is strongly influenced by https://github.com/pesser/pytorch_diffusion
# and https://github.com/hojonathanho/diffusion
import math
import numpy as np
from ..configuration_utils import ConfigMixin
from .scheduling_utils import SchedulerMixin, betas_for_alpha_bar, linear_beta_schedule
from .scheduling_utils import SchedulerMixin
def betas_for_alpha_bar(num_diffusion_timesteps, max_beta=0.999):
    """
    Create a beta schedule by discretizing a squared-cosine alpha_bar function.

    alpha_bar(t) defines the cumulative product of (1-beta) over time for t in [0, 1]; each beta is derived from
    the ratio of alpha_bar at the end and at the start of its step.

    :param num_diffusion_timesteps: the number of betas to produce.
    :param max_beta: the maximum beta to use; use values lower than 1 to
                     prevent singularities.
    :return: a float32 numpy array holding one beta per timestep.
    """

    def alpha_bar(time_step):
        return math.cos((time_step + 0.008) / 1.008 * math.pi / 2) ** 2

    betas = [
        min(1 - alpha_bar((step + 1) / num_diffusion_timesteps) / alpha_bar(step / num_diffusion_timesteps), max_beta)
        for step in range(num_diffusion_timesteps)
    ]
    return np.array(betas, dtype=np.float32)
class DDIMScheduler(SchedulerMixin, ConfigMixin):
......@@ -43,13 +71,10 @@ class DDIMScheduler(SchedulerMixin, ConfigMixin):
)
if beta_schedule == "linear":
self.betas = linear_beta_schedule(timesteps, beta_start=beta_start, beta_end=beta_end)
self.betas = np.linspace(beta_start, beta_end, timesteps, dtype=np.float32)
elif beta_schedule == "squaredcos_cap_v2":
# GLIDE cosine schedule
self.betas = betas_for_alpha_bar(
timesteps,
lambda t: math.cos((t + 0.008) / 1.008 * math.pi / 2) ** 2,
)
self.betas = betas_for_alpha_bar(timesteps)
else:
raise NotImplementedError(f"{beta_schedule} does is not implemented for {self.__class__}")
......@@ -59,53 +84,12 @@ class DDIMScheduler(SchedulerMixin, ConfigMixin):
self.set_format(tensor_format=tensor_format)
# alphas_cumprod_prev = torch.nn.functional.pad(alphas_cumprod[:-1], (1, 0), value=1.0)
# TODO(PVP) - check how much of these is actually necessary!
# LDM only uses "fixed_small"; glide seems to use a weird mix of the two, ...
# https://github.com/openai/glide-text2im/blob/69b530740eb6cef69442d6180579ef5ba9ef063e/glide_text2im/gaussian_diffusion.py#L246
# variance = betas * (1.0 - alphas_cumprod_prev) / (1.0 - alphas_cumprod)
# if variance_type == "fixed_small":
# log_variance = torch.log(variance.clamp(min=1e-20))
# elif variance_type == "fixed_large":
# log_variance = torch.log(torch.cat([variance[1:2], betas[1:]], dim=0))
#
#
# self.register_buffer("log_variance", log_variance.to(torch.float32))
# def rescale_betas(self, num_timesteps):
# # GLIDE scaling
# if self.beta_schedule == "linear":
# scale = self.timesteps / num_timesteps
# self.betas = linear_beta_schedule(
# num_timesteps, beta_start=self.beta_start * scale, beta_end=self.beta_end * scale
# )
# self.alphas = 1.0 - self.betas
# self.alphas_cumprod = np.cumprod(self.alphas, axis=0)
def get_timestep_values(self):
    """Return the `timestep_values` entry of this scheduler's config."""
    config = self.config
    return config.timestep_values
def get_alpha(self, time_step):
    """Return the entry of `self.alphas` at index `time_step`."""
    alphas = self.alphas
    return alphas[time_step]
def get_beta(self, time_step):
    """Return the entry of `self.betas` at index `time_step`."""
    betas = self.betas
    return betas[time_step]
def get_alpha_prod(self, time_step):
    """
    Return the cumulative alpha product at `time_step`.

    Negative time steps yield `self.one` instead of indexing `self.alphas_cumprod`.
    """
    return self.one if time_step < 0 else self.alphas_cumprod[time_step]
def get_orig_t(self, t, num_inference_steps):
    """
    Map inference step `t` to an index on the scale of `self.config.timesteps`.

    Negative `t` maps to -1; otherwise `t` is multiplied by the integer stride
    `self.config.timesteps // num_inference_steps`.
    """
    if t < 0:
        return -1
    stride = self.config.timesteps // num_inference_steps
    return stride * t
def get_variance(self, t, num_inference_steps):
orig_t = self.get_orig_t(t, num_inference_steps)
orig_prev_t = self.get_orig_t(t - 1, num_inference_steps)
orig_t = self.config.timesteps // num_inference_steps * t
orig_prev_t = self.config.timesteps // num_inference_steps * (t - 1) if t > 0 else -1
alpha_prod_t = self.get_alpha_prod(orig_t)
alpha_prod_t_prev = self.get_alpha_prod(orig_prev_t)
alpha_prod_t = self.alphas_cumprod[orig_t]
alpha_prod_t_prev = self.alphas_cumprod[orig_prev_t] if orig_prev_t >= 0 else self.one
beta_prod_t = 1 - alpha_prod_t
beta_prod_t_prev = 1 - alpha_prod_t_prev
......@@ -126,12 +110,12 @@ class DDIMScheduler(SchedulerMixin, ConfigMixin):
# - pred_prev_sample -> "x_t-1"
# 1. get actual t and t-1
orig_t = self.get_orig_t(t, num_inference_steps)
orig_prev_t = self.get_orig_t(t - 1, num_inference_steps)
orig_t = self.config.timesteps // num_inference_steps * t
orig_prev_t = self.config.timesteps // num_inference_steps * (t - 1) if t > 0 else -1
# 2. compute alphas, betas
alpha_prod_t = self.get_alpha_prod(orig_t)
alpha_prod_t_prev = self.get_alpha_prod(orig_prev_t)
alpha_prod_t = self.alphas_cumprod[orig_t]
alpha_prod_t_prev = self.alphas_cumprod[orig_prev_t] if orig_prev_t >= 0 else self.one
beta_prod_t = 1 - alpha_prod_t
# 3. compute predicted original sample from predicted noise also called
......
# Copyright 2022 The HuggingFace Team. All rights reserved.
# Copyright 2022 UC Berkeley Team and The HuggingFace Team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
......@@ -11,12 +11,39 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# DISCLAIMER: This file is strongly influenced by https://github.com/ermongroup/ddim
import math
import numpy as np
from ..configuration_utils import ConfigMixin
from .scheduling_utils import SchedulerMixin, betas_for_alpha_bar, linear_beta_schedule
from .scheduling_utils import SchedulerMixin
def betas_for_alpha_bar(num_diffusion_timesteps, max_beta=0.999):
    """
    Discretize a squared-cosine alpha_bar curve into a beta schedule.

    alpha_bar(t) is the cumulative product of (1-beta) up to time t, with t running over [0, 1].

    :param num_diffusion_timesteps: the number of betas to produce.
    :param max_beta: the maximum beta to use; use values lower than 1 to
                     prevent singularities.
    :return: a float32 numpy array with `num_diffusion_timesteps` entries.
    """

    def alpha_bar(time_step):
        return math.cos((time_step + 0.008) / 1.008 * math.pi / 2) ** 2

    def beta_at(index):
        # Fraction of alpha_bar lost between the start and the end of this step, capped at max_beta.
        start = index / num_diffusion_timesteps
        end = (index + 1) / num_diffusion_timesteps
        return min(1 - alpha_bar(end) / alpha_bar(start), max_beta)

    return np.array([beta_at(index) for index in range(num_diffusion_timesteps)], dtype=np.float32)
class DDPMScheduler(SchedulerMixin, ConfigMixin):
......@@ -47,13 +74,10 @@ class DDPMScheduler(SchedulerMixin, ConfigMixin):
if trained_betas is not None:
self.betas = np.asarray(trained_betas)
elif beta_schedule == "linear":
self.betas = linear_beta_schedule(timesteps, beta_start=beta_start, beta_end=beta_end)
self.betas = np.linspace(beta_start, beta_end, timesteps, dtype=np.float32)
elif beta_schedule == "squaredcos_cap_v2":
# GLIDE cosine schedule
self.betas = betas_for_alpha_bar(
timesteps,
lambda t: math.cos((t + 0.008) / 1.008 * math.pi / 2) ** 2,
)
self.betas = betas_for_alpha_bar(timesteps)
else:
raise NotImplementedError(f"{beta_schedule} does is not implemented for {self.__class__}")
......@@ -63,44 +87,14 @@ class DDPMScheduler(SchedulerMixin, ConfigMixin):
self.set_format(tensor_format=tensor_format)
# self.register_buffer("betas", betas.to(torch.float32))
# self.register_buffer("alphas", alphas.to(torch.float32))
# self.register_buffer("alphas_cumprod", alphas_cumprod.to(torch.float32))
# alphas_cumprod_prev = torch.nn.functional.pad(alphas_cumprod[:-1], (1, 0), value=1.0)
# TODO(PVP) - check how much of these is actually necessary!
# LDM only uses "fixed_small"; glide seems to use a weird mix of the two, ...
# https://github.com/openai/glide-text2im/blob/69b530740eb6cef69442d6180579ef5ba9ef063e/glide_text2im/gaussian_diffusion.py#L246
# variance = betas * (1.0 - alphas_cumprod_prev) / (1.0 - alphas_cumprod)
# if variance_type == "fixed_small":
# log_variance = torch.log(variance.clamp(min=1e-20))
# elif variance_type == "fixed_large":
# log_variance = torch.log(torch.cat([variance[1:2], betas[1:]], dim=0))
#
#
# self.register_buffer("log_variance", log_variance.to(torch.float32))
def get_timestep_values(self):
    """Return `timestep_values` as stored on `self.config`."""
    values = self.config.timestep_values
    return values
def get_alpha(self, time_step):
    """Look up `self.alphas` at position `time_step`."""
    alpha = self.alphas[time_step]
    return alpha
def get_beta(self, time_step):
    """Look up `self.betas` at position `time_step`."""
    beta = self.betas[time_step]
    return beta
def get_alpha_prod(self, time_step):
    """
    Return `self.alphas_cumprod[time_step]`, or `self.one` when `time_step` is negative.
    """
    if not time_step < 0:
        return self.alphas_cumprod[time_step]
    return self.one
def get_variance(self, t):
alpha_prod_t = self.get_alpha_prod(t)
alpha_prod_t_prev = self.get_alpha_prod(t - 1)
alpha_prod_t = self.alphas_cumprod[t]
alpha_prod_t_prev = self.alphas_cumprod[t - 1] if t > 0 else self.one
# For t > 0, compute predicted variance βt (see formulas (6) and (7) from https://arxiv.org/pdf/2006.11239.pdf)
# and sample from it to get previous sample
# x_{t-1} ~ N(pred_prev_sample, variance) == add variance to pred_sample
variance = (1 - alpha_prod_t_prev) / (1 - alpha_prod_t) * self.get_beta(t)
variance = (1 - alpha_prod_t_prev) / (1 - alpha_prod_t) * self.betas[t]
# hacks - were probs added for training stability
if self.config.variance_type == "fixed_small":
......@@ -109,14 +103,14 @@ class DDPMScheduler(SchedulerMixin, ConfigMixin):
elif self.config.variance_type == "fixed_small_log":
variance = self.log(self.clip(variance, min_value=1e-20))
elif self.config.variance_type == "fixed_large":
variance = self.get_beta(t)
variance = self.betas[t]
return variance
def step(self, residual, sample, t, predict_epsilon=True):
# 1. compute alphas, betas
alpha_prod_t = self.get_alpha_prod(t)
alpha_prod_t_prev = self.get_alpha_prod(t - 1)
alpha_prod_t = self.alphas_cumprod[t]
alpha_prod_t_prev = self.alphas_cumprod[t - 1] if t > 0 else self.one
beta_prod_t = 1 - alpha_prod_t
beta_prod_t_prev = 1 - alpha_prod_t_prev
......@@ -133,8 +127,8 @@ class DDPMScheduler(SchedulerMixin, ConfigMixin):
# 4. Compute coefficients for pred_original_sample x_0 and current sample x_t
# See formula (7) from https://arxiv.org/pdf/2006.11239.pdf
pred_original_sample_coeff = (alpha_prod_t_prev ** (0.5) * self.get_beta(t)) / beta_prod_t
current_sample_coeff = self.get_alpha(t) ** (0.5) * beta_prod_t_prev / beta_prod_t
pred_original_sample_coeff = (alpha_prod_t_prev ** (0.5) * self.betas[t]) / beta_prod_t
current_sample_coeff = self.alphas[t] ** (0.5) * beta_prod_t_prev / beta_prod_t
# 5. Compute predicted previous sample µ_t
# See formula (7) from https://arxiv.org/pdf/2006.11239.pdf
......@@ -143,8 +137,8 @@ class DDPMScheduler(SchedulerMixin, ConfigMixin):
return pred_prev_sample
def forward_step(self, original_sample, noise, t):
    """
    Noise `original_sample` to timestep `t`.

    Computes `sqrt(alphas_cumprod[t]) * original_sample + sqrt(1 - alphas_cumprod[t]) * noise`.

    Args:
        original_sample: the sample to add noise to.
        noise: the noise to mix in; combined element-wise with `original_sample`.
        t: index into `self.alphas_cumprod`.

    Returns:
        The noisy sample.
    """
    # The cumulative alpha product is stored as `alphas_cumprod`; there is no `alpha_prod_t` attribute.
    alpha_prod_t = self.alphas_cumprod[t]
    sqrt_alpha_prod = alpha_prod_t ** 0.5
    sqrt_one_minus_alpha_prod = (1 - alpha_prod_t) ** 0.5
    noisy_sample = sqrt_alpha_prod * original_sample + sqrt_one_minus_alpha_prod * noise
    return noisy_sample
......
# Copyright 2022 The HuggingFace Team. All rights reserved.
# Copyright 2022 Zhejiang University Team and The HuggingFace Team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
......@@ -11,12 +11,39 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# DISCLAIMER: This file is strongly influenced by https://github.com/ermongroup/ddim
import math
import numpy as np
from ..configuration_utils import ConfigMixin
from .scheduling_utils import SchedulerMixin, betas_for_alpha_bar, linear_beta_schedule
from .scheduling_utils import SchedulerMixin
def betas_for_alpha_bar(num_diffusion_timesteps, max_beta=0.999):
    """
    Produce a beta schedule from a squared-cosine alpha_bar function.

    alpha_bar(t) gives the cumulative product of (1-beta) up to time t in [0, 1]; the schedule is obtained by
    evaluating it at the boundaries of `num_diffusion_timesteps` equal steps.

    :param num_diffusion_timesteps: the number of betas to produce.
    :param max_beta: the maximum beta to use; use values lower than 1 to
                     prevent singularities.
    :return: the betas as a float32 numpy array.
    """

    def alpha_bar(time_step):
        return math.cos((time_step + 0.008) / 1.008 * math.pi / 2) ** 2

    schedule = []
    step = 0
    while step < num_diffusion_timesteps:
        ratio = alpha_bar((step + 1) / num_diffusion_timesteps) / alpha_bar(step / num_diffusion_timesteps)
        schedule.append(min(1 - ratio, max_beta))
        step += 1
    return np.array(schedule, dtype=np.float32)
class PNDMScheduler(SchedulerMixin, ConfigMixin):
......@@ -37,13 +64,10 @@ class PNDMScheduler(SchedulerMixin, ConfigMixin):
)
if beta_schedule == "linear":
self.betas = linear_beta_schedule(timesteps, beta_start=beta_start, beta_end=beta_end)
self.betas = np.linspace(beta_start, beta_end, timesteps, dtype=np.float32)
elif beta_schedule == "squaredcos_cap_v2":
# GLIDE cosine schedule
self.betas = betas_for_alpha_bar(
timesteps,
lambda t: math.cos((t + 0.008) / 1.008 * math.pi / 2) ** 2,
)
self.betas = betas_for_alpha_bar(timesteps)
else:
raise NotImplementedError(f"{beta_schedule} does is not implemented for {self.__class__}")
......@@ -67,17 +91,6 @@ class PNDMScheduler(SchedulerMixin, ConfigMixin):
self.time_steps = {}
self.set_prk_mode()
def get_alpha(self, time_step):
    """Return the value stored in `self.alphas` for `time_step`."""
    table = self.alphas
    return table[time_step]
def get_beta(self, time_step):
    """Return the value stored in `self.betas` for `time_step`."""
    table = self.betas
    return table[time_step]
def get_alpha_prod(self, time_step):
    """
    Return the cumulative alpha product for `time_step`.

    A negative `time_step` returns `self.one`; any other value indexes `self.alphas_cumprod`.
    """
    is_before_start = time_step < 0
    if is_before_start:
        result = self.one
    else:
        result = self.alphas_cumprod[time_step]
    return result
def get_prk_time_steps(self, num_inference_steps):
if num_inference_steps in self.prk_time_steps:
return self.prk_time_steps[num_inference_steps]
......@@ -169,8 +182,8 @@ class PNDMScheduler(SchedulerMixin, ConfigMixin):
# sample -> x_t
# residual -> e_θ(x_t, t)
# prev_sample -> x_(t−δ)
alpha_prod_t = self.get_alpha_prod(t_orig + 1)
alpha_prod_t_prev = self.get_alpha_prod(t_orig_prev + 1)
alpha_prod_t = self.alphas_cumprod[t_orig + 1]
alpha_prod_t_prev = self.alphas_cumprod[t_orig_prev + 1]
beta_prod_t = 1 - alpha_prod_t
beta_prod_t_prev = 1 - alpha_prod_t_prev
......
......@@ -18,30 +18,6 @@ import torch
SCHEDULER_CONFIG_NAME = "scheduler_config.json"
def linear_beta_schedule(timesteps, beta_start, beta_end):
    """Return `timesteps` evenly spaced float32 betas running from `beta_start` to `beta_end`."""
    schedule = np.linspace(start=beta_start, stop=beta_end, num=timesteps, dtype=np.float32)
    return schedule
def betas_for_alpha_bar(num_diffusion_timesteps, alpha_bar, max_beta=0.999):
    """
    Create a beta schedule that discretizes the given alpha_t_bar function,
    which defines the cumulative product of (1-beta) over time from t = [0,1].

    :param num_diffusion_timesteps: the number of betas to produce.
    :param alpha_bar: a callable that takes an argument t from 0 to 1 and
                      produces the cumulative product of (1-beta) up to that
                      part of the diffusion process.
    :param max_beta: the maximum beta to use; use values lower than 1 to
                     prevent singularities.
    :return: a float32 numpy array holding one beta per timestep.
    """
    betas = [
        min(1 - alpha_bar((step + 1) / num_diffusion_timesteps) / alpha_bar(step / num_diffusion_timesteps), max_beta)
        for step in range(num_diffusion_timesteps)
    ]
    return np.array(betas, dtype=np.float32)
class SchedulerMixin:
config_name = SCHEDULER_CONFIG_NAME
......
#!/usr/bin/env python
# coding=utf-8
# flake8: noqa
# There's no way to ignore "F401 '...' imported but unused" warnings in this
# module, but to preserve other warnings. So, don't check this module at all.
import os
# Copyright 2021 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
......@@ -20,8 +11,18 @@ import os
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import importlib
import os
from collections import OrderedDict
import importlib_metadata
from requests.exceptions import HTTPError
from .logging import get_logger
logger = get_logger(__name__)
hf_cache_home = os.path.expanduser(
os.getenv("HF_HOME", os.path.join(os.getenv("XDG_CACHE_HOME", "~/.cache"), "huggingface"))
......@@ -36,6 +37,18 @@ DIFFUSERS_DYNAMIC_MODULE_NAME = "diffusers_modules"
HF_MODULES_CACHE = os.getenv("HF_MODULES_CACHE", os.path.join(hf_cache_home, "modules"))
# Detect `transformers` at import time: first by locating its module spec, then by reading its installed
# distribution metadata.
# NOTE(review): `importlib.util` is used here; confirm the submodule is imported explicitly by this file.
_transformers_available = importlib.util.find_spec("transformers") is not None
try:
    _transformers_version = importlib_metadata.version("transformers")
except importlib_metadata.PackageNotFoundError:
    # A module spec without distribution metadata is treated as "not installed".
    _transformers_available = False
else:
    logger.debug(f"Successfully imported transformers version {_transformers_version}")
def is_transformers_available():
    """Return the module-level flag recording whether `transformers` was detected."""
    available = _transformers_available
    return available
class RepositoryNotFoundError(HTTPError):
"""
Raised when trying to access a hf.co URL with an invalid repository name, or with a private repo name the user does
......@@ -49,3 +62,39 @@ class EntryNotFoundError(HTTPError):
class RevisionNotFoundError(HTTPError):
"""Raised when trying to access a hf.co URL with a valid repository but an invalid revision."""
# Error message template; `{0}` is filled with the name of the object that needs the backend.
TRANSFORMERS_IMPORT_ERROR = """
{0} requires the transformers library but it was not found in your environment. You can install it with pip:
`pip install transformers`
"""
# Maps a backend name to a pair: (availability check callable, error message template).
BACKENDS_MAPPING = OrderedDict(
    [
        ("transformers", (is_transformers_available, TRANSFORMERS_IMPORT_ERROR)),
    ]
)
def requires_backends(obj, backends):
    """
    Raise an ImportError when any of the requested backends is unavailable.

    Args:
        obj: the object (or class) that needs the backends; its name is inserted into the error message.
        backends: a backend name, or a list/tuple of backend names, each a key of `BACKENDS_MAPPING`.

    Raises:
        ImportError: with the concatenated messages of every backend whose availability check returned False.
    """
    backend_names = backends if isinstance(backends, (list, tuple)) else [backends]

    # Classes and functions carry `__name__`; for instances fall back to the name of their class.
    if hasattr(obj, "__name__"):
        name = obj.__name__
    else:
        name = obj.__class__.__name__

    failed = []
    for backend in backend_names:
        available, msg = BACKENDS_MAPPING[backend]
        if not available():
            failed.append(msg.format(name))

    if failed:
        raise ImportError("".join(failed))
class DummyObject(type):
    """
    Metaclass for the dummy objects. Any class inheriting from it will return the ImportError generated by
    `requires_backend` each time a user tries to access any method of that class.
    """

    def __getattr__(cls, key):
        """
        Handle attribute lookups on the class that normal resolution could not satisfy.

        Names starting with an underscore raise AttributeError, so `hasattr`/`getattr` probes for private or
        dunder names keep working. Any other name triggers the backend check for `cls._backends`.
        """
        if key.startswith("_"):
            # `type` defines no `__getattr__` to delegate to, so raise the AttributeError explicitly
            # with a message that names the class and the missing attribute.
            raise AttributeError(f"type object {cls.__name__!r} has no attribute {key!r}")
        requires_backends(cls, cls._backends)
# This file is autogenerated by the command `make fix-copies`, do not edit.
# flake8: noqa
from ..utils import DummyObject, requires_backends
class GLIDESuperResUNetModel(metaclass=DummyObject):
    """Dummy stand-in for `GLIDESuperResUNetModel`; instantiating it runs the backend check."""

    # Backend names this placeholder depends on.
    _backends = ["transformers"]

    def __init__(self, *args, **kwargs):
        # All arguments are ignored; only the `transformers` backend check is performed.
        requires_backends(self, ["transformers"])
class GLIDETextToImageUNetModel(metaclass=DummyObject):
    """Dummy stand-in for `GLIDETextToImageUNetModel`; instantiating it runs the backend check."""

    # Backend names this placeholder depends on.
    _backends = ["transformers"]

    def __init__(self, *args, **kwargs):
        # All arguments are ignored; only the `transformers` backend check is performed.
        requires_backends(self, ["transformers"])
class GLIDEUNetModel(metaclass=DummyObject):
    """Dummy stand-in for `GLIDEUNetModel`; instantiating it runs the backend check."""

    # Backend names this placeholder depends on.
    _backends = ["transformers"]

    def __init__(self, *args, **kwargs):
        # All arguments are ignored; only the `transformers` backend check is performed.
        requires_backends(self, ["transformers"])
class UNetGradTTSModel(metaclass=DummyObject):
    """Dummy stand-in for `UNetGradTTSModel`; instantiating it runs the backend check."""

    # Backend names this placeholder depends on.
    _backends = ["transformers"]

    def __init__(self, *args, **kwargs):
        # All arguments are ignored; only the `transformers` backend check is performed.
        requires_backends(self, ["transformers"])
# NOTE(review): presumably a placeholder so the name `GLIDE` stays importable -- confirm against the generator.
GLIDE = None
class GradTTS(metaclass=DummyObject):
    """Dummy stand-in for `GradTTS`; instantiating it runs the backend check."""

    # Backend names this placeholder depends on.
    _backends = ["transformers"]

    def __init__(self, *args, **kwargs):
        # All arguments are ignored; only the `transformers` backend check is performed.
        requires_backends(self, ["transformers"])
class LatentDiffusion(metaclass=DummyObject):
    """Dummy stand-in for `LatentDiffusion`; instantiating it runs the backend check."""

    # Backend names this placeholder depends on.
    _backends = ["transformers"]

    def __init__(self, *args, **kwargs):
        # All arguments are ignored; only the `transformers` backend check is performed.
        requires_backends(self, ["transformers"])
......@@ -14,11 +14,14 @@
# limitations under the License.
import inspect
import tempfile
import unittest
import numpy as np
import torch
import pytest
from diffusers import (
BDDM,
DDIM,
......@@ -27,9 +30,12 @@ from diffusers import (
PNDM,
DDIMScheduler,
DDPMScheduler,
GLIDESuperResUNetModel,
LatentDiffusion,
PNDMScheduler,
UNetModel,
UNetLDMModel,
UNetGradTTSModel,
)
from diffusers.configuration_utils import ConfigMixin
from diffusers.pipeline_utils import DiffusionPipeline
......@@ -82,7 +88,108 @@ class ConfigTester(unittest.TestCase):
assert config == new_config
class ModelTesterMixin(unittest.TestCase):
class ModelTesterMixin:
    """Model checks shared by the concrete model test cases.

    The methods read ``self.model_class``, ``self.get_output_shape`` and
    ``self.prepare_init_args_and_inputs_for_common()`` (returning the constructor kwargs and the
    forward kwargs), and use the ``assert*`` helpers of ``unittest.TestCase``.
    """

    def _new_model(self, training=False):
        """Instantiate the model under test on ``torch_device`` and return it with the forward kwargs."""
        init_kwargs, forward_kwargs = self.prepare_init_args_and_inputs_for_common()
        model = self.model_class(**init_kwargs)
        model.to(torch_device)
        if training:
            model.train()
        else:
            model.eval()
        return model, forward_kwargs

    def test_from_pretrained_save_pretrained(self):
        """Saving and reloading the model must leave its forward pass unchanged."""
        model, forward_kwargs = self._new_model()

        with tempfile.TemporaryDirectory() as checkpoint_dir:
            model.save_pretrained(checkpoint_dir)
            reloaded = self.model_class.from_pretrained(checkpoint_dir)
            reloaded.to(torch_device)

        with torch.no_grad():
            reference = model(**forward_kwargs)
            candidate = reloaded(**forward_kwargs)

        # Summed (not maximum) absolute difference over all output elements.
        total_abs_diff = (reference - candidate).abs().sum().item()
        self.assertLessEqual(total_abs_diff, 1e-5, "Models give different forward passes")

    def test_determinism(self):
        """Two forward passes in eval mode on the same inputs must agree."""
        model, forward_kwargs = self._new_model()

        with torch.no_grad():
            first_pass = model(**forward_kwargs)
            second_pass = model(**forward_kwargs)

        arrays = [out.cpu().numpy() for out in (first_pass, second_pass)]
        # NaN entries are dropped from each output independently before comparing.
        kept_a, kept_b = (arr[~np.isnan(arr)] for arr in arrays)
        largest_gap = np.amax(np.abs(kept_a - kept_b))
        self.assertLessEqual(largest_gap, 1e-5)

    def test_output(self):
        """The model output must exist and have the same shape as the input ``x``."""
        model, forward_kwargs = self._new_model()

        with torch.no_grad():
            output = model(**forward_kwargs)

        self.assertIsNotNone(output)
        self.assertEqual(output.shape, forward_kwargs["x"].shape, "Input and output shapes do not match")

    def test_forward_signature(self):
        """``forward`` must take ``x`` and ``timesteps`` as its first two parameters."""
        init_kwargs, _ = self.prepare_init_args_and_inputs_for_common()
        model = self.model_class(**init_kwargs)

        # Signature parameters are ordered, so the resulting list is deterministic.
        parameter_names = list(inspect.signature(model.forward).parameters)
        self.assertListEqual(parameter_names[:2], ["x", "timesteps"])

    def test_model_from_config(self):
        """A model rebuilt from its saved config must match in parameter and output shapes."""
        model, forward_kwargs = self._new_model()

        with tempfile.TemporaryDirectory() as config_dir:
            model.save_config(config_dir)
            rebuilt = self.model_class.from_config(config_dir)
            rebuilt.to(torch_device)
            rebuilt.eval()

        # Every parameter of the original model must have the same shape in the rebuilt one.
        original_state = model.state_dict()
        rebuilt_state = rebuilt.state_dict()
        for name, tensor in original_state.items():
            self.assertEqual(tensor.shape, rebuilt_state[name].shape)

        with torch.no_grad():
            original_out = model(**forward_kwargs)
            rebuilt_out = rebuilt(**forward_kwargs)

        self.assertEqual(original_out.shape, rebuilt_out.shape)

    def test_training(self):
        """A forward pass in train mode followed by an MSE loss backward must run."""
        model, forward_kwargs = self._new_model(training=True)

        prediction = model(**forward_kwargs)
        batch = forward_kwargs["x"].shape[0]
        # Random regression target: batch dimension of the input plus the per-sample output shape.
        target = torch.randn((batch,) + self.get_output_shape).to(torch_device)
        torch.nn.functional.mse_loss(prediction, target).backward()
class UnetModelTests(ModelTesterMixin, unittest.TestCase):
model_class = UNetModel
@property
def dummy_input(self):
batch_size = 4
......@@ -92,32 +199,289 @@ class ModelTesterMixin(unittest.TestCase):
noise = floats_tensor((batch_size, num_channels) + sizes).to(torch_device)
time_step = torch.tensor([10]).to(torch_device)
return (noise, time_step)
return {"x": noise, "timesteps": time_step}
@property
def get_input_shape(self):
    """Input shape as a 3-tuple: channel count followed by the two spatial sizes."""
    channels = 3
    spatial = (32, 32)
    return (channels,) + spatial
@property
def get_output_shape(self):
    """Output shape as a 3-tuple: channel count followed by the two spatial sizes."""
    channels = 3
    spatial = (32, 32)
    return (channels,) + spatial
def prepare_init_args_and_inputs_for_common(self):
    """Return ``(init_dict, inputs_dict)``: the model constructor kwargs and ``self.dummy_input``."""
    inputs_dict = self.dummy_input
    init_dict = dict(
        ch=32,
        ch_mult=(1, 2),
        num_res_blocks=2,
        attn_resolutions=(16,),
        resolution=32,
    )
    return init_dict, inputs_dict
def test_from_pretrained_hub(self):
    """Load the dummy checkpoint from the Hub and run one forward pass.

    Checks that loading reports no missing keys and that the model returns an output for
    `self.dummy_input`, like the hub tests of the other model test classes in this file.
    """
    model, loading_info = UNetModel.from_pretrained("fusing/ddpm_dummy", output_loading_info=True)
    self.assertIsNotNone(model)
    self.assertEqual(len(loading_info["missing_keys"]), 0)

    model.to(torch_device)
    image = model(**self.dummy_input)

    assert image is not None, "Make sure output is not None"
    # `test_from_pretrained_save_pretrained` is deliberately not overridden in this class: the
    # version inherited from ModelTesterMixin builds the same model configuration and also
    # compares the outputs of the saved and the reloaded model.
def test_output_pretrained(self):
    """Compare a slice of the pretrained dummy model's output against recorded values."""
    model = UNetModel.from_pretrained("fusing/ddpm_dummy")
    model.eval()

    # Seed every generator so the sampled noise is reproducible.
    torch.manual_seed(0)
    if torch.cuda.is_available():
        torch.cuda.manual_seed_all(0)

    noise = torch.randn(1, model.config.in_channels, model.config.resolution, model.config.resolution)
    time_step = torch.tensor([10])

    with torch.no_grad():
        output = model(noise, time_step)

    # Bottom-right 3x3 patch of the last channel of the first sample.
    output_slice = output[0, -1, -3:, -3:].flatten()
    # fmt: off
    expected_output_slice = torch.tensor([ 0.2891, -0.1899, 0.2595, -0.6214, 0.0968, -0.2622, 0.4688, 0.1311, 0.0053])
    # fmt: on
    self.assertTrue(torch.allclose(output_slice, expected_output_slice, atol=1e-3))
class GLIDESuperResUNetTests(ModelTesterMixin, unittest.TestCase):
    """Common and checkpoint-based tests for `GLIDESuperResUNetModel`."""

    model_class = GLIDESuperResUNetModel

    @property
    def dummy_input(self):
        """Forward kwargs: noisy sample `x`, one timestep per sample, and a `low_res` tensor."""
        batch_size = 4
        num_channels = 6
        sizes = (32, 32)
        low_res_size = (4, 4)

        # NOTE(review): this local shadows the module-level `torch_device`, so the inputs are always
        # created on CPU while the tests move the model to the global device -- confirm intended.
        torch_device = "cpu"

        # The noise carries only half of `num_channels`.
        noise = torch.randn((batch_size, num_channels // 2) + sizes).to(torch_device)
        low_res = torch.randn((batch_size, 3) + low_res_size).to(torch_device)
        time_step = torch.tensor([10] * noise.shape[0], device=torch_device)

        return {"x": noise, "timesteps": time_step, "low_res": low_res}

    @property
    def get_input_shape(self):
        """Shape of one input sample (without the batch dimension)."""
        return (3, 32, 32)

    @property
    def get_output_shape(self):
        """Shape of one output sample (without the batch dimension)."""
        return (6, 32, 32)

    def prepare_init_args_and_inputs_for_common(self):
        """Return `(init_dict, inputs_dict)` for the shared `ModelTesterMixin` tests."""
        init_dict = {
            "attention_resolutions": (2,),
            "channel_mult": (1, 2),
            "in_channels": 6,
            "out_channels": 6,
            "model_channels": 32,
            "num_head_channels": 8,
            "num_heads_upsample": 1,
            "num_res_blocks": 2,
            "resblock_updown": True,
            "resolution": 32,
            "use_scale_shift_norm": True,
        }
        inputs_dict = self.dummy_input
        return init_dict, inputs_dict

    def test_output(self):
        """Override: compare only the first 3-channel chunk of the output with the input shape."""
        init_dict, inputs_dict = self.prepare_init_args_and_inputs_for_common()
        model = self.model_class(**init_dict)
        model.to(torch_device)
        model.eval()

        with torch.no_grad():
            output = model(**inputs_dict)

        # The output is split into exactly two 3-channel chunks; the second one is ignored here.
        output, _ = torch.split(output, 3, dim=1)

        self.assertIsNotNone(output)
        expected_shape = inputs_dict["x"].shape
        self.assertEqual(output.shape, expected_shape, "Input and output shapes do not match")

    def test_from_pretrained_hub(self):
        """Load the dummy checkpoint from the Hub, check for missing keys and run a forward pass."""
        model, loading_info = GLIDESuperResUNetModel.from_pretrained(
            "fusing/glide-super-res-dummy", output_loading_info=True
        )
        self.assertIsNotNone(model)
        self.assertEqual(len(loading_info["missing_keys"]), 0)

        model.to(torch_device)
        image = model(**self.dummy_input)

        assert image is not None, "Make sure output is not None"

    def test_output_pretrained(self):
        """Compare a slice of the pretrained dummy model's output against recorded values."""
        model = GLIDESuperResUNetModel.from_pretrained("fusing/glide-super-res-dummy")

        # Seed every generator so the sampled inputs are reproducible.
        torch.manual_seed(0)
        if torch.cuda.is_available():
            torch.cuda.manual_seed_all(0)

        noise = torch.randn(1, 3, 64, 64)
        low_res = torch.randn(1, 3, 4, 4)
        time_step = torch.tensor([42] * noise.shape[0])

        with torch.no_grad():
            output = model(noise, time_step, low_res)

        # Keep the first of the two 3-channel chunks.
        output, _ = torch.split(output, 3, dim=1)
        output_slice = output[0, -1, -3:, -3:].flatten()
        # fmt: off
        expected_output_slice = torch.tensor([-22.8782, -23.2652, -15.3966, -22.8034, -23.3159, -15.5640, -15.3970, -15.4614, - 10.4370])
        # fmt: on
        self.assertTrue(torch.allclose(output_slice, expected_output_slice, atol=1e-3))
class UNetLDMModelTests(ModelTesterMixin, unittest.TestCase):
    """Shared mixin checks plus Hub-checkpoint tests for `UNetLDMModel`."""

    model_class = UNetLDMModel

    @property
    def dummy_input(self):
        """Forward kwargs: a float tensor for `x` and a single-element `timesteps` tensor."""
        sample_shape = (4, 4) + (32, 32)  # (batch, channels) + spatial sizes
        sample = floats_tensor(sample_shape).to(torch_device)
        step = torch.tensor([10]).to(torch_device)
        return {"x": sample, "timesteps": step}

    @property
    def get_input_shape(self):
        """Shape of one input sample (without the batch dimension)."""
        return (4,) + (32, 32)

    @property
    def get_output_shape(self):
        """Shape of one output sample (without the batch dimension)."""
        return (4,) + (32, 32)

    def prepare_init_args_and_inputs_for_common(self):
        """Provide the constructor kwargs and forward kwargs used by the shared tests."""
        constructor_kwargs = dict(
            image_size=32,
            in_channels=4,
            out_channels=4,
            model_channels=32,
            num_res_blocks=2,
            attention_resolutions=(16,),
            channel_mult=(1, 2),
            num_heads=2,
            conv_resample=True,
        )
        return constructor_kwargs, self.dummy_input

    def test_from_pretrained_hub(self):
        """The Hub checkpoint loads without missing keys and yields an output."""
        loaded = UNetLDMModel.from_pretrained("fusing/unet-ldm-dummy", output_loading_info=True)
        model, loading_info = loaded

        self.assertIsNotNone(model)
        self.assertEqual(len(loading_info["missing_keys"]), 0)

        model.to(torch_device)
        image = model(**self.dummy_input)

        assert image is not None, "Make sure output is not None"

    def test_output_pretrained(self):
        """The Hub checkpoint reproduces the recorded output slice for seeded noise."""
        model = UNetLDMModel.from_pretrained("fusing/unet-ldm-dummy")
        model.eval()

        torch.manual_seed(0)
        if torch.cuda.is_available():
            torch.cuda.manual_seed_all(0)

        side = model.config.image_size
        noise = torch.randn(1, model.config.in_channels, side, side)
        time_step = torch.tensor([10] * noise.shape[0])

        with torch.no_grad():
            output = model(noise, time_step)

        # Last 3x3 corner of the last channel of the first sample.
        actual = output[0, -1, -3:, -3:].flatten()
        # fmt: off
        expected = torch.tensor([-13.3258, -20.1100, -15.9873, -17.6617, -23.0596, -17.9419, -13.3675, -16.1889, -12.3800])
        # fmt: on
        self.assertTrue(torch.allclose(actual, expected, atol=1e-3))
class UNetGradTTSModelTests(ModelTesterMixin, unittest.TestCase):
    """Common and checkpoint-based tests for `UNetGradTTSModel`."""

    model_class = UNetGradTTSModel

    @property
    def dummy_input(self):
        """Forward kwargs: noisy `x`, per-sample `timesteps`, conditioning `mu` and a `mask`."""
        batch_size = 4
        num_features = 32
        seq_len = 16

        noise = floats_tensor((batch_size, num_features, seq_len)).to(torch_device)
        condition = floats_tensor((batch_size, num_features, seq_len)).to(torch_device)
        mask = floats_tensor((batch_size, 1, seq_len)).to(torch_device)
        time_step = torch.tensor([10] * batch_size).to(torch_device)

        return {"x": noise, "timesteps": time_step, "mu": condition, "mask": mask}

    @property
    def get_input_shape(self):
        """Shape of one input sample, without the batch dimension: (num_features, seq_len)."""
        return (32, 16)

    @property
    def get_output_shape(self):
        """Shape of one output sample, without the batch dimension: (num_features, seq_len).

        `ModelTesterMixin.test_training` prepends the batch size itself, so including it here
        would produce a 4-D target for the 3-D model output.
        """
        return (32, 16)

    def prepare_init_args_and_inputs_for_common(self):
        """Return `(init_dict, inputs_dict)` for the shared `ModelTesterMixin` tests."""
        init_dict = {
            "dim": 64,
            "groups": 4,
            "dim_mults": (1, 2),
            "n_feats": 32,
            "pe_scale": 1000,
            "n_spks": 1,
        }
        inputs_dict = self.dummy_input
        return init_dict, inputs_dict

    def test_from_pretrained_hub(self):
        """Load the dummy checkpoint from the Hub, check for missing keys and run a forward pass."""
        model, loading_info = UNetGradTTSModel.from_pretrained("fusing/unet-grad-tts-dummy", output_loading_info=True)
        self.assertIsNotNone(model)
        self.assertEqual(len(loading_info["missing_keys"]), 0)

        model.to(torch_device)
        image = model(**self.dummy_input)

        assert image is not None, "Make sure output is not None"

    def test_output_pretrained(self):
        """Compare a slice of the pretrained dummy model's output against recorded values."""
        model = UNetGradTTSModel.from_pretrained("fusing/unet-grad-tts-dummy")
        model.eval()

        # Seed every generator so the sampled inputs are reproducible.
        torch.manual_seed(0)
        if torch.cuda.is_available():
            torch.cuda.manual_seed_all(0)

        num_features = model.config.n_feats
        seq_len = 16
        noise = torch.randn((1, num_features, seq_len))
        condition = torch.randn((1, num_features, seq_len))
        mask = torch.randn((1, 1, seq_len))
        time_step = torch.tensor([10])

        with torch.no_grad():
            output = model(noise, time_step, condition, mask)

        # Last 3x3 corner of the first sample (the output is 3-D here).
        output_slice = output[0, -3:, -3:].flatten()
        # fmt: off
        expected_output_slice = torch.tensor([-0.0690, -0.0531, 0.0633, -0.0660, -0.0541, 0.0650, -0.0656, -0.0555, 0.0617])
        # fmt: on

        self.assertTrue(torch.allclose(output_slice, expected_output_slice, atol=1e-3))
class PipelineTesterMixin(unittest.TestCase):
def test_from_pretrained_save_pretrained(self):
......@@ -223,7 +587,6 @@ class PipelineTesterMixin(unittest.TestCase):
image = ldm([prompt], generator=generator, num_inference_steps=20)
image_slice = image[0, -1, -3:, -3:].cpu()
print(image_slice.shape)
assert image.shape == (1, 3, 256, 256)
expected_slice = torch.tensor([0.7295, 0.7358, 0.7256, 0.7435, 0.7095, 0.6884, 0.7325, 0.6921, 0.6458])
......
......@@ -20,10 +20,10 @@ import re
# All paths are set with the intent you should run this script from the root of the repo with the command
# python utils/check_dummies.py
PATH_TO_TRANSFORMERS = "src/transformers"
PATH_TO_DIFFUSERS = "src/diffusers"
# Matches is_xxx_available()
_re_backend = re.compile(r"is\_([a-z_]*)_available()")
_re_backend = re.compile(r"if is\_([a-z_]*)_available\(\)")
# Matches from xxx import bla
_re_single_line_import = re.compile(r"\s+from\s+\S*\s+import\s+([^\(\s].*)\n")
_re_test_backend = re.compile(r"^\s+if\s+not\s+is\_[a-z]*\_available\(\)")
......@@ -50,36 +50,30 @@ def {0}(*args, **kwargs):
def find_backend(line):
    """Find the backend tested by an `if is_xxx_available()` check in a code line of the init.

    Args:
        line: One line of source text.

    Returns:
        The first backend name captured by `_re_backend`, or `None` when the line contains no
        such check.
    """
    # `_re_backend` has a single capture group, so `findall` yields the backend names directly.
    backends = _re_backend.findall(line)
    if not backends:
        return None
    return backends[0]
def read_init():
"""Read the init and extracts PyTorch, TensorFlow, SentencePiece and Tokenizers objects."""
with open(os.path.join(PATH_TO_TRANSFORMERS, "__init__.py"), "r", encoding="utf-8", newline="\n") as f:
with open(os.path.join(PATH_TO_DIFFUSERS, "__init__.py"), "r", encoding="utf-8", newline="\n") as f:
lines = f.readlines()
# Get to the point we do the actual imports for type checking
line_index = 0
while not lines[line_index].startswith("if TYPE_CHECKING"):
line_index += 1
backend_specific_objects = {}
# Go through the end of the file
while line_index < len(lines):
# If the line is an if is_backend_available, we grab all objects associated.
backend = find_backend(lines[line_index])
if backend is not None:
while not lines[line_index].startswith(" else:"):
line_index += 1
line_index += 1
objects = []
line_index += 1
# Until we unindent, add backend objects to the list
while len(lines[line_index]) <= 1 or lines[line_index].startswith(" " * 8):
while not lines[line_index].startswith("else:"):
line = lines[line_index]
single_line_import_search = _re_single_line_import.search(line)
if single_line_import_search is not None:
......@@ -129,7 +123,7 @@ def check_dummies(overwrite=False):
short_names = {"torch": "pt"}
# Locate actual dummy modules and read their content.
path = os.path.join(PATH_TO_TRANSFORMERS, "utils")
path = os.path.join(PATH_TO_DIFFUSERS, "utils")
dummy_file_paths = {
backend: os.path.join(path, f"dummy_{short_names.get(backend, backend)}_objects.py")
for backend in dummy_files.keys()
......@@ -147,7 +141,7 @@ def check_dummies(overwrite=False):
if dummy_files[backend] != actual_dummies[backend]:
if overwrite:
print(
f"Updating transformers.utils.dummy_{short_names.get(backend, backend)}_objects.py as the main "
f"Updating diffusers.utils.dummy_{short_names.get(backend, backend)}_objects.py as the main "
"__init__ has new objects."
)
with open(dummy_file_paths[backend], "w", encoding="utf-8", newline="\n") as f:
......@@ -155,7 +149,7 @@ def check_dummies(overwrite=False):
else:
raise ValueError(
"The main __init__ has objects that are not present in "
f"transformers.utils.dummy_{short_names.get(backend, backend)}_objects.py. Run `make fix-copies` "
f"diffusers.utils.dummy_{short_names.get(backend, backend)}_objects.py. Run `make fix-copies` "
"to fix this."
)
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment