Unverified Commit 7aa44612 authored by Zhenhua Han, committed by GitHub

Fix compatibility of Retiarii CGO (#4621)

* fix compatibility of CGO
* remove old test files for CGO
* revert to fit pytorch-lightning 1.5.x
* update PyTorch-lightning version in doc
* fix nni.trace issue
parent fe02b808
@@ -63,7 +63,7 @@ CGO Execution Engine (experimental)
 The CGO (Cross-Graph Optimization) execution engine performs cross-model optimizations on top of the graph-based execution engine. With CGO, multiple models can be merged and trained together in one trial.
 Currently, it only supports ``DedupInputOptimizer``, which merges graphs that share the same dataset so that each batch of data is loaded and pre-processed only once, avoiding a data-loading bottleneck.
-.. note :: To use CGO engine, PyTorch-lightning above version 1.4.2 is required.
+.. note :: To use the CGO engine, PyTorch Lightning 1.5.x is required.
 To enable the CGO execution engine, you need to follow these steps:
...
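For orientation, here is a minimal sketch of what enabling CGO looks like on the experiment configuration, mirroring the example scripts later in this commit (device list and timing values are illustrative only):

from nni.retiarii.experiment.pytorch import RetiariiExeConfig

exp_config = RetiariiExeConfig('local')
exp_config.execution_engine = 'cgo'                  # select the CGO engine
exp_config.max_concurrency_cgo = 3                   # CGO-specific concurrency knob, as in the examples below
exp_config.batch_waiting_time = 100                  # CGO-specific batching knob, as in the examples below
exp_config.devices = ['cuda:0', 'cuda:1', 'cuda:2']  # devices available to merged trials
exp_config.trial_gpu_number = 1
exp_config.training_service.use_active_gpu = True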
@@ -29,8 +29,20 @@ class _MultiModelSupervisedLearningModule(LightningModule):
         self.criterion_cls = criterion
         self.optimizer = optimizer
         self.metrics = nn.ModuleDict({name: cls() for name, cls in metrics.items()})
+        self.metrics_args = metrics
         self.n_models = n_models
+    def dump_kwargs(self):
+        # Rebuild the constructor kwargs so that CGO can re-instantiate this
+        # module with a different n_models (see CGOExecutionEngine below).
+        kwargs = {}
+        kwargs['criterion'] = self.criterion_cls
+        kwargs['metrics'] = self.metrics_args
+        kwargs['n_models'] = self.n_models
+        kwargs['learning_rate'] = self.hparams['learning_rate']
+        kwargs['weight_decay'] = self.hparams['weight_decay']
+        kwargs['optimizer'] = self.optimizer
+        return kwargs
     def forward(self, x):
         y_hat = self.model(x)
         return y_hat
@@ -101,7 +113,6 @@ class _MultiModelSupervisedLearningModule(LightningModule):
         return {name: self.trainer.callback_metrics['val_' + name].item() for name in self.metrics}
-@nni.trace
 class MultiModelSupervisedLearningModule(_MultiModelSupervisedLearningModule):
     """
     Lightning Module of SupervisedLearning for Cross-Graph Optimization.
@@ -126,8 +137,7 @@ class MultiModelSupervisedLearningModule(_MultiModelSupervisedLearningModule):
         super().__init__(criterion, metrics, learning_rate=learning_rate, weight_decay=weight_decay, optimizer=optimizer)
-@nni.trace
-class _ClassificationModule(MultiModelSupervisedLearningModule):
+class _ClassificationModule(_MultiModelSupervisedLearningModule):
     def __init__(self, criterion: nn.Module = nn.CrossEntropyLoss,
                  learning_rate: float = 0.001,
                  weight_decay: float = 0.,
@@ -173,9 +183,7 @@ class Classification(Lightning):
         super().__init__(module, Trainer(use_cgo=True, **trainer_kwargs),
                          train_dataloader=train_dataloader, val_dataloaders=val_dataloaders)
-@nni.trace
-class _RegressionModule(MultiModelSupervisedLearningModule):
+class _RegressionModule(_MultiModelSupervisedLearningModule):
     def __init__(self, criterion: nn.Module = nn.MSELoss,
                  learning_rate: float = 0.001,
                  weight_decay: float = 0.,
...
@@ -2,11 +2,8 @@
 # Licensed under the MIT license.
 import pytorch_lightning as pl
-import nni
 from .accelerator import BypassAccelerator
-@nni.trace
 class Trainer(pl.Trainer):
     """
     Trainer for cross-graph optimization.
...
@@ -200,7 +200,7 @@ class CGOExecutionEngine(AbstractExecutionEngine):
         # replace the module with a new instance whose n_models is set
         # n_models must be set in __init__, otherwise it cannot be captured by serialize_cls
-        new_module_init_params = model.evaluator.module.trace_kwargs.copy()
+        new_module_init_params = model.evaluator.module.dump_kwargs().copy()
         # MultiModelSupervisedLearningModule hides n_models of _MultiModelSupervisedLearningModule from users
         new_module_init_params['n_models'] = len(multi_model)
...
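In effect, ``dump_kwargs`` replaces the kwargs capture that ``@nni.trace`` used to provide: the engine rebuilds the user's module with identical constructor arguments except for ``n_models``. A hypothetical sketch of the pattern, with names taken from the hunks above:

# Hypothetical illustration only; CGOExecutionEngine does this internally.
kwargs = module.dump_kwargs()          # criterion, metrics, optimizer, learning rate, weight decay, n_models
kwargs['n_models'] = len(multi_model)  # the merged trial now trains this many models at once
merged_module = _MultiModelSupervisedLearningModule(**kwargs)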
from typing import List

import torch

import ops
import nni.retiarii.nn.pytorch as nn
from nni.retiarii import basic_unit
@basic_unit
class AuxiliaryHead(nn.Module):
    """ Auxiliary head in 2/3 place of network to let the gradient flow well """

    def __init__(self, input_size, C, n_classes):
        """ assuming input size 7x7 or 8x8 """
        assert input_size in [7, 8]
        super().__init__()
        self.net = nn.Sequential(
            nn.ReLU(inplace=True),
            nn.AvgPool2d(5, stride=input_size - 5, padding=0, count_include_pad=False),  # 2x2 out
            nn.Conv2d(C, 128, kernel_size=1, bias=False),
            nn.BatchNorm2d(128),
            nn.ReLU(inplace=True),
            nn.Conv2d(128, 768, kernel_size=2, bias=False),  # 1x1 out
            nn.BatchNorm2d(768),
            nn.ReLU(inplace=True)
        )
        self.linear = nn.Linear(768, n_classes)

    def forward(self, x):
        out = self.net(x)
        out = out.view(out.size(0), -1)  # flatten
        logits = self.linear(out)
        return logits
class Node(nn.Module):
    def __init__(self, node_id, num_prev_nodes, channels, num_downsample_connect):
        super().__init__()
        self.ops = nn.ModuleList()
        choice_keys = []
        for i in range(num_prev_nodes):
            stride = 2 if i < num_downsample_connect else 1
            choice_keys.append("{}_p{}".format(node_id, i))
            self.ops.append(
                nn.LayerChoice([
                    ops.PoolBN('max', channels, 3, stride, 1, affine=False),
                    ops.PoolBN('avg', channels, 3, stride, 1, affine=False),
                    nn.Identity() if stride == 1 else ops.FactorizedReduce(channels, channels, affine=False),
                    ops.SepConv(channels, channels, 3, stride, 1, affine=False),
                    ops.SepConv(channels, channels, 5, stride, 2, affine=False),
                    ops.DilConv(channels, channels, 3, stride, 2, 2, affine=False),
                    ops.DilConv(channels, channels, 5, stride, 4, 2, affine=False)
                ]))
        self.drop_path = ops.DropPath()
        self.input_switch = nn.InputChoice(n_candidates=num_prev_nodes, n_chosen=2)
    def forward(self, prev_nodes: List[torch.Tensor]) -> torch.Tensor:
        # apply each candidate op to its corresponding predecessor output
        out = []
        for i, op in enumerate(self.ops):
            out.append(op(prev_nodes[i]))
        # out = [self.drop_path(o) if o is not None else None for o in out]  # drop path disabled here
        # the input choice picks 2 of the candidate inputs
        return self.input_switch(out)
class Cell(nn.Module):
    def __init__(self, n_nodes, channels_pp, channels_p, channels, reduction_p, reduction):
        super().__init__()
        self.reduction = reduction
        self.n_nodes = n_nodes

        # If the previous cell is a reduction cell, the current input size does not match
        # the output size of cell[k-2], so output[k-2] must be reduced by preprocessing.
        if reduction_p:
            self.preproc0 = ops.FactorizedReduce(channels_pp, channels, affine=False)
        else:
            self.preproc0 = ops.StdConv(channels_pp, channels, 1, 1, 0, affine=False)
        self.preproc1 = ops.StdConv(channels_p, channels, 1, 1, 0, affine=False)

        # generate dag
        self.mutable_ops = nn.ModuleList()
        for depth in range(2, self.n_nodes + 2):
            self.mutable_ops.append(Node("{}_n{}".format("reduce" if reduction else "normal", depth),
                                         depth, channels, 2 if reduction else 0))

    def forward(self, s0, s1):
        # s0, s1 are the outputs of the previous-previous cell and the previous cell, respectively
        tensors = [self.preproc0(s0), self.preproc1(s1)]
        new_tensors = []
        for node in self.mutable_ops:
            tmp = tensors + new_tensors
            cur_tensor = node(tmp)
            new_tensors.append(cur_tensor)

        output = torch.cat(new_tensors, dim=1)
        return output
class CNN(nn.Module):
    def __init__(self, input_size, in_channels, channels, n_classes, n_layers, n_nodes=4,
                 stem_multiplier=3, auxiliary=False):
        super().__init__()
        self.in_channels = in_channels
        self.channels = channels
        self.n_classes = n_classes
        self.n_layers = n_layers
        self.aux_pos = 2 * n_layers // 3 if auxiliary else -1

        c_cur = stem_multiplier * self.channels
        self.stem = nn.Sequential(
            nn.Conv2d(in_channels, c_cur, 3, 1, 1, bias=False),
            nn.BatchNorm2d(c_cur)
        )

        # for the first cell, stem is used for both s0 and s1
        # [!] channels_pp and channels_p are output channel sizes, while c_cur is an input channel size
        channels_pp, channels_p, c_cur = c_cur, c_cur, channels

        self.cells = nn.ModuleList()
        reduction_p, reduction = False, False
        for i in range(n_layers):
            reduction_p, reduction = reduction, False
            # reduce feature map size and double channels at the 1/3 and 2/3 layers
            if i in [n_layers // 3, 2 * n_layers // 3]:
                c_cur *= 2
                reduction = True

            cell = Cell(n_nodes, channels_pp, channels_p, c_cur, reduction_p, reduction)
            self.cells.append(cell)
            c_cur_out = c_cur * n_nodes
            channels_pp, channels_p = channels_p, c_cur_out
            # if i == self.aux_pos:
            #     self.aux_head = AuxiliaryHead(input_size // 4, channels_p, n_classes)  # disabled

        self.gap = nn.AdaptiveAvgPool2d(1)
        self.linear = nn.Linear(channels_p, n_classes)

    def forward(self, x):
        s0 = s1 = self.stem(x)
        for i, cell in enumerate(self.cells):
            s0, s1 = s1, cell(s0, s1)
            # if i == self.aux_pos and self.training:
            #     aux_logits = self.aux_head(s1)  # auxiliary head disabled

        out = self.gap(s1)
        out = out.view(out.size(0), -1)  # flatten
        logits = self.linear(out)
        return logits

    def drop_path_prob(self, p):
        for module in self.modules():
            if isinstance(module, ops.DropPath):
                module.p = p


if __name__ == '__main__':
    base_model = CNN(32, 3, 16, 10, 8)
import torch
import nni.retiarii.nn.pytorch as nn
from nni.retiarii import basic_unit
@basic_unit
class DropPath(nn.Module):
    def __init__(self, p=0.):
        """
        Drop path with probability.

        Parameters
        ----------
        p : float
            Probability of a path being zeroed.
        """
        super().__init__()
        self.p = p

    def forward(self, x):
        if self.training and self.p > 0.:
            keep_prob = 1. - self.p
            # per-sample mask: zero an entire example's path with probability p
            mask = torch.zeros((x.size(0), 1, 1, 1), device=x.device).bernoulli_(keep_prob)
            return x / keep_prob * mask
        return x
@basic_unit
class PoolBN(nn.Module):
    """
    AvgPool or MaxPool with BN. `pool_type` must be `max` or `avg`.
    """
    def __init__(self, pool_type, C, kernel_size, stride, padding, affine=True):
        super().__init__()
        if pool_type.lower() == 'max':
            self.pool = nn.MaxPool2d(kernel_size, stride, padding)
        elif pool_type.lower() == 'avg':
            self.pool = nn.AvgPool2d(kernel_size, stride, padding, count_include_pad=False)
        else:
            raise ValueError("pool_type must be 'max' or 'avg', got {}".format(pool_type))
        self.bn = nn.BatchNorm2d(C, affine=affine)

    def forward(self, x):
        out = self.pool(x)
        out = self.bn(out)
        return out
@basic_unit
class StdConv(nn.Module):
    """
    Standard conv: ReLU - Conv - BN
    """
    def __init__(self, C_in, C_out, kernel_size, stride, padding, affine=True):
        super().__init__()
        self.net = nn.Sequential(
            nn.ReLU(),
            nn.Conv2d(C_in, C_out, kernel_size, stride, padding, bias=False),
            nn.BatchNorm2d(C_out, affine=affine)
        )

    def forward(self, x):
        return self.net(x)
@basic_unit
class FacConv(nn.Module):
    """
    Factorized conv: ReLU - Conv(Kx1) - Conv(1xK) - BN
    """
    def __init__(self, C_in, C_out, kernel_length, stride, padding, affine=True):
        super().__init__()
        self.net = nn.Sequential(
            nn.ReLU(),
            nn.Conv2d(C_in, C_in, (kernel_length, 1), stride, padding, bias=False),
            nn.Conv2d(C_in, C_out, (1, kernel_length), stride, padding, bias=False),
            nn.BatchNorm2d(C_out, affine=affine)
        )

    def forward(self, x):
        return self.net(x)
@basic_unit
class DilConv(nn.Module):
    """
    (Dilated) depthwise separable conv.
    ReLU - (Dilated) depthwise separable - Pointwise - BN.
    If dilation == 2, 3x3 conv => 5x5 receptive field, 5x5 conv => 9x9 receptive field.
    """
    def __init__(self, C_in, C_out, kernel_size, stride, padding, dilation, affine=True):
        super().__init__()
        self.net = nn.Sequential(
            nn.ReLU(),
            nn.Conv2d(C_in, C_in, kernel_size, stride, padding, dilation=dilation, groups=C_in,
                      bias=False),
            nn.Conv2d(C_in, C_out, 1, stride=1, padding=0, bias=False),
            nn.BatchNorm2d(C_out, affine=affine)
        )

    def forward(self, x):
        return self.net(x)
@basic_unit
class SepConv(nn.Module):
    """
    Depthwise separable conv.
    DilConv(dilation=1) * 2.
    """
    def __init__(self, C_in, C_out, kernel_size, stride, padding, affine=True):
        super().__init__()
        self.net = nn.Sequential(
            DilConv(C_in, C_in, kernel_size, stride, padding, dilation=1, affine=affine),
            DilConv(C_in, C_out, kernel_size, 1, padding, dilation=1, affine=affine)
        )

    def forward(self, x):
        return self.net(x)
@basic_unit
class FactorizedReduce(nn.Module):
    """
    Reduce feature map size by factorized pointwise convolutions (stride=2).
    """
    def __init__(self, C_in, C_out, affine=True):
        super().__init__()
        self.relu = nn.ReLU()
        self.conv1 = nn.Conv2d(C_in, C_out // 2, 1, stride=2, padding=0, bias=False)
        self.conv2 = nn.Conv2d(C_in, C_out // 2, 1, stride=2, padding=0, bias=False)
        self.bn = nn.BatchNorm2d(C_out, affine=affine)

    def forward(self, x):
        x = self.relu(x)
        # the second conv sees a one-pixel-shifted view, so the two stride-2
        # convs together cover all spatial positions
        out = torch.cat([self.conv1(x), self.conv2(x[:, :, 1:, 1:])], dim=1)
        out = self.bn(out)
        return out
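As a quick sanity check on the ops above, a hedged example of FactorizedReduce's effect on tensor shapes (assuming the listing above is saved as ops.py): the spatial size is halved while the two half-width convs together produce C_out channels.

import torch
from ops import FactorizedReduce  # assumes the file above is importable as ops

fr = FactorizedReduce(16, 32)
x = torch.randn(2, 16, 8, 8)
print(fr(x).shape)  # expected: torch.Size([2, 32, 4, 4])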
import nni.retiarii.evaluator.pytorch.cgo.evaluator as cgo
import nni.retiarii.evaluator.pytorch.lightning as pl
import nni.retiarii.strategy as strategy
from nni.retiarii import serialize
from nni.retiarii.experiment.pytorch import RetiariiExperiment, RetiariiExeConfig
from torchvision import transforms
from torchvision.datasets import CIFAR10

from darts_model import CNN
if __name__ == '__main__':
    base_model = CNN(32, 3, 16, 10, 8)

    train_transform = transforms.Compose([
        transforms.RandomCrop(32, padding=4),
        transforms.RandomHorizontalFlip(),
        transforms.ToTensor(),
        transforms.Normalize((0.4914, 0.4822, 0.4465), (0.2023, 0.1994, 0.2010)),
    ])
    valid_transform = transforms.Compose([
        transforms.ToTensor(),
        transforms.Normalize((0.4914, 0.4822, 0.4465), (0.2023, 0.1994, 0.2010)),
    ])

    train_dataset = serialize(CIFAR10, root='data/cifar10', train=True, download=True, transform=train_transform)
    test_dataset = serialize(CIFAR10, root='data/cifar10', train=False, download=True, transform=valid_transform)

    trainer = cgo.Classification(train_dataloader=pl.DataLoader(train_dataset, batch_size=100),
                                 val_dataloaders=pl.DataLoader(test_dataset, batch_size=100),
                                 max_epochs=1, limit_train_batches=0.2)

    simple_strategy = strategy.Random()

    exp = RetiariiExperiment(base_model, trainer, [], simple_strategy)

    exp_config = RetiariiExeConfig('local')
    exp_config.experiment_name = 'darts_search'
    exp_config.execution_engine = 'cgo'
    exp_config.trial_concurrency = 3
    # Since CGO may merge multiple trials into one, RetiariiExperiment may run more models than
    # max_trial_number suggests: with max_trial_number = 3, it would actually run 9 models if
    # each merged trial contains 3 trials from the strategy.
    exp_config.max_trial_number = 100
    exp_config.devices = ['cuda:0', 'cuda:1', 'cuda:2']
    exp_config.trial_gpu_number = 1
    exp_config.batch_waiting_time = 100
    exp_config.training_service.use_active_gpu = True
    exp_config.training_service.gpu_indices = [0, 1, 2]
    exp.run(exp_config, 8081)
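One follow-up the script does not include: assuming the standard RetiariiExperiment API, the best architectures found by the strategy can be exported after the run completes.

# Hypothetical follow-up, not part of the original script.
for arch in exp.export_top_models(formatter='dict'):
    print(arch)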
@@ -58,10 +58,8 @@ if __name__ == '__main__':
     exp_config.trial_concurrency = 3
     exp_config.max_trial_number = 10
     exp_config.trial_gpu_number = 1
-    exp_config.training_service.use_active_gpu = True
     exp_config.training_service.reuse_mode = True
-    exp_config.training_service.gpu_indices = [0, 1, 2]
-    exp_config.max_concurrency_cgo = 1
+    exp_config.max_concurrency_cgo = 3
     exp_config.batch_waiting_time = 0
     rm_conf = RemoteMachineConfig()
...
@@ -10,6 +10,7 @@ from pathlib import Path
 import nni
 import nni.runtime.platform.test
+import json
 try:
     from nni.common.device import GPUDevice
@@ -147,7 +148,7 @@ def _new_trainer():
     train_dataset = serialize(MNIST, root='data/mnist', train=True, download=True, transform=transform)
     test_dataset = serialize(MNIST, root='data/mnist', train=False, download=True, transform=transform)
-    multi_module = MultiModelSupervisedLearningModule(nn.CrossEntropyLoss, {'acc': pl._AccuracyWithLogits})
+    multi_module = _MultiModelSupervisedLearningModule(nn.CrossEntropyLoss, {'acc': pl._AccuracyWithLogits})
     lightning = pl.Lightning(multi_module, cgo_trainer.Trainer(use_cgo=True,
                                                                max_epochs=1,
...