Unverified Commit e341df81 authored by fishyds's avatar fishyds Committed by GitHub
Browse files

[Kubeflow training service] Update kubeflow exp job config schema to support distributed training

[Kubeflow training service] Update kubeflow exp job config schema to support distributed training (#387)

* Support distributed training on tf-operator, for worker and ps

* Update validation rule for kubeflow config

* small code refactor adjustment for private methods

* Use different output folder for ps and worker
parent a5d614de
...@@ -39,8 +39,26 @@ export namespace ValidationSchemas { ...@@ -39,8 +39,26 @@ export namespace ValidationSchemas {
outputDir: joi.string(), outputDir: joi.string(),
cpuNum: joi.number().min(1), cpuNum: joi.number().min(1),
memoryMB: joi.number().min(100), memoryMB: joi.number().min(100),
gpuNum: joi.number().min(0).required(), gpuNum: joi.number().min(0),
command: joi.string().min(1).required() command: joi.string().min(1),
worker: joi.object({
replicas: joi.number().min(1).required(),
image: joi.string().min(1),
outputDir: joi.string(),
cpuNum: joi.number().min(1),
memoryMB: joi.number().min(100),
gpuNum: joi.number().min(0).required(),
command: joi.string().min(1).required()
}),
ps: joi.object({
replicas: joi.number().min(1).required(),
image: joi.string().min(1),
outputDir: joi.string(),
cpuNum: joi.number().min(1),
memoryMB: joi.number().min(100),
gpuNum: joi.number().min(0).required(),
command: joi.string().min(1).required()
})
}), }),
pai_config: joi.object({ pai_config: joi.object({
userName: joi.string().min(1).required(), userName: joi.string().min(1).required(),
......
...@@ -79,15 +79,44 @@ export class NFSConfig { ...@@ -79,15 +79,44 @@ export class NFSConfig {
/** /**
* Trial job configuration for Kubeflow * Trial job configuration for Kubeflow
*/ */
export class KubeflowTrialConfig extends TrialConfig { export class KubeflowTrialConfigTemplate {
/** replication number of current role */
public readonly replicas: number;
/** CPU number */
public readonly cpuNum: number; public readonly cpuNum: number;
/** Memory */
public readonly memoryMB: number; public readonly memoryMB: number;
/** Docker image */
public readonly image: string; public readonly image: string;
/** Trail command */
public readonly command : string;
/** Required GPU number for trial job. The number should be in [0,100] */
public readonly gpuNum : number;
constructor(command : string, codeDir : string, gpuNum : number, cpuNum: number, memoryMB: number, image: string) { constructor(replicas: number, command : string, gpuNum : number,
super(command, codeDir, gpuNum); cpuNum: number, memoryMB: number, image: string) {
this.replicas = replicas;
this.command = command;
this.gpuNum = gpuNum;
this.cpuNum = cpuNum; this.cpuNum = cpuNum;
this.memoryMB = memoryMB; this.memoryMB = memoryMB;
this.image = image; this.image = image;
} }
}
export class KubeflowTrialConfig {
public readonly codeDir: string;
public readonly ps?: KubeflowTrialConfigTemplate;
public readonly worker: KubeflowTrialConfigTemplate;
constructor(codeDir: string, worker: KubeflowTrialConfigTemplate, ps?: KubeflowTrialConfigTemplate) {
this.codeDir = codeDir;
this.worker = worker;
this.ps = ps;
}
} }
\ No newline at end of file
...@@ -72,7 +72,7 @@ mkdir -p $NNI_OUTPUT_DIR ...@@ -72,7 +72,7 @@ mkdir -p $NNI_OUTPUT_DIR
cp -rT $NNI_CODE_DIR $NNI_SYS_DIR cp -rT $NNI_CODE_DIR $NNI_SYS_DIR
cd $NNI_SYS_DIR cd $NNI_SYS_DIR
sh install_nni.sh # Check and install NNI pkg sh install_nni.sh # Check and install NNI pkg
python3 -m nni_trial_tool.trial_keeper --trial_command '{6}' --nnimanager_ip '{7}' --nnimanager_port '{8}' 1>$NNI_OUTPUT_DIR/trialkeeper_stdout 2>$NNI_OUTPUT_DIR//trialkeeper_stderr python3 -m nni_trial_tool.trial_keeper --trial_command '{6}' --nnimanager_ip '{7}' --nnimanager_port '{8}' 1>$NNI_OUTPUT_DIR/trialkeeper_stdout 2>$NNI_OUTPUT_DIR/trialkeeper_stderr
` `
export type KubeflowTFJobType = 'Created' | 'Running' | 'Failed' | 'Succeeded'; export type KubeflowTFJobType = 'Created' | 'Running' | 'Failed' | 'Succeeded';
\ No newline at end of file
...@@ -37,13 +37,15 @@ import { ...@@ -37,13 +37,15 @@ import {
TrialJobDetail, TrialJobMetric TrialJobDetail, TrialJobMetric
} from '../../common/trainingService'; } from '../../common/trainingService';
import { delay, generateParamFileName, getExperimentRootDir, getIPV4Address, uniqueString } from '../../common/utils'; import { delay, generateParamFileName, getExperimentRootDir, getIPV4Address, uniqueString } from '../../common/utils';
import { KubeflowClusterConfig, kubeflowOperatorMap, KubeflowTrialConfig, NFSConfig } from './kubeflowConfig'; import { KubeflowClusterConfig, kubeflowOperatorMap, KubeflowTrialConfig, KubeflowTrialConfigTemplate, NFSConfig } from './kubeflowConfig';
import { KubeflowTrialJobDetail, KUBEFLOW_RUN_SHELL_FORMAT } from './kubeflowData'; import { KubeflowTrialJobDetail, KUBEFLOW_RUN_SHELL_FORMAT } from './kubeflowData';
import { KubeflowJobRestServer } from './kubeflowJobRestServer'; import { KubeflowJobRestServer } from './kubeflowJobRestServer';
import { KubeflowJobInfoCollector } from './kubeflowJobInfoCollector'; import { KubeflowJobInfoCollector } from './kubeflowJobInfoCollector';
var yaml = require('node-yaml'); var yaml = require('node-yaml');
type DistTrainRole = 'worker' | 'ps';
/** /**
* Training Service implementation for Kubeflow * Training Service implementation for Kubeflow
* Refer https://github.com/kubeflow/kubeflow for more info about Kubeflow * Refer https://github.com/kubeflow/kubeflow for more info about Kubeflow
...@@ -64,7 +66,7 @@ class KubeflowTrainingService implements TrainingService { ...@@ -64,7 +66,7 @@ class KubeflowTrainingService implements TrainingService {
private kubeflowJobInfoCollector: KubeflowJobInfoCollector; private kubeflowJobInfoCollector: KubeflowJobInfoCollector;
private kubeflowRestServerPort?: number; private kubeflowRestServerPort?: number;
private kubeflowJobPlural?: string; private kubeflowJobPlural?: string;
private readonly CONTAINER_MOUNT_PATH: string; private readonly CONTAINER_MOUNT_PATH: string;
constructor() { constructor() {
this.log = getLogger(); this.log = getLogger();
...@@ -93,8 +95,8 @@ class KubeflowTrainingService implements TrainingService { ...@@ -93,8 +95,8 @@ class KubeflowTrainingService implements TrainingService {
throw new Error('Kubeflow Cluster config is not initialized'); throw new Error('Kubeflow Cluster config is not initialized');
} }
if(!this.kubeflowTrialConfig) { if(!this.kubeflowTrialConfig || !this.kubeflowTrialConfig.worker) {
throw new Error('Kubeflow trial config is not initialized'); throw new Error('Kubeflow trial config or worker config is not initialized');
} }
if(!this.kubeflowJobPlural) { if(!this.kubeflowJobPlural) {
...@@ -119,47 +121,57 @@ class KubeflowTrainingService implements TrainingService { ...@@ -119,47 +121,57 @@ class KubeflowTrainingService implements TrainingService {
// Write NNI installation file to local tmp files // Write NNI installation file to local tmp files
await fs.promises.writeFile(path.join(trialLocalTempFolder, 'install_nni.sh'), runScriptContent, { encoding: 'utf8' }); await fs.promises.writeFile(path.join(trialLocalTempFolder, 'install_nni.sh'), runScriptContent, { encoding: 'utf8' });
const kubeflowRunScriptContent: string = String.Format( // Create tmp trial working folder locally.
KUBEFLOW_RUN_SHELL_FORMAT,
`$PWD/nni/${trialJobId}`,
path.join(trialWorkingFolder, 'output'),
trialJobId,
getExperimentId(),
trialWorkingFolder,
curTrialSequenceId,
this.kubeflowTrialConfig.command,
getIPV4Address(),
this.kubeflowRestServerPort
);
//create tmp trial working folder locally.
await cpp.exec(`mkdir -p ${trialLocalTempFolder}`); await cpp.exec(`mkdir -p ${trialLocalTempFolder}`);
// Write file content ( run.sh and parameter.cfg ) to local tmp files // Write worker file content run_worker.sh to local tmp folders
await fs.promises.writeFile(path.join(trialLocalTempFolder, 'run.sh'), kubeflowRunScriptContent, { encoding: 'utf8' }); if(this.kubeflowTrialConfig.worker) {
const workerRunScriptContent: string = this.genereateRunScript(trialJobId, trialWorkingFolder,
this.kubeflowTrialConfig.worker.command, curTrialSequenceId.toString(), 'worker');
await fs.promises.writeFile(path.join(trialLocalTempFolder, 'run_worker.sh'), workerRunScriptContent, { encoding: 'utf8' });
}
// Write parameter server file content run_ps.sh to local tmp folders
if(this.kubeflowTrialConfig.ps) {
const psRunScriptContent: string = this.genereateRunScript(trialJobId, trialWorkingFolder,
this.kubeflowTrialConfig.ps.command, curTrialSequenceId.toString(), 'ps');
await fs.promises.writeFile(path.join(trialLocalTempFolder, 'run_ps.sh'), psRunScriptContent, { encoding: 'utf8' });
}
// Write file content ( parameter.cfg ) to local tmp folders // Write file content ( parameter.cfg ) to local tmp folders
const trialForm : TrialJobApplicationForm = (<TrialJobApplicationForm>form) const trialForm : TrialJobApplicationForm = (<TrialJobApplicationForm>form)
if(trialForm && trialForm.hyperParameters) { if(trialForm && trialForm.hyperParameters) {
await fs.promises.writeFile(path.join(trialLocalTempFolder, generateParamFileName(trialForm.hyperParameters)), await fs.promises.writeFile(path.join(trialLocalTempFolder, generateParamFileName(trialForm.hyperParameters)),
trialForm.hyperParameters.value, { encoding: 'utf8' }); trialForm.hyperParameters.value, { encoding: 'utf8' });
} }
const kubeflowJobYamlPath = path.join(trialLocalTempFolder, `kubeflow-job-${trialJobId}.yaml`); const kubeflowJobYamlPath = path.join(trialLocalTempFolder, `kubeflow-job-${trialJobId}.yaml`);
const kubeflowJobName = `nni-exp-${this.experimentId}-trial-${trialJobId}`.toLowerCase(); const kubeflowJobName = `nni-exp-${this.experimentId}-trial-${trialJobId}`.toLowerCase();
const podResources : any = {}; const workerPodResources : any = {};
podResources.requests = { workerPodResources.requests = {
'memory': `${this.kubeflowTrialConfig.memoryMB}Mi`, 'memory': `${this.kubeflowTrialConfig.worker.memoryMB}Mi`,
'cpu': `${this.kubeflowTrialConfig.cpuNum}`, 'cpu': `${this.kubeflowTrialConfig.worker.cpuNum}`,
'nvidia.com/gpu': `${this.kubeflowTrialConfig.gpuNum}` 'nvidia.com/gpu': `${this.kubeflowTrialConfig.worker.gpuNum}`
} }
workerPodResources.limits = Object.assign({}, workerPodResources.requests);
podResources.limits = Object.assign({}, podResources.requests);
let psPodResources : any = undefined;
if(this.kubeflowTrialConfig.ps) {
psPodResources = {};
psPodResources.requests = {
'memory': `${this.kubeflowTrialConfig.ps.memoryMB}Mi`,
'cpu': `${this.kubeflowTrialConfig.ps.cpuNum}`,
'nvidia.com/gpu': `${this.kubeflowTrialConfig.ps.gpuNum}`
}
psPodResources.limits = Object.assign({}, psPodResources.requests);
}
// Generate kubeflow job resource yaml file for K8S // Generate kubeflow job resource yaml file for K8S
yaml.write( yaml.write(
kubeflowJobYamlPath, kubeflowJobYamlPath,
this.generateKubeflowJobConfig(trialJobId, trialWorkingFolder, kubeflowJobName, podResources), this.generateKubeflowJobConfig(trialJobId, trialWorkingFolder, kubeflowJobName, workerPodResources, psPodResources),
'utf-8' 'utf-8'
); );
...@@ -281,6 +293,7 @@ class KubeflowTrainingService implements TrainingService { ...@@ -281,6 +293,7 @@ class KubeflowTrainingService implements TrainingService {
} }
this.kubeflowTrialConfig = <KubeflowTrialConfig>JSON.parse(value); this.kubeflowTrialConfig = <KubeflowTrialConfig>JSON.parse(value);
assert(this.kubeflowClusterConfig !== undefined && this.kubeflowTrialConfig.worker !== undefined);
break; break;
default: default:
break; break;
...@@ -339,7 +352,15 @@ class KubeflowTrainingService implements TrainingService { ...@@ -339,7 +352,15 @@ class KubeflowTrainingService implements TrainingService {
return this.metricsEmitter; return this.metricsEmitter;
} }
private generateKubeflowJobConfig(trialJobId: string, trialWorkingFolder: string, kubeflowJobName : string, podResources : any) : any { /**
* Generate kubeflow resource config file
* @param trialJobId trial job id
* @param trialWorkingFolder working folder
* @param kubeflowJobName job name
* @param workerPodResources worker pod template
* @param psPodResources ps pod template
*/
private generateKubeflowJobConfig(trialJobId: string, trialWorkingFolder: string, kubeflowJobName : string, workerPodResources : any, psPodResources?: any) : any {
if(!this.kubeflowClusterConfig) { if(!this.kubeflowClusterConfig) {
throw new Error('Kubeflow Cluster config is not initialized'); throw new Error('Kubeflow Cluster config is not initialized');
} }
...@@ -348,6 +369,15 @@ class KubeflowTrainingService implements TrainingService { ...@@ -348,6 +369,15 @@ class KubeflowTrainingService implements TrainingService {
throw new Error('Kubeflow trial config is not initialized'); throw new Error('Kubeflow trial config is not initialized');
} }
const tfReplicaSpecsObj: any = {};
tfReplicaSpecsObj.Worker = this.generateReplicaConfig(trialWorkingFolder, this.kubeflowTrialConfig.worker.replicas,
this.kubeflowTrialConfig.worker.image, 'run_worker.sh', workerPodResources);
if(this.kubeflowTrialConfig.ps) {
tfReplicaSpecsObj.Ps = this.generateReplicaConfig(trialWorkingFolder, this.kubeflowTrialConfig.ps.replicas,
this.kubeflowTrialConfig.ps.image, 'run_ps.sh', psPodResources);
}
return { return {
apiVersion: 'kubeflow.org/v1alpha2', apiVersion: 'kubeflow.org/v1alpha2',
kind: 'TFJob', kind: 'TFJob',
...@@ -361,44 +391,84 @@ class KubeflowTrainingService implements TrainingService { ...@@ -361,44 +391,84 @@ class KubeflowTrainingService implements TrainingService {
} }
}, },
spec: { spec: {
tfReplicaSpecs: { tfReplicaSpecs: tfReplicaSpecsObj
Worker: {
replicas: 1,
template: {
metadata: {
creationTimestamp: null
},
spec: {
containers: [
{
// Kubeflow tensorflow operator requires that containers' name must be tensorflow
// TODO: change the name based on operator's type
name: 'tensorflow',
image: this.kubeflowTrialConfig.image,
args: ["sh", `${path.join(trialWorkingFolder, 'run.sh')}`],
volumeMounts: [{
name: 'nni-nfs-vol',
mountPath: this.CONTAINER_MOUNT_PATH
}],
resources: podResources//,
//workingDir: '/tmp/nni/nuDEP'
}],
restartPolicy: 'ExitCode',
volumes: [{
name: 'nni-nfs-vol',
nfs: {
server: `${this.kubeflowClusterConfig.nfs.server}`,
path: `${this.kubeflowClusterConfig.nfs.path}`
}
}]
}
}
}
}
} }
}; };
} }
/**
 * Build one replica spec section (worker or ps) of a tf-operator TFJob resource.
 * @param trialWorkingFolder trial working folder on the shared NFS mount
 * @param replicaNumber number of replicas for this role
 * @param replicaImage Docker image used by the replica's container
 * @param runScriptFile run script file name (e.g. run_worker.sh) inside the working folder
 * @param podResources pod resource (requests/limits) config section
 */
private generateReplicaConfig(trialWorkingFolder: string, replicaNumber: number, replicaImage: string, runScriptFile: string, podResources: any): any {
    if(!this.kubeflowClusterConfig) {
        throw new Error('Kubeflow Cluster config is not initialized');
    }

    if(!this.kubeflowTrialConfig) {
        throw new Error('Kubeflow trial config is not initialized');
    }

    // NFS volume shared with the trial pod; the trial's code and output live under this mount.
    const nfsVolume: any = {
        name: 'nni-nfs-vol',
        nfs: {
            server: `${this.kubeflowClusterConfig.nfs.server}`,
            path: `${this.kubeflowClusterConfig.nfs.path}`
        }
    };

    // Kubeflow tensorflow operator requires that containers' name must be tensorflow
    // TODO: change the name based on operator's type
    const tensorflowContainer: any = {
        name: 'tensorflow',
        image: replicaImage,
        args: ["sh", `${path.join(trialWorkingFolder, runScriptFile)}`],
        volumeMounts: [{
            name: 'nni-nfs-vol',
            mountPath: this.CONTAINER_MOUNT_PATH
        }],
        resources: podResources
    };

    return {
        replicas: replicaNumber,
        template: {
            metadata: {
                creationTimestamp: null
            },
            spec: {
                containers: [tensorflowContainer],
                restartPolicy: 'ExitCode',
                volumes: [nfsVolume]
            }
        }
    };
}
/**
 * Generate the run script content for a given distributed-training role.
 * (Method name keeps the historical "genereate" spelling — callers use it.)
 * @param trialJobId trial job id
 * @param trialWorkingFolder working folder of the trial on the shared mount
 * @param command command line this role's container executes
 * @param trialSequenceId trial sequence id
 * @param roleType role of this replica ('worker' or 'ps'); also selects the role-specific output folder
 */
private genereateRunScript(trialJobId: string, trialWorkingFolder: string,
    command: string, trialSequenceId: string, roleType: DistTrainRole): string {
    // Each role writes to its own output folder (e.g. worker_output) so
    // worker and ps outputs do not collide in the shared working folder.
    const roleOutputFolder: string = path.join(trialWorkingFolder, `${roleType}_output`);
    const runScript: string = String.Format(
        KUBEFLOW_RUN_SHELL_FORMAT,
        `$PWD/nni/${trialJobId}`,
        roleOutputFolder,
        trialJobId,
        getExperimentId(),
        trialWorkingFolder,
        trialSequenceId,
        command,
        getIPV4Address(),
        this.kubeflowRestServerPort
    );
    return runScript;
}
private generateSequenceId(): number { private generateSequenceId(): number {
if (this.nextTrialSequenceId === -1) { if (this.nextTrialSequenceId === -1) {
this.nextTrialSequenceId = getInitTrialSequenceId(); this.nextTrialSequenceId = getInitTrialSequenceId();
......
...@@ -92,12 +92,23 @@ pai_config_schema = { ...@@ -92,12 +92,23 @@ pai_config_schema = {
kubeflow_trial_schema = { kubeflow_trial_schema = {
'trial':{ 'trial':{
'command': str, 'codeDir': os.path.exists,
'codeDir': os.path.exists, Optional('ps'): {
'gpuNum': And(int, lambda x: 0 <= x <= 99999), 'replicas': int,
'cpuNum': And(int, lambda x: 0 <= x <= 99999), 'command': str,
'memoryMB': int, 'gpuNum': And(int, lambda x: 0 <= x <= 99999),
'image': str 'cpuNum': And(int, lambda x: 0 <= x <= 99999),
'memoryMB': int,
'image': str
},
'worker':{
'replicas': int,
'command': str,
'gpuNum': And(int, lambda x: 0 <= x <= 99999),
'cpuNum': And(int, lambda x: 0 <= x <= 99999),
'memoryMB': int,
'image': str
}
} }
} }
......
...@@ -99,21 +99,7 @@ def start_rest_server(port, platform, mode, config_file_name, experiment_id=None ...@@ -99,21 +99,7 @@ def start_rest_server(port, platform, mode, config_file_name, experiment_id=None
def set_trial_config(experiment_config, port, config_file_name): def set_trial_config(experiment_config, port, config_file_name):
'''set trial configuration''' '''set trial configuration'''
request_data = dict() request_data = dict()
value_dict = dict() request_data['trial_config'] = experiment_config['trial']
value_dict['command'] = experiment_config['trial']['command']
value_dict['codeDir'] = experiment_config['trial']['codeDir']
value_dict['gpuNum'] = experiment_config['trial']['gpuNum']
if experiment_config['trial'].get('cpuNum'):
value_dict['cpuNum'] = experiment_config['trial']['cpuNum']
if experiment_config['trial'].get('memoryMB'):
value_dict['memoryMB'] = experiment_config['trial']['memoryMB']
if experiment_config['trial'].get('image'):
value_dict['image'] = experiment_config['trial']['image']
if experiment_config['trial'].get('dataDir'):
value_dict['dataDir'] = experiment_config['trial']['dataDir']
if experiment_config['trial'].get('outputDir'):
value_dict['outputDir'] = experiment_config['trial']['outputDir']
request_data['trial_config'] = value_dict
response = rest_put(cluster_metadata_url(port), json.dumps(request_data), 20) response = rest_put(cluster_metadata_url(port), json.dumps(request_data), 20)
if check_response(response): if check_response(response):
return True return True
...@@ -211,31 +197,18 @@ def set_experiment(experiment_config, mode, port, config_file_name): ...@@ -211,31 +197,18 @@ def set_experiment(experiment_config, mode, port, config_file_name):
elif experiment_config['trainingServicePlatform'] == 'remote': elif experiment_config['trainingServicePlatform'] == 'remote':
request_data['clusterMetaData'].append( request_data['clusterMetaData'].append(
{'key': 'machine_list', 'value': experiment_config['machineList']}) {'key': 'machine_list', 'value': experiment_config['machineList']})
value_dict = dict()
value_dict['command'] = experiment_config['trial']['command']
value_dict['codeDir'] = experiment_config['trial']['codeDir']
value_dict['gpuNum'] = experiment_config['trial']['gpuNum']
request_data['clusterMetaData'].append( request_data['clusterMetaData'].append(
{'key': 'trial_config', 'value': value_dict}) {'key': 'trial_config', 'value': experiment_config['trial']})
elif experiment_config['trainingServicePlatform'] == 'pai': elif experiment_config['trainingServicePlatform'] == 'pai':
request_data['clusterMetaData'].append( request_data['clusterMetaData'].append(
{'key': 'pai_config', 'value': experiment_config['paiConfig']}) {'key': 'pai_config', 'value': experiment_config['paiConfig']})
value_dict = dict()
value_dict['command'] = experiment_config['trial']['command']
value_dict['codeDir'] = experiment_config['trial']['codeDir']
value_dict['gpuNum'] = experiment_config['trial']['gpuNum']
if experiment_config['trial'].get('cpuNum'):
value_dict['cpuNum'] = experiment_config['trial']['cpuNum']
if experiment_config['trial'].get('memoryMB'):
value_dict['memoryMB'] = experiment_config['trial']['memoryMB']
if experiment_config['trial'].get('image'):
value_dict['image'] = experiment_config['trial']['image']
if experiment_config['trial'].get('dataDir'):
value_dict['dataDir'] = experiment_config['trial']['dataDir']
if experiment_config['trial'].get('outputDir'):
value_dict['outputDir'] = experiment_config['trial']['outputDir']
request_data['clusterMetaData'].append( request_data['clusterMetaData'].append(
{'key': 'trial_config', 'value': value_dict}) {'key': 'trial_config', 'value': experiment_config['trial']})
elif experiment_config['trainingServicePlatform'] == 'kubeflow':
request_data['clusterMetaData'].append(
{'key': 'kubeflow_config', 'value': experiment_config['kubeflowConfig']})
request_data['clusterMetaData'].append(
{'key': 'trial_config', 'value': experiment_config['trial']})
response = rest_post(experiment_url(port), json.dumps(request_data), 20) response = rest_post(experiment_url(port), json.dumps(request_data), 20)
if check_response(response): if check_response(response):
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment