"src/vscode:/vscode.git/clone" did not exist on "20423a3583ab2d6b129a4b3d3174f6023850cec9"
Unverified commit 9ff1f9d4 authored by SparkSnail, committed by GitHub

Add mnist-distributed-pytorch example (#483)

Add mnist-distributed-pytorch example for kubeflow
parent 56d0f08c
authorName: default
experimentName: example_mnist_distributed_pytorch
trialConcurrency: 1
maxExecDuration: 1h
maxTrialNum: 10
#choice: local, remote, pai, kubeflow
trainingServicePlatform: kubeflow
searchSpacePath: search_space.json
#choice: true, false
useAnnotation: false
tuner:
  #choice: TPE, Random, Anneal, Evolution
  builtinTunerName: TPE
  classArgs:
    #choice: maximize, minimize
    optimize_mode: minimize
trial:
  codeDir: .
  master:
    replicas: 1
    command: python3 dist_mnist.py
    gpuNum: 1
    cpuNum: 1
    memoryMB: 2048
    image: msranni/nni:latest
  worker:
    replicas: 1
    command: python3 dist_mnist.py
    gpuNum: 0
    cpuNum: 1
    memoryMB: 2048
    image: msranni/nni:latest
kubeflowConfig:
  operator: pytorch-operator
  apiVersion: v1alpha2
  nfs:
    # Your NFS server IP, like 10.10.10.10
    server: {your_nfs_server_ip}
    # Your NFS server export path, like /var/nfs/nni
    path: {your_nfs_server_export_path}
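Assuming this configuration is saved as, say, config.yml (the file name here is illustrative) in the same directory as dist_mnist.py and search_space.json, and the two NFS placeholders above are replaced with real values, the experiment would typically be started with NNI's command line tool, i.e. nnictl create --config config.yml.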
# Copyright 2018 The Kubeflow Authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# NNI (https://github.com/Microsoft/nni) modified this code to show how to
# integrate distributed pytorch training with NNI SDK
#
import os
import torch
import torch.distributed as dist
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import nni
import logging
from math import ceil
from random import Random
from torch.autograd import Variable
from torchvision import datasets, transforms

logger = logging.getLogger('nni_pytorch_dist')


class Partition(object):
    """ Dataset-like object, but only access a subset of it. """

    def __init__(self, data, index):
        self.data = data
        self.index = index

    def __len__(self):
        return len(self.index)

    def __getitem__(self, index):
        data_idx = self.index[index]
        return self.data[data_idx]
class DataPartitioner(object):
    """ Partitions a dataset into different chunks. """

    def __init__(self, data, sizes=[0.7, 0.2, 0.1], seed=1234):
        self.data = data
        self.partitions = []
        rng = Random()
        rng.seed(seed)
        data_len = len(data)
        indexes = [x for x in range(0, data_len)]
        rng.shuffle(indexes)
        # Split the shuffled indexes into one chunk per requested fraction.
        for frac in sizes:
            part_len = int(frac * data_len)
            self.partitions.append(indexes[0:part_len])
            indexes = indexes[part_len:]

    def use(self, partition):
        return Partition(self.data, self.partitions[partition])
class Net(nn.Module):
    """ Network architecture. """

    def __init__(self):
        super(Net, self).__init__()
        self.conv1 = nn.Conv2d(1, 10, kernel_size=5)
        self.conv2 = nn.Conv2d(10, 20, kernel_size=5)
        self.conv2_drop = nn.Dropout2d()
        self.fc1 = nn.Linear(320, 50)
        self.fc2 = nn.Linear(50, 10)

    def forward(self, x):
        x = F.relu(F.max_pool2d(self.conv1(x), 2))
        x = F.relu(F.max_pool2d(self.conv2_drop(self.conv2(x)), 2))
        x = x.view(-1, 320)
        x = F.relu(self.fc1(x))
        x = F.dropout(x, training=self.training)
        x = self.fc2(x)
        return F.log_softmax(x, dim=1)
def partition_dataset():
    """ Partitioning MNIST """
    dataset = datasets.MNIST(
        './data',
        train=True,
        download=True,
        transform=transforms.Compose([
            transforms.ToTensor(),
            transforms.Normalize((0.1307, ), (0.3081, ))
        ]))
    size = dist.get_world_size()
    bsz = 128 / float(size)
    partition_sizes = [1.0 / size for _ in range(size)]
    partition = DataPartitioner(dataset, partition_sizes)
    partition = partition.use(dist.get_rank())
    train_set = torch.utils.data.DataLoader(
        partition, batch_size=int(bsz), shuffle=True)
    return train_set, bsz


def average_gradients(model):
    """ Gradient averaging. """
    size = float(dist.get_world_size())
    for param in model.parameters():
        dist.all_reduce(param.grad.data, op=dist.reduce_op.SUM, group=0)
        param.grad.data /= size
def run(params):
    """ Distributed Synchronous SGD Example """
    rank = dist.get_rank()
    torch.manual_seed(1234)
    train_set, bsz = partition_dataset()
    model = Net()
    optimizer = optim.SGD(model.parameters(), lr=params['learning_rate'], momentum=params['momentum'])
    num_batches = ceil(len(train_set.dataset) / float(bsz))
    total_loss = 0.0
    for epoch in range(3):
        epoch_loss = 0.0
        for data, target in train_set:
            data, target = Variable(data), Variable(target)
            optimizer.zero_grad()
            output = model(data)
            loss = F.nll_loss(output, target)
            epoch_loss += loss.item()
            loss.backward()
            # Average gradients across all workers before the optimizer step.
            average_gradients(model)
            optimizer.step()
        #logger.debug('Rank: ', rank, ', epoch: ', epoch, ': ', epoch_loss / num_batches)
        if rank == 0:
            # Only the rank-0 (master) replica reports metrics back to NNI.
            nni.report_intermediate_result(epoch_loss / num_batches)
        total_loss += (epoch_loss / num_batches)
    total_loss /= 3  # average loss over the 3 epochs
    logger.debug('Final loss: {}'.format(total_loss))
    if rank == 0:
        nni.report_final_result(total_loss)


def init_processes(fn, params, backend='tcp'):
    """ Initialize the distributed environment. """
    dist.init_process_group(backend)
    fn(params)
def generate_default_params():
    '''
    Generate default parameters for mnist network.
    '''
    params = {
        'learning_rate': 0.01,
        'momentum': 0.5}
    return params


if __name__ == "__main__":
    RCV_PARAMS = nni.get_next_parameter()
    logger.debug(RCV_PARAMS)
    params = generate_default_params()
    params.update(RCV_PARAMS)
    init_processes(run, params)
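As a side note, init_processes() relies on the distributed environment (typically MASTER_ADDR, MASTER_PORT, RANK and WORLD_SIZE) being provided to each replica, which the Kubeflow pytorch-operator normally does. The following is a minimal sketch, not part of this commit, for exercising the same initialize-then-run flow locally without Kubeflow; the gloo backend, the port number and the two-process world size are illustrative assumptions.

# Local debugging sketch (illustrative only): spawn two processes on one
# machine and initialize torch.distributed with the "gloo" backend. On
# Kubeflow, the pytorch-operator supplies the rendezvous variables instead.
import os
import torch.distributed as dist
import torch.multiprocessing as mp

def _debug_worker(rank, world_size):
    os.environ['MASTER_ADDR'] = '127.0.0.1'  # assumed local rendezvous address
    os.environ['MASTER_PORT'] = '29500'      # arbitrary free port
    dist.init_process_group('gloo', rank=rank, world_size=world_size)
    print('initialized rank {} of {}'.format(dist.get_rank(), dist.get_world_size()))
    dist.destroy_process_group()

if __name__ == '__main__':
    mp.spawn(_debug_worker, args=(2,), nprocs=2)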
{
"learning_rate":{"_type":"choice","_value":[0.0001, 0.001, 0.01, 0.1]},
"momentum":{"_type":"choice","_value":[0.4, 0.5, 0.6]}
}
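For reference, a parameter set drawn from this search space by the tuner has the same keys as generate_default_params() in dist_mnist.py, so params.update(RCV_PARAMS) simply overrides the defaults. A small illustration (the sampled values below are hypothetical, though both appear in the choices above):

# Hypothetical values a tuner might draw from search_space.json.
sampled = {'learning_rate': 0.001, 'momentum': 0.6}
params = {'learning_rate': 0.01, 'momentum': 0.5}  # defaults from generate_default_params()
params.update(sampled)                             # tuner values override the defaults
assert params == {'learning_rate': 0.001, 'momentum': 0.6}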