trial_keeper.py 11.5 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
# ============================================================================================================================== #
# Copyright (c) Microsoft Corporation
# All rights reserved.
#
# MIT License
#
# Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated
# documentation files (the "Software"), to deal in the Software without restriction, including without limitation
# the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and
# to permit persons to whom the Software is furnished to do so, subject to the following conditions:
# The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED *AS IS*, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING
# BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
# DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
# ============================================================================================================================== #

import argparse
import os
chicm-ms's avatar
chicm-ms committed
22
from subprocess import Popen
23
24
25
26
import time
import logging
import shlex
import re
27
import sys
28
import json
29
import threading
30
from pyhdfs import HdfsClient
31
import pkg_resources
32
from .rest_utils import rest_post, rest_get
chicm-ms's avatar
chicm-ms committed
33
from .url_utils import gen_send_version_url, gen_parameter_meta_url
34

chicm-ms's avatar
chicm-ms committed
35
from .constants import LOG_DIR, NNI_PLATFORM, MULTI_PHASE, NNI_TRIAL_JOB_ID, NNI_SYS_DIR, NNI_EXP_ID
36
from .hdfsClientUtility import copyDirectoryToHdfs, copyHdfsDirectoryToLocal, copyHdfsFileToLocal
chicm-ms's avatar
chicm-ms committed
37
from .log_utils import LogType, nni_log, RemoteLogger, StdOutputType
38
39

logger = logging.getLogger('trial_keeper')
# Extracts the leading "major[.minor]" part of a version string, skipping an
# optional leading "v": e.g. "v1.10.2" -> "1.10", "0.9" -> "0.9", "2" -> "2".
# Raw string so the escaped dot is a regex escape rather than an invalid string
# escape, and "+" so multi-digit components such as "10" are captured whole
# instead of being truncated to a single digit (which made "1.10" == "1.1").
regular = re.compile(r'v?(?P<version>[0-9]+(\.[0-9]+)?).*')

# Process-wide cache of the HDFS client; populated lazily by get_hdfs_client().
_hdfs_client = None


def get_hdfs_client(args):
    '''Return the process-wide HDFS client, creating it on first use.

    The HDFS host is taken from ``args.hdfs_host``, falling back to the legacy
    ``args.pai_hdfs_host``. ``None`` is returned when neither host is set, or when
    ``args.nni_hdfs_exp_dir`` is None and no client has been cached yet.
    With ``args.webhdfs_path`` set, the client talks to port 80 using that path;
    otherwise it uses the legacy port 50070. Errors raised while constructing the
    client are logged and re-raised.
    '''
    global _hdfs_client

    cached = _hdfs_client
    if cached is not None:
        return cached

    # backward compatibility: older launchers only pass --pai_hdfs_host
    host = args.hdfs_host or args.pai_hdfs_host
    if not host:
        return None
    if args.nni_hdfs_exp_dir is None:
        return None

    try:
        if not args.webhdfs_path:
            # backward compatibility: plain WebHDFS on the default NameNode port
            _hdfs_client = HdfsClient(hosts='{0}:{1}'.format(host, '50070'),
                                      user_name=args.pai_user_name,
                                      timeout=5)
        else:
            _hdfs_client = HdfsClient(hosts='{0}:80'.format(host),
                                      user_name=args.pai_user_name,
                                      webhdfs_path=args.webhdfs_path,
                                      timeout=5)
    except Exception as exc:
        nni_log(LogType.Error, 'Create HDFS client error: ' + str(exc))
        raise
    return _hdfs_client


def main_loop(args):
    '''main loop logic for trial keeper

    Redirects this process's stdout/stderr to the remote "trial_keeper" logger,
    copies ``args.nni_hdfs_exp_dir`` from HDFS into the current working directory
    when an HDFS client is available, then runs ``args.trial_command`` in a shell
    whose stdout/stderr are piped to the remote "trial" logger. The child is polled
    every 2 seconds. Once it has exited and its output has been fully read, the
    local ``$NNI_OUTPUT_DIR`` is uploaded to the HDFS output directory (if one was
    given), and the function raises SystemExit carrying the trial's return code.
    '''

    if not os.path.exists(LOG_DIR):
        os.makedirs(LOG_DIR)

    trial_keeper_syslogger = RemoteLogger(args.nnimanager_ip, args.nnimanager_port, 'trial_keeper',
                                          StdOutputType.Stdout, args.log_collection)
    # redirect trial keeper's stdout and stderr to syslog
    trial_syslogger_stdout = RemoteLogger(args.nnimanager_ip, args.nnimanager_port, 'trial', StdOutputType.Stdout,
                                          args.log_collection)
    sys.stdout = sys.stderr = trial_keeper_syslogger

    # --pai_hdfs_output_dir is the legacy spelling of --hdfs_output_dir
    hdfs_output_dir = args.hdfs_output_dir or args.pai_hdfs_output_dir or None

    hdfs_client = get_hdfs_client(args)
    if hdfs_client is not None:
        copyHdfsDirectoryToLocal(args.nni_hdfs_exp_dir, os.getcwd(), hdfs_client)

    # Notice: We don't appoint env, which means subprocess wil inherit current environment and that is expected behavior
    log_pipe_stdout = trial_syslogger_stdout.get_pipelog_reader()
    process = Popen(args.trial_command, shell=True, stdout=log_pipe_stdout, stderr=log_pipe_stdout)
    nni_log(LogType.Info, 'Trial keeper spawns a subprocess (pid {0}) to run command: {1}'.format(
        process.pid, shlex.split(args.trial_command)))

    while True:
        ret_code = process.poll()
        # child worker process exits and all stdout data is read
        if ret_code is not None and log_pipe_stdout.set_process_exit() and log_pipe_stdout.is_read_completed == True:
            nni_log(LogType.Info, 'subprocess terminated. Exit code is {}. Quit'.format(ret_code))
            if hdfs_output_dir is not None:
                _copy_output_to_hdfs(hdfs_output_dir, hdfs_client)
            # Exit as the retCode of subprocess(trial). sys.exit is used rather than
            # the site-module `exit` helper, which is meant for interactive use and
            # is absent when Python runs without the site module (-S).
            sys.exit(ret_code)
        time.sleep(2)


def _copy_output_to_hdfs(hdfs_output_dir, hdfs_client):
    '''Upload the local $NNI_OUTPUT_DIR to hdfs_output_dir (OpenPAI output sync).

    The outcome is logged; an exception raised by the copy is logged and re-raised.
    '''
    nni_local_output_dir = os.environ['NNI_OUTPUT_DIR']
    try:
        if copyDirectoryToHdfs(nni_local_output_dir, hdfs_output_dir, hdfs_client):
            nni_log(LogType.Info,
                    'copy directory from {0} to {1} success!'.format(nni_local_output_dir, hdfs_output_dir))
        else:
            # a failed upload is an error, not routine information
            nni_log(LogType.Error,
                    'copy directory from {0} to {1} failed!'.format(nni_local_output_dir, hdfs_output_dir))
    except Exception as e:
        nni_log(LogType.Error, 'HDFS copy directory got exception: ' + str(e))
        raise


def trial_keeper_help_info(*args):
    '''Fallback command handler: tell the user to consult ``--help``.

    Any positional arguments are accepted and ignored.
    '''
    guidance = 'please run --help to see guidance'
    print(guidance)


def check_version(args):
    '''Check that this trial keeper's nni version matches the NNI manager's.

    If the ``nni`` distribution cannot be resolved, an error is logged and the
    process exits immediately with status 1. If ``args.nni_manager_version`` is
    empty, the check is skipped with a warning. Otherwise the "major[.minor]" part
    of both versions (as extracted by ``regular``) is compared, and the outcome is
    reported to the NNI manager as a ``VCSuccess`` or ``VCFail`` entry. On a
    mismatch the process then exits with status 1. If either version string cannot
    be parsed, the error is logged and the check is skipped.
    '''
    try:
        trial_keeper_version = pkg_resources.get_distribution('nni').version
    except pkg_resources.ResolutionError:
        # The nni package is not installed here, so there is nothing to compare.
        nni_log(LogType.Error, 'Package nni does not exist!')
        os._exit(1)

    if not args.nni_manager_version:
        # skip version check
        nni_log(LogType.Warning, 'Skipping version check!')
        return

    try:
        trial_keeper_version = regular.search(trial_keeper_version).group('version')
        nni_log(LogType.Info, 'trial_keeper_version is {0}'.format(trial_keeper_version))
        nni_manager_version = regular.search(args.nni_manager_version).group('version')
        nni_log(LogType.Info, 'nni_manager_version is {0}'.format(nni_manager_version))
    except AttributeError as err:
        # regular.search() returned None: a version string contains no recognisable
        # version number, so the comparison cannot be made.
        nni_log(LogType.Error, str(err))
        return

    version_url = gen_send_version_url(args.nnimanager_ip, args.nnimanager_port)
    if trial_keeper_version != nni_manager_version:
        nni_log(LogType.Error, 'Version does not match!')
        error_message = 'NNIManager version is {0}, TrialKeeper version is {1}, NNI version does not match!'.format(
            nni_manager_version, trial_keeper_version)
        log_entry = {'tag': 'VCFail', 'msg': error_message}
        rest_post(version_url, json.dumps(log_entry), 10, False)
        os._exit(1)

    nni_log(LogType.Info, 'Version match!')
    rest_post(version_url, json.dumps({'tag': 'VCSuccess'}), 10, False)


def is_multi_phase():
    '''Return True when the MULTI_PHASE setting is exactly 'True' or 'true'.

    Always returns a bool. An unset or empty MULTI_PHASE yields False.
    '''
    return MULTI_PHASE in ('True', 'true')


def download_parameter(meta_list, args):
    """
    Copy this trial's parameter files from HDFS into NNI_SYS_DIR.

    Only entries whose experimentId and trialId match this trial (NNI_EXP_ID and
    NNI_TRIAL_JOB_ID) are considered. Each is stored under NNI_SYS_DIR with the
    basename of its filePath. Files that already exist locally are left untouched,
    and the HDFS client is only requested when a file actually needs copying.
    meta_list format is defined in paiJobRestServer.ts
    example meta_list:
    [
        {"experimentId":"yWFJarYa","trialId":"UpPkl","filePath":"/chec/nni/experiments/yWFJarYa/trials/UpPkl/parameter_1.cfg"},
        {"experimentId":"yWFJarYa","trialId":"aIUMA","filePath":"/chec/nni/experiments/yWFJarYa/trials/aIUMA/parameter_1.cfg"}
    ]
    """
    nni_log(LogType.Debug, str(meta_list))
    nni_log(LogType.Debug,
            'NNI_SYS_DIR: {}, trial Id: {}, experiment ID: {}'.format(NNI_SYS_DIR, NNI_TRIAL_JOB_ID, NNI_EXP_ID))
    nni_log(LogType.Debug, 'NNI_SYS_DIR files: {}'.format(os.listdir(NNI_SYS_DIR)))

    own_entries = (entry for entry in meta_list
                   if entry['experimentId'] == NNI_EXP_ID and entry['trialId'] == NNI_TRIAL_JOB_ID)
    for entry in own_entries:
        remote_path = entry['filePath']
        local_path = os.path.join(NNI_SYS_DIR, os.path.basename(remote_path))
        if os.path.exists(local_path):
            continue
        copyHdfsFileToLocal(remote_path, local_path, get_hdfs_client(args), override=False)


