Unverified Commit d8e55165 authored by fishyds's avatar fishyds Committed by GitHub
Browse files

[Kubeflow training service] Use Kubernete API server to replace kubectl dependency (#472)

[Kubeflow training service] Use Kubernete API server to replace kubectl dependency
parent 07e19a30
......@@ -24,6 +24,8 @@ trial:
image: msranni/nni:latest
kubeflowConfig:
operator: tf-operator
apiVersion: v1alpha2
storage: nfs
nfs:
server: 10.10.10.10
path: /var/nfs/general
\ No newline at end of file
......@@ -37,6 +37,8 @@ trial:
image: msranni/nni:latest
kubeflowConfig:
operator: tf-operator
apiVersion: v1alpha2
storage: nfs
nfs:
# Your NFS server IP, like 10.10.10.10
server: {your_nfs_server_ip}
......
......@@ -24,6 +24,8 @@ trial:
image: msranni/nni:latest
kubeflowConfig:
operator: tf-operator
apiVersion: v1alpha2
storage: nfs
nfs:
server: 10.10.10.10
path: /var/nfs/general
\ No newline at end of file
......@@ -25,6 +25,8 @@ trial:
image: msranni/nni:latest
kubeflowConfig:
operator: tf-operator
apiVersion: v1alpha2
storage: nfs
nfs:
server: 10.10.10.10
path: /var/nfs/general
\ No newline at end of file
{
"kind": "CustomResourceDefinition",
"spec": {
"scope": "Namespaced",
"version": "v1alpha2",
"group": "kubeflow.org",
"names": {
"kind": "PyTorchJob",
"plural": "pytorchjobs",
"singular": "pytorchjob"
}
},
"apiVersion": "apiextensions.k8s.io/v1beta1",
"metadata": {
"name": "pytorchjobs.kubeflow.org"
}
}
{
"kind": "CustomResourceDefinition",
"spec": {
"scope": "Namespaced",
"version": "v1beta1",
"group": "kubeflow.org",
"names": {
"kind": "PyTorchJob",
"plural": "pytorchjobs",
"singular": "pytorchjob"
}
},
"apiVersion": "apiextensions.k8s.io/v1beta1",
"metadata": {
"name": "pytorchjobs.kubeflow.org"
}
}
{
"kind": "CustomResourceDefinition",
"spec": {
"scope": "Namespaced",
"version": "v1alpha2",
"group": "kubeflow.org",
"names": {
"kind": "TFJob",
"plural": "tfjobs",
"singular": "tfjob"
}
},
"apiVersion": "apiextensions.k8s.io/v1beta1",
"metadata": {
"name": "tfjobs.kubeflow.org"
}
}
{
"kind": "CustomResourceDefinition",
"spec": {
"scope": "Namespaced",
"version": "v1beta1",
"group": "kubeflow.org",
"names": {
"kind": "TFJob",
"plural": "tfjobs",
"singular": "tfjob"
}
},
"apiVersion": "apiextensions.k8s.io/v1beta1",
"metadata": {
"name": "tfjobs.kubeflow.org"
}
}
......@@ -3,7 +3,7 @@
"version": "1.0.0",
"main": "index.js",
"scripts": {
"postbuild": "cp -rf scripts ./dist/",
"postbuild": "cp -rf scripts ./dist/ && cp -rf config ./dist/",
"build": "tsc",
"test": "mocha -r ts-node/register -t 15000 --recursive **/*.test.ts --colors",
"start": "node dist/main.js"
......@@ -15,7 +15,6 @@
"express": "^4.16.3",
"express-joi-validator": "^2.0.0",
"node-nvidia-smi": "^1.0.0",
"node-yaml": "^3.1.1",
"rx": "^4.1.0",
"sqlite3": "^4.0.2",
"ssh2": "^0.6.1",
......@@ -26,7 +25,8 @@
"typescript-ioc": "^1.2.4",
"typescript-string-operations": "^1.3.1",
"webhdfs":"^1.2.0",
"azure-storage": "^2.10.2"
"azure-storage": "^2.10.2",
"kubernetes-client": "^6.5.0"
},
"devDependencies": {
"@types/chai": "^4.1.4",
......
......@@ -78,6 +78,7 @@ export namespace ValidationSchemas {
kubeflow_config: joi.object({
operator: joi.string().min(1).required(),
storage: joi.string().min(1),
apiVersion: joi.string().min(1),
nfs: joi.object({
server: joi.string().min(1).required(),
path: joi.string().min(1).required()
......
import { TrialConfig } from "../common/trialConfig";
/**
* Copyright (c) Microsoft Corporation
* All rights reserved.
......@@ -21,28 +19,11 @@ import { TrialConfig } from "../common/trialConfig";
'use strict';
/** operator types that kubeflow supported */
export type KubeflowOperator = 'tf-operator' | 'pytorch-operator' ;
export type KubeflowOperatorPlural = 'tfjobs' | 'pytorchjobs' ;
export type KubeflowOperatorJobKind = 'TFJob' | 'PyTorchJob';
export type KubeflowStorageKind = 'nfs' | 'azureStorage';
/**
* map from Kubeflow operator name to its plural name in K8S
*/
export const kubeflowOperatorMap : Map<KubeflowOperator, KubeflowOperatorPlural> = new Map<KubeflowOperator, KubeflowOperatorPlural>([
['tf-operator' , 'tfjobs'],
['pytorch-operator', 'pytorchjobs']
]);
/**
* map from Kubeflow operator name to its job kind name in K8S
*/
export const kubeflowOperatorJobKindMap : Map<KubeflowOperator, KubeflowOperatorJobKind> = new Map<KubeflowOperator, KubeflowOperatorJobKind>([
['tf-operator' , 'TFJob'],
['pytorch-operator', 'PyTorchJob']
]);
export type DistTrainRole = 'worker' | 'ps' | 'master';
export type OperatorApiVersion = 'v1alpha2' | 'v1beta1';
/**
* Kuberflow cluster configuration
......@@ -51,7 +32,8 @@ export const kubeflowOperatorJobKindMap : Map<KubeflowOperator, KubeflowOperator
export class KubeflowClusterConfigBase {
/** Name of Kubeflow operator, like tf-operator */
public readonly operator: KubeflowOperator;
public readonly storage?: KubeflowStorageKind;
public readonly apiVersion: OperatorApiVersion;
public readonly storage?: KubeflowStorageKind;
/**
* Constructor
......@@ -59,8 +41,9 @@ export class KubeflowClusterConfigBase {
* @param passWord password of Kubeflow Cluster
* @param host Host IP of Kubeflow Cluster
*/
constructor(operator: KubeflowOperator, storage?: KubeflowStorageKind) {
constructor(operator: KubeflowOperator, apiVersion: OperatorApiVersion, storage?: KubeflowStorageKind) {
this.operator = operator;
this.apiVersion = apiVersion;
this.storage = storage;
}
}
......@@ -68,8 +51,10 @@ export class KubeflowClusterConfigBase {
export class KubeflowClusterConfigNFS extends KubeflowClusterConfigBase{
public readonly nfs: NFSConfig;
constructor(operator: KubeflowOperator, nfs: NFSConfig, storage?: KubeflowStorageKind) {
super(operator, storage)
constructor(operator: KubeflowOperator,
apiVersion: OperatorApiVersion,
nfs: NFSConfig, storage?: KubeflowStorageKind) {
super(operator, apiVersion, storage);
this.nfs = nfs;
}
}
......@@ -78,8 +63,12 @@ export class KubeflowClusterConfigAzure extends KubeflowClusterConfigBase{
public readonly keyVault: keyVaultConfig;
public readonly azureStorage: AzureStorage;
constructor(operator: KubeflowOperator, keyVault: keyVaultConfig, azureStorage: AzureStorage, storage?: KubeflowStorageKind) {
super(operator, storage)
constructor(operator: KubeflowOperator,
apiVersion: OperatorApiVersion,
keyVault: keyVaultConfig,
azureStorage: AzureStorage,
storage?: KubeflowStorageKind) {
super(operator, apiVersion, storage);
this.keyVault = keyVault;
this.azureStorage = azureStorage;
}
......@@ -184,10 +173,10 @@ export class KubeflowTrialConfigTensorflow extends KubeflowTrialConfigBase{
}
export class KubeflowTrialConfigPytorch extends KubeflowTrialConfigBase{
public readonly master?: KubeflowTrialConfigTemplate;
public readonly worker: KubeflowTrialConfigTemplate;
public readonly master: KubeflowTrialConfigTemplate;
public readonly worker?: KubeflowTrialConfigTemplate;
constructor(codeDir: string, worker: KubeflowTrialConfigTemplate, master?: KubeflowTrialConfigTemplate) {
constructor(codeDir: string, master: KubeflowTrialConfigTemplate, worker?: KubeflowTrialConfigTemplate) {
super(codeDir);
this.master = master;
this.worker = worker;
......
......@@ -38,11 +38,10 @@ export class KubeflowTrialJobDetail implements TrialJobDetail {
public kubeflowJobName: string;
public sequenceId: number;
public queryJobFailedCount: number;
public k8sPluralName: string
constructor(id: string, status: TrialJobStatus, submitTime: number,
workingDirectory: string, form: JobApplicationForm,
kubeflowJobName: string, sequenceId: number, url: string, k8sPluralName: string) {
kubeflowJobName: string, sequenceId: number, url: string) {
this.id = id;
this.status = status;
this.submitTime = submitTime;
......@@ -53,7 +52,6 @@ export class KubeflowTrialJobDetail implements TrialJobDetail {
this.tags = [];
this.queryJobFailedCount = 0;
this.url = url;
this.k8sPluralName = k8sPluralName;
}
}
......
......@@ -19,11 +19,13 @@
'use strict';
import * as assert from 'assert';
import * as cpp from 'child-process-promise';
import { getLogger, Logger } from '../../common/log';
import { KubeflowTrialJobDetail, KubeflowTFJobType} from './kubeflowData';
import { NNIError, NNIErrorNames } from '../../common/errors';
import { TrialJobStatus } from '../../common/trainingService';
import { KubeflowOperatorClient } from './kubernetesApiClient';
/**
* Collector Kubeflow jobs info from Kubernetes cluster, and update kubeflow job status locally
......@@ -32,14 +34,14 @@ export class KubeflowJobInfoCollector {
private readonly trialJobsMap : Map<string, KubeflowTrialJobDetail>;
private readonly log: Logger = getLogger();
private readonly statusesNeedToCheck: TrialJobStatus[];
private readonly MAX_FAILED_QUERY_JOB_NUMBER: number = 30;
constructor(jobMap: Map<string, KubeflowTrialJobDetail>) {
this.trialJobsMap = jobMap;
this.statusesNeedToCheck = ['RUNNING', 'WAITING'];
}
public async retrieveTrialStatus() : Promise<void> {
public async retrieveTrialStatus(operatorClient: KubeflowOperatorClient | undefined) : Promise<void> {
assert(operatorClient !== undefined);
const updateKubeflowTrialJobs : Promise<void>[] = [];
for(let [trialJobId, kubeflowTrialJob] of this.trialJobsMap) {
if (!kubeflowTrialJob) {
......@@ -49,33 +51,30 @@ export class KubeflowJobInfoCollector {
if( Date.now() - kubeflowTrialJob.submitTime < 20 * 1000) {
return Promise.resolve();
}
updateKubeflowTrialJobs.push(this.retrieveSingleTrialJobInfo(kubeflowTrialJob))
updateKubeflowTrialJobs.push(this.retrieveSingleTrialJobInfo(operatorClient, kubeflowTrialJob))
}
await Promise.all(updateKubeflowTrialJobs);
}
private async retrieveSingleTrialJobInfo(kubeflowTrialJob : KubeflowTrialJobDetail) : Promise<void> {
private async retrieveSingleTrialJobInfo(operatorClient: KubeflowOperatorClient | undefined,
kubeflowTrialJob : KubeflowTrialJobDetail) : Promise<void> {
if (!this.statusesNeedToCheck.includes(kubeflowTrialJob.status)) {
return Promise.resolve();
}
let result : cpp.childProcessPromise.Result;
if(operatorClient === undefined) {
return Promise.reject('operatorClient is undefined');
}
let kubeflowJobInfo: any;
try {
result = await cpp.exec(`kubectl get ${kubeflowTrialJob.k8sPluralName} ${kubeflowTrialJob.kubeflowJobName} -o json`);
if(result.stderr) {
this.log.error(`Get ${kubeflowTrialJob.k8sPluralName} ${kubeflowTrialJob.kubeflowJobName} failed. Error is ${result.stderr}, failed checking number is ${kubeflowTrialJob.queryJobFailedCount}`);
kubeflowTrialJob.queryJobFailedCount++;
if(kubeflowTrialJob.queryJobFailedCount >= this.MAX_FAILED_QUERY_JOB_NUMBER) {
kubeflowTrialJob.status = 'UNKNOWN';
}
}
kubeflowJobInfo = await operatorClient.getKubeflowJob(kubeflowTrialJob.kubeflowJobName);
} catch(error) {
this.log.error(`kubectl get ${kubeflowTrialJob.k8sPluralName} ${kubeflowTrialJob.kubeflowJobName} failed, error is ${error}`);
this.log.error(`Get job ${kubeflowTrialJob.kubeflowJobName} info failed, error is ${error}`);
return Promise.resolve();
}
const kubeflowJobInfo = JSON.parse(result.stdout);
if(kubeflowJobInfo.status && kubeflowJobInfo.status.conditions) {
const latestCondition = kubeflowJobInfo.status.conditions[kubeflowJobInfo.status.conditions.length - 1];
const tfJobType : KubeflowTFJobType = <KubeflowTFJobType>latestCondition.type;
......
......@@ -20,6 +20,7 @@
'use strict'
import * as assert from 'assert';
import * as azureStorage from 'azure-storage';
import * as component from '../../common/component';
import * as cpp from 'child-process-promise';
import * as fs from 'fs';
......@@ -36,27 +37,24 @@ import {
TrialJobDetail, TrialJobMetric, NNIManagerIpConfig
} from '../../common/trainingService';
import { delay, generateParamFileName, getExperimentRootDir, getIPV4Address, uniqueString, getJobCancelStatus } from '../../common/utils';
import { KubeflowClusterConfigBase, KubeflowClusterConfigNFS, KubeflowClusterConfigAzure, kubeflowOperatorMap, KubeflowTrialConfigBase,
KubeflowTrialConfigPytorch, KubeflowTrialConfigTensorflow, NFSConfig, kubeflowOperatorJobKindMap } from './kubeflowConfig';
import { DistTrainRole, KubeflowClusterConfigBase, KubeflowClusterConfigNFS, KubeflowClusterConfigAzure, KubeflowTrialConfigBase,
KubeflowTrialConfigPytorch, KubeflowTrialConfigTensorflow, NFSConfig } from './kubeflowConfig';
import { KubeflowTrialJobDetail } from './kubeflowData';
import { KubeflowJobRestServer } from './kubeflowJobRestServer';
import { KubeflowJobInfoCollector } from './kubeflowJobInfoCollector';
import { validateCodeDir } from '../common/util';
import { AzureStorageClientUtility } from './azureStorageClientUtils';
import * as azureStorage from 'azure-storage';
import { GeneralK8sClient, KubeflowOperatorClient } from './kubernetesApiClient';
var yaml = require('node-yaml');
var azure = require('azure-storage');
type DistTrainRole = 'worker' | 'ps' | 'master';
/**
* Training Service implementation for Kubeflow
* Refer https://github.com/kubeflow/kubeflow for more info about Kubeflow
*/
@component.Singleton
class KubeflowTrainingService implements TrainingService {
private readonly NNI_KUBEFLOW_TRIAL_LABEL = 'nni-kubeflow-trial';
private readonly NNI_KUBEFLOW_TRIAL_LABEL: string = 'nni-kubeflow-trial';
private readonly log!: Logger;
private readonly metricsEmitter: EventEmitter;
private readonly trialJobsMap: Map<string, KubeflowTrialJobDetail>;
......@@ -69,8 +67,8 @@ class KubeflowTrainingService implements TrainingService {
private kubeflowTrialConfig?: KubeflowTrialConfigBase;
private kubeflowJobInfoCollector: KubeflowJobInfoCollector;
private kubeflowRestServerPort?: number;
private kubeflowJobPlural?: string;
private kubeflowJobKind?: string;
private operatorClient?: KubeflowOperatorClient;
private readonly genericK8sClient: GeneralK8sClient;
private readonly CONTAINER_MOUNT_PATH: string;
private azureStorageClient?: azureStorage.FileService;
private azureStorageShare?: string;
......@@ -82,6 +80,7 @@ class KubeflowTrainingService implements TrainingService {
this.log = getLogger();
this.metricsEmitter = new EventEmitter();
this.trialJobsMap = new Map<string, KubeflowTrialJobDetail>();
this.genericK8sClient = new GeneralK8sClient();
this.kubeflowJobInfoCollector = new KubeflowJobInfoCollector(this.trialJobsMap);
this.trialLocalNFSTempFolder = path.join(getExperimentRootDir(), 'trials-nfs-tmp');
this.experimentId = getExperimentId();
......@@ -94,9 +93,9 @@ class KubeflowTrainingService implements TrainingService {
await restServer.start();
this.log.info(`Kubeflow Training service rest server listening on: ${restServer.endPoint}`);
while (!this.stopping) {
// collect metrics by calling 'kubectl get' command on Kubeflow jobs
// collect metrics for Kubeflow jobs by interacting with Kubernetes API server
await delay(3000);
await this.kubeflowJobInfoCollector.retrieveTrialStatus();
await this.kubeflowJobInfoCollector.retrieveTrialStatus(this.operatorClient);
}
}
......@@ -109,8 +108,8 @@ class KubeflowTrainingService implements TrainingService {
throw new Error('Kubeflow trial config is not initialized');
}
if(!this.kubeflowJobPlural) {
throw new Error('Kubeflow job plural name is undefined');
if(!this.operatorClient) {
throw new Error('Kubeflow job operator client is undefined');
}
if(!this.kubeflowRestServerPort) {
......@@ -171,12 +170,13 @@ class KubeflowTrainingService implements TrainingService {
await fs.promises.writeFile(path.join(trialLocalTempFolder, generateParamFileName(trialForm.hyperParameters)),
trialForm.hyperParameters.value, { encoding: 'utf8' });
}
const kubeflowJobYamlPath = path.join(trialLocalTempFolder, `kubeflow-job-${trialJobId}.yaml`);
const kubeflowJobName = `nni-exp-${this.experimentId}-trial-${trialJobId}`.toLowerCase();
const workerPodResources : any = {};
workerPodResources.requests = this.generatePodResource(kubeflowTrialConfig.worker.memoryMB, kubeflowTrialConfig.worker.cpuNum,
kubeflowTrialConfig.worker.gpuNum)
if(kubeflowTrialConfig.worker) {
workerPodResources.requests = this.generatePodResource(kubeflowTrialConfig.worker.memoryMB, kubeflowTrialConfig.worker.cpuNum,
kubeflowTrialConfig.worker.gpuNum)
}
workerPodResources.limits = Object.assign({}, workerPodResources.requests);
let nonWorkerResources : any = {};
......@@ -189,33 +189,31 @@ class KubeflowTrainingService implements TrainingService {
}
}else if(this.kubeflowClusterConfig.operator === 'pytorch-operator'){
let pyTorchTrialConfig: KubeflowTrialConfigPytorch = <KubeflowTrialConfigPytorch>this.kubeflowTrialConfig;
if (pyTorchTrialConfig.master) {
nonWorkerResources.requests = this.generatePodResource(pyTorchTrialConfig.master.memoryMB, pyTorchTrialConfig.master.cpuNum,
pyTorchTrialConfig.master.gpuNum)
nonWorkerResources.limits = Object.assign({}, nonWorkerResources.requests);
}
nonWorkerResources.requests = this.generatePodResource(pyTorchTrialConfig.master.memoryMB, pyTorchTrialConfig.master.cpuNum,
pyTorchTrialConfig.master.gpuNum)
nonWorkerResources.limits = Object.assign({}, nonWorkerResources.requests);
}
// Generate kubeflow job resource yaml file for K8S
yaml.write(
kubeflowJobYamlPath,
this.generateKubeflowJobConfig(trialJobId, trialWorkingFolder, kubeflowJobName, workerPodResources, nonWorkerResources),
'utf-8'
);
let trialJobDetail: KubeflowTrialJobDetail;
//The url used in trialJobDetail
let trialJobDetailUrl: string;
if(this.kubeflowClusterConfig.storage && this.kubeflowClusterConfig.storage === 'azureStorage') {
//The output url used in trialJobDetail
let trialJobOutputUrl: string = '';
assert(!this.kubeflowClusterConfig.storage
|| this.kubeflowClusterConfig.storage === 'azureStorage'
|| this.kubeflowClusterConfig.storage === 'nfs');
if(this.kubeflowClusterConfig.storage === 'azureStorage') {
try{
//upload local files to azure storage
await AzureStorageClientUtility.uploadDirectory(this.azureStorageClient,
`nni/${getExperimentId()}/${trialJobId}`, this.azureStorageShare, `${trialLocalTempFolder}`);
trialJobDetailUrl = `https://${this.azureStorageAccountName}.file.core.windows.net/${this.azureStorageShare}/${path.join('nni', getExperimentId(), trialJobId, 'output')}`
trialJobOutputUrl = `https://${this.azureStorageAccountName}.file.core.windows.net/${this.azureStorageShare}/${path.join('nni', getExperimentId(), trialJobId, 'output')}`
}catch(error){
this.log.error(error);
return Promise.reject(error);
}
} else if(this.kubeflowClusterConfig.storage && (this.kubeflowClusterConfig.storage === 'nfs' || this.kubeflowClusterConfig.storage === undefined)) {
} else if(this.kubeflowClusterConfig.storage === 'nfs' || this.kubeflowClusterConfig.storage === undefined) {
let nfsKubeflowClusterConfig: KubeflowClusterConfigNFS = <KubeflowClusterConfigNFS>this.kubeflowClusterConfig;
// Creat work dir for current trial in NFS directory
await cpp.exec(`mkdir -p ${this.trialLocalNFSTempFolder}/nni/${getExperimentId()}/${trialJobId}`);
......@@ -223,14 +221,10 @@ class KubeflowTrainingService implements TrainingService {
await cpp.exec(`cp -r ${trialLocalTempFolder}/* ${this.trialLocalNFSTempFolder}/nni/${getExperimentId()}/${trialJobId}/.`);
const nfsConfig: NFSConfig = nfsKubeflowClusterConfig.nfs;
trialJobDetailUrl = `nfs://${nfsConfig.server}:${path.join(nfsConfig.path, 'nni', getExperimentId(), trialJobId, 'output')}`
} else {
const error: string = `kubeflowClusterConfig format error!`;
this.log.error(error);
throw new Error(error);
trialJobOutputUrl = `nfs://${nfsConfig.server}:${path.join(nfsConfig.path, 'nni', getExperimentId(), trialJobId, 'output')}`
}
trialJobDetail = new KubeflowTrialJobDetail(
const trialJobDetail: KubeflowTrialJobDetail = new KubeflowTrialJobDetail(
trialJobId,
'WAITING',
Date.now(),
......@@ -238,13 +232,16 @@ class KubeflowTrainingService implements TrainingService {
form,
kubeflowJobName,
curTrialSequenceId,
trialJobDetailUrl,
this.kubeflowJobPlural
trialJobOutputUrl
);
// Create kubeflow training jobs
await cpp.exec(`kubectl create -f ${kubeflowJobYamlPath}`);
// Set trial job detail until kubectl create resource successfully
// Generate kubeflow job resource config object
const kubeflowJobConfig: any = this.generateKubeflowJobConfig(trialJobId, trialWorkingFolder, kubeflowJobName, workerPodResources, nonWorkerResources);
// Create kubeflow job based on generated kubeflow job resource config
await this.operatorClient.createKubeflowJob(kubeflowJobConfig);
// Set trial job detail until create Kubeflow job successfully
this.trialJobsMap.set(trialJobId, trialJobDetail);
return Promise.resolve(trialJobDetail);
......@@ -306,17 +303,23 @@ class KubeflowTrainingService implements TrainingService {
const errorMessage: string = `CancelTrialJob: trial job id ${trialJobId} not found`;
this.log.error(errorMessage);
return Promise.reject(errorMessage);
}
if(!this.kubeflowJobPlural) {
const errorMessage: string = `CancelTrialJob: trial job id ${trialJobId} failed because kubeflowJobPlural is undefined`;
}
if(!this.operatorClient) {
const errorMessage: string = `CancelTrialJob: trial job id ${trialJobId} failed because operatorClient is undefined`;
this.log.error(errorMessage);
return Promise.reject(errorMessage);
}
const result: cpp.childProcessPromise.Result = await cpp.exec(`kubectl delete ${this.kubeflowJobPlural} -l `
+ `app=${this.NNI_KUBEFLOW_TRIAL_LABEL},expId=${getExperimentId()},trialId=${trialJobId}`);
if(result.stderr) {
const errorMessage: string = `kubectl delete ${this.kubeflowJobPlural} for trial ${trialJobId} failed: ${result.stderr}`;
try {
await this.operatorClient.deleteKubeflowJob(new Map(
[
['app', this.NNI_KUBEFLOW_TRIAL_LABEL],
['expId', getExperimentId()],
['trialId', trialJobId]
]
));
} catch(err) {
const errorMessage: string = `Delete trial ${trialJobId} failed: ${err}`;
this.log.error(errorMessage);
return Promise.reject(errorMessage);
}
......@@ -335,17 +338,21 @@ class KubeflowTrainingService implements TrainingService {
case TrialConfigMetadataKey.KUBEFLOW_CLUSTER_CONFIG:
let kubeflowClusterJsonObject = JSON.parse(value);
let kubeflowClusterConfigBase: KubeflowClusterConfigBase = new KubeflowClusterConfigBase(kubeflowClusterJsonObject.operator, kubeflowClusterJsonObject.storage);
let kubeflowClusterConfigBase: KubeflowClusterConfigBase
= new KubeflowClusterConfigBase(kubeflowClusterJsonObject.operator, kubeflowClusterJsonObject.apiVersion, kubeflowClusterJsonObject.storage);
if(kubeflowClusterConfigBase && kubeflowClusterConfigBase.storage === 'azureStorage') {
this.kubeflowClusterConfig = new KubeflowClusterConfigAzure(kubeflowClusterJsonObject.operator, kubeflowClusterJsonObject.keyvault,
kubeflowClusterJsonObject.azureStorage, kubeflowClusterJsonObject.storage)
let azureKubeflowClusterConfig = <KubeflowClusterConfigAzure>this.kubeflowClusterConfig;
const azureKubeflowClusterConfig: KubeflowClusterConfigAzure =
new KubeflowClusterConfigAzure(kubeflowClusterJsonObject.operator,
kubeflowClusterJsonObject.apiVersion,
kubeflowClusterJsonObject.keyvault,
kubeflowClusterJsonObject.azureStorage, kubeflowClusterJsonObject.storage);
const vaultName = azureKubeflowClusterConfig.keyVault.vaultName;
const valutKeyName = azureKubeflowClusterConfig.keyVault.name;
this.azureStorageAccountName = azureKubeflowClusterConfig.azureStorage.accountName;
this.azureStorageShare = azureKubeflowClusterConfig.azureStorage.azureShare;
try{
try {
const result = await cpp.exec(`az keyvault secret show --name ${valutKeyName} --vault-name ${vaultName}`);
if(result.stderr) {
const errorMessage: string = result.stderr;
......@@ -358,19 +365,41 @@ class KubeflowTrainingService implements TrainingService {
await AzureStorageClientUtility.createShare(this.azureStorageClient, this.azureStorageShare);
//create sotrage secret
this.azureStorageSecretName = 'nni-secret-' + uniqueString(8).toLowerCase();
await cpp.exec(`kubectl create secret generic ${this.azureStorageSecretName} `
+ `--from-literal=azurestorageaccountname=${this.azureStorageAccountName} `
+ `--from-literal=azurestorageaccountkey=${storageAccountKey}`)
}catch(error) {
await this.genericK8sClient.createSecret(
{
apiVersion: 'v1',
kind: 'Secret',
metadata: {
name: this.azureStorageSecretName,
namespace: 'default',
labels: {
app: this.NNI_KUBEFLOW_TRIAL_LABEL,
expId: getExperimentId()
}
},
type: 'Opaque',
data: {
azurestorageaccountname: this.azureStorageAccountName,
azurestorageaccountkey: storageAccountKey
}
}
);
} catch(error) {
this.log.error(error);
throw new Error(error);
}
this.kubeflowClusterConfig = azureKubeflowClusterConfig;
} else if(kubeflowClusterConfigBase && (kubeflowClusterConfigBase.storage === 'nfs' || kubeflowClusterConfigBase.storage === undefined)) {
//Check and mount NFS mount point here
//If storage is undefined, the default value is nfs
this.kubeflowClusterConfig = new KubeflowClusterConfigNFS(kubeflowClusterJsonObject.operator, kubeflowClusterJsonObject.nfs,
kubeflowClusterJsonObject.storage)
let nfsKubeflowClusterConfig = <KubeflowClusterConfigNFS>this.kubeflowClusterConfig;
const nfsKubeflowClusterConfig: KubeflowClusterConfigNFS =
new KubeflowClusterConfigNFS(kubeflowClusterJsonObject.operator,
kubeflowClusterJsonObject.apiVersion,
kubeflowClusterJsonObject.nfs,
kubeflowClusterJsonObject.storage);
await cpp.exec(`mkdir -p ${this.trialLocalNFSTempFolder}`);
const nfsServer: string = nfsKubeflowClusterConfig.nfs.server;
const nfsPath: string = nfsKubeflowClusterConfig.nfs.path;
......@@ -382,14 +411,15 @@ class KubeflowTrainingService implements TrainingService {
this.log.error(mountError);
throw new Error(mountError);
}
this.kubeflowClusterConfig = nfsKubeflowClusterConfig;
} else {
const error: string = `kubeflowClusterConfig format error!`;
this.log.error(error);
throw new Error(error);
}
this.kubeflowJobPlural = kubeflowOperatorMap.get(this.kubeflowClusterConfig.operator);
this.kubeflowJobKind = kubeflowOperatorJobKindMap.get(this.kubeflowClusterConfig.operator)
this.operatorClient = KubeflowOperatorClient.generateOperatorClient(this.kubeflowClusterConfig.operator,
this.kubeflowClusterConfig.apiVersion);
break;
case TrialConfigMetadataKey.TRIAL_CONFIG:
......@@ -405,7 +435,7 @@ class KubeflowTrainingService implements TrainingService {
kubeflowTrialJsonObjsect.worker, kubeflowTrialJsonObjsect.ps);
}else if(this.kubeflowClusterConfig.operator === 'pytorch-operator'){
this.kubeflowTrialConfig = new KubeflowTrialConfigPytorch(kubeflowTrialJsonObjsect.codeDir,
kubeflowTrialJsonObjsect.worker, kubeflowTrialJsonObjsect.master);
kubeflowTrialJsonObjsect.master, kubeflowTrialJsonObjsect.worker);
}
if (!this.kubeflowTrialConfig){
......@@ -444,14 +474,19 @@ class KubeflowTrainingService implements TrainingService {
kubeflowTrialJob.status = 'SYS_CANCELED';
}
}
assert(this.kubeflowJobPlural !== undefined);
// Delete all kubeflow jobs whose expId label is current experiment id
try {
await cpp.exec(`kubectl delete ${this.kubeflowJobPlural} -l app=${this.NNI_KUBEFLOW_TRIAL_LABEL},expId=${getExperimentId()}`);
if(this.operatorClient) {
await this.operatorClient.deleteKubeflowJob(new Map(
[
['app', this.NNI_KUBEFLOW_TRIAL_LABEL],
['expId', getExperimentId()]
]
));
}
} catch(error) {
this.log.error(`Delete ${this.kubeflowJobPlural} with label: app=${this.NNI_KUBEFLOW_TRIAL_LABEL},expId=${getExperimentId()} failed, error is ${error}`);
this.log.error(`Delete kubeflow job with label: app=${this.NNI_KUBEFLOW_TRIAL_LABEL},expId=${getExperimentId()} failed, error is ${error}`);
}
// Unmount NFS
......@@ -495,9 +530,10 @@ class KubeflowTrainingService implements TrainingService {
throw new Error('Kubeflow trial config is not initialized');
}
if(!this.kubeflowJobPlural) {
throw new Error('Kubeflow job plural name is undefined');
if(!this.operatorClient) {
throw new Error('Kubeflow operator client is not initialized');
}
const replicaSpecsObj: any = {};
let replicaSpecsObjMap = new Map<string, object>();
......@@ -505,26 +541,28 @@ class KubeflowTrainingService implements TrainingService {
let tensorflowTrialConfig: KubeflowTrialConfigTensorflow = <KubeflowTrialConfigTensorflow>this.kubeflowTrialConfig;
replicaSpecsObj.Worker = this.generateReplicaConfig(trialWorkingFolder, tensorflowTrialConfig.worker.replicas,
tensorflowTrialConfig.worker.image, 'run_worker.sh', workerPodResources);
if (tensorflowTrialConfig.ps){
replicaSpecsObj.Ps = this.generateReplicaConfig(trialWorkingFolder, tensorflowTrialConfig.ps.replicas,
tensorflowTrialConfig.ps.image, 'run_ps.sh', nonWorkerPodResources);
}
replicaSpecsObjMap.set(this.kubeflowJobPlural, {'tfReplicaSpecs': replicaSpecsObj})
replicaSpecsObjMap.set(this.operatorClient.jobKind, {'tfReplicaSpecs': replicaSpecsObj})
}
else if(this.kubeflowClusterConfig.operator === 'pytorch-operator') {
let pytorchTrialConfig: KubeflowTrialConfigPytorch = <KubeflowTrialConfigPytorch>this.kubeflowTrialConfig;
replicaSpecsObj.Worker = this.generateReplicaConfig(trialWorkingFolder, pytorchTrialConfig.worker.replicas,
pytorchTrialConfig.worker.image, 'run_worker.sh', workerPodResources);
if(pytorchTrialConfig.master){
replicaSpecsObj.Master = this.generateReplicaConfig(trialWorkingFolder, pytorchTrialConfig.master.replicas,
pytorchTrialConfig.master.image, 'run_master.sh', nonWorkerPodResources);
if(pytorchTrialConfig.worker) {
replicaSpecsObj.Worker = this.generateReplicaConfig(trialWorkingFolder, pytorchTrialConfig.worker.replicas,
pytorchTrialConfig.worker.image, 'run_worker.sh', workerPodResources);
}
replicaSpecsObjMap.set(this.kubeflowJobPlural, {'pytorchReplicaSpecs': replicaSpecsObj})
replicaSpecsObj.Master = this.generateReplicaConfig(trialWorkingFolder, pytorchTrialConfig.master.replicas,
pytorchTrialConfig.master.image, 'run_master.sh', nonWorkerPodResources);
replicaSpecsObjMap.set(this.operatorClient.jobKind, {'pytorchReplicaSpecs': replicaSpecsObj})
}
return {
apiVersion: 'kubeflow.org/v1alpha2',
kind: this.kubeflowJobKind,
apiVersion: `kubeflow.org/${this.operatorClient.apiVersion}`,
kind: this.operatorClient.jobKind,
metadata: {
name: kubeflowJobName,
namespace: 'default',
......@@ -534,7 +572,7 @@ class KubeflowTrainingService implements TrainingService {
trialId: trialJobId
}
},
spec: replicaSpecsObjMap.get(this.kubeflowJobPlural)
spec: replicaSpecsObjMap.get(this.operatorClient.jobKind)
};
}
......@@ -555,8 +593,8 @@ class KubeflowTrainingService implements TrainingService {
throw new Error('Kubeflow trial config is not initialized');
}
if(!this.kubeflowJobPlural) {
throw new Error('Kubeflow job plural name is undefined');
if(!this.operatorClient) {
throw new Error('Kubeflow operator client is not initialized');
}
let volumeSpecMap = new Map<string, object>();
......@@ -582,13 +620,6 @@ class KubeflowTrainingService implements TrainingService {
}])
}
let containerNameMap = new Map<string, string>();
if(this.kubeflowJobPlural == 'tfjobs'){
containerNameMap.set(this.kubeflowJobPlural, 'tensorflow');
}else if(this.kubeflowJobPlural == 'pytorchjobs'){
containerNameMap.set(this.kubeflowJobPlural, 'pytorch');
}
return {
replicas: replicaNumber,
template: {
......@@ -600,7 +631,7 @@ class KubeflowTrainingService implements TrainingService {
{
// Kubeflow tensorflow operator requires that containers' name must be tensorflow
// TODO: change the name based on operator's type
name: containerNameMap.get(this.kubeflowJobPlural),
name: this.operatorClient.containerName,
image: replicaImage,
args: ["sh", `${path.join(trialWorkingFolder, runScriptFile)}`],
volumeMounts: [
......
/**
* Copyright (c) Microsoft Corporation
* All rights reserved.
*
* MIT License
*
* Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated
* documentation files (the "Software"), to deal in the Software without restriction, including without limitation
* the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and
* to permit persons to whom the Software is furnished to do so, subject to the following conditions:
* The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED *AS IS*, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING
* BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
* NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
* DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/
'use strict';
import * as fs from 'fs';
import * as os from 'os'
import * as path from 'path';
import { getLogger, Logger } from '../../common/log';
import { KubeflowOperator, OperatorApiVersion } from './kubeflowConfig';
var K8SClient = require('kubernetes-client').Client;
var K8SConfig = require('kubernetes-client').config;
/**
* Generict Kubernetes client, target version >= 1.9
*/
class GeneralK8sClient {
protected readonly client: any;
protected readonly log: Logger = getLogger();
constructor() {
this.client = new K8SClient({ config: K8SConfig.fromKubeconfig(path.join(os.homedir(), '.kube', 'config')), version: '1.9'});
this.client.loadSpec();
}
public async createSecret(secretManifest: any): Promise<boolean> {
let result: Promise<boolean>;
const response : any = await this.client.api.v1.namespaces('default').secrets.post({body: secretManifest});
if(response.statusCode && (response.statusCode >= 200 && response.statusCode <= 299)) {
result = Promise.resolve(true);
} else {
result = Promise.reject(`Create secrets failed, statusCode is ${response.statusCode}`);
}
return result;
}
}
abstract class KubeflowOperatorClient {
protected readonly client: any;
protected readonly log: Logger = getLogger();
protected crdSchema: any;
constructor() {
this.client = new K8SClient({ config: K8SConfig.fromKubeconfig(path.join(os.homedir(), '.kube', 'config'))});
this.client.loadSpec();
}
protected abstract get operator(): any;
public abstract get containerName(): string;
/**
* Factory method to generate operator cliet
*/
public static generateOperatorClient(kubeflowOperator: KubeflowOperator,
operatorApiVersion: OperatorApiVersion): KubeflowOperatorClient {
if(kubeflowOperator === 'tf-operator') {
if(operatorApiVersion == 'v1alpha2') {
return new TFOperatorClientV1Alpha2();
} else if(operatorApiVersion == 'v1beta1') {
return new TFOperatorClientV1Beta1();
}
} else if(kubeflowOperator === 'pytorch-operator') {
if(operatorApiVersion == 'v1alpha2') {
return new PytorchOperatorClientV1Alpha2();
} else if(operatorApiVersion == 'v1beta1') {
return new PytorchOperatorClientV1Beta1();
}
}
throw new Error(`Invalid operator ${kubeflowOperator} or apiVersion ${operatorApiVersion}`);
}
public get jobKind(): string {
if(this.crdSchema
&& this.crdSchema.spec
&& this.crdSchema.spec.names
&& this.crdSchema.spec.names.kind) {
return this.crdSchema.spec.names.kind;
} else {
throw new Error('KubeflowOperatorClient: getJobKind failed, kind is undefined in crd schema!');
}
}
public get apiVersion(): string {
if(this.crdSchema
&& this.crdSchema.spec
&& this.crdSchema.spec.version) {
return this.crdSchema.spec.version;
} else {
throw new Error('KubeflowOperatorClient: get apiVersion failed, version is undefined in crd schema!');
}
}
public async createKubeflowJob(jobManifest: any): Promise<boolean> {
let result: Promise<boolean>;
const response : any = await this.operator.post({body: jobManifest});
if(response.statusCode && (response.statusCode >= 200 && response.statusCode <= 299)) {
result = Promise.resolve(true);
} else {
result = Promise.reject(`KubeflowOperatorClient create tfjobs failed, statusCode is ${response.statusCode}`);
}
return result;
}
//TODO : replace any
public async getKubeflowJob(kubeflowJobName: string): Promise<any> {
let result: Promise<any>;
const response : any = await this.operator(kubeflowJobName).get();
if(response.statusCode && (response.statusCode >= 200 && response.statusCode <= 299)) {
result = Promise.resolve(response.body);
} else {
result = Promise.reject(`KubeflowOperatorClient get tfjobs failed, statusCode is ${response.statusCode}`);
}
return result;
}
public async deleteKubeflowJob(labels: Map<string, string>): Promise<boolean> {
let result: Promise<boolean>;
// construct match query from labels for deleting tfjob
const matchQuery: string = Array.from(labels.keys()).map(labelKey => `${labelKey}=${labels.get(labelKey)}`).join(',');
try {
const deleteResult : any = await this.operator().delete({ qs: { labelSelector: matchQuery } });
if(deleteResult.statusCode && deleteResult.statusCode >= 200 && deleteResult.statusCode <= 299) {
result = Promise.resolve(true);
} else {
result = Promise.reject(`KubeflowOperatorClient, delete labels ${matchQuery} get wrong statusCode ${deleteResult.statusCode}`);
}
} catch(err) {
result = Promise.reject(err);
}
return result;
}
}
class TFOperatorClientV1Alpha2 extends KubeflowOperatorClient {
/**
* constructor, to initialize tfjob CRD definition
*/
public constructor() {
super();
this.crdSchema = JSON.parse(fs.readFileSync('./config/kubeflow/tfjob-crd-v1alpha2.json', 'utf8'));
this.client.addCustomResourceDefinition(this.crdSchema);
}
protected get operator(): any {
return this.client.apis["kubeflow.org"].v1alpha2.namespaces('default').tfjobs;
}
public get containerName(): string {
return 'tensorflow';
}
}
class TFOperatorClientV1Beta1 extends KubeflowOperatorClient {
/**
* constructor, to initialize tfjob CRD definition
*/
public constructor() {
super();
this.crdSchema = JSON.parse(fs.readFileSync('./config/kubeflow/tfjob-crd-v1beta1.json', 'utf8'));
this.client.addCustomResourceDefinition(this.crdSchema);
}
protected get operator(): any {
return this.client.apis["kubeflow.org"].v1beta1.namespaces('default').tfjobs;
}
public get containerName(): string {
return 'tensorflow';
}
}
class PytorchOperatorClientV1Alpha2 extends KubeflowOperatorClient {
/**
* constructor, to initialize tfjob CRD definition
*/
public constructor() {
super();
this.crdSchema = JSON.parse(fs.readFileSync('./config/kubeflow/pytorchjob-crd-v1alpha2.json', 'utf8'));
this.client.addCustomResourceDefinition(this.crdSchema);
}
protected get operator(): any {
return this.client.apis["kubeflow.org"].v1alpha2.namespaces('default').pytorchjobs;
}
public get containerName(): string {
return 'pytorch';
}
}
class PytorchOperatorClientV1Beta1 extends KubeflowOperatorClient {
/**
* constructor, to initialize tfjob CRD definition
*/
public constructor() {
super();
this.crdSchema = JSON.parse(fs.readFileSync('./config/kubeflow/pytorchjob-crd-v1beta1.json', 'utf8'));
this.client.addCustomResourceDefinition(this.crdSchema);
}
protected get operator(): any {
return this.client.apis["kubeflow.org"].v1beta1.namespaces('default').pytorchjobs;
}
public get containerName(): string {
return 'pytorch';
}
}
export { KubeflowOperatorClient, GeneralK8sClient };
......@@ -140,7 +140,7 @@ kubeflow_trial_schema = {
'memoryMB': int,
'image': str
},
'worker':{
Optional('worker'):{
'replicas': int,
'command': str,
'gpuNum': And(int, lambda x: 0 <= x <= 99999),
......@@ -154,6 +154,7 @@ kubeflow_trial_schema = {
kubeflow_config_schema = {
'kubeflowConfig':Or({
'operator': Or('tf-operator', 'pytorch-operator'),
'apiVersion': str,
Optional('storage'): Or('nfs', 'azureStorage'),
'nfs': {
'server': str,
......
......@@ -94,10 +94,16 @@ def validate_kubeflow_operators(experiment_config):
if experiment_config.get('trial').get('master') is not None:
print_error('kubeflow with tf-operator can not set master')
exit(1)
if experiment_config.get('trial').get('worker') is None:
print_error('kubeflow with tf-operator must set worker')
exit(1)
elif experiment_config.get('kubeflowConfig').get('operator') == 'pytorch-operator':
if experiment_config.get('trial').get('ps') is not None:
print_error('kubeflow with pytorch-operator can not set ps')
exit(1)
if experiment_config.get('trial').get('master') is None:
print_error('kubeflow with pytorch-operator must set master')
exit(1)
if experiment_config.get('kubeflowConfig').get('storage') == 'nfs':
if experiment_config.get('kubeflowConfig').get('nfs') is None:
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment