"ts/webui/src/git@developer.sourcefind.cn:OpenDAS/nni.git" did not exist on "a9dcc00670c611f8d610213d12ca2e54099a3913"
Unverified Commit 37354dff authored by fishyds's avatar fishyds Committed by GitHub
Browse files

[Logging architecture refactor] Remove unused metrics related code in nni...

[Logging architecture refactor] Remove unused metrics related code in nni trial_tools, support kubeflow mode for logging architecture refactor (#551)

* Remove unused metrics related code in nni trial_tools, support kubeflow mode for logging architecture refactor
parent ce1bc481
...@@ -36,7 +36,7 @@ if not os.path.exists(_outputdir): ...@@ -36,7 +36,7 @@ if not os.path.exists(_outputdir):
os.makedirs(_outputdir) os.makedirs(_outputdir)
_nni_platform = os.environ['NNI_PLATFORM'] _nni_platform = os.environ['NNI_PLATFORM']
if _nni_platform != 'pai': if _nni_platform not in ['pai', 'kubeflow']:
_log_file_path = os.path.join(_outputdir, 'trial.log') _log_file_path = os.path.join(_outputdir, 'trial.log')
init_logger(_log_file_path) init_logger(_log_file_path)
...@@ -77,7 +77,7 @@ def get_next_parameter(): ...@@ -77,7 +77,7 @@ def get_next_parameter():
return params return params
def send_metric(string): def send_metric(string):
if _nni_platform == 'pai': if _nni_platform in ['pai', 'kubeflow']:
data = (string).encode('utf8') data = (string).encode('utf8')
assert len(data) < 1000000, 'Metric too long' assert len(data) < 1000000, 'Metric too long'
print('NNISDK_ME%s' % (data)) print('NNISDK_ME%s' % (data))
......
...@@ -34,10 +34,7 @@ STDOUT_FULL_PATH = os.path.join(LOG_DIR, 'stdout') ...@@ -34,10 +34,7 @@ STDOUT_FULL_PATH = os.path.join(LOG_DIR, 'stdout')
STDERR_FULL_PATH = os.path.join(LOG_DIR, 'stderr') STDERR_FULL_PATH = os.path.join(LOG_DIR, 'stderr')
UPDATE_METRICS_API = '/update-metrics'
STDOUT_API = '/stdout' STDOUT_API = '/stdout'
NNI_SYS_DIR = os.environ['NNI_SYS_DIR'] NNI_SYS_DIR = os.environ['NNI_SYS_DIR']
NNI_TRIAL_JOB_ID = os.environ['NNI_TRIAL_JOB_ID'] NNI_TRIAL_JOB_ID = os.environ['NNI_TRIAL_JOB_ID']
NNI_EXP_ID = os.environ['NNI_EXP_ID'] NNI_EXP_ID = os.environ['NNI_EXP_ID']
\ No newline at end of file
# ============================================================================================================================== #
# Copyright (c) Microsoft Corporation
# All rights reserved.
#
# MIT License
#
# Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated
# documentation files (the "Software"), to deal in the Software without restriction, including without limitation
# the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and
# to permit persons to whom the Software is furnished to do so, subject to the following conditions:
# The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED *AS IS*, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING
# BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
# DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
# ============================================================================================================================== #
import argparse
import errno
import json
import os
import re
import requests
from datetime import datetime
from .constants import BASE_URL, NNI_EXP_ID, NNI_TRIAL_JOB_ID, NNI_SYS_DIR
from .log_utils import LogType, nni_log
from .rest_utils import rest_get, rest_post, rest_put, rest_delete
from .url_utils import gen_update_metrics_url
LEN_FIELD_SIZE = 6
MAGIC = 'ME'
class TrialMetricsReader():
'''
Read metrics data from a trial job
'''
def __init__(self):
metrics_base_dir = os.path.join(NNI_SYS_DIR, '.nni')
self.offset_filename = os.path.join(metrics_base_dir, 'metrics_offset')
self.metrics_filename = os.path.join(metrics_base_dir, 'metrics')
if not os.path.exists(metrics_base_dir):
os.makedirs(metrics_base_dir)
def _metrics_file_is_empty(self):
if not os.path.isfile(self.metrics_filename):
return True
statinfo = os.stat(self.metrics_filename)
return statinfo.st_size == 0
def _get_offset(self):
offset = 0
if os.path.isfile(self.offset_filename):
with open(self.offset_filename, 'r') as f:
offset = int(f.readline())
return offset
def _write_offset(self, offset):
statinfo = os.stat(self.metrics_filename)
if offset < 0 or offset > statinfo.st_size:
raise ValueError('offset value is invalid: {}'.format(offset))
with open(self.offset_filename, 'w') as f:
f.write(str(offset)+'\n')
def _read_all_available_records(self, offset):
new_offset = offset
metrics = []
with open(self.metrics_filename, 'r') as f:
f.seek(offset)
while True:
magic_string = f.read(len(MAGIC))
# empty data means EOF
if not magic_string:
break
nni_log(LogType.Info, 'Metrics file offset is {}'.format(offset))
strdatalen = f.read(LEN_FIELD_SIZE)
# empty data means EOF
if not strdatalen:
raise ValueError("metric file {} format error after offset: {}.".format(self.metrics_filename, new_offset))
datalen = int(strdatalen)
data = f.read(datalen)
if datalen > 0 and len(data) == datalen:
nni_log(LogType.Info, 'data is \'{}\''.format(data))
new_offset = f.tell()
metrics.append(data)
else:
raise ValueError("metric file {} format error after offset: {}.".format(self.metrics_filename, new_offset))
self._write_offset(new_offset)
return metrics
def read_trial_metrics(self):
'''
Read available metrics data for a trial
'''
if self._metrics_file_is_empty():
return []
offset = self._get_offset()
return self._read_all_available_records(offset)
def read_experiment_metrics(nnimanager_ip, nnimanager_port):
'''
Read metrics data for specified trial jobs
'''
result = {}
try:
reader = TrialMetricsReader()
result['jobId'] = NNI_TRIAL_JOB_ID
result['metrics'] = reader.read_trial_metrics()
if len(result['metrics']) > 0:
nni_log(LogType.Info, 'Result metrics is {}'.format(json.dumps(result)))
response = rest_post(gen_update_metrics_url(nnimanager_ip, nnimanager_port), json.dumps(result), 10)
nni_log(LogType.Info,'Report metrics to NNI manager completed, http response code is {}'.format(response.status_code))
except Exception as e:
#Error logging
nni_log(LogType.Error, 'Error when reading metrics data: ' + str(e))
return json.dumps(result)
\ No newline at end of file
...@@ -32,7 +32,6 @@ from pyhdfs import HdfsClient ...@@ -32,7 +32,6 @@ from pyhdfs import HdfsClient
from .constants import HOME_DIR, LOG_DIR, NNI_PLATFORM, STDOUT_FULL_PATH, STDERR_FULL_PATH from .constants import HOME_DIR, LOG_DIR, NNI_PLATFORM, STDOUT_FULL_PATH, STDERR_FULL_PATH
from .hdfsClientUtility import copyDirectoryToHdfs, copyHdfsDirectoryToLocal from .hdfsClientUtility import copyDirectoryToHdfs, copyHdfsDirectoryToLocal
from .log_utils import LogType, nni_log, RemoteLogger, PipeLogReader, StdOutputType from .log_utils import LogType, nni_log, RemoteLogger, PipeLogReader, StdOutputType
from .metrics_reader import read_experiment_metrics
logger = logging.getLogger('trial_keeper') logger = logging.getLogger('trial_keeper')
...@@ -65,8 +64,7 @@ def main_loop(args): ...@@ -65,8 +64,7 @@ def main_loop(args):
while True: while True:
retCode = process.poll() retCode = process.poll()
## Read experiment metrics, to avoid missing metrics # child worker process exits and all stdout data is read
#read_experiment_metrics(args.nnimanager_ip, args.nnimanager_port)
if retCode is not None and log_pipe_stdout.is_read_completed == True: if retCode is not None and log_pipe_stdout.is_read_completed == True:
nni_log(LogType.Info, 'subprocess terminated. Exit code is {}. Quit'.format(retCode)) nni_log(LogType.Info, 'subprocess terminated. Exit code is {}. Quit'.format(retCode))
if args.pai_hdfs_output_dir is not None: if args.pai_hdfs_output_dir is not None:
......
...@@ -18,11 +18,7 @@ ...@@ -18,11 +18,7 @@
# DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, # DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
from .constants import API_ROOT_URL, BASE_URL, UPDATE_METRICS_API, STDOUT_API, NNI_TRIAL_JOB_ID, NNI_EXP_ID from .constants import API_ROOT_URL, BASE_URL, STDOUT_API, NNI_TRIAL_JOB_ID, NNI_EXP_ID
def gen_update_metrics_url(ip, port):
'''Generate update trial metrics url'''
return '{0}:{1}{2}{3}/{4}/{5}'.format(BASE_URL.format(ip), port, API_ROOT_URL, UPDATE_METRICS_API, NNI_EXP_ID, NNI_TRIAL_JOB_ID)
def gen_send_stdout_url(ip, port): def gen_send_stdout_url(ip, port):
'''Generate send stdout url''' '''Generate send stdout url'''
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment