# trial_keeper.py
# ============================================================================================================================== #
# Copyright (c) Microsoft Corporation
# All rights reserved.
#
# MIT License
#
# Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated
# documentation files (the "Software"), to deal in the Software without restriction, including without limitation
# the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and
# to permit persons to whom the Software is furnished to do so, subject to the following conditions:
# The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED *AS IS*, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING
# BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
# DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
# ============================================================================================================================== #

import argparse
import sys
import os
from subprocess import Popen, PIPE
import time
import logging
import shlex
import re
import sys
import select
from pyhdfs import HdfsClient
import pkg_resources

from .constants import HOME_DIR, LOG_DIR, NNI_PLATFORM, STDOUT_FULL_PATH, STDERR_FULL_PATH
from .hdfsClientUtility import copyDirectoryToHdfs, copyHdfsDirectoryToLocal
from .log_utils import LogType, nni_log, RemoteLogger, PipeLogReader, StdOutputType

# Module-level logger registered under the name 'trial_keeper'. The code visible
# in this file reports through nni_log() rather than through this object.
logger = logging.getLogger('trial_keeper')

def _create_hdfs_client(args, hdfs_host):
    '''Build the HdfsClient used for both the experiment download and the output upload.

    Uses port 80 together with args.webhdfs_path when that option is given,
    otherwise falls back to port 50070. Any construction error is logged and
    re-raised unchanged.
    '''
    try:
        if args.webhdfs_path:
            return HdfsClient(hosts='{0}:80'.format(hdfs_host), user_name=args.pai_user_name, webhdfs_path=args.webhdfs_path, timeout=5)
        # backward compatibility
        return HdfsClient(hosts='{0}:{1}'.format(hdfs_host, '50070'), user_name=args.pai_user_name, timeout=5)
    except Exception as e:
        nni_log(LogType.Error, 'Create HDFS client error: ' + str(e))
        raise


def _upload_output_dir(hdfs_output_dir, hdfs_client):
    '''Copy the directory named by $NNI_OUTPUT_DIR to hdfs_output_dir (for OpenPAI).

    A False result from copyDirectoryToHdfs is only logged; an exception is
    logged and re-raised. Raises KeyError when NNI_OUTPUT_DIR is not set.
    '''
    nni_local_output_dir = os.environ['NNI_OUTPUT_DIR']
    try:
        if copyDirectoryToHdfs(nni_local_output_dir, hdfs_output_dir, hdfs_client):
            nni_log(LogType.Info, 'copy directory from {0} to {1} success!'.format(nni_local_output_dir, hdfs_output_dir))
        else:
            nni_log(LogType.Info, 'copy directory from {0} to {1} failed!'.format(nni_local_output_dir, hdfs_output_dir))
    except Exception as e:
        nni_log(LogType.Error, 'HDFS copy directory got exception: ' + str(e))
        raise


def main_loop(args):
    '''main loop logic for trial keeper

    Steps, in order:
      1. make sure LOG_DIR and the two trial log files exist;
      2. replace sys.stdout / sys.stderr with a RemoteLogger;
      3. if an HDFS host and args.nni_hdfs_exp_dir are given, copy that
         directory into the current working directory;
      4. start args.trial_command through the shell and poll it every 2 seconds;
      5. once it has exited and its output pipe is fully read, optionally copy
         $NNI_OUTPUT_DIR to the HDFS output dir and exit with the trial's code.

    args is the argparse namespace built in the __main__ block of this file.
    This function does not return: it leaves through SystemExit carrying the
    subprocess return code, or through whatever exception a step raised.
    '''
    if not os.path.exists(LOG_DIR):
        os.makedirs(LOG_DIR)

    # Append mode creates both files when they are missing. Nothing in this
    # function writes to them, so the handles are closed immediately instead
    # of being left open for the lifetime of the process.
    for log_path in (STDOUT_FULL_PATH, STDERR_FULL_PATH):
        with open(log_path, 'a+'):
            pass

    trial_keeper_syslogger = RemoteLogger(args.nnimanager_ip, args.nnimanager_port, 'trial_keeper', StdOutputType.Stdout, args.log_collection)
    trial_syslogger_stdout = RemoteLogger(args.nnimanager_ip, args.nnimanager_port, 'trial', StdOutputType.Stdout, args.log_collection)
    # redirect trial keeper's stdout and stderr to syslog
    sys.stdout = sys.stderr = trial_keeper_syslogger

    # backward compatibility: fall back to the --pai_* options when the
    # unprefixed ones are not given
    hdfs_host = args.hdfs_host or args.pai_hdfs_host or None
    hdfs_output_dir = args.hdfs_output_dir or args.pai_hdfs_output_dir or None

    # The client is needed for the download below and for the upload after the
    # trial ends. Creating it only for the download left the name unbound
    # (NameError) when just an output dir was configured.
    hdfs_client = None
    if hdfs_host is not None and (args.nni_hdfs_exp_dir is not None or hdfs_output_dir is not None):
        hdfs_client = _create_hdfs_client(args, hdfs_host)

    if hdfs_client is not None and args.nni_hdfs_exp_dir is not None:
        copyHdfsDirectoryToLocal(args.nni_hdfs_exp_dir, os.getcwd(), hdfs_client)

    # Notice: We don't appoint env, which means subprocess will inherit current environment and that is expected behavior
    log_pipe_stdout = trial_syslogger_stdout.get_pipelog_reader()
    # stdout and stderr of the trial both go to the same pipe reader
    process = Popen(args.trial_command, shell=True, stdout=log_pipe_stdout, stderr=log_pipe_stdout)
    nni_log(LogType.Info, 'Trial keeper spawns a subprocess (pid {0}) to run command: {1}'.format(process.pid, shlex.split(args.trial_command)))

    while True:
        retCode = process.poll()
        # child worker process exits and all stdout data is read
        # (set_process_exit() is only called once the process has a return code)
        if retCode is not None and log_pipe_stdout.set_process_exit() and log_pipe_stdout.is_read_completed == True:
            nni_log(LogType.Info, 'subprocess terminated. Exit code is {}. Quit'.format(retCode))
            if hdfs_output_dir is not None:
                if hdfs_client is None:
                    # An output dir without any HDFS host: there is nowhere to upload to.
                    nni_log(LogType.Error, 'HDFS output dir {0} is set but no HDFS host is given, skip copying output'.format(hdfs_output_dir))
                else:
                    _upload_output_dir(hdfs_output_dir, hdfs_client)

            ## Exit as the retCode of subprocess(trial)
            sys.exit(retCode)

        time.sleep(2)

def trial_keeper_help_info(*args):
    '''Print a one-line hint that points at --help; any positional arguments are ignored.'''
    hint = 'please run --help to see guidance'
    print(hint)
def check_version(args):
    '''Compare the installed nni package version with the one sent by the training service.

    args.version is the version string passed with --version. When it is empty
    or missing the comparison is skipped with a warning. Only the leading
    "major[.minor[.patch]]" part of each version (single digits, optional
    leading "v") is compared.

    The process is terminated through os._exit(1) when the nni package cannot
    be resolved or when the two versions differ. A version string in which no
    digit can be found is only logged as an error.
    '''
    try:
        trial_keeper_version = pkg_resources.get_distribution('nni').version
    except pkg_resources.ResolutionError:
        nni_log(LogType.Error, 'Package nni does not exist!')
        os._exit(1)

    if not args.version:
        # skip version check
        nni_log(LogType.Warning, 'Skipping version check!')
        return

    # Raw string so that "\." reaches the regex engine as written instead of
    # being treated as an (invalid) string escape.
    regular = re.compile(r'v?(?P<version>[0-9](\.[0-9]){0,2}).*')
    try:
        trial_keeper_version = regular.search(trial_keeper_version).group('version')
        nni_log(LogType.Info, 'trial_keeper_version is {0}'.format(trial_keeper_version))
        training_service_version = regular.search(args.version).group('version')
        nni_log(LogType.Info, 'training_service_version is {0}'.format(training_service_version))
        if trial_keeper_version != training_service_version:
            nni_log(LogType.Error, 'Version does not match!')
            os._exit(1)
        nni_log(LogType.Info, 'Version match!')
    except AttributeError as err:
        # search() returned None: one of the version strings contains no digit.
        # Pass text to nni_log, as every other call in this file does.
        nni_log(LogType.Error, str(err))
if __name__ == '__main__':
    # NNI Trial Keeper main function: parse the command line, check the nni
    # version, then run and supervise the trial command.
    PARSER = argparse.ArgumentParser()
    PARSER.set_defaults(func=trial_keeper_help_info)
    PARSER.add_argument('--trial_command', type=str, help='Command to launch trial process')
    PARSER.add_argument('--nnimanager_ip', type=str, default='localhost', help='NNI manager rest server IP')
    PARSER.add_argument('--nnimanager_port', type=str, default='8081', help='NNI manager rest server port')
    PARSER.add_argument('--pai_hdfs_output_dir', type=str, help='the output dir of pai_hdfs') # backward compatibility
    PARSER.add_argument('--hdfs_output_dir', type=str, help='the output dir of hdfs')
    PARSER.add_argument('--pai_hdfs_host', type=str, help='the host of pai_hdfs') # backward compatibility
    PARSER.add_argument('--hdfs_host', type=str, help='the host of hdfs')
    PARSER.add_argument('--pai_user_name', type=str, help='the username of hdfs')
    PARSER.add_argument('--nni_hdfs_exp_dir', type=str, help='nni experiment directory in hdfs')
    PARSER.add_argument('--webhdfs_path', type=str, help='the webhdfs path used in webhdfs URL')
    PARSER.add_argument('--version', type=str, help='the nni version transmitted from trainingService')
    PARSER.add_argument('--log_collection', type=str, help='set the way to collect log in trialkeeper')

    # parse_known_args: options this script does not declare are tolerated and ignored
    args, _unknown = PARSER.parse_known_args()
    if args.trial_command is None:
        # Nothing to run: show the hint registered through set_defaults, then fail.
        args.func(args)
        sys.exit(1)
    check_version(args)

    try:
        main_loop(args)
    except SystemExit as se:
        nni_log(LogType.Info, 'NNI trial keeper exit with code {}'.format(se.code))
        # os._exit() only accepts an integer, while SystemExit.code may be None
        # (meaning success) or an arbitrary object (meaning failure).
        if se.code is None:
            exit_code = 0
        elif isinstance(se.code, int):
            exit_code = se.code
        else:
            exit_code = 1
        os._exit(exit_code)
    except Exception as e:
        nni_log(LogType.Error, 'Exit trial keeper with code 1 because Exception: {} is catched'.format(str(e)))
        os._exit(1)