# trial_keeper.py
# ============================================================================================================================== #
# Copyright (c) Microsoft Corporation
# All rights reserved.
#
# MIT License
#
# Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated
# documentation files (the "Software"), to deal in the Software without restriction, including without limitation
# the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and
# to permit persons to whom the Software is furnished to do so, subject to the following conditions:
# The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED *AS IS*, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING
# BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
# DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
# ============================================================================================================================== #

import argparse
import sys
import os
from subprocess import Popen, PIPE
import time
import logging
import shlex
import re
28
29
import sys
import select
30
import json
31
import threading
32
from pyhdfs import HdfsClient
33
import pkg_resources
34
35
from .rest_utils import rest_post, rest_get
from .url_utils import gen_send_stdout_url, gen_send_version_url, gen_parameter_meta_url
36

37
38
39
from .constants import HOME_DIR, LOG_DIR, NNI_PLATFORM, STDOUT_FULL_PATH, STDERR_FULL_PATH, \
    MULTI_PHASE, NNI_TRIAL_JOB_ID, NNI_SYS_DIR, NNI_EXP_ID
from .hdfsClientUtility import copyDirectoryToHdfs, copyHdfsDirectoryToLocal, copyHdfsFileToLocal
40
from .log_utils import LogType, nni_log, RemoteLogger, PipeLogReader, StdOutputType
41
42

logger = logging.getLogger('trial_keeper')

# Captures the leading "major[.minor]" part of a version string (an optional leading
# 'v' is allowed), e.g. 'v1.10.2' -> '1.10'. Each component may have several digits so
# that e.g. 1.10 is not truncated to '1.1' and mistaken for version 1.1.
# A raw string is used so that "\." is not an invalid string escape.
regular = re.compile(r'v?(?P<version>[0-9]+(\.[0-9]+)?).*')

# Process-wide HDFS client, created lazily and cached by get_hdfs_client().
_hdfs_client = None
46

47
48
def get_hdfs_client(args):
    '''Return the HDFS client for this process, creating and caching it on first use.

    The client is stored in the module-level ``_hdfs_client``, so later calls (from
    main_loop and from download_parameter in the parameter-fetch thread) reuse it.
    Creation is not guarded by a lock.

    Args:
        args: parsed command-line namespace. Reads ``hdfs_host`` (falling back to the
            legacy ``pai_hdfs_host``), ``nni_hdfs_exp_dir``, ``webhdfs_path`` and
            ``pai_user_name``.

    Returns:
        The cached or newly created ``HdfsClient``. Returns ``None`` when no HDFS host
        was given or ``nni_hdfs_exp_dir`` is None.

    Raises:
        Whatever ``HdfsClient`` raises during construction; it is logged, then re-raised.
    '''
    global _hdfs_client

    if _hdfs_client is not None:
        return _hdfs_client

    # --hdfs_host takes precedence; --pai_hdfs_host is kept for backward compatibility.
    hdfs_host = args.hdfs_host or args.pai_hdfs_host
    if not hdfs_host or args.nni_hdfs_exp_dir is None:
        return None

    try:
        if args.webhdfs_path:
            _hdfs_client = HdfsClient(hosts='{0}:80'.format(hdfs_host), user_name=args.pai_user_name,
                                      webhdfs_path=args.webhdfs_path, timeout=5)
        else:
            # backward compatibility: no custom webhdfs path, use port 50070
            _hdfs_client = HdfsClient(hosts='{0}:{1}'.format(hdfs_host, '50070'), user_name=args.pai_user_name,
                                      timeout=5)
    except Exception as e:
        nni_log(LogType.Error, 'Create HDFS client error: ' + str(e))
        raise

    return _hdfs_client

def _resolve_hdfs_output_dir(args):
    '''Return --hdfs_output_dir, else the legacy --pai_hdfs_output_dir, else None.'''
    if args.hdfs_output_dir:
        return args.hdfs_output_dir
    if args.pai_hdfs_output_dir:  # backward compatibility
        return args.pai_hdfs_output_dir
    return None


def _copy_output_to_hdfs(hdfs_output_dir, hdfs_client):
    '''Copy the local directory named by $NNI_OUTPUT_DIR to hdfs_output_dir (for OpenPAI).

    Raises KeyError if NNI_OUTPUT_DIR is not set. Exceptions from copyDirectoryToHdfs
    are logged and re-raised. A falsy return from it is logged as a failure.
    '''
    nni_local_output_dir = os.environ['NNI_OUTPUT_DIR']
    try:
        copied = copyDirectoryToHdfs(nni_local_output_dir, hdfs_output_dir, hdfs_client)
    except Exception as e:
        nni_log(LogType.Error, 'HDFS copy directory got exception: ' + str(e))
        raise
    if copied:
        nni_log(LogType.Info, 'copy directory from {0} to {1} success!'.format(nni_local_output_dir, hdfs_output_dir))
    else:
        nni_log(LogType.Error, 'copy directory from {0} to {1} failed!'.format(nni_local_output_dir, hdfs_output_dir))


def main_loop(args):
    '''main loop logic for trial keeper

    Steps:
    1. Ensure LOG_DIR and the stdout/stderr log files exist.
    2. Redirect this process's stdout/stderr to a RemoteLogger built from the NNI
       manager ip/port.
    3. If an HDFS client is available, copy the experiment directory from HDFS into
       the current working directory.
    4. Run ``args.trial_command`` through the shell. Its stdout and stderr both go to
       the pipe reader of a second RemoteLogger.
    5. Poll every 2 seconds until the subprocess has exited and its output has been
       fully read.
    6. If an HDFS output directory was given, upload $NNI_OUTPUT_DIR there.

    Never returns normally: it ends with sys.exit(<trial return code>), i.e. SystemExit.
    '''
    if not os.path.exists(LOG_DIR):
        os.makedirs(LOG_DIR)

    # Make sure both log files exist. The handles are not used here, so close them
    # instead of leaking them for the lifetime of the process.
    for log_path in (STDOUT_FULL_PATH, STDERR_FULL_PATH):
        with open(log_path, 'a+'):
            pass

    trial_keeper_syslogger = RemoteLogger(args.nnimanager_ip, args.nnimanager_port, 'trial_keeper', StdOutputType.Stdout, args.log_collection)
    # redirect trial keeper's stdout and stderr to syslog
    trial_syslogger_stdout = RemoteLogger(args.nnimanager_ip, args.nnimanager_port, 'trial', StdOutputType.Stdout, args.log_collection)
    sys.stdout = sys.stderr = trial_keeper_syslogger

    # Always bound (None when no output dir was given), so the exit path below is safe
    # on platforms that pass neither option.
    hdfs_output_dir = _resolve_hdfs_output_dir(args)

    hdfs_client = get_hdfs_client(args)
    if hdfs_client is not None:
        copyHdfsDirectoryToLocal(args.nni_hdfs_exp_dir, os.getcwd(), hdfs_client)

    # Notice: we don't pass env, so the subprocess inherits the current environment; that is the expected behavior.
    log_pipe_stdout = trial_syslogger_stdout.get_pipelog_reader()
    process = Popen(args.trial_command, shell=True, stdout=log_pipe_stdout, stderr=log_pipe_stdout)
    nni_log(LogType.Info, 'Trial keeper spawns a subprocess (pid {0}) to run command: {1}'.format(process.pid, shlex.split(args.trial_command)))

    while True:
        retCode = process.poll()
        # child worker process exits and all stdout data is read
        if retCode is not None and log_pipe_stdout.set_process_exit() and log_pipe_stdout.is_read_completed == True:
            nni_log(LogType.Info, 'subprocess terminated. Exit code is {}. Quit'.format(retCode))
            if hdfs_output_dir is not None:
                _copy_output_to_hdfs(hdfs_output_dir, hdfs_client)
            # Exit with the return code of the trial subprocess.
            sys.exit(retCode)
        time.sleep(2)

def trial_keeper_help_info(*args):
    '''Default handler installed with PARSER.set_defaults(func=...): point the user at --help.

    Positional arguments are accepted and ignored.
    '''
    hint = 'please run --help to see guidance'
    print(hint)

127
128
129
130
131
def check_version(args):
    '''Check that the installed nni package matches the NNI manager's version.

    Only the part captured by ``regular``'s ``version`` group (leading
    "major[.minor]") is compared. Outcomes:
    - nni package not installed: log an error and os._exit(1).
    - --nni_manager_version not given: log a warning and skip the check.
    - either version string cannot be parsed: log an error and return without
      reporting anything.
    - mismatch: report tag 'VCFail' (with a message) via rest_post, then os._exit(1).
    - match: report tag 'VCSuccess' via rest_post.
    '''
    try:
        trial_keeper_version = pkg_resources.get_distribution('nni').version
    except pkg_resources.ResolutionError:
        nni_log(LogType.Error, 'Package nni does not exist!')
        os._exit(1)

    if not args.nni_manager_version:
        # skip version check
        nni_log(LogType.Warning, 'Skipping version check!')
        return

    # Check each match explicitly rather than catching AttributeError around the whole
    # block, which would also hide unrelated errors from the rest_post calls below.
    trial_keeper_match = regular.search(trial_keeper_version)
    if trial_keeper_match is None:
        nni_log(LogType.Error, 'Cannot parse trial keeper version: {0}'.format(trial_keeper_version))
        return
    trial_keeper_version = trial_keeper_match.group('version')
    nni_log(LogType.Info, 'trial_keeper_version is {0}'.format(trial_keeper_version))

    nni_manager_match = regular.search(args.nni_manager_version)
    if nni_manager_match is None:
        nni_log(LogType.Error, 'Cannot parse nni_manager_version: {0}'.format(args.nni_manager_version))
        return
    nni_manager_version = nni_manager_match.group('version')
    nni_log(LogType.Info, 'nni_manager_version is {0}'.format(nni_manager_version))

    version_url = gen_send_version_url(args.nnimanager_ip, args.nnimanager_port)
    if trial_keeper_version != nni_manager_version:
        nni_log(LogType.Error, 'Version does not match!')
        error_message = 'NNIManager version is {0}, TrialKeeper version is {1}, NNI version does not match!'.format(nni_manager_version, trial_keeper_version)
        log_entry = {'tag': 'VCFail', 'msg': error_message}
        rest_post(version_url, json.dumps(log_entry), 10, False)
        os._exit(1)

    nni_log(LogType.Info, 'Version match!')
    rest_post(version_url, json.dumps({'tag': 'VCSuccess'}), 10, False)
157

158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
def is_multi_phase():
    '''Return True when MULTI_PHASE is the string 'True' or 'true', otherwise False.

    Always returns a bool, including when MULTI_PHASE is falsy (e.g. None or '').
    '''
    return MULTI_PHASE in ('True', 'true')

def download_parameter(meta_list, args):
    """
    Copy this trial's parameter files from HDFS into NNI_SYS_DIR.

    Only entries whose experimentId and trialId equal NNI_EXP_ID and
    NNI_TRIAL_JOB_ID are used. A file is fetched only when NNI_SYS_DIR does not
    already contain a file with the same basename; it is copied with override=False.

    meta_list format is defined in paiJobRestServer.ts
    example meta_list:
    [
        {"experimentId":"yWFJarYa","trialId":"UpPkl","filePath":"/chec/nni/experiments/yWFJarYa/trials/UpPkl/parameter_1.cfg"},
        {"experimentId":"yWFJarYa","trialId":"aIUMA","filePath":"/chec/nni/experiments/yWFJarYa/trials/aIUMA/parameter_1.cfg"}
    ]
    """
    nni_log(LogType.Debug, str(meta_list))
    nni_log(LogType.Debug, 'NNI_SYS_DIR: {}, trial Id: {}, experiment ID: {}'.format(NNI_SYS_DIR, NNI_TRIAL_JOB_ID, NNI_EXP_ID))
    nni_log(LogType.Debug, 'NNI_SYS_DIR files: {}'.format(os.listdir(NNI_SYS_DIR)))

    own_entries = (entry for entry in meta_list
                   if entry['experimentId'] == NNI_EXP_ID and entry['trialId'] == NNI_TRIAL_JOB_ID)
    for entry in own_entries:
        remote_path = entry['filePath']
        local_path = os.path.join(NNI_SYS_DIR, os.path.basename(remote_path))
        if os.path.exists(local_path):
            # Already downloaded in an earlier poll.
            continue
        copyHdfsFileToLocal(remote_path, local_path, get_hdfs_client(args), override=False)

