Unverified commit 0a742aff authored by SparkSnail, committed by GitHub

Merge pull request #249 from microsoft/master

merge master
parents 0fd38deb 76c819c0
@@ -167,6 +167,7 @@ install-dependencies: $(NNI_NODE_TARBALL) $(NNI_YARN_TARBALL)
 .PHONY: install-python-modules
 install-python-modules:
 	#$(_INFO) Installing Python SDK $(_END)
+	sed -ie 's/$(NNI_VERSION_TEMPLATE)/$(NNI_VERSION_VALUE)/' src/sdk/pynni/nni/__init__.py
 	sed -ie 's/$(NNI_VERSION_TEMPLATE)/$(NNI_VERSION_VALUE)/' setup.py && $(PIP_INSTALL) $(PIP_MODE) .
 .PHONY: dev-install-python-modules
...
@@ -47,6 +47,7 @@ build:
 	cp $(CWD)../../src/nni_manager/package.json $(CWD)nni
 	sed -ie 's/$(NNI_VERSION_TEMPLATE)/$(NNI_VERSION_VALUE)/' $(CWD)nni/package.json
 	cd $(CWD)nni && $(NNI_YARN) --prod
+	sed -ie 's/$(NNI_VERSION_TEMPLATE)/$(NNI_VERSION_VALUE)/' $(CWD)../../src/sdk/pynni/nni/__init__.py
 	cd $(CWD) && sed -ie 's/$(NNI_VERSION_TEMPLATE)/$(NNI_VERSION_VALUE)/' setup.py && python3 setup.py bdist_wheel -p $(WHEEL_SPEC)
 	cd $(CWD)
...
@@ -60,6 +60,8 @@ Copy-Item $CWD\..\..\src\nni_manager\package.json $CWD\nni
 (Get-Content $CWD\nni\package.json).replace($NNI_VERSION_TEMPLATE, $NNI_VERSION_VALUE) | Set-Content $CWD\nni\package.json
 cd $CWD\nni
 yarn --prod
+cd $CWD\..\..\src\sdk\pynni\nni
+(Get-Content __init__.py).replace($NNI_VERSION_TEMPLATE, $NNI_VERSION_VALUE) | Set-Content __init__.py
 cd $CWD
 (Get-Content setup.py).replace($NNI_VERSION_TEMPLATE, $NNI_VERSION_VALUE) | Set-Content setup.py
 python setup.py bdist_wheel -p $WHEEL_SPEC
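Taken together, the three hunks above stamp the real package version into src/sdk/pynni/nni/__init__.py at build time, the same substitution already applied to setup.py and package.json. A minimal Python sketch of the idea (the placeholder and release values below are assumptions for illustration; the build actually uses sed and PowerShell's .replace as shown above):

    # Hypothetical values: NNI_VERSION_TEMPLATE and NNI_VERSION_VALUE come from the build environment.
    template_version = '999.0.0-developing'   # assumed placeholder baked into the source tree
    release_version = '1.4'                   # assumed value passed in by the release pipeline

    with open('src/sdk/pynni/nni/__init__.py') as f:
        content = f.read()
    with open('src/sdk/pynni/nni/__init__.py', 'w') as f:
        # Replace the placeholder so the installed package reports the real version.
        f.write(content.replace(template_version, release_version))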
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT license.

import tensorflow as tf
from tensorflow.data import Dataset


def get_dataset():
    (x_train, y_train), (x_valid, y_valid) = tf.keras.datasets.cifar10.load_data()
    x_train, x_valid = x_train / 255.0, x_valid / 255.0

    train_set = (x_train, y_train)
    valid_set = (x_valid, y_valid)
    return train_set, valid_set
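As a quick sanity check, the returned values are plain (features, labels) NumPy pairs; a sketch of inspecting them (the shapes are the standard CIFAR-10 ones):

    train_set, valid_set = get_dataset()
    x_train, y_train = train_set
    print(x_train.shape)  # (50000, 32, 32, 3), pixel values scaled to [0, 1]
    print(y_train.shape)  # (50000, 1), integer class labels 0-9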
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT license.

import tensorflow as tf
from tensorflow.keras import Model, Sequential
from tensorflow.keras.layers import (
    AveragePooling2D,
    BatchNormalization,
    Conv2D,
    Dense,
    Dropout,
    GlobalAveragePooling2D,
    MaxPool2D,
    ReLU,
    SeparableConv2D,
)

from nni.nas.tensorflow.mutables import InputChoice, LayerChoice, MutableScope


def build_conv(filters, kernel_size, name=None):
    return Sequential([
        Conv2D(filters, kernel_size=1, use_bias=False),
        BatchNormalization(trainable=False),
        ReLU(),
        Conv2D(filters, kernel_size, padding='same'),
        BatchNormalization(trainable=False),
        ReLU(),
    ], name)


def build_separable_conv(filters, kernel_size, name=None):
    return Sequential([
        Conv2D(filters, kernel_size=1, use_bias=False),
        BatchNormalization(trainable=False),
        ReLU(),
        SeparableConv2D(filters, kernel_size, padding='same', use_bias=False),
        Conv2D(filters, kernel_size=1, use_bias=False),
        BatchNormalization(trainable=False),
        ReLU(),
    ], name)


def build_avg_pool(filters, name=None):
    return Sequential([
        Conv2D(filters, kernel_size=1, use_bias=False),
        BatchNormalization(trainable=False),
        ReLU(),
        AveragePooling2D(pool_size=3, strides=1, padding='same'),
        BatchNormalization(trainable=False),
    ], name)


def build_max_pool(filters, name=None):
    return Sequential([
        Conv2D(filters, kernel_size=1, use_bias=False),
        BatchNormalization(trainable=False),
        ReLU(),
        MaxPool2D(pool_size=3, strides=1, padding='same'),
        BatchNormalization(trainable=False),
    ], name)


class FactorizedReduce(Model):
    def __init__(self, filters):
        super().__init__()
        self.conv1 = Conv2D(filters // 2, kernel_size=1, strides=2, use_bias=False)
        self.conv2 = Conv2D(filters // 2, kernel_size=1, strides=2, use_bias=False)
        self.bn = BatchNormalization(trainable=False)

    def call(self, x):
        out1 = self.conv1(x)
        out2 = self.conv2(x[:, 1:, 1:, :])
        out = tf.concat([out1, out2], axis=3)
        out = self.bn(out)
        return out


class ENASLayer(MutableScope):
    def __init__(self, key, prev_labels, filters):
        super().__init__(key)
        self.mutable = LayerChoice([
            build_conv(filters, 3, 'conv3'),
            build_separable_conv(filters, 3, 'sepconv3'),
            build_conv(filters, 5, 'conv5'),
            build_separable_conv(filters, 5, 'sepconv5'),
            build_avg_pool(filters, 'avgpool'),
            build_max_pool(filters, 'maxpool'),
        ])
        if len(prev_labels) > 0:
            self.skipconnect = InputChoice(choose_from=prev_labels, n_chosen=None)
        else:
            self.skipconnect = None
        self.batch_norm = BatchNormalization(trainable=False)

    def call(self, prev_layers):
        out = self.mutable(prev_layers[-1])
        if self.skipconnect is not None:
            connection = self.skipconnect(prev_layers[:-1])
            if connection is not None:
                out += connection
        return self.batch_norm(out)


class GeneralNetwork(Model):
    def __init__(self, num_layers=12, filters=24, num_classes=10, dropout_rate=0.0):
        super().__init__()
        self.num_layers = num_layers

        self.stem = Sequential([
            Conv2D(filters, kernel_size=3, padding='same', use_bias=False),
            BatchNormalization()
        ])

        labels = ['layer_{}'.format(i) for i in range(num_layers)]
        self.enas_layers = []
        for i in range(num_layers):
            layer = ENASLayer(labels[i], labels[:i], filters)
            self.enas_layers.append(layer)

        pool_num = 2
        self.pool_distance = num_layers // (pool_num + 1)
        self.pool_layers = [FactorizedReduce(filters) for _ in range(pool_num)]

        self.gap = GlobalAveragePooling2D()
        self.dropout = Dropout(dropout_rate)
        self.dense = Dense(num_classes)

    def call(self, x):
        cur = self.stem(x)
        prev_outputs = [cur]

        for i, layer in enumerate(self.enas_layers):
            if i > 0 and i % self.pool_distance == 0:
                pool = self.pool_layers[i // self.pool_distance - 1]
                prev_outputs = [pool(tensor) for tensor in prev_outputs]
                cur = prev_outputs[-1]
            cur = layer(prev_outputs)
            prev_outputs.append(cur)

        cur = self.gap(cur)
        cur = self.dropout(cur)
        logits = self.dense(cur)
        return logits
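A quick shape check for the FactorizedReduce block above (a sketch assuming TF 2.x eager execution): the two stride-2 1x1 convolutions each produce half the channels, and shifting the second branch by one pixel before convolving lets the pair cover both even and odd spatial positions.

    import tensorflow as tf
    x = tf.random.normal([1, 32, 32, 24])
    y = FactorizedReduce(24)(x)
    print(y.shape)  # (1, 16, 16, 24): spatial size halved, channel count preserved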
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT license.

import tensorflow as tf
from tensorflow.keras import Model, Sequential
from tensorflow.keras.layers import (
    AveragePooling2D,
    BatchNormalization,
    Conv2D,
    Dense,
    Dropout,
    GlobalAveragePooling2D,
    MaxPool2D,
    ReLU,
    SeparableConv2D,
)

from nni.nas.tensorflow.mutables import InputChoice, LayerChoice, MutableScope


def build_conv_1x1(filters, name=None):
    return Sequential([
        Conv2D(filters, kernel_size=1, use_bias=False),
        BatchNormalization(trainable=False),
        ReLU(),
    ], name)


def build_sep_conv(filters, kernel_size, name=None):
    return Sequential([
        ReLU(),
        SeparableConv2D(filters, kernel_size, padding='same'),
        BatchNormalization(trainable=True),
    ], name)


class FactorizedReduce(Model):
    def __init__(self, filters):
        super().__init__()
        self.conv1 = Conv2D(filters // 2, kernel_size=1, strides=2, use_bias=False)
        self.conv2 = Conv2D(filters // 2, kernel_size=1, strides=2, use_bias=False)
        self.bn = BatchNormalization(trainable=False)

    def call(self, x):
        out1 = self.conv1(x)
        out2 = self.conv2(x[:, 1:, 1:, :])
        out = tf.concat([out1, out2], axis=3)
        out = self.bn(out)
        return out


class ReductionLayer(Model):
    def __init__(self, filters):
        super().__init__()
        self.reduce0 = FactorizedReduce(filters)
        self.reduce1 = FactorizedReduce(filters)

    def call(self, prevprev, prev):
        return self.reduce0(prevprev), self.reduce1(prev)


class Calibration(Model):
    def __init__(self, filters):
        super().__init__()
        self.filters = filters
        self.process = None

    def build(self, shape):
        assert len(shape) == 4  # batch_size, width, height, filters
        if shape[3] != self.filters:
            self.process = build_conv_1x1(self.filters)

    def call(self, x):
        if self.process is None:
            return x
        return self.process(x)


class Cell(Model):
    def __init__(self, cell_name, prev_labels, filters):
        super().__init__()
        self.input_choice = InputChoice(choose_from=prev_labels, n_chosen=1, return_mask=True, key=cell_name + '_input')
        self.op_choice = LayerChoice([
            build_sep_conv(filters, 3),
            build_sep_conv(filters, 5),
            AveragePooling2D(pool_size=3, strides=1, padding='same'),
            MaxPool2D(pool_size=3, strides=1, padding='same'),
            Sequential(),  # Identity
        ], key=cell_name + '_op')

    def call(self, prev_layers):
        chosen_input, chosen_mask = self.input_choice(prev_layers)
        cell_out = self.op_choice(chosen_input)
        return cell_out, chosen_mask


class Node(MutableScope):
    def __init__(self, node_name, prev_node_names, filters):
        super().__init__(node_name)
        self.cell_x = Cell(node_name + '_x', prev_node_names, filters)
        self.cell_y = Cell(node_name + '_y', prev_node_names, filters)

    def call(self, prev_layers):
        out_x, mask_x = self.cell_x(prev_layers)
        out_y, mask_y = self.cell_y(prev_layers)
        return out_x + out_y, mask_x | mask_y


class ENASLayer(Model):
    def __init__(self, num_nodes, filters, reduction):
        super().__init__()
        self.preproc0 = Calibration(filters)
        self.preproc1 = Calibration(filters)

        self.nodes = []
        node_labels = [InputChoice.NO_KEY, InputChoice.NO_KEY]
        name_prefix = 'reduce' if reduction else 'normal'
        for i in range(num_nodes):
            node_labels.append('{}_node_{}'.format(name_prefix, i))
            self.nodes.append(Node(node_labels[-1], node_labels[:-1], filters))

        self.conv_ops = [Conv2D(filters, kernel_size=1, padding='same', use_bias=False) for _ in range(num_nodes + 2)]
        self.bn = BatchNormalization(trainable=False)

    def call(self, prevprev, prev):
        prev_nodes_out = [self.preproc0(prevprev), self.preproc1(prev)]
        nodes_used_mask = tf.zeros(len(self.nodes) + 2, dtype=tf.bool)
        for i, node in enumerate(self.nodes):
            node_out, mask = node(prev_nodes_out)
            nodes_used_mask |= tf.pad(mask, [[0, nodes_used_mask.shape[0] - mask.shape[0]]])
            prev_nodes_out.append(node_out)

        outputs = []
        for used, out, conv in zip(nodes_used_mask.numpy(), prev_nodes_out, self.conv_ops):
            if not used:
                outputs.append(conv(out))
        out = tf.add_n(outputs)
        return prev, self.bn(out)


class MicroNetwork(Model):
    def __init__(self, num_layers=6, num_nodes=5, out_channels=20, num_classes=10, dropout_rate=0.1):
        super().__init__()
        self.num_layers = num_layers

        self.stem = Sequential([
            Conv2D(out_channels * 3, kernel_size=3, padding='same', use_bias=False),
            BatchNormalization(),
        ])

        pool_distance = num_layers // 3
        pool_layer_indices = [pool_distance, 2 * pool_distance + 1]

        self.enas_layers = []
        filters = out_channels
        for i in range(num_layers + 2):
            if i in pool_layer_indices:
                reduction = True
                filters *= 2
                self.enas_layers.append(ReductionLayer(filters))
            else:
                reduction = False
            self.enas_layers.append(ENASLayer(num_nodes, filters, reduction))

        self.gap = GlobalAveragePooling2D()
        self.dropout = Dropout(dropout_rate)
        self.dense = Dense(num_classes)

    def call(self, x):
        prev = cur = self.stem(x)
        for layer in self.enas_layers:
            prev, cur = layer(prev, cur)
        cur = tf.keras.activations.relu(cur)
        cur = self.gap(cur)
        cur = self.dropout(cur)
        logits = self.dense(cur)
        return logits
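The nodes_used_mask bookkeeping in ENASLayer.call above works by padding each node's input mask out to the full node count before OR-ing it in; outputs of nodes that were never chosen as an input to a later node (the "loose ends") are the ones that get a 1x1 convolution and are summed into the cell output. A minimal sketch of the padding step (sizes here are assumed for illustration):

    import tensorflow as tf
    mask = tf.constant([True, False, False])  # this node picked input 0 of its 3 candidates
    used = tf.zeros(5, dtype=tf.bool)         # 3 nodes + 2 preprocessed inputs
    # Pad the short mask with False up to the full length, then accumulate.
    used = used | tf.pad(mask, [[0, used.shape[0] - mask.shape[0]]])
    print(used.numpy())  # [ True False False False False]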
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT license.

from tensorflow.keras.losses import Reduction, SparseCategoricalCrossentropy
from tensorflow.keras.optimizers import SGD

from nni.nas.tensorflow import enas

import datasets
from macro import GeneralNetwork
from micro import MicroNetwork
from utils import accuracy, accuracy_metrics


# TODO: argparse
dataset_train, dataset_valid = datasets.get_dataset()
#model = GeneralNetwork()
model = MicroNetwork()
loss = SparseCategoricalCrossentropy(from_logits=True, reduction=Reduction.NONE)
optimizer = SGD(learning_rate=0.05, momentum=0.9)

trainer = enas.EnasTrainer(model,
                           loss=loss,
                           metrics=accuracy_metrics,
                           reward_function=accuracy,
                           optimizer=optimizer,
                           batch_size=64,
                           num_epochs=310,
                           dataset_train=dataset_train,
                           dataset_valid=dataset_valid)
trainer.train()
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT license.

import tensorflow as tf


def accuracy_metrics(y_true, logits):
    return {'enas_acc': accuracy(y_true, logits)}


def accuracy(y_true, logits):
    # y_true: shape=(batch_size,) or (batch_size, 1), integer labels
    # logits: shape=(batch_size, num_of_classes), float scores
    # returns a Python float
    batch_size = y_true.shape[0]
    y_true = tf.squeeze(y_true)
    y_pred = tf.math.argmax(logits, axis=1)
    y_pred = tf.cast(y_pred, y_true.dtype)
    equal = tf.cast(y_pred == y_true, tf.int32)
    return tf.math.reduce_sum(equal).numpy() / batch_size
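A worked example of the accuracy helper above (a sketch; the values are illustrative): argmax picks class 0 for the first row and class 1 for the second, so with both true labels 0, only the first prediction is correct.

    import tensorflow as tf
    logits = tf.constant([[2.0, 0.5], [0.1, 1.0]])
    labels = tf.constant([[0], [0]])
    print(accuracy(labels, logits))  # 0.5: one correct out of two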
import tensorflow as tf
from tensorflow.keras import Model
from tensorflow.keras.layers import (AveragePooling2D, BatchNormalization, Conv2D, Dense, MaxPool2D)
from tensorflow.keras.losses import Reduction, SparseCategoricalCrossentropy
from tensorflow.keras.optimizers import SGD

from nni.nas.tensorflow.mutables import LayerChoice, InputChoice
from nni.nas.tensorflow.enas import EnasTrainer

tf.get_logger().setLevel('ERROR')


class Net(Model):
    def __init__(self):
        super().__init__()
        self.conv1 = LayerChoice([
            Conv2D(6, 3, padding='same', activation='relu'),
            Conv2D(6, 5, padding='same', activation='relu'),
        ])
        self.pool = MaxPool2D(2)
        self.conv2 = LayerChoice([
            Conv2D(16, 3, padding='same', activation='relu'),
            Conv2D(16, 5, padding='same', activation='relu'),
        ])
        self.conv3 = Conv2D(16, 1)
        self.skipconnect = InputChoice(n_candidates=1)
        self.bn = BatchNormalization()
        self.gap = AveragePooling2D(2)
        self.fc1 = Dense(120, activation='relu')
        self.fc2 = Dense(84, activation='relu')
        self.fc3 = Dense(10)

    def call(self, x):
        bs = x.shape[0]

        t = self.conv1(x)
        x = self.pool(t)
        x0 = self.conv2(x)
        x1 = self.conv3(x0)

        x0 = self.skipconnect([x0])
        if x0 is not None:
            x1 += x0

        x = self.pool(self.bn(x1))
        x = self.gap(x)
        x = tf.reshape(x, [bs, -1])
        x = self.fc1(x)
        x = self.fc2(x)
        x = self.fc3(x)
        return x


def accuracy(output, target):
    bs = target.shape[0]
    predicted = tf.cast(tf.argmax(output, 1), target.dtype)
    target = tf.reshape(target, [-1])
    return sum(tf.cast(predicted == target, tf.float32)) / bs


if __name__ == '__main__':
    cifar10 = tf.keras.datasets.cifar10
    (x_train, y_train), (x_test, y_test) = cifar10.load_data()
    x_train, x_test = x_train / 255.0, x_test / 255.0

    split = int(len(x_train) * 0.9)
    dataset_train = tf.data.Dataset.from_tensor_slices((x_train[:split], y_train[:split])).batch(64)
    dataset_valid = tf.data.Dataset.from_tensor_slices((x_train[split:], y_train[split:])).batch(64)
    dataset_test = tf.data.Dataset.from_tensor_slices((x_test, y_test)).batch(64)

    net = Net()
    trainer = EnasTrainer(
        net,
        loss=SparseCategoricalCrossentropy(reduction=Reduction.SUM),
        metrics=accuracy,
        reward_function=accuracy,
        optimizer=SGD(learning_rate=0.001, momentum=0.9),
        batch_size=64,
        num_epochs=2,
        dataset_train=dataset_train,
        dataset_valid=dataset_valid,
        dataset_test=dataset_test
    )
    trainer.train()
    #trainer.export('checkpoint')
@@ -45,4 +45,6 @@ enable= unused-wildcard-import,
 ignore-patterns=test*

 # List of members which are set dynamically and missed by pylint inference
-generated-members=numpy.*,torch.*
+generated-members=numpy.*,torch.*,tensorflow.*
+
+ignored-modules=tensorflow
@@ -3,6 +3,7 @@
 'use strict';

+import * as assert from 'assert';
 import * as cpp from 'child-process-promise';
 import * as fs from 'fs';
 import * as path from 'path';
@@ -72,6 +73,11 @@ class FrameworkControllerTrainingService extends KubernetesTrainingService imple
             this.kubernetesRestServerPort = restServer.clusterRestServerPort;
         }
+        // wait for the upload of codeDir to finish
+        if (this.copyExpCodeDirPromise !== undefined) {
+            await this.copyExpCodeDirPromise;
+        }
+
         const trialJobId: string = uniqueString(5);
         // Set trial's NFS working folder
         const trialWorkingFolder: string = path.join(this.CONTAINER_MOUNT_PATH, 'nni', getExperimentId(), trialJobId);
@@ -81,8 +87,8 @@ class FrameworkControllerTrainingService extends KubernetesTrainingService imple
         this.generateContainerPort();
         await this.prepareRunScript(trialLocalTempFolder, trialJobId, trialWorkingFolder, form);
-        //upload code files
-        const trialJobOutputUrl: string = await this.uploadCodeFiles(trialJobId, trialLocalTempFolder);
+        //wait for the upload of script files to finish
+        const trialJobOutputUrl: string = await this.uploadFolder(trialLocalTempFolder, `nni/${getExperimentId()}/${trialJobId}`);
         let initStatus: TrialJobStatus = 'WAITING';
         if (!trialJobOutputUrl) {
             initStatus = 'FAILED';
@@ -151,6 +157,8 @@ class FrameworkControllerTrainingService extends KubernetesTrainingService imple
         // Validate to make sure codeDir doesn't have too many files
         try {
             await validateCodeDir(this.fcTrialConfig.codeDir);
+            //upload codeDir to storage
+            this.copyExpCodeDirPromise = this.uploadFolder(this.fcTrialConfig.codeDir, `nni/${getExperimentId()}/nni-code`);
         } catch (error) {
             this.log.error(error);
@@ -171,41 +179,31 @@ class FrameworkControllerTrainingService extends KubernetesTrainingService imple
     }

     /**
-     * upload code files to NFS or azureStorage
-     * @param trialJobId
-     * @param trialLocalTempFolder
-     * return: trialJobOutputUrl
+     * upload a local folder to NFS or azureStorage
      */
-    private async uploadCodeFiles(trialJobId: string, trialLocalTempFolder: string): Promise<string> {
+    private async uploadFolder(srcDirectory: string, destDirectory: string): Promise<string> {
         if (this.fcClusterConfig === undefined) {
             throw new Error('Kubeflow Cluster config is not initialized');
         }
-        if (this.fcTrialConfig === undefined) {
-            throw new Error('Kubeflow trial config is not initialized');
-        }
-        let trialJobOutputUrl: string = '';
-        if (this.fcClusterConfig.storageType === 'azureStorage') {
-            const azureFrameworkControllerClusterConfig: FrameworkControllerClusterConfigAzure =
-                <FrameworkControllerClusterConfigAzure>this.fcClusterConfig;
-            trialJobOutputUrl = await this.uploadFilesToAzureStorage(trialJobId, trialLocalTempFolder, this.fcTrialConfig.codeDir,
-                azureFrameworkControllerClusterConfig.uploadRetryCount);
-        } else if (this.fcClusterConfig.storageType === 'nfs') {
-            const nfsFrameworkControllerClusterConfig: FrameworkControllerClusterConfigNFS =
-                <FrameworkControllerClusterConfigNFS>this.fcClusterConfig;
-            // Create work dir for current trial in NFS directory
-            await cpp.exec(`mkdir -p ${this.trialLocalNFSTempFolder}/nni/${getExperimentId()}/${trialJobId}`);
-            // Copy code files from local dir to NFS mounted dir
-            await cpp.exec(`cp -r ${trialLocalTempFolder}/* ${this.trialLocalNFSTempFolder}/nni/${getExperimentId()}/${trialJobId}/.`);
-            // Copy codeDir to NFS mounted dir
-            await cpp.exec(`cp -r ${this.fcTrialConfig.codeDir}/* ${this.trialLocalNFSTempFolder}/nni/${getExperimentId()}/${trialJobId}/.`);
-            const nfsConfig: NFSConfig = nfsFrameworkControllerClusterConfig.nfs;
-            trialJobOutputUrl = `nfs://${nfsConfig.server}:${path.join(nfsConfig.path, 'nni', getExperimentId(), trialJobId, 'output')}`;
-        }
-        return Promise.resolve(trialJobOutputUrl);
+        assert(this.fcClusterConfig.storage === undefined
+            || this.fcClusterConfig.storage === 'azureStorage'
+            || this.fcClusterConfig.storage === 'nfs');
+        if (this.fcClusterConfig.storage === 'azureStorage') {
+            if (this.azureStorageClient === undefined) {
+                throw new Error('azureStorageClient is not initialized');
+            }
+            const fcClusterConfigAzure: FrameworkControllerClusterConfigAzure = <FrameworkControllerClusterConfigAzure>this.fcClusterConfig;
+            return await this.uploadFolderToAzureStorage(srcDirectory, destDirectory, fcClusterConfigAzure.uploadRetryCount);
+        } else if (this.fcClusterConfig.storage === 'nfs' || this.fcClusterConfig.storage === undefined) {
+            await cpp.exec(`mkdir -p ${this.trialLocalNFSTempFolder}/${destDirectory}`);
+            await cpp.exec(`cp -r ${srcDirectory}/* ${this.trialLocalNFSTempFolder}/${destDirectory}/.`);
+            const fcClusterConfigNFS: FrameworkControllerClusterConfigNFS = <FrameworkControllerClusterConfigNFS>this.fcClusterConfig;
+            const nfsConfig: NFSConfig = fcClusterConfigNFS.nfs;
+            return `nfs://${nfsConfig.server}:${destDirectory}`;
+        }
+        return '';
     }

     /**
...
@@ -74,14 +74,20 @@ class KubeflowTrainingService extends KubernetesTrainingService implements Kuber
             const restServer: KubeflowJobRestServer = component.get(KubeflowJobRestServer);
             this.kubernetesRestServerPort = restServer.clusterRestServerPort;
         }
+        // wait for the upload of codeDir to storage to finish
+        if (this.copyExpCodeDirPromise !== undefined) {
+            await this.copyExpCodeDirPromise;
+        }
+
         const trialJobId: string = uniqueString(5);
         const trialWorkingFolder: string = path.join(this.CONTAINER_MOUNT_PATH, 'nni', getExperimentId(), trialJobId);
         const kubeflowJobName: string = `nni-exp-${this.experimentId}-trial-${trialJobId}`.toLowerCase();
         const trialLocalTempFolder: string = path.join(getExperimentRootDir(), 'trials-local', trialJobId);
         //prepare the runscript
         await this.prepareRunScript(trialLocalTempFolder, trialJobId, trialWorkingFolder, form);
-        //upload files to storage
-        const trialJobOutputUrl: string = await this.uploadCodeFiles(trialJobId, trialLocalTempFolder);
+        //upload script files to storage
+        const trialJobOutputUrl: string = await this.uploadFolder(trialLocalTempFolder, `nni/${getExperimentId()}/${trialJobId}`);
         let initStatus: TrialJobStatus = 'WAITING';
         if (!trialJobOutputUrl) {
             initStatus = 'FAILED';
@@ -152,6 +158,8 @@ class KubeflowTrainingService extends KubernetesTrainingService implements Kuber
         // Validate to make sure codeDir doesn't have too many files
         try {
             await validateCodeDir(this.kubeflowTrialConfig.codeDir);
+            //upload codeDir to storage
+            this.copyExpCodeDirPromise = this.uploadFolder(this.kubeflowTrialConfig.codeDir, `nni/${getExperimentId()}/nni-code`);
         } catch (error) {
             this.log.error(error);
@@ -172,12 +180,9 @@ class KubeflowTrainingService extends KubernetesTrainingService implements Kuber
     }

     /**
-     * upload code files to NFS or azureStorage
-     * @param trialJobId
-     * @param trialLocalTempFolder
-     * return: trialJobOutputUrl
+     * upload a local folder to NFS or azureStorage
      */
-    private async uploadCodeFiles(trialJobId: string, trialLocalTempFolder: string): Promise<string> {
+    private async uploadFolder(srcDirectory: string, destDirectory: string): Promise<string> {
         if (this.kubeflowClusterConfig === undefined) {
             throw new Error('Kubeflow Cluster config is not initialized');
         }
@@ -186,8 +191,6 @@ class KubeflowTrainingService extends KubernetesTrainingService implements Kuber
             throw new Error('Kubeflow Trial config is not initialized');
         }

-        let trialJobOutputUrl: string = '';
-
         assert(this.kubeflowClusterConfig.storage === undefined
             || this.kubeflowClusterConfig.storage === 'azureStorage'
             || this.kubeflowClusterConfig.storage === 'nfs');
@@ -197,20 +200,15 @@ class KubeflowTrainingService extends KubernetesTrainingService implements Kuber
                 throw new Error('azureStorageClient is not initialized');
             }
             const azureKubeflowClusterConfig: KubeflowClusterConfigAzure = <KubeflowClusterConfigAzure>this.kubeflowClusterConfig;
-            trialJobOutputUrl = await this.uploadFilesToAzureStorage(trialJobId, trialLocalTempFolder, this.kubeflowTrialConfig.codeDir, azureKubeflowClusterConfig.uploadRetryCount);
+            return await this.uploadFolderToAzureStorage(srcDirectory, destDirectory, azureKubeflowClusterConfig.uploadRetryCount);
         } else if (this.kubeflowClusterConfig.storage === 'nfs' || this.kubeflowClusterConfig.storage === undefined) {
+            await cpp.exec(`mkdir -p ${this.trialLocalNFSTempFolder}/${destDirectory}`);
+            await cpp.exec(`cp -r ${srcDirectory}/* ${this.trialLocalNFSTempFolder}/${destDirectory}/.`);
             const nfsKubeflowClusterConfig: KubeflowClusterConfigNFS = <KubeflowClusterConfigNFS>this.kubeflowClusterConfig;
-            // Create work dir for current trial in NFS directory
-            await cpp.exec(`mkdir -p ${this.trialLocalNFSTempFolder}/nni/${getExperimentId()}/${trialJobId}`);
-            // Copy script files from local dir to NFS mounted dir
-            await cpp.exec(`cp -r ${trialLocalTempFolder}/* ${this.trialLocalNFSTempFolder}/nni/${getExperimentId()}/${trialJobId}/.`);
-            // Copy codeDir to NFS mounted dir
-            await cpp.exec(`cp -r ${this.kubeflowTrialConfig.codeDir}/* ${this.trialLocalNFSTempFolder}/nni/${getExperimentId()}/${trialJobId}/.`);
             const nfsConfig: NFSConfig = nfsKubeflowClusterConfig.nfs;
-            trialJobOutputUrl = `nfs://${nfsConfig.server}:${path.join(nfsConfig.path, 'nni', getExperimentId(), trialJobId, 'output')}`;
+            return `nfs://${nfsConfig.server}:${destDirectory}`;
         }
-
-        return Promise.resolve(trialJobOutputUrl);
+        return '';
     }

     private async prepareRunScript(trialLocalTempFolder: string, trialJobId: string, trialWorkingFolder: string,
...
@@ -39,7 +39,7 @@ export class KubernetesTrialJobDetail implements TrialJobDetail {
 export const kubernetesScriptFormat: string =
 `#!/bin/bash
 export NNI_PLATFORM={0}
-export NNI_SYS_DIR=$PWD/nni/{1}
+export NNI_SYS_DIR={1}
 export NNI_OUTPUT_DIR={2}
 export MULTI_PHASE=false
 export NNI_TRIAL_JOB_ID={3}
@@ -49,7 +49,7 @@ export NNI_TRIAL_SEQ_ID={6}
 {7}
 mkdir -p $NNI_SYS_DIR
 mkdir -p $NNI_OUTPUT_DIR
-cp -rT $NNI_CODE_DIR $NNI_SYS_DIR
+cp -r $NNI_CODE_DIR/. $NNI_SYS_DIR
 cd $NNI_SYS_DIR
 sh install_nni.sh
 python3 -m nni_trial_tool.trial_keeper --trial_command '{8}' --nnimanager_ip {9} --nnimanager_port {10} \
...
@@ -49,6 +49,8 @@ abstract class KubernetesTrainingService {
     protected kubernetesClusterConfig?: KubernetesClusterConfig;
     protected versionCheck: boolean = true;
     protected logCollection: string;
+    protected copyExpCodeDirPromise?: Promise<string>;
+    protected expContainerCodeFolder: string;

     constructor() {
         this.log = getLogger();
@@ -57,6 +59,7 @@ abstract class KubernetesTrainingService {
         this.trialLocalNFSTempFolder = path.join(getExperimentRootDir(), 'trials-nfs-tmp');
         this.experimentId = getExperimentId();
         this.CONTAINER_MOUNT_PATH = '/tmp/mount';
+        this.expContainerCodeFolder = path.join(this.CONTAINER_MOUNT_PATH, 'nni', this.experimentId, 'nni-code');
         this.genericK8sClient = new GeneralK8sClient();
         this.logCollection = 'none';
     }
@@ -272,11 +275,11 @@ abstract class KubernetesTrainingService {
         const runScript: string = String.Format(
             kubernetesScriptFormat,
             platform,
-            trialJobId,
+            trialWorkingFolder,
             path.join(trialWorkingFolder, 'output', `${roleName}_output`),
             trialJobId,
             getExperimentId(),
-            trialWorkingFolder,
+            this.expContainerCodeFolder,
             trialSequenceId,
             nvidiaScript,
             command,
@@ -329,51 +332,45 @@ abstract class KubernetesTrainingService {
         );

         return registrySecretName;
     }

-    protected async uploadFilesToAzureStorage(trialJobId: string, trialLocalTempFolder: string, codeDir: string, uploadRetryCount: number | undefined): Promise<string> {
+    /**
+     * upload a local directory to azureStorage
+     * @param srcDirectory the source directory of the local folder
+     * @param destDirectory the target directory in Azure
+     * @param uploadRetryCount the number of retries when an upload fails
+     */
+    protected async uploadFolderToAzureStorage(srcDirectory: string, destDirectory: string, uploadRetryCount: number | undefined): Promise<string> {
         if (this.azureStorageClient === undefined) {
             throw new Error('azureStorageClient is not initialized');
         }
-        let trialJobOutputUrl: string = '';
         let retryCount: number = 1;
         if (uploadRetryCount) {
             retryCount = uploadRetryCount;
         }
-        let resultUploadNNIScript: boolean = false;
-        let resultUploadCodeFile: boolean = false;
+        let uploadSuccess: boolean = false;
+        let folderUriInAzure = '';
         try {
             do {
-                //upload local files, including scripts for running the trial and configuration (e.g., hyperparameters) for the trial, to azure storage
-                if (!resultUploadNNIScript) {
-                    resultUploadNNIScript = await AzureStorageClientUtility.uploadDirectory(this.azureStorageClient,
-                        `nni/${getExperimentId()}/${trialJobId}`, this.azureStorageShare,
-                        `${trialLocalTempFolder}`);
-                }
-                //upload code files to azure storage
-                if (!resultUploadCodeFile) {
-                    resultUploadCodeFile = await AzureStorageClientUtility.uploadDirectory(this.azureStorageClient,
-                        `nni/${getExperimentId()}/${trialJobId}`, this.azureStorageShare,
-                        `${codeDir}`);
-                }
-                if (resultUploadNNIScript && resultUploadCodeFile) {
-                    trialJobOutputUrl = `https://${this.azureStorageAccountName}.file.core.windows.net/${this.azureStorageShare}` +
-                        `/${path.join('nni', getExperimentId(), trialJobId, 'output')}`;
-                    break;
-                } else {
+                uploadSuccess = await AzureStorageClientUtility.uploadDirectory(
+                    this.azureStorageClient,
+                    `${destDirectory}`,
+                    this.azureStorageShare,
+                    `${srcDirectory}`);
+                if (!uploadSuccess) {
                     //wait for 5 seconds to re-upload files
                     await delay(5000);
                     this.log.info('Upload failed, Retry: upload files to azure-storage');
+                } else {
+                    folderUriInAzure = `https://${this.azureStorageAccountName}.file.core.windows.net/${this.azureStorageShare}/${destDirectory}`;
+                    break;
                 }
             } while (retryCount-- >= 0)
         } catch (error) {
             this.log.error(error);
             //return an empty url when an error occurs
-            return Promise.resolve("");
-        }
-        if (!trialJobOutputUrl) {
-            this.log.info(`Retry-count is used up, upload files to azureStorage for trial ${trialJobId} failed!`);
+            return Promise.resolve('');
         }
-        return Promise.resolve(trialJobOutputUrl);
+        return Promise.resolve(folderUriInAzure);
     }
 }
...
@@ -361,21 +361,25 @@ class LocalTrainingService implements TrainingService {
         trialJobDetail: TrialJobDetail,
         resource: { gpuIndices: number[] },
         gpuNum: number | undefined): { key: string; value: string }[] {
+        if (this.localTrialConfig === undefined) {
+            throw new Error('localTrialConfig is not initialized!');
+        }
         const envVariables: { key: string; value: string }[] = [
             { key: 'NNI_PLATFORM', value: 'local' },
             { key: 'NNI_EXP_ID', value: this.experimentId },
             { key: 'NNI_SYS_DIR', value: trialJobDetail.workingDirectory },
             { key: 'NNI_TRIAL_JOB_ID', value: trialJobDetail.id },
             { key: 'NNI_OUTPUT_DIR', value: trialJobDetail.workingDirectory },
             { key: 'NNI_TRIAL_SEQ_ID', value: trialJobDetail.form.sequenceId.toString() },
-            { key: 'MULTI_PHASE', value: this.isMultiPhase.toString() }
+            { key: 'MULTI_PHASE', value: this.isMultiPhase.toString() },
+            { key: 'NNI_CODE_DIR', value: this.localTrialConfig.codeDir }
         ];
         if (gpuNum !== undefined) {
             envVariables.push({
                 key: 'CUDA_VISIBLE_DEVICES',
                 value: this.gpuScheduler === undefined ? '-1' : resource.gpuIndices.join(',')
             });
         }

         return envVariables;
     }
@@ -473,12 +477,16 @@ class LocalTrainingService implements TrainingService {
     private getScript(localTrialConfig: TrialConfig, workingDirectory: string): string[] {
         const script: string[] = [];
         if (process.platform === 'win32') {
+            script.push(`Copy-Item $env:NNI_CODE_DIR\\* -Destination $env:NNI_SYS_DIR -Recurse`);
+            script.push(`cd $env:NNI_SYS_DIR`);
             script.push(
                 `cmd.exe /c ${localTrialConfig.command} 2>"${path.join(workingDirectory, 'stderr')}"`,
                 `$NOW_DATE = [int64](([datetime]::UtcNow)-(get-date "1/1/1970")).TotalSeconds`,
                 `$NOW_DATE = "$NOW_DATE" + (Get-Date -Format fff).ToString()`,
                 `Write $LASTEXITCODE " " $NOW_DATE | Out-File "${path.join(workingDirectory, '.nni', 'state')}" -NoNewline -encoding utf8`);
         } else {
+            script.push(`cp -r $NNI_CODE_DIR/. $NNI_SYS_DIR`);
+            script.push(`cd $NNI_SYS_DIR`);
             script.push(`eval ${localTrialConfig.command} 2>"${path.join(workingDirectory, 'stderr')}"`);
             if (process.platform === 'darwin') {
                 // https://superuser.com/questions/599072/how-to-get-bash-execution-time-in-milliseconds-under-mac-os-x
@@ -506,7 +514,6 @@ class LocalTrainingService implements TrainingService {
         if (process.platform !== 'win32') {
             runScriptContent.push('#!/bin/bash');
         }
-        runScriptContent.push(`cd '${this.localTrialConfig.codeDir}'`);
         for (const variable of variables) {
             runScriptContent.push(setEnvironmentVariable(variable));
         }
...
@@ -31,7 +31,6 @@ fi`;
 export const PAI_K8S_TRIAL_COMMAND_FORMAT: string =
 `export NNI_PLATFORM=pai NNI_SYS_DIR={0} NNI_OUTPUT_DIR={1} NNI_TRIAL_JOB_ID={2} NNI_EXP_ID={3} NNI_TRIAL_SEQ_ID={4} MULTI_PHASE={5} \
-&& ls $NNI_SYS_DIR \
-&& cd $NNI_SYS_DIR && sh install_nni.sh \
-&& python3 -m nni_trial_tool.trial_keeper --trial_command '{6}' --nnimanager_ip '{7}' --nnimanager_port '{8}' \
---nni_manager_version '{9}' --log_collection '{10}'`;
+&& NNI_CODE_DIR={6} && cp -r $NNI_CODE_DIR/. $NNI_SYS_DIR && cd $NNI_SYS_DIR && sh install_nni.sh \
+&& python3 -m nni_trial_tool.trial_keeper --trial_command '{7}' --nnimanager_ip '{8}' --nnimanager_port '{9}' \
+--nni_manager_version '{10}' --log_collection '{11}'`;
@@ -53,6 +53,7 @@ const yaml = require('js-yaml');
 @component.Singleton
 class PAIK8STrainingService extends PAITrainingService {
     protected paiTrialConfig: NNIPAIK8STrialConfig | undefined;
+    private copyExpCodeDirPromise?: Promise<void>;
     private paiJobConfig: undefined;
     private nniVersion: string | undefined;
     constructor() {
@@ -78,7 +79,7 @@ class PAIK8STrainingService extends PAITrainingService {
                 }
                 break;
-            case TrialConfigMetadataKey.TRIAL_CONFIG:
+            case TrialConfigMetadataKey.TRIAL_CONFIG: {
                 if (this.paiClusterConfig === undefined) {
                     this.log.error('pai cluster config is not initialized');
                     break;
@@ -86,10 +87,15 @@ class PAIK8STrainingService extends PAITrainingService {
                 this.paiTrialConfig = <NNIPAIK8STrialConfig>JSON.parse(value);
                 // Validate to make sure codeDir doesn't have too many files
                 await validateCodeDir(this.paiTrialConfig.codeDir);
+                const nniManagerNFSExpCodeDir = path.join(this.paiTrialConfig.nniManagerNFSMountPath, this.experimentId, 'nni-code');
+                await execMkdir(nniManagerNFSExpCodeDir);
+                //Copy codeDir files to the NFS-mounted working folder
+                this.copyExpCodeDirPromise = execCopydir(this.paiTrialConfig.codeDir, nniManagerNFSExpCodeDir);
                 if (this.paiTrialConfig.paiConfigPath) {
                     this.paiJobConfig = yaml.safeLoad(fs.readFileSync(this.paiTrialConfig.paiConfigPath, 'utf8'));
                 }
                 break;
+            }
             case TrialConfigMetadataKey.VERSION_CHECK:
                 this.versionCheck = (value === 'true' || value === 'True');
                 this.nniVersion = this.versionCheck ? await getVersion() : '';
@@ -152,6 +158,7 @@ class PAIK8STrainingService extends PAITrainingService {
         if (this.paiTrialConfig === undefined) {
             throw new Error('trial config is not initialized');
         }
+        const containerNFSExpCodeDir = `${this.paiTrialConfig.containerNFSMountPath}/${this.experimentId}/nni-code`;
         const containerWorkingDir: string = `${this.paiTrialConfig.containerNFSMountPath}/${this.experimentId}/${trialJobDetail.id}`;
         const nniManagerIp: string = this.nniManagerIpConfig ? this.nniManagerIpConfig.nniManagerIp : getIPV4Address();
         const nniPaiTrialCommand: string = String.Format(
@@ -162,6 +169,7 @@ class PAIK8STrainingService extends PAITrainingService {
             this.experimentId,
             trialJobDetail.form.sequenceId,
             this.isMultiPhase,
+            containerNFSExpCodeDir,
             command,
             nniManagerIp,
             this.paiRestServerPort,
@@ -264,15 +272,18 @@ class PAIK8STrainingService extends PAITrainingService {
             throw new Error('paiJobRestServer is not initialized');
         }

+        // Make sure the experiment code files have been copied from local to NFS
+        if (this.copyExpCodeDirPromise !== undefined) {
+            await this.copyExpCodeDirPromise;
+        }
+
         this.paiRestServerPort = this.paiJobRestServer.clusterRestServerPort;

         // Step 1. Prepare PAI job configuration
         //create trial local working folder locally.
         await execMkdir(trialJobDetail.logPath);

-        const runScriptContent: string = CONTAINER_INSTALL_NNI_SHELL_FORMAT;
         // Write NNI installation file to local files
-        await fs.promises.writeFile(path.join(trialJobDetail.logPath, 'install_nni.sh'), runScriptContent, { encoding: 'utf8' });
+        await fs.promises.writeFile(path.join(trialJobDetail.logPath, 'install_nni.sh'), CONTAINER_INSTALL_NNI_SHELL_FORMAT, { encoding: 'utf8' });

         // Write file content ( parameter.cfg ) to local working folders
         if (trialJobDetail.form !== undefined) {
@@ -284,7 +295,7 @@ class PAIK8STrainingService extends PAITrainingService {
         //Generate Job Configuration in yaml format
         const paiJobConfig = this.generateJobConfigInYamlFormat(trialJobDetail);
         this.log.debug(paiJobConfig);
-        // Step 3. Submit PAI job via Rest call
+        // Step 2. Submit PAI job via Rest call
         // Refer https://github.com/Microsoft/pai/blob/master/docs/rest-server/API.md for more detail about PAI Rest API
         const submitJobRequest: request.Options = {
             uri: `${this.protocol}://${this.paiClusterConfig.host}/rest-server/api/v2/jobs`,
...
@@ -179,13 +179,14 @@ export enum ScheduleResultType {
 export const REMOTEMACHINE_TRIAL_COMMAND_FORMAT: string =
 `#!/bin/bash
 export NNI_PLATFORM=remote NNI_SYS_DIR={0} NNI_OUTPUT_DIR={1} NNI_TRIAL_JOB_ID={2} NNI_EXP_ID={3} \
-NNI_TRIAL_SEQ_ID={4} export MULTI_PHASE={5}
+NNI_TRIAL_SEQ_ID={4} MULTI_PHASE={5} NNI_CODE_DIR={6}
+cp -r $NNI_CODE_DIR/. $NNI_SYS_DIR
 cd $NNI_SYS_DIR
 sh install_nni.sh
-echo $$ >{6}
-python3 -m nni_trial_tool.trial_keeper --trial_command '{7}' --nnimanager_ip '{8}' --nnimanager_port '{9}' \
---nni_manager_version '{10}' --log_collection '{11}' 1>$NNI_OUTPUT_DIR/trialkeeper_stdout 2>$NNI_OUTPUT_DIR/trialkeeper_stderr
-echo $? \`date +%s%3N\` >{12}`;
+echo $$ >{7}
+python3 -m nni_trial_tool.trial_keeper --trial_command '{8}' --nnimanager_ip '{9}' --nnimanager_port '{10}' \
+--nni_manager_version '{11}' --log_collection '{12}' 1>$NNI_OUTPUT_DIR/trialkeeper_stdout 2>$NNI_OUTPUT_DIR/trialkeeper_stderr
+echo $? \`date +%s%3N\` >{13}`;

 export const HOST_JOB_SHELL_FORMAT: string =
 `#!/bin/bash
...
@@ -26,7 +26,7 @@ import { CONTAINER_INSTALL_NNI_SHELL_FORMAT } from '../common/containerJobData';
 import { GPUSummary } from '../common/gpuData';
 import { TrialConfig } from '../common/trialConfig';
 import { TrialConfigMetadataKey } from '../common/trialConfigMetadataKey';
-import { execCopydir, execMkdir, validateCodeDir, getGpuMetricsCollectorBashScriptContent } from '../common/util';
+import { execMkdir, validateCodeDir, getGpuMetricsCollectorBashScriptContent } from '../common/util';
 import { GPUScheduler } from './gpuScheduler';
 import {
     REMOTEMACHINE_TRIAL_COMMAND_FORMAT, RemoteMachineMeta,
@@ -42,11 +42,13 @@ import { ShellExecutor } from 'training_service/remote_machine/shellExecutor';
 @component.Singleton
 class RemoteMachineTrainingService implements TrainingService {
     private readonly machineExecutorManagerMap: Map<RemoteMachineMeta, ExecutorManager>; //machine executor map
+    private readonly machineCopyExpCodeDirPromiseMap: Map<RemoteMachineMeta, Promise<void>>;
     private readonly trialExecutorMap: Map<string, ShellExecutor>; //trial executor map
     private readonly trialJobsMap: Map<string, RemoteMachineTrialJobDetail>;
     private readonly MAX_TRIAL_NUMBER_PER_EXECUTOR: number = 5; // every executor has a max trial concurrency number
     private readonly expRootDir: string;
     private readonly remoteExpRootDir: string;
+    private readonly remoteExpCodeDir: string;
     private trialConfig: TrialConfig | undefined;
     private gpuScheduler?: GPUScheduler;
     private readonly jobQueue: string[];
@@ -68,9 +70,11 @@ class RemoteMachineTrainingService implements TrainingService {
         this.trialJobsMap = new Map<string, RemoteMachineTrialJobDetail>();
         this.trialExecutorMap = new Map<string, ShellExecutor>();
         this.machineExecutorManagerMap = new Map<RemoteMachineMeta, ExecutorManager>();
+        this.machineCopyExpCodeDirPromiseMap = new Map<RemoteMachineMeta, Promise<void>>();
         this.jobQueue = [];
         this.expRootDir = getExperimentRootDir();
         this.remoteExpRootDir = this.getRemoteExperimentRootDir();
+        this.remoteExpCodeDir = unixPathJoin(this.remoteExpRootDir, 'nni-code');
         this.timer = timer;
         this.log = getLogger();
         this.trialSequenceId = -1;
@@ -320,9 +324,20 @@ class RemoteMachineTrainingService implements TrainingService {
             throw new Error(`codeDir ${remoteMachineTrailConfig.codeDir} is not a directory`);
         }

-        // Validate to make sure codeDir doesn't have too many files
         try {
+            // Validate to make sure codeDir doesn't have too many files
             await validateCodeDir(remoteMachineTrailConfig.codeDir);
+            // Copy codeDir to remote machines
+            for (const [rmMeta, executorManager] of this.machineExecutorManagerMap.entries()) {
+                const executor: ShellExecutor = await executorManager.getAvailableExecutor();
+                if (executor !== undefined) {
+                    this.machineCopyExpCodeDirPromiseMap.set(
+                        rmMeta,
+                        executor.copyDirectoryToRemote(remoteMachineTrailConfig.codeDir, this.remoteExpCodeDir, this.remoteOS)
+                    );
+                }
+            }
         } catch (error) {
             this.log.error(error);
@@ -480,6 +495,10 @@ class RemoteMachineTrainingService implements TrainingService {
         const trialWorkingFolder: string = unixPathJoin(this.remoteExpRootDir, 'trials', trialJobId);
         trialJobDetail.rmMeta = rmScheduleInfo.rmMeta;

+        const copyExpCodeDirPromise = this.machineCopyExpCodeDirPromiseMap.get(trialJobDetail.rmMeta);
+        if (copyExpCodeDirPromise !== undefined) {
+            await copyExpCodeDirPromise;
+        }
+
         await this.allocateExecutorForTrial(trialJobDetail);
         await this.launchTrialOnScheduledMachine(
@@ -554,6 +573,7 @@ class RemoteMachineTrainingService implements TrainingService {
             getExperimentId(),
             trialJobDetail.form.sequenceId.toString(),
             this.isMultiPhase,
+            this.remoteExpCodeDir,
             unixPathJoin(trialWorkingFolder, '.nni', 'jobpid'),
             command,
             nniManagerIp,
@@ -565,12 +585,8 @@ class RemoteMachineTrainingService implements TrainingService {

         //create tmp trial working folder locally.
         await execMkdir(path.join(trialLocalTempFolder, '.nni'));

-        //create tmp trial working folder locally.
-        await execCopydir(this.trialConfig.codeDir, trialLocalTempFolder);
-        const installScriptContent: string = CONTAINER_INSTALL_NNI_SHELL_FORMAT;
-        // Write NNI installation file to local tmp files
-        await fs.promises.writeFile(path.join(trialLocalTempFolder, 'install_nni.sh'), installScriptContent, { encoding: 'utf8' });
+        // Write install_nni.sh
+        await fs.promises.writeFile(path.join(trialLocalTempFolder, 'install_nni.sh'), CONTAINER_INSTALL_NNI_SHELL_FORMAT, { encoding: 'utf8' });

         // Write file content ( run.sh and parameter.cfg ) to local tmp files
         await fs.promises.writeFile(path.join(trialLocalTempFolder, 'run.sh'), runScriptTrialContent, { encoding: 'utf8' });
         await this.writeParameterFile(trialJobId, form.hyperParameters);
...
@@ -183,13 +183,14 @@ class ShellExecutor {
     /**
      * Copy files and directories in a local directory recursively to a remote directory
      * @param localDirectory local directory
      * @param remoteDirectory remote directory
-     * @param sshClient SSH client
+     * @param remoteOS the OS of the remote machine
      */
     public async copyDirectoryToRemote(localDirectory: string, remoteDirectory: string, remoteOS: string): Promise<void> {
         const tmpSuffix: string = uniqueString(5);
         const localTarPath: string = path.join(os.tmpdir(), `nni_tmp_local_${tmpSuffix}.tar.gz`);
         const remoteTarPath: string = unixPathJoin(getRemoteTmpDir(remoteOS), `nni_tmp_remote_${tmpSuffix}.tar.gz`);
+        // Create the remote directory
+        await this.createFolder(remoteDirectory);
         // Compress files in the local directory to the experiment root directory
         await tarAdd(localTarPath, localDirectory);
         // Copy the compressed file to remoteDirectory and delete it
...