Unverified Commit c297650a authored by SparkSnail's avatar SparkSnail Committed by GitHub
Browse files

Support remoteLoggingType (#901)

If user set remoteloggingType in config file, log content will not be transmitted from trialkeeper
parent d10b8bca
...@@ -203,6 +203,10 @@ machineList: ...@@ -203,6 +203,10 @@ machineList:
__logLevel__ sets log level for the experiment, available log levels are: `trace, debug, info, warning, error, fatal`. The default value is `info`. __logLevel__ sets log level for the experiment, available log levels are: `trace, debug, info, warning, error, fatal`. The default value is `info`.
* __logCollection__
* Description
__logCollection__ set the way to collect log in remote, pai, kubeflow, frameworkcontroller platform. There are two ways to collect log, one way is from `http`, trial keeper will post log content back from http request in this way, but this way may slow down the speed to process logs in trialKeeper. The other way is `none`, trial keeper will not post log content back, and only post job metrics. If your log content is too big, you could consider setting this param be `none`.
* __tuner__ * __tuner__
* Description * Description
......
...@@ -37,6 +37,7 @@ interface ExperimentParams { ...@@ -37,6 +37,7 @@ interface ExperimentParams {
multiPhase?: boolean; multiPhase?: boolean;
multiThread?: boolean; multiThread?: boolean;
versionCheck?: boolean; versionCheck?: boolean;
logCollection?: string;
tuner?: { tuner?: {
className: string; className: string;
builtinTunerName?: string; builtinTunerName?: string;
......
...@@ -131,6 +131,10 @@ class NNIManager implements Manager { ...@@ -131,6 +131,10 @@ class NNIManager implements Manager {
if (expParams.versionCheck !== undefined) { if (expParams.versionCheck !== undefined) {
this.trainingService.setClusterMetadata('version_check', expParams.versionCheck.toString()); this.trainingService.setClusterMetadata('version_check', expParams.versionCheck.toString());
} }
// Set up logCollection config
if (expParams.logCollection !== undefined) {
this.trainingService.setClusterMetadata('log_collection', expParams.logCollection.toString());
}
const dispatcherCommand: string = getMsgDispatcherCommand(expParams.tuner, expParams.assessor, expParams.advisor, const dispatcherCommand: string = getMsgDispatcherCommand(expParams.tuner, expParams.assessor, expParams.advisor,
expParams.multiPhase, expParams.multiThread); expParams.multiPhase, expParams.multiThread);
......
...@@ -142,6 +142,7 @@ export namespace ValidationSchemas { ...@@ -142,6 +142,7 @@ export namespace ValidationSchemas {
multiPhase: joi.boolean(), multiPhase: joi.boolean(),
multiThread: joi.boolean(), multiThread: joi.boolean(),
versionCheck: joi.boolean(), versionCheck: joi.boolean(),
logCollection: joi.string(),
advisor: joi.object({ advisor: joi.object({
builtinAdvisorName: joi.string().valid('Hyperband'), builtinAdvisorName: joi.string().valid('Hyperband'),
codeDir: joi.string(), codeDir: joi.string(),
......
...@@ -32,5 +32,6 @@ export enum TrialConfigMetadataKey { ...@@ -32,5 +32,6 @@ export enum TrialConfigMetadataKey {
KUBEFLOW_CLUSTER_CONFIG = 'kubeflow_config', KUBEFLOW_CLUSTER_CONFIG = 'kubeflow_config',
NNI_MANAGER_IP = 'nni_manager_ip', NNI_MANAGER_IP = 'nni_manager_ip',
FRAMEWORKCONTROLLER_CLUSTER_CONFIG = 'frameworkcontroller_config', FRAMEWORKCONTROLLER_CLUSTER_CONFIG = 'frameworkcontroller_config',
VERSION_CHECK = 'version_check' VERSION_CHECK = 'version_check',
LOG_COLLECTION = 'log_collection'
} }
...@@ -270,6 +270,9 @@ class FrameworkControllerTrainingService extends KubernetesTrainingService imple ...@@ -270,6 +270,9 @@ class FrameworkControllerTrainingService extends KubernetesTrainingService imple
case TrialConfigMetadataKey.VERSION_CHECK: case TrialConfigMetadataKey.VERSION_CHECK:
this.versionCheck = (value === 'true' || value === 'True'); this.versionCheck = (value === 'true' || value === 'True');
break; break;
case TrialConfigMetadataKey.LOG_COLLECTION:
this.logCollection = value;
break;
default: default:
break; break;
} }
......
...@@ -320,6 +320,9 @@ class KubeflowTrainingService extends KubernetesTrainingService implements Kuber ...@@ -320,6 +320,9 @@ class KubeflowTrainingService extends KubernetesTrainingService implements Kuber
case TrialConfigMetadataKey.VERSION_CHECK: case TrialConfigMetadataKey.VERSION_CHECK:
this.versionCheck = (value === 'true' || value === 'True'); this.versionCheck = (value === 'true' || value === 'True');
break; break;
case TrialConfigMetadataKey.LOG_COLLECTION:
this.logCollection = value;
break;
default: default:
break; break;
} }
......
...@@ -71,5 +71,5 @@ mkdir -p $NNI_OUTPUT_DIR ...@@ -71,5 +71,5 @@ mkdir -p $NNI_OUTPUT_DIR
cp -rT $NNI_CODE_DIR $NNI_SYS_DIR cp -rT $NNI_CODE_DIR $NNI_SYS_DIR
cd $NNI_SYS_DIR cd $NNI_SYS_DIR
sh install_nni.sh sh install_nni.sh
python3 -m nni_trial_tool.trial_keeper --trial_command '{8}' --nnimanager_ip {9} --nnimanager_port {10} --version '{11}'` python3 -m nni_trial_tool.trial_keeper --trial_command '{8}' --nnimanager_ip {9} --nnimanager_port {10} --version '{11}' --log_collection '{12}'`
+ `1>$NNI_OUTPUT_DIR/trialkeeper_stdout 2>$NNI_OUTPUT_DIR/trialkeeper_stderr` + `1>$NNI_OUTPUT_DIR/trialkeeper_stdout 2>$NNI_OUTPUT_DIR/trialkeeper_stderr`
...@@ -62,6 +62,7 @@ abstract class KubernetesTrainingService { ...@@ -62,6 +62,7 @@ abstract class KubernetesTrainingService {
protected kubernetesJobRestServer?: KubernetesJobRestServer; protected kubernetesJobRestServer?: KubernetesJobRestServer;
protected kubernetesClusterConfig?: KubernetesClusterConfig; protected kubernetesClusterConfig?: KubernetesClusterConfig;
protected versionCheck?: boolean = true; protected versionCheck?: boolean = true;
protected logCollection: string;
constructor() { constructor() {
this.log = getLogger(); this.log = getLogger();
...@@ -72,6 +73,7 @@ abstract class KubernetesTrainingService { ...@@ -72,6 +73,7 @@ abstract class KubernetesTrainingService {
this.nextTrialSequenceId = -1; this.nextTrialSequenceId = -1;
this.CONTAINER_MOUNT_PATH = '/tmp/mount'; this.CONTAINER_MOUNT_PATH = '/tmp/mount';
this.genericK8sClient = new GeneralK8sClient(); this.genericK8sClient = new GeneralK8sClient();
this.logCollection = 'none';
} }
public generatePodResource(memory: number, cpuNum: number, gpuNum: number) { public generatePodResource(memory: number, cpuNum: number, gpuNum: number) {
...@@ -204,7 +206,8 @@ abstract class KubernetesTrainingService { ...@@ -204,7 +206,8 @@ abstract class KubernetesTrainingService {
command, command,
nniManagerIp, nniManagerIp,
this.kubernetesRestServerPort, this.kubernetesRestServerPort,
version version,
this.logCollection
); );
return Promise.resolve(runScript); return Promise.resolve(runScript);
} }
......
...@@ -64,7 +64,7 @@ export const PAI_TRIAL_COMMAND_FORMAT: string = ...@@ -64,7 +64,7 @@ export const PAI_TRIAL_COMMAND_FORMAT: string =
`export NNI_PLATFORM=pai NNI_SYS_DIR={0} NNI_OUTPUT_DIR={1} NNI_TRIAL_JOB_ID={2} NNI_EXP_ID={3} NNI_TRIAL_SEQ_ID={4} `export NNI_PLATFORM=pai NNI_SYS_DIR={0} NNI_OUTPUT_DIR={1} NNI_TRIAL_JOB_ID={2} NNI_EXP_ID={3} NNI_TRIAL_SEQ_ID={4}
&& cd $NNI_SYS_DIR && sh install_nni.sh && cd $NNI_SYS_DIR && sh install_nni.sh
&& python3 -m nni_trial_tool.trial_keeper --trial_command '{5}' --nnimanager_ip '{6}' --nnimanager_port '{7}' && python3 -m nni_trial_tool.trial_keeper --trial_command '{5}' --nnimanager_ip '{6}' --nnimanager_port '{7}'
--pai_hdfs_output_dir '{8}' --pai_hdfs_host '{9}' --pai_user_name {10} --nni_hdfs_exp_dir '{11}' --webhdfs_path '/webhdfs/api/v1' --version '{12}'`; --pai_hdfs_output_dir '{8}' --pai_hdfs_host '{9}' --pai_user_name {10} --nni_hdfs_exp_dir '{11}' --webhdfs_path '/webhdfs/api/v1' --version '{12}' --log_collection '{13}'`;
export const PAI_OUTPUT_DIR_FORMAT: string = export const PAI_OUTPUT_DIR_FORMAT: string =
`hdfs://{0}:9000/`; `hdfs://{0}:9000/`;
......
...@@ -76,6 +76,7 @@ class PAITrainingService implements TrainingService { ...@@ -76,6 +76,7 @@ class PAITrainingService implements TrainingService {
private nniManagerIpConfig?: NNIManagerIpConfig; private nniManagerIpConfig?: NNIManagerIpConfig;
private copyExpCodeDirPromise?: Promise<void>; private copyExpCodeDirPromise?: Promise<void>;
private versionCheck?: boolean = true; private versionCheck?: boolean = true;
private logCollection: string;
constructor() { constructor() {
this.log = getLogger(); this.log = getLogger();
...@@ -88,6 +89,7 @@ class PAITrainingService implements TrainingService { ...@@ -88,6 +89,7 @@ class PAITrainingService implements TrainingService {
this.hdfsDirPattern = 'hdfs://(?<host>([0-9]{1,3}.){3}[0-9]{1,3})(:[0-9]{2,5})?(?<baseDir>/.*)?'; this.hdfsDirPattern = 'hdfs://(?<host>([0-9]{1,3}.){3}[0-9]{1,3})(:[0-9]{2,5})?(?<baseDir>/.*)?';
this.nextTrialSequenceId = -1; this.nextTrialSequenceId = -1;
this.paiTokenUpdateInterval = 7200000; //2hours this.paiTokenUpdateInterval = 7200000; //2hours
this.logCollection = 'none';
this.log.info('Construct OpenPAI training service.'); this.log.info('Construct OpenPAI training service.');
} }
...@@ -228,7 +230,8 @@ class PAITrainingService implements TrainingService { ...@@ -228,7 +230,8 @@ class PAITrainingService implements TrainingService {
this.hdfsOutputHost, this.hdfsOutputHost,
this.paiClusterConfig.userName, this.paiClusterConfig.userName,
HDFSClientUtility.getHdfsExpCodeDir(this.paiClusterConfig.userName), HDFSClientUtility.getHdfsExpCodeDir(this.paiClusterConfig.userName),
version version,
this.logCollection
).replace(/\r\n|\n|\r/gm, ''); ).replace(/\r\n|\n|\r/gm, '');
console.log(`nniPAItrial command is ${nniPaiTrialCommand.trim()}`); console.log(`nniPAItrial command is ${nniPaiTrialCommand.trim()}`);
...@@ -442,6 +445,9 @@ class PAITrainingService implements TrainingService { ...@@ -442,6 +445,9 @@ class PAITrainingService implements TrainingService {
case TrialConfigMetadataKey.VERSION_CHECK: case TrialConfigMetadataKey.VERSION_CHECK:
this.versionCheck = (value === 'true' || value === 'True'); this.versionCheck = (value === 'true' || value === 'True');
break; break;
case TrialConfigMetadataKey.LOG_COLLECTION:
this.logCollection = value;
break;
default: default:
//Reject for unknown keys //Reject for unknown keys
throw new Error(`Uknown key: ${key}`); throw new Error(`Uknown key: ${key}`);
......
...@@ -250,8 +250,8 @@ export NNI_PLATFORM=remote NNI_SYS_DIR={0} NNI_OUTPUT_DIR={1} NNI_TRIAL_JOB_ID={ ...@@ -250,8 +250,8 @@ export NNI_PLATFORM=remote NNI_SYS_DIR={0} NNI_OUTPUT_DIR={1} NNI_TRIAL_JOB_ID={
cd $NNI_SYS_DIR cd $NNI_SYS_DIR
sh install_nni.sh sh install_nni.sh
echo $$ >{6} echo $$ >{6}
python3 -m nni_trial_tool.trial_keeper --trial_command '{7}' --nnimanager_ip '{8}' --nnimanager_port '{9}' --version '{10}' 1>$NNI_OUTPUT_DIR/trialkeeper_stdout 2>$NNI_OUTPUT_DIR/trialkeeper_stderr python3 -m nni_trial_tool.trial_keeper --trial_command '{7}' --nnimanager_ip '{8}' --nnimanager_port '{9}' --version '{10}' --log_collection '{11}' 1>$NNI_OUTPUT_DIR/trialkeeper_stdout 2>$NNI_OUTPUT_DIR/trialkeeper_stderr
echo $? \`date +%s%3N\` >{11}`; echo $? \`date +%s%3N\` >{12}`;
export const HOST_JOB_SHELL_FORMAT: string = export const HOST_JOB_SHELL_FORMAT: string =
`#!/bin/bash `#!/bin/bash
......
...@@ -77,6 +77,7 @@ class RemoteMachineTrainingService implements TrainingService { ...@@ -77,6 +77,7 @@ class RemoteMachineTrainingService implements TrainingService {
private readonly remoteOS: string; private readonly remoteOS: string;
private nniManagerIpConfig?: NNIManagerIpConfig; private nniManagerIpConfig?: NNIManagerIpConfig;
private versionCheck: boolean = true; private versionCheck: boolean = true;
private logCollection: string;
constructor(@component.Inject timer: ObservableTimer) { constructor(@component.Inject timer: ObservableTimer) {
this.remoteOS = 'linux'; this.remoteOS = 'linux';
...@@ -91,6 +92,7 @@ class RemoteMachineTrainingService implements TrainingService { ...@@ -91,6 +92,7 @@ class RemoteMachineTrainingService implements TrainingService {
this.timer = timer; this.timer = timer;
this.log = getLogger(); this.log = getLogger();
this.trialSequenceId = -1; this.trialSequenceId = -1;
this.logCollection = 'none';
this.log.info('Construct remote machine training service.'); this.log.info('Construct remote machine training service.');
} }
...@@ -376,6 +378,9 @@ class RemoteMachineTrainingService implements TrainingService { ...@@ -376,6 +378,9 @@ class RemoteMachineTrainingService implements TrainingService {
case TrialConfigMetadataKey.VERSION_CHECK: case TrialConfigMetadataKey.VERSION_CHECK:
this.versionCheck = (value === 'true' || value === 'True'); this.versionCheck = (value === 'true' || value === 'True');
break; break;
case TrialConfigMetadataKey.LOG_COLLECTION:
this.logCollection = value;
break;
default: default:
//Reject for unknown keys //Reject for unknown keys
throw new Error(`Uknown key: ${key}`); throw new Error(`Uknown key: ${key}`);
...@@ -598,6 +603,7 @@ class RemoteMachineTrainingService implements TrainingService { ...@@ -598,6 +603,7 @@ class RemoteMachineTrainingService implements TrainingService {
nniManagerIp, nniManagerIp,
this.remoteRestServerPort, this.remoteRestServerPort,
version, version,
this.logCollection,
path.join(trialWorkingFolder, '.nni', 'code') path.join(trialWorkingFolder, '.nni', 'code')
) )
......
...@@ -36,6 +36,7 @@ Optional('nniManagerIp'): str, ...@@ -36,6 +36,7 @@ Optional('nniManagerIp'): str,
Optional('logDir'): os.path.isdir, Optional('logDir'): os.path.isdir,
Optional('debug'): bool, Optional('debug'): bool,
Optional('logLevel'): Or('trace', 'debug', 'info', 'warning', 'error', 'fatal'), Optional('logLevel'): Or('trace', 'debug', 'info', 'warning', 'error', 'fatal'),
Optional('logCollection'): Or('http', 'none'),
'useAnnotation': bool, 'useAnnotation': bool,
Optional('advisor'): Or({ Optional('advisor'): Or({
'builtinAdvisorName': Or('Hyperband'), 'builtinAdvisorName': Or('Hyperband'),
......
...@@ -274,6 +274,8 @@ def set_experiment(experiment_config, mode, port, config_file_name): ...@@ -274,6 +274,8 @@ def set_experiment(experiment_config, mode, port, config_file_name):
#debug mode should disable version check #debug mode should disable version check
if experiment_config.get('debug') is not None: if experiment_config.get('debug') is not None:
request_data['versionCheck'] = not experiment_config.get('debug') request_data['versionCheck'] = not experiment_config.get('debug')
if experiment_config.get('logCollection'):
request_data['logCollection'] = experiment_config.get('logCollection')
request_data['clusterMetaData'] = [] request_data['clusterMetaData'] = []
if experiment_config['trainingServicePlatform'] == 'local': if experiment_config['trainingServicePlatform'] == 'local':
......
...@@ -25,6 +25,7 @@ import logging ...@@ -25,6 +25,7 @@ import logging
import logging.handlers import logging.handlers
import time import time
import threading import threading
import re
from datetime import datetime from datetime import datetime
from enum import Enum, unique from enum import Enum, unique
...@@ -81,7 +82,7 @@ class RemoteLogger(object): ...@@ -81,7 +82,7 @@ class RemoteLogger(object):
""" """
NNI remote logger NNI remote logger
""" """
def __init__(self, syslog_host, syslog_port, tag, std_output_type, log_level=logging.INFO): def __init__(self, syslog_host, syslog_port, tag, std_output_type, log_collection, log_level=logging.INFO):
''' '''
constructor constructor
''' '''
...@@ -94,12 +95,13 @@ class RemoteLogger(object): ...@@ -94,12 +95,13 @@ class RemoteLogger(object):
self.orig_stdout = sys.__stdout__ self.orig_stdout = sys.__stdout__
else: else:
self.orig_stdout = sys.__stderr__ self.orig_stdout = sys.__stderr__
self.log_collection = log_collection
def get_pipelog_reader(self): def get_pipelog_reader(self):
''' '''
Get pipe for remote logger Get pipe for remote logger
''' '''
return PipeLogReader(self.logger, logging.INFO) return PipeLogReader(self.logger, self.log_collection, logging.INFO)
def write(self, buf): def write(self, buf):
''' '''
...@@ -117,7 +119,7 @@ class PipeLogReader(threading.Thread): ...@@ -117,7 +119,7 @@ class PipeLogReader(threading.Thread):
""" """
The reader thread reads log data from pipe The reader thread reads log data from pipe
""" """
def __init__(self, logger, log_level=logging.INFO): def __init__(self, logger, log_collection, log_level=logging.INFO):
"""Setup the object with a logger and a loglevel """Setup the object with a logger and a loglevel
and start the thread and start the thread
""" """
...@@ -131,6 +133,8 @@ class PipeLogReader(threading.Thread): ...@@ -131,6 +133,8 @@ class PipeLogReader(threading.Thread):
self.orig_stdout = sys.__stdout__ self.orig_stdout = sys.__stdout__
self._is_read_completed = False self._is_read_completed = False
self.process_exit = False self.process_exit = False
self.log_collection = log_collection
self.log_pattern = re.compile(r'^NNISDK_MEb\'.*\'$')
def _populateQueue(stream, queue): def _populateQueue(stream, queue):
''' '''
...@@ -143,8 +147,6 @@ class PipeLogReader(threading.Thread): ...@@ -143,8 +147,6 @@ class PipeLogReader(threading.Thread):
line = self.queue.get(True, 5) line = self.queue.get(True, 5)
try: try:
self.logger.log(self.log_level, line.rstrip()) self.logger.log(self.log_level, line.rstrip())
self.orig_stdout.write(line.rstrip() + '\n')
self.orig_stdout.flush()
except Exception as e: except Exception as e:
pass pass
except Exception as e: except Exception as e:
...@@ -165,9 +167,17 @@ class PipeLogReader(threading.Thread): ...@@ -165,9 +167,17 @@ class PipeLogReader(threading.Thread):
def run(self): def run(self):
"""Run the thread, logging everything. """Run the thread, logging everything.
If the log_collection is 'none', the log content will not be enqueued
""" """
for line in iter(self.pipeReader.readline, ''): for line in iter(self.pipeReader.readline, ''):
self.orig_stdout.write(line.rstrip() + '\n')
self.orig_stdout.flush()
if self.log_collection == 'none':
# If not match metrics, do not put the line into queue
if not self.log_pattern.match(line):
continue
self.queue.put(line) self.queue.put(line)
self.pipeReader.close() self.pipeReader.close()
def close(self): def close(self):
......
...@@ -44,10 +44,9 @@ def main_loop(args): ...@@ -44,10 +44,9 @@ def main_loop(args):
stdout_file = open(STDOUT_FULL_PATH, 'a+') stdout_file = open(STDOUT_FULL_PATH, 'a+')
stderr_file = open(STDERR_FULL_PATH, 'a+') stderr_file = open(STDERR_FULL_PATH, 'a+')
trial_keeper_syslogger = RemoteLogger(args.nnimanager_ip, args.nnimanager_port, 'trial_keeper', StdOutputType.Stdout, args.log_collection)
trial_keeper_syslogger = RemoteLogger(args.nnimanager_ip, args.nnimanager_port, 'trial_keeper', StdOutputType.Stdout)
# redirect trial keeper's stdout and stderr to syslog # redirect trial keeper's stdout and stderr to syslog
trial_syslogger_stdout = RemoteLogger(args.nnimanager_ip, args.nnimanager_port, 'trial', StdOutputType.Stdout) trial_syslogger_stdout = RemoteLogger(args.nnimanager_ip, args.nnimanager_port, 'trial', StdOutputType.Stdout, args.log_collection)
sys.stdout = sys.stderr = trial_keeper_syslogger sys.stdout = sys.stderr = trial_keeper_syslogger
# backward compatibility # backward compatibility
hdfs_host = None hdfs_host = None
...@@ -144,6 +143,7 @@ if __name__ == '__main__': ...@@ -144,6 +143,7 @@ if __name__ == '__main__':
PARSER.add_argument('--nni_hdfs_exp_dir', type=str, help='nni experiment directory in hdfs') PARSER.add_argument('--nni_hdfs_exp_dir', type=str, help='nni experiment directory in hdfs')
PARSER.add_argument('--webhdfs_path', type=str, help='the webhdfs path used in webhdfs URL') PARSER.add_argument('--webhdfs_path', type=str, help='the webhdfs path used in webhdfs URL')
PARSER.add_argument('--version', type=str, help='the nni version transmitted from trainingService') PARSER.add_argument('--version', type=str, help='the nni version transmitted from trainingService')
PARSER.add_argument('--log_collection', type=str, help='set the way to collect log in trialkeeper')
args, unknown = PARSER.parse_known_args() args, unknown = PARSER.parse_known_args()
if args.trial_command is None: if args.trial_command is None:
exit(1) exit(1)
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment