Unverified Commit 9fae194a authored by SparkSnail's avatar SparkSnail Committed by GitHub
Browse files

Merge pull request #206 from microsoft/master

merge master
parents 8fe2588b 41e58703
authorName: default
experimentName: example_mnist_pytorch
trialConcurrency: 1
maxExecDuration: 1h
maxTrialNum: 10
#choice: local, remote, pai
trainingServicePlatform: local
searchSpacePath: search_space.json
#choice: true, false
useAnnotation: false
tuner:
#choice: TPE, Random, Anneal, Evolution, BatchTuner, MetisTuner, GPTuner
#SMAC (SMAC should be installed through nnictl)
builtinTunerName: TPE
classArgs:
#choice: maximize, minimize
optimize_mode: maximize
trial:
command: python3 mnist.py
codeDir: .
gpuNum: 0
authorName: default
experimentName: example_mnist_pytorch
trialConcurrency: 1
maxExecDuration: 1h
maxTrialNum: 50
#choice: local, remote
trainingServicePlatform: local
searchSpacePath: search_space.json
#choice: true, false
useAnnotation: false
tuner:
#choice: TPE, Random, Anneal, Evolution, BatchTuner, MetisTuner, GPTuner
#SMAC (SMAC should be installed through nnictl)
builtinTunerName: TPE
classArgs:
#choice: maximize, minimize
optimize_mode: maximize
assessor:
#choice: Medianstop, Curvefitting
builtinAssessorName: Curvefitting
classArgs:
#choice: maximize, minimize
optimize_mode: maximize
epoch_num: 20
threshold: 0.9
trial:
command: python3 mnist.py
codeDir: .
gpuNum: 0
authorName: default
experimentName: example_mnist_pytorch
trialConcurrency: 1
maxExecDuration: 1h
maxTrialNum: 10
#choice: local, remote, pai, kubeflow
trainingServicePlatform: frameworkcontroller
searchSpacePath: search_space.json
#choice: true, false
useAnnotation: false
tuner:
#choice: TPE, Random, Anneal, Evolution, BatchTuner, MetisTuner, GPTuner
builtinTunerName: TPE
classArgs:
#choice: maximize, minimize
optimize_mode: maximize
assessor:
builtinAssessorName: Medianstop
classArgs:
optimize_mode: maximize
gpuNum: 0
trial:
codeDir: .
taskRoles:
- name: worker
taskNum: 1
command: python3 mnist.py
gpuNum: 1
cpuNum: 1
memoryMB: 8192
image: msranni/nni:latest
frameworkAttemptCompletionPolicy:
minFailedTaskCount: 1
minSucceededTaskCount: 1
frameworkcontrollerConfig:
storage: nfs
nfs:
# Your NFS server IP, like 10.10.10.10
server: {your_nfs_server_ip}
# Your NFS server export path, like /var/nfs/nni
path: {your_nfs_server_export_path}
\ No newline at end of file
authorName: default
experimentName: example_dist_pytorch
trialConcurrency: 1
maxExecDuration: 1h
maxTrialNum: 1
#choice: local, remote, pai, kubeflow
trainingServicePlatform: kubeflow
searchSpacePath: search_space.json
#choice: true, false
useAnnotation: false
tuner:
#choice: TPE, Random, Anneal, Evolution, BatchTuner, MetisTuner, GPTuner
builtinTunerName: TPE
classArgs:
#choice: maximize, minimize
optimize_mode: maximize
trial:
codeDir: .
worker:
replicas: 1
command: python3 mnist.py
gpuNum: 0
cpuNum: 1
memoryMB: 8192
image: msranni/nni:latest
kubeflowConfig:
operator: tf-operator
apiVersion: v1alpha2
storage: nfs
nfs:
server: 10.10.10.10
path: /var/nfs/general
\ No newline at end of file
authorName: default
experimentName: example_mnist_pytorch
trialConcurrency: 1
maxExecDuration: 1h
maxTrialNum: 10
#choice: local, remote, pai
trainingServicePlatform: pai
searchSpacePath: search_space.json
#choice: true, false
useAnnotation: false
tuner:
#choice: TPE, Random, Anneal, Evolution, BatchTuner, MetisTuner, GPTuner
#SMAC (SMAC should be installed through nnictl)
builtinTunerName: TPE
classArgs:
#choice: maximize, minimize
optimize_mode: maximize
trial:
command: python3 mnist.py
codeDir: .
gpuNum: 0
cpuNum: 1
memoryMB: 8196
#The docker image to run nni job on pai
image: msranni/nni:latest
paiConfig:
#The username to login pai
userName: username
#The password to login pai
passWord: password
#The host of restful server of pai
host: 10.10.10.10
\ No newline at end of file
authorName: default
experimentName: example_mnist_pytorch
trialConcurrency: 1
maxExecDuration: 1h
maxTrialNum: 10
#choice: local, remote, pai
trainingServicePlatform: local
searchSpacePath: search_space.json
#choice: true, false
useAnnotation: false
tuner:
#choice: TPE, Random, Anneal, Evolution, BatchTuner, MetisTuner, GPTuner
#SMAC (SMAC should be installed through nnictl)
builtinTunerName: TPE
classArgs:
#choice: maximize, minimize
optimize_mode: maximize
trial:
command: python mnist.py
codeDir: .
gpuNum: 0
"""
A deep MNIST classifier using convolutional layers.
This file is a modification of the official pytorch mnist example:
https://github.com/pytorch/examples/blob/master/mnist/main.py
"""
import argparse
import logging
import nni
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torchvision import datasets, transforms
logger = logging.getLogger('mnist_AutoML')
class Net(nn.Module):
def __init__(self, hidden_size):
super(Net, self).__init__()
self.conv1 = nn.Conv2d(1, 20, 5, 1)
self.conv2 = nn.Conv2d(20, 50, 5, 1)
self.fc1 = nn.Linear(4*4*50, hidden_size)
self.fc2 = nn.Linear(hidden_size, 10)
def forward(self, x):
x = F.relu(self.conv1(x))
x = F.max_pool2d(x, 2, 2)
x = F.relu(self.conv2(x))
x = F.max_pool2d(x, 2, 2)
x = x.view(-1, 4*4*50)
x = F.relu(self.fc1(x))
x = self.fc2(x)
return F.log_softmax(x, dim=1)
def train(args, model, device, train_loader, optimizer, epoch):
model.train()
for batch_idx, (data, target) in enumerate(train_loader):
data, target = data.to(device), target.to(device)
optimizer.zero_grad()
output = model(data)
loss = F.nll_loss(output, target)
loss.backward()
optimizer.step()
if batch_idx % args['log_interval'] == 0:
logger.info('Train Epoch: {} [{}/{} ({:.0f}%)]\tLoss: {:.6f}'.format(
epoch, batch_idx * len(data), len(train_loader.dataset),
100. * batch_idx / len(train_loader), loss.item()))
def test(args, model, device, test_loader):
model.eval()
test_loss = 0
correct = 0
with torch.no_grad():
for data, target in test_loader:
data, target = data.to(device), target.to(device)
output = model(data)
# sum up batch loss
test_loss += F.nll_loss(output, target, reduction='sum').item()
# get the index of the max log-probability
pred = output.argmax(dim=1, keepdim=True)
correct += pred.eq(target.view_as(pred)).sum().item()
test_loss /= len(test_loader.dataset)
accuracy = 100. * correct / len(test_loader.dataset)
logger.info('\nTest set: Average loss: {:.4f}, Accuracy: {}/{} ({:.0f}%)\n'.format(
test_loss, correct, len(test_loader.dataset), accuracy))
return accuracy
def main(args):
use_cuda = not args['no_cuda'] and torch.cuda.is_available()
torch.manual_seed(args['seed'])
device = torch.device("cuda" if use_cuda else "cpu")
kwargs = {'num_workers': 1, 'pin_memory': True} if use_cuda else {}
train_loader = torch.utils.data.DataLoader(
datasets.MNIST(args['data_dir'], train=True, download=True,
transform=transforms.Compose([
transforms.ToTensor(),
transforms.Normalize((0.1307,), (0.3081,))
])),
batch_size=args['batch_size'], shuffle=True, **kwargs)
test_loader = torch.utils.data.DataLoader(
datasets.MNIST(args['data_dir'], train=False, transform=transforms.Compose([
transforms.ToTensor(),
transforms.Normalize((0.1307,), (0.3081,))
])),
batch_size=1000, shuffle=True, **kwargs)
hidden_size = args['hidden_size']
model = Net(hidden_size=hidden_size).to(device)
optimizer = optim.SGD(model.parameters(), lr=args['lr'],
momentum=args['momentum'])
for epoch in range(1, args['epochs'] + 1):
train(args, model, device, train_loader, optimizer, epoch)
test_acc = test(args, model, device, test_loader)
if epoch < args['epochs']:
# report intermediate result
nni.report_intermediate_result(test_acc)
logger.debug('test accuracy %g', test_acc)
logger.debug('Pipe send intermediate result done.')
else:
# report final result
nni.report_final_result(test_acc)
logger.debug('Final result is %g', test_acc)
logger.debug('Send final result done.')
def get_params():
# Training settings
parser = argparse.ArgumentParser(description='PyTorch MNIST Example')
parser.add_argument("--data_dir", type=str,
default='/tmp/tensorflow/mnist/input_data', help="data directory")
parser.add_argument('--batch_size', type=int, default=64, metavar='N',
help='input batch size for training (default: 64)')
parser.add_argument("--hidden_size", type=int, default=512, metavar='N',
help='hidden layer size (default: 512)')
parser.add_argument('--lr', type=float, default=0.01, metavar='LR',
help='learning rate (default: 0.01)')
parser.add_argument('--momentum', type=float, default=0.5, metavar='M',
help='SGD momentum (default: 0.5)')
parser.add_argument('--epochs', type=int, default=10, metavar='N',
help='number of epochs to train (default: 10)')
parser.add_argument('--seed', type=int, default=1, metavar='S',
help='random seed (default: 1)')
parser.add_argument('--no_cuda', action='store_true', default=False,
help='disables CUDA training')
parser.add_argument('--log_interval', type=int, default=1000, metavar='N',
help='how many batches to wait before logging training status')
args, _ = parser.parse_known_args()
return args
if __name__ == '__main__':
try:
# get parameters form tuner
tuner_params = nni.get_next_parameter()
logger.debug(tuner_params)
params = vars(get_params())
params.update(tuner_params)
main(params)
except Exception as exception:
logger.exception(exception)
raise
{
"batch_size": {"_type":"choice", "_value": [16, 32, 64, 128]},
"hidden_size":{"_type":"choice","_value":[128, 256, 512, 1024]},
"lr":{"_type":"choice","_value":[0.0001, 0.001, 0.01, 0.1]},
"momentum":{"_type":"uniform","_value":[0, 1]}
}
...@@ -60,7 +60,8 @@ ...@@ -60,7 +60,8 @@
"lodash": "^4.17.13", "lodash": "^4.17.13",
"lodash.merge": "^4.6.2", "lodash.merge": "^4.6.2",
"node.extend": "^1.1.7", "node.extend": "^1.1.7",
"hoek": "^4.2.1" "hoek": "^4.2.1",
"js-yaml": "^3.13.1"
}, },
"engines": { "engines": {
"node": ">=10.0.0" "node": ">=10.0.0"
......
...@@ -53,6 +53,11 @@ export namespace ValidationSchemas { ...@@ -53,6 +53,11 @@ export namespace ValidationSchemas {
shmMB: joi.number(), shmMB: joi.number(),
authFile: joi.string(), authFile: joi.string(),
nasMode: joi.string().valid('classic_mode', 'enas_mode', 'oneshot_mode', 'darts_mode'), nasMode: joi.string().valid('classic_mode', 'enas_mode', 'oneshot_mode', 'darts_mode'),
portList: joi.array().items(joi.object({
label: joi.string().required(),
beginAt: joi.number().required(),
portNumber: joi.number().required(),
})),
worker: joi.object({ worker: joi.object({
replicas: joi.number().min(1).required(), replicas: joi.number().min(1).required(),
image: joi.string().min(1), image: joi.string().min(1),
...@@ -120,7 +125,8 @@ export namespace ValidationSchemas { ...@@ -120,7 +125,8 @@ export namespace ValidationSchemas {
azureStorage: joi.object({ azureStorage: joi.object({
accountName: joi.string().regex(/^([0-9]|[a-z]|[A-Z]|-){3,31}$/), accountName: joi.string().regex(/^([0-9]|[a-z]|[A-Z]|-){3,31}$/),
azureShare: joi.string().regex(/^([0-9]|[a-z]|[A-Z]|-){3,63}$/) azureShare: joi.string().regex(/^([0-9]|[a-z]|[A-Z]|-){3,63}$/)
}) }),
uploadRetryCount: joi.number().min(1)
}), }),
frameworkcontroller_config: joi.object({ frameworkcontroller_config: joi.object({
storage: joi.string().min(1), storage: joi.string().min(1),
...@@ -136,7 +142,8 @@ export namespace ValidationSchemas { ...@@ -136,7 +142,8 @@ export namespace ValidationSchemas {
azureStorage: joi.object({ azureStorage: joi.object({
accountName: joi.string().regex(/^([0-9]|[a-z]|[A-Z]|-){3,31}$/), accountName: joi.string().regex(/^([0-9]|[a-z]|[A-Z]|-){3,31}$/),
azureShare: joi.string().regex(/^([0-9]|[a-z]|[A-Z]|-){3,63}$/) azureShare: joi.string().regex(/^([0-9]|[a-z]|[A-Z]|-){3,63}$/)
}) }),
uploadRetryCount: joi.number().min(1)
}), }),
nni_manager_ip: joi.object({ nni_manager_ip: joi.object({
nniManagerIp: joi.string().min(1) nniManagerIp: joi.string().min(1)
......
...@@ -35,15 +35,15 @@ export namespace AzureStorageClientUtility { ...@@ -35,15 +35,15 @@ export namespace AzureStorageClientUtility {
* @param fileServerClient * @param fileServerClient
* @param azureShare * @param azureShare
*/ */
export async function createShare(fileServerClient: any, azureShare: any): Promise<void> { export async function createShare(fileServerClient: any, azureShare: any): Promise<boolean> {
const deferred: Deferred<void> = new Deferred<void>(); const deferred: Deferred<boolean> = new Deferred<boolean>();
fileServerClient.createShareIfNotExists(azureShare, (error: any, result: any, response: any) => { fileServerClient.createShareIfNotExists(azureShare, (error: any, result: any, response: any) => {
if (error) { if (error) {
getLogger() getLogger()
.error(`Create share failed:, ${error}`); .error(`Create share failed:, ${error}`);
deferred.reject(error); deferred.resolve(false);
} else { } else {
deferred.resolve(); deferred.resolve(true);
} }
}); });
...@@ -56,18 +56,17 @@ export namespace AzureStorageClientUtility { ...@@ -56,18 +56,17 @@ export namespace AzureStorageClientUtility {
* @param azureFoler * @param azureFoler
* @param azureShare * @param azureShare
*/ */
export async function createDirectory(fileServerClient: azureStorage.FileService, azureFoler: any, azureShare: any): Promise<void> { export async function createDirectory(fileServerClient: azureStorage.FileService, azureFoler: any, azureShare: any): Promise<boolean> {
const deferred: Deferred<void> = new Deferred<void>(); const deferred: Deferred<boolean> = new Deferred<boolean>();
fileServerClient.createDirectoryIfNotExists(azureShare, azureFoler, (error: any, result: any, response: any) => { fileServerClient.createDirectoryIfNotExists(azureShare, azureFoler, (error: any, result: any, response: any) => {
if (error) { if (error) {
getLogger() getLogger()
.error(`Create directory failed:, ${error}`); .error(`Create directory failed:, ${error}`);
deferred.reject(error); deferred.resolve(false);
} else { } else {
deferred.resolve(); deferred.resolve(true);
} }
}); });
return deferred.promise; return deferred.promise;
} }
...@@ -77,16 +76,20 @@ export namespace AzureStorageClientUtility { ...@@ -77,16 +76,20 @@ export namespace AzureStorageClientUtility {
* @param azureDirectory * @param azureDirectory
*/ */
export async function createDirectoryRecursive(fileServerClient: azureStorage.FileService, azureDirectory: string, export async function createDirectoryRecursive(fileServerClient: azureStorage.FileService, azureDirectory: string,
azureShare: any): Promise<void> { azureShare: any): Promise<boolean> {
const deferred: Deferred<void> = new Deferred<void>(); const deferred: Deferred<boolean> = new Deferred<boolean>();
const directories: string[] = azureDirectory.split('/'); const directories: string[] = azureDirectory.split('/');
let rootDirectory: string = ''; let rootDirectory: string = '';
for (const directory of directories) { for (const directory of directories) {
rootDirectory += directory; rootDirectory += directory;
await createDirectory(fileServerClient, rootDirectory, azureShare); let result:boolean = await createDirectory(fileServerClient, rootDirectory, azureShare);
if (!result) {
deferred.resolve(false);
return deferred.promise;
}
rootDirectory += '/'; rootDirectory += '/';
} }
deferred.resolve(); deferred.resolve(true);
return deferred.promise; return deferred.promise;
} }
...@@ -100,16 +103,16 @@ export namespace AzureStorageClientUtility { ...@@ -100,16 +103,16 @@ export namespace AzureStorageClientUtility {
* @param localFilePath * @param localFilePath
*/ */
async function uploadFileToAzure(fileServerClient: any, azureDirectory: string, azureFileName: any, azureShare: any, async function uploadFileToAzure(fileServerClient: any, azureDirectory: string, azureFileName: any, azureShare: any,
localFilePath: string): Promise<void> { localFilePath: string): Promise<boolean> {
const deferred: Deferred<void> = new Deferred<void>(); const deferred: Deferred<boolean> = new Deferred<boolean>();
await fileServerClient.createFileFromLocalFile(azureShare, azureDirectory, azureFileName, localFilePath, await fileServerClient.createFileFromLocalFile(azureShare, azureDirectory, azureFileName, localFilePath,
(error: any, result: any, response: any) => { (error: any, result: any, response: any) => {
if (error) { if (error) {
getLogger() getLogger()
.error(`Upload file failed:, ${error}`); .error(`Upload file failed:, ${error}`);
deferred.reject(error); deferred.resolve(false);
} else { } else {
deferred.resolve(); deferred.resolve(true);
} }
}); });
...@@ -125,17 +128,17 @@ export namespace AzureStorageClientUtility { ...@@ -125,17 +128,17 @@ export namespace AzureStorageClientUtility {
* @param localFilePath * @param localFilePath
*/ */
async function downloadFile(fileServerClient: any, azureDirectory: string, azureFileName: any, azureShare: any, async function downloadFile(fileServerClient: any, azureDirectory: string, azureFileName: any, azureShare: any,
localFilePath: string): Promise<void> { localFilePath: string): Promise<boolean> {
const deferred: Deferred<void> = new Deferred<void>(); const deferred: Deferred<boolean> = new Deferred<boolean>();
// tslint:disable-next-line:non-literal-fs-path // tslint:disable-next-line:non-literal-fs-path
await fileServerClient.getFileToStream(azureShare, azureDirectory, azureFileName, fs.createWriteStream(localFilePath), await fileServerClient.getFileToStream(azureShare, azureDirectory, azureFileName, fs.createWriteStream(localFilePath),
(error: any, result: any, response: any) => { (error: any, result: any, response: any) => {
if (error) { if (error) {
getLogger() getLogger()
.error(`Download file failed:, ${error}`); .error(`Download file failed:, ${error}`);
deferred.reject(error); deferred.resolve(false);
} else { } else {
deferred.resolve(); deferred.resolve(true);
} }
}); });
...@@ -151,28 +154,38 @@ export namespace AzureStorageClientUtility { ...@@ -151,28 +154,38 @@ export namespace AzureStorageClientUtility {
*/ */
// tslint:disable:non-literal-fs-path // tslint:disable:non-literal-fs-path
export async function uploadDirectory(fileServerClient: azureStorage.FileService, azureDirectory: string, azureShare: any, export async function uploadDirectory(fileServerClient: azureStorage.FileService, azureDirectory: string, azureShare: any,
localDirectory: string): Promise<void> { localDirectory: string): Promise<boolean> {
const deferred: Deferred<void> = new Deferred<void>(); const deferred: Deferred<boolean> = new Deferred<boolean>();
const fileNameArray: string[] = fs.readdirSync(localDirectory); const fileNameArray: string[] = fs.readdirSync(localDirectory);
await createDirectoryRecursive(fileServerClient, azureDirectory, azureShare); let result: boolean = await createDirectoryRecursive(fileServerClient, azureDirectory, azureShare);
if (!result) {
deferred.resolve(false);
return deferred.promise;
}
for (const fileName of fileNameArray) { for (const fileName of fileNameArray) {
const fullFilePath: string = path.join(localDirectory, fileName); const fullFilePath: string = path.join(localDirectory, fileName);
try { try {
let resultUploadFile: boolean = true;
let resultUploadDir: boolean = true;
if (fs.lstatSync(fullFilePath) if (fs.lstatSync(fullFilePath)
.isFile()) { .isFile()) {
await uploadFileToAzure(fileServerClient, azureDirectory, fileName, azureShare, fullFilePath); resultUploadFile = await uploadFileToAzure(fileServerClient, azureDirectory, fileName, azureShare, fullFilePath);
} else { } else {
// If filePath is a directory, recuisively copy it to azure // If filePath is a directory, recuisively copy it to azure
await uploadDirectory(fileServerClient, String.Format('{0}/{1}', azureDirectory, fileName), azureShare, fullFilePath); resultUploadDir = await uploadDirectory(fileServerClient, String.Format('{0}/{1}', azureDirectory, fileName), azureShare, fullFilePath);
}
if (!(resultUploadFile && resultUploadDir)) {
deferred.resolve(false);
return deferred.promise;
} }
} catch (error) { } catch (error) {
deferred.reject(error); deferred.resolve(false);
return deferred.promise; return deferred.promise;
} }
} }
// All files/directories are copied successfully, resolve // All files/directories are copied successfully, resolve
deferred.resolve(); deferred.resolve(true);
return deferred.promise; return deferred.promise;
} }
......
...@@ -25,7 +25,7 @@ import * as path from 'path'; ...@@ -25,7 +25,7 @@ import * as path from 'path';
import * as component from '../../../common/component'; import * as component from '../../../common/component';
import { getExperimentId } from '../../../common/experimentStartupInfo'; import { getExperimentId } from '../../../common/experimentStartupInfo';
import { import {
JobApplicationForm, NNIManagerIpConfig, TrialJobApplicationForm, TrialJobDetail JobApplicationForm, NNIManagerIpConfig, TrialJobApplicationForm, TrialJobDetail, TrialJobStatus
} from '../../../common/trainingService'; } from '../../../common/trainingService';
import { delay, generateParamFileName, getExperimentRootDir, uniqueString } from '../../../common/utils'; import { delay, generateParamFileName, getExperimentRootDir, uniqueString } from '../../../common/utils';
import { CONTAINER_INSTALL_NNI_SHELL_FORMAT } from '../../common/containerJobData'; import { CONTAINER_INSTALL_NNI_SHELL_FORMAT } from '../../common/containerJobData';
...@@ -102,10 +102,13 @@ class FrameworkControllerTrainingService extends KubernetesTrainingService imple ...@@ -102,10 +102,13 @@ class FrameworkControllerTrainingService extends KubernetesTrainingService imple
//upload code files //upload code files
const trialJobOutputUrl: string = await this.uploadCodeFiles(trialJobId, trialLocalTempFolder); const trialJobOutputUrl: string = await this.uploadCodeFiles(trialJobId, trialLocalTempFolder);
let initStatus: TrialJobStatus = 'WAITING';
if (!trialJobOutputUrl) {
initStatus = 'FAILED';
}
const trialJobDetail: KubernetesTrialJobDetail = new KubernetesTrialJobDetail( const trialJobDetail: KubernetesTrialJobDetail = new KubernetesTrialJobDetail(
trialJobId, trialJobId,
'WAITING', initStatus,
Date.now(), Date.now(),
trialWorkingFolder, trialWorkingFolder,
form, form,
...@@ -208,24 +211,10 @@ class FrameworkControllerTrainingService extends KubernetesTrainingService imple ...@@ -208,24 +211,10 @@ class FrameworkControllerTrainingService extends KubernetesTrainingService imple
let trialJobOutputUrl: string = ''; let trialJobOutputUrl: string = '';
if (this.fcClusterConfig.storageType === 'azureStorage') { if (this.fcClusterConfig.storageType === 'azureStorage') {
if (this.azureStorageClient === undefined) { const azureFrameworkControllerClusterConfig: FrameworkControllerClusterConfigAzure =
throw new Error('azureStorageClient is not initialized'); <FrameworkControllerClusterConfigAzure>this.fcClusterConfig;
} trialJobOutputUrl = await this.uploadFilesToAzureStorage(trialJobId, trialLocalTempFolder, this.fcTrialConfig.codeDir,
try { azureFrameworkControllerClusterConfig.uploadRetryCount);
//upload local files, including scripts for running the trial and configuration (e.g., hyperparameters) for the trial, to azure storage
await AzureStorageClientUtility.uploadDirectory(
this.azureStorageClient, `nni/${getExperimentId()}/${trialJobId}`, this.azureStorageShare, `${trialLocalTempFolder}`);
//upload code files to azure storage
await AzureStorageClientUtility.uploadDirectory(
this.azureStorageClient, `nni/${getExperimentId()}/${trialJobId}`, this.azureStorageShare, `${this.fcTrialConfig.codeDir}`);
trialJobOutputUrl = `https://${this.azureStorageAccountName}.file.core.windows.net/` +
`${this.azureStorageShare}/${path.join('nni', getExperimentId(), trialJobId, 'output')}`;
} catch (error) {
this.log.error(error);
return Promise.reject(error);
}
} else if (this.fcClusterConfig.storageType === 'nfs') { } else if (this.fcClusterConfig.storageType === 'nfs') {
const nfsFrameworkControllerClusterConfig: FrameworkControllerClusterConfigNFS = const nfsFrameworkControllerClusterConfig: FrameworkControllerClusterConfigNFS =
<FrameworkControllerClusterConfigNFS>this.fcClusterConfig; <FrameworkControllerClusterConfigNFS>this.fcClusterConfig;
......
...@@ -27,7 +27,7 @@ import * as component from '../../../common/component'; ...@@ -27,7 +27,7 @@ import * as component from '../../../common/component';
import { getExperimentId } from '../../../common/experimentStartupInfo'; import { getExperimentId } from '../../../common/experimentStartupInfo';
import { import {
JobApplicationForm, NNIManagerIpConfig, TrialJobApplicationForm, TrialJobDetail JobApplicationForm, NNIManagerIpConfig, TrialJobApplicationForm, TrialJobDetail, TrialJobStatus
} from '../../../common/trainingService'; } from '../../../common/trainingService';
import { delay, generateParamFileName, getExperimentRootDir, uniqueString } from '../../../common/utils'; import { delay, generateParamFileName, getExperimentRootDir, uniqueString } from '../../../common/utils';
import { CONTAINER_INSTALL_NNI_SHELL_FORMAT } from '../../common/containerJobData'; import { CONTAINER_INSTALL_NNI_SHELL_FORMAT } from '../../common/containerJobData';
...@@ -102,9 +102,13 @@ class KubeflowTrainingService extends KubernetesTrainingService implements Kuber ...@@ -102,9 +102,13 @@ class KubeflowTrainingService extends KubernetesTrainingService implements Kuber
await this.prepareRunScript(trialLocalTempFolder, trialJobId, trialWorkingFolder, curTrialSequenceId, form); await this.prepareRunScript(trialLocalTempFolder, trialJobId, trialWorkingFolder, curTrialSequenceId, form);
//upload files to sotrage //upload files to sotrage
const trialJobOutputUrl: string = await this.uploadCodeFiles(trialJobId, trialLocalTempFolder); const trialJobOutputUrl: string = await this.uploadCodeFiles(trialJobId, trialLocalTempFolder);
let initStatus: TrialJobStatus = 'WAITING';
if (!trialJobOutputUrl) {
initStatus = 'FAILED';
}
const trialJobDetail: KubernetesTrialJobDetail = new KubernetesTrialJobDetail( const trialJobDetail: KubernetesTrialJobDetail = new KubernetesTrialJobDetail(
trialJobId, trialJobId,
'WAITING', initStatus,
Date.now(), Date.now(),
trialWorkingFolder, trialWorkingFolder,
form, form,
...@@ -215,23 +219,8 @@ class KubeflowTrainingService extends KubernetesTrainingService implements Kuber ...@@ -215,23 +219,8 @@ class KubeflowTrainingService extends KubernetesTrainingService implements Kuber
if (this.azureStorageClient === undefined) { if (this.azureStorageClient === undefined) {
throw new Error('azureStorageClient is not initialized'); throw new Error('azureStorageClient is not initialized');
} }
try { const azureKubeflowClusterConfig: KubeflowClusterConfigAzure = <KubeflowClusterConfigAzure>this.kubeflowClusterConfig;
//upload local files, including scripts for running the trial and configuration (e.g., hyperparameters) for the trial, to azure storage trialJobOutputUrl = await this.uploadFilesToAzureStorage(trialJobId, trialLocalTempFolder, this.kubeflowTrialConfig.codeDir, azureKubeflowClusterConfig.uploadRetryCount);
await AzureStorageClientUtility.uploadDirectory(this.azureStorageClient,
`nni/${getExperimentId()}/${trialJobId}`, this.azureStorageShare,
`${trialLocalTempFolder}`);
//upload code files to azure storage
await AzureStorageClientUtility.uploadDirectory(this.azureStorageClient,
`nni/${getExperimentId()}/${trialJobId}`, this.azureStorageShare,
`${this.kubeflowTrialConfig.codeDir}`);
trialJobOutputUrl = `https://${this.azureStorageAccountName}.file.core.windows.net/${this.azureStorageShare}` +
`/${path.join('nni', getExperimentId(), trialJobId, 'output')}`;
} catch (error) {
this.log.error(error);
return Promise.reject(error);
}
} else if (this.kubeflowClusterConfig.storage === 'nfs' || this.kubeflowClusterConfig.storage === undefined) { } else if (this.kubeflowClusterConfig.storage === 'nfs' || this.kubeflowClusterConfig.storage === undefined) {
const nfsKubeflowClusterConfig: KubeflowClusterConfigNFS = <KubeflowClusterConfigNFS>this.kubeflowClusterConfig; const nfsKubeflowClusterConfig: KubeflowClusterConfigNFS = <KubeflowClusterConfigNFS>this.kubeflowClusterConfig;
// Creat work dir for current trial in NFS directory // Creat work dir for current trial in NFS directory
......
...@@ -75,16 +75,19 @@ export class KubernetesClusterConfigNFS extends KubernetesClusterConfig { ...@@ -75,16 +75,19 @@ export class KubernetesClusterConfigNFS extends KubernetesClusterConfig {
export class KubernetesClusterConfigAzure extends KubernetesClusterConfig { export class KubernetesClusterConfigAzure extends KubernetesClusterConfig {
public readonly keyVault: KeyVaultConfig; public readonly keyVault: KeyVaultConfig;
public readonly azureStorage: AzureStorage; public readonly azureStorage: AzureStorage;
public readonly uploadRetryCount: number | undefined;
constructor( constructor(
apiVersion: string, apiVersion: string,
keyVault: KeyVaultConfig, keyVault: KeyVaultConfig,
azureStorage: AzureStorage, azureStorage: AzureStorage,
storage?: KubernetesStorageKind storage?: KubernetesStorageKind,
uploadRetryCount?: number
) { ) {
super(apiVersion, storage); super(apiVersion, storage);
this.keyVault = keyVault; this.keyVault = keyVault;
this.azureStorage = azureStorage; this.azureStorage = azureStorage;
this.uploadRetryCount = uploadRetryCount;
} }
public get storageType(): KubernetesStorageKind { public get storageType(): KubernetesStorageKind {
...@@ -98,7 +101,8 @@ export class KubernetesClusterConfigAzure extends KubernetesClusterConfig { ...@@ -98,7 +101,8 @@ export class KubernetesClusterConfigAzure extends KubernetesClusterConfig {
kubernetesClusterConfigObjectAzure.apiVersion, kubernetesClusterConfigObjectAzure.apiVersion,
kubernetesClusterConfigObjectAzure.keyVault, kubernetesClusterConfigObjectAzure.keyVault,
kubernetesClusterConfigObjectAzure.azureStorage, kubernetesClusterConfigObjectAzure.azureStorage,
kubernetesClusterConfigObjectAzure.storage kubernetesClusterConfigObjectAzure.storage,
kubernetesClusterConfigObjectAzure.uploadRetryCount
); );
} }
} }
......
...@@ -31,13 +31,14 @@ import { getLogger, Logger } from '../../common/log'; ...@@ -31,13 +31,14 @@ import { getLogger, Logger } from '../../common/log';
import { import {
NNIManagerIpConfig, TrialJobDetail, TrialJobMetric NNIManagerIpConfig, TrialJobDetail, TrialJobMetric
} from '../../common/trainingService'; } from '../../common/trainingService';
import { getExperimentRootDir, getIPV4Address, getJobCancelStatus, getVersion, uniqueString } from '../../common/utils'; import { delay, getExperimentRootDir, getIPV4Address, getJobCancelStatus, getVersion, uniqueString } from '../../common/utils';
import { AzureStorageClientUtility } from './azureStorageClientUtils'; import { AzureStorageClientUtility } from './azureStorageClientUtils';
import { GeneralK8sClient, KubernetesCRDClient } from './kubernetesApiClient'; import { GeneralK8sClient, KubernetesCRDClient } from './kubernetesApiClient';
import { KubernetesClusterConfig } from './kubernetesConfig'; import { KubernetesClusterConfig } from './kubernetesConfig';
import { kubernetesScriptFormat, KubernetesTrialJobDetail } from './kubernetesData'; import { kubernetesScriptFormat, KubernetesTrialJobDetail } from './kubernetesData';
import { KubernetesJobRestServer } from './kubernetesJobRestServer'; import { KubernetesJobRestServer } from './kubernetesJobRestServer';
var yaml = require('js-yaml');
var fs = require('fs'); var fs = require('fs');
/** /**
...@@ -357,6 +358,52 @@ abstract class KubernetesTrainingService { ...@@ -357,6 +358,52 @@ abstract class KubernetesTrainingService {
); );
return registrySecretName; return registrySecretName;
} }
protected async uploadFilesToAzureStorage(trialJobId: string, trialLocalTempFolder: String, codeDir: String, uploadRetryCount: number | undefined): Promise<string> {
if (this.azureStorageClient === undefined) {
throw new Error('azureStorageClient is not initialized');
}
let trialJobOutputUrl: string = '';
let retryCount: number = 1;
if(uploadRetryCount) {
retryCount = uploadRetryCount;
}
let resultUploadNNIScript: boolean = false;
let resultUploadCodeFile: boolean = false;
try {
do {
//upload local files, including scripts for running the trial and configuration (e.g., hyperparameters) for the trial, to azure storage
if(!resultUploadNNIScript) {
resultUploadNNIScript = await AzureStorageClientUtility.uploadDirectory(this.azureStorageClient,
`nni/${getExperimentId()}/${trialJobId}`, this.azureStorageShare,
`${trialLocalTempFolder}`);
}
//upload code files to azure storage
if(!resultUploadCodeFile) {
resultUploadCodeFile = await AzureStorageClientUtility.uploadDirectory(this.azureStorageClient,
`nni/${getExperimentId()}/${trialJobId}`, this.azureStorageShare,
`${codeDir}`);
}
if (resultUploadNNIScript && resultUploadCodeFile) {
trialJobOutputUrl = `https://${this.azureStorageAccountName}.file.core.windows.net/${this.azureStorageShare}` +
`/${path.join('nni', getExperimentId(), trialJobId, 'output')}`;
break;
} else {
//wait for 5 seconds to re-upload files
await delay(5000);
this.log.info('Upload failed, Retry: upload files to azure-storage');
}
} while (retryCount-- >= 0)
} catch (error) {
this.log.error(error);
//return a empty url when got error
return Promise.resolve("");
}
if(!trialJobOutputUrl) {
this.log.info(`Retry-count is used up, upload files to azureStorage for trial ${trialJobId} failed!`);
}
return Promise.resolve(trialJobOutputUrl);
}
} }
export { KubernetesTrainingService }; export { KubernetesTrainingService };
...@@ -39,6 +39,8 @@ export class PAITaskRole { ...@@ -39,6 +39,8 @@ export class PAITaskRole {
public readonly command: string; public readonly command: string;
//Shared memory for one task in the task role //Shared memory for one task in the task role
public readonly shmMB?: number; public readonly shmMB?: number;
//portList to specify the port used in container
public portList?: portListMetaData[];
/** /**
* Constructor * Constructor
...@@ -50,7 +52,7 @@ export class PAITaskRole { ...@@ -50,7 +52,7 @@ export class PAITaskRole {
* @param command Executable command for tasks in the task role, can not be empty * @param command Executable command for tasks in the task role, can not be empty
*/ */
constructor(name : string, taskNumber : number, cpuNumber : number, memoryMB : number, gpuNumber : number, constructor(name : string, taskNumber : number, cpuNumber : number, memoryMB : number, gpuNumber : number,
command : string, shmMB?: number) { command : string, shmMB?: number, portList?: portListMetaData[]) {
this.name = name; this.name = name;
this.taskNumber = taskNumber; this.taskNumber = taskNumber;
this.cpuNumber = cpuNumber; this.cpuNumber = cpuNumber;
...@@ -58,6 +60,7 @@ export class PAITaskRole { ...@@ -58,6 +60,7 @@ export class PAITaskRole {
this.gpuNumber = gpuNumber; this.gpuNumber = gpuNumber;
this.command = command; this.command = command;
this.shmMB = shmMB; this.shmMB = shmMB;
this.portList = portList;
} }
} }
...@@ -120,6 +123,16 @@ export class PAIClusterConfig { ...@@ -120,6 +123,16 @@ export class PAIClusterConfig {
} }
} }
/**
* portList data structure used in PAI taskRole
*/
export class portListMetaData {
public readonly label : string = '';
public readonly beginAt: number = 0;
public readonly portNumber: number = 0;
}
/** /**
* PAI trial configuration * PAI trial configuration
*/ */
...@@ -134,9 +147,11 @@ export class NNIPAITrialConfig extends TrialConfig { ...@@ -134,9 +147,11 @@ export class NNIPAITrialConfig extends TrialConfig {
public shmMB?: number; public shmMB?: number;
//authentication file used for private Docker registry //authentication file used for private Docker registry
public authFile?: string; public authFile?: string;
//portList to specify the port used in container
public portList?: portListMetaData[];
constructor(command : string, codeDir : string, gpuNum : number, cpuNum: number, memoryMB: number, constructor(command : string, codeDir : string, gpuNum : number, cpuNum: number, memoryMB: number,
image: string, virtualCluster?: string, shmMB?: number, authFile?: string) { image: string, virtualCluster?: string, shmMB?: number, authFile?: string, portList?: portListMetaData[]) {
super(command, codeDir, gpuNum); super(command, codeDir, gpuNum);
this.cpuNum = cpuNum; this.cpuNum = cpuNum;
this.memoryMB = memoryMB; this.memoryMB = memoryMB;
...@@ -144,5 +159,6 @@ export class NNIPAITrialConfig extends TrialConfig { ...@@ -144,5 +159,6 @@ export class NNIPAITrialConfig extends TrialConfig {
this.virtualCluster = virtualCluster; this.virtualCluster = virtualCluster;
this.shmMB = shmMB; this.shmMB = shmMB;
this.authFile = authFile; this.authFile = authFile;
this.portList = portList;
} }
} }
...@@ -79,6 +79,7 @@ class PAITrainingService implements TrainingService { ...@@ -79,6 +79,7 @@ class PAITrainingService implements TrainingService {
private logCollection: string; private logCollection: string;
private isMultiPhase: boolean = false; private isMultiPhase: boolean = false;
private authFileHdfsPath: string | undefined = undefined; private authFileHdfsPath: string | undefined = undefined;
private portList?: string | undefined;
constructor() { constructor() {
this.log = getLogger(); this.log = getLogger();
...@@ -446,6 +447,8 @@ class PAITrainingService implements TrainingService { ...@@ -446,6 +447,8 @@ class PAITrainingService implements TrainingService {
nniPaiTrialCommand, nniPaiTrialCommand,
// Task shared memory // Task shared memory
this.paiTrialConfig.shmMB, this.paiTrialConfig.shmMB,
// Task portList
this.paiTrialConfig.portList
) )
]; ];
......
...@@ -1410,14 +1410,7 @@ js-tokens@^4.0.0: ...@@ -1410,14 +1410,7 @@ js-tokens@^4.0.0:
version "4.0.0" version "4.0.0"
resolved "https://registry.yarnpkg.com/js-tokens/-/js-tokens-4.0.0.tgz#19203fb59991df98e3a287050d4647cdeaf32499" resolved "https://registry.yarnpkg.com/js-tokens/-/js-tokens-4.0.0.tgz#19203fb59991df98e3a287050d4647cdeaf32499"
js-yaml@^3.10.0: js-yaml@^3.10.0, js-yaml@^3.13.1:
version "3.12.0"
resolved "https://registry.yarnpkg.com/js-yaml/-/js-yaml-3.12.0.tgz#eaed656ec8344f10f527c6bfa1b6e2244de167d1"
dependencies:
argparse "^1.0.7"
esprima "^4.0.0"
js-yaml@^3.13.1:
version "3.13.1" version "3.13.1"
resolved "https://registry.yarnpkg.com/js-yaml/-/js-yaml-3.13.1.tgz#aff151b30bfdfa8e49e05da22e7415e9dfa37847" resolved "https://registry.yarnpkg.com/js-yaml/-/js-yaml-3.13.1.tgz#aff151b30bfdfa8e49e05da22e7415e9dfa37847"
dependencies: dependencies:
......
...@@ -171,15 +171,15 @@ class MsgDispatcher(MsgDispatcherBase): ...@@ -171,15 +171,15 @@ class MsgDispatcher(MsgDispatcherBase):
id_ = data['parameter_id'] id_ = data['parameter_id']
value = data['value'] value = data['value']
if id_ in _customized_parameter_ids: if id_ in _customized_parameter_ids:
if multi_phase_enabled(): if not hasattr(self.tuner, '_accept_customized'):
self.tuner.receive_customized_trial_result(id_, _trial_params[id_], value, trial_job_id=data['trial_job_id']) self.tuner._accept_customized = False
else: if not self.tuner._accept_customized:
self.tuner.receive_customized_trial_result(id_, _trial_params[id_], value) _logger.info('Customized trial job %s ignored by tuner', id_)
return
customized = True
else: else:
if multi_phase_enabled(): customized = False
self.tuner.receive_trial_result(id_, _trial_params[id_], value, trial_job_id=data['trial_job_id']) self.tuner.receive_trial_result(id_, _trial_params[id_], value, customized=customized, trial_job_id=data.get('trial_job_id'))
else:
self.tuner.receive_trial_result(id_, _trial_params[id_], value)
def _handle_intermediate_metric_data(self, data): def _handle_intermediate_metric_data(self, data):
"""Call assessor to process intermediate results """Call assessor to process intermediate results
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment