"src/vscode:/vscode.git/clone" did not exist on "93dae28f964b9f570d8502d9fe4d257acc56a085"
Unverified Commit cb83ac0f authored by fishyds's avatar fishyds Committed by GitHub
Browse files

NNI logging architecture improvement (#539)

* Removed unused log code, refactor to rename some class name in nni sdk and trial_tools

* Fix the regression bug that loca/remote mode doesnt work
parent 6224a4f4
......@@ -23,8 +23,12 @@ import * as assert from 'assert';
import { Request, Response, Router } from 'express';
import * as bodyParser from 'body-parser';
import * as component from '../../common/component';
import * as fs from 'fs'
import * as path from 'path'
import { getBasePort, getExperimentId } from '../../common/experimentStartupInfo';
import { RestServer } from '../../common/restServer'
import { getLogDir } from '../../common/utils';
import { Writable } from 'stream';
/**
* Cluster Job Training service Rest server, provides rest API to support Cluster job metrics update
......@@ -33,6 +37,7 @@ import { RestServer } from '../../common/restServer'
@component.Singleton
export abstract class ClusterJobRestServer extends RestServer{
private readonly API_ROOT_URL: string = '/api/v1/nni-pai';
private readonly NNI_METRICS_PATTERN: string = `NNISDK_MEb'(?<metrics>.*?)'`;
private readonly expId: string = getExperimentId();
......@@ -88,6 +93,38 @@ export abstract class ClusterJobRestServer extends RestServer{
}
});
router.post(`/stdout/${this.expId}/:trialId`, (req: Request, res: Response) => {
const trialLogPath: string = path.join(getLogDir(), `trial_${req.params.trialId}.log`);
try {
let skipLogging: boolean = false;
if(req.body.tag === 'trial' && req.body.msg !== undefined) {
const metricsContent = req.body.msg.match(this.NNI_METRICS_PATTERN);
if(metricsContent && metricsContent.groups) {
this.handleTrialMetrics(req.params.trialId, [metricsContent.groups['metrics']]);
skipLogging = true;
}
}
if(!skipLogging){
// Construct write stream to write remote trial's log into local file
const writeStream: Writable = fs.createWriteStream(trialLogPath, {
flags: 'a+',
encoding: 'utf8',
autoClose: true
});
writeStream.write(req.body.msg + '\n');
writeStream.end();
}
res.send();
}
catch(err) {
this.log.error(`json parse stdout data error: ${err}`);
res.status(500);
res.send(err.message);
}
});
return router;
}
......
......@@ -92,6 +92,7 @@ class PAITrainingService implements TrainingService {
public async run(): Promise<void> {
const restServer: PAIJobRestServer = component.get(PAIJobRestServer);
await restServer.start();
this.log.info(`PAI Training service rest server listening on: ${restServer.endPoint}`);
while (!this.stopping) {
await this.updatePaiToken();
......
......@@ -44,10 +44,13 @@ _time_format = '%Y-%m-%d %H:%M:%S'
class _LoggerFileWrapper(TextIOBase):
def __init__(self, logger_file):
self.file = logger_file
self.orig_stdout = sys.stdout
def write(self, s):
if s != '\n':
time = datetime.now().strftime(_time_format)
self.orig_stdout.write(s + '\n')
self.orig_stdout.flush()
self.file.write('[{}] PRINT '.format(time) + s + '\n')
self.file.flush()
return len(s)
......
......@@ -34,8 +34,11 @@ _metric_file = open(os.path.join(_sysdir, '.nni', 'metrics'), 'wb')
_outputdir = os.environ['NNI_OUTPUT_DIR']
if not os.path.exists(_outputdir):
os.makedirs(_outputdir)
_log_file_path = os.path.join(_outputdir, 'trial.log')
init_logger(_log_file_path)
_nni_platform = os.environ['NNI_PLATFORM']
if _nni_platform != 'pai':
_log_file_path = os.path.join(_outputdir, 'trial.log')
init_logger(_log_file_path)
_multiphase = os.environ.get('MULTI_PHASE')
......@@ -74,11 +77,16 @@ def get_next_parameter():
return params
def send_metric(string):
data = (string + '\n').encode('utf8')
assert len(data) < 1000000, 'Metric too long'
_metric_file.write(b'ME%06d%b' % (len(data), data))
_metric_file.flush()
subprocess.run(['touch', _metric_file.name], check = True)
if _nni_platform == 'pai':
data = (string).encode('utf8')
assert len(data) < 1000000, 'Metric too long'
print('NNISDK_ME%s' % (data))
else:
data = (string + '\n').encode('utf8')
assert len(data) < 1000000, 'Metric too long'
_metric_file.write(b'ME%06d%b' % (len(data), data))
_metric_file.flush()
subprocess.run(['touch', _metric_file.name], check = True)
def get_sequence_id():
return os.environ['NNI_TRIAL_SEQ_ID']
\ No newline at end of file
......@@ -34,4 +34,10 @@ STDOUT_FULL_PATH = os.path.join(LOG_DIR, 'stdout')
STDERR_FULL_PATH = os.path.join(LOG_DIR, 'stderr')
UPDATE_METRICS_API = '/update-metrics'
\ No newline at end of file
UPDATE_METRICS_API = '/update-metrics'
STDOUT_API = '/stdout'
NNI_SYS_DIR = os.environ['NNI_SYS_DIR']
NNI_TRIAL_JOB_ID = os.environ['NNI_TRIAL_JOB_ID']
NNI_EXP_ID = os.environ['NNI_EXP_ID']
......@@ -18,8 +18,23 @@
# DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
import os
import sys
import json
import logging
import logging.handlers
import time
import threading
from datetime import datetime
from enum import Enum, unique
from logging import StreamHandler
from queue import Queue
from .rest_utils import rest_get, rest_post, rest_put, rest_delete
from .constants import NNI_EXP_ID, NNI_TRIAL_JOB_ID, STDOUT_API
from .url_utils import gen_send_stdout_url
@unique
class LogType(Enum):
......@@ -29,7 +44,135 @@ class LogType(Enum):
Error = 'ERROR'
Critical = 'CRITICAL'
@unique
class StdOutputType(Enum):
Stdout = 'stdout',
Stderr = 'stderr'
def nni_log(log_type, log_message):
'''Log message into stdout'''
dt = datetime.now()
print('[{0}] {1} {2}'.format(dt, log_type.value, log_message))
\ No newline at end of file
print('[{0}] {1} {2}'.format(dt, log_type.value, log_message))
class NNIRestLogHanlder(StreamHandler):
def __init__(self, host, port, tag, std_output_type=StdOutputType.Stdout):
StreamHandler.__init__(self)
self.host = host
self.port = port
self.tag = tag
self.std_output_type = std_output_type
self.orig_stdout = sys.__stdout__
self.orig_stderr = sys.__stderr__
def emit(self, record):
log_entry = {}
log_entry['tag'] = self.tag
log_entry['stdOutputType'] = self.std_output_type.name
log_entry['msg'] = self.format(record)
try:
response = rest_post(gen_send_stdout_url(self.host, self.port), json.dumps(log_entry), 10, True)
except Exception as e:
self.orig_stderr.write(str(e) + '\n')
self.orig_stderr.flush()
class RemoteLogger(object):
"""
NNI remote logger
"""
def __init__(self, syslog_host, syslog_port, tag, std_output_type, log_level=logging.INFO):
'''
constructor
'''
self.logger = logging.getLogger('nni_syslog_{}'.format(tag))
self.log_level = log_level
self.logger.setLevel(self.log_level)
handler = NNIRestLogHanlder(syslog_host, syslog_port, tag)
self.logger.addHandler(handler)
if std_output_type == StdOutputType.Stdout:
self.orig_stdout = sys.__stdout__
else:
self.orig_stdout = sys.__stderr__
def get_pipelog_reader(self):
'''
Get pipe for remote logger
'''
return PipeLogReader(self.logger, logging.INFO)
def write(self, buf):
'''
Write buffer data into logger/stdout
'''
for line in buf.rstrip().splitlines():
self.orig_stdout.write(line.rstrip() + '\n')
self.orig_stdout.flush()
try:
self.logger.log(self.log_level, line.rstrip())
except Exception as e:
pass
class PipeLogReader(threading.Thread):
"""
The reader thread reads log data from pipe
"""
def __init__(self, logger, log_level=logging.INFO):
"""Setup the object with a logger and a loglevel
and start the thread
"""
threading.Thread.__init__(self)
self.queue = Queue()
self.logger = logger
self.daemon = False
self.log_level = log_level
self.fdRead, self.fdWrite = os.pipe()
self.pipeReader = os.fdopen(self.fdRead)
self.orig_stdout = sys.__stdout__
self._is_read_completed = False
def _populateQueue(stream, queue):
'''
Collect lines from 'stream' and put them in 'quque'.
'''
time.sleep(5)
while True:
try:
line = self.queue.get(True, 5)
try:
self.logger.log(self.log_level, line.rstrip())
self.orig_stdout.write(line.rstrip() + '\n')
self.orig_stdout.flush()
except Exception as e:
pass
except Exception as e:
self._is_read_completed = True
break
self.pip_log_reader_thread = threading.Thread(target = _populateQueue,
args = (self.pipeReader, self.queue))
self.pip_log_reader_thread.daemon = True
self.start()
self.pip_log_reader_thread.start()
def fileno(self):
"""Return the write file descriptor of the pipe
"""
return self.fdWrite
def run(self):
"""Run the thread, logging everything.
"""
for line in iter(self.pipeReader.readline, ''):
self.queue.put(line)
self.pipeReader.close()
def close(self):
"""Close the write end of the pipe.
"""
os.close(self.fdWrite)
@property
def is_read_completed(self):
"""Return if read is completed
"""
return self._is_read_completed
\ No newline at end of file
......@@ -25,14 +25,11 @@ import re
import requests
from datetime import datetime
from .constants import BASE_URL
from .constants import BASE_URL, NNI_EXP_ID, NNI_TRIAL_JOB_ID, NNI_SYS_DIR
from .log_utils import LogType, nni_log
from .rest_utils import rest_get, rest_post, rest_put, rest_delete
from .url_utils import gen_update_metrics_url
NNI_SYS_DIR = os.environ['NNI_SYS_DIR']
NNI_TRIAL_JOB_ID = os.environ['NNI_TRIAL_JOB_ID']
NNI_EXP_ID = os.environ['NNI_EXP_ID']
LEN_FIELD_SIZE = 6
MAGIC = 'ME'
......@@ -116,7 +113,7 @@ def read_experiment_metrics(nnimanager_ip, nnimanager_port):
result['metrics'] = reader.read_trial_metrics()
if len(result['metrics']) > 0:
nni_log(LogType.Info, 'Result metrics is {}'.format(json.dumps(result)))
response = rest_post(gen_update_metrics_url(BASE_URL.format(nnimanager_ip), nnimanager_port, NNI_EXP_ID, NNI_TRIAL_JOB_ID), json.dumps(result), 10)
response = rest_post(gen_update_metrics_url(nnimanager_ip, nnimanager_port), json.dumps(result), 10)
nni_log(LogType.Info,'Report metrics to NNI manager completed, http response code is {}'.format(response.status_code))
except Exception as e:
#Error logging
......
......@@ -31,13 +31,15 @@ def rest_get(url, timeout):
print('Get exception {0} when sending http get to url {1}'.format(str(e), url))
return None
def rest_post(url, data, timeout):
def rest_post(url, data, timeout, rethrow_exception=False):
'''Call rest post method'''
try:
response = requests.post(url, headers={'Accept': 'application/json', 'Content-Type': 'application/json'},\
data=data, timeout=timeout)
return response
except Exception as e:
if rethrow_exception is True:
raise
print('Get exception {0} when sending http post to url {1}'.format(str(e), url))
return None
......
......@@ -25,11 +25,13 @@ import time
import logging
import shlex
import re
import sys
import select
from pyhdfs import HdfsClient
from .constants import HOME_DIR, LOG_DIR, NNI_PLATFORM, STDOUT_FULL_PATH, STDERR_FULL_PATH
from .hdfsClientUtility import copyDirectoryToHdfs, copyHdfsDirectoryToLocal
from .log_utils import LogType, nni_log
from .log_utils import LogType, nni_log, RemoteLogger, PipeLogReader, StdOutputType
from .metrics_reader import read_experiment_metrics
logger = logging.getLogger('trial_keeper')
......@@ -42,6 +44,11 @@ def main_loop(args):
stdout_file = open(STDOUT_FULL_PATH, 'a+')
stderr_file = open(STDERR_FULL_PATH, 'a+')
trial_keeper_syslogger = RemoteLogger(args.nnimanager_ip, args.nnimanager_port, 'trial_keeper', StdOutputType.Stdout)
# redirect trial keeper's stdout and stderr to syslog
trial_syslogger_stdout = RemoteLogger(args.nnimanager_ip, args.nnimanager_port, 'trial', StdOutputType.Stdout)
sys.stdout = sys.stderr = trial_keeper_syslogger
if args.pai_hdfs_host is not None and args.nni_hdfs_exp_dir is not None:
try:
......@@ -52,15 +59,15 @@ def main_loop(args):
copyHdfsDirectoryToLocal(args.nni_hdfs_exp_dir, os.getcwd(), hdfs_client)
# Notice: We don't appoint env, which means subprocess wil inherit current environment and that is expected behavior
process = Popen(args.trial_command, shell = True, stdout = stdout_file, stderr = stderr_file)
log_pipe_stdout = trial_syslogger_stdout.get_pipelog_reader()
process = Popen(args.trial_command, shell = True, stdout = log_pipe_stdout, stderr = log_pipe_stdout)
nni_log(LogType.Info, 'Trial keeper spawns a subprocess (pid {0}) to run command: {1}'.format(process.pid, shlex.split(args.trial_command)))
while True:
retCode = process.poll()
## Read experiment metrics, to avoid missing metrics
read_experiment_metrics(args.nnimanager_ip, args.nnimanager_port)
if retCode is not None:
#read_experiment_metrics(args.nnimanager_ip, args.nnimanager_port)
if retCode is not None and log_pipe_stdout.is_read_completed == True:
nni_log(LogType.Info, 'subprocess terminated. Exit code is {}. Quit'.format(retCode))
if args.pai_hdfs_output_dir is not None:
# Copy local directory to hdfs for OpenPAI
......@@ -102,8 +109,8 @@ if __name__ == '__main__':
main_loop(args)
except SystemExit as se:
nni_log(LogType.Info, 'NNI trial keeper exit with code {}'.format(se.code))
sys.exit(se.code)
os._exit(se.code)
except Exception as e:
nni_log(LogType.Error, 'Exit trial keeper with code 1 because Exception: {} is catched'.format(str(e)))
sys.exit(1)
os._exit(1)
......@@ -18,8 +18,12 @@
# DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
from .constants import API_ROOT_URL, UPDATE_METRICS_API
from .constants import API_ROOT_URL, BASE_URL, UPDATE_METRICS_API, STDOUT_API, NNI_TRIAL_JOB_ID, NNI_EXP_ID
def gen_update_metrics_url(base_url, port, exp_id, trial_job_id):
def gen_update_metrics_url(ip, port):
'''Generate update trial metrics url'''
return '{0}:{1}{2}{3}/{4}/:{5}'.format(base_url, port, API_ROOT_URL, UPDATE_METRICS_API, exp_id, trial_job_id)
\ No newline at end of file
return '{0}:{1}{2}{3}/{4}/{5}'.format(BASE_URL.format(ip), port, API_ROOT_URL, UPDATE_METRICS_API, NNI_EXP_ID, NNI_TRIAL_JOB_ID)
def gen_send_stdout_url(ip, port):
'''Generate send stdout url'''
return '{0}:{1}{2}{3}/{4}/{5}'.format(BASE_URL.format(ip), port, API_ROOT_URL, STDOUT_API, NNI_EXP_ID, NNI_TRIAL_JOB_ID)
\ No newline at end of file
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment