Unverified commit a3872505 authored by QuanluZhang, committed by GitHub

Dev hyperband (#405)

* support hyperband

* add example for hyperband

* register Hyperband in tuner

* after debug

* update doc

* trivial change

* update spec validation of yaml config

* modify nnictl launcher

* modify nnimanager and util to support advisor

* Quick fix nnictl config logic (#289)

* fix nnictl bug

* fix install.sh

* add desc for Dockerfile.build.base

* update document for Dockerfile

* update

* refactor port detect

* update

* refactor NNICTLDOC.md

* add document for pai and nnictl

* add default value for port

* add exception handling in trial_keeper.py

* fix port bug

* fix resume

* fix nnictl resume and fix nnictl stop

* fix document

* update

* refactor nnictl

* update

* update doc

* update

* update nnictl

* fix comment

* revert dockerfile

* update

* update

* update

* fix nnictl error hit

* fix comments

* fix bash-completion

* fix paramiko install

* quick fix resume logic

* update

* quick fix nnictl

* refactor sdk main

* update unit test accordingly

* update example's config file

* update restserver validation

* PR merge to 0.3 (#297)

* refactor doc

* update with Mao's suggestions

* Set theme jekyll-theme-dinky

* update doc

* fix links

* fix links

* fix links

* merge

* fix links and doc errors

* merge

* merge

* merge

* merge

* Update README.md (#288)

added License badge

* merge

* updated the "Contribute" part (merged Gems' wiki in, updated ReadMe)

* fix link

* fix doc mistakes and broken links. (#271)

* refactor doc

* update with Mao's suggestions

* Set theme jekyll-theme-dinky

* updated the "Contribute" part (merged Gems' wiki in, updated ReadMe)

* fix link

* Update README.md

* Fix misspelling in examples/trials/ga_squad/README.md

* revise the installation cmd to v0.2

* revise to install v0.2

* remove files

* update

* remove enas readme (#292)

* support checkpoint directory

* Fix datastore performance issue (#301)

* fix pylint

* Fix nnictl in v0.3 (#299)

Fix old version of config file
fix sklearn requirements
Fix resume log logic

* modify log

* trivial changes

* update example

* update makefile

* update launcher.py to fix the problem of finding main.js

* debug

* add hyperparameter info into trial_end api

* fix bug and update example

* fix error induced by merge

* support initialize

* add doc for hyperband

* fix bugs and add config_pai

* fix bugs and add config_pai

* fix bugs and add config_pai

* fix bugs and add config_pai

* update doc

* add doc for advisor

* fit

* modification based on hui's comments

* update doc
parent d2f06385
......@@ -6,7 +6,7 @@ So, if user want to implement a customized Tuner, she/he only need to:
1) Inherit a tuner of a base Tuner class
2) Implement receive_trial_result and generate_parameter function
3) Write a script to run Tuner
4) Configure your customized tuner in experiment yaml config file
Here is an example:
......@@ -93,3 +93,6 @@ More detail example you could see:
> * [evolution-tuner](../src/sdk/pynni/nni/evolution_tuner)
> * [hyperopt-tuner](../src/sdk/pynni/nni/hyperopt_tuner)
> * [evolution-based-customized-tuner](../examples/tuners/ga_customer_tuner)
## Write a more advanced automl algorithm
The methods above are usually enough to write a general tuner. However, users may also want access to more information, for example, intermediate results and trials' states (i.e., the methods in assessor), in order to implement a more powerful automl algorithm. Therefore, we have another concept called `advisor`, which directly inherits from `MsgDispatcherBase` in [`src/sdk/pynni/nni/msg_dispatcher_base.py`](../src/sdk/pynni/nni/msg_dispatcher_base.py). Please refer to [here](howto_3_CustomizedAdvisor) for how to write a customized advisor.
# **How To** - Customize Your Own Advisor
*Advisor targets the scenario where the automl algorithm wants the methods of both a tuner and an assessor. An advisor is similar to a tuner in that it receives trial configuration requests and final results, and generates trial configurations. It is also similar to an assessor in that it receives intermediate results and trials' end states, and can send trial kill commands. Note that if you use an advisor, you are not allowed to use a tuner or assessor at the same time.*
So, to implement a customized Advisor, you only need to:
1) Define an Advisor inheriting from the MsgDispatcherBase class
2) Implement the methods with prefix `handle_` except `handle_request`
3) Configure your customized Advisor in experiment yaml config file
Here is an example:
**1) Define an Advisor inheriting from the MsgDispatcherBase class**
```python
from nni.msg_dispatcher_base import MsgDispatcherBase

class CustomizedAdvisor(MsgDispatcherBase):
    def __init__(self, ...):
        ...
```
**2) Implement the methods with prefix `handle_` except `handle_request`**
Please refer to the implementation of Hyperband ([src/sdk/pynni/nni/hyperband_advisor/hyperband_advisor.py](../src/sdk/pynni/nni/hyperband_advisor/hyperband_advisor.py)) for how to implement the methods.
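For orientation, here is a minimal sketch of such a subclass. The handler bodies are placeholders rather than a working algorithm; the handler names and the shape of each `data` payload follow the Hyperband implementation referenced above:

```python
from nni.msg_dispatcher_base import MsgDispatcherBase

class SkeletonAdvisor(MsgDispatcherBase):
    '''A placeholder advisor: every handler below must be filled in.'''
    def handle_initialize(self, data):
        # data is the search space; a real advisor replies with CommandType.Initialized
        pass

    def handle_request_trial_jobs(self, data):
        # data is the number of trial configurations being requested
        pass

    def handle_update_search_space(self, data):
        # data is a JSON object describing the new search space
        pass

    def handle_trial_end(self, data):
        # data has keys: trial_job_id, event, hyper_params
        pass

    def handle_report_metric_data(self, data):
        # data has keys: parameter_id, trial_job_id, type ('FINAL'/'PERIODICAL'), sequence, value
        pass

    def handle_add_customized_trial(self, data):
        pass
```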
**3) Configure your customized Advisor in experiment yaml config file**
Similar to tuner and assessor, NNI needs to locate your customized Advisor class and instantiate it, so you need to specify the location of the customized Advisor class and pass literal values as parameters to its \_\_init__ constructor.
```yaml
advisor:
  codeDir: /home/abc/myadvisor
  classFileName: my_customized_advisor.py
  className: CustomizedAdvisor
  # Any parameter you need to pass to your advisor class's __init__ constructor
  # can be specified in this optional classArgs field, for example
  classArgs:
    arg1: value1
```
authorName: default
experimentName: example_mnist
trialConcurrency: 2
maxExecDuration: 100h
maxTrialNum: 10000
#choice: local, remote, pai
trainingServicePlatform: local
searchSpacePath: search_space.json
#choice: true, false
useAnnotation: false
advisor:
  #choice: Hyperband
  builtinAdvisorName: Hyperband
  classArgs:
    #R: the maximum STEPS (could be the number of mini-batches or epochs) that can be
    #   allocated to a trial. Each trial should use STEPS to control how long it runs.
    R: 100
    #eta: proportion of discarded trials
    eta: 3
    #choice: maximize, minimize
    optimize_mode: maximize
trial:
  command: python3 mnist.py
  codeDir: .
  gpuNum: 0
authorName: default
experimentName: example_mnist_hyperband
maxExecDuration: 1h
maxTrialNum: 10000
trialConcurrency: 10
#choice: local, remote, pai
trainingServicePlatform: pai
searchSpacePath: search_space.json
#choice: true, false
useAnnotation: false
advisor:
  #choice: Hyperband
  builtinAdvisorName: Hyperband
  classArgs:
    #R: the maximum STEPS
    R: 100
    #eta: proportion of discarded trials
    eta: 3
    #choice: maximize, minimize
    optimize_mode: maximize
trial:
  command: python3 mnist.py
  codeDir: .
  gpuNum: 0
  cpuNum: 1
  memoryMB: 8196
  #The docker image to run nni job on pai
  image: openpai/pai.example.tensorflow
  #The hdfs directory to store data on pai, format 'hdfs://host:port/directory'
  dataDir: hdfs://10.10.10.10:9000/username/nni
  #The hdfs directory to store output data generated by nni, format 'hdfs://host:port/directory'
  outputDir: hdfs://10.10.10.10:9000/username/nni
paiConfig:
  #The username to login pai
  userName: username
  #The password to login pai
  passWord: password
  #The host of restful server of pai
  host: 10.10.10.10
"""A deep MNIST classifier using convolutional layers."""
import logging
import math
import tempfile
import tensorflow as tf
from tensorflow.examples.tutorials.mnist import input_data
import nni
FLAGS = None
logger = logging.getLogger('mnist_AutoML')
class MnistNetwork(object):
'''
MnistNetwork is for initlizing and building basic network for mnist.
'''
def __init__(self,
channel_1_num,
channel_2_num,
conv_size,
hidden_size,
pool_size,
learning_rate,
x_dim=784,
y_dim=10):
self.channel_1_num = channel_1_num
self.channel_2_num = channel_2_num
self.conv_size = conv_size
self.hidden_size = hidden_size
self.pool_size = pool_size
self.learning_rate = learning_rate
self.x_dim = x_dim
self.y_dim = y_dim
self.images = tf.placeholder(tf.float32, [None, self.x_dim], name='input_x')
self.labels = tf.placeholder(tf.float32, [None, self.y_dim], name='input_y')
self.keep_prob = tf.placeholder(tf.float32, name='keep_prob')
self.train_step = None
self.accuracy = None
def build_network(self):
'''
Building network for mnist
'''
# Reshape to use within a convolutional neural net.
# Last dimension is for "features" - there is only one here, since images are
# grayscale -- it would be 3 for an RGB image, 4 for RGBA, etc.
with tf.name_scope('reshape'):
try:
input_dim = int(math.sqrt(self.x_dim))
except:
print(
'input dim cannot be sqrt and reshape. input dim: ' + str(self.x_dim))
logger.debug(
'input dim cannot be sqrt and reshape. input dim: %s', str(self.x_dim))
raise
x_image = tf.reshape(self.images, [-1, input_dim, input_dim, 1])
# First convolutional layer - maps one grayscale image to 32 feature maps.
with tf.name_scope('conv1'):
w_conv1 = weight_variable(
[self.conv_size, self.conv_size, 1, self.channel_1_num])
b_conv1 = bias_variable([self.channel_1_num])
h_conv1 = tf.nn.relu(conv2d(x_image, w_conv1) + b_conv1)
# Pooling layer - downsamples by 2X.
with tf.name_scope('pool1'):
h_pool1 = max_pool(h_conv1, self.pool_size)
# Second convolutional layer -- maps 32 feature maps to 64.
with tf.name_scope('conv2'):
w_conv2 = weight_variable([self.conv_size, self.conv_size,
self.channel_1_num, self.channel_2_num])
b_conv2 = bias_variable([self.channel_2_num])
h_conv2 = tf.nn.relu(conv2d(h_pool1, w_conv2) + b_conv2)
# Second pooling layer.
with tf.name_scope('pool2'):
h_pool2 = max_pool(h_conv2, self.pool_size)
# Fully connected layer 1 -- after 2 round of downsampling, our 28x28 image
# is down to 7x7x64 feature maps -- maps this to 1024 features.
last_dim = int(input_dim / (self.pool_size * self.pool_size))
with tf.name_scope('fc1'):
w_fc1 = weight_variable(
[last_dim * last_dim * self.channel_2_num, self.hidden_size])
b_fc1 = bias_variable([self.hidden_size])
h_pool2_flat = tf.reshape(
h_pool2, [-1, last_dim * last_dim * self.channel_2_num])
h_fc1 = tf.nn.relu(tf.matmul(h_pool2_flat, w_fc1) + b_fc1)
# Dropout - controls the complexity of the model, prevents co-adaptation of features.
with tf.name_scope('dropout'):
h_fc1_drop = tf.nn.dropout(h_fc1, self.keep_prob)
# Map the 1024 features to 10 classes, one for each digit
with tf.name_scope('fc2'):
w_fc2 = weight_variable([self.hidden_size, self.y_dim])
b_fc2 = bias_variable([self.y_dim])
y_conv = tf.matmul(h_fc1_drop, w_fc2) + b_fc2
with tf.name_scope('loss'):
cross_entropy = tf.reduce_mean(
tf.nn.softmax_cross_entropy_with_logits(labels=self.labels, logits=y_conv))
with tf.name_scope('adam_optimizer'):
self.train_step = tf.train.AdamOptimizer(
self.learning_rate).minimize(cross_entropy)
with tf.name_scope('accuracy'):
correct_prediction = tf.equal(
tf.argmax(y_conv, 1), tf.argmax(self.labels, 1))
self.accuracy = tf.reduce_mean(
tf.cast(correct_prediction, tf.float32))
def conv2d(x_input, w_matrix):
"""conv2d returns a 2d convolution layer with full stride."""
return tf.nn.conv2d(x_input, w_matrix, strides=[1, 1, 1, 1], padding='SAME')
def max_pool(x_input, pool_size):
"""max_pool downsamples a feature map by 2X."""
return tf.nn.max_pool(x_input, ksize=[1, pool_size, pool_size, 1],
strides=[1, pool_size, pool_size, 1], padding='SAME')
def weight_variable(shape):
"""weight_variable generates a weight variable of a given shape."""
initial = tf.truncated_normal(shape, stddev=0.1)
return tf.Variable(initial)
def bias_variable(shape):
"""bias_variable generates a bias variable of a given shape."""
initial = tf.constant(0.1, shape=shape)
return tf.Variable(initial)
def main(params):
'''
Main function, build mnist network, run and send result to NNI.
'''
# Import data
mnist = input_data.read_data_sets(params['data_dir'], one_hot=True)
print('Mnist download data down.')
logger.debug('Mnist download data down.')
# Create the model
# Build the graph for the deep net
mnist_network = MnistNetwork(channel_1_num=params['channel_1_num'],
channel_2_num=params['channel_2_num'],
conv_size=params['conv_size'],
hidden_size=params['hidden_size'],
pool_size=params['pool_size'],
learning_rate=params['learning_rate'])
mnist_network.build_network()
logger.debug('Mnist build network done.')
# Write log
graph_location = tempfile.mkdtemp()
logger.debug('Saving graph to: %s', graph_location)
train_writer = tf.summary.FileWriter(graph_location)
train_writer.add_graph(tf.get_default_graph())
test_acc = 0.0
with tf.Session() as sess:
sess.run(tf.global_variables_initializer())
for i in range(params['batch_num']):
batch = mnist.train.next_batch(params['batch_size'])
mnist_network.train_step.run(feed_dict={mnist_network.images: batch[0],
mnist_network.labels: batch[1],
mnist_network.keep_prob: 1 - params['dropout_rate']}
)
if i % 10 == 0:
test_acc = mnist_network.accuracy.eval(
feed_dict={mnist_network.images: mnist.test.images,
mnist_network.labels: mnist.test.labels,
mnist_network.keep_prob: 1.0})
nni.report_intermediate_result(test_acc)
logger.debug('test accuracy %g', test_acc)
logger.debug('Pipe send intermediate result done.')
test_acc = mnist_network.accuracy.eval(
feed_dict={mnist_network.images: mnist.test.images,
mnist_network.labels: mnist.test.labels,
mnist_network.keep_prob: 1.0})
nni.report_final_result(test_acc)
logger.debug('Final result is %g', test_acc)
logger.debug('Send final result done.')
def generate_default_params():
'''
Generate default parameters for mnist network.
'''
params = {
'data_dir': '/tmp/tensorflow/mnist/input_data',
'dropout_rate': 0.5,
'channel_1_num': 32,
'channel_2_num': 64,
'conv_size': 5,
'pool_size': 2,
'hidden_size': 1024,
'learning_rate': 1e-4,
'batch_size': 32}
return params
if __name__ == '__main__':
try:
# get parameters form tuner
RCV_PARAMS = nni.get_next_parameter()
logger.debug(RCV_PARAMS)
# run
params = generate_default_params()
params.update(RCV_PARAMS)
'''
If you use Hyperband, among the hyperparameters (i.e., key-value pairs) received by a trial,
there is one more key called `STEPS` besides the hyperparameters defined by user.
By using this `STEPS`, the trial can control how long it runs.
'''
params['batch_num'] = RCV_PARAMS['STEPS'] * 10
main(params)
except Exception as exception:
logger.exception(exception)
raise
{
    "dropout_rate": {"_type": "uniform", "_value": [0.5, 0.9]},
    "conv_size": {"_type": "choice", "_value": [2, 3, 5, 7]},
    "hidden_size": {"_type": "choice", "_value": [124, 512, 1024]},
    "batch_size": {"_type": "choice", "_value": [8, 16, 32, 64]},
    "learning_rate": {"_type": "choice", "_value": [0.0001, 0.001, 0.01, 0.1]}
}
......@@ -35,7 +35,7 @@ interface ExperimentParams {
    trainingServicePlatform: string;
    multiPhase?: boolean;
    multiThread?: boolean;
    tuner: {
    tuner?: {
        className: string;
        builtinTunerName?: string;
        codeDir?: string;
......@@ -53,6 +53,15 @@ interface ExperimentParams {
        checkpointDir: string;
        gpuNum?: number;
    };
    advisor?: {
        className: string;
        builtinAdvisorName?: string;
        codeDir?: string;
        classArgs?: any;
        classFileName?: string;
        checkpointDir: string;
        gpuNum?: number;
    };
    clusterMetaData?: {
        key: string;
        value: string;
......
......@@ -47,6 +47,10 @@ function getDefaultDatabaseDir(): string {
    return path.join(getExperimentRootDir(), 'db');
}

function getCheckpointDir(): string {
    return path.join(getExperimentRootDir(), 'checkpoint');
}

function mkDirP(dirPath: string): Promise<void> {
    const deferred: Deferred<void> = new Deferred<void>();
    fs.exists(dirPath, (exists: boolean) => {
......@@ -137,7 +141,8 @@ function parseArg(names: string[]): string {
}
/**
* Generate command line to start advisor process which runs tuner and assessor
* Generate command line to start automl algorithm(s),
* either start advisor or start a process which runs tuner and assessor
* @param tuner : For builtin tuner:
* {
* className: 'EvolutionTuner'
......@@ -158,10 +163,18 @@ function parseArg(names: string[]): string {
* }
*
* @param assessor: similar to tuner
* @param advisor: similar to tuner
*
*/
function getMsgDispatcherCommand(tuner: any, assessor: any, multiPhase: boolean = false, multiThread: boolean = false): string {
    let command: string = `python3 -m nni --tuner_class_name ${tuner.className}`;
function getMsgDispatcherCommand(tuner: any, assessor: any, advisor: any, multiPhase: boolean = false, multiThread: boolean = false): string {
    if ((tuner || assessor) && advisor) {
        throw new Error('Error: specifying both tuner/assessor and advisor is not allowed');
    }
    if (!tuner && !advisor) {
        throw new Error('Error: specifying neither tuner nor advisor is not allowed');
    }

    let command: string = `python3 -m nni`;
    if (multiPhase) {
        command += ' --multi_phase';
    }
......@@ -170,14 +183,25 @@ function getMsgDispatcherCommand(tuner: any, assessor: any, multiPhase: boolean
        command += ' --multi_thread';
    }

    if (advisor) {
        command += ` --advisor_class_name ${advisor.className}`;
        if (advisor.classArgs !== undefined) {
            command += ` --advisor_args ${JSON.stringify(JSON.stringify(advisor.classArgs))}`;
        }
        if (advisor.codeDir !== undefined && advisor.codeDir.length > 1) {
            command += ` --advisor_directory ${advisor.codeDir}`;
        }
        if (advisor.classFileName !== undefined && advisor.classFileName.length > 1) {
            command += ` --advisor_class_filename ${advisor.classFileName}`;
        }
    } else {
        command += ` --tuner_class_name ${tuner.className}`;
        if (tuner.classArgs !== undefined) {
            command += ` --tuner_args ${JSON.stringify(JSON.stringify(tuner.classArgs))}`;
        }
        if (tuner.codeDir !== undefined && tuner.codeDir.length > 1) {
            command += ` --tuner_directory ${tuner.codeDir}`;
        }
        if (tuner.classFileName !== undefined && tuner.classFileName.length > 1) {
            command += ` --tuner_class_filename ${tuner.classFileName}`;
        }
......@@ -187,15 +211,14 @@ function getMsgDispatcherCommand(tuner: any, assessor: any, multiPhase: boolean
            if (assessor.classArgs !== undefined) {
                command += ` --assessor_args ${JSON.stringify(JSON.stringify(assessor.classArgs))}`;
            }
            if (assessor.codeDir !== undefined && assessor.codeDir.length > 1) {
                command += ` --assessor_directory ${assessor.codeDir}`;
            }
            if (assessor.classFileName !== undefined && assessor.classFileName.length > 1) {
                command += ` --assessor_class_filename ${assessor.classFileName}`;
            }
        }
    }

    return command;
}
......@@ -321,6 +344,6 @@ function countFilesRecursively(directory: string, timeoutMilliSeconds?: number):
    });
}

export {countFilesRecursively, getRemoteTmpDir, generateParamFileName, getMsgDispatcherCommand,
export {countFilesRecursively, getRemoteTmpDir, generateParamFileName, getMsgDispatcherCommand, getCheckpointDir,
        getLogDir, getExperimentRootDir, getJobCancelStatus, getDefaultDatabaseDir, getIPV4Address,
        mkDirP, delay, prepareUnitTest, parseArg, cleanupUnitTest, uniqueString, randomSelect };
......@@ -35,7 +35,7 @@ import {
import {
    TrainingService, TrialJobApplicationForm, TrialJobDetail, TrialJobMetric, TrialJobStatus
} from '../common/trainingService';
import { delay, getLogDir, getMsgDispatcherCommand } from '../common/utils';
import { delay, getLogDir, getCheckpointDir, getMsgDispatcherCommand, mkDirP } from '../common/utils';
import {
    ADD_CUSTOMIZED_TRIAL_JOB, INITIALIZE, INITIALIZED, KILL_TRIAL_JOB, NEW_TRIAL_JOB, NO_MORE_TRIAL_JOBS,
    REPORT_METRIC_DATA, REQUEST_TRIAL_JOBS, SEND_TRIAL_JOB_PARAMETER, TERMINATE, TRIAL_END, UPDATE_SEARCH_SPACE
......@@ -127,15 +127,15 @@ class NNIManager implements Manager {
            this.trainingService.setClusterMetadata('multiPhase', expParams.multiPhase.toString());
        }
        const dispatcherCommand: string = getMsgDispatcherCommand(
            expParams.tuner, expParams.assessor, expParams.multiPhase, expParams.multiThread);
        const dispatcherCommand: string = getMsgDispatcherCommand(expParams.tuner, expParams.assessor, expParams.advisor,
            expParams.multiPhase, expParams.multiThread);
        this.log.debug(`dispatcher command: ${dispatcherCommand}`);
        const checkpointDir: string = await this.createCheckpointDir();
        this.setupTuner(
            //expParams.tuner.tunerCommand,
            dispatcherCommand,
            undefined,
            'start',
            expParams.tuner.checkpointDir);
            checkpointDir);

        this.experimentProfile.startTime = Date.now();
        this.status.status = 'EXPERIMENT_RUNNING';
......@@ -160,14 +160,15 @@ class NNIManager implements Manager {
            this.trainingService.setClusterMetadata('multiPhase', expParams.multiPhase.toString());
        }
        const dispatcherCommand: string = getMsgDispatcherCommand(
            expParams.tuner, expParams.assessor, expParams.multiPhase, expParams.multiThread);
        const dispatcherCommand: string = getMsgDispatcherCommand(expParams.tuner, expParams.assessor, expParams.advisor,
            expParams.multiPhase, expParams.multiThread);
        this.log.debug(`dispatcher command: ${dispatcherCommand}`);
        const checkpointDir: string = await this.createCheckpointDir();
        this.setupTuner(
            dispatcherCommand,
            undefined,
            'resume',
            expParams.tuner.checkpointDir);
            checkpointDir);

        const allTrialJobs: TrialJobInfo[] = await this.dataStore.listTrialJobs();
......@@ -371,13 +372,22 @@ class NNIManager implements Manager {
                this.trialJobs.set(trialJobId, Object.assign({}, trialJobDetail));
                await this.dataStore.storeTrialJobEvent(trialJobDetail.status, trialJobDetail.id, undefined, trialJobDetail);
            }
            let hyperParams: string | undefined = undefined;
            switch (trialJobDetail.status) {
                case 'SUCCEEDED':
                case 'USER_CANCELED':
                case 'EARLY_STOPPED':
                    this.trialJobs.delete(trialJobId);
                    finishedTrialJobNum++;
                    this.dispatcher.sendCommand(TRIAL_END, JSON.stringify({trial_job_id: trialJobDetail.id, event: trialJobDetail.status}));
                    if (trialJobDetail.form.jobType === 'TRIAL') {
                        hyperParams = (<TrialJobApplicationForm>trialJobDetail.form).hyperParameters.value;
                    } else {
                        throw new Error('Error: jobType error, not TRIAL');
                    }
                    this.dispatcher.sendCommand(TRIAL_END, JSON.stringify({
                        trial_job_id: trialJobDetail.id,
                        event: trialJobDetail.status,
                        hyper_params: hyperParams }));
                    break;
                case 'FAILED':
                case 'SYS_CANCELED':
......@@ -385,7 +395,15 @@ class NNIManager implements Manager {
                    // TO DO: push this job to queue for retry
                    this.trialJobs.delete(trialJobId);
                    finishedTrialJobNum++;
                    this.dispatcher.sendCommand(TRIAL_END, JSON.stringify({trial_job_id: trialJobDetail.id, event: trialJobDetail.status}));
                    if (trialJobDetail.form.jobType === 'TRIAL') {
                        hyperParams = (<TrialJobApplicationForm>trialJobDetail.form).hyperParameters.value;
                    } else {
                        throw new Error('Error: jobType error, not TRIAL');
                    }
                    this.dispatcher.sendCommand(TRIAL_END, JSON.stringify({
                        trial_job_id: trialJobDetail.id,
                        event: trialJobDetail.status,
                        hyper_params: hyperParams}));
                    break;
                case 'WAITING':
                case 'RUNNING':
......@@ -642,16 +660,30 @@ class NNIManager implements Manager {
                maxExecDuration: 0, // unit: second
                maxTrialNum: 0, // maxTrialNum includes all the submitted trial jobs
                trainingServicePlatform: '',
                searchSpace: '',
                tuner: {
                    className: '',
                    classArgs: {},
                    checkpointDir: ''
                }
                searchSpace: ''
            }
        };
    }
    private async createCheckpointDir(): Promise<string> {
        // TODO: test
        const chkpDir: string = getCheckpointDir();
        // create checkpoint directory
        await mkDirP(chkpDir);
        // assign this directory to exp profile's checkpointDir
        if (this.experimentProfile.params.advisor) {
            this.experimentProfile.params.advisor.checkpointDir = chkpDir;
        }
        if (this.experimentProfile.params.tuner) {
            this.experimentProfile.params.tuner.checkpointDir = chkpDir;
        }
        if (this.experimentProfile.params.assessor) {
            this.experimentProfile.params.assessor.checkpointDir = chkpDir;
        }

        return Promise.resolve(chkpDir);
    }
    private async storeMaxSequenceId(sequenceId: number): Promise<void> {
        if (sequenceId > this.experimentProfile.maxSequenceId) {
            this.experimentProfile.maxSequenceId = sequenceId;
......
......@@ -46,7 +46,9 @@ function startProcess(): void {
            className: 'DummyAssessor',
            codeDir: './',
            classFileName: 'dummy_assessor.py'
        }
        },
        // advisor
        undefined
    );

    const proc: ChildProcess = spawn(dispatcherCmd, [], { stdio, cwd: 'core/test', shell: true });
......
......@@ -99,6 +99,15 @@ export namespace ValidationSchemas {
        maxExecDuration: joi.number().min(0).required(),
        multiPhase: joi.boolean(),
        multiThread: joi.boolean(),
        advisor: joi.object({
            builtinAdvisorName: joi.string().valid('Hyperband'),
            codeDir: joi.string(),
            classFileName: joi.string(),
            className: joi.string(),
            classArgs: joi.any(),
            gpuNum: joi.number().min(0),
            checkpointDir: joi.string()
        }),
        tuner: joi.object({
            builtinTunerName: joi.string().valid('TPE', 'Random', 'Anneal', 'Evolution', 'SMAC', 'BatchTuner', 'GridSearch'),
            codeDir: joi.string(),
......@@ -107,7 +116,7 @@ export namespace ValidationSchemas {
            classArgs: joi.any(),
            gpuNum: joi.number().min(0),
            checkpointDir: joi.string()
        }).required(),
        }),
        assessor: joi.object({
            builtinAssessorName: joi.string().valid('Medianstop'),
            codeDir: joi.string(),
......
......@@ -6,10 +6,10 @@ For now, NNI has supported the following tuner algorithms. Note that NNI install
- Random Search
- Anneal
- Naive Evolution
- Grid Search
- SMAC (to install through `nnictl`)
- ENAS (ongoing)
- Batch (ongoing)
- Batch
- Grid Search
- Hyperband
## 1. Tuner algorithm introduction
......@@ -57,6 +57,10 @@ Note that the only acceptable types of search space are 'quniform', 'qloguniform
* Type 'quniform' will receive three values [low, high, q], where [low, high] specifies a range and 'q' specifies the number of values that will be sampled evenly. It will be sampled in a way that the first sampled value is 'low', and each of the following values is (high-low)/q larger than the value in front of it.
* Type 'qloguniform' behaves like 'quniform' except that it will first change the range to [log10(low), log10(high)], sample, and then change the sampled value back.
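As a rough sketch of the 'quniform' sampling rule described above (assuming, as stated, that 'q' is the count of evenly spaced values; whether the endpoint 'high' is ever included is not specified here):

```python
def quniform_values(low, high, q):
    # q evenly spaced values: starts at low, each (high - low) / q larger than the previous
    step = (high - low) / q
    return [low + i * step for i in range(q)]

print(quniform_values(0, 10, 5))  # [0.0, 2.0, 4.0, 6.0, 8.0]
```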
**Hyperband**
[Hyperband][6] tries to use limited resources to explore as many configurations as possible and find the promising ones. The basic idea is to generate many configurations, run each for a small number of STEPS to identify the promising ones, then train those further and keep selecting the most promising. More details can be found [here](hyperband_advisor/README.md).
## 2. How to use the tuner algorithm in NNI?
Users only need to do one thing: choose a tuner in ```config.yaml```.
......@@ -86,3 +90,4 @@ There are two filed you need to set:
[3]: https://arxiv.org/pdf/1703.01041.pdf
[4]: https://www.cs.ubc.ca/~hutter/papers/10-TR-SMAC.pdf
[5]: https://github.com/automl/SMAC3
[6]: https://arxiv.org/pdf/1603.06560.pdf
\ No newline at end of file
......@@ -27,7 +27,7 @@ import logging
import json
import importlib
from .constants import ModuleName, ClassName, ClassArgs
from .constants import ModuleName, ClassName, ClassArgs, AdvisorModuleName, AdvisorClassName
from nni.common import enable_multi_thread
from nni.msg_dispatcher import MsgDispatcher
from nni.multi_phase.multi_phase_dispatcher import MultiPhaseMsgDispatcher
......@@ -41,7 +41,14 @@ def augment_classargs(input_class_args, classname):
        input_class_args[key] = value

    return input_class_args
def create_builtin_class_instance(classname, jsonstr_args):
def create_builtin_class_instance(classname, jsonstr_args, is_advisor=False):
    if is_advisor:
        if classname not in AdvisorModuleName or \
                importlib.util.find_spec(AdvisorModuleName[classname]) is None:
            raise RuntimeError('Advisor module is not found: {}'.format(classname))
        class_module = importlib.import_module(AdvisorModuleName[classname])
        class_constructor = getattr(class_module, AdvisorClassName[classname])
    else:
        if classname not in ModuleName or \
                importlib.util.find_spec(ModuleName[classname]) is None:
            raise RuntimeError('Tuner module is not found: {}'.format(classname))
......@@ -62,7 +69,7 @@ def create_customized_class_instance(class_dir, class_filename, classname, jsons
        raise ValueError('Class file not found: {}'.format(
            os.path.join(class_dir, class_filename)))
    sys.path.append(class_dir)
    module_name = class_filename.split('.')[0]
    module_name = os.path.splitext(class_filename)[0]
    class_module = importlib.import_module(module_name)
    class_constructor = getattr(class_module, classname)
    if jsonstr_args:
......@@ -74,7 +81,16 @@ def create_customized_class_instance(class_dir, class_filename, classname, jsons
def parse_args():
    parser = argparse.ArgumentParser(description='parse command line parameters.')
    parser.add_argument('--tuner_class_name', type=str, required=True,
    parser.add_argument('--advisor_class_name', type=str, required=False,
                        help='Advisor class name, the class must be a subclass of nni.MsgDispatcherBase')
    parser.add_argument('--advisor_class_filename', type=str, required=False,
                        help='Advisor class file path')
    parser.add_argument('--advisor_args', type=str, required=False,
                        help='Parameters pass to advisor __init__ constructor')
    parser.add_argument('--advisor_directory', type=str, required=False,
                        help='Advisor directory')
    parser.add_argument('--tuner_class_name', type=str, required=False,
                        help='Tuner class name, the class must be a subclass of nni.Tuner')
    parser.add_argument('--tuner_class_filename', type=str, required=False,
                        help='Tuner class file path')
......@@ -91,6 +107,7 @@ def parse_args():
                        help='Assessor directory')
    parser.add_argument('--assessor_class_filename', type=str, required=False,
                        help='Assessor class file path')

    parser.add_argument('--multi_phase', action='store_true')
    parser.add_argument('--multi_thread', action='store_true')
......@@ -106,9 +123,31 @@ def main():
    if args.multi_thread:
        enable_multi_thread()

    if args.advisor_class_name:
        # advisor is enabled and starts to run
        if args.multi_phase:
            raise AssertionError('multi_phase has not been supported in advisor')
        if args.advisor_class_name in AdvisorModuleName:
            dispatcher = create_builtin_class_instance(
                args.advisor_class_name,
                args.advisor_args, True)
        else:
            dispatcher = create_customized_class_instance(
                args.advisor_directory,
                args.advisor_class_filename,
                args.advisor_class_name,
                args.advisor_args)
        if dispatcher is None:
            raise AssertionError('Failed to create Advisor instance')
        try:
            dispatcher.run()
        except Exception as exception:
            logger.exception(exception)
            raise
    else:
        # tuner (and assessor) is enabled and starts to run
        tuner = None
        assessor = None
        if args.tuner_class_name in ModuleName:
            tuner = create_builtin_class_instance(
                args.tuner_class_name,
......
......@@ -53,3 +53,11 @@ ClassArgs = {
        'algorithm_name': 'anneal'
    }
}

AdvisorModuleName = {
    'Hyperband': 'nni.hyperband_advisor.hyperband_advisor'
}

AdvisorClassName = {
    'Hyperband': 'Hyperband'
}
\ No newline at end of file
Hyperband on nni
===
## 1. Introduction
[Hyperband][1] is a popular automl algorithm. The basic idea of Hyperband is that it creates several brackets; each bracket has `n` randomly generated hyperparameter configurations, and each configuration uses `r` resources (e.g., the number of epochs or mini-batches). After the `n` configurations are finished, it chooses the top `n/eta` configurations and runs them with `r*eta` resources. At last, it chooses the best configuration it has found so far.
## 2. Implementation with full parallelism
First, this is an example of how to write an automl algorithm based on MsgDispatcherBase rather than Tuner and Assessor. Hyperband is implemented this way because it integrates the functions of both a tuner and an assessor; thus, we call it an advisor.
Second, this implementation fully leverages Hyperband's internal parallelism. More specifically, the next bracket is not started strictly after the current bracket finishes; instead, it starts when there is available resource.
## 3. Usage
To use Hyperband, you should add the following spec in your experiment's yaml config file:
```
advisor:
  #choice: Hyperband
  builtinAdvisorName: Hyperband
  classArgs:
    #R: the maximum STEPS
    R: 100
    #eta: proportion of discarded trials
    eta: 3
    #choice: maximize, minimize
    optimize_mode: maximize
```
Note that once you use an advisor, you are no longer allowed to add a tuner or assessor spec in the config file.
If you use Hyperband, among the hyperparameters (i.e., key-value pairs) received by a trial there is one more key called `STEPS`, in addition to the hyperparameters defined by the user. By using this `STEPS`, the trial can control how long it runs; a sketch follows.
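For example, a trial might translate `STEPS` into its training-loop length like this (a sketch; `train_one_batch` is a hypothetical stand-in for your per-batch training code, and the scaling factor is up to the trial):

```python
import nni

params = nni.get_next_parameter()   # under Hyperband this includes the extra 'STEPS' key
num_batches = params['STEPS'] * 10  # e.g., interpret one STEP as 10 mini-batches
for _ in range(num_batches):
    train_one_batch()               # hypothetical per-batch training function
```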
`R` and `eta` are the parameters of Hyperband that you can change. `R` means the maximum STEPS that can be allocated to a configuration; here, STEPS could mean the number of epochs or mini-batches, and the trial should use it to control how long it runs. Refer to the example under `examples/trials/mnist-hyperband/` for details.
`eta` means that `n/eta` of the `n` configurations will survive a round and rerun with more STEPS.
Here is a concrete example of `R=81` and `eta=3`:
| | s=4 | s=3 | s=2 | s=1 | s=0 |
|------|-----|-----|-----|-----|-----|
|i | n r | n r | n r | n r | n r |
|0 |81 1 |27 3 |9 9 |6 27 |5 81 |
|1 |27 3 |9 9 |3 27 |2 81 | |
|2 |9 9 |3 27 |1 81 | | |
|3 |3 27 |1 81 | | | |
|4 |1 81 | | | | |
`s` means bracket, `n` means the number of configurations that are generated, and the corresponding `r` means how many STEPS those configurations run for. `i` means round; for example, bracket 4 has 5 rounds and bracket 3 has 4 rounds.
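The within-bracket progression shown in each column can be reproduced with a few lines (a sketch mirroring the `get_n_r` logic in `hyperband_advisor.py` below: each round keeps roughly the top `1/eta` configurations and gives the survivors `eta` times more STEPS):

```python
import math

def bracket_rounds(n, r, eta, R):
    # successive halving within one bracket, starting from n configs at r STEPS each
    rounds = []
    while n >= 1 and r <= R:
        rounds.append((n, r))
        n, r = math.floor(n / eta), r * eta
    return rounds

print(bracket_rounds(81, 1, 3, 81))  # bracket s=4: [(81, 1), (27, 3), (9, 9), (3, 27), (1, 81)]
print(bracket_rounds(6, 27, 3, 81))  # bracket s=1: [(6, 27), (2, 81)]
```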
About how to write trial code, please refer to the instructions under `examples/trials/mnist-hyperband/`.
## 4. To be improved
The current implementation of Hyperband can be further improved by supporting a simple early-stopping algorithm, because it is possible that not all of the configurations in the top `n/eta` perform well; the unpromising ones could be stopped early.
In the current implementation, configurations are generated randomly, which follows the design in the [paper][1]. This could be further improved by generating configurations more wisely, e.g., by leveraging advanced algorithms.
[1]: https://arxiv.org/pdf/1603.06560.pdf
# Copyright (c) Microsoft Corporation
# All rights reserved.
#
# MIT License
#
# Permission is hereby granted, free of charge,
# to any person obtaining a copy of this software and associated
# documentation files (the "Software"), to deal in the Software without restriction,
# including without limitation the rights to use, copy, modify, merge, publish,
# distribute, sublicense, and/or sell copies of the Software, and
# to permit persons to whom the Software is furnished to do so, subject to the following conditions:
# The above copyright notice and this permission notice shall be included
# in all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED *AS IS*, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING
# BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
# DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
'''
hyperband_advisor.py
'''
from enum import Enum, unique
import math
import copy
import logging
import numpy as np
import json_tricks
from nni.protocol import CommandType, send
from nni.msg_dispatcher_base import MsgDispatcherBase
from nni.common import init_logger
from .. import parameter_expressions
_logger = logging.getLogger(__name__)
_next_parameter_id = 0
_KEY = 'STEPS'
@unique
class OptimizeMode(Enum):
    '''
    Optimize Mode class
    '''
    Minimize = 'minimize'
    Maximize = 'maximize'


def create_parameter_id():
    '''
    Create an id
    '''
    global _next_parameter_id  # pylint: disable=global-statement
    _next_parameter_id += 1

    return _next_parameter_id - 1
def create_bracket_parameter_id(brackets_id, brackets_curr_decay, increased_id=-1):
    '''
    Create a full id for a specific bracket's hyperparameter configuration
    '''
    if increased_id == -1:
        increased_id = str(create_parameter_id())
    params_id = '_'.join([str(brackets_id),
                          str(brackets_curr_decay),
                          increased_id])

    return params_id
def json2paramater(ss_spec, random_state):
    '''
    Randomly generate values for hyperparameters from hyperparameter space i.e., x.
    ss_spec: hyperparameter space
    random_state: random operator to generate random values
    '''
    if isinstance(ss_spec, dict):
        if '_type' in ss_spec.keys():
            _type = ss_spec['_type']
            _value = ss_spec['_value']
            if _type == 'choice':
                _index = random_state.randint(len(_value))
                chosen_params = json2paramater(ss_spec['_value'][_index], random_state)
            else:
                chosen_params = eval('parameter_expressions.' +  # pylint: disable=eval-used
                                     _type)(*(_value + [random_state]))
        else:
            chosen_params = dict()
            for key in ss_spec.keys():
                chosen_params[key] = json2paramater(ss_spec[key], random_state)
    elif isinstance(ss_spec, list):
        chosen_params = list()
        for _, subspec in enumerate(ss_spec):
            chosen_params.append(json2paramater(subspec, random_state))
    else:
        chosen_params = copy.deepcopy(ss_spec)

    return chosen_params
class Bracket():
    '''
    A bracket in Hyperband, all the information of a bracket is managed by an instance of this class
    '''
    def __init__(self, s, s_max, eta, R, optimize_mode):
        self.bracket_id = s
        self.s_max = s_max
        self.eta = eta
        self.n = math.ceil((s_max + 1) * (eta**s) / (s + 1))  # pylint: disable=invalid-name
        self.r = math.ceil(R / eta**s)                        # pylint: disable=invalid-name
        self.i = 0
        self.hyper_configs = []         # [ {id: params}, {}, ... ]
        self.configs_perf = []          # [ {id: [seq, acc]}, {}, ... ]
        self.num_configs_to_run = []    # [ n, n, n, ... ]
        self.num_finished_configs = []  # [ n, n, n, ... ]
        self.optimize_mode = optimize_mode
        self.no_more_trial = False

    def is_completed(self):
        '''
        check whether this bracket has sent out all the hyperparameter configurations
        '''
        return self.no_more_trial

    def get_n_r(self):
        '''
        return the values of n and r for the next round
        '''
        return math.floor(self.n / self.eta**self.i), self.r * self.eta**self.i

    def increase_i(self):
        '''
        i means the ith round. Increase i by 1
        '''
        self.i += 1
        if self.i > self.bracket_id:
            self.no_more_trial = True

    def set_config_perf(self, i, parameter_id, seq, value):
        '''
        update trial's latest result with its sequence number, e.g., epoch number or batch number
        i: the ith round
        parameter_id: the id of the trial/parameter
        seq: sequence number, e.g., epoch number or batch number
        value: latest result with sequence number seq
        '''
        if parameter_id in self.configs_perf[i]:
            # this should always be true if there is no retry in training service
            _logger.debug('assertion: %d %d, %s %s\n',
                          self.configs_perf[i][parameter_id][0],
                          seq,
                          str(type(self.configs_perf[i][parameter_id][0])),
                          str(type(seq)))
            # assert self.configs_perf[i][parameter_id][0] < seq
            if self.configs_perf[i][parameter_id][0] < seq:
                self.configs_perf[i][parameter_id] = [seq, value]
        else:
            self.configs_perf[i][parameter_id] = [seq, value]

    def inform_trial_end(self, i):
        '''
        If the trial is finished and the corresponding round (i.e., i) has all its trials finished,
        it will choose the top k trials for the next round (i.e., i+1)
        '''
        global _KEY  # pylint: disable=global-statement
        self.num_finished_configs[i] += 1
        _logger.debug('bracket id: %d, round: %d %d, finished: %d, all: %d',
                      self.bracket_id, self.i, i, self.num_finished_configs[i], self.num_configs_to_run[i])
        if self.num_finished_configs[i] >= self.num_configs_to_run[i] \
                and self.no_more_trial is False:
            # choose candidate configs from finished configs to run in the next round
            assert self.i == i + 1
            this_round_perf = self.configs_perf[i]
            if self.optimize_mode is OptimizeMode.Maximize:
                sorted_perf = sorted(this_round_perf.items(), key=lambda kv: kv[1][1], reverse=True)  # reverse
            else:
                sorted_perf = sorted(this_round_perf.items(), key=lambda kv: kv[1][1])
            _logger.debug('bracket %s next round %s, sorted hyper configs: %s', self.bracket_id, self.i, sorted_perf)
            next_n, next_r = self.get_n_r()
            _logger.debug('bracket %s next round %s, next_n=%d, next_r=%d', self.bracket_id, self.i, next_n, next_r)
            hyper_configs = dict()
            for k in range(next_n):
                params_id = sorted_perf[k][0]
                params = self.hyper_configs[i][params_id]
                params[_KEY] = next_r  # modify r
                # generate new id
                increased_id = params_id.split('_')[-1]
                new_id = create_bracket_parameter_id(self.bracket_id, self.i, increased_id)
                hyper_configs[new_id] = params
            self._record_hyper_configs(hyper_configs)

            return [[key, value] for key, value in hyper_configs.items()]

        return None

    def get_hyperparameter_configurations(self, num, r, searchspace_json, random_state):  # pylint: disable=invalid-name
        '''
        Randomly generate num hyperparameter configurations from search space
        num: the number of hyperparameter configurations
        '''
        global _KEY  # pylint: disable=global-statement
        assert self.i == 0
        hyperparameter_configs = dict()
        for _ in range(num):
            params_id = create_bracket_parameter_id(self.bracket_id, self.i)
            params = json2paramater(searchspace_json, random_state)
            params[_KEY] = r
            hyperparameter_configs[params_id] = params
        self._record_hyper_configs(hyperparameter_configs)

        return [[key, value] for key, value in hyperparameter_configs.items()]

    def _record_hyper_configs(self, hyper_configs):
        '''
        after generating one round of hyperconfigs, this function records the generated hyperconfigs,
        creates a dict to record the performance when those hyperconfigs are running, sets the number of finished configs
        in this round to 0, and increases the round number.
        '''
        self.hyper_configs.append(hyper_configs)
        self.configs_perf.append(dict())
        self.num_finished_configs.append(0)
        self.num_configs_to_run.append(len(hyper_configs))
        self.increase_i()
class Hyperband(MsgDispatcherBase):
    '''
    Hyperband inherits from MsgDispatcherBase rather than Tuner,
    because it integrates both tuner's functions and assessor's functions.
    This is an implementation that could fully leverage available resources, i.e., high parallelism.
    A single execution of Hyperband takes a finite budget of (s_max + 1)B.
    '''
    def __init__(self, R, eta=3, optimize_mode='maximize'):
        '''
        R: the maximum amount of resource that can be allocated to a single configuration
        eta: the variable that controls the proportion of configurations discarded in each round of SuccessiveHalving
        B = (s_max + 1)R
        '''
        super().__init__()
        self.R = R  # pylint: disable=invalid-name
        self.eta = eta
        self.brackets = dict()             # dict of Bracket
        self.generated_hyper_configs = []  # all the configs waiting for run
        self.completed_hyper_configs = []  # all the completed configs
        self.s_max = math.floor(math.log(self.R, self.eta))
        self.curr_s = self.s_max

        self.searchspace_json = None
        self.random_state = None
        self.optimize_mode = OptimizeMode(optimize_mode)

        # This is for the case that nnimanager requests trial config, but tuner cannot provide immediately.
        # In this case, tuner increases self.credit to issue a trial config sometime later.
        self.credit = 0

    def load_checkpoint(self):
        pass

    def save_checkpoint(self):
        pass

    def handle_initialize(self, data):
        '''
        data is search space
        '''
        self.handle_update_search_space(data)
        send(CommandType.Initialized, '')

        return True

    def handle_request_trial_jobs(self, data):
        '''
        data: number of trial jobs
        '''
        for _ in range(data):
            self._request_one_trial_job()

        return True

    def _request_one_trial_job(self):
        '''
        get one trial job, i.e., one hyperparameter configuration.
        '''
        if not self.generated_hyper_configs:
            if self.curr_s < 0:
                # have tried all configurations
                ret = {
                    'parameter_id': '-1_0_0',
                    'parameter_source': 'algorithm',
                    'parameters': ''
                }
                send(CommandType.NoMoreTrialJobs, json_tricks.dumps(ret))
                self.credit += 1

                return True
            _logger.debug('create a new bracket, self.curr_s=%d', self.curr_s)
            self.brackets[self.curr_s] = Bracket(self.curr_s, self.s_max, self.eta, self.R, self.optimize_mode)
            next_n, next_r = self.brackets[self.curr_s].get_n_r()
            _logger.debug('new bracket, next_n=%d, next_r=%d', next_n, next_r)
            assert self.searchspace_json is not None and self.random_state is not None
            generated_hyper_configs = self.brackets[self.curr_s].get_hyperparameter_configurations(next_n, next_r,
                                                                                                   self.searchspace_json,
                                                                                                   self.random_state)
            self.generated_hyper_configs = generated_hyper_configs.copy()
            self.curr_s -= 1

        assert self.generated_hyper_configs
        params = self.generated_hyper_configs.pop()
        ret = {
            'parameter_id': params[0],
            'parameter_source': 'algorithm',
            'parameters': params[1]
        }
        send(CommandType.NewTrialJob, json_tricks.dumps(ret))

        return True

    def handle_update_search_space(self, data):
        '''
        data: JSON object, which is search space
        '''
        self.searchspace_json = data
        self.random_state = np.random.RandomState()

        return True

    def handle_trial_end(self, data):
        '''
        data: it has three keys: trial_job_id, event, hyper_params
        trial_job_id: the id generated by training service
        event: the job's state
        hyper_params: the hyperparameters (a string) generated and returned by tuner
        '''
        hyper_params = json_tricks.loads(data['hyper_params'])
        bracket_id, i, _ = hyper_params['parameter_id'].split('_')
        hyper_configs = self.brackets[int(bracket_id)].inform_trial_end(int(i))
        if hyper_configs is not None:
            _logger.debug('bracket %s next round %s, hyper_configs: %s', bracket_id, i, hyper_configs)
            self.generated_hyper_configs = self.generated_hyper_configs + hyper_configs
            for _ in range(self.credit):
                if not self.generated_hyper_configs:
                    break
                params = self.generated_hyper_configs.pop()
                ret = {
                    'parameter_id': params[0],
                    'parameter_source': 'algorithm',
                    'parameters': params[1]
                }
                send(CommandType.NewTrialJob, json_tricks.dumps(ret))
                self.credit -= 1

        return True

    def handle_report_metric_data(self, data):
        '''
        data: it is an object which has keys 'parameter_id', 'value', 'trial_job_id', 'type', 'sequence'.
        '''
        if data['type'] == 'FINAL':
            self.completed_hyper_configs.append(data)
        elif data['type'] == 'PERIODICAL':
            bracket_id, i, _ = data['parameter_id'].split('_')
            bracket_id = int(bracket_id)
            self.brackets[bracket_id].set_config_perf(int(i), data['parameter_id'], data['sequence'], data['value'])
        else:
            raise ValueError('Data type not supported: {}'.format(data['type']))

        return True

    def handle_add_customized_trial(self, data):
        pass
......@@ -142,6 +142,12 @@ class MsgDispatcher(MsgDispatcherBase):
        return True

    def handle_trial_end(self, data):
        """
        data: it has three keys: trial_job_id, event, hyper_params
        trial_job_id: the id generated by training service
        event: the job's state
        hyper_params: the hyperparameters generated and returned by tuner
        """
        trial_job_id = data['trial_job_id']
        _ended_trials.add(trial_job_id)
        if trial_job_id in _trial_history:
......