def fetch_parameter_file(args):
    '''Start a background thread that keeps downloading this trial's parameter files.

    The thread polls the parameter-meta URL (built from the NNI manager ip/port) about
    every 5 seconds and passes each HTTP 200 JSON response to download_parameter. It
    never stops by itself. It is created as a daemon thread so it cannot keep the
    process alive once the main thread is done. Errors in a single poll are logged and
    polling continues.
    '''
    class FetchThread(threading.Thread):
        def __init__(self, args):
            super(FetchThread, self).__init__()
            self.args = args
            # An endless poller must not block interpreter shutdown.
            self.daemon = True

        def run(self):
            uri = gen_parameter_meta_url(self.args.nnimanager_ip, self.args.nnimanager_port)
            nni_log(LogType.Info, uri)

            while True:
                try:
                    res = rest_get(uri, 10)
                    if res is None:
                        # NOTE(review): guards against rest_get returning None on a failed
                        # request; confirm against rest_utils.
                        nni_log(LogType.Warning, 'rest response: {}'.format(str(res)))
                    else:
                        nni_log(LogType.Debug, 'status code: {}'.format(res.status_code))
                        if res.status_code == 200:
                            download_parameter(res.json(), self.args)
                        else:
                            nni_log(LogType.Warning, 'rest response: {}'.format(str(res)))
                except Exception as e:
                    # A transient network/HDFS error must not stop parameter fetching
                    # for the rest of the trial.
                    nni_log(LogType.Error, 'Fetch parameter file got exception: ' + str(e))
                time.sleep(5)

    fetch_file_thread = FetchThread(args)
    fetch_file_thread.start()

204
205
206
207
208
if __name__ == '__main__':
    # NNI Trial Keeper main function
    PARSER = argparse.ArgumentParser()
    PARSER.set_defaults(func=trial_keeper_help_info)
    PARSER.add_argument('--trial_command', type=str, help='Command to launch trial process')
    PARSER.add_argument('--nnimanager_ip', type=str, default='localhost', help='NNI manager rest server IP')
    PARSER.add_argument('--nnimanager_port', type=str, default='8081', help='NNI manager rest server port')
    PARSER.add_argument('--pai_hdfs_output_dir', type=str, help='the output dir of pai_hdfs') # backward compatibility
    PARSER.add_argument('--hdfs_output_dir', type=str, help='the output dir of hdfs')
    PARSER.add_argument('--pai_hdfs_host', type=str, help='the host of pai_hdfs') # backward compatibility
    PARSER.add_argument('--hdfs_host', type=str, help='the host of hdfs')
    PARSER.add_argument('--pai_user_name', type=str, help='the username of hdfs')
    PARSER.add_argument('--nni_hdfs_exp_dir', type=str, help='nni experiment directory in hdfs')
    PARSER.add_argument('--webhdfs_path', type=str, help='the webhdfs path used in webhdfs URL')
    PARSER.add_argument('--nni_manager_version', type=str, help='the nni version transmitted from nniManager')
    PARSER.add_argument('--log_collection', type=str, help='set the way to collect log in trialkeeper')
    # Unknown arguments are tolerated and ignored.
    args, _ = PARSER.parse_known_args()
    if args.trial_command is None:
        sys.exit(1)
    check_version(args)
    try:
        if is_multi_phase():
            fetch_parameter_file(args)
        main_loop(args)
    except SystemExit as se:
        nni_log(LogType.Info, 'NNI trial keeper exit with code {}'.format(se.code))
        # os._exit() only accepts an int. Mirror sys.exit() semantics: None means
        # success, and any other non-int code (e.g. a message) means failure.
        if se.code is None:
            exit_code = 0
        elif isinstance(se.code, int):
            exit_code = se.code
        else:
            exit_code = 1
        # os._exit skips interpreter shutdown, so background threads cannot delay exit.
        os._exit(exit_code)
    except Exception as e:
        nni_log(LogType.Error, 'Exit trial keeper with code 1 because Exception: {} is catched'.format(str(e)))
        os._exit(1)
234