trial_keeper.py 10.2 KB
Newer Older
liuzhe-lz's avatar
liuzhe-lz committed
1
2
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT license.
3
4
5

import argparse
import os
chicm-ms's avatar
chicm-ms committed
6
from subprocess import Popen
7
8
9
10
import time
import logging
import shlex
import re
11
import sys
12
import json
13
import threading
14
from pyhdfs import HdfsClient
15
import pkg_resources
16
from .rest_utils import rest_post, rest_get
chicm-ms's avatar
chicm-ms committed
17
from .url_utils import gen_send_version_url, gen_parameter_meta_url
18

chicm-ms's avatar
chicm-ms committed
19
from .constants import LOG_DIR, NNI_PLATFORM, MULTI_PHASE, NNI_TRIAL_JOB_ID, NNI_SYS_DIR, NNI_EXP_ID
20
from .hdfsClientUtility import copyDirectoryToHdfs, copyHdfsDirectoryToLocal, copyHdfsFileToLocal
chicm-ms's avatar
chicm-ms committed
21
from .log_utils import LogType, nni_log, RemoteLogger, StdOutputType
22
23

logger = logging.getLogger('trial_keeper')
# Extracts the "major[.minor]" part of a version string, e.g. "v1.10.2" -> "1.10".
# Raw string: avoids the invalid "\." escape warning of a plain string literal.
# [0-9]+ keeps multi-digit components; the previous single-digit pattern reduced
# both "1.10" and "1.1" to "1.1" and therefore reported a false version match.
regular = re.compile(r'v?(?P<version>[0-9]+(\.[0-9]+)?).*')

# HdfsClient cached by get_hdfs_client(); stays None until a client has been created.
_hdfs_client = None


def get_hdfs_client(args):
    """Return the cached HdfsClient, creating it on the first successful call.

    The host comes from ``args.hdfs_host`` and falls back to the legacy
    ``args.pai_hdfs_host``. When ``args.webhdfs_path`` is set, port 80 and that
    path are used. Otherwise port 50070 is used (backward compatibility).

    Returns None when no client is cached and either no host is given or
    ``args.nni_hdfs_exp_dir`` is None. If constructing the client fails, the
    error is logged and re-raised.
    """
    global _hdfs_client

    if _hdfs_client is not None:
        return _hdfs_client

    # backward compatibility: --pai_hdfs_host is still accepted as a fallback
    hdfs_host = args.hdfs_host or args.pai_hdfs_host
    if not hdfs_host:
        return None
    if args.nni_hdfs_exp_dir is None:
        return None

    if args.webhdfs_path:
        host_and_port = '{0}:80'.format(hdfs_host)
        extra_options = {'webhdfs_path': args.webhdfs_path}
    else:
        # backward compatibility
        host_and_port = '{0}:{1}'.format(hdfs_host, '50070')
        extra_options = {}

    try:
        _hdfs_client = HdfsClient(hosts=host_and_port, user_name=args.pai_user_name, timeout=5,
                                  **extra_options)
    except Exception as err:
        nni_log(LogType.Error, 'Create HDFS client error: ' + str(err))
        raise
    return _hdfs_client


def main_loop(args):
    '''Run the trial command and supervise it until it finishes.

    Steps:
      1. Make sure LOG_DIR exists. Redirect this process's stdout/stderr to a
         'trial_keeper' RemoteLogger.
      2. If an HDFS client is configured, copy ``args.nni_hdfs_exp_dir`` into
         the current working directory.
      3. Spawn ``args.trial_command`` through the shell. Its stdout and stderr
         are piped into the 'trial' RemoteLogger.
      4. Poll every 2 seconds. Once the child has exited and its output has
         been fully read:
         - if an HDFS output dir was given, upload $NNI_OUTPUT_DIR to it;
         - then exit with the child's return code.

    :param args: parsed trial keeper command-line namespace.
    :raises SystemExit: carrying the trial's return code, once the trial finishes.
    :raises Exception: any error from the HDFS upload, re-raised after being logged.
    '''
    os.makedirs(LOG_DIR, exist_ok=True)

    trial_keeper_syslogger = RemoteLogger(args.nnimanager_ip, args.nnimanager_port, 'trial_keeper',
                                          StdOutputType.Stdout, args.log_collection)
    trial_syslogger_stdout = RemoteLogger(args.nnimanager_ip, args.nnimanager_port, 'trial', StdOutputType.Stdout,
                                          args.log_collection)
    # redirect trial keeper's stdout and stderr to syslog
    sys.stdout = sys.stderr = trial_keeper_syslogger

    # --pai_hdfs_output_dir is kept for backward compatibility
    hdfs_output_dir = args.hdfs_output_dir or args.pai_hdfs_output_dir or None

    hdfs_client = get_hdfs_client(args)
    if hdfs_client is not None:
        copyHdfsDirectoryToLocal(args.nni_hdfs_exp_dir, os.getcwd(), hdfs_client)

    # Notice: We don't appoint env, which means subprocess will inherit current environment and that is expected behavior
    log_pipe_stdout = trial_syslogger_stdout.get_pipelog_reader()
    # shell=True: trial_command is a shell command line supplied by the NNI launcher
    process = Popen(args.trial_command, shell=True, stdout=log_pipe_stdout, stderr=log_pipe_stdout)
    nni_log(LogType.Info, 'Trial keeper spawns a subprocess (pid {0}) to run command: {1}'.format(
        process.pid, shlex.split(args.trial_command)))

    while True:
        ret_code = process.poll()
        # child worker process exits and all stdout data is read
        if ret_code is not None and log_pipe_stdout.set_process_exit() and log_pipe_stdout.is_read_completed == True:
            nni_log(LogType.Info, 'subprocess terminated. Exit code is {}. Quit'.format(ret_code))
            if hdfs_output_dir is not None:
                # Copy local directory to hdfs for OpenPAI
                nni_local_output_dir = os.environ['NNI_OUTPUT_DIR']
                try:
                    copied = copyDirectoryToHdfs(nni_local_output_dir, hdfs_output_dir, hdfs_client)
                except Exception as e:
                    nni_log(LogType.Error, 'HDFS copy directory got exception: ' + str(e))
                    raise
                if copied:
                    nni_log(LogType.Info,
                            'copy directory from {0} to {1} success!'.format(nni_local_output_dir, hdfs_output_dir))
                else:
                    nni_log(LogType.Warning,
                            'copy directory from {0} to {1} failed!'.format(nni_local_output_dir, hdfs_output_dir))

            # Exit with the return code of the subprocess (trial).
            # sys.exit, not the site-provided exit(), which is meant for interactive use only.
            sys.exit(ret_code)

        time.sleep(2)


def trial_keeper_help_info(*_ignored):
    """Print a hint to use --help.

    This is the fallback handler registered with ``set_defaults(func=...)``.
    Any positional arguments are accepted and ignored.
    """
    hint = 'please run --help to see guidance'
    print(hint)


def check_version(args):
    '''Check that this trial keeper's nni version matches NNIManager's version.

    Both versions are reduced to their "major[.minor]" part with ``regular``
    before comparing. The outcome is posted to ``gen_send_version_url``:
      - on mismatch, tag 'VCFail' plus a message is posted and the process
        exits with code 1 via ``os._exit``;
      - on match, tag 'VCSuccess' is posted.

    The check is skipped in two cases:
      - ``args.nni_manager_version`` is empty (a warning is logged);
      - either version string does not match ``regular`` (the error is logged).

    The process also exits with code 1 when the ``nni`` distribution is not installed.
    '''
    try:
        trial_keeper_version = pkg_resources.get_distribution('nni').version
    except pkg_resources.ResolutionError:
        nni_log(LogType.Error, 'Package nni does not exist!')
        os._exit(1)

    if not args.nni_manager_version:
        # skip version check
        nni_log(LogType.Warning, 'Skipping version check!')
        return

    # Only the parsing is guarded: regular.search() returns None for an unrecognised
    # version string, and .group() on None raises AttributeError.
    try:
        trial_keeper_version = regular.search(trial_keeper_version).group('version')
        nni_log(LogType.Info, 'trial_keeper_version is {0}'.format(trial_keeper_version))
        nni_manager_version = regular.search(args.nni_manager_version).group('version')
        nni_log(LogType.Info, 'nni_manager_version is {0}'.format(nni_manager_version))
    except AttributeError as err:
        nni_log(LogType.Error, err)
        return

    version_url = gen_send_version_url(args.nnimanager_ip, args.nnimanager_port)
    if trial_keeper_version != nni_manager_version:
        nni_log(LogType.Error, 'Version does not match!')
        error_message = 'NNIManager version is {0}, TrialKeeper version is {1}, NNI version does not match!'.format(
            nni_manager_version, trial_keeper_version)
        log_entry = {'tag': 'VCFail', 'msg': error_message}
        rest_post(version_url, json.dumps(log_entry), 10, False)
        os._exit(1)

    nni_log(LogType.Info, 'Version match!')
    rest_post(version_url, json.dumps({'tag': 'VCSuccess'}), 10, False)


def is_multi_phase():
    '''Return True when the MULTI_PHASE setting is the string 'True' or 'true'.

    Always returns a real bool. An unset (None) or empty MULTI_PHASE yields False.
    Previously the raw falsy value itself was returned.
    '''
    return MULTI_PHASE in ('True', 'true')


def download_parameter(meta_list, args):
    """
    Copy this trial's parameter files from HDFS into NNI_SYS_DIR.

    meta_list format is defined in paiJobRestServer.ts, for example:
    [
        {"experimentId":"yWFJarYa","trialId":"UpPkl","filePath":"/chec/nni/experiments/yWFJarYa/trials/UpPkl/parameter_1.cfg"},
        {"experimentId":"yWFJarYa","trialId":"aIUMA","filePath":"/chec/nni/experiments/yWFJarYa/trials/aIUMA/parameter_1.cfg"}
    ]

    Only entries whose experimentId and trialId equal NNI_EXP_ID and
    NNI_TRIAL_JOB_ID are handled. Each file is stored under its base name in
    NNI_SYS_DIR and skipped if that local file already exists. The HDFS client
    is only requested when a copy is actually needed.
    """
    nni_log(LogType.Debug, str(meta_list))
    nni_log(LogType.Debug,
            'NNI_SYS_DIR: {}, trial Id: {}, experiment ID: {}'.format(NNI_SYS_DIR, NNI_TRIAL_JOB_ID, NNI_EXP_ID))
    nni_log(LogType.Debug, 'NNI_SYS_DIR files: {}'.format(os.listdir(NNI_SYS_DIR)))

    own_entries = (entry for entry in meta_list
                   if entry['experimentId'] == NNI_EXP_ID and entry['trialId'] == NNI_TRIAL_JOB_ID)
    for entry in own_entries:
        remote_path = entry['filePath']
        # basename(): only the file name is kept, so the copy lands directly inside NNI_SYS_DIR
        local_path = os.path.join(NNI_SYS_DIR, os.path.basename(remote_path))
        if os.path.exists(local_path):
            continue
        copyHdfsFileToLocal(remote_path, local_path, get_hdfs_client(args), override=False)


def fetch_parameter_file(args):
    '''Start a background thread that keeps fetching this trial's parameter files.

    Every 5 seconds, the thread:
      - asks NNIManager (``gen_parameter_meta_url``) for the parameter-file
        metadata list;
      - on HTTP 200, passes the decoded JSON to ``download_parameter``.

    A failed round is logged and retried on the next tick instead of terminating
    the thread.

    :param args: parsed command-line namespace. ``nnimanager_ip`` and
        ``nnimanager_port`` select the endpoint; the whole namespace is
        forwarded to ``download_parameter``.
    '''
    class FetchThread(threading.Thread):
        def __init__(self, args):
            super().__init__()
            # run() never returns; don't let this thread alone keep the interpreter alive
            self.daemon = True
            self.args = args

        def run(self):
            uri = gen_parameter_meta_url(self.args.nnimanager_ip, self.args.nnimanager_port)
            nni_log(LogType.Info, uri)

            while True:
                try:
                    self._poll_once(uri)
                except Exception as err:
                    # A transient REST/HDFS/JSON error must not kill the thread, otherwise
                    # parameter files for later phases would never be fetched.
                    nni_log(LogType.Error, 'fetch parameter file got exception: {}'.format(err))
                time.sleep(5)

        def _poll_once(self, uri):
            '''Fetch the metadata list once and download any missing parameter files.'''
            res = rest_get(uri, 10)
            if res is None:
                # NOTE(review): presumably rest_get returns None when the request itself fails - confirm in rest_utils
                nni_log(LogType.Warning, 'rest response: None')
                return
            nni_log(LogType.Debug, 'status code: {}'.format(res.status_code))
            if res.status_code == 200:
                download_parameter(res.json(), self.args)
            else:
                nni_log(LogType.Warning, 'rest response: {}'.format(str(res)))

    fetch_file_thread = FetchThread(args)
    fetch_file_thread.start()


if __name__ == '__main__':
    # NNI Trial Keeper main function:
    #   1. parse arguments;
    #   2. check the nni version against NNIManager;
    #   3. on PAI in multi-phase mode, start the parameter fetcher;
    #   4. run the trial.
    PARSER = argparse.ArgumentParser()
    PARSER.set_defaults(func=trial_keeper_help_info)
    PARSER.add_argument('--trial_command', type=str, help='Command to launch trial process')
    PARSER.add_argument('--nnimanager_ip', type=str, default='localhost', help='NNI manager rest server IP')
    PARSER.add_argument('--nnimanager_port', type=str, default='8081', help='NNI manager rest server port')
    PARSER.add_argument('--pai_hdfs_output_dir', type=str, help='the output dir of pai_hdfs')  # backward compatibility
    PARSER.add_argument('--hdfs_output_dir', type=str, help='the output dir of hdfs')
    PARSER.add_argument('--pai_hdfs_host', type=str, help='the host of pai_hdfs')  # backward compatibility
    PARSER.add_argument('--hdfs_host', type=str, help='the host of hdfs')
    PARSER.add_argument('--pai_user_name', type=str, help='the username of hdfs')
    PARSER.add_argument('--nni_hdfs_exp_dir', type=str, help='nni experiment directory in hdfs')
    PARSER.add_argument('--webhdfs_path', type=str, help='the webhdfs path used in webhdfs URL')
    PARSER.add_argument('--nni_manager_version', type=str, help='the nni version transmitted from nniManager')
    PARSER.add_argument('--log_collection', type=str, help='set the way to collect log in trialkeeper')

    # Unknown arguments are tolerated and ignored.
    args, _ = PARSER.parse_known_args()
    if args.trial_command is None:
        # Nothing to run. Use sys.exit: the builtin exit() comes from the site module
        # and is meant for interactive use only.
        sys.exit(1)
    check_version(args)
    try:
        if NNI_PLATFORM == 'pai' and is_multi_phase():
            fetch_parameter_file(args)
        main_loop(args)
    except SystemExit as se:
        nni_log(LogType.Info, 'NNI trial keeper exit with code {}'.format(se.code))
        # os._exit() terminates immediately (no cleanup) and only accepts an int.
        # Map SystemExit codes the way the interpreter does:
        #   None -> 0, int -> itself, anything else -> 1.
        if se.code is None:
            exit_code = 0
        elif isinstance(se.code, int):
            exit_code = se.code
        else:
            exit_code = 1
        os._exit(exit_code)
    except Exception as e:
        nni_log(LogType.Error, 'Exit trial keeper with code 1 because Exception: {} is catched'.format(str(e)))
        os._exit(1)