# ============================================================================================================================== # # Copyright (c) Microsoft Corporation # All rights reserved. # # MIT License # # Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated # documentation files (the "Software"), to deal in the Software without restriction, including without limitation # the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and # to permit persons to whom the Software is furnished to do so, subject to the following conditions: # The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software. # # THE SOFTWARE IS PROVIDED *AS IS*, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING # BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND # NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, # DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. 
# ============================================================================================================================== #

import argparse
import os
from subprocess import Popen
import time
import logging
import shlex
import re
import sys
import json
import threading
from pyhdfs import HdfsClient
import pkg_resources
from .rest_utils import rest_post, rest_get
from .url_utils import gen_send_version_url, gen_parameter_meta_url

from .constants import LOG_DIR, NNI_PLATFORM, MULTI_PHASE, NNI_TRIAL_JOB_ID, NNI_SYS_DIR, NNI_EXP_ID
from .hdfsClientUtility import copyDirectoryToHdfs, copyHdfsDirectoryToLocal, copyHdfsFileToLocal
from .log_utils import LogType, nni_log, RemoteLogger, StdOutputType

logger = logging.getLogger('trial_keeper')

# Extracts a "major[.minor]" prefix from a version string such as 'v1.2.3' -> '1.2'.
# The group must be named 'version' because check_version() reads it via .group('version').
# NOTE: each component is a single digit, so e.g. '10.1' yields '1'; both sides of the
# comparison in check_version() are truncated the same way.
regular = re.compile(r'v?(?P<version>[0-9](\.[0-9]){0,1}).*')

# Process-wide cache for the HDFS client; filled lazily by get_hdfs_client().
_hdfs_client = None


def get_hdfs_client(args):
    """Return the shared HdfsClient described by ``args``, creating it on first use.

    The host comes from ``--hdfs_host`` or, for backward compatibility, ``--pai_hdfs_host``.
    A client is only built when a host is given and ``--nni_hdfs_exp_dir`` is set;
    otherwise None is returned. With ``--webhdfs_path`` the client connects to port 80
    using that path; without it, it falls back to port 50070.

    Raises:
        Exception: whatever HdfsClient raises during construction (logged first).
    """
    global _hdfs_client
    if _hdfs_client is not None:
        return _hdfs_client

    # backward compatibility: --pai_hdfs_host is the legacy spelling of --hdfs_host
    hdfs_host = args.hdfs_host or args.pai_hdfs_host
    if not hdfs_host or args.nni_hdfs_exp_dir is None:
        return None

    try:
        if args.webhdfs_path:
            _hdfs_client = HdfsClient(hosts='{0}:80'.format(hdfs_host), user_name=args.pai_user_name,
                                      webhdfs_path=args.webhdfs_path, timeout=5)
        else:
            # backward compatibility
            _hdfs_client = HdfsClient(hosts='{0}:{1}'.format(hdfs_host, '50070'), user_name=args.pai_user_name,
                                      timeout=5)
    except Exception as e:
        nni_log(LogType.Error, 'Create HDFS client error: ' + str(e))
        raise
    return _hdfs_client


def _describe_command(command):
    """Return ``command`` split into shell tokens for logging, or the raw string if it cannot be split."""
    try:
        return shlex.split(command)
    except ValueError:
        # The command runs with shell=True, so it may contain syntax shlex rejects
        # (e.g. an apostrophe inside a trailing shell comment). A log message must
        # never abort the keeper after the trial process has been spawned.
        return command


def _copy_output_to_hdfs(hdfs_output_dir, hdfs_client):
    """Upload the local directory named by $NNI_OUTPUT_DIR to ``hdfs_output_dir`` (used for OpenPAI).

    Raises:
        KeyError: if NNI_OUTPUT_DIR is not set in the environment.
        Exception: any error raised by copyDirectoryToHdfs (logged, then re-raised).
    """
    nni_local_output_dir = os.environ['NNI_OUTPUT_DIR']
    try:
        copied = copyDirectoryToHdfs(nni_local_output_dir, hdfs_output_dir, hdfs_client)
    except Exception as e:
        nni_log(LogType.Error, 'HDFS copy directory got exception: ' + str(e))
        raise
    if copied:
        nni_log(LogType.Info, 'copy directory from {0} to {1} success!'.format(nni_local_output_dir, hdfs_output_dir))
    else:
        nni_log(LogType.Error, 'copy directory from {0} to {1} failed!'.format(nni_local_output_dir, hdfs_output_dir))


def main_loop(args):
    '''main loop logic for trial keeper

    Downloads the experiment directory from HDFS when an HDFS client is configured,
    spawns ``args.trial_command`` through the shell with stdout/stderr piped into a
    RemoteLogger reader, and polls every 2 seconds until the subprocess has exited and
    its output has been fully read. It then optionally uploads NNI_OUTPUT_DIR to HDFS
    and raises SystemExit carrying the subprocess's exit code.
    '''
    if not os.path.exists(LOG_DIR):
        os.makedirs(LOG_DIR)

    trial_keeper_syslogger = RemoteLogger(args.nnimanager_ip, args.nnimanager_port, 'trial_keeper',
                                          StdOutputType.Stdout, args.log_collection)
    # logger that receives the trial subprocess's output
    trial_syslogger_stdout = RemoteLogger(args.nnimanager_ip, args.nnimanager_port, 'trial',
                                          StdOutputType.Stdout, args.log_collection)
    # redirect trial keeper's stdout and stderr to syslog
    sys.stdout = sys.stderr = trial_keeper_syslogger

    # backward compatibility: --pai_hdfs_output_dir is the legacy spelling of --hdfs_output_dir
    hdfs_output_dir = args.hdfs_output_dir or args.pai_hdfs_output_dir or None

    hdfs_client = get_hdfs_client(args)
    if hdfs_client is not None:
        copyHdfsDirectoryToLocal(args.nni_hdfs_exp_dir, os.getcwd(), hdfs_client)

    # Notice: We don't appoint env, which means subprocess will inherit current environment and that is expected behavior.
    # shell=True is intentional: the trial command is a user-supplied shell command line.
    log_pipe_stdout = trial_syslogger_stdout.get_pipelog_reader()
    process = Popen(args.trial_command, shell=True, stdout=log_pipe_stdout, stderr=log_pipe_stdout)
    nni_log(LogType.Info, 'Trial keeper spawns a subprocess (pid {0}) to run command: {1}'.format(
        process.pid, _describe_command(args.trial_command)))

    while True:
        retCode = process.poll()
        # child worker process exits and all stdout data is read
        # (strict "== True" kept as in the original contract of is_read_completed)
        if retCode is not None and log_pipe_stdout.set_process_exit() \
                and log_pipe_stdout.is_read_completed == True:  # noqa: E712
            nni_log(LogType.Info, 'subprocess terminated. Exit code is {}. Quit'.format(retCode))
            if hdfs_output_dir is not None:
                # Copy local directory to hdfs for OpenPAI
                _copy_output_to_hdfs(hdfs_output_dir, hdfs_client)
            # Exit with the retCode of the trial subprocess
            sys.exit(retCode)
        time.sleep(2)


def trial_keeper_help_info(*args):
    print('please run --help to see guidance')


def check_version(args):
    """Compare the installed ``nni`` version with the NNI manager's version.

    Exits the process with status 1 (via os._exit) when the ``nni`` distribution is
    not installed, or when the versions differ (after posting a 'VCFail' entry).
    On a match a 'VCSuccess' entry is posted. The comparison is skipped with a
    warning when ``--nni_manager_version`` is not given.
    """
    try:
        trial_keeper_version = pkg_resources.get_distribution('nni').version
    except pkg_resources.ResolutionError:
        # package nni does not exist
        nni_log(LogType.Error, 'Package nni does not exist!')
        os._exit(1)

    if not args.nni_manager_version:
        # skip version check
        nni_log(LogType.Warning, 'Skipping version check!')
        return

    try:
        trial_keeper_version = regular.search(trial_keeper_version).group('version')
        nni_log(LogType.Info, 'trial_keeper_version is {0}'.format(trial_keeper_version))
        nni_manager_version = regular.search(args.nni_manager_version).group('version')
        nni_log(LogType.Info, 'nni_manager_version is {0}'.format(nni_manager_version))
        log_entry = {}
        version_url = gen_send_version_url(args.nnimanager_ip, args.nnimanager_port)
        if trial_keeper_version != nni_manager_version:
            nni_log(LogType.Error, 'Version does not match!')
            error_message = 'NNIManager version is {0}, TrialKeeper version is {1}, NNI version does not match!'.format(
                nni_manager_version, trial_keeper_version)
            log_entry['tag'] = 'VCFail'
            log_entry['msg'] = error_message
            rest_post(version_url, json.dumps(log_entry), 10, False)
            os._exit(1)
        else:
            nni_log(LogType.Info, 'Version match!')
            log_entry['tag'] = 'VCSuccess'
            rest_post(version_url, json.dumps(log_entry), 10, False)
    except AttributeError as err:
        # Most likely regular.search() returned None because a version string did not match.
        nni_log(LogType.Error, err)


def is_multi_phase():
    """Return True when the MULTI_PHASE setting is the string 'True' or 'true'."""
    return MULTI_PHASE in ('True', 'true')


