kubeflowTrainingService.ts 24.6 KB
Newer Older
liuzhe-lz's avatar
liuzhe-lz committed
1
2
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT license.
3

4
'use strict';
5
6
7
8
9

import * as assert from 'assert';
import * as cpp from 'child-process-promise';
import * as fs from 'fs';
import * as path from 'path';
10
import * as component from '../../../common/component';
11
12
13

import { getExperimentId } from '../../../common/experimentStartupInfo';
import {
14
    NNIManagerIpConfig, TrialJobApplicationForm, TrialJobDetail, TrialJobStatus
15
16
} from '../../../common/trainingService';
import { delay, generateParamFileName, getExperimentRootDir, uniqueString } from '../../../common/utils';
17
18
import { CONTAINER_INSTALL_NNI_SHELL_FORMAT } from '../../common/containerJobData';
import { TrialConfigMetadataKey } from '../../common/trialConfigMetadataKey';
19
import { validateCodeDir } from '../../common/util';
20
21
22
import { NFSConfig } from '../kubernetesConfig';
import { KubernetesTrialJobDetail } from '../kubernetesData';
import { KubernetesTrainingService } from '../kubernetesTrainingService';
23
import { KubeflowOperatorClientFactory } from './kubeflowApiClient';
24
25
26
import { KubeflowClusterConfig, KubeflowClusterConfigAzure, KubeflowClusterConfigFactory, KubeflowClusterConfigNFS,
    KubeflowTrialConfig, KubeflowTrialConfigFactory, KubeflowTrialConfigPytorch, KubeflowTrialConfigTensorflow
} from './kubeflowConfig';
27
import { KubeflowJobInfoCollector } from './kubeflowJobInfoCollector';
28
import { KubeflowJobRestServer } from './kubeflowJobRestServer';
29
30
31
32
33
34
35
36
37

/**
 * Training Service implementation for Kubeflow.
 * Refer to https://github.com/kubeflow/kubeflow for more info about Kubeflow.
 */
@component.Singleton
class KubeflowTrainingService extends KubernetesTrainingService implements KubernetesTrainingService {
    private kubeflowClusterConfig?: KubeflowClusterConfig;
    private kubeflowTrialConfig?: KubeflowTrialConfig;
    private readonly kubeflowJobInfoCollector: KubeflowJobInfoCollector;

    constructor() {
        super();
        this.kubeflowJobInfoCollector = new KubeflowJobInfoCollector(this.trialJobsMap);
        this.experimentId = getExperimentId();
        this.log.info('Construct Kubeflow training service.');
    }

    /**
     * Start the Kubeflow job REST server, then poll trial status every 3 seconds
     * until `this.stopping` becomes true.
     * @throws Error if the REST server component cannot be obtained, or if the REST server
     *         reports an error message while polling (the service is marked as stopping first).
     */
    public async run(): Promise<void> {
        this.log.info('Run Kubeflow training service.');
        this.kubernetesJobRestServer = component.get(KubeflowJobRestServer);
        if (this.kubernetesJobRestServer === undefined) {
            throw new Error('kubernetesJobRestServer not initialized!');
        }
        await this.kubernetesJobRestServer.start();
        this.kubernetesJobRestServer.setEnableVersionCheck = this.versionCheck;
        this.log.info(`Kubeflow Training service rest server listening on: ${this.kubernetesJobRestServer.endPoint}`);
        while (!this.stopping) {
            // collect metrics for Kubeflow jobs by interacting with Kubernetes API server
            await delay(3000);
            await this.kubeflowJobInfoCollector.retrieveTrialStatus(this.kubernetesCRDClient);
            if (this.kubernetesJobRestServer.getErrorMessage !== undefined) {
                // Flag the service as stopping *before* propagating the error; placing this
                // assignment after the throw (as before) made it unreachable.
                this.stopping = true;
                throw new Error(this.kubernetesJobRestServer.getErrorMessage);
            }
        }
        this.log.info('Kubeflow training service exit.');
    }

    /**
     * Submit a new trial job.
     *
     * Waits for any pending code-dir upload, writes the trial's run scripts into a local temp
     * folder, uploads that folder to the configured storage, creates the Kubeflow job through
     * the CRD client and finally records the trial in `trialJobsMap`.
     * @param form trial application form (hyper-parameters and sequence id)
     * @returns the new trial's detail; its status is 'FAILED' when the upload returned an
     *          empty output URL, otherwise 'WAITING'
     * @throws Error if the Kubeflow operator client has not been created yet
     */
    public async submitTrialJob(form: TrialJobApplicationForm): Promise<TrialJobDetail> {
        if (this.kubernetesCRDClient === undefined) {
            throw new Error('Kubeflow job operator client is undefined');
        }

        if (this.kubernetesRestServerPort === undefined) {
            const restServer: KubeflowJobRestServer = component.get(KubeflowJobRestServer);
            this.kubernetesRestServerPort = restServer.clusterRestServerPort;
        }

        // Make sure the experiment code dir has finished uploading to storage
        if (this.copyExpCodeDirPromise !== undefined) {
            await this.copyExpCodeDirPromise;
        }

        const trialJobId: string = uniqueString(5);
        const trialWorkingFolder: string = path.join(this.CONTAINER_MOUNT_PATH, 'nni', getExperimentId(), trialJobId);
        const kubeflowJobName: string = `nni-exp-${this.experimentId}-trial-${trialJobId}`.toLowerCase();
        const trialLocalTempFolder: string = path.join(getExperimentRootDir(), 'trials-local', trialJobId);

        // Prepare the run scripts locally
        await this.prepareRunScript(trialLocalTempFolder, trialJobId, trialWorkingFolder, form);

        // Upload script files to storage
        const trialJobOutputUrl: string = await this.uploadFolder(trialLocalTempFolder, `nni/${getExperimentId()}/${trialJobId}`);
        // An empty output URL from uploadFolder marks the trial as failed from the start
        const initStatus: TrialJobStatus = trialJobOutputUrl ? 'WAITING' : 'FAILED';

        const trialJobDetail: KubernetesTrialJobDetail = new KubernetesTrialJobDetail(
            trialJobId,
            initStatus,
            Date.now(),
            trialWorkingFolder,
            form,
            kubeflowJobName,
            trialJobOutputUrl
        );

        // Generate kubeflow job resource config object
        const kubeflowJobConfig: any = await this.prepareKubeflowConfig(trialJobId, trialWorkingFolder, kubeflowJobName);
        // Create kubeflow job based on generated kubeflow job resource config
        await this.kubernetesCRDClient.createKubernetesJob(kubeflowJobConfig);

        // Only record the trial once the Kubeflow job has been created successfully
        this.trialJobsMap.set(trialJobId, trialJobDetail);

        return trialJobDetail;
    }

    /**
     * Apply one piece of cluster metadata.
     *
     * - NNI_MANAGER_IP: parsed into `nniManagerIpConfig`.
     * - KUBEFLOW_CLUSTER_CONFIG: builds the cluster config, prepares Azure or NFS storage and
     *   creates the Kubeflow operator client.
     * - TRIAL_CONFIG: builds the trial config (requires the cluster config first), validates
     *   codeDir and starts uploading it to storage without awaiting the upload.
     * - VERSION_CHECK / LOG_COLLECTION: stored on the instance. Other keys are ignored.
     * @param key metadata key
     * @param value metadata value (JSON text for the config keys)
     * @returns a promise rejected when the trial config arrives before the cluster config,
     *          or when codeDir validation fails
     */
    public async setClusterMetadata(key: string, value: string): Promise<void> {
        switch (key) {
            case TrialConfigMetadataKey.NNI_MANAGER_IP:
                this.nniManagerIpConfig = <NNIManagerIpConfig>JSON.parse(value);
                break;

            case TrialConfigMetadataKey.KUBEFLOW_CLUSTER_CONFIG: {
                const kubeflowClusterJsonObject: object = JSON.parse(value);
                this.kubeflowClusterConfig = KubeflowClusterConfigFactory.generateKubeflowClusterConfig(kubeflowClusterJsonObject);
                if (this.kubeflowClusterConfig.storageType === 'azureStorage') {
                    const azureKubeflowClusterConfig: KubeflowClusterConfigAzure = <KubeflowClusterConfigAzure>this.kubeflowClusterConfig;
                    this.azureStorageAccountName = azureKubeflowClusterConfig.azureStorage.accountName;
                    this.azureStorageShare = azureKubeflowClusterConfig.azureStorage.azureShare;
                    await this.createAzureStorage(
                        azureKubeflowClusterConfig.keyVault.vaultName,
                        azureKubeflowClusterConfig.keyVault.name
                    );
                } else if (this.kubeflowClusterConfig.storageType === 'nfs') {
                    const nfsKubeflowClusterConfig: KubeflowClusterConfigNFS = <KubeflowClusterConfigNFS>this.kubeflowClusterConfig;
                    await this.createNFSStorage(
                        nfsKubeflowClusterConfig.nfs.server,
                        nfsKubeflowClusterConfig.nfs.path
                    );
                }
                this.kubernetesCRDClient = KubeflowOperatorClientFactory.createClient(
                    this.kubeflowClusterConfig.operator, this.kubeflowClusterConfig.apiVersion);
                break;
            }

            case TrialConfigMetadataKey.TRIAL_CONFIG: {
                if (this.kubeflowClusterConfig === undefined) {
                    this.log.error('kubeflow cluster config is not initialized');

                    return Promise.reject(new Error('kubeflow cluster config is not initialized'));
                }

                const kubeflowTrialJsonObject: object = JSON.parse(value);
                this.kubeflowTrialConfig = KubeflowTrialConfigFactory.generateKubeflowTrialConfig(
                    kubeflowTrialJsonObject,
                    this.kubeflowClusterConfig.operator
                );

                // Validate to make sure codeDir doesn't have too many files
                try {
                    await validateCodeDir(this.kubeflowTrialConfig.codeDir);
                    // Upload codeDir to storage in the background; submitTrialJob awaits it.
                    const uploadPromise: Promise<string> =
                        this.uploadFolder(this.kubeflowTrialConfig.codeDir, `nni/${getExperimentId()}/nni-code`);
                    // Attach a handler right away so a failed upload is not an unhandled
                    // rejection (fatal on recent Node.js) before anyone awaits it. The stored
                    // promise itself still rejects for later awaiters.
                    uploadPromise.catch((err: any) => {
                        this.log.error(`Failed to upload codeDir to storage: ${err}`);
                    });
                    this.copyExpCodeDirPromise = uploadPromise;
                } catch (error) {
                    this.log.error(error);

                    // Don't re-wrap an Error (that produced "Error: Error: ..." messages).
                    return Promise.reject(error instanceof Error ? error : new Error(String(error)));
                }
                break;
            }

            case TrialConfigMetadataKey.VERSION_CHECK:
                this.versionCheck = (value === 'true' || value === 'True');
                break;
            case TrialConfigMetadataKey.LOG_COLLECTION:
                this.logCollection = value;
                break;
            default:
                break;
        }
    }

    /**
     * Upload a local folder to NFS or Azure storage.
     * @param srcDirectory local folder whose contents are copied
     * @param destDirectory destination path relative to the storage root
     * @returns for Azure, the result of `uploadFolderToAzureStorage`; for NFS (or when no
     *          storage kind is set), an `nfs://<server>:<destDirectory>` URL; '' otherwise
     * @throws Error if the cluster/trial configs or the Azure storage client are missing
     */
    private async uploadFolder(srcDirectory: string, destDirectory: string): Promise<string> {
        if (this.kubeflowClusterConfig === undefined) {
            throw new Error('Kubeflow Cluster config is not initialized');
        }

        if (this.kubeflowTrialConfig === undefined) {
            throw new Error('Kubeflow Trial config is not initialized');
        }

        assert(this.kubeflowClusterConfig.storage === undefined
            || this.kubeflowClusterConfig.storage === 'azureStorage'
            || this.kubeflowClusterConfig.storage === 'nfs');

        if (this.kubeflowClusterConfig.storage === 'azureStorage') {
            if (this.azureStorageClient === undefined) {
                throw new Error('azureStorageClient is not initialized');
            }
            const azureKubeflowClusterConfig: KubeflowClusterConfigAzure = <KubeflowClusterConfigAzure>this.kubeflowClusterConfig;

            return await this.uploadFolderToAzureStorage(srcDirectory, destDirectory, azureKubeflowClusterConfig.uploadRetryCount);
        } else if (this.kubeflowClusterConfig.storage === 'nfs' || this.kubeflowClusterConfig.storage === undefined) {
            const nfsDestDirectory: string = `${this.trialLocalNFSTempFolder}/${destDirectory}`;
            // Quote paths so directories containing spaces survive the shell; the `*` glob is
            // kept outside the quotes so the shell still expands it.
            await cpp.exec(`mkdir -p "${nfsDestDirectory}"`);
            await cpp.exec(`cp -r "${srcDirectory}"/* "${nfsDestDirectory}/."`);
            const nfsKubeflowClusterConfig: KubeflowClusterConfigNFS = <KubeflowClusterConfigNFS>this.kubeflowClusterConfig;
            const nfsConfig: NFSConfig = nfsKubeflowClusterConfig.nfs;

            return `nfs://${nfsConfig.server}:${destDirectory}`;
        }

        return '';
    }

    /**
     * Write every file a trial needs into a local temp folder: install_nni.sh, run_worker.sh
     * (when a worker is configured), run_ps.sh (tf-operator with ps) or run_master.sh
     * (pytorch-operator with master), plus the hyper-parameter file.
     * @param trialLocalTempFolder local folder that receives the files (created if missing)
     * @param trialJobId trial job id
     * @param trialWorkingFolder trial working folder as seen inside the container
     * @param form trial application form
     * @throws Error if the configs are missing or the operator is not supported
     */
    private async prepareRunScript(trialLocalTempFolder: string, trialJobId: string, trialWorkingFolder: string,
                                   form: TrialJobApplicationForm): Promise<void> {
        if (this.kubeflowClusterConfig === undefined) {
            throw new Error('Kubeflow Cluster config is not initialized');
        }

        if (this.kubeflowTrialConfig === undefined) {
            throw new Error('Kubeflow trial config is not initialized');
        }

        const operator: string = this.kubeflowClusterConfig.operator;
        if (operator !== 'tf-operator' && operator !== 'pytorch-operator') {
            throw new Error(`operator ${operator} is invalid`);
        }
        // `worker` is read the same way from both operators' trial configs
        const kubeflowTrialConfig: any = this.kubeflowTrialConfig;

        // Create tmp trial working folder locally
        await cpp.exec(`mkdir -p "${trialLocalTempFolder}"`);

        // Write NNI installation file to local tmp files
        const runScriptContent: string = CONTAINER_INSTALL_NNI_SHELL_FORMAT;
        await fs.promises.writeFile(path.join(trialLocalTempFolder, 'install_nni.sh'), runScriptContent, { encoding: 'utf8' });

        // Write worker file content run_worker.sh to local tmp folders
        if (kubeflowTrialConfig.worker !== undefined) {
            const workerRunScriptContent: string = await this.generateRunScript('kubeflow', trialJobId, trialWorkingFolder,
                                                                                kubeflowTrialConfig.worker.command,
                                                                                form.sequenceId.toString(), 'worker',
                                                                                kubeflowTrialConfig.worker.gpuNum);
            await fs.promises.writeFile(path.join(trialLocalTempFolder, 'run_worker.sh'), workerRunScriptContent, { encoding: 'utf8' });
        }

        if (operator === 'tf-operator') {
            // Write parameter server file content run_ps.sh to local tmp folders
            const tensorflowTrialConfig: KubeflowTrialConfigTensorflow = <KubeflowTrialConfigTensorflow>this.kubeflowTrialConfig;
            if (tensorflowTrialConfig.ps !== undefined) {
                const psRunScriptContent: string = await this.generateRunScript('kubeflow', trialJobId, trialWorkingFolder,
                                                                                tensorflowTrialConfig.ps.command,
                                                                                form.sequenceId.toString(),
                                                                                'ps', tensorflowTrialConfig.ps.gpuNum);
                await fs.promises.writeFile(path.join(trialLocalTempFolder, 'run_ps.sh'), psRunScriptContent, { encoding: 'utf8' });
            }
        } else {
            // pytorch-operator: write master file content run_master.sh to local tmp folders
            const pytorchTrialConfig: KubeflowTrialConfigPytorch = <KubeflowTrialConfigPytorch>this.kubeflowTrialConfig;
            if (pytorchTrialConfig.master !== undefined) {
                const masterRunScriptContent: string = await this.generateRunScript('kubeflow', trialJobId, trialWorkingFolder,
                                                                                    pytorchTrialConfig.master.command,
                                                                                    form.sequenceId.toString(), 'master',
                                                                                    pytorchTrialConfig.master.gpuNum);
                await fs.promises.writeFile(path.join(trialLocalTempFolder, 'run_master.sh'), masterRunScriptContent, { encoding: 'utf8' });
            }
        }

        // Write file content ( parameter.cfg ) to local tmp folders
        if (form !== undefined) {
            await fs.promises.writeFile(path.join(trialLocalTempFolder, generateParamFileName(form.hyperParameters)),
                                        form.hyperParameters.value, { encoding: 'utf8' });
        }
    }

