kubeflowTrainingService.ts 25.2 KB
Newer Older
liuzhe-lz's avatar
liuzhe-lz committed
1
2
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT license.
3

4
'use strict';
5
6
7
8
9

import * as assert from 'assert';
import * as cpp from 'child-process-promise';
import * as fs from 'fs';
import * as path from 'path';
10
import * as component from '../../../common/component';
11
12
13

import { getExperimentId } from '../../../common/experimentStartupInfo';
import {
14
    NNIManagerIpConfig, TrialJobApplicationForm, TrialJobDetail, TrialJobStatus
15
16
} from '../../../common/trainingService';
import { delay, generateParamFileName, getExperimentRootDir, uniqueString } from '../../../common/utils';
17
18
import { CONTAINER_INSTALL_NNI_SHELL_FORMAT } from '../../common/containerJobData';
import { TrialConfigMetadataKey } from '../../common/trialConfigMetadataKey';
19
20
import { validateCodeDir } from '../../common/util';
import { AzureStorageClientUtility } from '../azureStorageClientUtils';
21
22
23
import { NFSConfig } from '../kubernetesConfig';
import { KubernetesTrialJobDetail } from '../kubernetesData';
import { KubernetesTrainingService } from '../kubernetesTrainingService';
24
import { KubeflowOperatorClientFactory } from './kubeflowApiClient';
25
26
27
import { KubeflowClusterConfig, KubeflowClusterConfigAzure, KubeflowClusterConfigFactory, KubeflowClusterConfigNFS,
    KubeflowTrialConfig, KubeflowTrialConfigFactory, KubeflowTrialConfigPytorch, KubeflowTrialConfigTensorflow
} from './kubeflowConfig';
28
import { KubeflowJobInfoCollector } from './kubeflowJobInfoCollector';
29
import { KubeflowJobRestServer } from './kubeflowJobRestServer';
30

31
// tslint:disable: no-unsafe-any no-any
32
33
34
35
36
37
38
39
/**
 * Training Service implementation for Kubeflow
 * Refer https://github.com/kubeflow/kubeflow for more info about Kubeflow
 */
@component.Singleton
class KubeflowTrainingService extends KubernetesTrainingService implements KubernetesTrainingService {
    private kubeflowClusterConfig?: KubeflowClusterConfig;
    private kubeflowTrialConfig?: KubeflowTrialConfig;
40
    private readonly kubeflowJobInfoCollector: KubeflowJobInfoCollector;
41
42

    /**
     * Creates the Kubeflow training service.
     *
     * Records the current experiment id and builds the job info collector over
     * this service's `trialJobsMap`, so the collector and the service share the
     * same trial job map instance.
     */
    constructor() {
        super();
        this.experimentId = getExperimentId();
        this.kubeflowJobInfoCollector = new KubeflowJobInfoCollector(this.trialJobsMap);
        this.log.info('Construct Kubeflow training service.');
    }

    /**
     * Main loop of the Kubeflow training service.
     *
     * Resolves the Kubeflow job REST server from the component container, starts it,
     * and then repeatedly refreshes trial status through the job info collector until
     * `this.stopping` becomes true.
     *
     * @throws Error when the REST server cannot be resolved, or when the REST server
     *         reports an error message while the loop is running.
     */
    public async run(): Promise<void> {
        this.log.info('Run Kubeflow training service.');
        this.kubernetesJobRestServer = component.get(KubeflowJobRestServer);
        if (this.kubernetesJobRestServer === undefined) {
            throw new Error('kubernetesJobRestServer not initialized!');
        }
        await this.kubernetesJobRestServer.start();
        this.kubernetesJobRestServer.setEnableVersionCheck = this.versionCheck;
        this.log.info(`Kubeflow Training service rest server listening on: ${this.kubernetesJobRestServer.endPoint}`);
        while (!this.stopping) {
            // collect metrics for Kubeflow jobs by interacting with Kubernetes API server
            await delay(3000);
            await this.kubeflowJobInfoCollector.retrieveTrialStatus(this.kubernetesCRDClient);
            const restServerError = this.kubernetesJobRestServer.getErrorMessage;
            if (restServerError !== undefined) {
                // The flag has to be raised BEFORE throwing: a statement placed after
                // `throw` is unreachable, so the service would never be marked as stopping.
                this.stopping = true;
                throw new Error(restServerError);
            }
        }
        this.log.info('Kubeflow training service exit.');
    }

70
    /**
     * Submits one trial as a Kubeflow job.
     *
     * Steps, in order: generate a trial id, write the run scripts to a local temp
     * folder, upload scripts and code to the configured storage, build the Kubeflow
     * job resource config, create the job through the CRD client, and finally
     * register the trial in `trialJobsMap`.
     *
     * @param form trial application form (hyper-parameters and sequence id)
     * @returns the detail record of the submitted trial; its initial status is
     *          'FAILED' when the upload step yields an empty output URL, otherwise 'WAITING'
     * @throws Error when the Kubeflow operator client has not been created yet
     */
    public async submitTrialJob(form: TrialJobApplicationForm): Promise<TrialJobDetail> {
        if (this.kubernetesCRDClient === undefined) {
            throw new Error('Kubeflow job operator client is undefined');
        }

        // Lazily pick up the REST server port the first time a trial is submitted.
        if (this.kubernetesRestServerPort === undefined) {
            this.kubernetesRestServerPort = component.get(KubeflowJobRestServer).clusterRestServerPort;
        }

        const trialJobId: string = uniqueString(5);
        const trialWorkingFolder: string = path.join(this.CONTAINER_MOUNT_PATH, 'nni', getExperimentId(), trialJobId);
        const kubeflowJobName: string = `nni-exp-${this.experimentId}-trial-${trialJobId}`.toLowerCase();
        const trialLocalTempFolder: string = path.join(getExperimentRootDir(), 'trials-local', trialJobId);

        // Prepare the run scripts locally, then upload them together with the code to storage.
        await this.prepareRunScript(trialLocalTempFolder, trialJobId, trialWorkingFolder, form);
        const trialJobOutputUrl: string = await this.uploadCodeFiles(trialJobId, trialLocalTempFolder);

        // An empty output URL means nothing was uploaded, so the trial starts out as failed.
        const initStatus: TrialJobStatus = trialJobOutputUrl ? 'WAITING' : 'FAILED';
        const trialJobDetail: KubernetesTrialJobDetail = new KubernetesTrialJobDetail(
            trialJobId,
            initStatus,
            Date.now(),
            trialWorkingFolder,
            form,
            kubeflowJobName,
            trialJobOutputUrl
        );

        // Generate the Kubeflow job resource config object and create the job from it.
        const kubeflowJobConfig: any = await this.prepareKubeflowConfig(trialJobId, trialWorkingFolder, kubeflowJobName);
        await this.kubernetesCRDClient.createKubernetesJob(kubeflowJobConfig);

        // The trial is only recorded once the Kubeflow job has been created successfully.
        this.trialJobsMap.set(trialJobId, trialJobDetail);

        return trialJobDetail;
    }
111

112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
    // tslint:disable:no-redundant-jsdoc
    /**
     * Applies one piece of cluster / trial configuration to this service.
     *
     * Handled keys:
     *  - NNI_MANAGER_IP: stores the parsed NNI manager IP config.
     *  - KUBEFLOW_CLUSTER_CONFIG: parses the cluster config, sets up the Azure or NFS
     *    storage according to `storageType`, and creates the Kubeflow operator client.
     *  - TRIAL_CONFIG: parses the trial config (requires the cluster config to be set
     *    first) and validates the code directory.
     *  - VERSION_CHECK: enabled when the value is 'true' or 'True'.
     *  - LOG_COLLECTION: stored as given.
     * Any other key is ignored.
     *
     * @param key metadata key
     * @param value metadata value; JSON text for the config keys
     * @throws Error when TRIAL_CONFIG arrives before the cluster config, or when the
     *         code directory validation fails (the validation error is re-thrown as is)
     */
    public async setClusterMetadata(key: string, value: string): Promise<void> {
        // Each case that declares locals is wrapped in its own block so the
        // declarations do not leak into the other cases of the switch.
        switch (key) {
            case TrialConfigMetadataKey.NNI_MANAGER_IP:
                this.nniManagerIpConfig = <NNIManagerIpConfig>JSON.parse(value);
                break;

            case TrialConfigMetadataKey.KUBEFLOW_CLUSTER_CONFIG: {
                const kubeflowClusterJsonObject: object = JSON.parse(value);
                this.kubeflowClusterConfig = KubeflowClusterConfigFactory.generateKubeflowClusterConfig(kubeflowClusterJsonObject);
                if (this.kubeflowClusterConfig.storageType === 'azureStorage') {
                    const azureKubeflowClusterConfig: KubeflowClusterConfigAzure = <KubeflowClusterConfigAzure>this.kubeflowClusterConfig;
                    this.azureStorageAccountName = azureKubeflowClusterConfig.azureStorage.accountName;
                    this.azureStorageShare = azureKubeflowClusterConfig.azureStorage.azureShare;
                    await this.createAzureStorage(
                        azureKubeflowClusterConfig.keyVault.vaultName,
                        azureKubeflowClusterConfig.keyVault.name,
                        azureKubeflowClusterConfig.azureStorage.accountName,
                        azureKubeflowClusterConfig.azureStorage.azureShare
                    );
                } else if (this.kubeflowClusterConfig.storageType === 'nfs') {
                    const nfsKubeflowClusterConfig: KubeflowClusterConfigNFS = <KubeflowClusterConfigNFS>this.kubeflowClusterConfig;
                    await this.createNFSStorage(
                        nfsKubeflowClusterConfig.nfs.server,
                        nfsKubeflowClusterConfig.nfs.path
                    );
                }
                this.kubernetesCRDClient = KubeflowOperatorClientFactory.createClient(
                    this.kubeflowClusterConfig.operator, this.kubeflowClusterConfig.apiVersion);
                break;
            }

            case TrialConfigMetadataKey.TRIAL_CONFIG: {
                // The trial config is interpreted per operator, so the cluster config must come first.
                if (this.kubeflowClusterConfig === undefined) {
                    this.log.error('kubeflow cluster config is not initialized');

                    throw new Error('kubeflow cluster config is not initialized');
                }

                const kubeflowTrialJsonObject: object = JSON.parse(value);
                this.kubeflowTrialConfig = KubeflowTrialConfigFactory.generateKubeflowTrialConfig(
                    kubeflowTrialJsonObject,
                    this.kubeflowClusterConfig.operator
                );

                // Validate to make sure codeDir doesn't have too many files
                try {
                    await validateCodeDir(this.kubeflowTrialConfig.codeDir);
                } catch (error) {
                    this.log.error(error);

                    // Re-throw the original error so its message and stack survive;
                    // only non-Error values get wrapped.
                    throw (error instanceof Error ? error : new Error(String(error)));
                }
                break;
            }

            case TrialConfigMetadataKey.VERSION_CHECK:
                this.versionCheck = (value === 'true' || value === 'True');
                break;

            case TrialConfigMetadataKey.LOG_COLLECTION:
                this.logCollection = value;
                break;

            default:
        }
    }

178
179
    /**
     * Uploads the generated trial scripts and the user's code directory to the
     * storage configured for the cluster (Azure storage or NFS).
     *
     * For NFS (also used when no storage kind is configured) the files are copied
     * with shell `mkdir` / `cp` into the locally mounted NFS folder.
     *
     * @param trialJobId trial job id
     * @param trialLocalTempFolder local folder that holds the generated run scripts
     * @returns the output URL of the trial: the value returned by the Azure upload
     *          helper, or an `nfs://` URL for NFS
     * @throws Error when the cluster config, the trial config, or (for Azure) the
     *         Azure storage client is not initialized
     */
    private async uploadCodeFiles(trialJobId: string, trialLocalTempFolder: string): Promise<string> {
        if (this.kubeflowClusterConfig === undefined) {
            throw new Error('Kubeflow Cluster config is not initialized');
        }
        if (this.kubeflowTrialConfig === undefined) {
            throw new Error('Kubeflow Trial config is not initialized');
        }

        assert(this.kubeflowClusterConfig.storage === undefined
            || this.kubeflowClusterConfig.storage === 'azureStorage'
            || this.kubeflowClusterConfig.storage === 'nfs');

        // Azure storage: delegate to the upload helper and hand back its URL.
        if (this.kubeflowClusterConfig.storage === 'azureStorage') {
            if (this.azureStorageClient === undefined) {
                throw new Error('azureStorageClient is not initialized');
            }
            const azureConfig: KubeflowClusterConfigAzure = <KubeflowClusterConfigAzure>this.kubeflowClusterConfig;

            return await this.uploadFilesToAzureStorage(
                trialJobId, trialLocalTempFolder, this.kubeflowTrialConfig.codeDir, azureConfig.uploadRetryCount);
        }

        // Neither NFS nor "unspecified": nothing is uploaded and the URL stays empty.
        if (this.kubeflowClusterConfig.storage !== 'nfs' && this.kubeflowClusterConfig.storage !== undefined) {
            return '';
        }

        // NFS: create the trial's work dir under the mounted folder, then copy the
        // generated scripts first and the code directory second.
        const nfsTrialFolder: string = `${this.trialLocalNFSTempFolder}/nni/${getExperimentId()}/${trialJobId}`;
        await cpp.exec(`mkdir -p ${nfsTrialFolder}`);
        for (const sourceFolder of [trialLocalTempFolder, this.kubeflowTrialConfig.codeDir]) {
            await cpp.exec(`cp -r ${sourceFolder}/* ${nfsTrialFolder}/.`);
        }

        const nfsConfig: NFSConfig = (<KubeflowClusterConfigNFS>this.kubeflowClusterConfig).nfs;

        return `nfs://${nfsConfig.server}:${path.join(nfsConfig.path, 'nni', getExperimentId(), trialJobId, 'output')}`;
    }
219

220
221
    /**
     * Writes everything a trial needs to start into a local temp folder:
     * `install_nni.sh`, one run script per configured role (`run_worker.sh`, plus
     * `run_ps.sh` for tf-operator or `run_master.sh` for pytorch-operator), and the
     * hyper-parameter file.
     *
     * @param trialLocalTempFolder local folder to create and fill
     * @param trialJobId trial job id
     * @param trialWorkingFolder working folder of the trial inside the container
     * @param form trial application form (sequence id and hyper-parameters)
     * @throws Error when the cluster config is not initialized or the operator is
     *         neither 'tf-operator' nor 'pytorch-operator'
     */
    private async prepareRunScript(trialLocalTempFolder: string, trialJobId: string, trialWorkingFolder: string,
                                   form: TrialJobApplicationForm): Promise<void> {
        if (this.kubeflowClusterConfig === undefined) {
            throw new Error('Kubeflow Cluster config is not initialized');
        }

        // Only the two known operators are supported; reject anything else up front.
        const operator = this.kubeflowClusterConfig.operator;
        if (operator !== 'tf-operator' && operator !== 'pytorch-operator') {
            throw Error(`operator ${operator} is invalid`);
        }
        const trialConfig: any = this.kubeflowTrialConfig;

        // Create the local temp folder of the trial.
        await cpp.exec(`mkdir -p ${trialLocalTempFolder}`);

        // Small helper: write one utf8 text file into the trial's local temp folder.
        const writeTrialFile = (fileName: string, content: string): Promise<void> =>
            fs.promises.writeFile(path.join(trialLocalTempFolder, fileName), content, { encoding: 'utf8' });

        // NNI installation script.
        await writeTrialFile('install_nni.sh', CONTAINER_INSTALL_NNI_SHELL_FORMAT);

        // Worker script, only when a worker role is configured.
        if (trialConfig.worker !== undefined) {
            const workerScript: string = await this.generateRunScript(
                'kubeflow', trialJobId, trialWorkingFolder, trialConfig.worker.command,
                form.sequenceId.toString(), 'worker', trialConfig.worker.gpuNum);
            await writeTrialFile('run_worker.sh', workerScript);
        }

        if (operator === 'tf-operator') {
            // Parameter server script for tf-operator.
            const tfConfig: KubeflowTrialConfigTensorflow = <KubeflowTrialConfigTensorflow>this.kubeflowTrialConfig;
            if (tfConfig.ps !== undefined) {
                const psScript: string = await this.generateRunScript(
                    'kubeflow', trialJobId, trialWorkingFolder, tfConfig.ps.command,
                    form.sequenceId.toString(), 'ps', tfConfig.ps.gpuNum);
                await writeTrialFile('run_ps.sh', psScript);
            }
        } else {
            // Master script for pytorch-operator.
            const torchConfig: KubeflowTrialConfigPytorch = <KubeflowTrialConfigPytorch>this.kubeflowTrialConfig;
            if (torchConfig.master !== undefined) {
                const masterScript: string = await this.generateRunScript(
                    'kubeflow', trialJobId, trialWorkingFolder, torchConfig.master.command,
                    form.sequenceId.toString(), 'master', torchConfig.master.gpuNum);
                await writeTrialFile('run_master.sh', masterScript);
            }
        }

        // Hyper-parameter file ( parameter.cfg ).
        if (form !== undefined) {
            await writeTrialFile(generateParamFileName(form.hyperParameters), form.hyperParameters.value);
        }
    }
276

277
    /**
     * Computes the pod resource sections for the worker role and for the non-worker
     * role (ps for tf-operator, master for pytorch-operator), then builds the
     * Kubeflow job resource config from them.
     *
     * In every section `limits` is a shallow copy of `requests`.
     *
     * @param trialJobId trial job id
     * @param trialWorkingFolder working folder of the trial inside the container
     * @param kubeflowJobName name of the Kubeflow job
     * @returns the Kubeflow job resource config object
     * @throws Error when the cluster or trial config is not initialized, or the
     *         operator is neither 'tf-operator' nor 'pytorch-operator'
     */
    private async prepareKubeflowConfig(trialJobId: string, trialWorkingFolder: string, kubeflowJobName: string): Promise<any> {
        if (this.kubeflowClusterConfig === undefined) {
            throw new Error('Kubeflow Cluster config is not initialized');
        }
        if (this.kubeflowTrialConfig === undefined) {
            throw new Error('Kubeflow trial config is not initialized');
        }

        const operator = this.kubeflowClusterConfig.operator;
        if (operator !== 'tf-operator' && operator !== 'pytorch-operator') {
            throw Error(`operator ${operator} is invalid`);
        }
        const trialConfig: any = this.kubeflowTrialConfig;

        // Worker: requests only exist when a worker role is configured, limits always mirror them.
        const workerPodResources: any = {};
        const workerRole: any = trialConfig.worker;
        if (workerRole !== undefined) {
            workerPodResources.requests = this.generatePodResource(workerRole.memoryMB, workerRole.cpuNum, workerRole.gpuNum);
        }
        workerPodResources.limits = { ...workerPodResources.requests };

        // Non-worker role: fill requests and limits from the given role config.
        const nonWorkerResources: any = {};
        const fillNonWorkerResources = (role: any): void => {
            nonWorkerResources.requests = this.generatePodResource(role.memoryMB, role.cpuNum, role.gpuNum);
            nonWorkerResources.limits = { ...nonWorkerResources.requests };
        };
        if (operator === 'tf-operator') {
            // The parameter server is optional for tf-operator.
            const tfConfig: KubeflowTrialConfigTensorflow = <KubeflowTrialConfigTensorflow>this.kubeflowTrialConfig;
            if (tfConfig.ps !== undefined) {
                fillNonWorkerResources(tfConfig.ps);
            }
        } else {
            // The master role is read unconditionally for pytorch-operator.
            fillNonWorkerResources((<KubeflowTrialConfigPytorch>this.kubeflowTrialConfig).master);
        }

        // Generate kubeflow job resource config object
        return this.generateKubeflowJobConfig(trialJobId, trialWorkingFolder, kubeflowJobName, workerPodResources, nonWorkerResources);
    }
324
325
326
327
328
329
330
331
332

    /**
     * Builds the Kubeflow job resource object (apiVersion, kind, metadata, spec).
     *
     * The spec holds one replica section per configured role: `Worker` and optional
     * `Ps` under `tfReplicaSpecs` for tf-operator, optional `Worker` and `Master`
     * under `pytorchReplicaSpecs` for pytorch-operator. A registry secret is created
     * for each role before its replica section is generated. For any other operator
     * type the spec is left undefined.
     *
     * @param trialJobId trial job id
     * @param trialWorkingFolder working folder
     * @param kubeflowJobName job name
     * @param workerPodResources pod resource section of the worker role
     * @param nonWorkerPodResources pod resource section of the non-worker role, like ps or master
     * @returns the job resource object to send to the Kubernetes API
     * @throws Error when the cluster config, the trial config or the operator client is not initialized
     */
    private async generateKubeflowJobConfig(trialJobId: string, trialWorkingFolder: string, kubeflowJobName : string, workerPodResources : any,
                                            nonWorkerPodResources?: any) : Promise<any> {
        if (this.kubeflowClusterConfig === undefined) {
            throw new Error('Kubeflow Cluster config is not initialized');
        }
        if (this.kubeflowTrialConfig === undefined) {
            throw new Error('Kubeflow trial config is not initialized');
        }
        if (this.kubernetesCRDClient === undefined) {
            throw new Error('Kubeflow operator client is not initialized');
        }

        const replicaSpecs: any = {};
        let jobSpec: object | undefined;
        const operatorType = this.kubeflowTrialConfig.operatorType;

        if (operatorType === 'tf-operator') {
            const tfConfig: KubeflowTrialConfigTensorflow = <KubeflowTrialConfigTensorflow>this.kubeflowTrialConfig;

            const workerSecret: string | undefined = await this.createRegistrySecret(tfConfig.worker.privateRegistryAuthPath);
            replicaSpecs.Worker = this.generateReplicaConfig(
                trialWorkingFolder, tfConfig.worker.replicas, tfConfig.worker.image,
                'run_worker.sh', workerPodResources, workerSecret);

            if (tfConfig.ps !== undefined) {
                const psSecret: string | undefined = await this.createRegistrySecret(tfConfig.ps.privateRegistryAuthPath);
                replicaSpecs.Ps = this.generateReplicaConfig(
                    trialWorkingFolder, tfConfig.ps.replicas, tfConfig.ps.image,
                    'run_ps.sh', nonWorkerPodResources, psSecret);
            }

            jobSpec = { tfReplicaSpecs: replicaSpecs };
        } else if (operatorType === 'pytorch-operator') {
            const torchConfig: KubeflowTrialConfigPytorch = <KubeflowTrialConfigPytorch>this.kubeflowTrialConfig;

            if (torchConfig.worker !== undefined) {
                const workerSecret: string | undefined = await this.createRegistrySecret(torchConfig.worker.privateRegistryAuthPath);
                replicaSpecs.Worker = this.generateReplicaConfig(
                    trialWorkingFolder, torchConfig.worker.replicas, torchConfig.worker.image,
                    'run_worker.sh', workerPodResources, workerSecret);
            }

            const masterSecret: string | undefined = await this.createRegistrySecret(torchConfig.master.privateRegistryAuthPath);
            replicaSpecs.Master = this.generateReplicaConfig(
                trialWorkingFolder, torchConfig.master.replicas, torchConfig.master.image,
                'run_master.sh', nonWorkerPodResources, masterSecret);

            jobSpec = { pytorchReplicaSpecs: replicaSpecs };
        }

        return {
            apiVersion: `kubeflow.org/${this.kubernetesCRDClient.apiVersion}`,
            kind: this.kubernetesCRDClient.jobKind,
            metadata: {
                name: kubeflowJobName,
                namespace: 'default',
                labels: {
                    app: this.NNI_KUBERNETES_TRIAL_LABEL,
                    expId: getExperimentId(),
                    trialId: trialJobId
                }
            },
            spec: jobSpec
        };
    }

    /**
     * Generates the replica config section of a Kubeflow job for one role
     * (used for both the tf-operator and the pytorch-operator roles).
     *
     * The pod template mounts a single volume named `nni-vol` at
     * `CONTAINER_MOUNT_PATH`: an Azure file share when the cluster storage type is
     * 'azureStorage', otherwise the configured NFS export. The container runs
     * `sh <trialWorkingFolder>/<runScriptFile>`.
     *
     * @param trialWorkingFolder trial working folder
     * @param replicaNumber replica number
     * @param replicaImage image
     * @param runScriptFile script file name
     * @param podResources pod resource config section
     * @param privateRegistrySecretName name of the image pull secret; when it is
     *        undefined or empty no `imagePullSecrets` entry is added
     * @returns the replica config object (`replicas` plus pod `template`)
     * @throws Error when the cluster config, the trial config or the operator client is not initialized
     */
    private generateReplicaConfig(trialWorkingFolder: string, replicaNumber: number, replicaImage: string, runScriptFile: string,
                                  podResources: any, privateRegistrySecretName: string | undefined): any {
        if (this.kubeflowClusterConfig === undefined) {
            throw new Error('Kubeflow Cluster config is not initialized');
        }

        if (this.kubeflowTrialConfig === undefined) {
            throw new Error('Kubeflow trial config is not initialized');
        }

        if (this.kubernetesCRDClient === undefined) {
            throw new Error('Kubeflow operator client is not initialized');
        }

        // The config spec for volume field
        let nniVolume: object;
        if (this.kubeflowClusterConfig.storageType === 'azureStorage') {
            nniVolume = {
                name: 'nni-vol',
                azureFile: {
                    secretName: `${this.azureStorageSecretName}`,
                    shareName: `${this.azureStorageShare}`,
                    // Kubernetes spells this field `readOnly`; the lower-case
                    // `readonly` is not a field of AzureFileVolumeSource.
                    readOnly: false
                }
            };
        } else {
            const nfsKubeflowClusterConfig: KubeflowClusterConfigNFS = <KubeflowClusterConfigNFS> this.kubeflowClusterConfig;
            nniVolume = {
                name: 'nni-vol',
                nfs: {
                    server: `${nfsKubeflowClusterConfig.nfs.server}`,
                    path: `${nfsKubeflowClusterConfig.nfs.path}`
                }
            };
        }

        // The config spec for container field
        const container: object = {
            // The container name is taken from the operator client, because the
            // Kubeflow operators require a specific container name.
            name: this.kubernetesCRDClient.containerName,
            image: replicaImage,
            args: ['sh', `${path.join(trialWorkingFolder, runScriptFile)}`],
            volumeMounts: [
                {
                    name: 'nni-vol',
                    mountPath: this.CONTAINER_MOUNT_PATH
                }
            ],
            resources: podResources
        };

        const spec: any = {
            containers: [container],
            restartPolicy: 'ExitCode',
            volumes: [nniVolume]
        };
        // Only reference an image pull secret when one was actually created.
        if (privateRegistrySecretName) {
            spec.imagePullSecrets = [
                {
                    name: privateRegistrySecretName
                }
            ];
        }

        return {
            replicas: replicaNumber,
            template: {
                metadata: {
                    // tslint:disable-next-line:no-null-keyword
                    creationTimestamp: null
                },
                spec
            }
        };
    }
}
474
475
// tslint:enable: no-unsafe-any no-any
export { KubeflowTrainingService };