trial_keeper.py 10.7 KB
Newer Older
liuzhe-lz's avatar
liuzhe-lz committed
1
2
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT license.
3
4

import argparse
5
6
import ctypes
import json
7
import logging
8
import os
9
import re
10
import shlex
11
import sys
12
import threading
13
14
15
import time
from subprocess import Popen

16
import pkg_resources
17
from pyhdfs import HdfsClient
18

19
20
21
22
23
24
25
from .constants import (LOG_DIR, MULTI_PHASE, NNI_EXP_ID, NNI_PLATFORM,
                        NNI_SYS_DIR, NNI_TRIAL_JOB_ID)
from .hdfsClientUtility import (copyDirectoryToHdfs, copyHdfsDirectoryToLocal,
                                copyHdfsFileToLocal)
from .log_utils import LogType, RemoteLogger, StdOutputType, nni_log
from .rest_utils import rest_get, rest_post
from .url_utils import gen_parameter_meta_url, gen_send_version_url
26
27

logger = logging.getLogger('trial_keeper')

# Extracts the comparable part of a version string: an optional leading "v", one digit,
# and optionally a dot followed by one more digit (e.g. "v1.2.3" -> "1.2").
# Written as a raw string so that "\." is not treated as an (invalid) string escape.
# NOTE(review): each component is limited to a single digit, so "1.10" yields "1.1" --
# confirm this is intended before multi-digit versions need to be told apart.
regular = re.compile(r'v?(?P<version>[0-9](\.[0-9]){0,1}).*')

# Process-wide HDFS client cache, filled in lazily by get_hdfs_client().
_hdfs_client = None
31

chicm-ms's avatar
chicm-ms committed
32

33
34
def get_hdfs_client(args):
    """Return the process-wide HDFS client, creating it on first use.

    The host is taken from ``args.hdfs_host`` or, for backward compatibility,
    from ``args.pai_hdfs_host``. A client is only created when a host is given
    and ``args.nni_hdfs_exp_dir`` is not None. Once created, the client is
    cached in the module-level ``_hdfs_client`` and returned by later calls.

    Args:
        args: parsed command-line arguments; reads ``hdfs_host``,
            ``pai_hdfs_host``, ``nni_hdfs_exp_dir``, ``webhdfs_path`` and
            ``pai_user_name``.

    Returns:
        The cached or newly created ``HdfsClient``, or None when no host is
        configured or ``args.nni_hdfs_exp_dir`` is None.

    Raises:
        Exception: whatever ``HdfsClient`` raises on construction; the error
            is logged and then re-raised unchanged.
    """
    global _hdfs_client

    if _hdfs_client is not None:
        return _hdfs_client

    # --pai_hdfs_host is only consulted when --hdfs_host is absent (backward compatibility)
    hdfs_host = args.hdfs_host or args.pai_hdfs_host
    if not hdfs_host or args.nni_hdfs_exp_dir is None:
        return None

    try:
        if args.webhdfs_path:
            _hdfs_client = HdfsClient(hosts='{0}:80'.format(hdfs_host), user_name=args.pai_user_name,
                                      webhdfs_path=args.webhdfs_path, timeout=5)
        else:
            # backward compatibility: no webhdfs path given, connect to port 50070
            _hdfs_client = HdfsClient(hosts='{0}:{1}'.format(hdfs_host, '50070'), user_name=args.pai_user_name,
                                      timeout=5)
    except Exception as e:
        nni_log(LogType.Error, 'Create HDFS client error: ' + str(e))
        # bare raise keeps the original traceback intact
        raise

    return _hdfs_client

chicm-ms's avatar
chicm-ms committed
62

63
64
65
66
67
68
def main_loop(args):
    '''Run the trial command in a subprocess and exit with its return code.

    Steps, in order:
      1. make sure LOG_DIR exists and redirect this process's stdout/stderr to a RemoteLogger;
      2. when an HDFS client is available, copy ``args.nni_hdfs_exp_dir`` into the current working directory;
      3. when ``args.job_id_file`` is set, write this process's pid into that file;
      4. launch ``args.trial_command`` through the shell, sending its stdout and stderr to a second RemoteLogger;
      5. poll every 2 seconds until the trial has exited and its output has been read completely, copy the
         directory named by the NNI_OUTPUT_DIR environment variable to the HDFS output directory when one is
         configured, and exit.

    This function never returns normally: it ends by raising SystemExit with the trial's return code,
    or by propagating an exception.

    Args:
        args: parsed command-line arguments of the trial keeper.

    Raises:
        SystemExit: carrying the trial's return code, once the trial has finished.
        Exception: re-raised after logging when copying the output directory to HDFS raises.
    '''
    # exist_ok avoids failing when the directory appears between a separate existence check and the creation
    os.makedirs(LOG_DIR, exist_ok=True)

    trial_keeper_syslogger = RemoteLogger(args.nnimanager_ip, args.nnimanager_port, 'trial_keeper',
                                          StdOutputType.Stdout, args.log_collection)
    trial_syslogger_stdout = RemoteLogger(args.nnimanager_ip, args.nnimanager_port, 'trial', StdOutputType.Stdout,
                                          args.log_collection)
    # redirect trial keeper's stdout and stderr to syslog
    sys.stdout = sys.stderr = trial_keeper_syslogger

    # --pai_hdfs_output_dir is only consulted when --hdfs_output_dir is absent (backward compatibility)
    hdfs_output_dir = args.hdfs_output_dir or args.pai_hdfs_output_dir or None

    hdfs_client = get_hdfs_client(args)

    if hdfs_client is not None:
        copyHdfsDirectoryToLocal(args.nni_hdfs_exp_dir, os.getcwd(), hdfs_client)

    if args.job_id_file:
        with open(args.job_id_file, 'w') as job_file:
            job_file.write("%d" % os.getpid())

    # Notice: We don't appoint env, which means subprocess will inherit current environment and that is
    # expected behavior
    log_pipe_stdout = trial_syslogger_stdout.get_pipelog_reader()
    process = Popen(args.trial_command, shell=True, stdout=log_pipe_stdout, stderr=log_pipe_stdout)
    nni_log(LogType.Info,
            'Trial keeper spawns a subprocess (pid {0}) to run command: {1}'.format(
                process.pid, shlex.split(args.trial_command)))

    while True:
        retCode = process.poll()
        # child worker process exits and all stdout data is read
        # NOTE(review): the explicit "== True" is kept because the type of is_read_completed is not
        # visible from this module.
        if retCode is not None and log_pipe_stdout.set_process_exit() and log_pipe_stdout.is_read_completed == True:
            # In Windows, the retCode -1 is 4294967295. It's larger than c_long, and raise OverflowError.
            # So convert it to int32.
            retCode = ctypes.c_long(retCode).value
            nni_log(LogType.Info, 'subprocess terminated. Exit code is {}. Quit'.format(retCode))
            if hdfs_output_dir is not None:
                # Copy local directory to hdfs for OpenPAI
                nni_local_output_dir = os.environ['NNI_OUTPUT_DIR']
                try:
                    if copyDirectoryToHdfs(nni_local_output_dir, hdfs_output_dir, hdfs_client):
                        nni_log(LogType.Info,
                                'copy directory from {0} to {1} success!'.format(nni_local_output_dir, hdfs_output_dir))
                    else:
                        # a failed upload is reported as an error, not as routine information
                        nni_log(LogType.Error,
                                'copy directory from {0} to {1} failed!'.format(nni_local_output_dir, hdfs_output_dir))
                except Exception as e:
                    nni_log(LogType.Error, 'HDFS copy directory got exception: ' + str(e))
                    raise

            # Exit as the retCode of subprocess(trial).
            # sys.exit() is used instead of the exit() helper, which only exists when the site module is loaded.
            sys.exit(retCode)

        time.sleep(2)

chicm-ms's avatar
chicm-ms committed
126

127
128
129
def trial_keeper_help_info(*args):
    """Print a one-line hint pointing the user at ``--help``; any arguments are ignored."""
    guidance = 'please run --help to see guidance'
    print(guidance)

chicm-ms's avatar
chicm-ms committed
130

131
132
133
134
def check_version(args):
    """Compare the installed ``nni`` package version with the one sent by NNI manager.

    Only the part captured by the module-level ``regular`` pattern is compared.
    The outcome is posted to NNI manager as a JSON entry tagged ``VCSuccess``
    or ``VCFail`` (the latter also carries a ``msg`` describing the mismatch).

    The process is terminated immediately with ``os._exit(1)`` when the ``nni``
    distribution cannot be resolved or when the two versions differ. The check
    is skipped with a warning when ``args.nni_manager_version`` is empty, and
    abandoned with an error log when an AttributeError is raised while parsing
    or reporting the versions.

    Args:
        args: parsed command-line arguments; reads ``nni_manager_version``,
            ``nnimanager_ip`` and ``nnimanager_port``.
    """
    try:
        trial_keeper_version = pkg_resources.get_distribution('nni').version
    except pkg_resources.ResolutionError:
        # the nni distribution cannot be resolved, so there is no version to compare
        nni_log(LogType.Error, 'Package nni does not exist!')
        os._exit(1)

    if not args.nni_manager_version:
        # skip version check
        nni_log(LogType.Warning, 'Skipping version check!')
        return

    try:
        trial_keeper_version = regular.search(trial_keeper_version).group('version')
        nni_log(LogType.Info, 'trial_keeper_version is {0}'.format(trial_keeper_version))
        nni_manager_version = regular.search(args.nni_manager_version).group('version')
        nni_log(LogType.Info, 'nni_manager_version is {0}'.format(nni_manager_version))

        version_matched = trial_keeper_version == nni_manager_version
        if version_matched:
            nni_log(LogType.Info, 'Version match!')
            log_entry = {'tag': 'VCSuccess'}
        else:
            nni_log(LogType.Error, 'Version does not match!')
            error_message = 'NNIManager version is {0}, TrialKeeper version is {1}, NNI version does not match!'.format(
                nni_manager_version, trial_keeper_version)
            log_entry = {'tag': 'VCFail', 'msg': error_message}

        # report the outcome to NNI manager in both cases
        rest_post(gen_send_version_url(args.nnimanager_ip, args.nnimanager_port), json.dumps(log_entry), 10,
                  False)

        if not version_matched:
            # a mismatching trial keeper must not run the trial
            os._exit(1)
    except AttributeError as err:
        # regular.search() returns None when a version string contains no digit,
        # in which case .group() raises AttributeError
        nni_log(LogType.Error, str(err))
164

chicm-ms's avatar
chicm-ms committed
165

166
167
168
def is_multi_phase():
    """Tell whether MULTI_PHASE is switched on, i.e. equals 'True' or 'true'.

    A falsy MULTI_PHASE is handed back unchanged; otherwise the result is a bool.
    """
    if not MULTI_PHASE:
        return MULTI_PHASE
    return MULTI_PHASE in ('True', 'true')

chicm-ms's avatar
chicm-ms committed
169

170
171
172
173
174
175
176
177
178
179
180
def download_parameter(meta_list, args):
    """
    Fetch the parameter files that belong to this trial into NNI_SYS_DIR.

    Only entries whose experimentId and trialId equal NNI_EXP_ID and NNI_TRIAL_JOB_ID
    are considered, and a file is copied from HDFS only when no file with the same
    base name exists in NNI_SYS_DIR yet.

    meta_list format is defined in paiJobRestServer.ts
    example meta_list:
    [
        {"experimentId":"yWFJarYa","trialId":"UpPkl","filePath":"/chec/nni/experiments/yWFJarYa/trials/UpPkl/parameter_1.cfg"},
        {"experimentId":"yWFJarYa","trialId":"aIUMA","filePath":"/chec/nni/experiments/yWFJarYa/trials/aIUMA/parameter_1.cfg"}
    ]
    """
    nni_log(LogType.Debug, str(meta_list))
    nni_log(LogType.Debug,
            'NNI_SYS_DIR: {}, trial Id: {}, experiment ID: {}'.format(NNI_SYS_DIR, NNI_TRIAL_JOB_ID, NNI_EXP_ID))
    nni_log(LogType.Debug, 'NNI_SYS_DIR files: {}'.format(os.listdir(NNI_SYS_DIR)))

    for entry in meta_list:
        # entries of other experiments or trials are not ours to download
        if not (entry['experimentId'] == NNI_EXP_ID and entry['trialId'] == NNI_TRIAL_JOB_ID):
            continue

        local_path = os.path.join(NNI_SYS_DIR, os.path.basename(entry['filePath']))
        if os.path.exists(local_path):
            # already present locally, nothing to do
            continue

        client = get_hdfs_client(args)
        copyHdfsFileToLocal(entry['filePath'], local_path, client, override=False)

chicm-ms's avatar
chicm-ms committed
191

192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
def fetch_parameter_file(args):
    """Start a background thread that keeps downloading this trial's parameter files.

    The thread asks NNI manager for the parameter metadata every 5 seconds; each
    answer with status code 200 is handed to download_parameter(), any other
    answer is logged as a warning. The loop never terminates on its own.
    """

    class ParameterPoller(threading.Thread):
        """Thread that polls the parameter metadata URL in an endless loop."""

        def __init__(self, keeper_args):
            super(ParameterPoller, self).__init__()
            self.args = keeper_args

        def run(self):
            meta_url = gen_parameter_meta_url(self.args.nnimanager_ip, self.args.nnimanager_port)
            nni_log(LogType.Info, meta_url)

            while True:
                response = rest_get(meta_url, 10)
                nni_log(LogType.Debug, 'status code: {}'.format(response.status_code))
                if response.status_code != 200:
                    nni_log(LogType.Warning, 'rest response: {}'.format(str(response)))
                else:
                    download_parameter(response.json(), self.args)
                time.sleep(5)

    poller = ParameterPoller(args)
    poller.start()

chicm-ms's avatar
chicm-ms committed
215

216
217
218
219
220
if __name__ == '__main__':
    # NNI Trial Keeper main function: parse the command line, check the version against
    # NNI manager, then run the trial until it finishes.
    PARSER = argparse.ArgumentParser()
    PARSER.set_defaults(func=trial_keeper_help_info)
    PARSER.add_argument('--trial_command', type=str, help='Command to launch trial process')
    PARSER.add_argument('--nnimanager_ip', type=str, default='localhost', help='NNI manager rest server IP')
    PARSER.add_argument('--nnimanager_port', type=str, default='8081', help='NNI manager rest server port')
    PARSER.add_argument('--pai_hdfs_output_dir', type=str, help='the output dir of pai_hdfs')  # backward compatibility
    PARSER.add_argument('--hdfs_output_dir', type=str, help='the output dir of hdfs')
    PARSER.add_argument('--pai_hdfs_host', type=str, help='the host of pai_hdfs')  # backward compatibility
    PARSER.add_argument('--hdfs_host', type=str, help='the host of hdfs')
    PARSER.add_argument('--pai_user_name', type=str, help='the username of hdfs')
    PARSER.add_argument('--nni_hdfs_exp_dir', type=str, help='nni experiment directory in hdfs')
    PARSER.add_argument('--webhdfs_path', type=str, help='the webhdfs path used in webhdfs URL')
    PARSER.add_argument('--nni_manager_version', type=str, help='the nni version transmitted from nniManager')
    PARSER.add_argument('--log_collection', type=str, help='set the way to collect log in trialkeeper')
    PARSER.add_argument('--job_id_file', type=str, help='set job id file for operating and monitoring job.')
    # unknown options are tolerated rather than rejected
    args, unknown = PARSER.parse_known_args()
    if args.trial_command is None:
        # nothing to run: print the help hint instead of exiting silently
        trial_keeper_help_info()
        sys.exit(1)
    check_version(args)
    try:
        if NNI_PLATFORM == 'paiYarn' and is_multi_phase():
            fetch_parameter_file(args)
        main_loop(args)
    except SystemExit as se:
        nni_log(LogType.Info, 'NNI trial keeper exit with code {}'.format(se.code))
        # os._exit() only accepts an int, whereas SystemExit.code may be None (success)
        # or an arbitrary object such as a message string (failure).
        if se.code is None:
            exit_status = 0
        elif isinstance(se.code, int):
            exit_status = se.code
        else:
            exit_status = 1
        os._exit(exit_status)
    except Exception as e:
        nni_log(LogType.Error, 'Exit trial keeper with code 1 because Exception: {} is catched'.format(str(e)))
        os._exit(1)