def download_parameter(meta_list, args):
    """
    Download parameter file to local working directory.
    meta_list format is defined in paiJobRestServer.ts
    example meta_list:
    [
        {"experimentId":"yWFJarYa","trialId":"UpPkl","filePath":"/chec/nni/experiments/yWFJarYa/trials/UpPkl/parameter_1.cfg"},
        {"experimentId":"yWFJarYa","trialId":"aIUMA","filePath":"/chec/nni/experiments/yWFJarYa/trials/aIUMA/parameter_1.cfg"}
    ]

    Only entries addressed to this experiment and trial are considered, and files
    already present in NNI_SYS_DIR are not downloaded again.
    """
    nni_log(LogType.Debug, str(meta_list))
    nni_log(LogType.Debug,
            'NNI_SYS_DIR: {}, trial Id: {}, experiment ID: {}'.format(NNI_SYS_DIR, NNI_TRIAL_JOB_ID, NNI_EXP_ID))
    nni_log(LogType.Debug, 'NNI_SYS_DIR files: {}'.format(os.listdir(NNI_SYS_DIR)))
    for meta in meta_list:
        if meta['experimentId'] != NNI_EXP_ID or meta['trialId'] != NNI_TRIAL_JOB_ID:
            continue
        param_fp = os.path.join(NNI_SYS_DIR, os.path.basename(meta['filePath']))
        if os.path.exists(param_fp):
            continue
        hdfs_client = get_hdfs_client(args)
        copyHdfsFileToLocal(meta['filePath'], param_fp, hdfs_client, override=False)


def fetch_parameter_file(args):
    """Start a background thread that polls NNI manager for parameter metadata every 5 seconds.

    Each successful (HTTP 200) response is passed to download_parameter(). Errors in a
    single poll are logged and polling continues, so one transient failure does not
    permanently stop parameter fetching.
    """
    class FetchThread(threading.Thread):
        def __init__(self, args):
            super(FetchThread, self).__init__()
            # Daemon: this thread loops forever and must never keep the process alive by itself.
            self.daemon = True
            self.args = args

        def run(self):
            uri = gen_parameter_meta_url(self.args.nnimanager_ip, self.args.nnimanager_port)
            nni_log(LogType.Info, uri)

            while True:
                try:
                    self._poll_once(uri)
                except Exception as e:
                    nni_log(LogType.Error, 'fetch parameter file got exception: ' + str(e))
                time.sleep(5)

        def _poll_once(self, uri):
            """Fetch parameter metadata once and download any files meant for this trial."""
            res = rest_get(uri, 10)
            nni_log(LogType.Debug, 'status code: {}'.format(res.status_code))
            if res.status_code == 200:
                meta_list = res.json()
                download_parameter(meta_list, self.args)
            else:
                nni_log(LogType.Warning, 'rest response: {}'.format(str(res)))

    fetch_file_thread = FetchThread(args)
    fetch_file_thread.start()


def _exit_status(code):
    """Map a SystemExit.code to an int accepted by os._exit, following Python's own convention."""
    if code is None:
        return 0
    if isinstance(code, int):
        return code
    # Non-integer exit payloads (e.g. a message string) mean failure.
    return 1


if __name__ == '__main__':
    # NNI Trial Keeper main function
    PARSER = argparse.ArgumentParser()
    PARSER.set_defaults(func=trial_keeper_help_info)
    PARSER.add_argument('--trial_command', type=str, help='Command to launch trial process')
    PARSER.add_argument('--nnimanager_ip', type=str, default='localhost', help='NNI manager rest server IP')
    PARSER.add_argument('--nnimanager_port', type=str, default='8081', help='NNI manager rest server port')
    PARSER.add_argument('--pai_hdfs_output_dir', type=str, help='the output dir of pai_hdfs')  # backward compatibility
    PARSER.add_argument('--hdfs_output_dir', type=str, help='the output dir of hdfs')
    PARSER.add_argument('--pai_hdfs_host', type=str, help='the host of pai_hdfs')  # backward compatibility
    PARSER.add_argument('--hdfs_host', type=str, help='the host of hdfs')
    PARSER.add_argument('--pai_user_name', type=str, help='the username of hdfs')
    PARSER.add_argument('--nni_hdfs_exp_dir', type=str, help='nni experiment directory in hdfs')
    PARSER.add_argument('--webhdfs_path', type=str, help='the webhdfs path used in webhdfs URL')
    PARSER.add_argument('--nni_manager_version', type=str, help='the nni version transmitted from nniManager')
    PARSER.add_argument('--log_collection', type=str, help='set the way to collect log in trialkeeper')
    args, unknown = PARSER.parse_known_args()
    if args.trial_command is None:
        sys.exit(1)
    check_version(args)
    try:
        if NNI_PLATFORM == 'pai' and is_multi_phase():
            fetch_parameter_file(args)
        main_loop(args)
    except SystemExit as se:
        nni_log(LogType.Info, 'NNI trial keeper exit with code {}'.format(se.code))
        # os._exit terminates immediately, including any still-running fetch thread.
        os._exit(_exit_status(se.code))
    except Exception as e:
        nni_log(LogType.Error, 'Exit trial keeper with code 1 because Exception: {} is catched'.format(str(e)))
        os._exit(1)