def fetch_parameter_file(args):
    '''Start a background thread that keeps downloading this trial's parameter files.

    Every 5 seconds the thread queries the NNI manager's parameter-meta endpoint.
    On HTTP 200 it passes the decoded JSON list to download_parameter(). The thread
    is a daemon, so its endless polling loop never keeps the process alive on its
    own. A poll that raises is logged and retried rather than silently terminating
    the thread.
    '''
    class FetchThread(threading.Thread):
        def __init__(self, args):
            super(FetchThread, self).__init__()
            self.args = args
            # run() never returns; don't let it block interpreter shutdown
            self.daemon = True

        def run(self):
            uri = gen_parameter_meta_url(self.args.nnimanager_ip, self.args.nnimanager_port)
            nni_log(LogType.Info, uri)

            while True:
                try:
                    self._poll_once(uri)
                except Exception as e:
                    # e.g. a failed request or a download error; keep polling
                    nni_log(LogType.Warning, 'fetch parameter file got exception: {}'.format(str(e)))
                time.sleep(5)

        def _poll_once(self, uri):
            # Query the meta endpoint once and download any new parameter files.
            res = rest_get(uri, 10)
            nni_log(LogType.Debug, 'status code: {}'.format(res.status_code))
            if res.status_code == 200:
                download_parameter(res.json(), self.args)
            else:
                nni_log(LogType.Warning, 'rest response: {}'.format(str(res)))

    fetch_file_thread = FetchThread(args)
    fetch_file_thread.start()


if __name__ == '__main__':
    # NNI Trial Keeper main function
    PARSER = argparse.ArgumentParser()
    PARSER.set_defaults(func=trial_keeper_help_info)
    PARSER.add_argument('--trial_command', type=str, help='Command to launch trial process')
    PARSER.add_argument('--nnimanager_ip', type=str, default='localhost', help='NNI manager rest server IP')
    PARSER.add_argument('--nnimanager_port', type=str, default='8081', help='NNI manager rest server port')
    PARSER.add_argument('--pai_hdfs_output_dir', type=str, help='the output dir of pai_hdfs')  # backward compatibility
    PARSER.add_argument('--hdfs_output_dir', type=str, help='the output dir of hdfs')
    PARSER.add_argument('--pai_hdfs_host', type=str, help='the host of pai_hdfs')  # backward compatibility
    PARSER.add_argument('--hdfs_host', type=str, help='the host of hdfs')
    PARSER.add_argument('--pai_user_name', type=str, help='the username of hdfs')
    PARSER.add_argument('--nni_hdfs_exp_dir', type=str, help='nni experiment directory in hdfs')
    PARSER.add_argument('--webhdfs_path', type=str, help='the webhdfs path used in webhdfs URL')
    PARSER.add_argument('--nni_manager_version', type=str, help='the nni version transmitted from nniManager')
    PARSER.add_argument('--log_collection', type=str, help='set the way to collect log in trialkeeper')
    # unrecognised arguments are tolerated and ignored
    args, _ = PARSER.parse_known_args()
    if args.trial_command is None:
        nni_log(LogType.Error, '--trial_command is required')
        sys.exit(1)
    check_version(args)
    try:
        if NNI_PLATFORM == 'pai' and is_multi_phase():
            fetch_parameter_file(args)
        main_loop(args)
    except SystemExit as se:
        nni_log(LogType.Info, 'NNI trial keeper exit with code {}'.format(se.code))
        exit_code = se.code
        if exit_code is None:
            # sys.exit() / sys.exit(None) means success
            exit_code = 0
        elif not isinstance(exit_code, int):
            # sys.exit('message') means failure; os._exit only accepts an int
            exit_code = 1
        os._exit(exit_code)
    except Exception as e:
        nni_log(LogType.Error, 'Exit trial keeper with code 1 because Exception: {} is catched'.format(str(e)))
        os._exit(1)