kubeflowTrainingService.ts 26.4 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
/**
 * Copyright (c) Microsoft Corporation
 * All rights reserved.
 *
 * MIT License
 *
 * Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated
 * documentation files (the "Software"), to deal in the Software without restriction, including without limitation
 * the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and
 * to permit persons to whom the Software is furnished to do so, subject to the following conditions:
 * The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.
 *
 * THE SOFTWARE IS PROVIDED *AS IS*, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING
 * BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
 * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
 * DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
 */

20
'use strict';
21
22
23
24
25

import * as assert from 'assert';
import * as cpp from 'child-process-promise';
import * as fs from 'fs';
import * as path from 'path';
26
import * as component from '../../../common/component';
27
28
29

import { getExperimentId } from '../../../common/experimentStartupInfo';
import {
30
    JobApplicationForm, NNIManagerIpConfig, TrialJobApplicationForm, TrialJobDetail
31
32
} from '../../../common/trainingService';
import { delay, generateParamFileName, getExperimentRootDir, uniqueString } from '../../../common/utils';
33
34
import { CONTAINER_INSTALL_NNI_SHELL_FORMAT } from '../../common/containerJobData';
import { TrialConfigMetadataKey } from '../../common/trialConfigMetadataKey';
35
36
import { validateCodeDir } from '../../common/util';
import { AzureStorageClientUtility } from '../azureStorageClientUtils';
37
38
39
import { NFSConfig } from '../kubernetesConfig';
import { KubernetesTrialJobDetail } from '../kubernetesData';
import { KubernetesTrainingService } from '../kubernetesTrainingService';
40
import { KubeflowOperatorClient } from './kubeflowApiClient';
41
42
43
import { KubeflowClusterConfig, KubeflowClusterConfigAzure, KubeflowClusterConfigFactory, KubeflowClusterConfigNFS,
    KubeflowTrialConfig, KubeflowTrialConfigFactory, KubeflowTrialConfigPytorch, KubeflowTrialConfigTensorflow
} from './kubeflowConfig';
44
import { KubeflowJobInfoCollector } from './kubeflowJobInfoCollector';
45
import { KubeflowJobRestServer } from './kubeflowJobRestServer';
46

47
// tslint:disable: no-unsafe-any no-any
48
49
50
51
52
53
54
55
/**
 * Training Service implementation for Kubeflow
 * Refer to https://github.com/kubeflow/kubeflow for more information about Kubeflow
 */
@component.Singleton
class KubeflowTrainingService extends KubernetesTrainingService implements KubernetesTrainingService {
    private kubeflowClusterConfig?: KubeflowClusterConfig;
    private kubeflowTrialConfig?: KubeflowTrialConfig;
56
    private readonly kubeflowJobInfoCollector: KubeflowJobInfoCollector;
57
58

    /**
     * Create the Kubeflow training service (registered as a singleton component).
     *
     * Binds the job info collector to the trial job map, stores the id of the current
     * experiment and sets the trial sequence counter to -1.
     */
    constructor() {
        super();

        // The collector reads trial state from the same map this service maintains
        this.kubeflowJobInfoCollector = new KubeflowJobInfoCollector(this.trialJobsMap);

        this.experimentId = getExperimentId();
        this.nextTrialSequenceId = -1;

        this.log.info('Construct Kubeflow training service.');
    }

    /**
     * Main loop of the Kubeflow training service.
     *
     * Starts the Kubeflow job REST server, then, until `this.stopping` becomes true, waits three
     * seconds and asks the job info collector to refresh the status of the trial jobs.
     *
     * @throws Error when the REST server component cannot be obtained, or when the REST server
     *         reports an error message while the loop is running
     */
    public async run(): Promise<void> {
        this.log.info('Run Kubeflow training service.');
        this.kubernetesJobRestServer = component.get(KubeflowJobRestServer);
        if (this.kubernetesJobRestServer === undefined) {
            throw new Error('kubernetesJobRestServer not initialized!');
        }
        await this.kubernetesJobRestServer.start();
        this.kubernetesJobRestServer.setEnableVersionCheck = this.versionCheck;
        this.log.info(`Kubeflow Training service rest server listening on: ${this.kubernetesJobRestServer.endPoint}`);
        while (!this.stopping) {
            // collect metrics for Kubeflow jobs by interacting with Kubernetes API server
            await delay(3000);
            await this.kubeflowJobInfoCollector.retrieveTrialStatus(this.kubernetesCRDClient);
            if (this.kubernetesJobRestServer.getErrorMessage !== undefined) {
                // Throwing leaves the loop; no statement may follow the throw in this branch
                throw new Error(this.kubernetesJobRestServer.getErrorMessage);
            }
        }
        this.log.info('Kubeflow training service exit.');
    }

    /**
     * Submit one trial as a Kubeflow job.
     *
     * The trial's run scripts are generated in a local temp folder, uploaded together with the
     * code directory to the configured storage, and a Kubeflow job resource is then created
     * through the operator client. The trial is recorded in `trialJobsMap` only after the job
     * has been created.
     *
     * @param form job application form of the trial
     * @returns detail record of the new trial job, created with status 'WAITING'
     * @throws Error when the Kubeflow operator client is undefined
     */
    public async submitTrialJob(form: JobApplicationForm): Promise<TrialJobDetail> {
        if (this.kubernetesCRDClient === undefined) {
            throw new Error('Kubeflow job operator client is undefined');
        }

        // Look up the REST server port the first time a trial is submitted
        if (this.kubernetesRestServerPort === undefined) {
            const jobRestServer: KubeflowJobRestServer = component.get(KubeflowJobRestServer);
            this.kubernetesRestServerPort = jobRestServer.clusterRestServerPort;
        }

        const trialJobId: string = uniqueString(5);
        const workingFolderInContainer: string = path.join(this.CONTAINER_MOUNT_PATH, 'nni', getExperimentId(), trialJobId);
        const jobName: string = `nni-exp-${this.experimentId}-trial-${trialJobId}`.toLowerCase();
        const sequenceId: number = this.generateSequenceId();
        const localTempFolder: string = path.join(getExperimentRootDir(), 'trials-local', trialJobId);

        // Prepare the run scripts locally
        await this.prepareRunScript(localTempFolder, trialJobId, workingFolderInContainer, sequenceId, form);

        // Upload the generated files and the code directory to storage
        const outputUrl: string = await this.uploadCodeFiles(trialJobId, localTempFolder);

        const submittedJob: KubernetesTrialJobDetail = new KubernetesTrialJobDetail(
            trialJobId,
            'WAITING',
            Date.now(),
            workingFolderInContainer,
            form,
            jobName,
            sequenceId,
            outputUrl
        );

        // Generate the Kubeflow job resource config and create the job from it
        const jobConfig: any = await this.prepareKubeflowConfig(trialJobId, workingFolderInContainer, jobName);
        await this.kubernetesCRDClient.createKubernetesJob(jobConfig);

        // Register the trial only once the Kubeflow job has been created successfully
        this.trialJobsMap.set(trialJobId, submittedJob);

        return submittedJob;
    }
126

127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
    // tslint:disable:no-redundant-jsdoc
    /**
     * Apply one metadata entry to this training service.
     *
     * Handled keys:
     * - NNI_MANAGER_IP: parses `value` as JSON into the NNI manager IP config.
     * - KUBEFLOW_CLUSTER_CONFIG: parses the cluster config, creates the Azure or NFS storage
     *   according to its storage type and generates the operator client.
     * - TRIAL_CONFIG: parses the trial config (requires the cluster config to be set first) and
     *   validates its code directory.
     * - VERSION_CHECK: enabled when `value` is 'true' or 'True'.
     * - LOG_COLLECTION: stores `value` as is.
     * Any other key is ignored.
     *
     * @param key metadata key
     * @param value metadata value (JSON text for the config keys)
     * @throws Error when TRIAL_CONFIG arrives before the cluster config, or when the code
     *         directory validation fails (the validation error itself is rethrown)
     */
    public async setClusterMetadata(key: string, value: string): Promise<void> {
        switch (key) {
            case TrialConfigMetadataKey.NNI_MANAGER_IP:
                this.nniManagerIpConfig = <NNIManagerIpConfig>JSON.parse(value);
                break;

            // Braces give each case its own scope for the const declarations below
            case TrialConfigMetadataKey.KUBEFLOW_CLUSTER_CONFIG: {
                const kubeflowClusterJsonObject: object = JSON.parse(value);
                this.kubeflowClusterConfig = KubeflowClusterConfigFactory.generateKubeflowClusterConfig(kubeflowClusterJsonObject);
                if (this.kubeflowClusterConfig.storageType === 'azureStorage') {
                    const azureKubeflowClusterConfig: KubeflowClusterConfigAzure = <KubeflowClusterConfigAzure>this.kubeflowClusterConfig;
                    this.azureStorageAccountName = azureKubeflowClusterConfig.azureStorage.accountName;
                    this.azureStorageShare = azureKubeflowClusterConfig.azureStorage.azureShare;
                    await this.createAzureStorage(
                        azureKubeflowClusterConfig.keyVault.vaultName,
                        azureKubeflowClusterConfig.keyVault.name,
                        azureKubeflowClusterConfig.azureStorage.accountName,
                        azureKubeflowClusterConfig.azureStorage.azureShare
                    );
                } else if (this.kubeflowClusterConfig.storageType === 'nfs') {
                    const nfsKubeflowClusterConfig: KubeflowClusterConfigNFS = <KubeflowClusterConfigNFS>this.kubeflowClusterConfig;
                    await this.createNFSStorage(
                        nfsKubeflowClusterConfig.nfs.server,
                        nfsKubeflowClusterConfig.nfs.path
                    );
                }
                this.kubernetesCRDClient = KubeflowOperatorClient.generateOperatorClient(this.kubeflowClusterConfig.operator,
                                                                                         this.kubeflowClusterConfig.apiVersion);
                break;
            }

            case TrialConfigMetadataKey.TRIAL_CONFIG: {
                if (this.kubeflowClusterConfig === undefined) {
                    this.log.error('kubeflow cluster config is not initialized');

                    throw new Error('kubeflow cluster config is not initialized');
                }

                const kubeflowTrialJsonObject: object = JSON.parse(value);
                this.kubeflowTrialConfig = KubeflowTrialConfigFactory.generateKubeflowTrialConfig(
                    kubeflowTrialJsonObject,
                    this.kubeflowClusterConfig.operator
                );

                // Validate to make sure codeDir doesn't have too many files
                try {
                    await validateCodeDir(this.kubeflowTrialConfig.codeDir);
                } catch (error) {
                    this.log.error(error);

                    // Rethrow the original error; wrapping an Error in `new Error(...)` would
                    // stringify it and lose its stack and type
                    throw error instanceof Error ? error : new Error(String(error));
                }
                break;
            }

            case TrialConfigMetadataKey.VERSION_CHECK:
                this.versionCheck = (value === 'true' || value === 'True');
                break;

            case TrialConfigMetadataKey.LOG_COLLECTION:
                this.logCollection = value;
                break;

            default:
        }
    }

193
194
    /**
     * Upload the files generated for a trial and the trial's code directory to the shared
     * storage configured for the cluster (Azure file share or NFS).
     *
     * @param trialJobId id of the trial whose files are uploaded
     * @param trialLocalTempFolder local folder holding the files generated for this trial
     * @returns URL of the trial's `output` folder on the shared storage
     * @throws Error when the cluster config, the trial config or (Azure only) the storage client
     *         is not initialized; Azure upload errors are logged and rethrown
     */
    private async uploadCodeFiles(trialJobId: string, trialLocalTempFolder: string): Promise<string> {
        if (this.kubeflowClusterConfig === undefined) {
            throw new Error('Kubeflow Cluster config is not initialized');
        }

        if (this.kubeflowTrialConfig === undefined) {
            throw new Error('Kubeflow Trial config is not initialized');
        }

        let trialJobOutputUrl: string = '';

        assert(this.kubeflowClusterConfig.storage === undefined
            || this.kubeflowClusterConfig.storage === 'azureStorage'
            || this.kubeflowClusterConfig.storage === 'nfs');

        if (this.kubeflowClusterConfig.storage === 'azureStorage') {
            if (this.azureStorageClient === undefined) {
                throw new Error('azureStorageClient is not initialized');
            }
            // Directory of this trial inside the Azure file share
            const azureTrialDirectory: string = `nni/${getExperimentId()}/${trialJobId}`;
            try {
                // Upload local files, including scripts for running the trial and configuration
                // (e.g., hyperparameters) for the trial, to azure storage
                await AzureStorageClientUtility.uploadDirectory(this.azureStorageClient,
                                                                azureTrialDirectory, this.azureStorageShare,
                                                                `${trialLocalTempFolder}`);
                // Upload code files to azure storage
                await AzureStorageClientUtility.uploadDirectory(this.azureStorageClient,
                                                                azureTrialDirectory, this.azureStorageShare,
                                                                `${this.kubeflowTrialConfig.codeDir}`);

                // URLs always use forward slashes, so join with the POSIX flavour of `path`
                trialJobOutputUrl = `https://${this.azureStorageAccountName}.file.core.windows.net/${this.azureStorageShare}` +
                                    `/${path.posix.join('nni', getExperimentId(), trialJobId, 'output')}`;
            } catch (error) {
                this.log.error(error);

                throw error;
            }
        } else if (this.kubeflowClusterConfig.storage === 'nfs' || this.kubeflowClusterConfig.storage === undefined) {
            // An unset storage kind is treated like NFS
            const nfsKubeflowClusterConfig: KubeflowClusterConfigNFS = <KubeflowClusterConfigNFS>this.kubeflowClusterConfig;
            const nfsTrialDirectory: string = `${this.trialLocalNFSTempFolder}/nni/${getExperimentId()}/${trialJobId}`;

            // NOTE(review): the paths below are interpolated into shell commands unquoted, so a
            // code directory containing spaces or shell metacharacters will not be handled correctly.
            // Create work dir for current trial in NFS directory
            await cpp.exec(`mkdir -p ${nfsTrialDirectory}`);
            // Copy script files from local dir to NFS mounted dir
            await cpp.exec(`cp -r ${trialLocalTempFolder}/* ${nfsTrialDirectory}/.`);
            // Copy codeDir to NFS mounted dir
            await cpp.exec(`cp -r ${this.kubeflowTrialConfig.codeDir}/* ${nfsTrialDirectory}/.`);

            const nfsConfig: NFSConfig = nfsKubeflowClusterConfig.nfs;
            // The NFS export path is a POSIX path regardless of the platform NNI runs on
            trialJobOutputUrl = `nfs://${nfsConfig.server}:${path.posix.join(nfsConfig.path, 'nni', getExperimentId(), trialJobId, 'output')}`;
        }

        return trialJobOutputUrl;
    }
249

250
251
252
    /**
     * Generate the files a trial needs into a local temp folder:
     * `install_nni.sh`, one `run_<role>.sh` script per configured role (worker, plus ps for
     * tf-operator or master for pytorch-operator) and the hyper-parameter file.
     *
     * @param trialLocalTempFolder local folder the files are written to (created if missing)
     * @param trialJobId id of the trial
     * @param trialWorkingFolder working folder passed to the generated run scripts
     * @param curTrialSequenceId sequence id of the trial
     * @param form job application form; its hyper parameters, when present, are written to the parameter file
     * @throws Error when the cluster config or the trial config is not initialized, or the
     *         configured operator is neither 'tf-operator' nor 'pytorch-operator'
     */
    private async prepareRunScript(trialLocalTempFolder: string, trialJobId: string, trialWorkingFolder: string, curTrialSequenceId: number,
                                   form: JobApplicationForm): Promise<void> {
        if (this.kubeflowClusterConfig === undefined) {
            throw new Error('Kubeflow Cluster config is not initialized');
        }

        // Fail with a clear message instead of a TypeError when reading roles from an unset config
        if (this.kubeflowTrialConfig === undefined) {
            throw new Error('Kubeflow trial config is not initialized');
        }

        // initialize kubeflow trial config to specific type
        let kubeflowTrialConfig: any;
        if (this.kubeflowClusterConfig.operator === 'tf-operator') {
            kubeflowTrialConfig = <KubeflowTrialConfigTensorflow>this.kubeflowTrialConfig;
        } else if (this.kubeflowClusterConfig.operator === 'pytorch-operator') {
            kubeflowTrialConfig = <KubeflowTrialConfigPytorch>this.kubeflowTrialConfig;
        } else {
            throw Error(`operator ${this.kubeflowClusterConfig.operator} is invalid`);
        }

        // Create tmp trial working folder locally
        await cpp.exec(`mkdir -p ${trialLocalTempFolder}`);

        // Write NNI installation file to local tmp folder
        const installScriptContent: string = CONTAINER_INSTALL_NNI_SHELL_FORMAT;
        await fs.promises.writeFile(path.join(trialLocalTempFolder, 'install_nni.sh'), installScriptContent, { encoding: 'utf8' });

        const sequenceIdText: string = curTrialSequenceId.toString();

        // Write worker file content run_worker.sh to local tmp folder
        if (kubeflowTrialConfig.worker !== undefined) {
            const workerRunScriptContent: string = await this.generateRunScript('kubeflow', trialJobId, trialWorkingFolder,
                                                                                kubeflowTrialConfig.worker.command,
                                                                                sequenceIdText, 'worker',
                                                                                kubeflowTrialConfig.worker.gpuNum);
            await fs.promises.writeFile(path.join(trialLocalTempFolder, 'run_worker.sh'), workerRunScriptContent, { encoding: 'utf8' });
        }

        if (this.kubeflowClusterConfig.operator === 'tf-operator') {
            // Write parameter server file content run_ps.sh to local tmp folder
            const tensorflowTrialConfig: KubeflowTrialConfigTensorflow = <KubeflowTrialConfigTensorflow>this.kubeflowTrialConfig;
            if (tensorflowTrialConfig.ps !== undefined) {
                const psRunScriptContent: string = await this.generateRunScript('kubeflow', trialJobId, trialWorkingFolder,
                                                                                tensorflowTrialConfig.ps.command,
                                                                                sequenceIdText,
                                                                                'ps', tensorflowTrialConfig.ps.gpuNum);
                await fs.promises.writeFile(path.join(trialLocalTempFolder, 'run_ps.sh'), psRunScriptContent, { encoding: 'utf8' });
            }
        } else if (this.kubeflowClusterConfig.operator === 'pytorch-operator') {
            // Write master file content run_master.sh to local tmp folder
            const pytorchTrialConfig: KubeflowTrialConfigPytorch = <KubeflowTrialConfigPytorch>this.kubeflowTrialConfig;
            if (pytorchTrialConfig.master !== undefined) {
                const masterRunScriptContent: string = await this.generateRunScript('kubeflow', trialJobId, trialWorkingFolder,
                                                                                    pytorchTrialConfig.master.command,
                                                                                    sequenceIdText, 'master',
                                                                                    pytorchTrialConfig.master.gpuNum);
                await fs.promises.writeFile(path.join(trialLocalTempFolder, 'run_master.sh'), masterRunScriptContent, { encoding: 'utf8' });
            }
        }

        // Write file content ( parameter.cfg ) to local tmp folder
        const trialForm: TrialJobApplicationForm = (<TrialJobApplicationForm>form);
        if (trialForm !== undefined && trialForm.hyperParameters !== undefined) {
            await fs.promises.writeFile(path.join(trialLocalTempFolder, generateParamFileName(trialForm.hyperParameters)),
                                        trialForm.hyperParameters.value, { encoding: 'utf8' });
        }
    }
307

308
    /**
     * Build the Kubeflow job resource config for a trial.
     *
     * Computes the pod resource section (requests and identical limits) for the worker role and
     * for the non-worker role (ps for tf-operator, master for pytorch-operator), then delegates
     * to `generateKubeflowJobConfig`.
     *
     * @param trialJobId id of the trial
     * @param trialWorkingFolder working folder of the trial
     * @param kubeflowJobName name of the Kubeflow job
     * @returns the Kubeflow job resource config object
     * @throws Error when the cluster config or the trial config is not initialized, or the
     *         configured operator is neither 'tf-operator' nor 'pytorch-operator'
     */
    private async prepareKubeflowConfig(trialJobId: string, trialWorkingFolder: string, kubeflowJobName: string): Promise<any> {
        if (this.kubeflowClusterConfig === undefined) {
            throw new Error('Kubeflow Cluster config is not initialized');
        }
        if (this.kubeflowTrialConfig === undefined) {
            throw new Error('Kubeflow trial config is not initialized');
        }

        // Reject unknown operators up front; both supported operators share the code below
        if (this.kubeflowClusterConfig.operator !== 'tf-operator'
            && this.kubeflowClusterConfig.operator !== 'pytorch-operator') {
            throw Error(`operator ${this.kubeflowClusterConfig.operator} is invalid`);
        }
        const trialConfig: any = this.kubeflowTrialConfig;

        // Worker role: limits mirror the requests (empty when no worker is configured)
        const workerPodResources: any = {};
        if (trialConfig.worker !== undefined) {
            workerPodResources.requests = this.generatePodResource(
                trialConfig.worker.memoryMB,
                trialConfig.worker.cpuNum,
                trialConfig.worker.gpuNum
            );
        }
        workerPodResources.limits = {...workerPodResources.requests};

        // Non-worker role: parameter server for tf-operator, master for pytorch-operator
        const nonWorkerResources: any = {};
        if (this.kubeflowClusterConfig.operator === 'tf-operator') {
            const tfConfig: KubeflowTrialConfigTensorflow = <KubeflowTrialConfigTensorflow>this.kubeflowTrialConfig;
            if (tfConfig.ps !== undefined) {
                nonWorkerResources.requests = this.generatePodResource(
                    tfConfig.ps.memoryMB,
                    tfConfig.ps.cpuNum,
                    tfConfig.ps.gpuNum
                );
                nonWorkerResources.limits = {...nonWorkerResources.requests};
            }
        } else if (this.kubeflowClusterConfig.operator === 'pytorch-operator') {
            const torchConfig: KubeflowTrialConfigPytorch = <KubeflowTrialConfigPytorch>this.kubeflowTrialConfig;
            nonWorkerResources.requests = this.generatePodResource(
                torchConfig.master.memoryMB,
                torchConfig.master.cpuNum,
                torchConfig.master.gpuNum
            );
            nonWorkerResources.limits = {...nonWorkerResources.requests};
        }

        // Generate kubeflow job resource config object
        return this.generateKubeflowJobConfig(trialJobId, trialWorkingFolder, kubeflowJobName, workerPodResources,
                                              nonWorkerResources);
    }
355
356
357
358
359
360
361
362
363

    /**
     * Assemble the Kubeflow job resource object (apiVersion, kind, metadata and spec).
     *
     * The spec holds the replica specs under `tfReplicaSpecs` for tf-operator or
     * `pytorchReplicaSpecs` for pytorch-operator; for any other operator type it is undefined.
     *
     * @param trialJobId trial job id, stored in the `trialId` label
     * @param trialWorkingFolder working folder of the trial
     * @param kubeflowJobName name of the Kubeflow job
     * @param workerPodResources pod resource section for worker replicas
     * @param nonWorkerPodResources pod resource section for non-worker replicas (ps or master)
     * @throws Error when the cluster config, trial config or operator client is not initialized
     */
    private generateKubeflowJobConfig(trialJobId: string, trialWorkingFolder: string, kubeflowJobName : string, workerPodResources : any,
                                      nonWorkerPodResources?: any) : any {
        if (this.kubeflowClusterConfig === undefined) {
            throw new Error('Kubeflow Cluster config is not initialized');
        }
        if (this.kubeflowTrialConfig === undefined) {
            throw new Error('Kubeflow trial config is not initialized');
        }
        if (this.kubernetesCRDClient === undefined) {
            throw new Error('Kubeflow operator client is not initialized');
        }

        const replicaSpecs: any = {};
        // Stays undefined when the operator type matches neither branch below
        let jobSpec: object | undefined;

        if (this.kubeflowTrialConfig.operatorType === 'tf-operator') {
            const tfConfig: KubeflowTrialConfigTensorflow = <KubeflowTrialConfigTensorflow>this.kubeflowTrialConfig;
            replicaSpecs.Worker = this.generateReplicaConfig(
                trialWorkingFolder, tfConfig.worker.replicas, tfConfig.worker.image, 'run_worker.sh', workerPodResources);
            if (tfConfig.ps !== undefined) {
                replicaSpecs.Ps = this.generateReplicaConfig(
                    trialWorkingFolder, tfConfig.ps.replicas, tfConfig.ps.image, 'run_ps.sh', nonWorkerPodResources);
            }
            jobSpec = {tfReplicaSpecs: replicaSpecs};
        } else if (this.kubeflowTrialConfig.operatorType === 'pytorch-operator') {
            const torchConfig: KubeflowTrialConfigPytorch = <KubeflowTrialConfigPytorch>this.kubeflowTrialConfig;
            if (torchConfig.worker !== undefined) {
                replicaSpecs.Worker = this.generateReplicaConfig(
                    trialWorkingFolder, torchConfig.worker.replicas, torchConfig.worker.image, 'run_worker.sh', workerPodResources);
            }
            replicaSpecs.Master = this.generateReplicaConfig(
                trialWorkingFolder, torchConfig.master.replicas, torchConfig.master.image, 'run_master.sh', nonWorkerPodResources);
            jobSpec = {pytorchReplicaSpecs: replicaSpecs};
        }

        return {
            apiVersion: `kubeflow.org/${this.kubernetesCRDClient.apiVersion}`,
            kind: this.kubernetesCRDClient.jobKind,
            metadata: {
                name: kubeflowJobName,
                namespace: 'default',
                labels: {
                    app: this.NNI_KUBERNETES_TRIAL_LABEL,
                    expId: getExperimentId(),
                    trialId: trialJobId
                }
            },
            spec: jobSpec
        };
    }

    /**
     * Generate the replica config section of a Kubeflow job for one role
     * (used for tf-operator as well as pytorch-operator replicas).
     *
     * The pod mounts a single volume named 'nni-vol' at `CONTAINER_MOUNT_PATH`: an Azure file
     * share when the cluster storage type is 'azureStorage', otherwise the configured NFS export.
     *
     * @param trialWorkingFolder trial working folder; the run script is looked up inside it
     * @param replicaNumber replica number
     * @param replicaImage container image
     * @param runScriptFile script file name, executed with `sh`
     * @param podResources pod resource config section
     * @throws Error when the cluster config, trial config or operator client is not initialized
     */
    private generateReplicaConfig(trialWorkingFolder: string, replicaNumber: number, replicaImage: string, runScriptFile: string,
                                  podResources: any): any {
        if (this.kubeflowClusterConfig === undefined) {
            throw new Error('Kubeflow Cluster config is not initialized');
        }

        if (this.kubeflowTrialConfig === undefined) {
            throw new Error('Kubeflow trial config is not initialized');
        }

        if (this.kubernetesCRDClient === undefined) {
            throw new Error('Kubeflow operator client is not initialized');
        }

        let nniVolumes: object[];
        if (this.kubeflowClusterConfig.storageType === 'azureStorage') {
            nniVolumes = [
            {
                name: 'nni-vol',
                azureFile: {
                    secretName: `${this.azureStorageSecretName}`,
                    shareName: `${this.azureStorageShare}`,
                    // Kubernetes spells this field `readOnly`; a lower-case `readonly` is not part of the volume schema
                    readOnly: false
                }
            }];
        } else {
            const nfsKubeflowClusterConfig: KubeflowClusterConfigNFS = <KubeflowClusterConfigNFS> this.kubeflowClusterConfig;
            nniVolumes = [
            {
                name: 'nni-vol',
                nfs: {
                    server: `${nfsKubeflowClusterConfig.nfs.server}`,
                    path: `${nfsKubeflowClusterConfig.nfs.path}`
                }
            }];
        }

        return {
            replicas: replicaNumber,
            template: {
                metadata: {
                    // tslint:disable-next-line:no-null-keyword
                    creationTimestamp: null
                },
                spec: {
                    containers: [
                    {
                        // The container name is taken from the operator client in use
                        name: this.kubernetesCRDClient.containerName,
                        image: replicaImage,
                        args: ['sh', `${path.join(trialWorkingFolder, runScriptFile)}`],
                        volumeMounts: [
                        {
                            name: 'nni-vol',
                            mountPath: this.CONTAINER_MOUNT_PATH
                        }],
                        resources: podResources
                    }],
                    restartPolicy: 'ExitCode',
                    volumes: nniVolumes
                }
            }
        };
    }
}
492
493
// tslint:enable: no-unsafe-any no-any
export { KubeflowTrainingService };