    /**
     * Compute pod resource requests/limits for the worker and non-worker (ps or master)
     * replicas, then build the Kubeflow job resource object.
     * @param trialJobId trial job id
     * @param trialWorkingFolder trial working folder inside the container
     * @param kubeflowJobName name of the Kubeflow job resource
     * @returns the job resource object produced by `generateKubeflowJobConfig`
     * @throws Error if the configs are missing or the operator is not supported
     */
    private async prepareKubeflowConfig(trialJobId: string, trialWorkingFolder: string, kubeflowJobName: string): Promise<any> {
        if (this.kubeflowClusterConfig === undefined) {
            throw new Error('Kubeflow Cluster config is not initialized');
        }

        if (this.kubeflowTrialConfig === undefined) {
            throw new Error('Kubeflow trial config is not initialized');
        }

        const operator: string = this.kubeflowClusterConfig.operator;
        if (operator !== 'tf-operator' && operator !== 'pytorch-operator') {
            throw new Error(`operator ${operator} is invalid`);
        }
        // `worker` is read the same way from both operators' trial configs
        const kubeflowTrialConfig: any = this.kubeflowTrialConfig;

        const workerPodResources: any = {};
        if (kubeflowTrialConfig.worker !== undefined) {
            workerPodResources.requests = this.generatePodResource(kubeflowTrialConfig.worker.memoryMB, kubeflowTrialConfig.worker.cpuNum,
                                                                   kubeflowTrialConfig.worker.gpuNum);
        }
        // Limits mirror the requests (an empty object when no worker section is configured)
        workerPodResources.limits = {...workerPodResources.requests};

        const nonWorkerResources: any = {};
        if (operator === 'tf-operator') {
            const tensorflowTrialConfig: KubeflowTrialConfigTensorflow = <KubeflowTrialConfigTensorflow>this.kubeflowTrialConfig;
            if (tensorflowTrialConfig.ps !== undefined) {
                nonWorkerResources.requests = this.generatePodResource(tensorflowTrialConfig.ps.memoryMB, tensorflowTrialConfig.ps.cpuNum,
                                                                       tensorflowTrialConfig.ps.gpuNum);
                nonWorkerResources.limits = {...nonWorkerResources.requests};
            }
        } else {
            const pyTorchTrialConfig: KubeflowTrialConfigPytorch = <KubeflowTrialConfigPytorch>this.kubeflowTrialConfig;
            nonWorkerResources.requests = this.generatePodResource(pyTorchTrialConfig.master.memoryMB, pyTorchTrialConfig.master.cpuNum,
                                                                   pyTorchTrialConfig.master.gpuNum);
            nonWorkerResources.limits = {...nonWorkerResources.requests};
        }

        // Generate kubeflow job resource config object
        return await this.generateKubeflowJobConfig(trialJobId, trialWorkingFolder, kubeflowJobName, workerPodResources,
                                                    nonWorkerResources);
    }

    /**
     * Generate kubeflow resource config object.
     * @param trialJobId trial job id
     * @param trialWorkingFolder working folder
     * @param kubeflowJobName job name
     * @param workerPodResources worker pod resources section
     * @param nonWorkerPodResources non-worker pod resources section, like ps or master
     * @returns the job resource object; its `spec` is undefined when the trial config's
     *          operator type is neither 'tf-operator' nor 'pytorch-operator'
     */
    private async generateKubeflowJobConfig(trialJobId: string, trialWorkingFolder: string, kubeflowJobName: string, workerPodResources: any,
                                            nonWorkerPodResources?: any): Promise<any> {
        if (this.kubeflowClusterConfig === undefined) {
            throw new Error('Kubeflow Cluster config is not initialized');
        }

        if (this.kubeflowTrialConfig === undefined) {
            throw new Error('Kubeflow trial config is not initialized');
        }

        if (this.kubernetesCRDClient === undefined) {
            throw new Error('Kubeflow operator client is not initialized');
        }

        const replicaSpecsObj: any = {};
        let jobSpec: object | undefined;
        if (this.kubeflowTrialConfig.operatorType === 'tf-operator') {
            const tensorflowTrialConfig: KubeflowTrialConfigTensorflow = <KubeflowTrialConfigTensorflow>this.kubeflowTrialConfig;
            const workerRegistrySecretName: string | undefined =
                await this.createRegistrySecret(tensorflowTrialConfig.worker.privateRegistryAuthPath);
            replicaSpecsObj.Worker = this.generateReplicaConfig(trialWorkingFolder, tensorflowTrialConfig.worker.replicas,
                                                                tensorflowTrialConfig.worker.image, 'run_worker.sh',
                                                                workerPodResources, workerRegistrySecretName);
            if (tensorflowTrialConfig.ps !== undefined) {
                const psRegistrySecretName: string | undefined =
                    await this.createRegistrySecret(tensorflowTrialConfig.ps.privateRegistryAuthPath);
                replicaSpecsObj.Ps = this.generateReplicaConfig(trialWorkingFolder, tensorflowTrialConfig.ps.replicas,
                                                                tensorflowTrialConfig.ps.image, 'run_ps.sh',
                                                                nonWorkerPodResources, psRegistrySecretName);
            }
            jobSpec = {tfReplicaSpecs: replicaSpecsObj};
        } else if (this.kubeflowTrialConfig.operatorType === 'pytorch-operator') {
            const pytorchTrialConfig: KubeflowTrialConfigPytorch = <KubeflowTrialConfigPytorch>this.kubeflowTrialConfig;
            if (pytorchTrialConfig.worker !== undefined) {
                const workerRegistrySecretName: string | undefined =
                    await this.createRegistrySecret(pytorchTrialConfig.worker.privateRegistryAuthPath);
                replicaSpecsObj.Worker = this.generateReplicaConfig(trialWorkingFolder, pytorchTrialConfig.worker.replicas,
                                                                    pytorchTrialConfig.worker.image, 'run_worker.sh',
                                                                    workerPodResources, workerRegistrySecretName);
            }
            const masterRegistrySecretName: string | undefined =
                await this.createRegistrySecret(pytorchTrialConfig.master.privateRegistryAuthPath);
            replicaSpecsObj.Master = this.generateReplicaConfig(trialWorkingFolder, pytorchTrialConfig.master.replicas,
                                                                pytorchTrialConfig.master.image, 'run_master.sh',
                                                                nonWorkerPodResources, masterRegistrySecretName);
            jobSpec = {pytorchReplicaSpecs: replicaSpecsObj};
        }

        return {
            apiVersion: `kubeflow.org/${this.kubernetesCRDClient.apiVersion}`,
            kind: this.kubernetesCRDClient.jobKind,
            metadata: {
                name: kubeflowJobName,
                namespace: 'default',
                labels: {
                    app: this.NNI_KUBERNETES_TRIAL_LABEL,
                    expId: getExperimentId(),
                    trialId: trialJobId
                }
            },
            spec: jobSpec
        };
    }

    /**
     * Generate the replica config section (e.g. tf-operator's tfReplicaSpecs entry).
     * @param trialWorkingFolder trial working folder
     * @param replicaNumber replica number
     * @param replicaImage image
     * @param runScriptFile script file name, executed with `sh` from the working folder
     * @param podResources pod resource config section
     * @param privateRegistrySecretName when set, added to the pod's `imagePullSecrets`
     */
    private generateReplicaConfig(trialWorkingFolder: string, replicaNumber: number, replicaImage: string, runScriptFile: string,
                                  podResources: any, privateRegistrySecretName: string | undefined): any {
        if (this.kubeflowClusterConfig === undefined) {
            throw new Error('Kubeflow Cluster config is not initialized');
        }

        if (this.kubeflowTrialConfig === undefined) {
            throw new Error('Kubeflow trial config is not initialized');
        }

        if (this.kubernetesCRDClient === undefined) {
            throw new Error('Kubeflow operator client is not initialized');
        }

        // The config spec for the volume field: an Azure file share or an NFS export named 'nni-vol'
        let nniVolumes: object[];
        if (this.kubeflowClusterConfig.storageType === 'azureStorage') {
            nniVolumes = [{
                name: 'nni-vol',
                azureFile: {
                    secretName: `${this.azureStorageSecretName}`,
                    shareName: `${this.azureStorageShare}`,
                    readonly: false
                }
            }];
        } else {
            const nfsKubeflowClusterConfig: KubeflowClusterConfigNFS = <KubeflowClusterConfigNFS>this.kubeflowClusterConfig;
            nniVolumes = [{
                name: 'nni-vol',
                nfs: {
                    server: `${nfsKubeflowClusterConfig.nfs.server}`,
                    path: `${nfsKubeflowClusterConfig.nfs.path}`
                }
            }];
        }

        // The config spec for the container field
        const containers: object[] = [{
            // The container name comes from the operator client, since the operator dictates it
            // (e.g. Kubeflow's tensorflow operator requires the name 'tensorflow').
            name: this.kubernetesCRDClient.containerName,
            image: replicaImage,
            args: ['sh', `${path.join(trialWorkingFolder, runScriptFile)}`],
            volumeMounts: [{
                name: 'nni-vol',
                mountPath: this.CONTAINER_MOUNT_PATH
            }],
            resources: podResources
        }];

        const spec: any = {
            containers: containers,
            restartPolicy: 'ExitCode',
            volumes: nniVolumes
        };
        if (privateRegistrySecretName) {
            spec.imagePullSecrets = [{
                name: privateRegistrySecretName
            }];
        }

        return {
            replicas: replicaNumber,
            template: {
                metadata: {
                    creationTimestamp: null
                },
                spec: spec
            }
        };
    }
}
467
export { KubeflowTrainingService };