Unverified Commit 2b126039 authored by fishyds's avatar fishyds Committed by GitHub
Browse files

Trial keeper refactor (#411)

* [Trial keeper refactor] refactor trial keeper stdout output
parent 45484bfb
# Copyright (c) Microsoft Corporation
# All rights reserved.
#
# MIT License
#
# Permission is hereby granted, free of charge,
# to any person obtaining a copy of this software and associated
# documentation files (the "Software"), to deal in the Software without restriction,
# including without limitation the rights to use, copy, modify, merge, publish,
# distribute, sublicense, and/or sell copies of the Software, and
# to permit persons to whom the Software is furnished to do so, subject to the following conditions:
# The above copyright notice and this permission notice shall be included
# in all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED *AS IS*, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING
# BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
# DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
from datetime import datetime
from enum import Enum, unique
@unique
class LogType(Enum):
Debug = 'DEBUG'
Info = 'INFO'
Warning = 'WARNING'
Error = 'ERROR'
Critical = 'CRITICAL'
def nni_log(log_type, log_message):
'''Log message into stdout'''
dt = datetime.now()
print('[{0}] {1} {2}'.format(dt, log_type.value, log_message))
\ No newline at end of file
...@@ -24,7 +24,9 @@ import os ...@@ -24,7 +24,9 @@ import os
import re import re
import requests import requests
from datetime import datetime
from .constants import BASE_URL from .constants import BASE_URL
from .log_utils import LogType, nni_log
from .rest_utils import rest_get, rest_post, rest_put, rest_delete from .rest_utils import rest_get, rest_post, rest_put, rest_delete
from .url_utils import gen_update_metrics_url from .url_utils import gen_update_metrics_url
...@@ -34,8 +36,6 @@ NNI_EXP_ID = os.environ['NNI_EXP_ID'] ...@@ -34,8 +36,6 @@ NNI_EXP_ID = os.environ['NNI_EXP_ID']
LEN_FIELD_SIZE = 6 LEN_FIELD_SIZE = 6
MAGIC = 'ME' MAGIC = 'ME'
print('In metrics_reader, NNI_SYS_DIR is {}'.format(NNI_SYS_DIR))
class TrialMetricsReader(): class TrialMetricsReader():
''' '''
Read metrics data from a trial job Read metrics data from a trial job
...@@ -71,14 +71,14 @@ class TrialMetricsReader(): ...@@ -71,14 +71,14 @@ class TrialMetricsReader():
def _read_all_available_records(self, offset): def _read_all_available_records(self, offset):
new_offset = offset new_offset = offset
metrics = [] metrics = []
with open(self.metrics_filename, 'r') as f: with open(self.metrics_filename, 'r') as f:
print('offset is {}'.format(offset))
f.seek(offset) f.seek(offset)
while True: while True:
magic_string = f.read(len(MAGIC)) magic_string = f.read(len(MAGIC))
# empty data means EOF # empty data means EOF
if not magic_string: if not magic_string:
break break
nni_log(LogType.Info, 'Metrics file offset is {}'.format(offset))
strdatalen = f.read(LEN_FIELD_SIZE) strdatalen = f.read(LEN_FIELD_SIZE)
# empty data means EOF # empty data means EOF
if not strdatalen: if not strdatalen:
...@@ -87,7 +87,7 @@ class TrialMetricsReader(): ...@@ -87,7 +87,7 @@ class TrialMetricsReader():
data = f.read(datalen) data = f.read(datalen)
if datalen > 0 and len(data) == datalen: if datalen > 0 and len(data) == datalen:
print('data is \'{}\''.format(data)) nni_log(LogType.Info, 'data is \'{}\''.format(data))
new_offset = f.tell() new_offset = f.tell()
metrics.append(data) metrics.append(data)
else: else:
...@@ -100,7 +100,6 @@ class TrialMetricsReader(): ...@@ -100,7 +100,6 @@ class TrialMetricsReader():
Read available metrics data for a trial Read available metrics data for a trial
''' '''
if self._metrics_file_is_empty(): if self._metrics_file_is_empty():
print('metrics is empty')
return [] return []
offset = self._get_offset() offset = self._get_offset()
...@@ -114,13 +113,13 @@ def read_experiment_metrics(nnimanager_ip, nnimanager_port): ...@@ -114,13 +113,13 @@ def read_experiment_metrics(nnimanager_ip, nnimanager_port):
try: try:
reader = TrialMetricsReader() reader = TrialMetricsReader()
result['jobId'] = NNI_TRIAL_JOB_ID result['jobId'] = NNI_TRIAL_JOB_ID
result['metrics'] = reader.read_trial_metrics() result['metrics'] = reader.read_trial_metrics()
print('Result metrics is {}'.format(json.dumps(result))) if len(result['metrics']) > 0:
if len(result['metrics']) > 0: nni_log(LogType.Info, 'Result metrics is {}'.format(json.dumps(result)))
response = rest_post(gen_update_metrics_url(BASE_URL.format(nnimanager_ip), nnimanager_port, NNI_EXP_ID, NNI_TRIAL_JOB_ID), json.dumps(result), 10) response = rest_post(gen_update_metrics_url(BASE_URL.format(nnimanager_ip), nnimanager_port, NNI_EXP_ID, NNI_TRIAL_JOB_ID), json.dumps(result), 10)
print('Response code is {}'.format(response.status_code)) nni_log(LogType.Info,'Report metrics to NNI manager completed, http response code is {}'.format(response.status_code))
except Exception: except Exception as e:
#TODO error logging to file #Error logging
pass nni_log(LogType.Error, 'Error when reading metrics data: ' + str(e))
return json.dumps(result) return json.dumps(result)
\ No newline at end of file
...@@ -27,8 +27,9 @@ import shlex ...@@ -27,8 +27,9 @@ import shlex
import re import re
from pyhdfs import HdfsClient from pyhdfs import HdfsClient
from .hdfsClientUtility import copyDirectoryToHdfs
from .constants import HOME_DIR, LOG_DIR, NNI_PLATFORM, STDOUT_FULL_PATH, STDERR_FULL_PATH from .constants import HOME_DIR, LOG_DIR, NNI_PLATFORM, STDOUT_FULL_PATH, STDERR_FULL_PATH
from .hdfsClientUtility import copyDirectoryToHdfs
from .log_utils import LogType, nni_log
from .metrics_reader import read_experiment_metrics from .metrics_reader import read_experiment_metrics
logger = logging.getLogger('trial_keeper') logger = logging.getLogger('trial_keeper')
...@@ -41,35 +42,33 @@ def main_loop(args): ...@@ -41,35 +42,33 @@ def main_loop(args):
stdout_file = open(STDOUT_FULL_PATH, 'a+') stdout_file = open(STDOUT_FULL_PATH, 'a+')
stderr_file = open(STDERR_FULL_PATH, 'a+') stderr_file = open(STDERR_FULL_PATH, 'a+')
print(shlex.split(args.trial_command))
# Notice: We don't appoint env, which means subprocess wil inherit current environment and that is expected behavior # Notice: We don't appoint env, which means subprocess wil inherit current environment and that is expected behavior
process = Popen(args.trial_command, shell = True, stdout = stdout_file, stderr = stderr_file) process = Popen(args.trial_command, shell = True, stdout = stdout_file, stderr = stderr_file)
print('Subprocess pid is {}'.format(process.pid)) nni_log(LogType.Info, 'Trial keeper spawns a subprocess (pid {0}) to run command: {1}'.format(process.pid, shlex.split(args.trial_command)))
while True: while True:
retCode = process.poll() retCode = process.poll()
## Read experiment metrics, to avoid missing metrics ## Read experiment metrics, to avoid missing metrics
read_experiment_metrics(args.nnimanager_ip, args.nnimanager_port) read_experiment_metrics(args.nnimanager_ip, args.nnimanager_port)
if retCode is not None: if retCode is not None:
print('subprocess terminated. Exit code is {}. Quit'.format(retCode)) nni_log(LogType.Info, 'subprocess terminated. Exit code is {}. Quit'.format(retCode))
if NNI_PLATFORM == 'pai': if NNI_PLATFORM == 'pai':
# Copy local directory to hdfs for OpenPAI # Copy local directory to hdfs for OpenPAI
nni_local_output_dir = os.environ['NNI_OUTPUT_DIR'] nni_local_output_dir = os.environ['NNI_OUTPUT_DIR']
try: try:
hdfs_client = HdfsClient(hosts='{0}:{1}'.format(args.pai_hdfs_host, '50070'), user_name=args.pai_user_name, timeout=5) hdfs_client = HdfsClient(hosts='{0}:{1}'.format(args.pai_hdfs_host, '50070'), user_name=args.pai_user_name, timeout=5)
if copyDirectoryToHdfs(nni_local_output_dir, args.pai_hdfs_output_dir, hdfs_client): if copyDirectoryToHdfs(nni_local_output_dir, args.pai_hdfs_output_dir, hdfs_client):
print('copy directory from {0} to {1} success!'.format(nni_local_output_dir, args.pai_hdfs_output_dir)) nni_log(LogType.Info, 'copy directory from {0} to {1} success!'.format(nni_local_output_dir, args.pai_hdfs_output_dir))
else: else:
print('copy directory from {0} to {1} failed!'.format(nni_local_output_dir, args.pai_hdfs_output_dir)) nni_log(LogType.Info, 'copy directory from {0} to {1} failed!'.format(nni_local_output_dir, args.pai_hdfs_output_dir))
except Exception as exception: except Exception as e:
print('HDFS copy directory got exception') nni_log(LogType.Error, 'HDFS copy directory got exception: ' + str(e))
raise exception raise e
## Exit as the retCode of subprocess(trial) ## Exit as the retCode of subprocess(trial)
exit(retCode) exit(retCode)
break break
else:
print('subprocess pid: {} is still alive'.format(process.pid))
time.sleep(2) time.sleep(2)
...@@ -93,9 +92,9 @@ if __name__ == '__main__': ...@@ -93,9 +92,9 @@ if __name__ == '__main__':
try: try:
main_loop(args) main_loop(args)
except SystemExit as se: except SystemExit as se:
print('NNI trial keeper exit with code {}'.format(se.code)) nni_log(LogType.Info, 'NNI trial keeper exit with code {}'.format(se.code))
sys.exit(se.code) sys.exit(se.code)
except Exception as e: except Exception as e:
print('Exit trial keeper with code 1 because Exception: {} is catched'.format(str(e))) nni_log(LogType.Error, 'Exit trial keeper with code 1 because Exception: {} is catched'.format(str(e)))
sys.exit(1) sys.exit(1)
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment