Unverified Commit fe89e5af authored by Zhenhua Han, committed by GitHub

Update CGO to support pytorch-lightning 1.6.1 (#4814)

parent cd387899
......@@ -7,7 +7,7 @@ torch == 1.10.0+cpu ; sys_platform != "darwin"
torch == 1.10.0 ; sys_platform == "darwin"
torchvision == 0.11.1+cpu ; sys_platform != "darwin"
torchvision == 0.11.1 ; sys_platform == "darwin"
pytorch-lightning >= 1.5.0, < 1.6.0
pytorch-lightning >= 1.6.1
torchmetrics
lightgbm
onnx
......
......@@ -4,7 +4,7 @@
tensorflow
torch == 1.10.0+cu111
torchvision == 0.11.1+cu111
pytorch-lightning >= 1.5.0, < 1.6.0
pytorch-lightning >= 1.6.1
lightgbm
onnx
peewee
......
......@@ -63,7 +63,7 @@ CGO Execution Engine (experimental)
The CGO (Cross-Graph Optimization) execution engine performs cross-model optimizations on top of the graph-based execution engine. With the CGO execution engine, multiple models can be merged and trained together in one trial.
Currently, it only supports ``DedupInputOptimizer``, which merges graphs that share the same dataset so that each batch of data is loaded and pre-processed only once, avoiding a bottleneck on data loading.
.. note :: To use CGO engine, PyTorch Lightning of 1.5.x is required.
.. note :: To use CGO engine, PyTorch Lightning >= 1.6.1 is required.
To enable CGO execution engine, you need to follow these steps:
......
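The documentation hunk above ends with the steps for enabling the CGO engine (truncated here). As a minimal sketch only, assuming the ``RetiariiExeConfig`` fields described in NNI's CGO documentation of this period (``execution_engine``, ``max_concurrency_cgo``, and ``batch_waiting_time`` are assumptions, not taken from this diff), enabling the engine looks roughly like:

from nni.retiarii.experiment.pytorch import RetiariiExeConfig

# Assumed configuration sketch, not part of this commit: CGO is selected through
# the experiment's execution engine; the two tuning fields below are assumptions.
exp_config = RetiariiExeConfig('remote')
exp_config.execution_engine = 'cgo'    # select the cross-graph optimization engine
exp_config.max_concurrency_cgo = 3     # assumed: max number of models merged into one trial
exp_config.batch_waiting_time = 10     # assumed: seconds to wait while batching models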
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT license.
from typing import Any, List, Optional, Union
import torch
from pytorch_lightning.accelerators.accelerator import Accelerator
from pytorch_lightning.plugins.environments import ClusterEnvironment
from pytorch_lightning.plugins.training_type.training_type_plugin import TrainingTypePlugin
from pytorch_lightning.trainer import Trainer
from pytorch_lightning.trainer.connectors.accelerator_connector import AcceleratorConnector
import nni
class BypassPlugin(TrainingTypePlugin):
""" Plugin that handles communication on a single device. """
def __init__(self, device: str):
super().__init__()
self.device: str = device
self.global_rank = 0
self.local_rank = 0
self.world_size = 1
def connect(self, model: torch.nn.Module) -> torch.nn.Module:
self._model = model
self.model_to_device()
return self.model
@property
def on_tpu(self) -> bool:
return False
@property
def on_gpu(self) -> bool:
return "cuda" in self.device and torch.cuda.is_available()
def reduce(self, tensor: Union[Any, torch.Tensor], *args: Any, **kwargs: Any) -> Union[Any, torch.Tensor]:
"""
Reduces a tensor from several distributed processes to one aggregated tensor.
As this plugin only operates with a single device, the reduction is simply the identity.
Args:
tensor: the tensor to sync and reduce
*args: ignored
**kwargs: ignored
Return:
the unmodified input as reduction is not needed for single process operation
"""
return tensor
def all_gather(self, tensor: torch.Tensor, group: Optional[Any] = None, sync_grads: bool = False) -> torch.Tensor:
"""Perform a all_gather on all processes """
return tensor
def teardown(self):
"""
This method is called to teardown the training process.
It is the right place to release memory and free other resources.
"""
pass
@property
def root_device(self) -> torch.device:
return torch.device(self.device)
def model_to_device(self) -> None:
# bypass device placement from pytorch lightning
pass
def setup(self) -> None:
pass
@property
def is_global_zero(self) -> bool:
return True
def barrier(self, *args, **kwargs) -> None:
pass
def broadcast(self, obj: object, src: int = 0) -> object:
return obj
def get_accelerator_connector(
num_processes: int = 1,
devices: Optional[Union[List[int], str, int]] = None,
tpu_cores: Optional[Union[List[int], str, int]] = None,
ipus: Optional[int] = None,
distributed_backend: Optional[str] = None,
accelerator: Optional[Union[str, Accelerator]] = None,
gpus: Optional[Union[List[int], str, int]] = None,
auto_select_gpus: bool = False,
num_nodes: int = 1,
sync_batchnorm: bool = False,
benchmark: bool = False,
replace_sampler_ddp: bool = True,
deterministic: bool = False,
precision: int = 32,
amp_backend: str = 'native',
amp_level: Optional[str] = None,
plugins: Optional[Union[List[Union[TrainingTypePlugin, ClusterEnvironment, str]],
TrainingTypePlugin, ClusterEnvironment, str]] = None,
**other_trainer_kwargs) -> AcceleratorConnector:
gpu_ids = Trainer()._parse_devices(gpus, auto_select_gpus, tpu_cores)
return AcceleratorConnector(
num_processes,
devices,
tpu_cores,
ipus,
distributed_backend,
accelerator,
gpus,
gpu_ids,
num_nodes,
sync_batchnorm,
benchmark,
replace_sampler_ddp,
deterministic,
precision,
amp_backend,
amp_level,
plugins,
)
@nni.trace
class BypassAccelerator(Accelerator):
def __init__(self, precision_plugin=None, device="cpu", **trainer_kwargs):
if precision_plugin is None:
precision_plugin = get_accelerator_connector(**trainer_kwargs).select_precision_plugin()
# pylint: disable=abstract-class-instantiated
super().__init__(precision_plugin=precision_plugin, training_type_plugin=BypassPlugin(device))
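For context, ``BypassAccelerator`` above is the pre-1.6 entry point that this commit replaces. A minimal usage sketch under the old Lightning API, mirroring how the CGO trainer constructed it before this change (the import path is assumed from the package layout; the trainer arguments are illustrative):

import pytorch_lightning as pl
# Assumed import path for the file shown above.
from nni.retiarii.evaluator.pytorch.cgo.accelerator import BypassAccelerator

# Under Lightning < 1.6, the accelerator (carrying BypassPlugin) is passed to the
# trainer directly, so Lightning skips real device placement and CGO controls it.
trainer = pl.Trainer(accelerator=BypassAccelerator(device="cpu"), max_epochs=1)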
......@@ -2,7 +2,13 @@
# Licensed under the MIT license.
import pytorch_lightning as pl
from .accelerator import BypassAccelerator
from pytorch_lightning.strategies import SingleDeviceStrategy
class BypassStrategy(SingleDeviceStrategy):
strategy_name = "single_device"
def model_to_device(self) -> None:
pass
class Trainer(pl.Trainer):
"""
......@@ -24,6 +30,9 @@ class Trainer(pl.Trainer):
if use_cgo:
if "accelerator" in trainer_kwargs:
raise ValueError("accelerator should not be set when cross-graph optimization is enabled.")
trainer_kwargs['accelerator'] = BypassAccelerator(device='cpu', **trainer_kwargs)
if 'strategy' in trainer_kwargs:
raise ValueError("cgo.trainer does not support specifying strategy")
trainer_kwargs['strategy'] = BypassStrategy()
super().__init__(**trainer_kwargs)
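With Lightning 1.6.1 the wrapper above injects a ``BypassStrategy`` instead of a ``BypassAccelerator``. A minimal sketch of how the updated trainer is expected to be constructed (the import path is assumed from the package layout; keyword values are illustrative):

# Assumed import path for the CGO trainer wrapper shown above.
from nni.retiarii.evaluator.pytorch.cgo.trainer import Trainer

# With use_cgo=True the wrapper sets strategy=BypassStrategy() itself, so passing
# `accelerator` or `strategy` explicitly raises ValueError per the checks above.
trainer = Trainer(use_cgo=True, max_epochs=1)
# Trainer(use_cgo=True, strategy="ddp")  # would raise ValueError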
......@@ -320,9 +320,9 @@ class BaseOneShotLightningModule(pl.LightningModule):
# under v1.5
w_optimizers, lr_schedulers, self.frequencies, monitor = \
self.trainer._configure_optimizers(self.model.configure_optimizers()) # type: ignore
lr_schedulers = self.trainer._configure_schedulers(lr_schedulers, monitor, not self.automatic_optimization)
lr_schedulers = self.trainer._configure_schedulers(lr_schedulers, monitor, not self.automatic_optimization) # type: ignore
if any(sch["scheduler"].optimizer not in w_optimizers for sch in lr_schedulers):
if any(sch["scheduler"].optimizer not in w_optimizers for sch in lr_schedulers): # type: ignore
raise Exception(
"Some schedulers are attached with an optimizer that wasn't returned from `configure_optimizers`."
)
......
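The check above guards against schedulers that are bound to an optimizer ``configure_optimizers`` never returned. A hypothetical module that would trip it (class name, layer, and hyperparameters are illustrative only):

import torch
import pytorch_lightning as pl

class BadModule(pl.LightningModule):
    # Hypothetical example: the scheduler is attached to an optimizer that is
    # created but never returned, which the check above rejects.
    def __init__(self):
        super().__init__()
        self.layer = torch.nn.Linear(4, 4)

    def configure_optimizers(self):
        opt = torch.optim.SGD(self.parameters(), lr=0.1)
        hidden = torch.optim.Adam(self.parameters(), lr=1e-3)         # not returned below
        sched = torch.optim.lr_scheduler.StepLR(hidden, step_size=1)
        return [opt], [sched]                                         # sched.optimizer is `hidden`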
......@@ -40,12 +40,12 @@ class _InvertedResidual(nn.Module):
# Pointwise
nn.Conv2d(in_ch, mid_ch, 1, bias=False),
nn.BatchNorm2d(mid_ch, momentum=bn_momentum),
nn.ReLU(inplace=True),
nn.ReLU(inplace=False),
# Depthwise
nn.Conv2d(mid_ch, mid_ch, kernel_size, padding=kernel_size // 2,
stride=stride, groups=mid_ch, bias=False),
nn.BatchNorm2d(mid_ch, momentum=bn_momentum),
nn.ReLU(inplace=True),
nn.ReLU(inplace=False),
# Linear pointwise. Note that there's no activation.
nn.Conv2d(mid_ch, out_ch, 1, bias=False),
nn.BatchNorm2d(out_ch, momentum=bn_momentum))
......@@ -78,14 +78,14 @@ def _stack_normal_conv(in_ch, out_ch, kernel_size, skip, dconv, stride, repeats,
modules = [
nn.Conv2d(in_ch, in_ch, kernel_size, padding=kernel_size // 2, stride=s, groups=in_ch, bias=False),
nn.BatchNorm2d(in_ch, momentum=bn_momentum),
nn.ReLU(inplace=True),
nn.ReLU(inplace=False),
nn.Conv2d(in_ch, out_ch, 1, padding=0, stride=1, bias=False),
nn.BatchNorm2d(out_ch, momentum=bn_momentum)
]
else:
modules = [
nn.Conv2d(in_ch, out_ch, kernel_size, padding=kernel_size // 2, stride=s, bias=False),
nn.ReLU(inplace=True),
nn.ReLU(inplace=False),
nn.BatchNorm2d(out_ch, momentum=bn_momentum)
]
if skip and in_ch == out_ch and s == 1:
......@@ -141,7 +141,7 @@ class MNASNet(nn.Module):
# First layer: regular conv.
nn.Conv2d(3, depths[0], 3, padding=1, stride=2, bias=False),
nn.BatchNorm2d(depths[0], momentum=_BN_MOMENTUM),
nn.ReLU(inplace=True),
nn.ReLU(inplace=False),
]
count = 0
# for conv, prev_depth, depth, ks, skip, stride, repeat, exp_ratio in \
......@@ -177,10 +177,10 @@ class MNASNet(nn.Module):
# Final mapping to classifier input.
nn.Conv2d(depths[7], 1280, 1, padding=0, stride=1, bias=False),
nn.BatchNorm2d(1280, momentum=_BN_MOMENTUM),
nn.ReLU(inplace=True),
nn.ReLU(inplace=False),
]
self.layers = nn.Sequential(*layers)
self.classifier = nn.Sequential(nn.Dropout(p=dropout, inplace=True),
self.classifier = nn.Sequential(nn.Dropout(p=dropout, inplace=False),
nn.Linear(1280, num_classes))
self._initialize_weights()
#self.for_test = 10
......@@ -228,7 +228,7 @@ class RegularConv(nn.Module):
self.stride = stride
self.conv = nn.Conv2d(in_ch, out_ch, kernel_size, padding=kernel_size // 2, stride=stride, bias=False)
self.relu = nn.ReLU(inplace=True)
self.relu = nn.ReLU(inplace=False)
self.bn = nn.BatchNorm2d(out_ch, momentum=BN_MOMENTUM)
def forward(self, x):
......@@ -250,7 +250,7 @@ class DepthwiseConv(nn.Module):
self.conv1 = nn.Conv2d(in_ch, in_ch, kernel_size, padding=kernel_size // 2, stride=stride, groups=in_ch, bias=False)
self.bn1 = nn.BatchNorm2d(in_ch, momentum=BN_MOMENTUM)
self.relu = nn.ReLU(inplace=True)
self.relu = nn.ReLU(inplace=False)
self.conv2 = nn.Conv2d(in_ch, out_ch, 1, padding=0, stride=1, bias=False)
self.bn2 = nn.BatchNorm2d(out_ch, momentum=BN_MOMENTUM)
......@@ -277,12 +277,12 @@ class MobileConv(nn.Module):
# Pointwise
nn.Conv2d(in_ch, mid_ch, 1, bias=False),
nn.BatchNorm2d(mid_ch, momentum=BN_MOMENTUM),
nn.ReLU(inplace=True),
nn.ReLU(inplace=False),
# Depthwise
nn.Conv2d(mid_ch, mid_ch, kernel_size, padding=(kernel_size - 1) // 2,
stride=stride, groups=mid_ch, bias=False),
nn.BatchNorm2d(mid_ch, momentum=BN_MOMENTUM),
nn.ReLU(inplace=True),
nn.ReLU(inplace=False),
# Linear pointwise. Note that there's no activation.
nn.Conv2d(mid_ch, out_ch, 1, bias=False),
nn.BatchNorm2d(out_ch, momentum=BN_MOMENTUM))
......