Unverified Commit c265903e authored by SparkSnail's avatar SparkSnail Committed by GitHub
Browse files

Support kubeflow pytorch-operator (#406)

1.Support pytorch-operator
2.remove unsupported operator
parent 80624de7
......@@ -371,6 +371,10 @@ machineList:
__operator__ specifies the Kubeflow operator to be used; NNI supports __tf-operator__ in the current version.
* __storage__
__storage__ specifies the storage type used by Kubeflow, one of {__nfs__, __azureStorage__}. This field is optional, and the default value is __nfs__. If the config uses azureStorage, this field must be set.
* __nfs__
__server__ is the host of the NFS server
......
......@@ -63,6 +63,7 @@ trial:
image: {your_docker_image_for_tensorflow_worker}
kubeflowConfig:
operator: tf-operator
storage: nfs
nfs:
server: {your_nfs_server}
path: {your_nfs_server_exported_path}
......@@ -71,6 +72,7 @@ If you use Azure Kubernetes Service, you should set `kubeflowConfig` in your co
```
kubeflowConfig:
operator: tf-operator
storage: azureStorage
keyVault:
vaultName: {your_vault_name}
name: {your_secret_name}
......
......@@ -59,6 +59,15 @@ export namespace ValidationSchemas {
memoryMB: joi.number().min(100),
gpuNum: joi.number().min(0).required(),
command: joi.string().min(1).required()
}),
master: joi.object({
replicas: joi.number().min(1).required(),
image: joi.string().min(1),
outputDir: joi.string(),
cpuNum: joi.number().min(1),
memoryMB: joi.number().min(100),
gpuNum: joi.number().min(0).required(),
command: joi.string().min(1).required()
})
}),
pai_config: joi.object({
......@@ -68,6 +77,7 @@ export namespace ValidationSchemas {
}),
kubeflow_config: joi.object({
operator: joi.string().min(1).required(),
storage: joi.string().min(1),
nfs: joi.object({
server: joi.string().min(1).required(),
path: joi.string().min(1).required()
......
......@@ -23,31 +23,35 @@ import { TrialConfig } from "../common/trialConfig";
/** operator types that kubeflow supported */
export type KubeflowOperator = 'tf-operator' | 'pytorch-operator' | 'mxnet-operator' | 'caffe2-operator' | 'chainer-operator' | 'mpi-operator';
export type KubeflowOperatorPlural = 'tfjobs' | 'pytorchjobs' | 'mxjobs' | 'caffe2jobs' | 'chainerjobs' | 'mpijobs';
export type KubeflowOperator = 'tf-operator' | 'pytorch-operator' ;
export type KubeflowOperatorPlural = 'tfjobs' | 'pytorchjobs' ;
export type KubeflowOperatorJobKind = 'TFJob' | 'PyTorchJob';
export type KubeflowStorageKind = 'nfs' | 'azureStorage';
/**
* map from Kubeflow operator name to its plural name in K8S
*/
export const kubeflowOperatorMap : Map<KubeflowOperator, KubeflowOperatorPlural> = new Map<KubeflowOperator, KubeflowOperatorPlural>([
['tf-operator' , 'tfjobs'],
['pytorch-operator', 'pytorchjobs'],
['mxnet-operator', 'mxjobs'],
['caffe2-operator', 'caffe2jobs'],
['chainer-operator', 'chainerjobs'],
['mpi-operator', 'mpijobs']
['pytorch-operator', 'pytorchjobs']
]);
/**
 * Lookup table from a Kubeflow operator name to the `kind` of the K8S job
 * resource created for that operator.
 */
export const kubeflowOperatorJobKindMap: Map<KubeflowOperator, KubeflowOperatorJobKind> =
    new Map<KubeflowOperator, KubeflowOperatorJobKind>()
        .set('tf-operator', 'TFJob')
        .set('pytorch-operator', 'PyTorchJob');
/**
* Kubeflow cluster configuration
*
*/
export class KubeflowClusterConfig {
export class KubeflowClusterConfigBase {
/** Name of Kubeflow operator, like tf-operator */
public readonly operator: KubeflowOperator;
public readonly nfs?: NFSConfig;
public readonly keyVault?: keyVaultConfig;
public readonly azureStorage?: AzureStorage;
public readonly storage?: KubeflowStorageKind;
/**
* Constructor
......@@ -55,9 +59,27 @@ export class KubeflowClusterConfig {
* @param passWord password of Kubeflow Cluster
* @param host Host IP of Kubeflow Cluster
*/
constructor(operator: KubeflowOperator, nfs?: NFSConfig, keyVault?: keyVaultConfig, azureStorage ?: AzureStorage) {
constructor(operator: KubeflowOperator, storage?: KubeflowStorageKind) {
this.operator = operator;
this.nfs = nfs;
this.storage = storage;
}
}
/**
 * Kubeflow cluster configuration for a cluster that uses NFS storage.
 */
export class KubeflowClusterConfigNFS extends KubeflowClusterConfigBase {
    /**
     * @param operator Kubeflow operator name, forwarded to the base class
     * @param nfs NFS settings, exposed as the read-only `nfs` property
     * @param storage optional storage kind, forwarded unchanged to the base class
     */
    constructor(
        operator: KubeflowOperator,
        public readonly nfs: NFSConfig,
        storage?: KubeflowStorageKind
    ) {
        super(operator, storage);
    }
}
export class KubeflowClusterConfigAzure extends KubeflowClusterConfigBase{
public readonly keyVault: keyVaultConfig;
public readonly azureStorage: AzureStorage;
constructor(operator: KubeflowOperator, keyVault: keyVaultConfig, azureStorage: AzureStorage, storage?: KubeflowStorageKind) {
super(operator, storage)
this.keyVault = keyVault;
this.azureStorage = azureStorage;
}
......@@ -142,15 +164,33 @@ export class KubeflowTrialConfigTemplate {
}
}
export class KubeflowTrialConfig {
/**
 * Base class for Kubeflow trial configurations; it only carries the code directory.
 */
export class KubeflowTrialConfigBase {
    /**
     * @param codeDir directory containing the trial code
     */
    constructor(public readonly codeDir: string) {}
}
export class KubeflowTrialConfigTensorflow extends KubeflowTrialConfigBase{
public readonly ps?: KubeflowTrialConfigTemplate;
public readonly worker: KubeflowTrialConfigTemplate;
constructor(codeDir: string, worker: KubeflowTrialConfigTemplate, ps?: KubeflowTrialConfigTemplate) {
this.codeDir = codeDir;
this.worker = worker;
constructor(codeDir: string, worker: KubeflowTrialConfigTemplate, ps?: KubeflowTrialConfigTemplate) {
super(codeDir);
this.ps = ps;
this.worker = worker;
}
}
/**
 * Trial configuration for the pytorch-operator: a required worker role
 * plus an optional master role, on top of the shared code directory.
 */
export class KubeflowTrialConfigPytorch extends KubeflowTrialConfigBase{
/** Template of the master role; undefined when no master is configured */
public readonly master?: KubeflowTrialConfigTemplate;
/** Template of the worker role */
public readonly worker: KubeflowTrialConfigTemplate;
/**
 * @param codeDir directory containing the trial code, passed to the base class
 * @param worker template of the worker role
 * @param master optional template of the master role
 */
constructor(codeDir: string, worker: KubeflowTrialConfigTemplate, master?: KubeflowTrialConfigTemplate) {
super(codeDir);
// master is assigned before worker, so it comes first in the object's own-property order
this.master = master;
this.worker = worker;
}
}
......@@ -36,7 +36,8 @@ import {
TrialJobDetail, TrialJobMetric, NNIManagerIpConfig
} from '../../common/trainingService';
import { delay, generateParamFileName, getExperimentRootDir, getIPV4Address, uniqueString, getJobCancelStatus } from '../../common/utils';
import { KubeflowClusterConfig, kubeflowOperatorMap, KubeflowTrialConfig, NFSConfig } from './kubeflowConfig';
import { KubeflowClusterConfigBase, KubeflowClusterConfigNFS, KubeflowClusterConfigAzure, kubeflowOperatorMap, KubeflowTrialConfigBase,
KubeflowTrialConfigPytorch, KubeflowTrialConfigTensorflow, NFSConfig, kubeflowOperatorJobKindMap } from './kubeflowConfig';
import { KubeflowTrialJobDetail } from './kubeflowData';
import { KubeflowJobRestServer } from './kubeflowJobRestServer';
import { KubeflowJobInfoCollector } from './kubeflowJobInfoCollector';
......@@ -47,7 +48,7 @@ import * as azureStorage from 'azure-storage';
var yaml = require('node-yaml');
var azure = require('azure-storage');
type DistTrainRole = 'worker' | 'ps';
type DistTrainRole = 'worker' | 'ps' | 'master';
/**
* Training Service implementation for Kubeflow
......@@ -64,11 +65,12 @@ class KubeflowTrainingService implements TrainingService {
private stopping: boolean = false;
private experimentId! : string;
private nextTrialSequenceId: number;
private kubeflowClusterConfig?: KubeflowClusterConfig;
private kubeflowTrialConfig?: KubeflowTrialConfig;
private kubeflowClusterConfig?: KubeflowClusterConfigBase;
private kubeflowTrialConfig?: KubeflowTrialConfigBase;
private kubeflowJobInfoCollector: KubeflowJobInfoCollector;
private kubeflowRestServerPort?: number;
private kubeflowJobPlural?: string;
private kubeflowJobKind?: string;
private readonly CONTAINER_MOUNT_PATH: string;
private azureStorageClient?: azureStorage.FileService;
private azureStorageShare?: string;
......@@ -103,8 +105,8 @@ class KubeflowTrainingService implements TrainingService {
throw new Error('Kubeflow Cluster config is not initialized');
}
if(!this.kubeflowTrialConfig || !this.kubeflowTrialConfig.worker) {
throw new Error('Kubeflow trial config or worker config is not initialized');
if(!this.kubeflowTrialConfig) {
throw new Error('Kubeflow trial config is not initialized');
}
if(!this.kubeflowJobPlural) {
......@@ -115,6 +117,15 @@ class KubeflowTrainingService implements TrainingService {
const restServer: KubeflowJobRestServer = component.get(KubeflowJobRestServer);
this.kubeflowRestServerPort = restServer.clusterRestServerPort;
}
// initialize kubeflow trial config to specific type
let kubeflowTrialConfig;
if(this.kubeflowClusterConfig.operator === 'tf-operator') {
kubeflowTrialConfig = <KubeflowTrialConfigTensorflow>this.kubeflowTrialConfig;
}else if(this.kubeflowClusterConfig.operator === 'pytorch-operator'){
kubeflowTrialConfig = <KubeflowTrialConfigPytorch>this.kubeflowTrialConfig;
}else {
throw Error(`operator ${this.kubeflowClusterConfig.operator} is invalid`)
}
const trialJobId: string = uniqueString(5);
const curTrialSequenceId: number = this.generateSequenceId();
......@@ -123,78 +134,77 @@ class KubeflowTrainingService implements TrainingService {
const trialLocalTempFolder: string = path.join(getExperimentRootDir(), 'trials-local', trialJobId);
//create tmp trial working folder locally.
await cpp.exec(`mkdir -p ${path.dirname(trialLocalTempFolder)}`);
await cpp.exec(`cp -r ${this.kubeflowTrialConfig.codeDir} ${trialLocalTempFolder}`);
await cpp.exec(`cp -r ${kubeflowTrialConfig.codeDir} ${trialLocalTempFolder}`);
const runScriptContent : string = CONTAINER_INSTALL_NNI_SHELL_FORMAT;
// Write NNI installation file to local tmp files
await fs.promises.writeFile(path.join(trialLocalTempFolder, 'install_nni.sh'), runScriptContent, { encoding: 'utf8' });
// Create tmp trial working folder locally.
await cpp.exec(`mkdir -p ${trialLocalTempFolder}`);
// Write worker file content run_worker.sh to local tmp folders
if(this.kubeflowTrialConfig.worker) {
const workerRunScriptContent: string = this.genereateRunScript(trialJobId, trialWorkingFolder,
this.kubeflowTrialConfig.worker.command, curTrialSequenceId.toString(), 'worker');
if(kubeflowTrialConfig.worker) {
const workerRunScriptContent: string = this.generateRunScript(trialJobId, trialWorkingFolder,
kubeflowTrialConfig.worker.command, curTrialSequenceId.toString(), 'worker', kubeflowTrialConfig.worker.gpuNum);
await fs.promises.writeFile(path.join(trialLocalTempFolder, 'run_worker.sh'), workerRunScriptContent, { encoding: 'utf8' });
}
// Write parameter server file content run_ps.sh to local tmp folders
if(this.kubeflowTrialConfig.ps) {
const psRunScriptContent: string = this.genereateRunScript(trialJobId, trialWorkingFolder,
this.kubeflowTrialConfig.ps.command, curTrialSequenceId.toString(), 'ps');
await fs.promises.writeFile(path.join(trialLocalTempFolder, 'run_ps.sh'), psRunScriptContent, { encoding: 'utf8' });
if(this.kubeflowClusterConfig.operator === 'tf-operator') {
let tensorflowTrialConfig: KubeflowTrialConfigTensorflow = <KubeflowTrialConfigTensorflow>this.kubeflowTrialConfig;
if(tensorflowTrialConfig.ps){
const psRunScriptContent: string = this.generateRunScript(trialJobId, trialWorkingFolder,
tensorflowTrialConfig.ps.command, curTrialSequenceId.toString(), 'ps', tensorflowTrialConfig.ps.gpuNum);
await fs.promises.writeFile(path.join(trialLocalTempFolder, 'run_ps.sh'), psRunScriptContent, { encoding: 'utf8' });
}
}
else if(this.kubeflowClusterConfig.operator === 'pytorch-operator') {
let pytorchTrialConfig: KubeflowTrialConfigPytorch = <KubeflowTrialConfigPytorch>this.kubeflowTrialConfig;
if(pytorchTrialConfig.master){
const masterRunScriptContent: string = this.generateRunScript(trialJobId, trialWorkingFolder,
pytorchTrialConfig.master.command, curTrialSequenceId.toString(), 'master', pytorchTrialConfig.master.gpuNum);
await fs.promises.writeFile(path.join(trialLocalTempFolder, 'run_master.sh'), masterRunScriptContent, { encoding: 'utf8' });
}
}
// Write file content ( parameter.cfg ) to local tmp folders
const trialForm : TrialJobApplicationForm = (<TrialJobApplicationForm>form)
if(trialForm && trialForm.hyperParameters) {
await fs.promises.writeFile(path.join(trialLocalTempFolder, generateParamFileName(trialForm.hyperParameters)),
trialForm.hyperParameters.value, { encoding: 'utf8' });
}
const kubeflowJobYamlPath = path.join(trialLocalTempFolder, `kubeflow-job-${trialJobId}.yaml`);
const kubeflowJobName = `nni-exp-${this.experimentId}-trial-${trialJobId}`.toLowerCase();
const workerPodResources : any = {};
workerPodResources.requests = {
'memory': `${this.kubeflowTrialConfig.worker.memoryMB}Mi`,
'cpu': `${this.kubeflowTrialConfig.worker.cpuNum}`,
'nvidia.com/gpu': `${this.kubeflowTrialConfig.worker.gpuNum}`
}
workerPodResources.requests = this.generatePodResource(kubeflowTrialConfig.worker.memoryMB, kubeflowTrialConfig.worker.cpuNum,
kubeflowTrialConfig.worker.gpuNum)
workerPodResources.limits = Object.assign({}, workerPodResources.requests);
let psPodResources : any = undefined;
if(this.kubeflowTrialConfig.ps) {
psPodResources = {};
psPodResources.requests = {
'memory': `${this.kubeflowTrialConfig.ps.memoryMB}Mi`,
'cpu': `${this.kubeflowTrialConfig.ps.cpuNum}`,
'nvidia.com/gpu': `${this.kubeflowTrialConfig.ps.gpuNum}`
let nonWorkerResources : any = {};
if(this.kubeflowClusterConfig.operator === 'tf-operator') {
let tensorflowTrialConfig: KubeflowTrialConfigTensorflow = <KubeflowTrialConfigTensorflow>this.kubeflowTrialConfig;
if (tensorflowTrialConfig.ps) {
nonWorkerResources.requests = this.generatePodResource(tensorflowTrialConfig.ps.memoryMB, tensorflowTrialConfig.ps.cpuNum,
tensorflowTrialConfig.ps.gpuNum)
nonWorkerResources.limits = Object.assign({}, nonWorkerResources.requests);
}
psPodResources.limits = Object.assign({}, psPodResources.requests);
}
}else if(this.kubeflowClusterConfig.operator === 'pytorch-operator'){
let pyTorchTrialConfig: KubeflowTrialConfigPytorch = <KubeflowTrialConfigPytorch>this.kubeflowTrialConfig;
if (pyTorchTrialConfig.master) {
nonWorkerResources.requests = this.generatePodResource(pyTorchTrialConfig.master.memoryMB, pyTorchTrialConfig.master.cpuNum,
pyTorchTrialConfig.master.gpuNum)
nonWorkerResources.limits = Object.assign({}, nonWorkerResources.requests);
}
}
// Generate kubeflow job resource yaml file for K8S
yaml.write(
kubeflowJobYamlPath,
this.generateKubeflowJobConfig(trialJobId, trialWorkingFolder, kubeflowJobName, workerPodResources, psPodResources),
this.generateKubeflowJobConfig(trialJobId, trialWorkingFolder, kubeflowJobName, workerPodResources, nonWorkerResources),
'utf-8'
);
let trialJobDetail: KubeflowTrialJobDetail;
//The url used in trialJobDetail
let trialJobDetailUrl: string;
if(this.kubeflowClusterConfig.nfs) {
// Creat work dir for current trial in NFS directory
await cpp.exec(`mkdir -p ${this.trialLocalNFSTempFolder}/nni/${getExperimentId()}/${trialJobId}`);
// Copy code files from local dir to NFS mounted dir
await cpp.exec(`cp -r ${trialLocalTempFolder}/* ${this.trialLocalNFSTempFolder}/nni/${getExperimentId()}/${trialJobId}/.`);
const nfsConfig: NFSConfig = this.kubeflowClusterConfig.nfs;
trialJobDetailUrl = `nfs://${nfsConfig.server}:${path.join(nfsConfig.path, 'nni', getExperimentId(), trialJobId, 'output')}`
} else {
if(this.kubeflowClusterConfig.storage && this.kubeflowClusterConfig.storage === 'azureStorage') {
try{
//upload local files to azure storage
await AzureStorageClientUtility.uploadDirectory(this.azureStorageClient,
......@@ -205,8 +215,21 @@ class KubeflowTrainingService implements TrainingService {
this.log.error(error);
return Promise.reject(error);
}
} else if(this.kubeflowClusterConfig.storage && (this.kubeflowClusterConfig.storage === 'nfs' || this.kubeflowClusterConfig.storage === undefined)) {
let nfsKubeflowClusterConfig: KubeflowClusterConfigNFS = <KubeflowClusterConfigNFS>this.kubeflowClusterConfig;
// Creat work dir for current trial in NFS directory
await cpp.exec(`mkdir -p ${this.trialLocalNFSTempFolder}/nni/${getExperimentId()}/${trialJobId}`);
// Copy code files from local dir to NFS mounted dir
await cpp.exec(`cp -r ${trialLocalTempFolder}/* ${this.trialLocalNFSTempFolder}/nni/${getExperimentId()}/${trialJobId}/.`);
const nfsConfig: NFSConfig = nfsKubeflowClusterConfig.nfs;
trialJobDetailUrl = `nfs://${nfsConfig.server}:${path.join(nfsConfig.path, 'nni', getExperimentId(), trialJobId, 'output')}`
} else {
const error: string = `kubeflowClusterConfig format error!`;
this.log.error(error);
throw new Error(error);
}
trialJobDetail = new KubeflowTrialJobDetail(
trialJobId,
'WAITING',
......@@ -218,7 +241,7 @@ class KubeflowTrainingService implements TrainingService {
trialJobDetailUrl,
this.kubeflowJobPlural
);
// Create kubeflow training jobs
await cpp.exec(`kubectl create -f ${kubeflowJobYamlPath}`);
// Set trial job detail until kubectl create resource successfully
......@@ -227,6 +250,14 @@ class KubeflowTrainingService implements TrainingService {
return Promise.resolve(trialJobDetail);
}
/**
 * Build the resource entry (memory / cpu / nvidia.com/gpu) for one pod.
 *
 * @param memory amount of memory; rendered with an `Mi` suffix
 * @param cpuNum number of CPUs, rendered as a string
 * @param gpuNum number of GPUs, rendered as a string
 * @returns object keyed by `memory`, `cpu` and `nvidia.com/gpu`, all values being strings
 */
public generatePodResource(memory: number, cpuNum: number, gpuNum: number) {
    const memoryQuantity: string = `${memory}Mi`;
    const cpuQuantity: string = String(cpuNum);
    const gpuQuantity: string = String(gpuNum);

    return {
        'memory': memoryQuantity,
        'cpu': cpuQuantity,
        'nvidia.com/gpu': gpuQuantity
    };
}
/**
 * Not implemented in this class: the method always throws.
 *
 * @param trialJobId id of the trial job (unused)
 * @param form job application form (unused)
 * @throws MethodNotImplementedError on every call
 */
public updateTrialJob(trialJobId: string, form: JobApplicationForm): Promise<TrialJobDetail> {
throw new MethodNotImplementedError();
}
......@@ -303,26 +334,17 @@ class KubeflowTrainingService implements TrainingService {
break;
case TrialConfigMetadataKey.KUBEFLOW_CLUSTER_CONFIG:
this.kubeflowClusterConfig = <KubeflowClusterConfig>JSON.parse(value);
// If NFS config section is valid in config file, proceed to mount and config NFS
if(this.kubeflowClusterConfig.nfs) {
//Check and mount NFS mount point here
await cpp.exec(`mkdir -p ${this.trialLocalNFSTempFolder}`);
const nfsServer: string = this.kubeflowClusterConfig.nfs.server;
const nfsPath: string = this.kubeflowClusterConfig.nfs.path;
try {
await cpp.exec(`sudo mount ${nfsServer}:${nfsPath} ${this.trialLocalNFSTempFolder}`);
} catch(error) {
const mountError: string = `Mount NFS ${nfsServer}:${nfsPath} to ${this.trialLocalNFSTempFolder} failed, error is ${error}`;
this.log.error(mountError);
throw new Error(mountError);
}
} else if(this.kubeflowClusterConfig.keyVault && this.kubeflowClusterConfig.azureStorage){
const vaultName = this.kubeflowClusterConfig.keyVault.vaultName;
const valutKeyName = this.kubeflowClusterConfig.keyVault.name;
this.azureStorageAccountName = this.kubeflowClusterConfig.azureStorage.accountName;
this.azureStorageShare = this.kubeflowClusterConfig.azureStorage.azureShare;
let kubeflowClusterJsonObject = JSON.parse(value);
let kubeflowClusterConfigBase: KubeflowClusterConfigBase = new KubeflowClusterConfigBase(kubeflowClusterJsonObject.operator, kubeflowClusterJsonObject.storage);
if(kubeflowClusterConfigBase && kubeflowClusterConfigBase.storage === 'azureStorage') {
this.kubeflowClusterConfig = new KubeflowClusterConfigAzure(kubeflowClusterJsonObject.operator, kubeflowClusterJsonObject.keyvault,
kubeflowClusterJsonObject.azureStorage, kubeflowClusterJsonObject.storage)
let azureKubeflowClusterConfig = <KubeflowClusterConfigAzure>this.kubeflowClusterConfig;
const vaultName = azureKubeflowClusterConfig.keyVault.vaultName;
const valutKeyName = azureKubeflowClusterConfig.keyVault.name;
this.azureStorageAccountName = azureKubeflowClusterConfig.azureStorage.accountName;
this.azureStorageShare = azureKubeflowClusterConfig.azureStorage.azureShare;
try{
const result = await cpp.exec(`az keyvault secret show --name ${valutKeyName} --vault-name ${vaultName}`);
if(result.stderr) {
......@@ -339,18 +361,35 @@ class KubeflowTrainingService implements TrainingService {
await cpp.exec(`kubectl create secret generic ${this.azureStorageSecretName} `
+ `--from-literal=azurestorageaccountname=${this.azureStorageAccountName} `
+ `--from-literal=azurestorageaccountkey=${storageAccountKey}`)
} catch(error){
this.log.error(`command error: ${error}`);
}catch(error) {
this.log.error(error);
throw new Error(error);
}
}else{
const clusterConfigError: string = 'kubeflow cluster config format error!';
this.log.error(clusterConfigError);
throw new Error(clusterConfigError);
} else if(kubeflowClusterConfigBase && (kubeflowClusterConfigBase.storage === 'nfs' || kubeflowClusterConfigBase.storage === undefined)) {
//Check and mount NFS mount point here
//If storage is undefined, the default value is nfs
this.kubeflowClusterConfig = new KubeflowClusterConfigNFS(kubeflowClusterJsonObject.operator, kubeflowClusterJsonObject.nfs,
kubeflowClusterJsonObject.storage)
let nfsKubeflowClusterConfig = <KubeflowClusterConfigNFS>this.kubeflowClusterConfig;
await cpp.exec(`mkdir -p ${this.trialLocalNFSTempFolder}`);
const nfsServer: string = nfsKubeflowClusterConfig.nfs.server;
const nfsPath: string = nfsKubeflowClusterConfig.nfs.path;
try {
await cpp.exec(`sudo mount ${nfsServer}:${nfsPath} ${this.trialLocalNFSTempFolder}`);
} catch(error) {
const mountError: string = `Mount NFS ${nfsServer}:${nfsPath} to ${this.trialLocalNFSTempFolder} failed, error is ${error}`;
this.log.error(mountError);
throw new Error(mountError);
}
} else {
const error: string = `kubeflowClusterConfig format error!`;
this.log.error(error);
throw new Error(error);
}
this.kubeflowJobPlural = kubeflowOperatorMap.get(this.kubeflowClusterConfig.operator);
this.kubeflowJobKind = kubeflowOperatorJobKindMap.get(this.kubeflowClusterConfig.operator)
break;
case TrialConfigMetadataKey.TRIAL_CONFIG:
......@@ -359,8 +398,20 @@ class KubeflowTrainingService implements TrainingService {
return Promise.reject(new Error('kubeflow cluster config is not initialized'));
}
this.kubeflowTrialConfig = <KubeflowTrialConfig>JSON.parse(value);
assert(this.kubeflowClusterConfig !== undefined && this.kubeflowTrialConfig.worker !== undefined);
assert(this.kubeflowClusterConfig !== undefined)
let kubeflowTrialJsonObjsect = JSON.parse(value);
if(this.kubeflowClusterConfig.operator === 'tf-operator'){
this.kubeflowTrialConfig = new KubeflowTrialConfigTensorflow(kubeflowTrialJsonObjsect.codeDir,
kubeflowTrialJsonObjsect.worker, kubeflowTrialJsonObjsect.ps);
}else if(this.kubeflowClusterConfig.operator === 'pytorch-operator'){
this.kubeflowTrialConfig = new KubeflowTrialConfigPytorch(kubeflowTrialJsonObjsect.codeDir,
kubeflowTrialJsonObjsect.worker, kubeflowTrialJsonObjsect.master);
}
if (!this.kubeflowTrialConfig){
this.log.error('kubeflow kubeflow TrialConfig is not initialized');
return Promise.reject(new Error('kubeflow kubeflow TrialConfig is not initialized'));
}
// Validate to make sure codeDir doesn't have too many files
try {
......@@ -369,7 +420,6 @@ class KubeflowTrainingService implements TrainingService {
this.log.error(error);
return Promise.reject(new Error(error));
}
break;
default:
break;
......@@ -434,9 +484,9 @@ class KubeflowTrainingService implements TrainingService {
* @param trialWorkingFolder working folder
* @param kubeflowJobName job name
* @param workerPodResources worker pod template
* @param psPodResources ps pod template
* @param nonWorkerPodResources non-worker pod template, like ps or master
*/
private generateKubeflowJobConfig(trialJobId: string, trialWorkingFolder: string, kubeflowJobName : string, workerPodResources : any, psPodResources?: any) : any {
private generateKubeflowJobConfig(trialJobId: string, trialWorkingFolder: string, kubeflowJobName : string, workerPodResources : any, nonWorkerPodResources?: any) : any {
if(!this.kubeflowClusterConfig) {
throw new Error('Kubeflow Cluster config is not initialized');
}
......@@ -445,18 +495,36 @@ class KubeflowTrainingService implements TrainingService {
throw new Error('Kubeflow trial config is not initialized');
}
const tfReplicaSpecsObj: any = {};
tfReplicaSpecsObj.Worker = this.generateReplicaConfig(trialWorkingFolder, this.kubeflowTrialConfig.worker.replicas,
this.kubeflowTrialConfig.worker.image, 'run_worker.sh', workerPodResources);
if(this.kubeflowTrialConfig.ps) {
tfReplicaSpecsObj.Ps = this.generateReplicaConfig(trialWorkingFolder, this.kubeflowTrialConfig.ps.replicas,
this.kubeflowTrialConfig.ps.image, 'run_ps.sh', psPodResources);
if(!this.kubeflowJobPlural) {
throw new Error('Kubeflow job plural name is undefined');
}
const replicaSpecsObj: any = {};
let replicaSpecsObjMap = new Map<string, object>();
if(this.kubeflowClusterConfig.operator === 'tf-operator') {
let tensorflowTrialConfig: KubeflowTrialConfigTensorflow = <KubeflowTrialConfigTensorflow>this.kubeflowTrialConfig;
replicaSpecsObj.Worker = this.generateReplicaConfig(trialWorkingFolder, tensorflowTrialConfig.worker.replicas,
tensorflowTrialConfig.worker.image, 'run_worker.sh', workerPodResources);
if (tensorflowTrialConfig.ps){
replicaSpecsObj.Ps = this.generateReplicaConfig(trialWorkingFolder, tensorflowTrialConfig.ps.replicas,
tensorflowTrialConfig.ps.image, 'run_ps.sh', nonWorkerPodResources);
}
replicaSpecsObjMap.set(this.kubeflowJobPlural, {'tfReplicaSpecs': replicaSpecsObj})
}
else if(this.kubeflowClusterConfig.operator === 'pytorch-operator') {
let pytorchTrialConfig: KubeflowTrialConfigPytorch = <KubeflowTrialConfigPytorch>this.kubeflowTrialConfig;
replicaSpecsObj.Worker = this.generateReplicaConfig(trialWorkingFolder, pytorchTrialConfig.worker.replicas,
pytorchTrialConfig.worker.image, 'run_worker.sh', workerPodResources);
if(pytorchTrialConfig.master){
replicaSpecsObj.Master = this.generateReplicaConfig(trialWorkingFolder, pytorchTrialConfig.master.replicas,
pytorchTrialConfig.master.image, 'run_master.sh', nonWorkerPodResources);
}
replicaSpecsObjMap.set(this.kubeflowJobPlural, {'pytorchReplicaSpecs': replicaSpecsObj})
}
return {
apiVersion: 'kubeflow.org/v1alpha2',
kind: 'TFJob',
kind: this.kubeflowJobKind,
metadata: {
name: kubeflowJobName,
namespace: 'default',
......@@ -466,10 +534,8 @@ class KubeflowTrainingService implements TrainingService {
trialId: trialJobId
}
},
spec: {
tfReplicaSpecs: tfReplicaSpecsObj
}
};
spec: replicaSpecsObjMap.get(this.kubeflowJobPlural)
};
}
/**
......@@ -489,30 +555,38 @@ class KubeflowTrainingService implements TrainingService {
throw new Error('Kubeflow trial config is not initialized');
}
if(!this.kubeflowJobPlural) {
throw new Error('Kubeflow job plural name is undefined');
}
let volumeSpecMap = new Map<string, object>();
if(this.kubeflowClusterConfig.nfs){
if(this.kubeflowClusterConfig.storage && this.kubeflowClusterConfig.storage === 'azureStorage'){
volumeSpecMap.set('nniVolumes', [
{
name: 'nni-vol',
nfs: {
server: `${this.kubeflowClusterConfig.nfs.server}`,
path: `${this.kubeflowClusterConfig.nfs.path}`
}
name: 'nni-vol',
azureFile: {
secretName: `${this.azureStorageSecretName}`,
shareName: `${this.azureStorageShare}`,
readonly: false
}
}])
}else if(this.kubeflowClusterConfig.azureStorage && this.kubeflowClusterConfig.keyVault){
}else {
let nfsKubeflowClusterConfig: KubeflowClusterConfigNFS = <KubeflowClusterConfigNFS> this.kubeflowClusterConfig;
volumeSpecMap.set('nniVolumes', [
{
name: 'nni-vol',
azureFile: {
secretName: `${this.azureStorageSecretName}`,
shareName: `${this.azureStorageShare}`,
readonly: false
nfs: {
server: `${nfsKubeflowClusterConfig.nfs.server}`,
path: `${nfsKubeflowClusterConfig.nfs.path}`
}
}])
}else{
const clusterConfigError: string = 'kubeflow cluster config format error!';
this.log.error(clusterConfigError);
throw new Error(clusterConfigError);
}
let containerNameMap = new Map<string, string>();
if(this.kubeflowJobPlural == 'tfjobs'){
containerNameMap.set(this.kubeflowJobPlural, 'tensorflow');
}else if(this.kubeflowJobPlural == 'pytorchjobs'){
containerNameMap.set(this.kubeflowJobPlural, 'pytorch');
}
return {
......@@ -526,7 +600,7 @@ class KubeflowTrainingService implements TrainingService {
{
// Kubeflow tensorflow operator requires that containers' name must be tensorflow
// TODO: change the name based on operator's type
name: 'tensorflow',
name: containerNameMap.get(this.kubeflowJobPlural),
image: replicaImage,
args: ["sh", `${path.join(trialWorkingFolder, runScriptFile)}`],
volumeMounts: [
......@@ -550,8 +624,8 @@ class KubeflowTrainingService implements TrainingService {
* @param command
* @param trialSequenceId sequence id
*/
private genereateRunScript(trialJobId: string, trialWorkingFolder: string,
command: string, trialSequenceId: string, roleType: DistTrainRole): string {
private generateRunScript(trialJobId: string, trialWorkingFolder: string,
command: string, trialSequenceId: string, roleType: DistTrainRole, gpuNum: number): string {
const runScriptLines: string[] = [];
runScriptLines.push('#!/bin/bash');
......@@ -563,26 +637,14 @@ class KubeflowTrainingService implements TrainingService {
runScriptLines.push(`export NNI_EXP_ID=${getExperimentId()}`);
runScriptLines.push(`export NNI_CODE_DIR=${trialWorkingFolder}`);
runScriptLines.push(`export NNI_TRIAL_SEQ_ID=${trialSequenceId}`);
// Nvidia devcie plugin for K8S has a known issue that requesting zero GPUs allocates all GPUs
// Refer https://github.com/NVIDIA/k8s-device-plugin/issues/61
// So we have to explicitly set CUDA_VISIBLE_DEVICES to empty if user sets gpuNum to 0 in NNI config file
if(this.kubeflowTrialConfig) {
switch(roleType) {
case 'ps':
if(this.kubeflowTrialConfig.ps && this.kubeflowTrialConfig.ps.gpuNum == 0) {
runScriptLines.push(`export CUDA_VISIBLE_DEVICES=''`);
}
break;
case 'worker':
if(this.kubeflowTrialConfig.worker && this.kubeflowTrialConfig.worker.gpuNum == 0) {
runScriptLines.push(`export CUDA_VISIBLE_DEVICES=''`);
}
break;
default:
break;
}
if(gpuNum === 0) {
runScriptLines.push(`export CUDA_VISIBLE_DEVICES=''`);
}
const nniManagerIp = this.nniManagerIpConfig?this.nniManagerIpConfig.nniManagerIp:getIPV4Address();
runScriptLines.push('mkdir -p $NNI_SYS_DIR');
runScriptLines.push('mkdir -p $NNI_OUTPUT_DIR');
......
......@@ -122,6 +122,14 @@ kubeflow_trial_schema = {
'memoryMB': int,
'image': str
},
Optional('master'): {
'replicas': int,
'command': str,
'gpuNum': And(int, lambda x: 0 <= x <= 99999),
'cpuNum': And(int, lambda x: 0 <= x <= 99999),
'memoryMB': int,
'image': str
},
'worker':{
'replicas': int,
'command': str,
......@@ -135,13 +143,15 @@ kubeflow_trial_schema = {
kubeflow_config_schema = {
'kubeflowConfig':Or({
'operator': Or('tf-operator', 'mxnet-operator', 'pytorch-operator'),
'operator': Or('tf-operator', 'pytorch-operator'),
Optional('storage'): Or('nfs', 'azureStorage'),
'nfs': {
'server': str,
'path': str
}
},{
'operator': Or('tf-operator', 'mxnet-operator', 'pytorch-operator'),
'operator': Or('tf-operator', 'pytorch-operator'),
Optional('storage'): Or('nfs', 'azureStorage'),
'keyVault': {
'vaultName': Regex('([0-9]|[a-z]|[A-Z]|-){1,127}'),
'name': Regex('([0-9]|[a-z]|[A-Z]|-){1,127}')
......
......@@ -35,7 +35,6 @@ def parse_relative_path(root_path, experiment_config, key):
print_warning('expand %s: %s to %s ' % (key, experiment_config[key], absolute_path))
experiment_config[key] = absolute_path
def parse_time(experiment_config):
'''Parse time format'''
unit = experiment_config['maxExecDuration'][-1]
......@@ -85,7 +84,33 @@ def validate_search_space_content(experiment_config):
print_error('please use _type and _value to specify searchspace!')
exit(1)
except:
raise Exception('searchspace file is not a valid json format!')
print_error('searchspace file is not a valid json format!')
exit(1)
def validate_kubeflow_operators(experiment_config):
    '''Check that the trial roles and the storage settings agree with kubeflowConfig.

    Does nothing when the experiment has no kubeflowConfig section. On the first
    violation an error is printed and the process exits with status 1.
    '''
    kubeflow_config = experiment_config.get('kubeflowConfig')
    if not kubeflow_config:
        return

    # Role check: tf-operator must not define a master, pytorch-operator must not define a ps.
    operator = kubeflow_config.get('operator')
    if operator == 'tf-operator' and experiment_config.get('trial').get('master') is not None:
        print_error('kubeflow with tf-operator can not set master')
        exit(1)
    elif operator == 'pytorch-operator' and experiment_config.get('trial').get('ps') is not None:
        print_error('kubeflow with pytorch-operator can not set ps')
        exit(1)

    # Storage check: the section matching the declared storage type must be present,
    # and an azureStorage section requires the storage type to be given.
    storage = kubeflow_config.get('storage')
    if storage == 'nfs' and kubeflow_config.get('nfs') is None:
        print_error('please set nfs configuration!')
        exit(1)
    elif storage == 'azureStorage' and kubeflow_config.get('azureStorage') is None:
        print_error('please set azureStorage configuration!')
        exit(1)
    elif storage is None and kubeflow_config.get('azureStorage'):
        print_error('please set storage type!')
        exit(1)
def validate_common_content(experiment_config):
'''Validate whether the common values in experiment_config is valid'''
......@@ -168,7 +193,7 @@ def validate_annotation_content(experiment_config, spec_key, builtin_name):
# validate searchSpaceFile
if experiment_config[spec_key].get(builtin_name):
if experiment_config.get('searchSpacePath') is None:
print_error('Please set searchSpace!')
print_error('Please set searchSpacePath!')
exit(1)
validate_search_space_content(experiment_config)
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment