Commit b0f41f60 authored by yuguo960516 (parent 456eb360)

resnet50
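# --- models/accuracy.py (path assumed from `from models.accuracy import Accuracy` in the training script at the end of this commit) ---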
import numpy as np
import oneflow as flow


class Accuracy(flow.nn.Module):
    def __init__(self):
        super().__init__()

    def forward(self, preds, labels):
        top1_num = flow.zeros(1, dtype=flow.float32)
        num_samples = 0
        for pred, label in zip(preds, labels):
            clsidxs = pred.argmax(dim=-1)
            clsidxs = clsidxs.to(flow.int32)
            match = (clsidxs == label).sum()
            top1_num += match.to(device=top1_num.device, dtype=top1_num.dtype)
            num_samples += np.prod(label.shape).item()
        top1_acc = top1_num / num_samples
        return top1_acc
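# Usage sketch (based on how the Trainer below calls this module; not part of the original file):
#   acc = Accuracy()
#   top1 = acc([logits.softmax()], [labels])

# --- models/data.py (path assumed from `from models.data import make_data_loader` in the training script) ---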
import os

import numpy as np
import oneflow as flow


def make_data_loader(args, mode, is_global=False, synthetic=False):
    assert mode in ("train", "validation")

    if mode == "train":
        total_batch_size = args.train_global_batch_size
        batch_size = args.train_batch_size
        num_samples = args.samples_per_epoch
    else:
        total_batch_size = args.val_global_batch_size
        batch_size = args.val_batch_size
        num_samples = args.val_samples_per_epoch

    placement = None
    sbp = None

    if is_global:
        placement = flow.env.all_device_placement("cpu")
        sbp = flow.sbp.split(0)
        # NOTE(zwx): global view, only consider logical batch size
        batch_size = total_batch_size

    if synthetic:
        data_loader = SyntheticDataLoader(
            batch_size=batch_size,
            num_classes=args.num_classes,
            placement=placement,
            sbp=sbp,
            channel_last=args.channel_last,
        )
        return data_loader.to("cuda")

    ofrecord_data_loader = OFRecordDataLoader(
        ofrecord_dir=args.ofrecord_path,
        ofrecord_part_num=args.ofrecord_part_num,
        dataset_size=num_samples,
        mode=mode,
        batch_size=batch_size,
        total_batch_size=total_batch_size,
        channel_last=args.channel_last,
        placement=placement,
        sbp=sbp,
        use_gpu_decode=args.use_gpu_decode,
    )
    return ofrecord_data_loader


class OFRecordDataLoader(flow.nn.Module):
    def __init__(
        self,
        ofrecord_dir="./ofrecord",
        ofrecord_part_num=1,
        dataset_size=9469,
        mode="train",
        batch_size=1,
        total_batch_size=1,
        channel_last=False,
        placement=None,
        sbp=None,
        use_gpu_decode=False,
    ):
        super().__init__()

        assert mode in ("train", "validation")

        self.batch_size = batch_size
        self.total_batch_size = total_batch_size
        self.dataset_size = dataset_size
        self.mode = mode

        random_shuffle = True if mode == "train" else False
        shuffle_after_epoch = True if mode == "train" else False

        ofrecord_path = os.path.join(ofrecord_dir, self.mode)

        self.ofrecord_reader = flow.nn.OfrecordReader(
            ofrecord_path,
            batch_size=batch_size,
            data_part_num=ofrecord_part_num,
            part_name_suffix_length=5,
            random_shuffle=random_shuffle,
            shuffle_after_epoch=shuffle_after_epoch,
            placement=placement,
            sbp=sbp,
        )
        self.label_decoder = flow.nn.OfrecordRawDecoder(
            "class/label", shape=tuple(), dtype=flow.int32
        )

        if channel_last:
            os.environ["ONEFLOW_ENABLE_NHWC"] = "1"

        color_space = "RGB"
        image_height = 224
        image_width = 224
        resize_shorter = 256
        rgb_mean = [123.68, 116.779, 103.939]
        rgb_std = [58.393, 57.12, 57.375]

        self.use_gpu_decode = use_gpu_decode
        if self.mode == "train":
            if self.use_gpu_decode:
                self.bytesdecoder_img = flow.nn.OFRecordBytesDecoder("encoded")
                self.image_decoder = flow.nn.OFRecordImageGpuDecoderRandomCropResize(
                    target_width=image_width,
                    target_height=image_height,
                    num_workers=3,
                    warmup_size=2048,
                )
            else:
                self.image_decoder = flow.nn.OFRecordImageDecoderRandomCrop(
                    "encoded", color_space=color_space
                )
                self.resize = flow.nn.image.Resize(
                    target_size=[image_width, image_height]
                )
            self.flip = flow.nn.CoinFlip(
                batch_size=self.batch_size, placement=placement, sbp=sbp
            )
            self.crop_mirror_norm = flow.nn.CropMirrorNormalize(
                color_space=color_space,
                mean=rgb_mean,
                std=rgb_std,
                output_dtype=flow.float,
            )
        else:
            self.image_decoder = flow.nn.OFRecordImageDecoder(
                "encoded", color_space=color_space
            )
            self.resize = flow.nn.image.Resize(
                resize_side="shorter",
                keep_aspect_ratio=True,
                target_size=resize_shorter,
            )
            self.crop_mirror_norm = flow.nn.CropMirrorNormalize(
                color_space=color_space,
                crop_h=image_height,
                crop_w=image_width,
                crop_pos_y=0.5,
                crop_pos_x=0.5,
                mean=rgb_mean,
                std=rgb_std,
                output_dtype=flow.float,
            )

    def __len__(self):
        return self.dataset_size // self.total_batch_size

    def forward(self):
        if self.mode == "train":
            record = self.ofrecord_reader()
            if self.use_gpu_decode:
                encoded = self.bytesdecoder_img(record)
                image = self.image_decoder(encoded)
            else:
                image_raw_bytes = self.image_decoder(record)
                image = self.resize(image_raw_bytes)[0]
                image = image.to("cuda")
            label = self.label_decoder(record)
            flip_code = self.flip()
            flip_code = flip_code.to("cuda")
            image = self.crop_mirror_norm(image, flip_code)
        else:
            record = self.ofrecord_reader()
            image_raw_bytes = self.image_decoder(record)
            label = self.label_decoder(record)
            image = self.resize(image_raw_bytes)[0]
            image = self.crop_mirror_norm(image)
        return image, label


class SyntheticDataLoader(flow.nn.Module):
    def __init__(
        self,
        batch_size,
        image_size=224,
        num_classes=1000,
        placement=None,
        sbp=None,
        channel_last=False,
    ):
        super().__init__()

        if channel_last:
            self.image_shape = (batch_size, image_size, image_size, 3)
        else:
            self.image_shape = (batch_size, 3, image_size, image_size)
        self.label_shape = (batch_size,)
        self.num_classes = num_classes
        self.placement = placement
        self.sbp = sbp

        if self.placement is not None and self.sbp is not None:
            self.image = flow.nn.Parameter(
                flow.randint(
                    0,
                    high=256,
                    size=self.image_shape,
                    dtype=flow.float32,
                    placement=self.placement,
                    sbp=self.sbp,
                ),
                requires_grad=False,
            )
            self.label = flow.nn.Parameter(
                flow.randint(
                    0,
                    high=self.num_classes,
                    size=self.label_shape,
                    placement=self.placement,
                    sbp=self.sbp,
                ).to(dtype=flow.int32),
                requires_grad=False,
            )
        else:
            self.image = flow.randint(
                0, high=256, size=self.image_shape, dtype=flow.float32, device="cuda"
            )
            self.label = flow.randint(
                0, high=self.num_classes, size=self.label_shape, device="cuda",
            ).to(dtype=flow.int32)

    def forward(self):
        return self.image, self.label
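# Usage sketch (hypothetical `args` namespace; not part of the original file):
#   train_loader = make_data_loader(args, "train", is_global=False, synthetic=False)
#   image, label = train_loader()

# --- models/optimizer.py (path assumed from the make_optimizer / make_lr_scheduler / make_cross_entropy imports in the training script) ---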
import oneflow as flow


def make_optimizer(args, model):
    param_group = {"params": [p for p in model.parameters() if p is not None]}
    if args.grad_clipping > 0.0:
        assert args.grad_clipping == 1.0, "ONLY support grad_clipping == 1.0"
        param_group["clip_grad_max_norm"] = (1.0,)
        param_group["clip_grad_norm_type"] = (2.0,)

    optimizer = flow.optim.SGD(
        [param_group],
        lr=args.learning_rate,
        momentum=args.momentum,
        weight_decay=args.weight_decay,
    )
    return optimizer


def make_grad_scaler():
    return flow.amp.GradScaler(
        init_scale=2 ** 30, growth_factor=2.0, backoff_factor=0.5, growth_interval=2000,
    )


def make_static_grad_scaler():
    return flow.amp.StaticGradScaler(flow.env.get_world_size())


def make_lr_scheduler(args, optimizer):
    assert args.lr_decay_type in ("none", "cosine")
    if args.lr_decay_type == "none":
        return None

    warmup_batches = args.batches_per_epoch * args.warmup_epochs
    total_batches = args.batches_per_epoch * args.num_epochs
    # TODO(zwx): there is no need for decay_batches to subtract warmup_batches
    # decay_batches = total_batches - warmup_batches
    decay_batches = total_batches

    lr_scheduler = flow.optim.lr_scheduler.CosineDecayLR(
        optimizer, decay_steps=decay_batches
    )
    if args.warmup_epochs > 0:
        lr_scheduler = flow.optim.lr_scheduler.WarmUpLR(
            lr_scheduler,
            warmup_factor=0,
            warmup_iters=warmup_batches,
            warmup_method="linear",
        )
    return lr_scheduler


def make_cross_entropy(args):
    if args.label_smoothing > 0:
        cross_entropy = LabelSmoothLoss(
            num_classes=args.num_classes, smooth_rate=args.label_smoothing
        )
    else:
        cross_entropy = flow.nn.CrossEntropyLoss(reduction="mean")
    return cross_entropy


class LabelSmoothLoss(flow.nn.Module):
    def __init__(self, num_classes=-1, smooth_rate=0.0):
        super().__init__()
        self.num_classes = num_classes
        self.smooth_rate = smooth_rate
        # TODO(zwx): check this hyper-parameter correction
        self.on_value = 1 - self.smooth_rate + self.smooth_rate / self.num_classes
        self.off_value = self.smooth_rate / self.num_classes

    def forward(self, input, label):
        onehot_label = flow._C.one_hot(
            label, self.num_classes, self.on_value, self.off_value
        )
        # NOTE(zwx): the manual implementation below has a bug
        # log_prob = input.softmax(dim=-1).log()
        # onehot_label = flow.F.cast(onehot_label, log_prob.dtype)
        # loss = flow.mul(log_prob * -1, onehot_label).sum(dim=-1).mean()
        loss = flow._C.softmax_cross_entropy(input, onehot_label.to(dtype=input.dtype))
        return loss.mean()
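# Quick sanity check (a sketch, not part of the original file) for the smoothing
# constants used by LabelSmoothLoss above: the smoothed one-hot row must still sum to 1.
#   num_classes, smooth_rate = 1000, 0.1
#   on_value = 1 - smooth_rate + smooth_rate / num_classes
#   off_value = smooth_rate / num_classes
#   assert abs(on_value + (num_classes - 1) * off_value - 1.0) < 1e-9

# --- PyTorch reference implementation of ResNet-50 (file name not shown in this extract) ---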
import torch
import torch.nn as nn
from torch import Tensor
from typing import Type, Any, Callable, Union, List, Optional
def conv3x3(
in_planes: int, out_planes: int, stride: int = 1, groups: int = 1, dilation: int = 1
) -> nn.Conv2d:
"""3x3 convolution with padding"""
return nn.Conv2d(
in_planes,
out_planes,
kernel_size=3,
stride=stride,
padding=dilation,
groups=groups,
bias=False,
dilation=dilation,
)
def conv1x1(in_planes: int, out_planes: int, stride: int = 1) -> nn.Conv2d:
"""1x1 convolution"""
return nn.Conv2d(in_planes, out_planes, kernel_size=1, stride=stride, bias=False)
class BasicBlock(nn.Module):
expansion: int = 1
def __init__(
self,
inplanes: int,
planes: int,
stride: int = 1,
downsample: Optional[nn.Module] = None,
groups: int = 1,
base_width: int = 64,
dilation: int = 1,
norm_layer: Optional[Callable[..., nn.Module]] = None,
) -> None:
super(BasicBlock, self).__init__()
if norm_layer is None:
norm_layer = nn.BatchNorm2d
if groups != 1 or base_width != 64:
raise ValueError("BasicBlock only supports groups=1 and base_width=64")
if dilation > 1:
raise NotImplementedError("Dilation > 1 not supported in BasicBlock")
# Both self.conv1 and self.downsample layers downsample the input when stride != 1
self.conv1 = conv3x3(inplanes, planes, stride)
self.bn1 = norm_layer(planes)
self.relu = nn.ReLU(inplace=True)
self.conv2 = conv3x3(planes, planes)
self.bn2 = norm_layer(planes)
self.downsample = downsample
self.stride = stride
def forward(self, x: Tensor) -> Tensor:
identity = x
out = self.conv1(x)
out = self.bn1(out)
out = self.relu(out)
out = self.conv2(out)
out = self.bn2(out)
if self.downsample is not None:
identity = self.downsample(x)
out += identity
out = self.relu(out)
return out
class Bottleneck(nn.Module):
# Bottleneck in torchvision places the stride for downsampling at 3x3 convolution(self.conv2)
# while original implementation places the stride at the first 1x1 convolution(self.conv1)
# according to "Deep residual learning for image recognition"https://arxiv.org/abs/1512.03385.
# This variant is also known as ResNet V1.5 and improves accuracy according to
# https://ngc.nvidia.com/catalog/model-scripts/nvidia:resnet_50_v1_5_for_pytorch.
expansion: int = 4
def __init__(
self,
inplanes: int,
planes: int,
stride: int = 1,
downsample: Optional[nn.Module] = None,
groups: int = 1,
base_width: int = 64,
dilation: int = 1,
norm_layer: Optional[Callable[..., nn.Module]] = None,
) -> None:
super(Bottleneck, self).__init__()
if norm_layer is None:
norm_layer = nn.BatchNorm2d
width = int(planes * (base_width / 64.0)) * groups
# Both self.conv2 and self.downsample layers downsample the input when stride != 1
self.conv1 = conv1x1(inplanes, width)
self.bn1 = norm_layer(width)
self.conv2 = conv3x3(width, width, stride, groups, dilation)
self.bn2 = norm_layer(width)
self.conv3 = conv1x1(width, planes * self.expansion)
self.bn3 = norm_layer(planes * self.expansion)
self.relu = nn.ReLU(inplace=True)
self.downsample = downsample
self.stride = stride
def forward(self, x: Tensor) -> Tensor:
identity = x
out = self.conv1(x)
out = self.bn1(out)
out = self.relu(out)
out = self.conv2(out)
out = self.bn2(out)
out = self.relu(out)
out = self.conv3(out)
out = self.bn3(out)
if self.downsample is not None:
identity = self.downsample(x)
out += identity
out = self.relu(out)
return out
class ResNet(nn.Module):
def __init__(
self,
block: Type[Union[BasicBlock, Bottleneck]],
layers: List[int],
num_classes: int = 1000,
zero_init_residual: bool = False,
groups: int = 1,
width_per_group: int = 64,
replace_stride_with_dilation: Optional[List[bool]] = None,
norm_layer: Optional[Callable[..., nn.Module]] = None,
) -> None:
super(ResNet, self).__init__()
if norm_layer is None:
norm_layer = nn.BatchNorm2d
self._norm_layer = norm_layer
self.inplanes = 64
self.dilation = 1
if replace_stride_with_dilation is None:
# each element in the tuple indicates if we should replace
# the 2x2 stride with a dilated convolution instead
replace_stride_with_dilation = [False, False, False]
if len(replace_stride_with_dilation) != 3:
raise ValueError(
"replace_stride_with_dilation should be None "
"or a 3-element tuple, got {}".format(replace_stride_with_dilation)
)
self.groups = groups
self.base_width = width_per_group
self.conv1 = nn.Conv2d(
3, self.inplanes, kernel_size=7, stride=2, padding=3, bias=False
)
self.bn1 = norm_layer(self.inplanes)
self.relu = nn.ReLU(inplace=True)
self.maxpool = nn.MaxPool2d(kernel_size=3, stride=2, padding=1)
self.layer1 = self._make_layer(block, 64, layers[0])
self.layer2 = self._make_layer(
block, 128, layers[1], stride=2, dilate=replace_stride_with_dilation[0]
)
self.layer3 = self._make_layer(
block, 256, layers[2], stride=2, dilate=replace_stride_with_dilation[1]
)
self.layer4 = self._make_layer(
block, 512, layers[3], stride=2, dilate=replace_stride_with_dilation[2]
)
self.avgpool = nn.AdaptiveAvgPool2d((1, 1))
self.fc = nn.Linear(512 * block.expansion, num_classes)
for m in self.modules():
if isinstance(m, nn.Conv2d):
nn.init.kaiming_normal_(m.weight, mode="fan_out", nonlinearity="relu")
elif isinstance(m, (nn.BatchNorm2d, nn.GroupNorm)):
nn.init.constant_(m.weight, 1)
nn.init.constant_(m.bias, 0)
# Zero-initialize the last BN in each residual branch,
# so that the residual branch starts with zeros, and each residual block behaves like an identity.
# This improves the model by 0.2~0.3% according to https://arxiv.org/abs/1706.02677
if zero_init_residual:
for m in self.modules():
if isinstance(m, Bottleneck):
nn.init.constant_(m.bn3.weight, 0) # type: ignore[arg-type]
elif isinstance(m, BasicBlock):
nn.init.constant_(m.bn2.weight, 0) # type: ignore[arg-type]
def _make_layer(
self,
block: Type[Union[BasicBlock, Bottleneck]],
planes: int,
blocks: int,
stride: int = 1,
dilate: bool = False,
) -> nn.Sequential:
norm_layer = self._norm_layer
downsample = None
previous_dilation = self.dilation
if dilate:
self.dilation *= stride
stride = 1
if stride != 1 or self.inplanes != planes * block.expansion:
downsample = nn.Sequential(
conv1x1(self.inplanes, planes * block.expansion, stride),
norm_layer(planes * block.expansion),
)
layers = []
layers.append(
block(
self.inplanes,
planes,
stride,
downsample,
self.groups,
self.base_width,
previous_dilation,
norm_layer,
)
)
self.inplanes = planes * block.expansion
for _ in range(1, blocks):
layers.append(
block(
self.inplanes,
planes,
groups=self.groups,
base_width=self.base_width,
dilation=self.dilation,
norm_layer=norm_layer,
)
)
return nn.Sequential(*layers)
def _forward_impl(self, x: Tensor) -> Tensor:
# See note [TorchScript super()]
x = self.conv1(x)
x = self.bn1(x)
x = self.relu(x)
x = self.maxpool(x)
x = self.layer1(x)
x = self.layer2(x)
x = self.layer3(x)
x = self.layer4(x)
x = self.avgpool(x)
x = torch.flatten(x, 1)
x = self.fc(x)
return x
def forward(self, x: Tensor) -> Tensor:
return self._forward_impl(x)
def _resnet(
arch: str,
block: Type[Union[BasicBlock, Bottleneck]],
layers: List[int],
**kwargs: Any
) -> ResNet:
model = ResNet(block, layers, **kwargs)
return model
def resnet50(**kwargs: Any) -> ResNet:
r"""ResNet-50 model from
`"Deep Residual Learning for Image Recognition" <https://arxiv.org/pdf/1512.03385.pdf>`_.
Keyword arguments are forwarded to the ``ResNet`` constructor; unlike the torchvision
version, this stripped-down variant has no ``pretrained`` or ``progress`` options.
"""
return _resnet("resnet50", Bottleneck, [3, 4, 6, 3], **kwargs)
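# --- models/resnet50.py: OneFlow ResNet-50 (path assumed from `from models.resnet50 import resnet50, Bottleneck` in the training script) ---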
import os
import oneflow as flow
import oneflow.nn as nn
from oneflow import Tensor
from typing import Type, Any, Callable, Union, List, Optional
def conv3x3(
in_planes: int, out_planes: int, stride: int = 1, groups: int = 1, dilation: int = 1
) -> nn.Conv2d:
"""3x3 convolution with padding"""
return nn.Conv2d(
in_planes,
out_planes,
kernel_size=3,
stride=stride,
padding=dilation,
groups=groups,
bias=False,
dilation=dilation,
)
def conv1x1(in_planes: int, out_planes: int, stride: int = 1) -> nn.Conv2d:
"""1x1 convolution"""
return nn.Conv2d(in_planes, out_planes, kernel_size=1, stride=stride, bias=False)
class BasicBlock(nn.Module):
expansion: int = 1
def __init__(
self,
inplanes: int,
planes: int,
stride: int = 1,
downsample: Optional[nn.Module] = None,
groups: int = 1,
base_width: int = 64,
dilation: int = 1,
norm_layer: Optional[Callable[..., nn.Module]] = None,
) -> None:
super(BasicBlock, self).__init__()
if norm_layer is None:
norm_layer = nn.BatchNorm2d
if groups != 1 or base_width != 64:
raise ValueError("BasicBlock only supports groups=1 and base_width=64")
if dilation > 1:
raise NotImplementedError("Dilation > 1 not supported in BasicBlock")
# Both self.conv1 and self.downsample layers downsample the input when stride != 1
self.conv1 = conv3x3(inplanes, planes, stride)
self.bn1 = norm_layer(planes)
self.relu = nn.ReLU()
self.conv2 = conv3x3(planes, planes)
self.bn2 = norm_layer(planes)
self.downsample = downsample
self.stride = stride
def forward(self, x: Tensor) -> Tensor:
identity = x
out = self.conv1(x)
out = self.bn1(out)
out = self.relu(out)
out = self.conv2(out)
out = self.bn2(out)
if self.downsample is not None:
identity = self.downsample(x)
out += identity
out = self.relu(out)
return out
class Bottleneck(nn.Module):
expansion: int = 4
def __init__(
self,
inplanes: int,
planes: int,
stride: int = 1,
downsample: Optional[nn.Module] = None,
groups: int = 1,
base_width: int = 64,
dilation: int = 1,
norm_layer: Optional[Callable[..., nn.Module]] = None,
fuse_bn_relu=False,
fuse_bn_add_relu=False,
) -> None:
super(Bottleneck, self).__init__()
self.fuse_bn_relu = fuse_bn_relu
self.fuse_bn_add_relu = fuse_bn_add_relu
if norm_layer is None:
norm_layer = nn.BatchNorm2d
width = int(planes * (base_width / 64.0)) * groups
# Both self.conv2 and self.downsample layers downsample the input when stride != 1
self.conv1 = conv1x1(inplanes, width)
if self.fuse_bn_relu:
self.bn1 = nn.FusedBatchNorm2d(width)
self.bn2 = nn.FusedBatchNorm2d(width)
else:
self.bn1 = norm_layer(width)
self.bn2 = norm_layer(width)
self.relu = nn.ReLU()
self.conv2 = conv3x3(width, width, stride, groups, dilation)
self.conv3 = conv1x1(width, planes * self.expansion)
if self.fuse_bn_add_relu:
self.bn3 = nn.FusedBatchNorm2d(planes * self.expansion)
else:
self.bn3 = norm_layer(planes * self.expansion)
self.relu = nn.ReLU()
self.downsample = downsample
self.stride = stride
def forward(self, x: Tensor) -> Tensor:
identity = x
if self.downsample is not None:
# NOTE: executing self.downsample before self.conv1 gives better performance
# when the allow_fuse_add_to_output optimization is enabled in nn.Graph.
# Reference: https://github.com/Oneflow-Inc/OneTeam/issues/840#issuecomment-994903466
# Reference: https://github.com/NVIDIA/cudnn-frontend/issues/21
identity = self.downsample(x)
out = self.conv1(x)
if self.fuse_bn_relu:
out = self.bn1(out, None)
else:
out = self.bn1(out)
out = self.relu(out)
out = self.conv2(out)
if self.fuse_bn_relu:
out = self.bn2(out, None)
else:
out = self.bn2(out)
out = self.relu(out)
out = self.conv3(out)
if self.fuse_bn_add_relu:
out = self.bn3(out, identity)
else:
out = self.bn3(out)
out += identity
out = self.relu(out)
return out
class ResNet(nn.Module):
def __init__(
self,
block: Type[Union[BasicBlock, Bottleneck]],
layers: List[int],
num_classes: int = 1000,
zero_init_residual: bool = False,
groups: int = 1,
width_per_group: int = 64,
replace_stride_with_dilation: Optional[List[bool]] = None,
norm_layer: Optional[Callable[..., nn.Module]] = None,
fuse_bn_relu=False,
fuse_bn_add_relu=False,
channel_last=False,
) -> None:
super(ResNet, self).__init__()
if norm_layer is None:
norm_layer = nn.BatchNorm2d
self._norm_layer = norm_layer
self.fuse_bn_relu = fuse_bn_relu
self.fuse_bn_add_relu = fuse_bn_add_relu
self.channel_last = channel_last
if self.channel_last:
self.pad_input = True
else:
self.pad_input = False
self.inplanes = 64
self.dilation = 1
if replace_stride_with_dilation is None:
# each element in the tuple indicates if we should replace
# the 2x2 stride with a dilated convolution instead
replace_stride_with_dilation = [False, False, False]
if len(replace_stride_with_dilation) != 3:
raise ValueError(
"replace_stride_with_dilation should be None "
"or a 3-element tuple, got {}".format(replace_stride_with_dilation)
)
self.groups = groups
self.base_width = width_per_group
if self.pad_input:
channel_size = 4
else:
channel_size = 3
if self.channel_last:
os.environ["ONEFLOW_ENABLE_NHWC"] = "1"
self.conv1 = nn.Conv2d(
channel_size, self.inplanes, kernel_size=7, stride=2, padding=3, bias=False
)
if self.fuse_bn_relu:
self.bn1 = nn.FusedBatchNorm2d(self.inplanes)
else:
self.bn1 = self._norm_layer(self.inplanes)
self.relu = nn.ReLU()
self.maxpool = nn.MaxPool2d(kernel_size=3, stride=2, padding=1)
self.layer1 = self._make_layer(block, 64, layers[0])
self.layer2 = self._make_layer(
block, 128, layers[1], stride=2, dilate=replace_stride_with_dilation[0]
)
self.layer3 = self._make_layer(
block, 256, layers[2], stride=2, dilate=replace_stride_with_dilation[1]
)
self.layer4 = self._make_layer(
block, 512, layers[3], stride=2, dilate=replace_stride_with_dilation[2]
)
self.avgpool = nn.AvgPool2d((7, 7), stride=(1, 1))
self.fc = nn.Linear(512 * block.expansion, num_classes)
for m in self.modules():
if isinstance(m, nn.Conv2d):
nn.init.kaiming_normal_(m.weight, mode="fan_out", nonlinearity="relu")
elif isinstance(m, nn.BatchNorm2d):
nn.init.constant_(m.weight, 1)
nn.init.constant_(m.bias, 0)
# Zero-initialize the last BN in each residual branch,
# so that the residual branch starts with zeros, and each residual block behaves like an identity.
# This improves the model by 0.2~0.3% according to https://arxiv.org/abs/1706.02677
if zero_init_residual:
for m in self.modules():
if isinstance(m, Bottleneck):
nn.init.constant_(m.bn3.weight, 0) # type: ignore[arg-type]
elif isinstance(m, BasicBlock):
nn.init.constant_(m.bn2.weight, 0) # type: ignore[arg-type]
def _make_layer(
self,
block: Type[Union[BasicBlock, Bottleneck]],
planes: int,
blocks: int,
stride: int = 1,
dilate: bool = False,
) -> nn.Sequential:
norm_layer = self._norm_layer
downsample = None
previous_dilation = self.dilation
if dilate:
self.dilation *= stride
stride = 1
if stride != 1 or self.inplanes != planes * block.expansion:
downsample = nn.Sequential(
conv1x1(self.inplanes, planes * block.expansion, stride),
norm_layer(planes * block.expansion),
)
layers = []
layers.append(
block(
self.inplanes,
planes,
stride,
downsample,
self.groups,
self.base_width,
previous_dilation,
norm_layer,
fuse_bn_relu=self.fuse_bn_relu,
fuse_bn_add_relu=self.fuse_bn_add_relu,
)
)
self.inplanes = planes * block.expansion
for _ in range(1, blocks):
layers.append(
block(
self.inplanes,
planes,
groups=self.groups,
base_width=self.base_width,
dilation=self.dilation,
norm_layer=norm_layer,
fuse_bn_relu=self.fuse_bn_relu,
fuse_bn_add_relu=self.fuse_bn_add_relu,
)
)
return nn.Sequential(*layers)
def _forward_impl(self, x: Tensor) -> Tensor:
if self.pad_input:
if self.channel_last:
# NHWC
paddings = (0, 1)
else:
# NCHW
paddings = (0, 0, 0, 0, 0, 1)
x = flow._C.pad(x, pad=paddings, mode="constant", value=0)
x = self.conv1(x)
if self.fuse_bn_relu:
x = self.bn1(x, None)
else:
x = self.bn1(x)
x = self.relu(x)
x = self.maxpool(x)
x = self.layer1(x)
x = self.layer2(x)
x = self.layer3(x)
x = self.layer4(x)
x = self.avgpool(x)
x = flow.flatten(x, 1)
x = self.fc(x)
return x
def forward(self, x: Tensor) -> Tensor:
return self._forward_impl(x)
def _resnet(
arch: str,
block: Type[Union[BasicBlock, Bottleneck]],
layers: List[int],
**kwargs: Any
) -> ResNet:
model = ResNet(block, layers, **kwargs)
return model
def resnet50(**kwargs: Any) -> ResNet:
r"""ResNet-5
`"Deep Residual Learning for Image Recognition" <https://arxiv.org/pdf/1512.03385.pdf>`_.
"""
return _resnet("resnet50", Bottleneck, [3, 4, 6, 3], **kwargs)
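# --- train.py: training entry point (file name assumed; the config, graph and utils modules it imports are not shown in this extract) ---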
import os
import sys
sys.path.append(
os.path.abspath(os.path.join(os.path.dirname(__file__), os.path.pardir))
)
import numpy as np
import time
import oneflow as flow
from oneflow.nn.parallel import DistributedDataParallel as ddp
from config import get_args
from graph import make_train_graph, make_eval_graph
from models.resnet50 import resnet50, Bottleneck
from models.data import make_data_loader
from models.optimizer import make_optimizer
from models.optimizer import make_lr_scheduler
from models.optimizer import make_cross_entropy
from models.accuracy import Accuracy
import utils.logger as log
from utils.stat import CudaUtilMemStat
class Trainer(object):
def __init__(self):
args = get_args()
for k, v in args.__dict__.items():
setattr(self, k, v)
self.rank = flow.env.get_rank()
self.world_size = flow.env.get_world_size()
self.cur_epoch = 0
self.cur_iter = 0
self.cur_batch = 0
self.is_global = (self.world_size > 1 and not self.ddp) or self.graph
self.is_train = False
self.meter_lr = self.graph is False
self.init_logger()
flow.boxing.nccl.set_fusion_threshold_mbytes(self.nccl_fusion_threshold_mb)
flow.boxing.nccl.set_fusion_max_ops_num(self.nccl_fusion_max_ops)
if self.use_fp16 and self.num_nodes * self.num_devices_per_node > 1:
flow.boxing.nccl.enable_use_buffer_to_fuse_all_reduce(False)
self.model = resnet50(
zero_init_residual=self.zero_init_residual,
fuse_bn_relu=self.fuse_bn_relu,
fuse_bn_add_relu=self.fuse_bn_add_relu,
channel_last=self.channel_last,
)
self.init_model()
self.cross_entropy = make_cross_entropy(args)
self.train_data_loader = make_data_loader(
args, "train", self.is_global, self.synthetic_data
)
self.val_data_loader = make_data_loader(
args, "validation", self.is_global, self.synthetic_data
)
self.optimizer = make_optimizer(args, self.model)
self.lr_scheduler = make_lr_scheduler(args, self.optimizer)
self.acc = Accuracy()
if self.graph:
self.train_graph = make_train_graph(
self.model,
self.cross_entropy,
self.train_data_loader,
self.optimizer,
self.lr_scheduler,
return_pred_and_label=self.metric_train_acc,
)
self.eval_graph = make_eval_graph(self.model, self.val_data_loader)
if self.gpu_stat_file is not None:
self.gpu_stat = CudaUtilMemStat(
f"rank{self.rank}_" + self.gpu_stat_file, only_ordinal=self.rank
)
else:
self.gpu_stat = None
def init_model(self):
self.logger.print("***** Model Init *****", print_ranks=[0])
start_t = time.perf_counter()
if self.is_global:
placement = flow.env.all_device_placement("cuda")
self.model = self.model.to_global(
placement=placement, sbp=flow.sbp.broadcast
)
else:
self.model = self.model.to("cuda")
if self.load_path is None:
self.legacy_init_parameters()
else:
self.load_state_dict()
if self.ddp:
self.model = ddp(self.model)
if self.save_init:
self.save("init")
end_t = time.perf_counter()
self.logger.print(
f"***** Model Init Finish, time escapled: {end_t - start_t:.5f} s *****",
print_ranks=[0],
)
def legacy_init_parameters(self):
if not self.legacy_init:
return
for m in self.model.modules():
# NOTE(zwx): legacy BatchNorm initializer in Benchmark seems wrong, so don't follow it
if isinstance(m, flow.nn.Conv2d):
flow.nn.init.kaiming_normal_(
m.weight, mode="fan_in", nonlinearity="relu"
)
elif isinstance(m, flow.nn.Linear):
flow.nn.init.kaiming_normal_(
m.weight, mode="fan_in", nonlinearity="relu"
)
flow.nn.init.constant_(m.bias, 0)
elif isinstance(m, flow.nn.BatchNorm2d):
flow.nn.init.constant_(m.weight, 1)
flow.nn.init.constant_(m.bias, 0)
for m in self.model.modules():
if isinstance(m, Bottleneck):
flow.nn.init.constant_(m.bn3.weight, 0)
def load_state_dict(self):
self.logger.print(f"Loading model from {self.load_path}", print_ranks=[0])
if self.is_global:
state_dict = flow.load(self.load_path, global_src_rank=0)
elif self.rank == 0:
state_dict = flow.load(self.load_path)
else:
return
self.model.load_state_dict(state_dict)
def init_logger(self):
if self.metric_local:
print_ranks = list(range(self.world_size))
else:
print_ranks = [0]
self.logger = log.get_logger(self.rank, print_ranks)
self.logger.register_metric("job", log.IterationMeter(), "[{}]")
self.logger.register_metric("epoch", log.IterationMeter(), "epoch: {}/{}")
self.logger.register_metric("iter", log.IterationMeter(), "iter: {}/{}")
self.logger.register_metric("loss", log.AverageMeter(), "loss: {:.5f}", True)
if self.meter_lr:
self.logger.register_metric("lr", log.IterationMeter(), "lr: {:.6f}")
self.logger.register_metric("top1", log.AverageMeter(), "top1: {:.5f}", True)
time_meter_str = (
"throughput: {:.2f}, timestamp: {:.6f}"
if self.print_timestamp
else "throughput: {:.2f}"
)
self.logger.register_metric(
"time", log.TimeMeter(self.print_timestamp), time_meter_str, True
)
def meter(
self,
epoch_pg=None,
iter_pg=None,
loss=None,
lr=None,
top1=None,
num_samples=1,
do_print=False,
):
self.logger.meter("job", "train" if self.is_train else "eval")
self.logger.meter("epoch", epoch_pg or (self.cur_epoch, self.num_epochs))
self.logger.meter("iter", iter_pg or (self.cur_iter, self.batches_per_epoch))
if loss is not None:
self.logger.meter("loss", loss)
if lr is not None and self.meter_lr:
self.logger.meter("lr", lr)
if top1 is not None:
self.logger.meter("top1", top1)
self.logger.meter("time", num_samples)
if do_print:
self.logger.print_metrics()
if self.gpu_stat is not None:
self.gpu_stat.stat()
def meter_train_iter(self, loss, top1):
assert self.is_train is True
lr = None
if self.meter_lr:
lr = self.optimizer.param_groups[0]["lr"]
do_print = (
self.cur_iter % self.print_interval == 0
or self.cur_iter == self.batches_per_epoch
)
self.meter(
loss=loss,
lr=lr,
top1=top1,
num_samples=self.train_batch_size,
do_print=do_print,
)
def __call__(self):
self.train()
def train(self):
self.logger.metric("time").reset()
for _ in range(self.num_epochs):
self.train_one_epoch()
if self.cur_batch == self.total_batches:
break
if not self.skip_eval:
acc = self.eval()
else:
acc = 0
#save_dir = f"epoch_{self.cur_epoch}_val_acc_{acc}"
#self.save(save_dir)
self.cur_epoch += 1
self.cur_iter = 0
def train_one_epoch(self):
self.model.train()
self.is_train = True
for _ in range(self.batches_per_epoch):
if self.graph:
loss, pred, label = self.train_graph()
else:
loss, pred, label = self.train_eager()
self.cur_iter += 1
loss = tol(loss, self.metric_local)
if pred is not None and label is not None:
pred = tol(pred, self.metric_local)
label = tol(label, self.metric_local)
top1_acc = self.acc([pred], [label])
else:
top1_acc = 0
self.meter_train_iter(loss, top1_acc)
self.cur_batch += 1
if self.cur_batch == self.total_batches:
break
def train_eager(self):
loss, pred, label = self.forward()
if loss.is_global and self.scale_grad:
# NOTE(zwx): scale the initial grad by world_size,
# because global_tensor.mean() divides by numel * world_size
loss = loss / self.world_size
loss.backward()
for param_group in self.optimizer.param_groups:
for param in param_group.parameters:
param.grad /= self.world_size
else:
loss.backward()
loss = loss / self.world_size
self.optimizer.step()
self.optimizer.zero_grad()
if self.lr_scheduler:
self.lr_scheduler.step()
return loss, pred, label
def eval(self):
self.model.eval()
self.is_train = False
preds, labels = [], []
for _ in range(self.val_batches_per_epoch):
if self.graph:
pred, label = self.eval_graph()
else:
pred, label = self.inference()
preds.append(tton(pred, self.metric_local))
labels.append(tton(label, self.metric_local))
top1_acc = calc_acc(preds, labels)
self.meter(
iter_pg=(self.val_batches_per_epoch, self.val_batches_per_epoch),
loss=0.0,
top1=top1_acc,
num_samples=self.val_batch_size * self.val_batches_per_epoch,
do_print=True,
)
return top1_acc
def forward(self):
image, label = self.train_data_loader()
image = image.to("cuda")
label = label.to("cuda")
logits = self.model(image)
loss = self.cross_entropy(logits, label)
if self.metric_train_acc:
pred = logits.softmax()
return loss, pred, label
else:
return loss, None, None
def inference(self):
image, label = self.val_data_loader()
image = image.to("cuda")
label = label.to("cuda")
with flow.no_grad():
logits = self.model(image)
pred = logits.softmax()
return pred, label
def save(self, subdir):
if self.save_path is None:
return
save_path = os.path.join(self.save_path, subdir)
self.logger.print(f"Saving model to {save_path}", print_ranks=[0])
state_dict = self.model.state_dict()
if self.is_global:
flow.save(state_dict, save_path, global_dst_rank=0)
elif self.rank == 0:
flow.save(state_dict, save_path)
else:
return
def tol(tensor, pure_local=True):
""" to local """
if tensor.is_global:
if pure_local:
tensor = tensor.to_local()
else:
tensor = tensor.to_global(sbp=flow.sbp.broadcast).to_local()
return tensor
def tton(tensor, local_only=True):
""" tensor to numpy """
if tensor.is_global:
if local_only:
tensor = tensor.to_local().numpy()
else:
tensor = tensor.to_global(sbp=flow.sbp.broadcast).to_local().numpy()
else:
tensor = tensor.numpy()
return tensor
def calc_acc(preds, labels):
correct_of = 0.0
num_samples = 0
for pred, label in zip(preds, labels):
clsidxs = np.argmax(pred, axis=1)
correct_of += (clsidxs == label).sum()
num_samples += label.size
top1_acc = correct_of / num_samples
return top1_acc
if __name__ == "__main__":
trainer = Trainer()
trainer()