Unverified Commit 1c56fea8 authored by chicm-ms's avatar chicm-ms Committed by GitHub
Browse files

Merge pull request #21 from microsoft/master

pull code
parents 12410686 97829ccd
...@@ -36,6 +36,8 @@ STDERR_FULL_PATH = os.path.join(LOG_DIR, 'stderr') ...@@ -36,6 +36,8 @@ STDERR_FULL_PATH = os.path.join(LOG_DIR, 'stderr')
STDOUT_API = '/stdout' STDOUT_API = '/stdout'
VERSION_API = '/version' VERSION_API = '/version'
PARAMETER_META_API = '/parameter-file-meta'
NNI_SYS_DIR = os.environ['NNI_SYS_DIR'] NNI_SYS_DIR = os.environ['NNI_SYS_DIR']
NNI_TRIAL_JOB_ID = os.environ['NNI_TRIAL_JOB_ID'] NNI_TRIAL_JOB_ID = os.environ['NNI_TRIAL_JOB_ID']
NNI_EXP_ID = os.environ['NNI_EXP_ID'] NNI_EXP_ID = os.environ['NNI_EXP_ID']
\ No newline at end of file MULTI_PHASE = os.environ['MULTI_PHASE']
...@@ -28,30 +28,27 @@ import re ...@@ -28,30 +28,27 @@ import re
import sys import sys
import select import select
import json import json
import threading
from pyhdfs import HdfsClient from pyhdfs import HdfsClient
import pkg_resources import pkg_resources
from .rest_utils import rest_post from .rest_utils import rest_post, rest_get
from .url_utils import gen_send_stdout_url, gen_send_version_url from .url_utils import gen_send_stdout_url, gen_send_version_url, gen_parameter_meta_url
from .constants import HOME_DIR, LOG_DIR, NNI_PLATFORM, STDOUT_FULL_PATH, STDERR_FULL_PATH from .constants import HOME_DIR, LOG_DIR, NNI_PLATFORM, STDOUT_FULL_PATH, STDERR_FULL_PATH, \
from .hdfsClientUtility import copyDirectoryToHdfs, copyHdfsDirectoryToLocal MULTI_PHASE, NNI_TRIAL_JOB_ID, NNI_SYS_DIR, NNI_EXP_ID
from .hdfsClientUtility import copyDirectoryToHdfs, copyHdfsDirectoryToLocal, copyHdfsFileToLocal
from .log_utils import LogType, nni_log, RemoteLogger, PipeLogReader, StdOutputType from .log_utils import LogType, nni_log, RemoteLogger, PipeLogReader, StdOutputType
logger = logging.getLogger('trial_keeper') logger = logging.getLogger('trial_keeper')
regular = re.compile('v?(?P<version>[0-9](\.[0-9]){0,1}).*') regular = re.compile('v?(?P<version>[0-9](\.[0-9]){0,1}).*')
def main_loop(args): _hdfs_client = None
'''main loop logic for trial keeper'''
if not os.path.exists(LOG_DIR): def get_hdfs_client(args):
os.makedirs(LOG_DIR) global _hdfs_client
stdout_file = open(STDOUT_FULL_PATH, 'a+') if _hdfs_client is not None:
stderr_file = open(STDERR_FULL_PATH, 'a+') return _hdfs_client
trial_keeper_syslogger = RemoteLogger(args.nnimanager_ip, args.nnimanager_port, 'trial_keeper', StdOutputType.Stdout, args.log_collection)
# redirect trial keeper's stdout and stderr to syslog
trial_syslogger_stdout = RemoteLogger(args.nnimanager_ip, args.nnimanager_port, 'trial', StdOutputType.Stdout, args.log_collection)
sys.stdout = sys.stderr = trial_keeper_syslogger
# backward compatibility # backward compatibility
hdfs_host = None hdfs_host = None
hdfs_output_dir = None hdfs_output_dir = None
...@@ -59,21 +56,41 @@ def main_loop(args): ...@@ -59,21 +56,41 @@ def main_loop(args):
hdfs_host = args.hdfs_host hdfs_host = args.hdfs_host
elif args.pai_hdfs_host: elif args.pai_hdfs_host:
hdfs_host = args.pai_hdfs_host hdfs_host = args.pai_hdfs_host
if args.hdfs_output_dir: else:
hdfs_output_dir = args.hdfs_output_dir return None
elif args.pai_hdfs_output_dir:
hdfs_output_dir = args.pai_hdfs_output_dir
if hdfs_host is not None and args.nni_hdfs_exp_dir is not None: if hdfs_host is not None and args.nni_hdfs_exp_dir is not None:
try: try:
if args.webhdfs_path: if args.webhdfs_path:
hdfs_client = HdfsClient(hosts='{0}:80'.format(hdfs_host), user_name=args.pai_user_name, webhdfs_path=args.webhdfs_path, timeout=5) _hdfs_client = HdfsClient(hosts='{0}:80'.format(hdfs_host), user_name=args.pai_user_name, webhdfs_path=args.webhdfs_path, timeout=5)
else: else:
# backward compatibility # backward compatibility
hdfs_client = HdfsClient(hosts='{0}:{1}'.format(hdfs_host, '50070'), user_name=args.pai_user_name, timeout=5) _hdfs_client = HdfsClient(hosts='{0}:{1}'.format(hdfs_host, '50070'), user_name=args.pai_user_name, timeout=5)
except Exception as e: except Exception as e:
nni_log(LogType.Error, 'Create HDFS client error: ' + str(e)) nni_log(LogType.Error, 'Create HDFS client error: ' + str(e))
raise e raise e
return _hdfs_client
def main_loop(args):
'''main loop logic for trial keeper'''
if not os.path.exists(LOG_DIR):
os.makedirs(LOG_DIR)
stdout_file = open(STDOUT_FULL_PATH, 'a+')
stderr_file = open(STDERR_FULL_PATH, 'a+')
trial_keeper_syslogger = RemoteLogger(args.nnimanager_ip, args.nnimanager_port, 'trial_keeper', StdOutputType.Stdout, args.log_collection)
# redirect trial keeper's stdout and stderr to syslog
trial_syslogger_stdout = RemoteLogger(args.nnimanager_ip, args.nnimanager_port, 'trial', StdOutputType.Stdout, args.log_collection)
sys.stdout = sys.stderr = trial_keeper_syslogger
if args.hdfs_output_dir:
hdfs_output_dir = args.hdfs_output_dir
elif args.pai_hdfs_output_dir:
hdfs_output_dir = args.pai_hdfs_output_dir
hdfs_client = get_hdfs_client(args)
if hdfs_client is not None:
copyHdfsDirectoryToLocal(args.nni_hdfs_exp_dir, os.getcwd(), hdfs_client) copyHdfsDirectoryToLocal(args.nni_hdfs_exp_dir, os.getcwd(), hdfs_client)
# Notice: We don't appoint env, which means subprocess will inherit current environment and that is expected behavior # Notice: We don't appoint env, which means subprocess will inherit current environment and that is expected behavior
...@@ -138,6 +155,52 @@ def check_version(args): ...@@ -138,6 +155,52 @@ def check_version(args):
except AttributeError as err: except AttributeError as err:
nni_log(LogType.Error, err) nni_log(LogType.Error, err)
def is_multi_phase():
    """Return True when the MULTI_PHASE setting enables multi-phase mode.

    MULTI_PHASE is compared against the literal strings 'True' and 'true';
    every other value (including an empty one) yields False. The result is
    always a real bool, never the raw MULTI_PHASE value.
    """
    return MULTI_PHASE in ('True', 'true')
def download_parameter(meta_list, args):
    """
    Download this trial's parameter files from HDFS into NNI_SYS_DIR.

    Only entries whose experimentId / trialId match NNI_EXP_ID / NNI_TRIAL_JOB_ID
    are considered, and a file is fetched only if it does not exist locally yet.
    If no HDFS client can be obtained from args, the entry is skipped with an
    error log instead of passing None to the copy helper.

    meta_list format is defined in paiJobRestServer.ts
    example meta_list:
    [
        {"experimentId":"yWFJarYa","trialId":"UpPkl","filePath":"/chec/nni/experiments/yWFJarYa/trials/UpPkl/parameter_1.cfg"},
        {"experimentId":"yWFJarYa","trialId":"aIUMA","filePath":"/chec/nni/experiments/yWFJarYa/trials/aIUMA/parameter_1.cfg"}
    ]
    """
    nni_log(LogType.Debug, str(meta_list))
    nni_log(LogType.Debug, 'NNI_SYS_DIR: {}, trial Id: {}, experiment ID: {}'.format(NNI_SYS_DIR, NNI_TRIAL_JOB_ID, NNI_EXP_ID))
    nni_log(LogType.Debug, 'NNI_SYS_DIR files: {}'.format(os.listdir(NNI_SYS_DIR)))
    for meta in meta_list:
        # Ignore parameter files that belong to other experiments or trials.
        if meta['experimentId'] != NNI_EXP_ID or meta['trialId'] != NNI_TRIAL_JOB_ID:
            continue
        param_fp = os.path.join(NNI_SYS_DIR, os.path.basename(meta['filePath']))
        # Already downloaded on an earlier call; nothing to do.
        if os.path.exists(param_fp):
            continue
        hdfs_client = get_hdfs_client(args)
        if hdfs_client is None:
            # get_hdfs_client returns None when no HDFS host is configured.
            nni_log(LogType.Error, 'HDFS client is not available, cannot download parameter file: {}'.format(meta['filePath']))
            continue
        copyHdfsFileToLocal(meta['filePath'], param_fp, hdfs_client, override=False)
def fetch_parameter_file(args):
    """Start a background thread that keeps fetching parameter-file metadata.

    The thread requests the parameter meta URL built from args.nnimanager_ip
    and args.nnimanager_port, hands every 200 response to download_parameter,
    and sleeps 5 seconds between attempts. A failure inside one attempt is
    logged and does not stop the polling loop.
    """
    class FetchThread(threading.Thread):
        """Thread that polls the parameter meta URL in an endless loop."""

        def __init__(self, args):
            super(FetchThread, self).__init__()
            self.args = args

        def run(self):
            uri = gen_parameter_meta_url(self.args.nnimanager_ip, self.args.nnimanager_port)
            nni_log(LogType.Info, uri)
            while True:
                try:
                    # NOTE(review): second argument is presumably a timeout in seconds - confirm in rest_utils.
                    res = rest_get(uri, 10)
                    nni_log(LogType.Debug, 'status code: {}'.format(res.status_code))
                    if res.status_code == 200:
                        meta_list = res.json()
                        download_parameter(meta_list, self.args)
                    else:
                        nni_log(LogType.Warning, 'rest response: {}'.format(str(res)))
                except Exception as err:
                    # Keep polling: one failed request or download must not end the thread.
                    nni_log(LogType.Error, 'Fetch parameter file error: ' + str(err))
                time.sleep(5)

    fetch_file_thread = FetchThread(args)
    fetch_file_thread.start()
if __name__ == '__main__': if __name__ == '__main__':
'''NNI Trial Keeper main function''' '''NNI Trial Keeper main function'''
PARSER = argparse.ArgumentParser() PARSER = argparse.ArgumentParser()
...@@ -159,6 +222,8 @@ if __name__ == '__main__': ...@@ -159,6 +222,8 @@ if __name__ == '__main__':
exit(1) exit(1)
check_version(args) check_version(args)
try: try:
if is_multi_phase():
fetch_parameter_file(args)
main_loop(args) main_loop(args)
except SystemExit as se: except SystemExit as se:
nni_log(LogType.Info, 'NNI trial keeper exit with code {}'.format(se.code)) nni_log(LogType.Info, 'NNI trial keeper exit with code {}'.format(se.code))
......
...@@ -18,7 +18,7 @@ ...@@ -18,7 +18,7 @@
# DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, # DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
from .constants import API_ROOT_URL, BASE_URL, STDOUT_API, NNI_TRIAL_JOB_ID, NNI_EXP_ID, VERSION_API from .constants import API_ROOT_URL, BASE_URL, STDOUT_API, NNI_TRIAL_JOB_ID, NNI_EXP_ID, VERSION_API, PARAMETER_META_API
def gen_send_stdout_url(ip, port): def gen_send_stdout_url(ip, port):
'''Generate send stdout url''' '''Generate send stdout url'''
...@@ -26,4 +26,8 @@ def gen_send_stdout_url(ip, port): ...@@ -26,4 +26,8 @@ def gen_send_stdout_url(ip, port):
def gen_send_version_url(ip, port): def gen_send_version_url(ip, port):
'''Generate send error url''' '''Generate send error url'''
return '{0}:{1}{2}{3}/{4}/{5}'.format(BASE_URL.format(ip), port, API_ROOT_URL, VERSION_API, NNI_EXP_ID, NNI_TRIAL_JOB_ID) return '{0}:{1}{2}{3}/{4}/{5}'.format(BASE_URL.format(ip), port, API_ROOT_URL, VERSION_API, NNI_EXP_ID, NNI_TRIAL_JOB_ID)
\ No newline at end of file
def gen_parameter_meta_url(ip, port):
    '''Generate parameter meta url'''
    host = BASE_URL.format(ip)
    return '{0}:{1}{2}{3}'.format(host, port, API_ROOT_URL, PARAMETER_META_API)
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment