"web/scripts/vscode:/vscode.git/clone" did not exist on "235727fed79880ac2053a1db1b0d13c0f75714e8"
Unverified commit 035d58bc authored by SparkSnail, committed by GitHub

Merge pull request #121 from Microsoft/master

merge master
parents b633c265 8e732f2c
**Tutorial: Run an experiment on multiple machines**
===
NNI supports running an experiment on multiple machines over an SSH channel; this is called `remote` mode. NNI assumes that you have access to those machines and have already set up the environment for running deep learning training code.

For example, suppose you have three machines and you can log in to each of them with the account `bob` (note: the account does not need to be the same on different machines):
| IP | Username| Password |
| -------- |---------|-------|
| 10.1.1.1 | bob | bob123 |
| 10.1.1.2 | bob | bob123 |
| 10.1.1.3 | bob | bob123 |
## Setup NNI environment
Install NNI on each of your machines following the install guide [here](GetStarted.md).
For remote machines that are used only to run trials but not to run nnictl, you can install just the Python SDK:
* __Install python SDK through pip__

      python3 -m pip install --user --upgrade nni-sdk
## Run an experiment
Install NNI on another machine that has network access to the three machines above, or simply use any one of those machines, to run the nnictl command-line tool.

We use `examples/trials/mnist-annotation` as an example here. Run `cat ~/nni/examples/trials/mnist-annotation/config_remote.yml` to see the detailed configuration file:
```
authorName: default
experimentName: example_mnist
trialConcurrency: 1
maxExecDuration: 1h
maxTrialNum: 10
#choice: local, remote, pai
trainingServicePlatform: remote
#choice: true, false
useAnnotation: true
tuner:
  #choice: TPE, Random, Anneal, Evolution, BatchTuner
  #SMAC (SMAC should be installed through nnictl)
  builtinTunerName: TPE
  classArgs:
    #choice: maximize, minimize
    optimize_mode: maximize
trial:
  command: python3 mnist.py
  codeDir: .
  gpuNum: 0
#machineList can be empty if the platform is local
machineList:
  - ip: 10.1.1.1
    username: bob
    passwd: bob123
    #port can be skipped if using default ssh port 22
    #port: 22
  - ip: 10.1.1.2
    username: bob
    passwd: bob123
  - ip: 10.1.1.3
    username: bob
    passwd: bob123
```
Simply fill in the `machineList` section, then run:
```
nnictl create --config ~/nni/examples/trials/mnist-annotation/config_remote.yml
```
to start the experiment.
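To make the expected shape of `machineList` concrete, here is a small, hypothetical validation helper (not part of NNI; the function name and behavior are illustrative only) that mirrors the fields used above, including the optional `port` defaulting to 22:

```python
# Hypothetical sketch: validate machineList entries the way the config above uses them.
def validate_machine(entry):
    """Check that a machineList entry has the required fields.

    'port' is optional and defaults to 22, matching the commented line
    in the configuration file above.
    """
    required = ('ip', 'username', 'passwd')
    missing = [key for key in required if key not in entry]
    if missing:
        raise ValueError('machineList entry missing fields: {}'.format(missing))
    entry.setdefault('port', 22)
    return entry

machines = [
    {'ip': '10.1.1.1', 'username': 'bob', 'passwd': 'bob123'},
    {'ip': '10.1.1.2', 'username': 'bob', 'passwd': 'bob123', 'port': 2222},
]
machines = [validate_machine(m) for m in machines]
```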
# Tutorial - Try different Tuners and Assessors
NNI provides an easy-to-adopt approach to set up parameter tuning algorithms as well as early stopping policies; we call them **Tuners** and **Assessors**.

A **Tuner** specifies the algorithm you use to generate hyperparameter sets for each trial. NNI supports two approaches to set the tuner:
1. Directly use a tuner provided by the NNI SDK.

   Required fields: `builtinTunerName` and `classArgs`.
2. Customize your own tuner file.

   Required fields: `codeDirectory`, `classFileName`, `className`, and `classArgs`.
### **Learn More about tuners**
* For the detailed definition and usage of the required fields, please refer to [Config an experiment](ExperimentConfig.md)
* [Tuners in the latest NNI release](HowToChooseTuner.md)
* [How to implement your own tuner](howto_2_CustomizedTuner.md)
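To give a feel for what a customized tuner file contains, here is a minimal, standalone sketch of the tuner interface. It is not NNI code: a real custom tuner subclasses `nni.tuner.Tuner`, but the core methods have a similar shape, shown here with a simple random search:

```python
import random

# Standalone sketch of the tuner interface (illustrative only; a real
# custom tuner subclasses nni.tuner.Tuner and is loaded via the config
# fields codeDirectory / classFileName / className / classArgs).
class RandomSearchTuner:
    def __init__(self, search_space):
        # e.g. {'lr': [0.1, 0.01, 0.001], 'batch_size': [16, 32]}
        self.search_space = search_space
        self.results = {}

    def generate_parameters(self, parameter_id):
        # Pick one value per hyperparameter uniformly at random.
        return {name: random.choice(values)
                for name, values in self.search_space.items()}

    def receive_trial_result(self, parameter_id, parameters, value):
        # Record the final metric so the best trial can be reported later.
        self.results[parameter_id] = (parameters, value)

tuner = RandomSearchTuner({'lr': [0.1, 0.01], 'batch_size': [16, 32]})
params = tuner.generate_parameters(0)
```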
An **Assessor** specifies the algorithm you use to apply an early stopping policy. In NNI, there are two approaches to set the assessor:
1. Directly use an assessor provided by the NNI SDK.

   Required fields: `builtinAssessorName` and `classArgs`.
2. Customize your own assessor file.

   Required fields: `codeDirectory`, `classFileName`, `className`, and `classArgs`.
### **Learn More about assessors**
* For the detailed definition and usage of the required fields, please refer to [Config an experiment](ExperimentConfig.md)
* Find detailed instructions on how to [enable an assessor](EnableAssessor.md)
* [How to implement your own assessor](../examples/assessors/README.md)
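Similarly, a customized assessor essentially implements one decision: given a trial's intermediate results, keep it or stop it. A standalone sketch (not NNI code; a real assessor subclasses `nni.assessor.Assessor` and returns `AssessResult` values) using median stopping:

```python
# Standalone sketch of the assessor interface (illustrative only).
# Returns the strings 'Good'/'Bad' where a real assessor would return
# AssessResult.Good / AssessResult.Bad.
class MedianStopAssessor:
    def __init__(self):
        self.completed_curves = []   # intermediate-result histories of finished trials

    def trial_end(self, history):
        self.completed_curves.append(history)

    def assess_trial(self, history):
        step = len(history)
        # Compare against finished trials' metric at the same step.
        peers = [curve[step - 1] for curve in self.completed_curves
                 if len(curve) >= step]
        if not peers:
            return 'Good'  # nothing to compare against yet
        peers.sort()
        median = peers[len(peers) // 2]
        # Stop the trial if its current metric is below the peer median.
        return 'Bad' if history[-1] < median else 'Good'

assessor = MedianStopAssessor()
assessor.trial_end([0.5, 0.7, 0.9])
assessor.trial_end([0.4, 0.6, 0.8])
```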
## **Learn More**
* [How to run an experiment on local (with multiple GPUs)?](tutorial_1_CR_exp_local_api.md)
* [How to run an experiment on multiple machines?](tutorial_2_RemoteMachineMode.md)
* [How to run an experiment on OpenPAI?](PAIMode.md)
......@@ -88,7 +88,7 @@ nnictl create --config ~/nni/examples/trials/ga_squad/config.yml
Due to the upload size limit, we only upload the source code and complete the data download and training on OpenPAI. This experiment requires a large amount of memory (`memoryMB >= 32G`), and the training may last for several hours.
### Update configuration
Modify `nni/examples/trials/ga_squad/config_pai.yml`; here is the default configuration:
```
authorName: default
......@@ -114,18 +114,18 @@ trial:
  gpuNum: 0
  cpuNum: 1
  memoryMB: 32869
  #The docker image to run NNI job on OpenPAI
  image: msranni/nni:latest
  #The hdfs directory to store data on OpenPAI, format 'hdfs://host:port/directory'
  dataDir: hdfs://10.10.10.10:9000/username/nni
  #The hdfs directory to store output data generated by NNI, format 'hdfs://host:port/directory'
  outputDir: hdfs://10.10.10.10:9000/username/nni
paiConfig:
  #The username to login OpenPAI
  userName: username
  #The password to login OpenPAI
  passWord: password
  #The host of restful server of OpenPAI
  host: 10.10.10.10
```
......
......@@ -2,7 +2,7 @@ authorName: default
experimentName: example_mnist
trialConcurrency: 1
maxExecDuration: 1h
maxTrialNum: 50
#choice: local, remote
trainingServicePlatform: local
searchSpacePath: search_space.json
......@@ -17,10 +17,12 @@ tuner:
    optimize_mode: maximize
assessor:
  #choice: Medianstop, Curvefitting
  builtinAssessorName: Curvefitting
  classArgs:
    #choice: maximize, minimize
    optimize_mode: maximize
    epoch_num: 20
    threshold: 0.9
trial:
  command: python3 mnist.py
  codeDir: .
......
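The `Curvefitting` assessor configured above extrapolates a trial's learning curve to `epoch_num` epochs and stops the trial when the predicted final metric falls below `threshold` times the best result so far. Here is a rough sketch of that decision, with a naive linear extrapolation standing in for the real curve-fitting models (function names and the extrapolation are illustrative assumptions, not NNI's implementation):

```python
def predict_final(history, epoch_num):
    # Crude linear extrapolation from the last two points; the real
    # Curvefitting assessor fits a combination of curve models instead.
    if len(history) < 2:
        return history[-1]
    slope = history[-1] - history[-2]
    remaining = epoch_num - len(history)
    return history[-1] + slope * remaining

def should_stop(history, best_final, epoch_num=20, threshold=0.9):
    # Stop when the predicted final metric is below threshold * best-so-far.
    return predict_final(history, epoch_num) < threshold * best_final

# A flat, low learning curve is predicted to stay low and gets stopped early,
# while a rising curve is allowed to continue.
```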
"""A deep MNIST classifier using convolutional layers."""
import argparse
import logging
import math
import tempfile
import tensorflow as tf
from tensorflow.examples.tutorials.mnist import input_data
FLAGS = None
logger = logging.getLogger('mnist_AutoML')
class MnistNetwork(object):
    '''
    MnistNetwork is for initializing and building basic network for mnist.
    '''
    def __init__(self,
                 channel_1_num,
                 channel_2_num,
                 conv_size,
                 hidden_size,
                 pool_size,
                 learning_rate,
                 x_dim=784,
                 y_dim=10):
        self.channel_1_num = channel_1_num
        self.channel_2_num = channel_2_num
        self.conv_size = conv_size
        self.hidden_size = hidden_size
        self.pool_size = pool_size
        self.learning_rate = learning_rate
        self.x_dim = x_dim
        self.y_dim = y_dim

        self.images = tf.placeholder(
            tf.float32, [None, self.x_dim], name='input_x')
        self.labels = tf.placeholder(
            tf.float32, [None, self.y_dim], name='input_y')
        self.keep_prob = tf.placeholder(tf.float32, name='keep_prob')

        self.train_step = None
        self.accuracy = None

    def build_network(self):
        '''
        Building network for mnist
        '''
        # Reshape to use within a convolutional neural net.
        # Last dimension is for "features" - there is only one here, since images are
        # grayscale -- it would be 3 for an RGB image, 4 for RGBA, etc.
        with tf.name_scope('reshape'):
            try:
                input_dim = int(math.sqrt(self.x_dim))
            except:
                print(
                    'input dim cannot be sqrt and reshape. input dim: ' + str(self.x_dim))
                logger.debug(
                    'input dim cannot be sqrt and reshape. input dim: %s', str(self.x_dim))
                raise
            x_image = tf.reshape(self.images, [-1, input_dim, input_dim, 1])

        # First convolutional layer - maps one grayscale image to 32 feature maps.
        with tf.name_scope('conv1'):
            w_conv1 = weight_variable(
                [self.conv_size, self.conv_size, 1, self.channel_1_num])
            b_conv1 = bias_variable([self.channel_1_num])
            h_conv1 = tf.nn.relu(conv2d(x_image, w_conv1) + b_conv1)

        # Pooling layer - downsamples by 2X.
        with tf.name_scope('pool1'):
            h_pool1 = max_pool(h_conv1, self.pool_size)

        # Second convolutional layer -- maps 32 feature maps to 64.
        with tf.name_scope('conv2'):
            w_conv2 = weight_variable([self.conv_size, self.conv_size,
                                       self.channel_1_num, self.channel_2_num])
            b_conv2 = bias_variable([self.channel_2_num])
            h_conv2 = tf.nn.relu(conv2d(h_pool1, w_conv2) + b_conv2)

        # Second pooling layer.
        with tf.name_scope('pool2'):
            h_pool2 = max_pool(h_conv2, self.pool_size)

        # Fully connected layer 1 -- after 2 rounds of downsampling, our 28x28 image
        # is down to 7x7x64 feature maps -- maps this to 1024 features.
        last_dim = int(input_dim / (self.pool_size * self.pool_size))
        with tf.name_scope('fc1'):
            w_fc1 = weight_variable(
                [last_dim * last_dim * self.channel_2_num, self.hidden_size])
            b_fc1 = bias_variable([self.hidden_size])
            h_pool2_flat = tf.reshape(
                h_pool2, [-1, last_dim * last_dim * self.channel_2_num])
            h_fc1 = tf.nn.relu(tf.matmul(h_pool2_flat, w_fc1) + b_fc1)

        # Dropout - controls the complexity of the model, prevents co-adaptation of features.
        with tf.name_scope('dropout'):
            h_fc1_drop = tf.nn.dropout(h_fc1, self.keep_prob)

        # Map the 1024 features to 10 classes, one for each digit
        with tf.name_scope('fc2'):
            w_fc2 = weight_variable([self.hidden_size, self.y_dim])
            b_fc2 = bias_variable([self.y_dim])
            y_conv = tf.matmul(h_fc1_drop, w_fc2) + b_fc2

        with tf.name_scope('loss'):
            cross_entropy = tf.reduce_mean(
                tf.nn.softmax_cross_entropy_with_logits(labels=self.labels, logits=y_conv))
        with tf.name_scope('adam_optimizer'):
            self.train_step = tf.train.AdamOptimizer(
                self.learning_rate).minimize(cross_entropy)
        with tf.name_scope('accuracy'):
            correct_prediction = tf.equal(
                tf.argmax(y_conv, 1), tf.argmax(self.labels, 1))
            self.accuracy = tf.reduce_mean(
                tf.cast(correct_prediction, tf.float32))


def conv2d(x_input, w_matrix):
    """conv2d returns a 2d convolution layer with full stride."""
    return tf.nn.conv2d(x_input, w_matrix, strides=[1, 1, 1, 1], padding='SAME')


def max_pool(x_input, pool_size):
    """max_pool downsamples a feature map by 2X."""
    return tf.nn.max_pool(x_input, ksize=[1, pool_size, pool_size, 1],
                          strides=[1, pool_size, pool_size, 1], padding='SAME')


def weight_variable(shape):
    """weight_variable generates a weight variable of a given shape."""
    initial = tf.truncated_normal(shape, stddev=0.1)
    return tf.Variable(initial)


def bias_variable(shape):
    """bias_variable generates a bias variable of a given shape."""
    initial = tf.constant(0.1, shape=shape)
    return tf.Variable(initial)


def main(params):
    '''
    Main function, build mnist network, run and send result to NNI.
    '''
    # Import data
    mnist = input_data.read_data_sets(params['data_dir'], one_hot=True)
    print('Mnist download data done.')
    logger.debug('Mnist download data done.')

    # Create the model
    # Build the graph for the deep net
    mnist_network = MnistNetwork(channel_1_num=params['channel_1_num'],
                                 channel_2_num=params['channel_2_num'],
                                 conv_size=params['conv_size'],
                                 hidden_size=params['hidden_size'],
                                 pool_size=params['pool_size'],
                                 learning_rate=params['learning_rate'])
    mnist_network.build_network()
    logger.debug('Mnist build network done.')

    # Write log
    graph_location = tempfile.mkdtemp()
    logger.debug('Saving graph to: %s', graph_location)
    train_writer = tf.summary.FileWriter(graph_location)
    train_writer.add_graph(tf.get_default_graph())

    test_acc = 0.0
    with tf.Session() as sess:
        sess.run(tf.global_variables_initializer())
        for i in range(params['batch_num']):
            batch = mnist.train.next_batch(params['batch_size'])
            mnist_network.train_step.run(feed_dict={mnist_network.images: batch[0],
                                                    mnist_network.labels: batch[1],
                                                    mnist_network.keep_prob: 1 - params['dropout_rate']})
            if i % 100 == 0:
                test_acc = mnist_network.accuracy.eval(
                    feed_dict={mnist_network.images: mnist.test.images,
                               mnist_network.labels: mnist.test.labels,
                               mnist_network.keep_prob: 1.0})
                logger.debug('test accuracy %g', test_acc)
                logger.debug('Pipe send intermediate result done.')

        test_acc = mnist_network.accuracy.eval(
            feed_dict={mnist_network.images: mnist.test.images,
                       mnist_network.labels: mnist.test.labels,
                       mnist_network.keep_prob: 1.0})
        logger.debug('Final result is %g', test_acc)
        logger.debug('Send final result done.')


def get_params():
    ''' Get parameters from command line '''
    parser = argparse.ArgumentParser()
    parser.add_argument("--data_dir", type=str, default='/tmp/tensorflow/mnist/input_data', help="data directory")
    parser.add_argument("--dropout_rate", type=float, default=0.5, help="dropout rate")
    parser.add_argument("--channel_1_num", type=int, default=32)
    parser.add_argument("--channel_2_num", type=int, default=64)
    parser.add_argument("--conv_size", type=int, default=5)
    parser.add_argument("--pool_size", type=int, default=2)
    parser.add_argument("--hidden_size", type=int, default=1024)
    parser.add_argument("--learning_rate", type=float, default=1e-4)
    parser.add_argument("--batch_num", type=int, default=2000)
    parser.add_argument("--batch_size", type=int, default=32)

    args, _ = parser.parse_known_args()
    return args


if __name__ == '__main__':
    try:
        params = vars(get_params())
        main(params)
    except Exception as exception:
        logger.exception(exception)
        raise
......@@ -18,7 +18,7 @@ pip install -r requirements.txt
### 3. Update configuration
Modify `examples/trials/network_morphism/cifar10/config.yml` to fit your own task; note that `searchSpacePath` is not required in our configuration. Here is the default configuration:
```yaml
authorName: default
......@@ -79,16 +79,16 @@ net = build_graph_from_json(RCV_CONFIG)
# training procedure
# ....
# report the final accuracy to NNI
nni.report_final_result(best_acc)
```
### 5. Submit this job
```bash
# You can use the NNI command-line tool "nnictl" to create a job and submit it to NNI
# finally you successfully commit a Network Morphism job to NNI
nnictl create --config config.yml
```
## Trial Examples
......@@ -99,10 +99,10 @@ The trial has some examples which can guide you which located in `examples/trial
`Fashion-MNIST` is a dataset of [Zalando](https://jobs.zalando.com/tech/)'s article images, consisting of a training set of 60,000 examples and a test set of 10,000 examples. Each example is a 28x28 grayscale image, associated with a label from 10 classes. It is a modern image classification dataset widely used to replace MNIST as a baseline, because MNIST is too easy and overused.
There are two examples, [FashionMNIST-keras.py](./FashionMNIST/FashionMNIST_keras.py) and [FashionMNIST-pytorch.py](./FashionMNIST/FashionMNIST_pytorch.py). Note that you should change `input_width` to 28 and `input_channel` to 1 in `config.yml` for this dataset.
### Cifar10
The `CIFAR-10` dataset ([Canadian Institute For Advanced Research](https://www.cifar.ca/)) is a collection of images commonly used to train machine learning and computer vision algorithms. It is one of the most widely used datasets for machine learning research, and contains 60,000 32x32 color images in 10 different classes.
There are two examples, [cifar10-keras.py](./cifar10/cifar10_keras.py) and [cifar10-pytorch.py](./cifar10/cifar10_pytorch.py). The value of `input_width` is 32 and the value of `input_channel` is 3 in `config.yml` for this dataset.
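The per-dataset input settings mentioned in this section can be summarized as follows; the dictionary and helper function are purely illustrative (the values come from the text above):

```python
# Input settings for the network-morphism examples, per dataset
# (illustrative summary; not part of the example code).
DATASET_INPUT = {
    'fashion_mnist': {'input_width': 28, 'input_channel': 1},  # grayscale
    'cifar10': {'input_width': 32, 'input_channel': 3},        # RGB
}

def input_shape(dataset):
    # Square images: (height, width, channels).
    cfg = DATASET_INPUT[dataset]
    return (cfg['input_width'], cfg['input_width'], cfg['input_channel'])
```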
**Run ENAS in NNI**
===
Now we have an ENAS example, [enas-nni](https://github.com/countif/enas_nni), that runs in NNI, contributed by our community.
Thanks to our lovely contributors, and we welcome more and more people to join us!
......@@ -138,7 +138,7 @@ class Logger {
    private log(level: string, param: any[]): void {
        const buffer: WritableStreamBuffer = new WritableStreamBuffer();
        buffer.write(`[${(new Date()).toISOString()}] ${level} `);
        buffer.write(format(param));
        buffer.write('\n');
        buffer.end();
        this.bufferSerialEmitter.feed(buffer.getContents());
......
......@@ -49,6 +49,7 @@ class NNIDataStore implements DataStore {
        if (isNewExperiment()) {
            mkDirP(databaseDir).then(() => {
                this.db.init(true, databaseDir).then(() => {
                    this.log.info('Datastore initialization done');
                    this.initTask.resolve();
                }).catch((err: Error) => {
                    this.initTask.reject(err);
......@@ -58,6 +59,7 @@ class NNIDataStore implements DataStore {
            });
        } else {
            this.db.init(false, databaseDir).then(() => {
                this.log.info('Datastore initialization done');
                this.initTask.resolve();
            }).catch((err: Error) => {
                this.initTask.reject(err);
......
......@@ -3,7 +3,7 @@
"version": "999.0.0-developing",
"main": "index.js",
"scripts": {
"postbuild": "cp -rf scripts ./dist/ && cp -rf config ./dist/",
"postbuild": "cp -rf config ./dist/",
"build": "tsc",
"test": "nyc mocha -r ts-node/register -t 15000 --recursive **/*.test.ts --exclude node_modules/**/**/*.test.ts --exclude core/test/nnimanager.test.ts --colors",
"start": "node dist/main.js",
......
......@@ -115,7 +115,6 @@ class NNIRestHandler {
        const ds: DataStore = component.get<DataStore>(DataStore);
        ds.init().then(() => {
            res.send(this.nniManager.getStatus());
        }).catch(async (err: Error) => {
            this.handle_error(err, res);
            this.log.error(err.message);
......
# ============================================================================================================================== #
# Copyright (c) Microsoft Corporation
# All rights reserved.
#
# MIT License
#
# Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated
# documentation files (the "Software"), to deal in the Software without restriction, including without limitation
# the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and
# to permit persons to whom the Software is furnished to do so, subject to the following conditions:
# The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED *AS IS*, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING
# BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
# DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
# ============================================================================================================================== #
import argparse
import errno
import json
import os
import re
METRICS_FILENAME = '.nni/metrics'
OFFSET_FILENAME = '.nni/metrics_offset'
JOB_CODE_FILENAME = '.nni/code'
JOB_PID_FILENAME = '.nni/jobpid'
JOB_CODE_PATTERN = re.compile(r'^(\d+)\s+(\d+)$')
LEN_FIELD_SIZE = 6
MAGIC = 'ME'
class TrialMetricsReader():
    '''
    Read metrics data from a trial job
    '''
    def __init__(self, trial_job_dir):
        self.trial_job_dir = trial_job_dir
        self.offset_filename = os.path.join(trial_job_dir, OFFSET_FILENAME)
        self.metrics_filename = os.path.join(trial_job_dir, METRICS_FILENAME)
        self.jobcode_filename = os.path.join(trial_job_dir, JOB_CODE_FILENAME)
        self.jobpid_filename = os.path.join(trial_job_dir, JOB_PID_FILENAME)

    def _metrics_file_is_empty(self):
        if not os.path.isfile(self.metrics_filename):
            return True
        statinfo = os.stat(self.metrics_filename)
        return statinfo.st_size == 0

    def _get_offset(self):
        offset = 0
        if os.path.isfile(self.offset_filename):
            with open(self.offset_filename, 'r') as f:
                offset = int(f.readline())
        return offset

    def _write_offset(self, offset):
        statinfo = os.stat(self.metrics_filename)
        if offset < 0 or offset > statinfo.st_size:
            raise ValueError('offset value is invalid: {}'.format(offset))
        with open(self.offset_filename, 'w') as f:
            f.write(str(offset) + '\n')

    def _read_all_available_records(self, offset):
        new_offset = offset
        metrics = []
        with open(self.metrics_filename, 'r') as f:
            f.seek(offset)
            while True:
                magic_string = f.read(len(MAGIC))
                # empty data means EOF
                if not magic_string:
                    break
                strdatalen = f.read(LEN_FIELD_SIZE)
                # empty data means EOF
                if not strdatalen:
                    raise ValueError("metric file {} format error after offset: {}.".format(self.metrics_filename, new_offset))
                datalen = int(strdatalen)
                data = f.read(datalen)
                if datalen > 0 and len(data) == datalen:
                    new_offset = f.tell()
                    metrics.append(data)
                else:
                    raise ValueError("metric file {} format error after offset: {}.".format(self.metrics_filename, new_offset))
        self._write_offset(new_offset)
        return metrics

    def _pid_exists(self, pid):
        if pid < 0:
            return False
        if pid == 0:
            # According to "man 2 kill" PID 0 refers to every process
            # in the process group of the calling process.
            # On certain systems 0 is a valid PID but we have no way
            # to know that in a portable fashion.
            raise ValueError('invalid PID 0')
        try:
            os.kill(pid, 0)
        except OSError as err:
            if err.errno == errno.ESRCH:
                # ESRCH == No such process
                return False
            elif err.errno == errno.EPERM:
                # EPERM clearly means there's a process to deny access to
                return True
            else:
                # According to "man 2 kill" possible error values are
                # (EINVAL, EPERM, ESRCH)
                raise
        else:
            return True

    def read_trial_metrics(self):
        '''
        Read available metrics data for a trial
        '''
        if self._metrics_file_is_empty():
            return []
        offset = self._get_offset()
        return self._read_all_available_records(offset)

    def read_trial_status(self):
        if os.path.isfile(self.jobpid_filename):
            with open(self.jobpid_filename, 'r') as f:
                jobpid = int(f.readline())
                if self._pid_exists(jobpid):
                    return 'RUNNING', -1
                else:
                    return self._read_job_return_code()
        else:
            return 'UNKNOWN', -1

    def _read_job_return_code(self):
        if os.path.isfile(self.jobcode_filename):
            with open(self.jobcode_filename, 'r') as f:
                job_return_code = f.readline()
                match = JOB_CODE_PATTERN.match(job_return_code)
                if match:
                    return_code = int(match.group(1))
                    timestamp = int(match.group(2))
                    status = ''
                    if return_code == 0:
                        status = 'SUCCEEDED'
                    elif return_code > 128:
                        status = 'USER_CANCELED'
                    else:
                        status = 'FAILED'
                    return status, timestamp
                else:
                    raise ValueError('Job code file format incorrect')
        else:
            raise ValueError("job return code file doesn't exist: {}".format(self.jobcode_filename))


def read_experiment_metrics(args):
    '''
    Read metrics data for specified trial jobs
    '''
    trial_job_ids = args.trial_job_ids.strip().split(',')
    trial_job_ids = [id.strip() for id in trial_job_ids]
    results = []
    for trial_job_id in trial_job_ids:
        result = {}
        try:
            trial_job_dir = os.path.join(args.experiment_dir, 'trials', trial_job_id)
            reader = TrialMetricsReader(trial_job_dir)
            result['jobId'] = trial_job_id
            result['metrics'] = reader.read_trial_metrics()
            result['jobStatus'], result['endTimestamp'] = reader.read_trial_status()
            results.append(result)
        except Exception:
            #TODO error logging to file
            pass
    print(json.dumps(results))


if __name__ == '__main__':
    PARSER = argparse.ArgumentParser()
    PARSER.add_argument("--experiment_dir", type=str, help="Root directory of experiment", required=True)
    PARSER.add_argument("--trial_job_ids", type=str, help="Trial job ids split with ','", required=True)

    ARGS, UNKNOWN = PARSER.parse_known_args()
    read_experiment_metrics(ARGS)
......@@ -163,4 +163,20 @@ export class GPUScheduler {
}
};
}
    /**
     * remove the job's gpu reservation
     * @param trialJobId
     * @param rmMeta
     */
    public removeGpuReservation(trialJobId: string, rmMeta?: RemoteMachineMeta): void {
        // If the remote machine has no GPU, gpuReservation is not initialized, so check if it's undefined
        if (rmMeta !== undefined && rmMeta.gpuReservation !== undefined) {
            rmMeta.gpuReservation.forEach((reserveTrialJobId: string, gpuIndex: number) => {
                if (reserveTrialJobId === trialJobId) {
                    rmMeta.gpuReservation.delete(gpuIndex);
                }
            });
        }
    }
}
/**
* Copyright (c) Microsoft Corporation
* All rights reserved.
*
* MIT License
*
* Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated
* documentation files (the "Software"), to deal in the Software without restriction, including without limitation
* the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and
* to permit persons to whom the Software is furnished to do so, subject to the following conditions:
* The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED *AS IS*, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING
* BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
* NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
* DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/
'use strict';
import * as assert from 'assert';
import { EventEmitter } from 'events';
import * as path from 'path';
import { Client } from 'ssh2';
import { getLogger, Logger } from '../../common/log';
import { TrialJobStatus, TrialJobDetail } from '../../common/trainingService';
import { JobMetrics } from '../common/jobMetrics';
import { RemoteCommandResult, RemoteMachineMeta, RemoteMachineTrialJobDetail } from './remoteMachineData';
import { SSHClientUtility } from './sshClientUtility';
export class MetricsCollector {
    private machineSSHClientMap: Map<RemoteMachineMeta, Client>;
    private trialJobsMap: Map<string, any>;
    private expRootDir: string;
    private metricsEmitter: EventEmitter;
    private log: Logger = getLogger();

    constructor(clientMap: Map<RemoteMachineMeta, Client>,
                jobMap: Map<string, any>,
                expDir: string, eventEmitter: EventEmitter) {
        this.machineSSHClientMap = clientMap;
        this.trialJobsMap = jobMap;
        this.expRootDir = expDir;
        this.metricsEmitter = eventEmitter;
    }

    public async collectMetrics(): Promise<void> {
        const aliveJobStatus: TrialJobStatus[] = ['RUNNING', 'SUCCEEDED'];
        const runningJobsMap: Map<RemoteMachineMeta, string[]> = this.getTrialJobIdsGroupByRmMeta(aliveJobStatus);
        const readMetricsTasks: Promise<JobMetrics[]>[] = [];

        runningJobsMap.forEach((jobIds: string[], rmMeta: RemoteMachineMeta) => {
            readMetricsTasks.push(this.readRmMetrics(rmMeta, jobIds));
        });

        const allMetrics = await Promise.all(readMetricsTasks.map(task => { return task.catch(err => { this.log.error(err.message); }); }));

        allMetrics.forEach((rmMetrics) => {
            if (rmMetrics !== undefined && rmMetrics.length > 0) {
                rmMetrics.forEach((jobMetrics) => {
                    const trialJobId: string = jobMetrics.jobId;
                    const trialJobDetail: RemoteMachineTrialJobDetail = <RemoteMachineTrialJobDetail>this.trialJobsMap.get(trialJobId);
                    assert(trialJobDetail);
                    // If the job is no longer alive, remove its GPU reservation
                    if (!['RUNNING'].includes(jobMetrics.jobStatus)) {
                        if (trialJobDetail.status !== 'EARLY_STOPPED') {
                            trialJobDetail.status = jobMetrics.jobStatus;
                        }
                        this.log.debug(`Set trialjob ${trialJobDetail.id} status to ${trialJobDetail.status}`);
                        runningJobsMap.forEach((jobIds: string[], rmMeta: RemoteMachineMeta) => {
                            // If the remote machine has no GPU, gpuReservation is not initialized, so check if it's undefined
                            if (rmMeta.gpuReservation !== undefined) {
                                rmMeta.gpuReservation.forEach((reserveTrialJobId: string, gpuIndex: number) => {
                                    if (reserveTrialJobId === trialJobId) {
                                        rmMeta.gpuReservation.delete(gpuIndex);
                                    }
                                });
                            }
                        });
                    }
                    this.sendMetricsToListeners(jobMetrics);
                });
            }
        });
    }

    private getTrialJobIdsGroupByRmMeta(status: TrialJobStatus[]): Map<RemoteMachineMeta, string[]> {
        const map: Map<RemoteMachineMeta, string[]> = new Map<RemoteMachineMeta, string[]>();
        this.trialJobsMap.forEach((trialJob, id) => {
            let reservedTrialJobIds: string[] = [];
            if (trialJob.rmMeta !== undefined
                && trialJob.rmMeta.gpuReservation !== undefined) {
                reservedTrialJobIds = Array.from(trialJob.rmMeta.gpuReservation.values());
            }
            if (reservedTrialJobIds.includes(id) || status.includes(trialJob.status)) {
                if (map.has(trialJob.rmMeta)) {
                    const ids = map.get(trialJob.rmMeta);
                    if (ids !== undefined && !ids.includes(id)) {
                        ids.push(id);
                    }
                } else {
                    let initJobIds: string[] = [id];
                    // If the remote machine has jobs that reserve GPUs, also put those jobs into the list to get metrics data
                    if (trialJob.rmMeta.gpuReservation !== undefined) {
                        const concatJobIds: string[] = initJobIds.concat(reservedTrialJobIds);
                        initJobIds = concatJobIds.filter((item, pos) => concatJobIds.indexOf(item) === pos);
                    }
                    map.set(trialJob.rmMeta, initJobIds);
                }
            }
        });

        return map;
    }

    private sendMetricsToListeners(jobMetrics: JobMetrics): void {
        if (jobMetrics === undefined) {
            return;
        }
        const jobId: string = jobMetrics.jobId;
        jobMetrics.metrics.forEach((metric: string) => {
            if (metric.length > 0) {
                this.metricsEmitter.emit('metric', {
                    id: jobId,
                    data: metric
                });
            }
        });
    }

    private async readRmMetrics(rmMeta: RemoteMachineMeta, trialJobIds: string[]): Promise<JobMetrics[]> {
        if (trialJobIds === undefined || trialJobIds.length < 1) {
            return [];
        }
        const scriptFile: string = path.join(path.dirname(path.dirname(this.expRootDir)), 'scripts', 'metrics_reader.py');
        const cmdStr: string = `python3 ${scriptFile} --experiment_dir ${this.expRootDir} --trial_job_ids ${trialJobIds.join(',')}`;
        trialJobIds.forEach((id: string) => {
            const trialJob: RemoteMachineTrialJobDetail = this.trialJobsMap.get(id);
            assert(trialJob.rmMeta === rmMeta);
        });
        const sshClient: Client | undefined = this.machineSSHClientMap.get(rmMeta);
        if (sshClient === undefined) {
            throw new Error('SSHClient not found!');
        }
        const result: RemoteCommandResult = await SSHClientUtility.remoteExeCommand(cmdStr, sshClient);
        if (result.exitCode !== 0) {
            throw new Error(`Failed to read metrics data: ${result.stderr}`);
        } else {
            if (result.stdout !== undefined && result.stdout.length > 0) {
                return <JobMetrics[]>JSON.parse(result.stdout);
            } else {
                return [];
            }
        }
    }
}
......@@ -108,15 +108,14 @@ export enum ScheduleResultType {
REQUIRE_EXCEED_TOTAL
}
export const REMOTEMACHINE_TRIAL_COMMAND_FORMAT: string =
`#!/bin/bash
export NNI_PLATFORM=remote NNI_SYS_DIR={0} NNI_OUTPUT_DIR={1} NNI_TRIAL_JOB_ID={2} NNI_EXP_ID={3} NNI_TRIAL_SEQ_ID={4}
export MULTI_PHASE={5}
cd $NNI_SYS_DIR
sh install_nni.sh
echo $$ >{6}
python3 -m nni_trial_tool.trial_keeper --trial_command '{7}' --nnimanager_ip '{8}' --nnimanager_port '{9}'
echo $? \`date +%s%3N\` >{10}`;
export const HOST_JOB_SHELL_FORMAT: string =
`#!/bin/bash
......@@ -124,3 +123,11 @@ cd {0}
echo $$ >{1}
eval {2} >stdout 2>stderr
echo $? \`date +%s%3N\` >{3}`;
export const GPU_COLLECTOR_FORMAT: string =
`#!/bin/bash
export METRIC_OUTPUT_DIR={0}
echo $$ >{1}
python3 -m nni_gpu_tool.gpu_metrics_collector
`;
/**
* Copyright (c) Microsoft Corporation
* All rights reserved.
*
* MIT License
*
* Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated
* documentation files (the "Software"), to deal in the Software without restriction, including without limitation
* the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and
* to permit persons to whom the Software is furnished to do so, subject to the following conditions:
* The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED *AS IS*, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING
* BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
* NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
* DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/
'use strict';
import * as component from '../../common/component';
import { Inject } from 'typescript-ioc';
import { RemoteMachineTrainingService } from './remoteMachineTrainingService';
import { ClusterJobRestServer } from '../common/clusterJobRestServer';
/**
* RemoteMachine training service REST server, provides a REST API to support remote machine trial job metrics updates
*
*/
@component.Singleton
export class RemoteMachineJobRestServer extends ClusterJobRestServer {
@Inject
private readonly remoteMachineTrainingService : RemoteMachineTrainingService;
/**
* constructor to provide NNIRestServer's own rest property, e.g. port
*/
constructor() {
super();
this.remoteMachineTrainingService = component.get(RemoteMachineTrainingService);
}
protected handleTrialMetrics(jobId : string, metrics : any[]) : void {
// Split metrics array into single metric, then emit
// Warning: If metrics are not split into single ones, the behavior will be UNKNOWN
for (const singleMetric of metrics) {
this.remoteMachineTrainingService.MetricsEmitter.emit('metric', {
id : jobId,
data : singleMetric
});
}
}
}
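`handleTrialMetrics` above splits the metrics array and emits one event per metric. A self-contained sketch of that split-then-emit pattern on Node's `EventEmitter` (the job id and metric values are illustrative, not the NNI API):

```typescript
import { EventEmitter } from 'events';

// Listeners expect a single { id, data } payload per 'metric' event,
// so an array of metrics must be split before emitting.
const emitter: EventEmitter = new EventEmitter();
const received: Array<{ id: string; data: string }> = [];
emitter.on('metric', (m: { id: string; data: string }) => received.push(m));

const metrics: string[] = ['0.91', '0.93'];
for (const data of metrics) {
    emitter.emit('metric', { id: 'job42', data });
}
```

Emitting the whole array as one event would hand downstream consumers a payload shape they do not expect, which is why the comment in the source warns against it.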
import { Client, ConnectConfig } from 'ssh2';
import { Deferred } from 'ts-deferred';
import { String } from 'typescript-string-operations';
import * as component from '../../common/component';
import { NNIError, NNIErrorNames } from '../../common/errors';
import { getExperimentId, getInitTrialSequenceId } from '../../common/experimentStartupInfo';
import { getLogger, Logger } from '../../common/log';
import { ObservableTimer } from '../../common/observableTimer';
import {
HostJobApplicationForm, HyperParameters, JobApplicationForm, TrainingService, TrialJobApplicationForm, TrialJobDetail, TrialJobMetric, NNIManagerIpConfig
} from '../../common/trainingService';
import { delay, generateParamFileName, getExperimentRootDir, uniqueString, getJobCancelStatus, getRemoteTmpDir, getIPV4Address } from '../../common/utils';
import { GPUSummary } from '../common/gpuData';
import { TrialConfig } from '../common/trialConfig';
import { TrialConfigMetadataKey } from '../common/trialConfigMetadataKey';
import { GPUScheduler } from './gpuScheduler';
import { MetricsCollector } from './metricsCollector';
import {
HOST_JOB_SHELL_FORMAT, RemoteCommandResult, RemoteMachineMeta,
RemoteMachineScheduleInfo, RemoteMachineScheduleResult,
RemoteMachineTrialJobDetail, ScheduleResultType, REMOTEMACHINE_TRIAL_COMMAND_FORMAT,
GPU_COLLECTOR_FORMAT
} from './remoteMachineData';
import { SSHClientUtility } from './sshClientUtility';
import { validateCodeDir} from '../common/util';
import { RemoteMachineJobRestServer } from './remoteMachineJobRestServer';
import { CONTAINER_INSTALL_NNI_SHELL_FORMAT } from '../common/containerJobData';
import { mkDirP } from '../../common/utils';
/**
* Training Service implementation for Remote Machine (Linux)
*/
@component.Singleton
class RemoteMachineTrainingService implements TrainingService {
private machineSSHClientMap: Map<RemoteMachineMeta, Client>;
private trialJobsMap: Map<string, RemoteMachineTrialJobDetail>;
private log: Logger;
private isMultiPhase: boolean = false;
private trialSequenceId: number;
private remoteRestServerPort?: number;
private readonly remoteOS: string;
private nniManagerIpConfig?: NNIManagerIpConfig;
constructor(@component.Inject timer: ObservableTimer) {
this.remoteOS = 'linux';
/**
* Loop to launch trial jobs and collect trial metrics
*/
public async run(): Promise<void> {
const restServer: RemoteMachineJobRestServer = component.get(RemoteMachineJobRestServer);
await restServer.start();
this.log.info('Run remote machine training service.');
while (!this.stopping) {
while (this.jobQueue.length > 0) {
this.updateGpuReservation();
const trialJobId: string = this.jobQueue[0];
const prepareResult : boolean = await this.prepareTrialJob(trialJobId);
if (prepareResult) {
break;
}
}
await delay(3000);
}
this.log.info('Remote machine training service exit.');
return trialJobDetail;
}
/**
* remove gpu reservation when a job is no longer running
*/
private updateGpuReservation(): void {
for (const [key, value] of this.trialJobsMap) {
if (!['WAITING', 'RUNNING'].includes(value.status)) {
this.gpuScheduler.removeGpuReservation(value.id, value.rmMeta);
}
}
}
/**
* Is multiphase job supported in current training service
*/
public async setClusterMetadata(key: string, value: string): Promise<void> {
switch (key) {
case TrialConfigMetadataKey.NNI_MANAGER_IP:
this.nniManagerIpConfig = <NNIManagerIpConfig>JSON.parse(value);
break;
case TrialConfigMetadataKey.MACHINE_LIST:
await this.setupConnections(value);
//remove local temp files
await cpp.exec(`rm -rf ${this.getLocalGpuMetricCollectorDir()}`);
break;
case TrialConfigMetadataKey.TRIAL_CONFIG:
const remoteMachineTrailConfig: TrialConfig = <TrialConfig>JSON.parse(value);
return deferred.promise;
}
/**
* cleanUp() has a timeout of 10 seconds to clean up remote connections
*/
public async cleanUp(): Promise<void> {
this.log.info('Stopping remote machine training service...');
this.stopping = true;
await Promise.race([delay(10000), this.cleanupConnections()]);
}
/**
* stop gpu_metric_collector process in remote machine and remove unused scripts
*/
private async cleanupConnections(): Promise<void> {
try {
for (const [rmMeta, client] of this.machineSSHClientMap.entries()) {
const jobpidPath: string = path.join(this.getRemoteScriptsPath(rmMeta.username), 'pid');
await SSHClientUtility.remoteExeCommand(`pkill -P \`cat ${jobpidPath}\``, client);
await SSHClientUtility.remoteExeCommand(`rm -rf ${this.getRemoteScriptsPath(rmMeta.username)}`, client);
}
} catch (error) {
//ignore the error; this function is called to clean up remote connections when the experiment is stopping
this.log.error(`Cleanup connection exception, error is ${error.message}`);
}
return Promise.resolve();
}
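The 10-second bound in `cleanUp()` comes from racing the cleanup work against a delay. A minimal standalone sketch of that pattern, with a local `delay` mirroring the helper imported from `'../../common/utils'` (the function names here are illustrative):

```typescript
// Promise.race settles with whichever promise finishes first,
// so slow or hung cleanup cannot block shutdown past timeoutMs.
function delay(ms: number): Promise<void> {
    return new Promise<void>((resolve) => setTimeout(resolve, ms));
}

async function cleanupWithTimeout(cleanup: () => Promise<void>, timeoutMs: number): Promise<string> {
    return Promise.race([
        delay(timeoutMs).then(() => 'timed out'),
        cleanup().then(() => 'cleaned up'),
    ]);
}
```

Note that the losing promise is not cancelled; a cleanup that overruns the timeout keeps running in the background, which is acceptable here because the process is exiting anyway.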
/**
* Generate gpu metric collector directory to store temp gpu metric collector script files
*/
private getLocalGpuMetricCollectorDir(): string {
let userName: string = path.basename(os.homedir()); //get current user name of os
return `${os.tmpdir()}/${userName}/nni/scripts/`;
}
/**
* Generate the GPU metric collector shell script on the local machine.
* The script is copied to and run on the remote machine; the local copy is deleted after upload.
*/
private async generateGpuMetricsCollectorScript(userName: string): Promise<void> {
let gpuMetricCollectorScriptFolder : string = this.getLocalGpuMetricCollectorDir();
await cpp.exec(`mkdir -p ${path.join(gpuMetricCollectorScriptFolder, userName)}`);
//generate gpu_metrics_collector.sh
let gpuMetricsCollectorScriptPath: string = path.join(gpuMetricCollectorScriptFolder, userName, 'gpu_metrics_collector.sh');
const remoteGPUScriptsDir: string = this.getRemoteScriptsPath(userName); // This directory is used to store gpu_metrics and pid created by script
const gpuMetricsCollectorScriptContent: string = String.Format(
GPU_COLLECTOR_FORMAT,
remoteGPUScriptsDir,
path.join(remoteGPUScriptsDir, 'pid'),
);
await fs.promises.writeFile(gpuMetricsCollectorScriptPath, gpuMetricsCollectorScriptContent, { encoding: 'utf8' });
}
private async setupConnections(machineList: string): Promise<void> {
//TO DO: verify if value's format is wrong, and json parse failed, how to handle error
const rmMetaList: RemoteMachineMeta[] = <RemoteMachineMeta[]>JSON.parse(machineList);
let connectedRMNum: number = 0;
rmMetaList.forEach((rmMeta: RemoteMachineMeta) => {
const conn: Client = new Client();
let connectConfig: ConnectConfig = {
deferred.reject(new Error(err.message));
}).connect(connectConfig);
});
return deferred.promise;
}
private async initRemoteMachineOnConnected(rmMeta: RemoteMachineMeta, conn: Client): Promise<void> {
// Create root working directory after ssh connection is ready
//TO DO: Should we mk experiments rootDir here?
await this.generateGpuMetricsCollectorScript(rmMeta.username); //generate gpu script on the local machine first; it will be copied to the remote machine later
const nniRootDir: string = `${os.tmpdir()}/nni`;
await SSHClientUtility.remoteExeCommand(`mkdir -p ${this.remoteExpRootDir}`, conn);
const localGpuScriptCollectorDir: string = this.getLocalGpuMetricCollectorDir();
const remoteGpuScriptCollectorDir: string = this.getRemoteScriptsPath(rmMeta.username); //the directory to store temp scripts in remote machine
await SSHClientUtility.remoteExeCommand(`mkdir -p ${remoteGpuScriptCollectorDir}`, conn);
await SSHClientUtility.remoteExeCommand(`chmod 777 ${nniRootDir} ${nniRootDir}/* ${nniRootDir}/scripts/*`, conn);
//copy gpu_metrics_collector.sh to remote
await SSHClientUtility.copyFileToRemote(path.join(localGpuScriptCollectorDir, rmMeta.username, 'gpu_metrics_collector.sh'), path.join(remoteGpuScriptCollectorDir, 'gpu_metrics_collector.sh'), conn);
//Begin to execute the gpu_metrics_collector script
SSHClientUtility.remoteExeCommand(`bash ${path.join(remoteGpuScriptCollectorDir, 'gpu_metrics_collector.sh')}`, conn);
this.timer.subscribe(
async (tick: number) => {
const cmdresult: RemoteCommandResult = await SSHClientUtility.remoteExeCommand(
`tail -n 1 ${path.join(remoteGpuScriptCollectorDir, 'gpu_metrics')}`, conn);
if (cmdresult && cmdresult.stdout) {
rmMeta.gpuSummary = <GPUSummary>JSON.parse(cmdresult.stdout);
}
if (trialJobDetail === undefined) {
throw new NNIError(NNIErrorNames.INVALID_JOB_DETAIL, `Invalid job detail information for trial job ${trialJobId}`);
}
// get an ssh client from scheduler
const rmScheduleResult: RemoteMachineScheduleResult = this.gpuScheduler.scheduleMachine(this.trialConfig.gpuNum, trialJobId);
if (rmScheduleResult.resultType === ScheduleResultType.REQUIRE_EXCEED_TOTAL) {
await SSHClientUtility.remoteExeCommand(`mkdir -p ${trialWorkingFolder}`, sshClient);
await SSHClientUtility.remoteExeCommand(`mkdir -p ${path.join(trialWorkingFolder, '.nni')}`, sshClient);
await SSHClientUtility.remoteExeCommand(`touch ${path.join(trialWorkingFolder, '.nni', 'metrics')}`, sshClient);
let command: string;
// Set CUDA_VISIBLE_DEVICES environment variable based on cuda_visible_device
// If no valid cuda_visible_device is defined, set CUDA_VISIBLE_DEVICES to empty string to hide GPU device
if (typeof cuda_visible_device === 'string' && cuda_visible_device.length > 0) {
command = `CUDA_VISIBLE_DEVICES=${cuda_visible_device} ${this.trialConfig.command}`;
} else {
command = `CUDA_VISIBLE_DEVICES=" " ${this.trialConfig.command}`;
}
const nniManagerIp: string = this.nniManagerIpConfig ? this.nniManagerIpConfig.nniManagerIp : getIPV4Address();
if (!this.remoteRestServerPort) {
const restServer: RemoteMachineJobRestServer = component.get(RemoteMachineJobRestServer);
this.remoteRestServerPort = restServer.clusterRestServerPort;
}
const runScriptTrialContent: string = String.Format(
REMOTEMACHINE_TRIAL_COMMAND_FORMAT,
trialWorkingFolder,
trialWorkingFolder,
trialJobId,
getExperimentId(),
trialJobDetail.sequenceId.toString(),
this.isMultiPhase,
path.join(trialWorkingFolder, '.nni', 'jobpid'),
command,
nniManagerIp,
this.remoteRestServerPort,
path.join(trialWorkingFolder, '.nni', 'code')
);
//create tmp trial working folder locally
await cpp.exec(`mkdir -p ${path.join(trialLocalTempFolder, '.nni')}`);
//copy the trial code files to the local tmp folder
await cpp.exec(`cp -r ${this.trialConfig.codeDir}/* ${trialLocalTempFolder}`);
const installScriptContent : string = CONTAINER_INSTALL_NNI_SHELL_FORMAT;
// Write NNI installation file to local tmp files
await fs.promises.writeFile(path.join(trialLocalTempFolder, 'install_nni.sh'), installScriptContent, { encoding: 'utf8' });
// Write file content ( run.sh ) to local tmp files
await fs.promises.writeFile(path.join(trialLocalTempFolder, 'run.sh'), runScriptTrialContent, { encoding: 'utf8' });
await this.writeParameterFile(trialJobId, form.hyperParameters, rmScheduleInfo.rmMeta);
// Copy files in local tmp folder to remote working directory
await SSHClientUtility.copyDirectoryToRemote(trialLocalTempFolder, trialWorkingFolder, sshClient, this.remoteOS);
// Execute command in remote machine
SSHClientUtility.remoteExeCommand(`bash ${path.join(trialWorkingFolder, 'run.sh')}`, sshClient);
}
const deferred: Deferred<TrialJobDetail> = new Deferred<TrialJobDetail>();
const jobpidPath: string = this.getJobPidPath(trialJob.id);
const trialReturnCodeFilePath: string = path.join(this.remoteExpRootDir, 'trials', trialJob.id, '.nni', 'code');
try {
const killResult: number = (await SSHClientUtility.remoteExeCommand(`kill -0 \`cat ${jobpidPath}\``, sshClient)).exitCode;
// if the process of jobpid is not alive any more
deferred.resolve(trialJob);
}
}
return deferred.promise;
}
private getRemoteScriptsPath(userName: string): string {
return path.join(getRemoteTmpDir(this.remoteOS), userName, 'nni', 'scripts');
}
}
private getHostJobRemoteDir(jobId: string): string {
return path.join(getRemoteTmpDir(this.remoteOS), 'nni', 'experiments', getExperimentId());
}
public get MetricsEmitter() : EventEmitter {
return this.metricsEmitter;
}
private getJobPidPath(jobId: string): string {
const trialJobDetail: RemoteMachineTrialJobDetail | undefined = this.trialJobsMap.get(jobId);
if (trialJobDetail === undefined) {
return this.trialSequenceId++;
}
private async writeRemoteTrialFile(trialJobId: string, fileContent: string,
rmMeta: RemoteMachineMeta, fileName: string): Promise<void> {
const sshClient: Client | undefined = this.machineSSHClientMap.get(rmMeta);
if (sshClient === undefined) {
throw new Error('sshClient is undefined.');
}
const trialWorkingFolder: string = path.join(this.remoteExpRootDir, 'trials', trialJobId);
const trialLocalTempFolder: string = path.join(this.expRootDir, 'trials-local', trialJobId);
const localFilepath: string = path.join(trialLocalTempFolder, fileName);
await fs.promises.writeFile(localFilepath, fileContent, { encoding: 'utf8' });
await SSHClientUtility.copyFileToRemote(localFilepath, path.join(trialWorkingFolder, fileName), sshClient);
}
}
export { RemoteMachineTrainingService };
_time_format = '%Y-%m-%d %H:%M:%S'
class _LoggerFileWrapper(TextIOBase):
def __init__(self, logger_file):
self.file = logger_file
self.orig_stdout = sys.stdout
def write(self, s):
if s != '\n':
time = datetime.now().strftime(_time_format)
self.orig_stdout.write(s + '\n')
self.orig_stdout.flush()
self.file.write('[{}] PRINT '.format(time) + s + '\n')
self.file.flush()
return len(s)
The figure below is the result of our algorithm on MNIST trial history data, whe
</p>
## 2. Usage
To use Curve Fitting Assessor, you should add the following spec in your experiment's YAML config file:
```
assessor:
Hyperband on NNI
===
## 1. Introduction
First, this is an example of how to write an automl algorithm based on MsgDispat
Second, this implementation fully leverages Hyperband's internal parallelism. More specifically, the next bucket is not started strictly after the current bucket, instead, it starts when there is available resource.
## 3. Usage
To use Hyperband, you should add the following spec in your experiment's YAML config file:
```
advisor: