Commit 346d2571 authored by luopl
init
import torch
from torch import distributed as dist
from torch import nn
from torch.nn import functional as F
from ..base_module import BaseModule
from ...utils import DistriConfig
class DistriConv2dPP(BaseModule):
def __init__(self, module: nn.Conv2d, distri_config: DistriConfig, is_first_layer: bool = False):
super(DistriConv2dPP, self).__init__(module, distri_config)
self.is_first_layer = is_first_layer
def naive_forward(self, x: torch.Tensor) -> torch.Tensor:
# x: [B, C, H, W]
output = self.module(x)
return output
def sliced_forward(self, x: torch.Tensor) -> torch.Tensor:
config = self.distri_config
b, c, h, w = x.shape
assert h % config.n_device_per_batch == 0
stride = self.module.stride[0]
padding = self.module.padding[0]
output_h = x.shape[2] // stride // config.n_device_per_batch
idx = config.split_idx()
h_begin = output_h * idx * stride - padding
h_end = output_h * (idx + 1) * stride + padding
final_padding = [padding, padding, 0, 0]
if h_begin < 0:
h_begin = 0
final_padding[2] = padding
if h_end > h:
h_end = h
final_padding[3] = padding
sliced_input = x[:, :, h_begin:h_end, :]
padded_input = F.pad(sliced_input, final_padding, mode="constant")
return F.conv2d(padded_input, self.module.weight, self.module.bias, stride=stride, padding="valid")
def forward(self, x: torch.Tensor, *args, **kwargs) -> torch.Tensor:
distri_config = self.distri_config
if self.comm_manager is not None and self.comm_manager.handles is not None and self.idx is not None:
if self.comm_manager.handles[self.idx] is not None:
self.comm_manager.handles[self.idx].wait()
self.comm_manager.handles[self.idx] = None
if distri_config.n_device_per_batch == 1:
output = self.naive_forward(x)
else:
if self.is_first_layer:
full_x = x
output = self.sliced_forward(full_x)
else:
boundary_size = self.module.padding[0]
if self.buffer_list is None:
if self.comm_manager.buffer_list is None:
self.idx = self.comm_manager.register_tensor(
shape=[2, x.shape[0], x.shape[1], boundary_size, x.shape[3]],
torch_dtype=x.dtype,
layer_type="conv2d",
)
else:
self.buffer_list = self.comm_manager.get_buffer_list(self.idx)
if self.buffer_list is None:
output = self.naive_forward(x)
else:
def create_padded_x():
if distri_config.split_idx() == 0:
concat_x = torch.cat([x, self.buffer_list[distri_config.split_idx() + 1][0]], dim=2)
padded_x = F.pad(concat_x, [0, 0, boundary_size, 0], mode="constant")
elif distri_config.split_idx() == distri_config.n_device_per_batch - 1:
concat_x = torch.cat([self.buffer_list[distri_config.split_idx() - 1][1], x], dim=2)
padded_x = F.pad(concat_x, [0, 0, 0, boundary_size], mode="constant")
else:
padded_x = torch.cat(
[
self.buffer_list[distri_config.split_idx() - 1][1],
x,
self.buffer_list[distri_config.split_idx() + 1][0],
],
dim=2,
)
return padded_x
boundary = torch.stack([x[:, :, :boundary_size, :], x[:, :, -boundary_size:, :]], dim=0)
if distri_config.mode == "full_sync" or self.counter <= distri_config.warmup_steps:
dist.all_gather(self.buffer_list, boundary, group=distri_config.batch_group, async_op=False)
padded_x = create_padded_x()
output = F.conv2d(
padded_x,
self.module.weight,
self.module.bias,
stride=self.module.stride[0],
padding=(0, self.module.padding[1]),
)
else:
padded_x = create_padded_x()
output = F.conv2d(
padded_x,
self.module.weight,
self.module.bias,
stride=self.module.stride[0],
padding=(0, self.module.padding[1]),
)
if distri_config.mode != "no_sync":
self.comm_manager.enqueue(self.idx, boundary)
self.counter += 1
return output
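# A minimal single-process sketch (not part of the module above) of the halo idea
# DistriConv2dPP relies on: each device convolves its height patch plus `padding`
# boundary rows borrowed from its neighbours, so concatenating the per-patch outputs
# reproduces the full convolution. The demo_* name and the tensor sizes are
# illustrative assumptions, not values taken from this repository.
def demo_patch_conv_equivalence():
    torch.manual_seed(0)
    conv = nn.Conv2d(3, 8, kernel_size=3, padding=1)
    x = torch.randn(1, 3, 8, 4)
    full = conv(x)
    halo = conv.padding[0]
    # Top patch: rows 0-3 plus one halo row below, zero-padded on top/left/right.
    top = F.conv2d(F.pad(x[:, :, : 4 + halo], [1, 1, 1, 0]), conv.weight, conv.bias)
    # Bottom patch: rows 4-7 plus one halo row above, zero-padded on bottom/left/right.
    bottom = F.conv2d(F.pad(x[:, :, 4 - halo :], [1, 1, 0, 1]), conv.weight, conv.bias)
    assert torch.allclose(full, torch.cat([top, bottom], dim=2), atol=1e-5)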
import torch
from torch import distributed as dist
from torch import nn
from ..base_module import BaseModule
from ...utils import DistriConfig
class DistriGroupNorm(BaseModule):
def __init__(self, module: nn.GroupNorm, distri_config: DistriConfig):
assert isinstance(module, nn.GroupNorm)
super(DistriGroupNorm, self).__init__(module, distri_config)
def forward(self, x: torch.Tensor) -> torch.Tensor:
module = self.module
assert isinstance(module, nn.GroupNorm)
distri_config = self.distri_config
if self.comm_manager is not None and self.comm_manager.handles is not None and self.idx is not None:
if self.comm_manager.handles[self.idx] is not None:
self.comm_manager.handles[self.idx].wait()
self.comm_manager.handles[self.idx] = None
assert x.ndim == 4
n, c, h, w = x.shape
num_groups = module.num_groups
group_size = c // num_groups
if distri_config.mode in ["stale_gn", "corrected_async_gn"]:
if self.buffer_list is None:
if self.comm_manager.buffer_list is None:
n, c, h, w = x.shape
self.idx = self.comm_manager.register_tensor(
shape=[2, n, num_groups, 1, 1, 1], torch_dtype=x.dtype, layer_type="gn"
)
else:
self.buffer_list = self.comm_manager.get_buffer_list(self.idx)
x = x.view([n, num_groups, group_size, h, w])
x_mean = x.mean(dim=[2, 3, 4], keepdim=True)  # [n, num_groups, 1, 1, 1]
x2_mean = (x**2).mean(dim=[2, 3, 4], keepdim=True)  # [n, num_groups, 1, 1, 1]
slice_mean = torch.stack([x_mean, x2_mean], dim=0)
if self.buffer_list is None:
full_mean = slice_mean
elif self.counter <= distri_config.warmup_steps:
dist.all_gather(self.buffer_list, slice_mean, group=distri_config.batch_group, async_op=False)
full_mean = sum(self.buffer_list) / distri_config.n_device_per_batch
else:
if distri_config.mode == "corrected_async_gn":
correction = slice_mean - self.buffer_list[distri_config.split_idx()]
full_mean = sum(self.buffer_list) / distri_config.n_device_per_batch + correction
else:
new_buffer_list = [buffer for buffer in self.buffer_list]
new_buffer_list[distri_config.split_idx()] = slice_mean
full_mean = sum(new_buffer_list) / distri_config.n_device_per_batch
self.comm_manager.enqueue(self.idx, slice_mean)
full_x_mean, full_x2_mean = full_mean[0], full_mean[1]
var = full_x2_mean - full_x_mean**2
if distri_config.mode == "corrected_async_gn":
slice_x_mean, slice_x2_mean = slice_mean[0], slice_mean[1]
slice_var = slice_x2_mean - slice_x_mean**2
var = torch.where(var < 0, slice_var, var) # Correct negative variance
num_elements = group_size * h * w
var = var * (num_elements / (num_elements - 1))
std = (var + module.eps).sqrt()
output = (x - full_x_mean) / std
output = output.view([n, c, h, w])
if module.affine:
output = output * module.weight.view([1, -1, 1, 1])
output = output + module.bias.view([1, -1, 1, 1])
else:
if self.counter <= distri_config.warmup_steps or distri_config.mode in ["sync_gn", "full_sync"]:
x = x.view([n, num_groups, group_size, h, w])
x_mean = x.mean(dim=[2, 3, 4], keepdim=True)  # [n, num_groups, 1, 1, 1]
x2_mean = (x**2).mean(dim=[2, 3, 4], keepdim=True)  # [n, num_groups, 1, 1, 1]
mean = torch.stack([x_mean, x2_mean], dim=0)
dist.all_reduce(mean, op=dist.ReduceOp.SUM, group=distri_config.batch_group)
mean = mean / distri_config.n_device_per_batch
x_mean = mean[0]
x2_mean = mean[1]
var = x2_mean - x_mean**2
num_elements = group_size * h * w
var = var * (num_elements / (num_elements - 1))
std = (var + module.eps).sqrt()
output = (x - x_mean) / std
output = output.view([n, c, h, w])
if module.affine:
output = output * module.weight.view([1, -1, 1, 1])
output = output + module.bias.view([1, -1, 1, 1])
elif distri_config.mode in ["separate_gn", "no_sync"]:
output = module(x)
else:
raise NotImplementedError
self.counter += 1
return output
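# Single-process sanity sketch (illustrative, not part of the module above) of the
# statistic fusion used by DistriGroupNorm: averaging per-slice E[x] and E[x^2] over
# equal-sized patches recovers the full moments, and E[x^2] - E[x]^2 gives the biased
# variance (the module additionally applies a Bessel-style correction before normalizing).
def demo_groupnorm_stat_fusion():
    torch.manual_seed(0)
    x = torch.randn(4, 16, 8, 8)
    slices = x.chunk(2, dim=2)  # two equal height patches, as in patch parallelism
    mean = torch.stack([s.mean() for s in slices]).mean()
    sq_mean = torch.stack([(s ** 2).mean() for s in slices]).mean()
    assert torch.allclose(mean, x.mean(), atol=1e-6)
    assert torch.allclose(sq_mean - mean ** 2, x.var(unbiased=False), atol=1e-5)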
from typing import Optional
import torch.cuda
from diffusers.models.attention_processor import Attention
from torch import distributed as dist
from torch import nn
from torch.nn import functional as F
from ..base_module import BaseModule
from ...utils import DistriConfig
class DistriAttentionTP(BaseModule):
def __init__(self, module: Attention, distri_config: DistriConfig):
super(DistriAttentionTP, self).__init__(module, distri_config)
heads = module.heads
sliced_heads = heads // distri_config.n_device_per_batch
remainder_heads = heads % distri_config.n_device_per_batch
if distri_config.split_idx() < remainder_heads:
sliced_heads += 1
self.sliced_heads = sliced_heads
if sliced_heads > 0:
if distri_config.split_idx() < remainder_heads:
start_head = distri_config.split_idx() * sliced_heads
else:
start_head = (
remainder_heads * (sliced_heads + 1) + (distri_config.split_idx() - remainder_heads) * sliced_heads
)
end_head = start_head + sliced_heads
dim = module.to_q.out_features // heads
sharded_to_q = nn.Linear(
module.to_q.in_features,
sliced_heads * dim,
bias=module.to_q.bias is not None,
device=module.to_q.weight.device,
dtype=module.to_q.weight.dtype,
)
sharded_to_q.weight.data.copy_(module.to_q.weight.data[start_head * dim : end_head * dim])
if module.to_q.bias is not None:
sharded_to_q.bias.data.copy_(module.to_q.bias.data[start_head * dim : end_head * dim])
sharded_to_k = nn.Linear(
module.to_k.in_features,
sliced_heads * dim,
bias=module.to_k.bias is not None,
device=module.to_k.weight.device,
dtype=module.to_k.weight.dtype,
)
sharded_to_k.weight.data.copy_(module.to_k.weight.data[start_head * dim : end_head * dim])
if module.to_k.bias is not None:
sharded_to_k.bias.data.copy_(module.to_k.bias.data[start_head * dim : end_head * dim])
sharded_to_v = nn.Linear(
module.to_v.in_features,
sliced_heads * dim,
bias=module.to_v.bias is not None,
device=module.to_v.weight.device,
dtype=module.to_v.weight.dtype,
)
sharded_to_v.weight.data.copy_(module.to_v.weight.data[start_head * dim : end_head * dim])
if module.to_v.bias is not None:
sharded_to_v.bias.data.copy_(module.to_v.bias.data[start_head * dim : end_head * dim])
sharded_to_out = nn.Linear(
sliced_heads * dim,
module.to_out[0].out_features,
bias=module.to_out[0].bias is not None,
device=module.to_out[0].weight.device,
dtype=module.to_out[0].weight.dtype,
)
sharded_to_out.weight.data.copy_(module.to_out[0].weight.data[:, start_head * dim : end_head * dim])
if module.to_out[0].bias is not None:
sharded_to_out.bias.data.copy_(module.to_out[0].bias.data)
del module.to_q
del module.to_k
del module.to_v
old_to_out = module.to_out[0]
module.to_q = sharded_to_q
module.to_k = sharded_to_k
module.to_v = sharded_to_v
module.to_out[0] = sharded_to_out
module.heads = sliced_heads
del old_to_out
torch.cuda.empty_cache()
def forward(
self,
hidden_states: torch.FloatTensor,
encoder_hidden_states: Optional[torch.FloatTensor] = None,
attention_mask: Optional[torch.FloatTensor] = None,
**cross_attention_kwargs,
) -> torch.Tensor:
distri_config = self.distri_config
module = self.module
residual = hidden_states
if self.sliced_heads > 0:
input_ndim = hidden_states.ndim
assert input_ndim == 3
batch_size, sequence_length, _ = (
hidden_states.shape if encoder_hidden_states is None else encoder_hidden_states.shape
)
if attention_mask is not None:
attention_mask = module.prepare_attention_mask(attention_mask, sequence_length, batch_size)
# scaled_dot_product_attention expects attention_mask shape to be
# (batch, heads, source_length, target_length)
attention_mask = attention_mask.view(batch_size, module.heads, -1, attention_mask.shape[-1])
if module.group_norm is not None:
hidden_states = module.group_norm(hidden_states.transpose(1, 2)).transpose(1, 2)
query = module.to_q(hidden_states)
if encoder_hidden_states is None:
encoder_hidden_states = hidden_states
elif module.norm_cross:
encoder_hidden_states = module.norm_encoder_hidden_states(encoder_hidden_states)
key = module.to_k(encoder_hidden_states)
value = module.to_v(encoder_hidden_states)
inner_dim = key.shape[-1]
head_dim = inner_dim // module.heads
query = query.view(batch_size, -1, module.heads, head_dim).transpose(1, 2)
key = key.view(batch_size, -1, module.heads, head_dim).transpose(1, 2)
value = value.view(batch_size, -1, module.heads, head_dim).transpose(1, 2)
# the output of sdp = (batch, num_heads, seq_len, head_dim)
# TODO: add support for attn.scale when we move to Torch 2.1
hidden_states = F.scaled_dot_product_attention(
query, key, value, attn_mask=attention_mask, dropout_p=0.0, is_causal=False
)
hidden_states = hidden_states.transpose(1, 2).reshape(batch_size, -1, module.heads * head_dim)
hidden_states = hidden_states.to(query.dtype)
# linear proj
hidden_states = F.linear(hidden_states, module.to_out[0].weight, bias=None)
# dropout
hidden_states = module.to_out[1](hidden_states)
else:
hidden_states = torch.zeros(
[hidden_states.shape[0], hidden_states.shape[1], module.to_out[0].out_features],
device=hidden_states.device,
dtype=hidden_states.dtype,
)
dist.all_reduce(hidden_states, op=dist.ReduceOp.SUM, group=distri_config.batch_group, async_op=False)
if module.to_out[0].bias is not None:
hidden_states = hidden_states + module.to_out[0].bias.view(1, 1, -1)
if module.residual_connection:
hidden_states = hidden_states + residual
hidden_states = hidden_states / module.rescale_output_factor
self.counter += 1
return hidden_states
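# Worked example of the head partition computed in DistriAttentionTP.__init__
# (illustrative numbers): with heads = 10 and n_device_per_batch = 4,
# remainder_heads = 2, so the first two devices take one extra head each:
#   device 0 -> heads 0-2, device 1 -> heads 3-5,
#   device 2 -> heads 6-7, device 3 -> heads 8-9.
# Each device runs attention on its own heads only; the per-device to_out
# projections are summed by the all_reduce in forward(), and the output bias is
# added once after the reduction.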
import torch
from torch import distributed as dist
from torch import nn
from torch.nn import functional as F
from ..base_module import BaseModule
from ...utils import DistriConfig
class DistriConv2dTP(BaseModule):
def __init__(self, module: nn.Conv2d, distri_config: DistriConfig):
super(DistriConv2dTP, self).__init__(module, distri_config)
assert module.in_channels % distri_config.n_device_per_batch == 0
sharded_module = nn.Conv2d(
module.in_channels // distri_config.n_device_per_batch,
module.out_channels,
module.kernel_size,
module.stride,
module.padding,
module.dilation,
module.groups,
module.bias is not None,
module.padding_mode,
device=module.weight.device,
dtype=module.weight.dtype,
)
start_idx = distri_config.split_idx() * (module.in_channels // distri_config.n_device_per_batch)
end_idx = (distri_config.split_idx() + 1) * (module.in_channels // distri_config.n_device_per_batch)
sharded_module.weight.data.copy_(module.weight.data[:, start_idx:end_idx])
if module.bias is not None:
sharded_module.bias.data.copy_(module.bias.data)
self.module = sharded_module
del module
def forward(self, x: torch.Tensor) -> torch.Tensor:
distri_config = self.distri_config
b, c, h, w = x.shape
start_idx = distri_config.split_idx() * (c // distri_config.n_device_per_batch)
end_idx = (distri_config.split_idx() + 1) * (c // distri_config.n_device_per_batch)
output = F.conv2d(
x[:, start_idx:end_idx],
self.module.weight,
bias=None,
stride=self.module.stride,
padding=self.module.padding,
dilation=self.module.dilation,
groups=self.module.groups,
)
dist.all_reduce(output, op=dist.ReduceOp.SUM, group=distri_config.batch_group, async_op=False)
if self.module.bias is not None:
output = output + self.module.bias.view(1, -1, 1, 1)
self.counter += 1
return output
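# Illustrative single-process check (demo_* is not part of this repository) of the
# split used by DistriConv2dTP: convolving disjoint input-channel slices with the
# matching weight slices and summing the partial outputs (the role of the all_reduce
# above) reproduces the full convolution; the bias is added once after the reduction.
def demo_channel_parallel_conv():
    torch.manual_seed(0)
    conv = nn.Conv2d(8, 4, kernel_size=3, padding=1)
    x = torch.randn(1, 8, 6, 6)
    partial = sum(
        F.conv2d(x[:, s], conv.weight[:, s], bias=None, padding=1)
        for s in (slice(0, 4), slice(4, 8))
    )
    assert torch.allclose(conv(x), partial + conv.bias.view(1, -1, 1, 1), atol=1e-5)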
import torch.cuda
from diffusers.models.attention import FeedForward, GEGLU
from torch import distributed as dist
from torch import nn
from torch.nn import functional as F
from ..base_module import BaseModule
from ...utils import DistriConfig
class DistriFeedForwardTP(BaseModule):
def __init__(self, module: FeedForward, distri_config: DistriConfig):
super(DistriFeedForwardTP, self).__init__(module, distri_config)
assert isinstance(module.net[0], GEGLU)
assert module.net[0].proj.out_features % (distri_config.n_device_per_batch * 2) == 0
assert module.net[2].in_features % distri_config.n_device_per_batch == 0
mid_features = module.net[2].in_features // distri_config.n_device_per_batch
sharded_fc1 = nn.Linear(
module.net[0].proj.in_features,
mid_features * 2,
bias=module.net[0].proj.bias is not None,
device=module.net[0].proj.weight.device,
dtype=module.net[0].proj.weight.dtype,
)
start_idx = distri_config.split_idx() * mid_features
end_idx = (distri_config.split_idx() + 1) * mid_features
sharded_fc1.weight.data[:mid_features].copy_(module.net[0].proj.weight.data[start_idx:end_idx])
if module.net[0].proj.bias is not None:
sharded_fc1.bias.data[:mid_features].copy_(module.net[0].proj.bias.data[start_idx:end_idx])
start_idx = (distri_config.n_device_per_batch + distri_config.split_idx()) * mid_features
end_idx = (distri_config.n_device_per_batch + distri_config.split_idx() + 1) * mid_features
sharded_fc1.weight.data[mid_features:].copy_(module.net[0].proj.weight.data[start_idx:end_idx])
if module.net[0].proj.bias is not None:
sharded_fc1.bias.data[mid_features:].copy_(module.net[0].proj.bias.data[start_idx:end_idx])
sharded_fc2 = nn.Linear(
mid_features,
module.net[2].out_features,
bias=module.net[2].bias is not None,
device=module.net[2].weight.device,
dtype=module.net[2].weight.dtype,
)
sharded_fc2.weight.data.copy_(
module.net[2].weight.data[
:, distri_config.split_idx() * mid_features : (distri_config.split_idx() + 1) * mid_features
]
)
if module.net[2].bias is not None:
sharded_fc2.bias.data.copy_(module.net[2].bias.data)
old_fc1 = module.net[0].proj
old_fc2 = module.net[2]
module.net[0].proj = sharded_fc1
module.net[2] = sharded_fc2
del old_fc1
del old_fc2
torch.cuda.empty_cache()
def forward(self, hidden_states: torch.Tensor, scale: float = 1.0) -> torch.Tensor:
distri_config = self.distri_config
module = self.module
assert scale == 1.0
for i, submodule in enumerate(module.net):
if i == 0:
hidden_states, gate = submodule.proj(hidden_states).chunk(2, dim=-1)
hidden_states = hidden_states * submodule.gelu(gate)
elif i == 2:
hidden_states = F.linear(hidden_states, submodule.weight, None)
else:
hidden_states = submodule(hidden_states)
dist.all_reduce(hidden_states, op=dist.ReduceOp.SUM, group=distri_config.batch_group, async_op=False)
if module.net[2].bias is not None:
hidden_states = hidden_states + module.net[2].bias.view(1, 1, -1)
self.counter += 1
return hidden_states
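# Illustrative check of the column-parallel fc1 / row-parallel fc2 pattern used by
# DistriFeedForwardTP (GEGLU gating omitted for brevity; the module shards the hidden
# and gate halves consistently, so the same argument applies): slicing fc1 by output
# features and fc2 by input features, then summing the per-shard results (the role of
# the all_reduce above), matches the unsharded MLP once the fc2 bias is added after
# the reduction. Names and sizes here are assumptions.
def demo_tp_mlp_equivalence():
    torch.manual_seed(0)
    fc1, fc2 = nn.Linear(8, 16), nn.Linear(16, 8)
    x = torch.randn(2, 8)
    full = fc2(torch.relu(fc1(x)))
    shards = []
    for s in (slice(0, 8), slice(8, 16)):
        h = torch.relu(F.linear(x, fc1.weight[s], fc1.bias[s]))
        shards.append(F.linear(h, fc2.weight[:, s], None))
    assert torch.allclose(full, sum(shards) + fc2.bias, atol=1e-5)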
import torch.cuda
from diffusers.models.resnet import Downsample2D, ResnetBlock2D, Upsample2D, USE_PEFT_BACKEND
from torch import distributed as dist
from torch import nn
from torch.nn import functional as F
from ..base_module import BaseModule
from ...utils import DistriConfig
class DistriResnetBlock2DTP(BaseModule):
def __init__(self, module: ResnetBlock2D, distri_config: DistriConfig):
super(DistriResnetBlock2DTP, self).__init__(module, distri_config)
assert module.conv1.out_channels % distri_config.n_device_per_batch == 0
mid_channels = module.conv1.out_channels // distri_config.n_device_per_batch
sharded_conv1 = nn.Conv2d(
module.conv1.in_channels,
mid_channels,
module.conv1.kernel_size,
module.conv1.stride,
module.conv1.padding,
module.conv1.dilation,
module.conv1.groups,
module.conv1.bias is not None,
module.conv1.padding_mode,
device=module.conv1.weight.device,
dtype=module.conv1.weight.dtype,
)
sharded_conv1.weight.data.copy_(
module.conv1.weight.data[
distri_config.split_idx() * mid_channels : (distri_config.split_idx() + 1) * mid_channels
]
)
if module.conv1.bias is not None:
sharded_conv1.bias.data.copy_(
module.conv1.bias.data[
distri_config.split_idx() * mid_channels : (distri_config.split_idx() + 1) * mid_channels
]
)
sharded_conv2 = nn.Conv2d(
mid_channels,
module.conv2.out_channels,
module.conv2.kernel_size,
module.conv2.stride,
module.conv2.padding,
module.conv2.dilation,
module.conv2.groups,
module.conv2.bias is not None,
module.conv2.padding_mode,
device=module.conv2.weight.device,
dtype=module.conv2.weight.dtype,
)
sharded_conv2.weight.data.copy_(
module.conv2.weight.data[
:, distri_config.split_idx() * mid_channels : (distri_config.split_idx() + 1) * mid_channels
]
)
if module.conv2.bias is not None:
sharded_conv2.bias.data.copy_(module.conv2.bias.data)
assert module.time_emb_proj is not None
assert module.time_embedding_norm == "default"
sharded_time_emb_proj = nn.Linear(
module.time_emb_proj.in_features,
mid_channels,
bias=module.time_emb_proj.bias is not None,
device=module.time_emb_proj.weight.device,
dtype=module.time_emb_proj.weight.dtype,
)
sharded_time_emb_proj.weight.data.copy_(
module.time_emb_proj.weight.data[
distri_config.split_idx() * mid_channels : (distri_config.split_idx() + 1) * mid_channels
]
)
if module.time_emb_proj.bias is not None:
sharded_time_emb_proj.bias.data.copy_(
module.time_emb_proj.bias.data[
distri_config.split_idx() * mid_channels : (distri_config.split_idx() + 1) * mid_channels
]
)
sharded_norm2 = nn.GroupNorm(
module.norm2.num_groups // distri_config.n_device_per_batch,
mid_channels,
module.norm2.eps,
module.norm2.affine,
device=module.norm2.weight.device,
dtype=module.norm2.weight.dtype,
)
if module.norm2.affine:
sharded_norm2.weight.data.copy_(
module.norm2.weight.data[
distri_config.split_idx() * mid_channels : (distri_config.split_idx() + 1) * mid_channels
]
)
sharded_norm2.bias.data.copy_(
module.norm2.bias.data[
distri_config.split_idx() * mid_channels : (distri_config.split_idx() + 1) * mid_channels
]
)
del module.conv1
del module.conv2
del module.time_emb_proj
del module.norm2
module.conv1 = sharded_conv1
module.conv2 = sharded_conv2
module.time_emb_proj = sharded_time_emb_proj
module.norm2 = sharded_norm2
torch.cuda.empty_cache()
def forward(
self,
input_tensor: torch.FloatTensor,
temb: torch.FloatTensor,
scale: float = 1.0,
) -> torch.FloatTensor:
assert scale == 1.0
distri_config = self.distri_config
module = self.module
hidden_states = input_tensor
hidden_states = module.norm1(hidden_states)
hidden_states = module.nonlinearity(hidden_states)
if module.upsample is not None:
# upsample_nearest_nhwc fails with large batch sizes. see https://github.com/huggingface/diffusers/issues/984
if hidden_states.shape[0] >= 64:
input_tensor = input_tensor.contiguous()
hidden_states = hidden_states.contiguous()
input_tensor = (
module.upsample(input_tensor, scale=scale)
if isinstance(module.upsample, Upsample2D)
else module.upsample(input_tensor)
)
hidden_states = (
module.upsample(hidden_states, scale=scale)
if isinstance(module.upsample, Upsample2D)
else module.upsample(hidden_states)
)
elif module.downsample is not None:
input_tensor = (
module.downsample(input_tensor, scale=scale)
if isinstance(module.downsample, Downsample2D)
else module.downsample(input_tensor)
)
hidden_states = (
module.downsample(hidden_states, scale=scale)
if isinstance(module.downsample, Downsample2D)
else module.downsample(hidden_states)
)
hidden_states = module.conv1(hidden_states)
if module.time_emb_proj is not None:
if not module.skip_time_act:
temb = module.nonlinearity(temb)
temb = module.time_emb_proj(temb)[:, :, None, None]
if temb is not None and module.time_embedding_norm == "default":
hidden_states = hidden_states + temb
hidden_states = module.norm2(hidden_states)
if temb is not None and module.time_embedding_norm == "scale_shift":
scale, shift = torch.chunk(temb, 2, dim=1)
hidden_states = hidden_states * (1 + scale) + shift
hidden_states = module.nonlinearity(hidden_states)
hidden_states = module.dropout(hidden_states)
hidden_states = F.conv2d(
hidden_states,
module.conv2.weight,
bias=None,
stride=module.conv2.stride,
padding=module.conv2.padding,
dilation=module.conv2.dilation,
groups=module.conv2.groups,
)
dist.all_reduce(hidden_states, op=dist.ReduceOp.SUM, group=distri_config.batch_group, async_op=False)
if module.conv2.bias is not None:
hidden_states = hidden_states + module.conv2.bias.view(1, -1, 1, 1)
if module.conv_shortcut is not None:
input_tensor = (
module.conv_shortcut(input_tensor, scale) if not USE_PEFT_BACKEND else module.conv_shortcut(input_tensor)
)
output_tensor = (input_tensor + hidden_states) / module.output_scale_factor
self.counter += 1
return output_tensor
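# The ResNet block follows the same tensor-parallel recipe as the layers above:
# conv1, time_emb_proj and norm2 keep only this device's slice of the mid channels
# (norm2 also keeps the matching subset of groups), conv2 consumes that slice, and a
# single all_reduce in forward() sums the partial conv2 outputs before the conv2 bias
# is added once.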
import torch
from packaging import version
from torch import distributed as dist
from typing import List, Optional, Tuple, Union
def check_env():
if version.parse(torch.version.cuda) < version.parse("11.3"):
# https://docs.nvidia.com/deeplearning/nccl/user-guide/docs/usage/cudagraph.html
raise RuntimeError("NCCL CUDA Graph support requires CUDA 11.3 or above")
if version.parse(version.parse(torch.__version__).base_version) < version.parse("2.2.0"):
# https://pytorch.org/blog/accelerating-pytorch-with-cuda-graphs/
raise RuntimeError(
"CUDAGraph with NCCL support requires PyTorch 2.2.0 or above. "
"If it is not released yet, please install nightly built PyTorch with "
"`pip3 install --pre torch torchvision torchaudio --index-url https://download.pytorch.org/whl/nightly/cu121`"
)
def is_power_of_2(n: int) -> bool:
return (n & (n - 1) == 0) and n != 0
class DistriConfig:
def __init__(
self,
height: int = 1024,
width: int = 1024,
do_classifier_free_guidance: bool = True,
split_batch: bool = True,
warmup_steps: int = 4,
comm_checkpoint: int = 60,
mode: str = "corrected_async_gn",
use_cuda_graph: bool = True,
parallelism: str = "patch",
split_scheme: str = "row",
verbose: bool = False,
):
try:
# Initialize the process group
dist.init_process_group("nccl")
# Get the rank and world_size
rank = dist.get_rank()
world_size = dist.get_world_size()
except Exception as e:
rank = 0
world_size = 1
print(f"Failed to initialize process group: {e}, falling back to single GPU")
assert is_power_of_2(world_size)
#check_env()
self.world_size = world_size
self.rank = rank
self.height = height
self.width = width
self.do_classifier_free_guidance = do_classifier_free_guidance
self.split_batch = split_batch
self.warmup_steps = warmup_steps
self.comm_checkpoint = comm_checkpoint
self.mode = mode
self.use_cuda_graph = use_cuda_graph
self.parallelism = parallelism
self.split_scheme = split_scheme
self.verbose = verbose
if do_classifier_free_guidance and split_batch:
n_device_per_batch = world_size // 2
if n_device_per_batch == 0:
n_device_per_batch = 1
else:
n_device_per_batch = world_size
self.n_device_per_batch = n_device_per_batch
self.height = height
self.width = width
device = torch.device(f"cuda:{rank}")
torch.cuda.set_device(device)
self.device = device
batch_group = None
split_group = None
if do_classifier_free_guidance and split_batch and world_size >= 2:
batch_groups = []
for i in range(2):
batch_groups.append(dist.new_group(list(range(i * (world_size // 2), (i + 1) * (world_size // 2)))))
batch_group = batch_groups[self.batch_idx()]
split_groups = []
for i in range(world_size // 2):
split_groups.append(dist.new_group([i, i + world_size // 2]))
split_group = split_groups[self.split_idx()]
self.batch_group = batch_group
self.split_group = split_group
def batch_idx(self, rank: Optional[int] = None) -> int:
if rank is None:
rank = self.rank
if self.do_classifier_free_guidance and self.split_batch:
return 1 - int(rank < (self.world_size // 2))
else:
return 0 # raise NotImplementedError
def split_idx(self, rank: Optional[int] = None) -> int:
if rank is None:
rank = self.rank
return rank % self.n_device_per_batch
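# Example rank layout implied by batch_idx()/split_idx() (illustrative): with
# do_classifier_free_guidance, split_batch and world_size = 4, n_device_per_batch = 2 and
#   rank:      0  1  2  3
#   batch_idx: 0  0  1  1
#   split_idx: 0  1  0  1
# so ranks 0-1 hold the two patches of one half of the CFG batch (batch group 0),
# ranks 2-3 the other half, and the split groups pair the same patch index across the
# two halves ([0, 2] and [1, 3]).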
class PatchParallelismCommManager:
def __init__(self, distri_config: DistriConfig):
self.distri_config = distri_config
self.torch_dtype = None
self.numel = 0
self.numel_dict = {}
self.buffer_list = None
self.starts = []
self.ends = []
self.shapes = []
self.idx_queue = []
self.handles = None
def register_tensor(
self, shape: Union[Tuple[int, ...], List[int]], torch_dtype: torch.dtype, layer_type: Optional[str] = None
) -> int:
if self.torch_dtype is None:
self.torch_dtype = torch_dtype
else:
assert self.torch_dtype == torch_dtype
self.starts.append(self.numel)
numel = 1
for dim in shape:
numel *= dim
self.numel += numel
if layer_type is not None:
if layer_type not in self.numel_dict:
self.numel_dict[layer_type] = 0
self.numel_dict[layer_type] += numel
self.ends.append(self.numel)
self.shapes.append(shape)
return len(self.starts) - 1
def create_buffer(self):
distri_config = self.distri_config
if distri_config.rank == 0 and distri_config.verbose:
print(
f"Create buffer with {self.numel / 1e6:.3f}M parameters for {len(self.starts)} tensors on each device."
)
for layer_type, numel in self.numel_dict.items():
print(f" {layer_type}: {numel / 1e6:.3f}M parameters")
self.buffer_list = [
torch.empty(self.numel, dtype=self.torch_dtype, device=self.distri_config.device)
for _ in range(self.distri_config.n_device_per_batch)
]
self.handles = [None for _ in range(len(self.starts))]
def get_buffer_list(self, idx: int) -> List[torch.Tensor]:
buffer_list = [t[self.starts[idx] : self.ends[idx]].view(self.shapes[idx]) for t in self.buffer_list]
return buffer_list
def communicate(self):
distri_config = self.distri_config
start = self.starts[self.idx_queue[0]]
end = self.ends[self.idx_queue[-1]]
tensor = self.buffer_list[distri_config.split_idx()][start:end]
buffer_list = [t[start:end] for t in self.buffer_list]
handle = dist.all_gather(buffer_list, tensor, group=self.distri_config.batch_group, async_op=True)
for i in self.idx_queue:
self.handles[i] = handle
self.idx_queue = []
def enqueue(self, idx: int, tensor: torch.Tensor):
distri_config = self.distri_config
if idx == 0 and len(self.idx_queue) > 0:
self.communicate()
assert len(self.idx_queue) == 0 or self.idx_queue[-1] == idx - 1
self.idx_queue.append(idx)
self.buffer_list[distri_config.split_idx()][self.starts[idx] : self.ends[idx]].copy_(tensor.flatten())
if len(self.idx_queue) == distri_config.comm_checkpoint:
self.communicate()
def clear(self):
if len(self.idx_queue) > 0:
self.communicate()
if self.handles is not None:
for i in range(len(self.handles)):
if self.handles[i] is not None:
self.handles[i].wait()
self.handles[i] = None
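# Intended call order for PatchParallelismCommManager, as implied by the methods above
# (illustrative sketch; no real communication shown):
#   manager = PatchParallelismCommManager(distri_config)
#   idx = manager.register_tensor(shape, torch_dtype, layer_type)  # once per layer
#   manager.create_buffer()               # after every layer has registered
#   views = manager.get_buffer_list(idx)  # per-device views into the flat buffer
#   manager.enqueue(idx, tensor)          # per step; batched into async all_gather calls
#   manager.clear()                       # flush the queue and wait on pending handles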
# Copyright (c) 2022, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
#
# This work is licensed under a Creative Commons
# Attribution-NonCommercial-ShareAlike 4.0 International License.
# You should have received a copy of the license along with this
# work. If not, see http://creativecommons.org/licenses/by-nc-sa/4.0/
# empty
# GigaGAN: https://github.com/mingukkang/GigaGAN
# The MIT License (MIT)
# See license file or visit https://github.com/mingukkang/GigaGAN for details
# evaluation.py
import os
import torch
import numpy as np
from pathlib import Path
from tqdm import tqdm
from PIL import Image
import torch.nn.functional as F
from torch.utils.data import DataLoader
from .data_util import EvalDataset, CenterCropLongEdge
def tensor2pil(image: torch.Tensor):
"""Convert a tensor image in [-1, 1] (or a list/batch of such images) to a PIL image."""
if isinstance(image, list) or image.ndim == 4:
return [tensor2pil(im) for im in image]
assert image.ndim == 3
output_image = Image.fromarray(((image + 1.0) * 127.5).clamp(
0.0, 255.0).to(torch.uint8).permute(1, 2, 0).detach().cpu().numpy())
return output_image
@torch.no_grad()
def compute_clip_score(
dataset: DataLoader, clip_model="ViT-B/32", device="cuda", how_many=5000):
print("Computing CLIP score")
import clip as openai_clip
if clip_model == "ViT-B/32":
clip, clip_preprocessor = openai_clip.load("ViT-B/32", device=device)
clip = clip.eval()
elif clip_model == "ViT-G/14":
import open_clip
clip, _, clip_preprocessor = open_clip.create_model_and_transforms("ViT-g-14", pretrained="laion2b_s12b_b42k")
clip = clip.to(device)
clip = clip.eval()
clip = clip.float()
else:
raise NotImplementedError
cos_sims = []
count = 0
for imgs, txts in tqdm(dataset):
imgs_pil = [clip_preprocessor(tensor2pil(img)) for img in imgs]
imgs = torch.stack(imgs_pil, dim=0).to(device)
tokens = openai_clip.tokenize(txts, truncate=True).to(device)
# Prepending text prompts with "A photo depicts "
# https://arxiv.org/abs/2104.08718
prepend_text = "A photo depicts "
prepend_text_token = openai_clip.tokenize(prepend_text)[:, 1:4].to(device)
prepend_text_tokens = prepend_text_token.expand(tokens.shape[0], -1)
start_tokens = tokens[:, :1]
new_text_tokens = torch.cat(
[start_tokens, prepend_text_tokens, tokens[:, 1:]], dim=1)[:, :77]
last_cols = new_text_tokens[:, 77 - 1:77]
last_cols[last_cols > 0] = 49407 # eot token
new_text_tokens = torch.cat([new_text_tokens[:, :76], last_cols], dim=1)
img_embs = clip.encode_image(imgs)
text_embs = clip.encode_text(new_text_tokens)
similarities = F.cosine_similarity(img_embs, text_embs, dim=1)
cos_sims.append(similarities)
count += similarities.shape[0]
if count >= how_many:
break
clip_score = torch.cat(cos_sims, dim=0)[:how_many].mean()
clip_score = clip_score.detach().cpu().numpy()
return clip_score
@torch.no_grad()
def compute_fid(fake_dir: Path, gt_dir: Path,
resize_size=None, feature_extractor="clip"):
from cleanfid import fid
center_crop_trsf = CenterCropLongEdge()
def resize_and_center_crop(image_np):
image_pil = Image.fromarray(image_np)
image_pil = center_crop_trsf(image_pil)
if resize_size is not None:
image_pil = image_pil.resize((resize_size, resize_size),
Image.LANCZOS)
return np.array(image_pil)
if feature_extractor == "inception":
model_name = "inception_v3"
elif feature_extractor == "clip":
model_name = "clip_vit_b_32"
else:
raise ValueError(
"Unrecognized feature extractor [%s]" % feature_extractor)
# Note: "custom_image_tranform" is the keyword spelling expected by clean-fid.
score = fid.compute_fid(gt_dir,
fake_dir,
model_name=model_name,
custom_image_tranform=resize_and_center_crop)
return score
def evaluate_model(opt):
### Generated images
ref_sub_folder_name = "val2014" if opt.ref_data == "coco2014" else opt.ref_type
fid = compute_fid(
os.path.join(opt.ref_dir, ref_sub_folder_name),
opt.fake_dir,
resize_size=opt.eval_res,
feature_extractor="inception")
print(f"FID_{opt.eval_res}px: {fid}")
dset2 = EvalDataset(data_name=opt.ref_data,
data_dir=opt.fake_dir,
captionfile=opt.caption_file,
crop_long_edge=True,
resize_size=opt.eval_res,
resizer="lanczos",
normalize=True,
load_txt_from_file=(opt.ref_data == "coco2014"))
dset2_dataloader = DataLoader(dataset=dset2,
batch_size=opt.batch_size,
shuffle=False,
pin_memory=True,
drop_last=False)
if opt.ref_data == "coco2014":
clip_score = compute_clip_score(dset2_dataloader, clip_model=opt.clip_model4eval, how_many=opt.how_many)
print(f"CLIP score: {clip_score}")
if __name__ == "__main__":
import argparse
parser = argparse.ArgumentParser()
parser.add_argument("--how_many", default=30000, type=int)
parser.add_argument("--clip_model4eval", default="ViT-B/32", type=str, help="[WO, ViT-B/32, ViT-G/14]")
parser.add_argument("--ref_data", default="coco2014", type=str, help="in [imagenet2012, coco2014, laion4k]")
parser.add_argument("--ref_dir",
default="/home/COCO2014/",
help="location of the reference images for evaluation")
parser.add_argument("--ref_type",
default="train/valid/test",
help="Type of reference dataset")
parser.add_argument("--fake_dir",
default="/home/GigaGAN_images/",
help="location of fake images for evaluation")
parser.add_argument("--caption_file",
default="assets/captions.txt",
help="location of txt file containing image captions")
parser.add_argument("--eval_res", default=256, type=int)
parser.add_argument("--batch_size", default=8, type=int)
opt, _ = parser.parse_known_args()
evaluate_model(opt)
# Copyright (c) 2021, NVIDIA CORPORATION. All rights reserved.
#
# NVIDIA CORPORATION and its licensors retain all intellectual property
# and proprietary rights in and to this software, related documentation
# and any modifications thereto. Any use, reproduction, disclosure or
# distribution of this software and related documentation without an express
# license agreement from NVIDIA CORPORATION is strictly prohibited.
import os
import glob
import torch
import torch.utils.cpp_extension
import importlib
import hashlib
import shutil
from pathlib import Path
from torch.utils.file_baton import FileBaton
#----------------------------------------------------------------------------
# Global options.
verbosity = 'brief' # Verbosity level: 'none', 'brief', 'full'
#----------------------------------------------------------------------------
# Internal helper funcs.
def _find_compiler_bindir():
patterns = [
'C:/Program Files (x86)/Microsoft Visual Studio/*/Professional/VC/Tools/MSVC/*/bin/Hostx64/x64',
'C:/Program Files (x86)/Microsoft Visual Studio/*/BuildTools/VC/Tools/MSVC/*/bin/Hostx64/x64',
'C:/Program Files (x86)/Microsoft Visual Studio/*/Community/VC/Tools/MSVC/*/bin/Hostx64/x64',
'C:/Program Files (x86)/Microsoft Visual Studio */vc/bin',
]
for pattern in patterns:
matches = sorted(glob.glob(pattern))
if len(matches):
return matches[-1]
return None
#----------------------------------------------------------------------------
# Main entry point for compiling and loading C++/CUDA plugins.
_cached_plugins = dict()
def get_plugin(module_name, sources, **build_kwargs):
assert verbosity in ['none', 'brief', 'full']
# Already cached?
if module_name in _cached_plugins:
return _cached_plugins[module_name]
# Print status.
if verbosity == 'full':
print(f'Setting up PyTorch plugin "{module_name}"...')
elif verbosity == 'brief':
print(f'Setting up PyTorch plugin "{module_name}"... ', end='', flush=True)
try: # pylint: disable=too-many-nested-blocks
# Make sure we can find the necessary compiler binaries.
if os.name == 'nt' and os.system("where cl.exe >nul 2>nul") != 0:
compiler_bindir = _find_compiler_bindir()
if compiler_bindir is None:
raise RuntimeError(f'Could not find MSVC/GCC/CLANG installation on this computer. Check _find_compiler_bindir() in "{__file__}".')
os.environ['PATH'] += ';' + compiler_bindir
# Compile and load.
verbose_build = (verbosity == 'full')
# Incremental build md5sum trickery. Copies all the input source files
# into a cached build directory under a combined md5 digest of the input
# source files. Copying is done only if the combined digest has changed.
# This keeps input file timestamps and filenames the same as in previous
# extension builds, allowing for fast incremental rebuilds.
#
# This optimization is done only in case all the source files reside in
# a single directory (just for simplicity) and if the TORCH_EXTENSIONS_DIR
# environment variable is set (we take this as a signal that the user
# actually cares about this.)
source_dirs_set = set(os.path.dirname(source) for source in sources)
if len(source_dirs_set) == 1 and ('TORCH_EXTENSIONS_DIR' in os.environ):
all_source_files = sorted(list(x for x in Path(list(source_dirs_set)[0]).iterdir() if x.is_file()))
# Compute a combined hash digest for all source files in the same
# custom op directory (usually .cu, .cpp, .py and .h files).
hash_md5 = hashlib.md5()
for src in all_source_files:
with open(src, 'rb') as f:
hash_md5.update(f.read())
build_dir = torch.utils.cpp_extension._get_build_directory(module_name, verbose=verbose_build) # pylint: disable=protected-access
digest_build_dir = os.path.join(build_dir, hash_md5.hexdigest())
if not os.path.isdir(digest_build_dir):
os.makedirs(digest_build_dir, exist_ok=True)
baton = FileBaton(os.path.join(digest_build_dir, 'lock'))
if baton.try_acquire():
try:
for src in all_source_files:
shutil.copyfile(src, os.path.join(digest_build_dir, os.path.basename(src)))
finally:
baton.release()
else:
# Someone else is copying source files under the digest dir,
# wait until done and continue.
baton.wait()
digest_sources = [os.path.join(digest_build_dir, os.path.basename(x)) for x in sources]
torch.utils.cpp_extension.load(name=module_name, build_directory=build_dir,
verbose=verbose_build, sources=digest_sources, **build_kwargs)
else:
torch.utils.cpp_extension.load(name=module_name, verbose=verbose_build, sources=sources, **build_kwargs)
module = importlib.import_module(module_name)
except:
if verbosity == 'brief':
print('Failed!')
raise
# Print status and add to cache.
if verbosity == 'full':
print(f'Done setting up PyTorch plugin "{module_name}".')
elif verbosity == 'brief':
print('Done.')
_cached_plugins[module_name] = module
return module
#----------------------------------------------------------------------------
# GigaGAN: https://github.com/mingukkang/GigaGAN
# The MIT License (MIT)
# See license file or visit https://github.com/mingukkang/GigaGAN for details
# data_util.py
import os
import re
import io
import random
from torch.utils.data import Dataset
from torchvision.datasets import CocoCaptions
from torchvision.datasets import ImageFolder
from torchvision.transforms import InterpolationMode
from PIL import Image
import torchvision.transforms as transforms
import glob
resizer_collection = {"nearest": InterpolationMode.NEAREST,
"box": InterpolationMode.BOX,
"bilinear": InterpolationMode.BILINEAR,
"hamming": InterpolationMode.HAMMING,
"bicubic": InterpolationMode.BICUBIC,
"lanczos": InterpolationMode.LANCZOS}
class CenterCropLongEdge(object):
"""
this code is borrowed from https://github.com/ajbrock/BigGAN-PyTorch
MIT License
Copyright (c) 2019 Andy Brock
"""
def __call__(self, img):
return transforms.functional.center_crop(img, min(img.size))
def __repr__(self):
return self.__class__.__name__
class EvalDataset(Dataset):
def __init__(self,
data_name,
data_dir,
captionfile,
crop_long_edge=False,
resize_size=None,
resizer="lanczos",
normalize=True,
load_txt_from_file=False,
):
super(EvalDataset, self).__init__()
self.data_name = data_name
self.data_dir = data_dir
self.captionfile = captionfile
self.resize_size = resize_size
self.normalize = normalize
self.load_txt_from_file = load_txt_from_file
self.trsf_list = [CenterCropLongEdge()]
if isinstance(self.resize_size, int):
self.trsf_list += [transforms.Resize(self.resize_size,
interpolation=resizer_collection[resizer])]
if self.normalize:
self.trsf_list += [transforms.ToTensor()]
self.trsf_list += [transforms.Normalize([0.5, 0.5, 0.5],
[0.5, 0.5, 0.5])]
else:
self.trsf_list += [transforms.PILToTensor()]
self.trsf = transforms.Compose(self.trsf_list)
self.load_dataset()
def natural_sort(self, l):
convert = lambda text: int(text) if text.isdigit() else text.lower()
alphanum_key = lambda key: [convert(c) for c in re.split('([0-9]+)', key)]
return sorted(l, key=alphanum_key)
def load_dataset(self):
if self.data_name == "coco2014":
if self.load_txt_from_file:
self.imagelist = self.natural_sort(glob.glob(os.path.join(self.data_dir, "*.png")))
with io.open(self.captionfile, 'r', encoding="utf-8") as f:
self.captions = f.read().splitlines()
self.data = list(zip(self.imagelist, self.captions))
else:
self.data = CocoCaptions(root=os.path.join(self.data_dir,
"val2014"),
annFile=os.path.join(self.data_dir,
"annotations",
"captions_val2014.json"))
else:
self.data = ImageFolder(root=self.data_dir)
def __len__(self):
num_dataset = len(self.data)
return num_dataset
def __getitem__(self, index):
if self.data_name == "coco2014":
img, txt = self.data[index]
if isinstance(img, str):
img = Image.open(img).convert("RGB")
if isinstance(txt, list):
txt = random.choice(txt)  # pick one caption at random (COCO provides several per image)
return self.trsf(img), txt
else:
img, label = self.data[index]
return self.trsf(img), int(label)
# Copyright (c) 2022, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
#
# This work is licensed under a Creative Commons
# Attribution-NonCommercial-ShareAlike 4.0 International License.
# You should have received a copy of the license along with this
# work. If not, see http://creativecommons.org/licenses/by-nc-sa/4.0/
import os
import torch
from . import training_stats
#----------------------------------------------------------------------------
def init():
if 'MASTER_ADDR' not in os.environ:
os.environ['MASTER_ADDR'] = 'localhost'
if 'MASTER_PORT' not in os.environ:
os.environ['MASTER_PORT'] = '29500'
if 'RANK' not in os.environ:
os.environ['RANK'] = '0'
if 'LOCAL_RANK' not in os.environ:
os.environ['LOCAL_RANK'] = '0'
if 'WORLD_SIZE' not in os.environ:
os.environ['WORLD_SIZE'] = '1'
backend = 'gloo' if os.name == 'nt' else 'nccl'
torch.distributed.init_process_group(backend=backend, init_method='env://')
torch.cuda.set_device(int(os.environ.get('LOCAL_RANK', '0')))
sync_device = torch.device('cuda') if get_world_size() > 1 else None
training_stats.init_multiprocessing(rank=get_rank(), sync_device=sync_device)
#----------------------------------------------------------------------------
def get_rank():
return torch.distributed.get_rank() if torch.distributed.is_initialized() else 0
def get_local_rank():
return int(os.environ.get('LOCAL_RANK', '0'))
#----------------------------------------------------------------------------
def get_world_size():
return torch.distributed.get_world_size() if torch.distributed.is_initialized() else 1
#----------------------------------------------------------------------------
def should_stop():
return False
#----------------------------------------------------------------------------
def update_progress(cur, total):
_ = cur, total
#----------------------------------------------------------------------------
def print0(*args, **kwargs):
if get_rank() == 0:
print(*args, **kwargs)
#----------------------------------------------------------------------------
# Copyright (c) 2022, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
#
# This work is licensed under a Creative Commons
# Attribution-NonCommercial-ShareAlike 4.0 International License.
# You should have received a copy of the license along with this
# work. If not, see http://creativecommons.org/licenses/by-nc-sa/4.0/
from .util import EasyDict, make_cache_dir_path
# Copyright (c) 2022, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
#
# This work is licensed under a Creative Commons
# Attribution-NonCommercial-ShareAlike 4.0 International License.
# You should have received a copy of the license along with this
# work. If not, see http://creativecommons.org/licenses/by-nc-sa/4.0/
"""Miscellaneous utility classes and functions."""
import ctypes
import fnmatch
import importlib
import inspect
import numpy as np
import os
import shutil
import sys
import types
import io
import pickle
import re
import requests
import html
import hashlib
import glob
import tempfile
import urllib
import urllib.request
import uuid
from distutils.util import strtobool
from typing import Any, List, Tuple, Union, Optional
# Util classes
# ------------------------------------------------------------------------------------------
class EasyDict(dict):
"""Convenience class that behaves like a dict but allows access with the attribute syntax."""
def __getattr__(self, name: str) -> Any:
try:
return self[name]
except KeyError:
raise AttributeError(name)
def __setattr__(self, name: str, value: Any) -> None:
self[name] = value
def __delattr__(self, name: str) -> None:
del self[name]
class Logger(object):
"""Redirect stderr to stdout, optionally print stdout to a file, and optionally force flushing on both stdout and the file."""
def __init__(self, file_name: Optional[str] = None, file_mode: str = "w", should_flush: bool = True):
self.file = None
if file_name is not None:
self.file = open(file_name, file_mode)
self.should_flush = should_flush
self.stdout = sys.stdout
self.stderr = sys.stderr
sys.stdout = self
sys.stderr = self
def __enter__(self) -> "Logger":
return self
def __exit__(self, exc_type: Any, exc_value: Any, traceback: Any) -> None:
self.close()
def write(self, text: Union[str, bytes]) -> None:
"""Write text to stdout (and a file) and optionally flush."""
if isinstance(text, bytes):
text = text.decode()
if len(text) == 0: # workaround for a bug in VSCode debugger: sys.stdout.write(''); sys.stdout.flush() => crash
return
if self.file is not None:
self.file.write(text)
self.stdout.write(text)
if self.should_flush:
self.flush()
def flush(self) -> None:
"""Flush written text to both stdout and a file, if open."""
if self.file is not None:
self.file.flush()
self.stdout.flush()
def close(self) -> None:
"""Flush, close possible files, and remove stdout/stderr mirroring."""
self.flush()
# if using multiple loggers, prevent closing in wrong order
if sys.stdout is self:
sys.stdout = self.stdout
if sys.stderr is self:
sys.stderr = self.stderr
if self.file is not None:
self.file.close()
self.file = None
# Cache directories
# ------------------------------------------------------------------------------------------
_dnnlib_cache_dir = None
def set_cache_dir(path: str) -> None:
global _dnnlib_cache_dir
_dnnlib_cache_dir = path
def make_cache_dir_path(*paths: str) -> str:
if _dnnlib_cache_dir is not None:
return os.path.join(_dnnlib_cache_dir, *paths)
if 'DNNLIB_CACHE_DIR' in os.environ:
return os.path.join(os.environ['DNNLIB_CACHE_DIR'], *paths)
if 'HOME' in os.environ:
return os.path.join(os.environ['HOME'], '.cache', 'dnnlib', *paths)
if 'USERPROFILE' in os.environ:
return os.path.join(os.environ['USERPROFILE'], '.cache', 'dnnlib', *paths)
return os.path.join(tempfile.gettempdir(), '.cache', 'dnnlib', *paths)
# Small util functions
# ------------------------------------------------------------------------------------------
def format_time(seconds: Union[int, float]) -> str:
"""Convert the seconds to human readable string with days, hours, minutes and seconds."""
s = int(np.rint(seconds))
if s < 60:
return "{0}s".format(s)
elif s < 60 * 60:
return "{0}m {1:02}s".format(s // 60, s % 60)
elif s < 24 * 60 * 60:
return "{0}h {1:02}m {2:02}s".format(s // (60 * 60), (s // 60) % 60, s % 60)
else:
return "{0}d {1:02}h {2:02}m".format(s // (24 * 60 * 60), (s // (60 * 60)) % 24, (s // 60) % 60)
def format_time_brief(seconds: Union[int, float]) -> str:
"""Convert the seconds to human readable string with days, hours, minutes and seconds."""
s = int(np.rint(seconds))
if s < 60:
return "{0}s".format(s)
elif s < 60 * 60:
return "{0}m {1:02}s".format(s // 60, s % 60)
elif s < 24 * 60 * 60:
return "{0}h {1:02}m".format(s // (60 * 60), (s // 60) % 60)
else:
return "{0}d {1:02}h".format(s // (24 * 60 * 60), (s // (60 * 60)) % 24)
def ask_yes_no(question: str) -> bool:
"""Ask the user the question until the user inputs a valid answer."""
while True:
try:
print("{0} [y/n]".format(question))
return strtobool(input().lower())
except ValueError:
pass
def tuple_product(t: Tuple) -> Any:
"""Calculate the product of the tuple elements."""
result = 1
for v in t:
result *= v
return result
_str_to_ctype = {
"uint8": ctypes.c_ubyte,
"uint16": ctypes.c_uint16,
"uint32": ctypes.c_uint32,
"uint64": ctypes.c_uint64,
"int8": ctypes.c_byte,
"int16": ctypes.c_int16,
"int32": ctypes.c_int32,
"int64": ctypes.c_int64,
"float32": ctypes.c_float,
"float64": ctypes.c_double
}
def get_dtype_and_ctype(type_obj: Any) -> Tuple[np.dtype, Any]:
"""Given a type name string (or an object having a __name__ attribute), return matching Numpy and ctypes types that have the same size in bytes."""
type_str = None
if isinstance(type_obj, str):
type_str = type_obj
elif hasattr(type_obj, "__name__"):
type_str = type_obj.__name__
elif hasattr(type_obj, "name"):
type_str = type_obj.name
else:
raise RuntimeError("Cannot infer type name from input")
assert type_str in _str_to_ctype.keys()
my_dtype = np.dtype(type_str)
my_ctype = _str_to_ctype[type_str]
assert my_dtype.itemsize == ctypes.sizeof(my_ctype)
return my_dtype, my_ctype
def is_pickleable(obj: Any) -> bool:
try:
with io.BytesIO() as stream:
pickle.dump(obj, stream)
return True
except:
return False
# Functionality to import modules/objects by name, and call functions by name
# ------------------------------------------------------------------------------------------
def get_module_from_obj_name(obj_name: str) -> Tuple[types.ModuleType, str]:
"""Searches for the underlying module behind the name to some python object.
Returns the module and the object name (original name with module part removed)."""
# allow convenience shorthands, substitute them by full names
obj_name = re.sub("^np.", "numpy.", obj_name)
obj_name = re.sub("^tf.", "tensorflow.", obj_name)
# list alternatives for (module_name, local_obj_name)
parts = obj_name.split(".")
name_pairs = [(".".join(parts[:i]), ".".join(parts[i:])) for i in range(len(parts), 0, -1)]
# try each alternative in turn
for module_name, local_obj_name in name_pairs:
try:
module = importlib.import_module(module_name) # may raise ImportError
get_obj_from_module(module, local_obj_name) # may raise AttributeError
return module, local_obj_name
except:
pass
# maybe some of the modules themselves contain errors?
for module_name, _local_obj_name in name_pairs:
try:
importlib.import_module(module_name) # may raise ImportError
except ImportError:
if not str(sys.exc_info()[1]).startswith("No module named '" + module_name + "'"):
raise
# maybe the requested attribute is missing?
for module_name, local_obj_name in name_pairs:
try:
module = importlib.import_module(module_name) # may raise ImportError
get_obj_from_module(module, local_obj_name) # may raise AttributeError
except ImportError:
pass
# we are out of luck, but we have no idea why
raise ImportError(obj_name)
def get_obj_from_module(module: types.ModuleType, obj_name: str) -> Any:
"""Traverses the object name and returns the last (rightmost) python object."""
if obj_name == '':
return module
obj = module
for part in obj_name.split("."):
obj = getattr(obj, part)
return obj
def get_obj_by_name(name: str) -> Any:
"""Finds the python object with the given name."""
module, obj_name = get_module_from_obj_name(name)
return get_obj_from_module(module, obj_name)
def call_func_by_name(*args, func_name: str = None, **kwargs) -> Any:
"""Finds the python object with the given name and calls it as a function."""
assert func_name is not None
func_obj = get_obj_by_name(func_name)
assert callable(func_obj)
return func_obj(*args, **kwargs)
def construct_class_by_name(*args, class_name: str = None, **kwargs) -> Any:
"""Finds the python class with the given name and constructs it with the given arguments."""
return call_func_by_name(*args, func_name=class_name, **kwargs)
def get_module_dir_by_obj_name(obj_name: str) -> str:
"""Get the directory path of the module containing the given object name."""
module, _ = get_module_from_obj_name(obj_name)
return os.path.dirname(inspect.getfile(module))
def is_top_level_function(obj: Any) -> bool:
"""Determine whether the given object is a top-level function, i.e., defined at module scope using 'def'."""
return callable(obj) and obj.__name__ in sys.modules[obj.__module__].__dict__
def get_top_level_function_name(obj: Any) -> str:
"""Return the fully-qualified name of a top-level function."""
assert is_top_level_function(obj)
module = obj.__module__
if module == '__main__':
module = os.path.splitext(os.path.basename(sys.modules[module].__file__))[0]
return module + "." + obj.__name__
# File system helpers
# ------------------------------------------------------------------------------------------
def list_dir_recursively_with_ignore(dir_path: str, ignores: List[str] = None, add_base_to_relative: bool = False) -> List[Tuple[str, str]]:
"""List all files recursively in a given directory while ignoring given file and directory names.
Returns list of tuples containing both absolute and relative paths."""
assert os.path.isdir(dir_path)
base_name = os.path.basename(os.path.normpath(dir_path))
if ignores is None:
ignores = []
result = []
for root, dirs, files in os.walk(dir_path, topdown=True):
for ignore_ in ignores:
dirs_to_remove = [d for d in dirs if fnmatch.fnmatch(d, ignore_)]
# dirs need to be edited in-place
for d in dirs_to_remove:
dirs.remove(d)
files = [f for f in files if not fnmatch.fnmatch(f, ignore_)]
absolute_paths = [os.path.join(root, f) for f in files]
relative_paths = [os.path.relpath(p, dir_path) for p in absolute_paths]
if add_base_to_relative:
relative_paths = [os.path.join(base_name, p) for p in relative_paths]
assert len(absolute_paths) == len(relative_paths)
result += zip(absolute_paths, relative_paths)
return result
def copy_files_and_create_dirs(files: List[Tuple[str, str]]) -> None:
"""Takes in a list of tuples of (src, dst) paths and copies files.
Will create all necessary directories."""
for file in files:
target_dir_name = os.path.dirname(file[1])
# will create all intermediate-level directories
if not os.path.exists(target_dir_name):
os.makedirs(target_dir_name)
shutil.copyfile(file[0], file[1])
# URL helpers
# ------------------------------------------------------------------------------------------
def is_url(obj: Any, allow_file_urls: bool = False) -> bool:
"""Determine whether the given object is a valid URL string."""
if not isinstance(obj, str) or not "://" in obj:
return False
if allow_file_urls and obj.startswith('file://'):
return True
try:
res = requests.compat.urlparse(obj)
if not res.scheme or not res.netloc or not "." in res.netloc:
return False
res = requests.compat.urlparse(requests.compat.urljoin(obj, "/"))
if not res.scheme or not res.netloc or not "." in res.netloc:
return False
except:
return False
return True
def open_url(url: str, cache_dir: str = None, num_attempts: int = 10, verbose: bool = True, return_filename: bool = False, cache: bool = True) -> Any:
"""Download the given URL and return a binary-mode file object to access the data."""
assert num_attempts >= 1
assert not (return_filename and (not cache))
# Doesn't look like a URL scheme, so interpret it as a local filename.
if not re.match('^[a-z]+://', url):
return url if return_filename else open(url, "rb")
# Handle file URLs. This code handles unusual file:// patterns that
# arise on Windows:
#
# file:///c:/foo.txt
#
# which would translate to a local '/c:/foo.txt' filename that's
# invalid. Drop the forward slash for such pathnames.
#
# If you touch this code path, you should test it on both Linux and
# Windows.
#
# Some internet resources suggest using urllib.request.url2pathname(),
# but that converts forward slashes to backslashes and this causes
# its own set of problems.
if url.startswith('file://'):
filename = urllib.parse.urlparse(url).path
if re.match(r'^/[a-zA-Z]:', filename):
filename = filename[1:]
return filename if return_filename else open(filename, "rb")
assert is_url(url)
# Lookup from cache.
if cache_dir is None:
cache_dir = make_cache_dir_path('downloads')
url_md5 = hashlib.md5(url.encode("utf-8")).hexdigest()
if cache:
cache_files = glob.glob(os.path.join(cache_dir, url_md5 + "_*"))
if len(cache_files) == 1:
filename = cache_files[0]
return filename if return_filename else open(filename, "rb")
# Download.
url_name = None
url_data = None
with requests.Session() as session:
if verbose:
print("Downloading %s ..." % url, end="", flush=True)
for attempts_left in reversed(range(num_attempts)):
try:
with session.get(url) as res:
res.raise_for_status()
if len(res.content) == 0:
raise IOError("No data received")
if len(res.content) < 8192:
content_str = res.content.decode("utf-8")
if "download_warning" in res.headers.get("Set-Cookie", ""):
links = [html.unescape(link) for link in content_str.split('"') if "export=download" in link]
if len(links) == 1:
url = requests.compat.urljoin(url, links[0])
raise IOError("Google Drive virus checker nag")
if "Google Drive - Quota exceeded" in content_str:
raise IOError("Google Drive download quota exceeded -- please try again later")
match = re.search(r'filename="([^"]*)"', res.headers.get("Content-Disposition", ""))
url_name = match[1] if match else url
url_data = res.content
if verbose:
print(" done")
break
except KeyboardInterrupt:
raise
except:
if not attempts_left:
if verbose:
print(" failed")
raise
if verbose:
print(".", end="", flush=True)
# Save to cache.
if cache:
safe_name = re.sub(r"[^0-9a-zA-Z-._]", "_", url_name)
safe_name = safe_name[:min(len(safe_name), 128)]
cache_file = os.path.join(cache_dir, url_md5 + "_" + safe_name)
temp_file = os.path.join(cache_dir, "tmp_" + uuid.uuid4().hex + "_" + url_md5 + "_" + safe_name)
os.makedirs(cache_dir, exist_ok=True)
with open(temp_file, "wb") as f:
f.write(url_data)
os.replace(temp_file, cache_file) # atomic
if return_filename:
return cache_file
# Return data as file object.
assert not return_filename
return io.BytesIO(url_data)
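# A minimal usage sketch for open_url() (the URL below is a placeholder):
#
#     with open_url('https://example.com/models/checkpoint.pkl', cache=True) as f:
#         data = f.read()     # first call downloads; later calls read the cached copy
#     path = open_url('https://example.com/models/checkpoint.pkl', cache=True, return_filename=True)
#
# Plain local paths (no '://' scheme) bypass the download logic entirely and are either
# returned as-is (return_filename=True) or opened in binary mode.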
# Copyright (c) 2024, Mingyuan Zhou. All rights reserved.
#
# This work is licensed under APACHE LICENSE, VERSION 2.0
# You should have received a copy of the license along with this
# work. If not, see https://www.apache.org/licenses/LICENSE-2.0.txt
import os
import re
import click
import tqdm
import numpy as np
import torch
import PIL.Image
from diffusers import AutoPipelineForText2Image
from ..linfusion import LinFusion
from . import distributed as dist
#----------------------------------------------------------------------------
# Wrapper for torch.Generator that allows specifying a different random seed
# for each sample in a minibatch.
class StackedRandomGenerator:
def __init__(self, device, seeds):
super().__init__()
self.generators = [torch.Generator(device).manual_seed(int(seed) % (1 << 32)) for seed in seeds]
def randn(self, size, **kwargs):
assert size[0] == len(self.generators)
return torch.stack([torch.randn(size[1:], generator=gen, **kwargs) for gen in self.generators])
def randn_like(self, input):
return self.randn(input.shape, dtype=input.dtype, layout=input.layout, device=input.device)
def randint(self, *args, size, **kwargs):
assert size[0] == len(self.generators)
return torch.stack([torch.randint(*args, size=size[1:], generator=gen, **kwargs) for gen in self.generators])
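# A minimal usage sketch: each sample draws from its own torch.Generator, so the result for a
# given seed does not depend on how the seeds are grouped into batches.
#
#     seeds = [0, 1, 2, 3]
#     rnd = StackedRandomGenerator(torch.device('cpu'), seeds)
#     z = rnd.randn([len(seeds), 4, 64, 64])   # z[i] is fully determined by seeds[i]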
#----------------------------------------------------------------------------
# Parse a comma separated list of numbers or ranges and return a list of ints.
# Example: '1,2,5-10' returns [1, 2, 5, 6, 7, 8, 9, 10]
def parse_int_list(s):
if isinstance(s, list): return s
ranges = []
range_re = re.compile(r'^(\d+)-(\d+)$')
for p in s.split(','):
m = range_re.match(p)
if m:
ranges.extend(range(int(m.group(1)), int(m.group(2))+1))
else:
ranges.append(int(p))
return ranges
def read_file_to_sentences(filename):
# Initialize an empty list to store the sentences
sentences = []
# Open the file
with open(filename, 'r', encoding='utf-8') as file:
# Read each line from the file
for line in file:
# Strip newline and any trailing whitespace characters
clean_line = line.strip()
# Add the cleaned line to the list if it is not empty
if clean_line:
sentences.append(clean_line)
return sentences
#----------------------------------------------------------------------------
def compress_to_npz(folder_path, num=50000):
# Get the list of all files in the folder
npz_path = f"{folder_path}.npz"
file_names = os.listdir(folder_path)
# Filter the list of files to include only images
file_names = [file_name for file_name in file_names if file_name.endswith(('.png', '.jpg', '.jpeg'))]
num = min(num, len(file_names))
file_names = file_names[:num]
    # Initialize a list to hold the image arrays
    samples = []
    # Iterate through the files, load each image, and append it to the list with a progress bar
for file_name in tqdm.tqdm(file_names, desc=f"Compressing images to {npz_path}"):
# Create the full path to the image file
file_path = os.path.join(folder_path, file_name)
# Read the image using PIL and convert it to a NumPy array
image = PIL.Image.open(file_path)
image_array = np.asarray(image).astype(np.uint8)
samples.append(image_array)
samples = np.stack(samples)
# Save the images as a .npz file
np.savez(npz_path, arr_0=samples)
print(f"Images from folder {folder_path} have been saved as {npz_path}")
#----------------------------------------------------------------------------
@click.command()
@click.option('--outdir', help='Where to save the output images', metavar='DIR', type=str, required=True)
@click.option('--seeds', help='Random seeds (e.g. 1,2,5-10)', metavar='LIST', type=parse_int_list, default='0-63', show_default=True)
@click.option('--subdirs', help='Create subdirectory for every 1000 seeds', is_flag=True)
@click.option('--batch', 'max_batch_size', help='Maximum batch size', metavar='INT', type=click.IntRange(min=1), default=16, show_default=True)
@click.option('--num', 'num_fid_samples', help='Maximum num of images', metavar='INT', type=click.IntRange(min=1), default=30000, show_default=True)
@click.option('--text_prompts', 'text_prompts', help='Captions filename; the default [assets/captions.txt] consists of 30k COCO2014 prompts', metavar='PATH|URL', type=str, default='assets/captions.txt', show_default=True)
@click.option('--repo_id', 'repo_id', help='diffusion pipeline filename', metavar='PATH|URL', type=str, default='runwayml/stable-diffusion-v1-5', show_default=True)
@click.option('--use_fp16', help='Enable fp16 inference', metavar='BOOL', type=bool, default=False, show_default=True)
@click.option('--use_bf16', help='Enable bf16 inference', metavar='BOOL', type=bool, default=False, show_default=True)
@click.option('--enable_compress_npz', help='Compress the generated images into a single .npz file', metavar='BOOL', type=bool, default=False, show_default=True)
@click.option('--num_steps_eval', 'num_steps_eval', help='Number of denoising steps used for sampling', metavar='INT', type=click.IntRange(min=0), default=25, show_default=True)
@click.option('--guidance_scale', 'guidance_scale', help='Scale of classifier-free guidance', metavar='FLOAT', type=click.FloatRange(min=1.0), default=7.5, show_default=True)
@click.option('--resolution', 'resolution', help='Output resolution; defaults to the native resolution of the diffusion model', metavar='INT', type=int, default=None, show_default=True)
@click.option('--custom_seed', help='Enable custom seed', metavar='BOOL', type=bool, default=False, show_default=True)
def main(outdir, subdirs, seeds, max_batch_size, num_fid_samples, text_prompts, repo_id, device=torch.device('cuda'), use_fp16=True, use_bf16=False, enable_compress_npz=False, num_steps_eval=25, guidance_scale=7.5, resolution=None, custom_seed=False):
    dist.init()
    dtype = torch.float16 if use_fp16 else torch.float32
    dtype = torch.bfloat16 if use_bf16 else dtype
captions = read_file_to_sentences(text_prompts)
num_batches = ((len(seeds) - 1) // (max_batch_size * dist.get_world_size()) + 1) * dist.get_world_size()
if not custom_seed:
all_batches = torch.as_tensor(seeds).tensor_split(num_batches)
else:
seeds_idx = parse_int_list(f'0-{len(seeds)-1}')
all_batches = torch.as_tensor(seeds_idx).tensor_split(num_batches)
rank_batches = all_batches[dist.get_rank() :: dist.get_world_size()]
# Rank 0 goes first.
if dist.get_rank() != 0:
torch.distributed.barrier()
# Evaluate
if use_fp16:
pipeline = AutoPipelineForText2Image.from_pretrained(repo_id, variant='fp16').to(device, dtype)
else:
pipeline = AutoPipelineForText2Image.from_pretrained(repo_id).to(device, dtype)
_ = LinFusion.construct_for(pipeline)
resolution = resolution or pipeline.default_sample_size * pipeline.vae_scale_factor
# Other ranks follow.
if dist.get_rank() == 0:
torch.distributed.barrier()
# Loop over batches.
dist.print0(f'Generating {len(seeds)} images to "{outdir}"...')
for batch_seeds in tqdm.tqdm(rank_batches, unit='batch', disable=(dist.get_rank() != 0)):
torch.distributed.barrier()
batch_size = len(batch_seeds)
if batch_size == 0:
continue
# Pick latents and labels.
if not custom_seed:
rnd = StackedRandomGenerator(device, batch_seeds)
else:
cseed= [seeds[i] for i in batch_seeds]
rnd = StackedRandomGenerator(device, cseed)
img_channels=4
latents = rnd.randn([batch_size, img_channels, resolution // pipeline.vae_scale_factor, resolution // pipeline.vae_scale_factor], device=device, dtype=dtype)
c = [captions[i] for i in batch_seeds] # Index captions using list comprehension
with torch.no_grad():
images = pipeline(
prompt=c,
num_inference_steps=num_steps_eval,
guidance_scale=guidance_scale,
latents=latents
).images
# Save images.
for seed, image in zip(batch_seeds, images):
image_dir = os.path.join(outdir, f'{seed-seed%1000:06d}') if subdirs else outdir
os.makedirs(image_dir, exist_ok=True)
image_path = os.path.join(image_dir, f'{seed:06d}.png')
image.save(image_path)
# Done.
if enable_compress_npz:
torch.distributed.barrier()
if dist.get_rank() == 0:
compress_to_npz(outdir, num_fid_samples)
torch.distributed.barrier()
dist.print0('Done.')
#----------------------------------------------------------------------------
if __name__ == "__main__":
main()
#----------------------------------------------------------------------------
# Copyright (c) 2022, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
#
# This work is licensed under a Creative Commons
# Attribution-NonCommercial-ShareAlike 4.0 International License.
# You should have received a copy of the license along with this
# work. If not, see http://creativecommons.org/licenses/by-nc-sa/4.0/
import re
import contextlib
import numpy as np
import torch
import warnings
from . import dnnlib
#----------------------------------------------------------------------------
# Cached construction of constant tensors. Avoids CPU=>GPU copy when the
# same constant is used multiple times.
_constant_cache = dict()
def constant(value, shape=None, dtype=None, device=None, memory_format=None):
value = np.asarray(value)
if shape is not None:
shape = tuple(shape)
if dtype is None:
dtype = torch.get_default_dtype()
if device is None:
device = torch.device('cpu')
if memory_format is None:
memory_format = torch.contiguous_format
key = (value.shape, value.dtype, value.tobytes(), shape, dtype, device, memory_format)
tensor = _constant_cache.get(key, None)
if tensor is None:
tensor = torch.as_tensor(value.copy(), dtype=dtype, device=device)
if shape is not None:
tensor, _ = torch.broadcast_tensors(tensor, torch.empty(shape))
tensor = tensor.contiguous(memory_format=memory_format)
_constant_cache[key] = tensor
return tensor
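# A minimal usage sketch (assumes a CUDA device; any device works the same way):
#
#     device = torch.device('cuda')
#     eps1 = constant(1e-8, device=device)   # first call materializes the tensor on the GPU
#     eps2 = constant(1e-8, device=device)   # identical arguments hit _constant_cache
#     assert eps1 is eps2                    # same tensor object, no repeated CPU=>GPU copy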
#----------------------------------------------------------------------------
# Replace NaN/Inf with specified numerical values.
try:
nan_to_num = torch.nan_to_num # 1.8.0a0
except AttributeError:
def nan_to_num(input, nan=0.0, posinf=None, neginf=None, *, out=None): # pylint: disable=redefined-builtin
assert isinstance(input, torch.Tensor)
if posinf is None:
posinf = torch.finfo(input.dtype).max
if neginf is None:
neginf = torch.finfo(input.dtype).min
assert nan == 0
return torch.clamp(input.unsqueeze(0).nansum(0), min=neginf, max=posinf, out=out)
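# A short illustration of the fallback above (only taken on torch versions without
# torch.nan_to_num): nansum over an inserted singleton dimension turns each NaN entry into 0
# while leaving other values untouched, and the clamp maps +/-inf to finite extremes.
#
#     x = torch.tensor([float('nan'), float('inf'), -float('inf'), 1.0])
#     y = nan_to_num(x)   # -> [0.0, finfo.max, finfo.min, 1.0] for the tensor's dtype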
#----------------------------------------------------------------------------
# Symbolic assert.
try:
symbolic_assert = torch._assert # 1.8.0a0 # pylint: disable=protected-access
except AttributeError:
symbolic_assert = torch.Assert # 1.7.0
#----------------------------------------------------------------------------
# Context manager to temporarily suppress known warnings in torch.jit.trace().
# Note: Cannot use catch_warnings because of https://bugs.python.org/issue29672
@contextlib.contextmanager
def suppress_tracer_warnings():
flt = ('ignore', None, torch.jit.TracerWarning, None, 0)
warnings.filters.insert(0, flt)
yield
warnings.filters.remove(flt)
#----------------------------------------------------------------------------
# Assert that the shape of a tensor matches the given list of integers.
# None indicates that the size of a dimension is allowed to vary.
# Performs symbolic assertion when used in torch.jit.trace().
def assert_shape(tensor, ref_shape):
if tensor.ndim != len(ref_shape):
raise AssertionError(f'Wrong number of dimensions: got {tensor.ndim}, expected {len(ref_shape)}')
for idx, (size, ref_size) in enumerate(zip(tensor.shape, ref_shape)):
if ref_size is None:
pass
elif isinstance(ref_size, torch.Tensor):
with suppress_tracer_warnings(): # as_tensor results are registered as constants
symbolic_assert(torch.equal(torch.as_tensor(size), ref_size), f'Wrong size for dimension {idx}')
elif isinstance(size, torch.Tensor):
with suppress_tracer_warnings(): # as_tensor results are registered as constants
symbolic_assert(torch.equal(size, torch.as_tensor(ref_size)), f'Wrong size for dimension {idx}: expected {ref_size}')
elif size != ref_size:
raise AssertionError(f'Wrong size for dimension {idx}: got {size}, expected {ref_size}')
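# A minimal usage sketch:
#
#     x = torch.zeros(8, 3, 256, 256)
#     assert_shape(x, [None, 3, 256, 256])   # passes; the batch dimension may vary
#     assert_shape(x, [8, 3, 128, 128])      # raises AssertionError for dimension 2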
#----------------------------------------------------------------------------
# Function decorator that calls torch.autograd.profiler.record_function().
def profiled_function(fn):
def decorator(*args, **kwargs):
with torch.autograd.profiler.record_function(fn.__name__):
return fn(*args, **kwargs)
decorator.__name__ = fn.__name__
return decorator
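# A minimal usage sketch: the decorated function shows up as a named range in the
# autograd profiler output.
#
#     @profiled_function
#     def fused_op(x):
#         return x.relu().sum()
#
#     with torch.autograd.profiler.profile() as prof:
#         fused_op(torch.randn(16, 16))
#     print(prof.key_averages().table(sort_by='cpu_time_total'))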
#----------------------------------------------------------------------------
# Sampler for torch.utils.data.DataLoader that loops over the dataset
# indefinitely, shuffling items as it goes.
class InfiniteSampler(torch.utils.data.Sampler):
def __init__(self, dataset, rank=0, num_replicas=1, shuffle=True, seed=0, window_size=0.5):
assert len(dataset) > 0
assert num_replicas > 0
assert 0 <= rank < num_replicas
assert 0 <= window_size <= 1
super().__init__(dataset)
self.dataset = dataset
self.rank = rank
self.num_replicas = num_replicas
self.shuffle = shuffle
self.seed = seed
self.window_size = window_size
def __iter__(self):
order = np.arange(len(self.dataset))
rnd = None
window = 0
if self.shuffle:
rnd = np.random.RandomState(self.seed)
rnd.shuffle(order)
window = int(np.rint(order.size * self.window_size))
idx = 0
while True:
i = idx % order.size
if idx % self.num_replicas == self.rank:
yield order[i]
if window >= 2:
j = (i - rnd.randint(window)) % order.size
order[i], order[j] = order[j], order[i]
idx += 1
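# A minimal usage sketch (dataset, batch size and step count are placeholders):
#
#     sampler = InfiniteSampler(dataset, rank=rank, num_replicas=world_size, seed=0)
#     loader = torch.utils.data.DataLoader(dataset, batch_size=32, sampler=sampler)
#     data_iter = iter(loader)
#     for _ in range(total_steps):
#         batch = next(data_iter)   # never raises StopIteration; order keeps being reshuffled
#
# The window_size argument controls how locally items are reshuffled as the stream advances:
# 0 disables the in-flight swapping, 1 allows swaps across the whole dataset.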
#----------------------------------------------------------------------------
# Utilities for operating with torch.nn.Module parameters and buffers.
def params_and_buffers(module):
assert isinstance(module, torch.nn.Module)
return list(module.parameters()) + list(module.buffers())
def named_params_and_buffers(module):
assert isinstance(module, torch.nn.Module)
return list(module.named_parameters()) + list(module.named_buffers())
@torch.no_grad()
def copy_params_and_buffers(src_module, dst_module, require_all=False):
assert isinstance(src_module, torch.nn.Module)
assert isinstance(dst_module, torch.nn.Module)
src_tensors = dict(named_params_and_buffers(src_module))
for name, tensor in named_params_and_buffers(dst_module):
assert (name in src_tensors) or (not require_all)
if name in src_tensors:
tensor.copy_(src_tensors[name])
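# A minimal usage sketch: initialize an EMA copy of a network so that its parameters and
# buffers start out identical to the source (assumes the standard-library `copy` module).
#
#     import copy
#     ema = copy.deepcopy(net).eval().requires_grad_(False)
#     copy_params_and_buffers(src_module=net, dst_module=ema, require_all=True)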
#----------------------------------------------------------------------------
# Context manager for easily enabling/disabling DistributedDataParallel
# synchronization.
@contextlib.contextmanager
def ddp_sync(module, sync):
assert isinstance(module, torch.nn.Module)
if sync or not isinstance(module, torch.nn.parallel.DistributedDataParallel):
yield
else:
with module.no_sync():
yield
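# A minimal usage sketch: during gradient accumulation, skip DDP's gradient all-reduce on all
# but the final micro-batch (ddp_net, batches, and opt are placeholders).
#
#     for i, micro_batch in enumerate(batches):
#         with ddp_sync(ddp_net, sync=(i == len(batches) - 1)):
#             loss = ddp_net(micro_batch).mean()
#             loss.backward()
#     opt.step()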
#----------------------------------------------------------------------------
# Check DistributedDataParallel consistency across processes.
def check_ddp_consistency(module, ignore_regex=None):
assert isinstance(module, torch.nn.Module)
for name, tensor in named_params_and_buffers(module):
fullname = type(module).__name__ + '.' + name
if ignore_regex is not None and re.fullmatch(ignore_regex, fullname):
continue
tensor = tensor.detach()
if tensor.is_floating_point():
tensor = nan_to_num(tensor)
other = tensor.clone()
torch.distributed.broadcast(tensor=other, src=0)
assert (tensor == other).all(), fullname
#----------------------------------------------------------------------------
# Print summary table of module hierarchy.
def print_module_summary(module, inputs, max_nesting=3, skip_redundant=True):
assert isinstance(module, torch.nn.Module)
assert not isinstance(module, torch.jit.ScriptModule)
assert isinstance(inputs, (tuple, list))
# Register hooks.
entries = []
nesting = [0]
def pre_hook(_mod, _inputs):
nesting[0] += 1
def post_hook(mod, _inputs, outputs):
nesting[0] -= 1
if nesting[0] <= max_nesting:
outputs = list(outputs) if isinstance(outputs, (tuple, list)) else [outputs]
outputs = [t for t in outputs if isinstance(t, torch.Tensor)]
entries.append(dnnlib.EasyDict(mod=mod, outputs=outputs))
hooks = [mod.register_forward_pre_hook(pre_hook) for mod in module.modules()]
hooks += [mod.register_forward_hook(post_hook) for mod in module.modules()]
# Run module.
outputs = module(*inputs)
for hook in hooks:
hook.remove()
# Identify unique outputs, parameters, and buffers.
tensors_seen = set()
for e in entries:
e.unique_params = [t for t in e.mod.parameters() if id(t) not in tensors_seen]
e.unique_buffers = [t for t in e.mod.buffers() if id(t) not in tensors_seen]
e.unique_outputs = [t for t in e.outputs if id(t) not in tensors_seen]
tensors_seen |= {id(t) for t in e.unique_params + e.unique_buffers + e.unique_outputs}
# Filter out redundant entries.
if skip_redundant:
entries = [e for e in entries if len(e.unique_params) or len(e.unique_buffers) or len(e.unique_outputs)]
# Construct table.
rows = [[type(module).__name__, 'Parameters', 'Buffers', 'Output shape', 'Datatype']]
rows += [['---'] * len(rows[0])]
param_total = 0
buffer_total = 0
submodule_names = {mod: name for name, mod in module.named_modules()}
for e in entries:
name = '<top-level>' if e.mod is module else submodule_names[e.mod]
param_size = sum(t.numel() for t in e.unique_params)
buffer_size = sum(t.numel() for t in e.unique_buffers)
output_shapes = [str(list(t.shape)) for t in e.outputs]
output_dtypes = [str(t.dtype).split('.')[-1] for t in e.outputs]
rows += [[
name + (':0' if len(e.outputs) >= 2 else ''),
str(param_size) if param_size else '-',
str(buffer_size) if buffer_size else '-',
(output_shapes + ['-'])[0],
(output_dtypes + ['-'])[0],
]]
for idx in range(1, len(e.outputs)):
rows += [[name + f':{idx}', '-', '-', output_shapes[idx], output_dtypes[idx]]]
param_total += param_size
buffer_total += buffer_size
rows += [['---'] * len(rows[0])]
rows += [['Total', str(param_total), str(buffer_total), '-', '-']]
# Print table.
widths = [max(len(cell) for cell in column) for column in zip(*rows)]
print()
for row in rows:
print(' '.join(cell + ' ' * (width - len(cell)) for cell, width in zip(row, widths)))
print()
return outputs
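# A minimal usage sketch (the network and input shape are placeholders):
#
#     net = MyNetwork()
#     images = torch.randn(1, 3, 256, 256)
#     outputs = print_module_summary(net, [images], max_nesting=2)
#
# One row is printed per (non-redundant) submodule with its parameter count, buffer count,
# output shape and dtype, followed by totals; the module outputs are returned so the forward
# pass is not wasted.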
#----------------------------------------------------------------------------
# Copyright (c) 2021, NVIDIA CORPORATION. All rights reserved.
#
# NVIDIA CORPORATION and its licensors retain all intellectual property
# and proprietary rights in and to this software, related documentation
# and any modifications thereto. Any use, reproduction, disclosure or
# distribution of this software and related documentation without an express
# license agreement from NVIDIA CORPORATION is strictly prohibited.
# empty