/**
 * Copyright (c) Microsoft Corporation
 * All rights reserved.
 *
 * MIT License
 *
 * Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated
 * documentation files (the "Software"), to deal in the Software without restriction, including without limitation
 * the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and
 * to permit persons to whom the Software is furnished to do so, subject to the following conditions:
 * The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.
 *
 * THE SOFTWARE IS PROVIDED *AS IS*, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING
 * BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
 * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
 * DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
 */

'use strict';

import * as assert from 'assert';
import * as cpp from 'child-process-promise';
import * as fs from 'fs';
import * as path from 'path';
import * as component from '../../../common/component';

import { getExperimentId } from '../../../common/experimentStartupInfo';
import {
    JobApplicationForm, NNIManagerIpConfig, TrialJobApplicationForm, TrialJobDetail, TrialJobStatus
} from '../../../common/trainingService';
import { delay, generateParamFileName, getExperimentRootDir, uniqueString } from '../../../common/utils';
import { CONTAINER_INSTALL_NNI_SHELL_FORMAT } from '../../common/containerJobData';
import { TrialConfigMetadataKey } from '../../common/trialConfigMetadataKey';
import { validateCodeDir } from '../../common/util';
import { AzureStorageClientUtility } from '../azureStorageClientUtils';
import { NFSConfig } from '../kubernetesConfig';
import { KubernetesTrialJobDetail } from '../kubernetesData';
import { KubernetesTrainingService } from '../kubernetesTrainingService';
import { KubeflowOperatorClient } from './kubeflowApiClient';
import { KubeflowClusterConfig, KubeflowClusterConfigAzure, KubeflowClusterConfigFactory, KubeflowClusterConfigNFS,
    KubeflowTrialConfig, KubeflowTrialConfigFactory, KubeflowTrialConfigPytorch, KubeflowTrialConfigTensorflow
} from './kubeflowConfig';
import { KubeflowJobInfoCollector } from './kubeflowJobInfoCollector';
import { KubeflowJobRestServer } from './kubeflowJobRestServer';

// tslint:disable: no-unsafe-any no-any
/**
 * Training Service implementation for Kubeflow.
 * Refer to https://github.com/kubeflow/kubeflow for more info about Kubeflow.
 *
 * The service writes per-trial run scripts to a local temp folder, uploads them together with the
 * user's code directory to the configured storage (Azure file share or NFS), and then creates a
 * Kubeflow job (tf-operator or pytorch-operator) through the operator client.
 */
@component.Singleton
class KubeflowTrainingService extends KubernetesTrainingService implements KubernetesTrainingService {
    private kubeflowClusterConfig?: KubeflowClusterConfig;
    private kubeflowTrialConfig?: KubeflowTrialConfig;
    private readonly kubeflowJobInfoCollector: KubeflowJobInfoCollector;

    constructor() {
        super();
        this.kubeflowJobInfoCollector = new KubeflowJobInfoCollector(this.trialJobsMap);
        this.experimentId = getExperimentId();
        this.nextTrialSequenceId = -1;
        this.log.info('Construct Kubeflow training service.');
    }

    /**
     * Start the Kubeflow job REST server and keep polling trial status until `this.stopping` is set.
     *
     * @throws Error when the REST server cannot be resolved, or when the REST server reports an error message
     */
    public async run(): Promise<void> {
        this.log.info('Run Kubeflow training service.');
        this.kubernetesJobRestServer = component.get(KubeflowJobRestServer);
        if (this.kubernetesJobRestServer === undefined) {
            throw new Error('kubernetesJobRestServer not initialized!');
        }
        await this.kubernetesJobRestServer.start();
        this.kubernetesJobRestServer.setEnableVersionCheck = this.versionCheck;
        this.log.info(`Kubeflow Training service rest server listening on: ${this.kubernetesJobRestServer.endPoint}`);
        while (!this.stopping) {
            // collect metrics for Kubeflow jobs by interacting with Kubernetes API server
            await delay(3000);
            await this.kubeflowJobInfoCollector.retrieveTrialStatus(this.kubernetesCRDClient);
            if (this.kubernetesJobRestServer.getErrorMessage !== undefined) {
                // Flag the service as stopping BEFORE throwing; placed after the throw it would be unreachable.
                this.stopping = true;
                throw new Error(this.kubernetesJobRestServer.getErrorMessage);
            }
        }
        this.log.info('Kubeflow training service exit.');
    }

    /**
     * Prepare scripts for a new trial, upload them to storage and create the corresponding Kubeflow job.
     *
     * @param form job application form of the trial
     * @returns detail of the submitted trial job; it is recorded in `trialJobsMap` only after the
     *          Kubeflow job has been created
     * @throws Error when the operator client has not been initialized
     */
    public async submitTrialJob(form: JobApplicationForm): Promise<TrialJobDetail> {
        if (this.kubernetesCRDClient === undefined) {
            throw new Error('Kubeflow job operator client is undefined');
        }

        if (this.kubernetesRestServerPort === undefined) {
            const restServer: KubeflowJobRestServer = component.get(KubeflowJobRestServer);
            this.kubernetesRestServerPort = restServer.clusterRestServerPort;
        }
        const trialJobId: string = uniqueString(5);
        const trialWorkingFolder: string = path.join(this.CONTAINER_MOUNT_PATH, 'nni', getExperimentId(), trialJobId);
        const kubeflowJobName: string = `nni-exp-${this.experimentId}-trial-${trialJobId}`.toLowerCase();
        const curTrialSequenceId: number = this.generateSequenceId();
        const trialLocalTempFolder: string = path.join(getExperimentRootDir(), 'trials-local', trialJobId);
        // Prepare the run scripts
        await this.prepareRunScript(trialLocalTempFolder, trialJobId, trialWorkingFolder, curTrialSequenceId, form);
        // Upload files to storage
        const trialJobOutputUrl: string = await this.uploadCodeFiles(trialJobId, trialLocalTempFolder);
        // An empty output URL marks the trial as FAILED.
        // NOTE(review): the Kubeflow job is still created below in that case - confirm this is intended.
        const initStatus: TrialJobStatus = trialJobOutputUrl ? 'WAITING' : 'FAILED';
        const trialJobDetail: KubernetesTrialJobDetail = new KubernetesTrialJobDetail(
            trialJobId,
            initStatus,
            Date.now(),
            trialWorkingFolder,
            form,
            kubeflowJobName,
            curTrialSequenceId,
            trialJobOutputUrl
        );

        // Generate kubeflow job resource config object
        const kubeflowJobConfig: any = await this.prepareKubeflowConfig(trialJobId, trialWorkingFolder, kubeflowJobName);
        // Create kubeflow job based on generated kubeflow job resource config
        await this.kubernetesCRDClient.createKubernetesJob(kubeflowJobConfig);

        // Set trial job detail only after the Kubeflow job has been created successfully
        this.trialJobsMap.set(trialJobId, trialJobDetail);

        return trialJobDetail;
    }

    /**
     * Apply one piece of cluster/trial metadata to this training service.
     * Keys that are not handled below are ignored.
     *
     * @param key one of the TrialConfigMetadataKey values
     * @param value metadata payload; JSON text for the config keys, plain text otherwise
     * @throws Error when the trial config is set before the cluster config, or when codeDir validation fails
     */
    // tslint:disable:no-redundant-jsdoc
    public async setClusterMetadata(key: string, value: string): Promise<void> {
        switch (key) {
            case TrialConfigMetadataKey.NNI_MANAGER_IP:
                this.nniManagerIpConfig = <NNIManagerIpConfig>JSON.parse(value);
                break;

            case TrialConfigMetadataKey.KUBEFLOW_CLUSTER_CONFIG: {
                const kubeflowClusterJsonObject: object = JSON.parse(value);
                this.kubeflowClusterConfig = KubeflowClusterConfigFactory.generateKubeflowClusterConfig(kubeflowClusterJsonObject);
                if (this.kubeflowClusterConfig.storageType === 'azureStorage') {
                    const azureKubeflowClusterConfig: KubeflowClusterConfigAzure = <KubeflowClusterConfigAzure>this.kubeflowClusterConfig;
                    this.azureStorageAccountName = azureKubeflowClusterConfig.azureStorage.accountName;
                    this.azureStorageShare = azureKubeflowClusterConfig.azureStorage.azureShare;
                    await this.createAzureStorage(
                        azureKubeflowClusterConfig.keyVault.vaultName,
                        azureKubeflowClusterConfig.keyVault.name,
                        azureKubeflowClusterConfig.azureStorage.accountName,
                        azureKubeflowClusterConfig.azureStorage.azureShare
                    );
                } else if (this.kubeflowClusterConfig.storageType === 'nfs') {
                    const nfsKubeflowClusterConfig: KubeflowClusterConfigNFS = <KubeflowClusterConfigNFS>this.kubeflowClusterConfig;
                    await this.createNFSStorage(
                        nfsKubeflowClusterConfig.nfs.server,
                        nfsKubeflowClusterConfig.nfs.path
                    );
                }
                this.kubernetesCRDClient = KubeflowOperatorClient.generateOperatorClient(this.kubeflowClusterConfig.operator,
                                                                                         this.kubeflowClusterConfig.apiVersion);
                break;
            }

            case TrialConfigMetadataKey.TRIAL_CONFIG: {
                // The trial config factory needs the operator type from the cluster config.
                if (this.kubeflowClusterConfig === undefined) {
                    this.log.error('kubeflow cluster config is not initialized');

                    throw new Error('kubeflow cluster config is not initialized');
                }

                const kubeflowTrialJsonObject: object = JSON.parse(value);
                this.kubeflowTrialConfig = KubeflowTrialConfigFactory.generateKubeflowTrialConfig(
                    kubeflowTrialJsonObject,
                    this.kubeflowClusterConfig.operator
                );

                // Validate to make sure codeDir doesn't have too many files
                try {
                    await validateCodeDir(this.kubeflowTrialConfig.codeDir);
                } catch (error) {
                    this.log.error(error);

                    // Keep the original Error (message and stack) instead of wrapping it a second time.
                    throw (error instanceof Error ? error : new Error(error));
                }
                break;
            }

            case TrialConfigMetadataKey.VERSION_CHECK:
                this.versionCheck = (value === 'true' || value === 'True');
                break;

            case TrialConfigMetadataKey.LOG_COLLECTION:
                this.logCollection = value;
                break;

            default:
        }
    }

    /**
     * Upload code files to NFS or Azure storage.
     *
     * @param trialJobId trial job id
     * @param trialLocalTempFolder local folder holding the generated scripts of the trial
     * @returns trialJobOutputUrl; stays an empty string if neither storage branch assigns it
     * @throws Error when the cluster config, the trial config or the Azure storage client is not initialized
     */
    private async uploadCodeFiles(trialJobId: string, trialLocalTempFolder: string): Promise<string> {
        if (this.kubeflowClusterConfig === undefined) {
            throw new Error('Kubeflow Cluster config is not initialized');
        }

        if (this.kubeflowTrialConfig === undefined) {
            throw new Error('Kubeflow Trial config is not initialized');
        }

        let trialJobOutputUrl: string = '';

        assert(this.kubeflowClusterConfig.storage === undefined
            || this.kubeflowClusterConfig.storage === 'azureStorage'
            || this.kubeflowClusterConfig.storage === 'nfs');

        if (this.kubeflowClusterConfig.storage === 'azureStorage') {
            if (this.azureStorageClient === undefined) {
                throw new Error('azureStorageClient is not initialized');
            }
            const azureKubeflowClusterConfig: KubeflowClusterConfigAzure = <KubeflowClusterConfigAzure>this.kubeflowClusterConfig;
            trialJobOutputUrl = await this.uploadFilesToAzureStorage(trialJobId, trialLocalTempFolder, this.kubeflowTrialConfig.codeDir,
                                                                     azureKubeflowClusterConfig.uploadRetryCount);
        } else if (this.kubeflowClusterConfig.storage === 'nfs' || this.kubeflowClusterConfig.storage === undefined) {
            // An unset storage kind is handled as NFS.
            const nfsKubeflowClusterConfig: KubeflowClusterConfigNFS = <KubeflowClusterConfigNFS>this.kubeflowClusterConfig;
            const trialNfsFolder: string = `${this.trialLocalNFSTempFolder}/nni/${getExperimentId()}/${trialJobId}`;
            // NOTE(review): the paths below are interpolated into shell commands unquoted, so a path
            // containing spaces or shell metacharacters would break these commands.
            // Create work dir for current trial in NFS directory
            await cpp.exec(`mkdir -p ${trialNfsFolder}`);
            // Copy script files from local dir to NFS mounted dir
            await cpp.exec(`cp -r ${trialLocalTempFolder}/* ${trialNfsFolder}/.`);
            // Copy codeDir to NFS mounted dir
            await cpp.exec(`cp -r ${this.kubeflowTrialConfig.codeDir}/* ${trialNfsFolder}/.`);
            const nfsConfig: NFSConfig = nfsKubeflowClusterConfig.nfs;
            trialJobOutputUrl = `nfs://${nfsConfig.server}:${path.join(nfsConfig.path, 'nni', getExperimentId(), trialJobId, 'output')}`;
        }

        return trialJobOutputUrl;
    }

    /**
     * Write the files a trial needs into its local temp folder: install_nni.sh, one run script per
     * configured role (run_worker.sh, plus run_ps.sh for tf-operator or run_master.sh for
     * pytorch-operator) and, when the form carries hyper parameters, the parameter file.
     *
     * @param trialLocalTempFolder local temp folder of the trial; created if missing
     * @param trialJobId trial job id
     * @param trialWorkingFolder working folder passed to the run script generator
     * @param curTrialSequenceId sequence id of the trial
     * @param form job application form of the trial
     * @throws Error when the cluster or trial config is not initialized, or the operator is invalid
     */
    private async prepareRunScript(trialLocalTempFolder: string, trialJobId: string, trialWorkingFolder: string, curTrialSequenceId: number,
                                   form: JobApplicationForm): Promise<void> {
        if (this.kubeflowClusterConfig === undefined) {
            throw new Error('Kubeflow Cluster config is not initialized');
        }

        // Same guard as the sibling methods; without it a missing trial config surfaces as a TypeError below.
        if (this.kubeflowTrialConfig === undefined) {
            throw new Error('Kubeflow trial config is not initialized');
        }

        // initialize kubeflow trial config to specific type
        let kubeflowTrialConfig: any;
        if (this.kubeflowClusterConfig.operator === 'tf-operator') {
            kubeflowTrialConfig = <KubeflowTrialConfigTensorflow>this.kubeflowTrialConfig;
        } else if (this.kubeflowClusterConfig.operator === 'pytorch-operator') {
            kubeflowTrialConfig = <KubeflowTrialConfigPytorch>this.kubeflowTrialConfig;
        } else {
            throw new Error(`operator ${this.kubeflowClusterConfig.operator} is invalid`);
        }

        // Create tmp trial working folder locally.
        // NOTE(review): the folder path is interpolated into the shell command unquoted.
        await cpp.exec(`mkdir -p ${trialLocalTempFolder}`);

        // Write NNI installation file to local tmp files
        const runScriptContent: string = CONTAINER_INSTALL_NNI_SHELL_FORMAT;
        await fs.promises.writeFile(path.join(trialLocalTempFolder, 'install_nni.sh'), runScriptContent, { encoding: 'utf8' });

        // Write worker file content run_worker.sh to local tmp folders
        if (kubeflowTrialConfig.worker !== undefined) {
            const workerRunScriptContent: string = await this.generateRunScript('kubeflow', trialJobId, trialWorkingFolder,
                                                                                kubeflowTrialConfig.worker.command,
                                                                                curTrialSequenceId.toString(), 'worker',
                                                                                kubeflowTrialConfig.worker.gpuNum);
            await fs.promises.writeFile(path.join(trialLocalTempFolder, 'run_worker.sh'), workerRunScriptContent, { encoding: 'utf8' });
        }

        if (this.kubeflowClusterConfig.operator === 'tf-operator') {
            // Write parameter server file content run_ps.sh to local tmp folders
            const tensorflowTrialConfig: KubeflowTrialConfigTensorflow = <KubeflowTrialConfigTensorflow>this.kubeflowTrialConfig;
            if (tensorflowTrialConfig.ps !== undefined) {
                const psRunScriptContent: string = await this.generateRunScript('kubeflow', trialJobId, trialWorkingFolder,
                                                                                tensorflowTrialConfig.ps.command,
                                                                                curTrialSequenceId.toString(),
                                                                                'ps', tensorflowTrialConfig.ps.gpuNum);
                await fs.promises.writeFile(path.join(trialLocalTempFolder, 'run_ps.sh'), psRunScriptContent, { encoding: 'utf8' });
            }
        } else if (this.kubeflowClusterConfig.operator === 'pytorch-operator') {
            // Write master file content run_master.sh to local tmp folders
            const pytorchTrialConfig: KubeflowTrialConfigPytorch = <KubeflowTrialConfigPytorch>this.kubeflowTrialConfig;
            if (pytorchTrialConfig.master !== undefined) {
                const masterRunScriptContent: string = await this.generateRunScript('kubeflow', trialJobId, trialWorkingFolder,
                                                                                    pytorchTrialConfig.master.command,
                                                                                    curTrialSequenceId.toString(), 'master',
                                                                                    pytorchTrialConfig.master.gpuNum);
                await fs.promises.writeFile(path.join(trialLocalTempFolder, 'run_master.sh'), masterRunScriptContent, { encoding: 'utf8' });
            }
        }

        // Write file content ( parameter.cfg ) to local tmp folders
        const trialForm: TrialJobApplicationForm = (<TrialJobApplicationForm>form);
        if (trialForm !== undefined && trialForm.hyperParameters !== undefined) {
            await fs.promises.writeFile(path.join(trialLocalTempFolder, generateParamFileName(trialForm.hyperParameters)),
                                        trialForm.hyperParameters.value, { encoding: 'utf8' });
        }
    }

    /**
     * Compute the pod resource sections for the worker and non-worker (ps / master) replicas and
     * build the Kubeflow job config from them.
     *
     * @param trialJobId trial job id
     * @param trialWorkingFolder working folder of the trial
     * @param kubeflowJobName name of the Kubeflow job to create
     * @returns the Kubeflow job resource config object
     * @throws Error when the cluster or trial config is not initialized, or the operator is invalid
     */
    private async prepareKubeflowConfig(trialJobId: string, trialWorkingFolder: string, kubeflowJobName: string): Promise<any> {
        if (this.kubeflowClusterConfig === undefined) {
            throw new Error('Kubeflow Cluster config is not initialized');
        }

        if (this.kubeflowTrialConfig === undefined) {
            throw new Error('Kubeflow trial config is not initialized');
        }

        // initialize kubeflow trial config to specific type
        let kubeflowTrialConfig: any;
        if (this.kubeflowClusterConfig.operator === 'tf-operator') {
            kubeflowTrialConfig = <KubeflowTrialConfigTensorflow>this.kubeflowTrialConfig;
        } else if (this.kubeflowClusterConfig.operator === 'pytorch-operator') {
            kubeflowTrialConfig = <KubeflowTrialConfigPytorch>this.kubeflowTrialConfig;
        } else {
            throw new Error(`operator ${this.kubeflowClusterConfig.operator} is invalid`);
        }

        // Limits are a copy of the requests, so each pod requests exactly what it is limited to.
        const workerPodResources: any = {};
        if (kubeflowTrialConfig.worker !== undefined) {
            workerPodResources.requests = this.generatePodResource(kubeflowTrialConfig.worker.memoryMB, kubeflowTrialConfig.worker.cpuNum,
                                                                   kubeflowTrialConfig.worker.gpuNum);
        }
        workerPodResources.limits = {...workerPodResources.requests};

        const nonWorkerResources: any = {};
        if (this.kubeflowClusterConfig.operator === 'tf-operator') {
            const tensorflowTrialConfig: KubeflowTrialConfigTensorflow = <KubeflowTrialConfigTensorflow>this.kubeflowTrialConfig;
            if (tensorflowTrialConfig.ps !== undefined) {
                nonWorkerResources.requests = this.generatePodResource(tensorflowTrialConfig.ps.memoryMB, tensorflowTrialConfig.ps.cpuNum,
                                                                       tensorflowTrialConfig.ps.gpuNum);
                nonWorkerResources.limits = {...nonWorkerResources.requests};
            }
        } else if (this.kubeflowClusterConfig.operator === 'pytorch-operator') {
            const pyTorchTrialConfig: KubeflowTrialConfigPytorch = <KubeflowTrialConfigPytorch>this.kubeflowTrialConfig;
            nonWorkerResources.requests = this.generatePodResource(pyTorchTrialConfig.master.memoryMB, pyTorchTrialConfig.master.cpuNum,
                                                                   pyTorchTrialConfig.master.gpuNum);
            nonWorkerResources.limits = {...nonWorkerResources.requests};
        }

        // Generate kubeflow job resource config object
        return this.generateKubeflowJobConfig(trialJobId, trialWorkingFolder, kubeflowJobName, workerPodResources, nonWorkerResources);
    }

    /**
     * Generate kubeflow resource config file
     * @param trialJobId trial job id
     * @param trialWorkingFolder working folder
     * @param kubeflowJobName job name
     * @param workerPodResources worker pod template
     * @param nonWorkerPodResources non-worker pod template, like ps or master
     * @returns the job resource object; its `spec` is undefined when the trial config's operator
     *          type is neither 'tf-operator' nor 'pytorch-operator'
     * @throws Error when the cluster config, the trial config or the operator client is not initialized
     */
    private async generateKubeflowJobConfig(trialJobId: string, trialWorkingFolder: string, kubeflowJobName : string, workerPodResources : any,
                                            nonWorkerPodResources?: any) : Promise<any> {
        if (this.kubeflowClusterConfig === undefined) {
            throw new Error('Kubeflow Cluster config is not initialized');
        }

        if (this.kubeflowTrialConfig === undefined) {
            throw new Error('Kubeflow trial config is not initialized');
        }

        if (this.kubernetesCRDClient === undefined) {
            throw new Error('Kubeflow operator client is not initialized');
        }

        const replicaSpecsObj: any = {};
        let jobSpec: object | undefined;
        if (this.kubeflowTrialConfig.operatorType === 'tf-operator') {
            const tensorflowTrialConfig: KubeflowTrialConfigTensorflow = <KubeflowTrialConfigTensorflow>this.kubeflowTrialConfig;
            const workerRegistrySecretName: string | undefined =
                await this.createRegistrySecret(tensorflowTrialConfig.worker.privateRegistryAuthPath);
            replicaSpecsObj.Worker = this.generateReplicaConfig(trialWorkingFolder, tensorflowTrialConfig.worker.replicas,
                                                                tensorflowTrialConfig.worker.image, 'run_worker.sh', workerPodResources,
                                                                workerRegistrySecretName);
            if (tensorflowTrialConfig.ps !== undefined) {
                const psRegistrySecretName: string | undefined =
                    await this.createRegistrySecret(tensorflowTrialConfig.ps.privateRegistryAuthPath);
                replicaSpecsObj.Ps = this.generateReplicaConfig(trialWorkingFolder, tensorflowTrialConfig.ps.replicas,
                                                                tensorflowTrialConfig.ps.image, 'run_ps.sh', nonWorkerPodResources,
                                                                psRegistrySecretName);
            }
            jobSpec = {tfReplicaSpecs: replicaSpecsObj};
        } else if (this.kubeflowTrialConfig.operatorType === 'pytorch-operator') {
            const pytorchTrialConfig: KubeflowTrialConfigPytorch = <KubeflowTrialConfigPytorch>this.kubeflowTrialConfig;
            if (pytorchTrialConfig.worker !== undefined) {
                const workerRegistrySecretName: string | undefined =
                    await this.createRegistrySecret(pytorchTrialConfig.worker.privateRegistryAuthPath);
                replicaSpecsObj.Worker = this.generateReplicaConfig(trialWorkingFolder, pytorchTrialConfig.worker.replicas,
                                                                    pytorchTrialConfig.worker.image, 'run_worker.sh', workerPodResources,
                                                                    workerRegistrySecretName);
            }
            const masterRegistrySecretName: string | undefined =
                await this.createRegistrySecret(pytorchTrialConfig.master.privateRegistryAuthPath);
            replicaSpecsObj.Master = this.generateReplicaConfig(trialWorkingFolder, pytorchTrialConfig.master.replicas,
                                                                pytorchTrialConfig.master.image, 'run_master.sh', nonWorkerPodResources,
                                                                masterRegistrySecretName);
            jobSpec = {pytorchReplicaSpecs: replicaSpecsObj};
        }

        return {
            apiVersion: `kubeflow.org/${this.kubernetesCRDClient.apiVersion}`,
            kind: this.kubernetesCRDClient.jobKind,
            metadata: {
                name: kubeflowJobName,
                namespace: 'default',
                labels: {
                    app: this.NNI_KUBERNETES_TRIAL_LABEL,
                    expId: getExperimentId(),
                    trialId: trialJobId
                }
            },
            spec: jobSpec
        };
    }

    /**
     * Generate one replica config section (replica count plus pod template) of a Kubeflow job
     * @param trialWorkingFolder trial working folder
     * @param replicaNumber replica number
     * @param replicaImage image
     * @param runScriptFile script file name
     * @param podResources pod resource config section
     * @param privateRegistrySecretName name of the image pull secret; when falsy no imagePullSecrets are added
     * @throws Error when the cluster config, the trial config or the operator client is not initialized
     */
    private generateReplicaConfig(trialWorkingFolder: string, replicaNumber: number, replicaImage: string, runScriptFile: string,
                                  podResources: any, privateRegistrySecretName: string | undefined): any {
        if (this.kubeflowClusterConfig === undefined) {
            throw new Error('Kubeflow Cluster config is not initialized');
        }

        if (this.kubeflowTrialConfig === undefined) {
            throw new Error('Kubeflow trial config is not initialized');
        }

        if (this.kubernetesCRDClient === undefined) {
            throw new Error('Kubeflow operator client is not initialized');
        }

        // The config spec for volume field
        let nniVolumes: object[];
        if (this.kubeflowClusterConfig.storageType === 'azureStorage') {
            nniVolumes = [
                {
                    name: 'nni-vol',
                    azureFile: {
                        secretName: `${this.azureStorageSecretName}`,
                        shareName: `${this.azureStorageShare}`,
                        // Kubernetes spells this field `readOnly`; the lower-case `readonly` is not a known field.
                        readOnly: false
                    }
                }
            ];
        } else {
            // Any storage type other than 'azureStorage' is mounted as NFS.
            const nfsKubeflowClusterConfig: KubeflowClusterConfigNFS = <KubeflowClusterConfigNFS>this.kubeflowClusterConfig;
            nniVolumes = [
                {
                    name: 'nni-vol',
                    nfs: {
                        server: `${nfsKubeflowClusterConfig.nfs.server}`,
                        path: `${nfsKubeflowClusterConfig.nfs.path}`
                    }
                }
            ];
        }

        // The config spec for container field
        const containers: object[] = [
            {
                // The container name comes from the operator client
                // (the Kubeflow tensorflow operator requires the name to be tensorflow)
                name: this.kubernetesCRDClient.containerName,
                image: replicaImage,
                args: ['sh', path.join(trialWorkingFolder, runScriptFile)],
                volumeMounts: [
                    {
                        name: 'nni-vol',
                        mountPath: this.CONTAINER_MOUNT_PATH
                    }
                ],
                resources: podResources
            }
        ];

        const podSpec: any = {
            containers: containers,
            restartPolicy: 'ExitCode',
            volumes: nniVolumes
        };
        if (privateRegistrySecretName) {
            podSpec.imagePullSecrets = [
                {
                    name: privateRegistrySecretName
                }
            ];
        }

        return {
            replicas: replicaNumber,
            template: {
                metadata: {
                    // tslint:disable-next-line:no-null-keyword
                    creationTimestamp: null
                },
                spec: podSpec
            }
        };
    }
}
// tslint:enable: no-unsafe-any no-any
export { KubeflowTrainingService };