Unverified Commit 806afeb6 authored by fishyds's avatar fishyds Committed by GitHub
Browse files

[Kubeflow Training Service] V1, merge from kubeflow branch to master branch (#382)

* Kubeflow TrainingService support, v1 (#373)

1. Create new Training Service: kubeflow training service, use 'kubectl' and kubeflow tfjobs CRD to submit and manage jobs
2. Update nni python SDK to support new kubeflow platform
3. Update nni python SDK's get_sequence_id() implementation, read NNI_TRIAL_SEQ_ID env variable, instead of reading .nni/sequence_id file
4. This version only supports Tensorflow operator. Will add more operators' support in future versions
parent b749266d
declare module 'child-process-promise' {
    // Run a shell command; the promise resolves with the captured output.
    // NOTE(review): the original declared a second identical overload returning
    // Promise<void>, which duplicates this signature (overloads may not differ
    // only in return type) and hid the useful Result type — removed.
    export function exec(command: string): Promise<childProcessPromise.Result>;
    export namespace childProcessPromise {
        interface Result {
            stdout: string;   // captured standard output of the command
            stderr: string;   // captured standard error of the command
            message: string;  // human-readable status/error message
        }
    }
}
\ No newline at end of file
......@@ -27,7 +27,7 @@ if env_args.platform is None:
from .standalone import *
elif env_args.platform == 'unittest':
from .test import *
elif env_args.platform in ('local', 'remote', 'pai'):
elif env_args.platform in ('local', 'remote', 'pai', 'kubeflow'):
from .local import *
else:
raise RuntimeError('Unknown platform %s' % env_args.platform)
......@@ -79,5 +79,4 @@ def send_metric(string):
_metric_file.flush()
def get_sequence_id():
    '''Return this trial's sequence id (as a string) from the NNI_TRIAL_SEQ_ID env variable.

    Raises KeyError if NNI_TRIAL_SEQ_ID is not set (i.e. the trial was not
    launched by an NNI training service that exports it).
    '''
    # The superseded implementation read <sysdir>/.nni/sequence_id from disk;
    # the env variable works uniformly across platforms (local/remote/pai/kubeflow).
    # The original block still contained that dead file-reading code, which made
    # this return unreachable — removed.
    return os.environ['NNI_TRIAL_SEQ_ID']
\ No newline at end of file
......@@ -28,7 +28,7 @@ Optional('description'): str,
'trialConcurrency': And(int, lambda n: 1 <=n <= 999999),
Optional('maxExecDuration'): Regex(r'^[1-9][0-9]*[s|m|h|d]$'),
Optional('maxTrialNum'): And(int, lambda x: 1 <= x <= 99999),
'trainingServicePlatform': And(str, lambda x: x in ['remote', 'local', 'pai']),
'trainingServicePlatform': And(str, lambda x: x in ['remote', 'local', 'pai', 'kubeflow']),
Optional('searchSpacePath'): os.path.exists,
Optional('multiPhase'): bool,
'useAnnotation': bool,
......@@ -89,6 +89,28 @@ pai_config_schema = {
}
}
# Schema fragment validating the 'trial' section of a kubeflow experiment config.
kubeflow_trial_schema = {
    'trial': dict(
        command=str,                                 # command line launched inside the trial container
        codeDir=os.path.exists,                      # must be an existing local directory holding trial code
        gpuNum=And(int, lambda v: 0 <= v <= 99999),  # non-negative GPU count
        cpuNum=And(int, lambda v: 0 <= v <= 99999),  # non-negative CPU count
        memoryMB=int,                                # memory request in MB
        image=str,                                   # docker image the trial runs in
    )
}
# Schema fragment validating the 'kubeflowConfig' section of an experiment config.
kubeflow_config_schema = {
    'kubeflowConfig': {
        # Kubeflow training operator used to submit jobs.
        # Fixed typo: the original accepted 'pytorch-operato' (missing final 'r'),
        # so a valid pytorch-operator config could never pass validation.
        'operator': Or('tf-operator', 'mxnet-operator', 'pytorch-operator'),
        # NFS share mounted into trial pods for code/output exchange.
        'nfs': {
            'server': str,  # NFS server host
            'path': str     # exported path on the server
        },
        'kubernetesServer': str  # Kubernetes API server address
    }
}
machine_list_schima = {
Optional('machineList'):[Or({
'ip': str,
......@@ -109,3 +131,5 @@ LOCAL_CONFIG_SCHEMA = Schema({**common_schema, **common_trial_schema})
# Full per-platform config schemas: each merges the shared common schema with
# the platform-specific trial/cluster schema fragments defined above.
REMOTE_CONFIG_SCHEMA = Schema({**common_schema, **common_trial_schema, **machine_list_schima})
PAI_CONFIG_SCHEMA = Schema({**common_schema, **pai_trial_schema, **pai_config_schema})
KUBEFLOW_CONFIG_SCHEMA = Schema({**common_schema, **kubeflow_trial_schema, **kubeflow_config_schema})
......@@ -37,7 +37,6 @@ import random
import site
from pathlib import Path
def get_log_path(config_file_name):
'''generate stdout and stderr log path'''
stdout_full_path = os.path.join(NNICTL_HOME_DIR, config_file_name, 'stdout')
......@@ -66,7 +65,7 @@ def start_rest_server(port, platform, mode, config_file_name, experiment_id=None
'You could use \'nnictl create --help\' to get help information' % port)
exit(1)
if platform == 'pai' and detect_port(int(port) + 1):
if (platform == 'pai' or platform == 'kubeflow') and detect_port(int(port) + 1):
print_error('PAI mode need an additional adjacent port %d, and the port %d is used by another process!\n' \
'You could set another port to start experiment!\n' \
'You could use \'nnictl create --help\' to get help information' % ((int(port) + 1), (int(port) + 1)))
......@@ -165,6 +164,23 @@ def set_pai_config(experiment_config, port, config_file_name):
#set trial_config
return set_trial_config(experiment_config, port, config_file_name), err_message
def set_kubeflow_config(experiment_config, port, config_file_name):
    '''Push the kubeflowConfig section of the experiment config to the REST server.

    Mirrors set_pai_config: PUT the cluster metadata, then upload the trial config.
    Returns a (success: bool, err_message: str or None) tuple; on failure the
    server's error text is also appended to the experiment's stderr log.
    '''
    kubeflow_config_data = {'kubeflow_config': experiment_config['kubeflowConfig']}
    response = rest_put(cluster_metadata_url(port), json.dumps(kubeflow_config_data), 20)
    err_message = None
    if not response or response.status_code != 200:
        if response is not None:
            err_message = response.text
            _, stderr_full_path = get_log_path(config_file_name)
            with open(stderr_full_path, 'a+') as fout:
                # The server may return non-JSON error text; the original
                # json.loads(err_message) would raise and mask the real error,
                # so fall back to writing the raw text.
                try:
                    fout.write(json.dumps(json.loads(err_message), indent=4, sort_keys=True, separators=(',', ':')))
                except ValueError:
                    fout.write(err_message)
        return False, err_message
    # Cluster metadata accepted; now upload the trial config.
    return set_trial_config(experiment_config, port, config_file_name), err_message
def set_experiment(experiment_config, mode, port, config_file_name):
'''Call startExperiment (rest POST /experiment) with yaml file content'''
request_data = dict()
......@@ -312,6 +328,22 @@ def launch_experiment(args, experiment_config, mode, config_file_name, experimen
raise Exception(ERROR_INFO % 'Restful server stopped!')
exit(1)
#set kubeflow config
if experiment_config['trainingServicePlatform'] == 'kubeflow':
print_normal('Setting kubeflow config...')
config_result, err_msg = set_kubeflow_config(experiment_config, args.port, config_file_name)
if config_result:
print_normal('Successfully set kubeflow config!')
else:
if err_msg:
print_error('Failed! Error is: {}'.format(err_msg))
try:
cmds = ['pkill', '-P', str(rest_process.pid)]
call(cmds)
except Exception:
raise Exception(ERROR_INFO % 'Restful server stopped!')
exit(1)
# start a new experiment
print_normal('Starting experiment...')
response = set_experiment(experiment_config, mode, args.port, config_file_name)
......
......@@ -20,7 +20,7 @@
import os
import json
from .config_schema import LOCAL_CONFIG_SCHEMA, REMOTE_CONFIG_SCHEMA, PAI_CONFIG_SCHEMA
from .config_schema import LOCAL_CONFIG_SCHEMA, REMOTE_CONFIG_SCHEMA, PAI_CONFIG_SCHEMA, KUBEFLOW_CONFIG_SCHEMA
from .common_utils import get_json_content, print_error
def expand_path(experiment_config, key):
......@@ -77,18 +77,17 @@ def validate_search_space_content(experiment_config):
except:
raise Exception('searchspace file is not a valid json format!')
def validate_common_content(experiment_config):
'''Validate whether the common values in experiment_config is valid'''
if not experiment_config.get('trainingServicePlatform') or \
experiment_config.get('trainingServicePlatform') not in ['local', 'remote', 'pai']:
experiment_config.get('trainingServicePlatform') not in ['local', 'remote', 'pai', 'kubeflow']:
print_error('Please set correct trainingServicePlatform!')
exit(0)
schema_dict = {
'local': LOCAL_CONFIG_SCHEMA,
'remote': REMOTE_CONFIG_SCHEMA,
'pai': PAI_CONFIG_SCHEMA
'pai': PAI_CONFIG_SCHEMA,
'kubeflow': KUBEFLOW_CONFIG_SCHEMA
}
try:
schema_dict.get(experiment_config['trainingServicePlatform']).validate(experiment_config)
......
......@@ -28,6 +28,8 @@ HOME_DIR = os.path.join(os.environ['HOME'], 'nni')
# Directory where this trial writes its output (set by the platform launcher).
LOG_DIR = os.environ['NNI_OUTPUT_DIR']
# Training service platform this trial runs on (compared against 'pai' elsewhere;
# presumably also 'local'/'remote'/'kubeflow' — set by the launcher).
NNI_PLATFORM = os.environ['NNI_PLATFORM']
STDOUT_FULL_PATH = os.path.join(LOG_DIR, 'stdout')  # trial stdout log file
STDERR_FULL_PATH = os.path.join(LOG_DIR, 'stderr')  # trial stderr log file
......
......@@ -28,7 +28,7 @@ import re
from pyhdfs import HdfsClient
from .hdfsClientUtility import copyDirectoryToHdfs
from .constants import HOME_DIR, LOG_DIR, STDOUT_FULL_PATH, STDERR_FULL_PATH
from .constants import HOME_DIR, LOG_DIR, NNI_PLATFORM, STDOUT_FULL_PATH, STDERR_FULL_PATH
from .metrics_reader import read_experiment_metrics
logger = logging.getLogger('trial_keeper')
......@@ -52,7 +52,8 @@ def main_loop(args):
if retCode is not None:
print('subprocess terminated. Exit code is {}. Quit'.format(retCode))
#copy local directory to hdfs
if NNI_PLATFORM == 'pai':
# Copy local directory to hdfs for OpenPAI
nni_local_output_dir = os.environ['NNI_OUTPUT_DIR']
try:
hdfs_client = HdfsClient(hosts='{0}:{1}'.format(args.pai_hdfs_host, '50070'), user_name=args.pai_user_name, timeout=5)
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment