"vscode:/vscode.git/clone" did not exist on "86c464cab5cf03f8d009e945a6fc9d4755e4822b"
Unverified Commit ac6aee81 authored by chicm-ms's avatar chicm-ms Committed by GitHub
Browse files

Multiphase refactor and support OpenPAI training service. (#1138)

* Refactor multiphase interface

* Implement multiphase on PAI

* update multiphase doc
parent 1bd3637f
......@@ -123,7 +123,7 @@ class NetworkMorphismTuner(Tuner):
"""
self.search_space = search_space
def generate_parameters(self, parameter_id):
def generate_parameters(self, parameter_id, **kwargs):
"""
Returns a set of trial neural architecture, as a serializable object.
......@@ -152,7 +152,7 @@ class NetworkMorphismTuner(Tuner):
return json_out
def receive_trial_result(self, parameter_id, parameters, value):
def receive_trial_result(self, parameter_id, parameters, value, **kwargs):
""" Record an observation of the objective function.
Parameters
......
......@@ -151,7 +151,7 @@ class SMACTuner(Tuner):
else:
self.logger.warning('update search space is not supported.')
def receive_trial_result(self, parameter_id, parameters, value):
def receive_trial_result(self, parameter_id, parameters, value, **kwargs):
"""receive_trial_result
Parameters
......@@ -209,7 +209,7 @@ class SMACTuner(Tuner):
converted_dict[key] = value
return converted_dict
def generate_parameters(self, parameter_id):
def generate_parameters(self, parameter_id, **kwargs):
"""generate one instance of hyperparameters
Parameters
......@@ -232,7 +232,7 @@ class SMACTuner(Tuner):
self.total_data[parameter_id] = challenger
return self.convert_loguniform_categorical(challenger.get_dictionary())
def generate_multiple_parameters(self, parameter_id_list):
def generate_multiple_parameters(self, parameter_id_list, **kwargs):
"""generate mutiple instances of hyperparameters
Parameters
......
......@@ -30,14 +30,14 @@ _logger = logging.getLogger(__name__)
class Tuner(Recoverable):
# pylint: disable=no-self-use,unused-argument
def generate_parameters(self, parameter_id):
def generate_parameters(self, parameter_id, **kwargs):
"""Returns a set of trial (hyper-)parameters, as a serializable object.
User code must override either this function or 'generate_multiple_parameters()'.
parameter_id: int
"""
raise NotImplementedError('Tuner: generate_parameters not implemented')
def generate_multiple_parameters(self, parameter_id_list):
def generate_multiple_parameters(self, parameter_id_list, **kwargs):
"""Returns multiple sets of trial (hyper-)parameters, as iterable of serializable objects.
Call 'generate_parameters()' by 'count' times by default.
User code must override either this function or 'generate_parameters()'.
......@@ -49,13 +49,13 @@ class Tuner(Recoverable):
for parameter_id in parameter_id_list:
try:
_logger.debug("generating param for {}".format(parameter_id))
res = self.generate_parameters(parameter_id)
res = self.generate_parameters(parameter_id, **kwargs)
except nni.NoMoreTrialError:
return result
result.append(res)
return result
def receive_trial_result(self, parameter_id, parameters, value):
def receive_trial_result(self, parameter_id, parameters, value, **kwargs):
"""Invoked when a trial reports its final result. Must override.
parameter_id: int
parameters: object created by 'generate_parameters()'
......@@ -63,7 +63,7 @@ class Tuner(Recoverable):
"""
raise NotImplementedError('Tuner: receive_trial_result not implemented')
def receive_customized_trial_result(self, parameter_id, parameters, value):
def receive_customized_trial_result(self, parameter_id, parameters, value, **kwargs):
"""Invoked when a trial added by WebUI reports its final result. Do nothing by default.
parameter_id: int
parameters: object created by user
......@@ -71,7 +71,7 @@ class Tuner(Recoverable):
"""
_logger.info('Customized trial job %s ignored by tuner', parameter_id)
def trial_end(self, parameter_id, success):
def trial_end(self, parameter_id, success, **kwargs):
"""Invoked when a trial is completed or terminated. Do nothing by default.
parameter_id: int
success: True if the trial successfully completed; False if failed or terminated
......
# Copyright (c) Microsoft Corporation. All rights reserved.
#
# MIT License
#
# Permission is hereby granted, free of charge, to any person obtaining a copy of this software and
# associated documentation files (the "Software"), to deal in the Software without restriction,
# including without limitation the rights to use, copy, modify, merge, publish, distribute,
# sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all copies or
# substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED *AS IS*, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT
# NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
# DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT
# OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
# ==================================================================================================
import logging
import random
from io import BytesIO
import nni
import nni.protocol
from nni.protocol import CommandType, send, receive
from nni.multi_phase.multi_phase_tuner import MultiPhaseTuner
from nni.multi_phase.multi_phase_dispatcher import MultiPhaseMsgDispatcher
from unittest import TestCase, main
class NaiveMultiPhaseTuner(MultiPhaseTuner):
    '''
    Minimal multi-phase tuner used by the tests; it only understands
    search-space entries whose ``_type`` is ``choice``.
    '''

    def __init__(self):
        # Stays None until update_search_space() is called.
        self.search_space = None

    def generate_parameters(self, parameter_id, trial_job_id=None):
        """Pick one random value for every entry of the search space.

        parameter_id: not used by this tuner
        trial_job_id: not used by this tuner

        Returns a dict mapping each search-space key to one element of that
        entry's ``_value`` list.
        Raises AssertionError when no search space has been set, and
        ValueError when an entry's ``_type`` is not ``choice``.
        """
        if self.search_space is None:
            raise AssertionError('Search space not specified')

        chosen = {}
        for name in self.search_space:
            spec = self.search_space[name]
            if spec['_type'] != 'choice':
                raise ValueError('Only choice type is supported')
            candidates = spec['_value']
            index = random.randint(0, len(candidates)-1)
            chosen[name] = candidates[index]

        logging.getLogger(__name__).debug(chosen)
        return chosen

    def receive_trial_result(self, parameter_id, parameters, value, trial_job_id):
        """Log the reported result at debug level; nothing is stored."""
        message = 'receive_trial_result: {},{},{},{}'.format(parameter_id, parameters, value, trial_job_id)
        logging.getLogger(__name__).debug(message)

    def receive_customized_trial_result(self, parameter_id, parameters, value, trial_job_id):
        """Customized trial results are deliberately ignored."""
        return None

    def update_search_space(self, search_space):
        """Remember the search space consulted by generate_parameters()."""
        self.search_space = search_space
# In-memory byte buffers that _reverse_io() / _restore_io() install as
# nni.protocol._in_file and nni.protocol._out_file for the test below.
_in_buf = BytesIO()
_out_buf = BytesIO()
def _reverse_io():
    """Rewind both buffers and cross-wire them into ``nni.protocol``.

    Afterwards ``nni.protocol._out_file`` is ``_in_buf`` and
    ``nni.protocol._in_file`` is ``_out_buf``.
    """
    for buf in (_in_buf, _out_buf):
        buf.seek(0)
    nni.protocol._in_file, nni.protocol._out_file = _out_buf, _in_buf
def _restore_io():
    """Rewind both buffers and wire them straight into ``nni.protocol``.

    Afterwards ``nni.protocol._in_file`` is ``_in_buf`` and
    ``nni.protocol._out_file`` is ``_out_buf``.
    """
    for buf in (_in_buf, _out_buf):
        buf.seek(0)
    nni.protocol._out_file, nni.protocol._in_file = _out_buf, _in_buf
def _test_tuner():
    """Replay a scripted command sequence through a MultiPhaseMsgDispatcher.

    The commands are written while the streams are reversed, the dispatcher
    is then run on a fresh NaiveMultiPhaseTuner with the streams restored,
    and finally the first command it emitted is read back and printed.
    """
    scripted_commands = (
        (CommandType.UpdateSearchSpace, "{\"learning_rate\": {\"_value\": [0.0001, 0.001, 0.002, 0.005, 0.01], \"_type\": \"choice\"}, \"optimizer\": {\"_value\": [\"Adam\", \"SGD\"], \"_type\": \"choice\"}}"),
        (CommandType.RequestTrialJobs, '2'),
        (CommandType.ReportMetricData, '{"parameter_id":0,"type":"PERIODICAL","value":10,"trial_job_id":"abc"}'),
        (CommandType.ReportMetricData, '{"parameter_id":1,"type":"FINAL","value":11,"trial_job_id":"abc"}'),
        (CommandType.AddCustomizedTrialJob, '{"param":-1}'),
        (CommandType.ReportMetricData, '{"parameter_id":2,"type":"FINAL","value":22,"trial_job_id":"abc"}'),
        (CommandType.RequestTrialJobs, '1'),
        (CommandType.TrialEnd, '{"trial_job_id":"abc"}'),
    )

    _reverse_io()  # now we are sending to Tuner's incoming stream
    for command_type, payload in scripted_commands:
        send(command_type, payload)
    _restore_io()

    dispatcher = MultiPhaseMsgDispatcher(NaiveMultiPhaseTuner())
    dispatcher.run()

    _reverse_io()  # now we are receiving from Tuner's outgoing stream
    command, data = receive()  # this one is customized
    print(command, data)
class MultiPhaseTestCase(TestCase):
    # Thin unittest wrapper so the scripted scenario in _test_tuner() is
    # picked up by the test runner. No method docstring on purpose: unittest
    # would print it instead of the test name in verbose mode.

    def test_tuner(self):
        _test_tuner()


# Run the test case through unittest's main() when executed as a script.
if __name__ == '__main__':
    main()
\ No newline at end of file
......@@ -35,7 +35,7 @@ class NaiveTuner(Tuner):
self.trial_results = [ ]
self.search_space = None
def generate_parameters(self, parameter_id):
def generate_parameters(self, parameter_id, **kwargs):
# report Tuner's internal states to generated parameters,
# so we don't need to pause the main loop
self.param += 2
......@@ -45,7 +45,7 @@ class NaiveTuner(Tuner):
'search_space': self.search_space
}
def receive_trial_result(self, parameter_id, parameters, value):
def receive_trial_result(self, parameter_id, parameters, value, **kwargs):
reward = extract_scalar_reward(value)
self.trial_results.append((parameter_id, parameters['param'], reward, False))
......@@ -103,11 +103,9 @@ class TunerTestCase(TestCase):
command, data = receive() # this one is customized
data = json.loads(data)
self.assertIs(command, CommandType.NewTrialJob)
self.assertEqual(data, {
'parameter_id': 2,
'parameter_source': 'customized',
'parameters': { 'param': -1 }
})
self.assertEqual(data['parameter_id'], 2)
self.assertEqual(data['parameter_source'], 'customized')
self.assertEqual(data['parameters'], { 'param': -1 })
self._assert_params(3, 6, [[1,4,11,False], [2,-1,22,True]], {'name':'SS0'})
......
......@@ -22,7 +22,7 @@ class SimpleTuner(Tuner):
self.sig_event = Event()
self.thread_lock = Lock()
def generate_parameters(self, parameter_id):
def generate_parameters(self, parameter_id, **kwargs):
if self.f_id is None:
self.thread_lock.acquire()
self.f_id = parameter_id
......@@ -50,7 +50,7 @@ class SimpleTuner(Tuner):
self.thread_lock.release()
return self.trial_meta[parameter_id]
def receive_trial_result(self, parameter_id, parameters, reward):
def receive_trial_result(self, parameter_id, parameters, reward, **kwargs):
self.thread_lock.acquire()
if parameter_id == self.f_id:
self.trial_meta[parameter_id]['checksum'] = reward['checksum']
......
......@@ -6,9 +6,9 @@ trialConcurrency: 4
searchSpacePath: ./search_space.json
tuner:
codeDir: ../../../src/sdk/pynni/tests
classFileName: test_multi_phase_tuner.py
className: NaiveMultiPhaseTuner
builtinTunerName: TPE
classArgs:
optimize_mode: maximize
trial:
codeDir: .
......
authorName: nni
experimentName: default_test
maxExecDuration: 5m
maxTrialNum: 8
trialConcurrency: 4
searchSpacePath: ./search_space.json
tuner:
builtinTunerName: BatchTuner
trial:
codeDir: .
command: python3 multi_phase.py
gpuNum: 0
useAnnotation: false
multiPhase: true
multiThread: false
trainingServicePlatform: local
authorName: nni
experimentName: default_test
maxExecDuration: 5m
maxTrialNum: 8
trialConcurrency: 4
searchSpacePath: ./search_space.json
tuner:
builtinTunerName: Evolution
classArgs:
optimize_mode: maximize
trial:
codeDir: .
command: python3 multi_phase.py
gpuNum: 0
useAnnotation: false
multiPhase: true
multiThread: false
trainingServicePlatform: local
authorName: nni
experimentName: default_test
maxExecDuration: 5m
maxTrialNum: 8
trialConcurrency: 4
searchSpacePath: ./search_space.json
tuner:
builtinTunerName: GridSearch
trial:
codeDir: .
command: python3 multi_phase.py
gpuNum: 0
useAnnotation: false
multiPhase: true
multiThread: false
trainingServicePlatform: local
authorName: nni
experimentName: default_test
maxExecDuration: 5m
maxTrialNum: 8
trialConcurrency: 4
searchSpacePath: ./search_space.json
tuner:
builtinTunerName: MetisTuner
classArgs:
optimize_mode: maximize
trial:
codeDir: .
command: python3 multi_phase.py
gpuNum: 0
useAnnotation: false
multiPhase: true
multiThread: false
trainingServicePlatform: local
authorName: nni
experimentName: default_test
maxExecDuration: 5m
maxTrialNum: 8
trialConcurrency: 4
searchSpacePath: ./search_space.json
tuner:
builtinTunerName: TPE
classArgs:
optimize_mode: maximize
trial:
codeDir: .
command: python3 multi_phase.py
gpuNum: 0
useAnnotation: false
multiPhase: true
multiThread: false
trainingServicePlatform: local
......@@ -6,7 +6,7 @@ class MultiThreadTuner(Tuner):
def __init__(self):
self.parent_done = False
def generate_parameters(self, parameter_id):
def generate_parameters(self, parameter_id, **kwargs):
if parameter_id == 0:
return {'x': 0}
else:
......@@ -14,7 +14,7 @@ class MultiThreadTuner(Tuner):
time.sleep(2)
return {'x': 1}
def receive_trial_result(self, parameter_id, parameters, value):
def receive_trial_result(self, parameter_id, parameters, value, **kwargs):
if parameter_id == 0:
self.parent_done = True
......
......@@ -16,12 +16,12 @@ class NaiveTuner(Tuner):
self.cur = 0
_logger.info('init')
def generate_parameters(self, parameter_id):
def generate_parameters(self, parameter_id, **kwargs):
self.cur += 1
_logger.info('generate parameters: %s' % self.cur)
return { 'x': self.cur }
def receive_trial_result(self, parameter_id, parameters, value):
def receive_trial_result(self, parameter_id, parameters, value, **kwargs):
reward = extract_scalar_reward(value)
_logger.info('receive trial result: %s, %s, %s' % (parameter_id, parameters, reward))
_result.write('%d %d\n' % (parameters['x'], reward))
......
......@@ -36,6 +36,8 @@ STDERR_FULL_PATH = os.path.join(LOG_DIR, 'stderr')
STDOUT_API = '/stdout'
VERSION_API = '/version'
PARAMETER_META_API = '/parameter-file-meta'
NNI_SYS_DIR = os.environ['NNI_SYS_DIR']
NNI_TRIAL_JOB_ID = os.environ['NNI_TRIAL_JOB_ID']
NNI_EXP_ID = os.environ['NNI_EXP_ID']
MULTI_PHASE = os.environ['MULTI_PHASE']
......@@ -28,30 +28,27 @@ import re
import sys
import select
import json
import threading
from pyhdfs import HdfsClient
import pkg_resources
from .rest_utils import rest_post
from .url_utils import gen_send_stdout_url, gen_send_version_url
from .rest_utils import rest_post, rest_get
from .url_utils import gen_send_stdout_url, gen_send_version_url, gen_parameter_meta_url
from .constants import HOME_DIR, LOG_DIR, NNI_PLATFORM, STDOUT_FULL_PATH, STDERR_FULL_PATH
from .hdfsClientUtility import copyDirectoryToHdfs, copyHdfsDirectoryToLocal
from .constants import HOME_DIR, LOG_DIR, NNI_PLATFORM, STDOUT_FULL_PATH, STDERR_FULL_PATH, \
MULTI_PHASE, NNI_TRIAL_JOB_ID, NNI_SYS_DIR, NNI_EXP_ID
from .hdfsClientUtility import copyDirectoryToHdfs, copyHdfsDirectoryToLocal, copyHdfsFileToLocal
from .log_utils import LogType, nni_log, RemoteLogger, PipeLogReader, StdOutputType
logger = logging.getLogger('trial_keeper')
regular = re.compile('v?(?P<version>[0-9](\.[0-9]){0,1}).*')
def main_loop(args):
'''main loop logic for trial keeper'''
_hdfs_client = None
if not os.path.exists(LOG_DIR):
os.makedirs(LOG_DIR)
def get_hdfs_client(args):
global _hdfs_client
stdout_file = open(STDOUT_FULL_PATH, 'a+')
stderr_file = open(STDERR_FULL_PATH, 'a+')
trial_keeper_syslogger = RemoteLogger(args.nnimanager_ip, args.nnimanager_port, 'trial_keeper', StdOutputType.Stdout, args.log_collection)
# redirect trial keeper's stdout and stderr to syslog
trial_syslogger_stdout = RemoteLogger(args.nnimanager_ip, args.nnimanager_port, 'trial', StdOutputType.Stdout, args.log_collection)
sys.stdout = sys.stderr = trial_keeper_syslogger
if _hdfs_client is not None:
return _hdfs_client
# backward compatibility
hdfs_host = None
hdfs_output_dir = None
......@@ -59,21 +56,41 @@ def main_loop(args):
hdfs_host = args.hdfs_host
elif args.pai_hdfs_host:
hdfs_host = args.pai_hdfs_host
if args.hdfs_output_dir:
hdfs_output_dir = args.hdfs_output_dir
elif args.pai_hdfs_output_dir:
hdfs_output_dir = args.pai_hdfs_output_dir
else:
return None
if hdfs_host is not None and args.nni_hdfs_exp_dir is not None:
try:
if args.webhdfs_path:
hdfs_client = HdfsClient(hosts='{0}:80'.format(hdfs_host), user_name=args.pai_user_name, webhdfs_path=args.webhdfs_path, timeout=5)
_hdfs_client = HdfsClient(hosts='{0}:80'.format(hdfs_host), user_name=args.pai_user_name, webhdfs_path=args.webhdfs_path, timeout=5)
else:
# backward compatibility
hdfs_client = HdfsClient(hosts='{0}:{1}'.format(hdfs_host, '50070'), user_name=args.pai_user_name, timeout=5)
_hdfs_client = HdfsClient(hosts='{0}:{1}'.format(hdfs_host, '50070'), user_name=args.pai_user_name, timeout=5)
except Exception as e:
nni_log(LogType.Error, 'Create HDFS client error: ' + str(e))
raise e
return _hdfs_client
def main_loop(args):
'''main loop logic for trial keeper'''
if not os.path.exists(LOG_DIR):
os.makedirs(LOG_DIR)
stdout_file = open(STDOUT_FULL_PATH, 'a+')
stderr_file = open(STDERR_FULL_PATH, 'a+')
trial_keeper_syslogger = RemoteLogger(args.nnimanager_ip, args.nnimanager_port, 'trial_keeper', StdOutputType.Stdout, args.log_collection)
# redirect trial keeper's stdout and stderr to syslog
trial_syslogger_stdout = RemoteLogger(args.nnimanager_ip, args.nnimanager_port, 'trial', StdOutputType.Stdout, args.log_collection)
sys.stdout = sys.stderr = trial_keeper_syslogger
if args.hdfs_output_dir:
hdfs_output_dir = args.hdfs_output_dir
elif args.pai_hdfs_output_dir:
hdfs_output_dir = args.pai_hdfs_output_dir
hdfs_client = get_hdfs_client(args)
if hdfs_client is not None:
copyHdfsDirectoryToLocal(args.nni_hdfs_exp_dir, os.getcwd(), hdfs_client)
# Notice: We don't appoint env, which means subprocess will inherit current environment and that is expected behavior
......@@ -138,6 +155,52 @@ def check_version(args):
except AttributeError as err:
nni_log(LogType.Error, err)
def is_multi_phase():
    """Tell whether the MULTI_PHASE setting is switched on.

    Returns True only when MULTI_PHASE is exactly 'True' or 'true'.
    The result is always a real bool: an empty MULTI_PHASE yields False
    instead of handing the empty string itself back to the caller.
    """
    return MULTI_PHASE in ('True', 'true')
def download_parameter(meta_list, args):
    """
    Fetch this trial's parameter files into NNI_SYS_DIR.

    Only entries whose experimentId / trialId equal NNI_EXP_ID /
    NNI_TRIAL_JOB_ID are considered, and an entry is copied only when no
    file with the same base name already exists under NNI_SYS_DIR.

    meta_list format is defined in paiJobRestServer.ts
    example meta_list:
    [
        {"experimentId":"yWFJarYa","trialId":"UpPkl","filePath":"/chec/nni/experiments/yWFJarYa/trials/UpPkl/parameter_1.cfg"},
        {"experimentId":"yWFJarYa","trialId":"aIUMA","filePath":"/chec/nni/experiments/yWFJarYa/trials/aIUMA/parameter_1.cfg"}
    ]

    args is passed through to get_hdfs_client() to obtain the HDFS client.
    """
    nni_log(LogType.Debug, str(meta_list))
    nni_log(LogType.Debug, 'NNI_SYS_DIR: {}, trial Id: {}, experiment ID: {}'.format(NNI_SYS_DIR, NNI_TRIAL_JOB_ID, NNI_EXP_ID))
    nni_log(LogType.Debug, 'NNI_SYS_DIR files: {}'.format(os.listdir(NNI_SYS_DIR)))

    for entry in meta_list:
        # Skip metadata that belongs to another experiment or trial.
        if entry['experimentId'] != NNI_EXP_ID or entry['trialId'] != NNI_TRIAL_JOB_ID:
            continue

        remote_path = entry['filePath']
        local_path = os.path.join(NNI_SYS_DIR, os.path.basename(remote_path))
        if os.path.exists(local_path):
            # Already present locally; nothing to copy.
            continue

        client = get_hdfs_client(args)
        copyHdfsFileToLocal(remote_path, local_path, client, override=False)
def fetch_parameter_file(args):
    """Start a thread that polls the parameter-meta URL in an endless loop.

    Each round issues rest_get() on the URL built from args.nnimanager_ip
    and args.nnimanager_port. A 200 response has its JSON body handed to
    download_parameter(); any other response is logged as a warning. The
    thread sleeps five seconds between rounds.
    """
    class FetchThread(threading.Thread):
        """Polling worker that keeps ``args`` for later use in run()."""

        def __init__(self, args):
            super(FetchThread, self).__init__()
            self.args = args

        def run(self):
            meta_url = gen_parameter_meta_url(self.args.nnimanager_ip, self.args.nnimanager_port)
            nni_log(LogType.Info, meta_url)

            while True:
                response = rest_get(meta_url, 10)
                nni_log(LogType.Debug, 'status code: {}'.format(response.status_code))
                if response.status_code != 200:
                    nni_log(LogType.Warning, 'rest response: {}'.format(str(response)))
                else:
                    download_parameter(response.json(), self.args)
                time.sleep(5)

    FetchThread(args).start()
if __name__ == '__main__':
'''NNI Trial Keeper main function'''
PARSER = argparse.ArgumentParser()
......@@ -159,6 +222,8 @@ if __name__ == '__main__':
exit(1)
check_version(args)
try:
if is_multi_phase():
fetch_parameter_file(args)
main_loop(args)
except SystemExit as se:
nni_log(LogType.Info, 'NNI trial keeper exit with code {}'.format(se.code))
......
......@@ -18,7 +18,7 @@
# DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
from .constants import API_ROOT_URL, BASE_URL, STDOUT_API, NNI_TRIAL_JOB_ID, NNI_EXP_ID, VERSION_API
from .constants import API_ROOT_URL, BASE_URL, STDOUT_API, NNI_TRIAL_JOB_ID, NNI_EXP_ID, VERSION_API, PARAMETER_META_API
def gen_send_stdout_url(ip, port):
'''Generate send stdout url'''
......@@ -27,3 +27,7 @@ def gen_send_stdout_url(ip, port):
def gen_send_version_url(ip, port):
    '''Generate send version url (ends with the experiment id and trial job id)'''
    host = BASE_URL.format(ip)
    url_parts = (host, port, API_ROOT_URL, VERSION_API, NNI_EXP_ID, NNI_TRIAL_JOB_ID)
    return '{0}:{1}{2}{3}/{4}/{5}'.format(*url_parts)
def gen_parameter_meta_url(ip, port):
    '''Generate parameter file meta url'''
    host = BASE_URL.format(ip)
    url_parts = (host, port, API_ROOT_URL, PARAMETER_META_API)
    return '{0}:{1}{2}{3}'.format(*url_parts)
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment