kubeflowTrainingService.ts 34 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
/**
 * Copyright (c) Microsoft Corporation
 * All rights reserved.
 *
 * MIT License
 *
 * Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated
 * documentation files (the "Software"), to deal in the Software without restriction, including without limitation
 * the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and
 * to permit persons to whom the Software is furnished to do so, subject to the following conditions:
 * The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.
 *
 * THE SOFTWARE IS PROVIDED *AS IS*, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING
 * BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
 * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
 * DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
 */

'use strict'

import * as assert from 'assert';
23
import * as azureStorage from 'azure-storage';
24
25
26
27
28
29
30
31
32
33
34
35
36
import * as component from '../../common/component';
import * as cpp from 'child-process-promise';
import * as fs from 'fs';
import * as path from 'path';

import { CONTAINER_INSTALL_NNI_SHELL_FORMAT } from '../common/containerJobData';
import { EventEmitter } from 'events';
import { getExperimentId, getInitTrialSequenceId } from '../../common/experimentStartupInfo';
import { getLogger, Logger } from '../../common/log';
import { MethodNotImplementedError } from '../../common/errors';
import { TrialConfigMetadataKey } from '../common/trialConfigMetadataKey';
import {
    JobApplicationForm, TrainingService, TrialJobApplicationForm,
37
    TrialJobDetail, TrialJobMetric, NNIManagerIpConfig
38
} from '../../common/trainingService';
QuanluZhang's avatar
QuanluZhang committed
39
import { delay, generateParamFileName, getExperimentRootDir, getIPV4Address, uniqueString, getJobCancelStatus } from '../../common/utils';
40
41
import { DistTrainRole, KubeflowClusterConfigBase, KubeflowClusterConfigNFS, KubeflowClusterConfigAzure, KubeflowTrialConfigBase,
     KubeflowTrialConfigPytorch, KubeflowTrialConfigTensorflow, NFSConfig } from './kubeflowConfig';
42
import { KubeflowTrialJobDetail } from './kubeflowData';
43
44
import { KubeflowJobRestServer } from './kubeflowJobRestServer';
import { KubeflowJobInfoCollector } from './kubeflowJobInfoCollector';
45
import { validateCodeDir } from '../common/util';
SparkSnail's avatar
SparkSnail committed
46
import { AzureStorageClientUtility } from './azureStorageClientUtils';
47
import { GeneralK8sClient, KubeflowOperatorClient } from './kubernetesApiClient';
48

SparkSnail's avatar
SparkSnail committed
49
var azure = require('azure-storage');
50
51
52
53
54
55
56

/**
 * Training Service implementation for Kubeflow
 * Refer https://github.com/kubeflow/kubeflow for more info about Kubeflow
 */
@component.Singleton
class KubeflowTrainingService implements TrainingService {
57
    private readonly NNI_KUBEFLOW_TRIAL_LABEL: string = 'nni-kubeflow-trial';
58
59
60
61
62
63
64
65
    private readonly log!: Logger;
    private readonly metricsEmitter: EventEmitter;
    private readonly trialJobsMap: Map<string, KubeflowTrialJobDetail>;
    /**  experiment root dir in NFS */
    private readonly trialLocalNFSTempFolder: string;
    private stopping: boolean = false;
    private experimentId! : string;
    private nextTrialSequenceId: number;
66
67
    private kubeflowClusterConfig?: KubeflowClusterConfigBase;
    private kubeflowTrialConfig?: KubeflowTrialConfigBase;
68
69
    private kubeflowJobInfoCollector: KubeflowJobInfoCollector;
    private kubeflowRestServerPort?: number;
70
71
    private operatorClient?: KubeflowOperatorClient;
    private readonly genericK8sClient: GeneralK8sClient;
72
    private readonly CONTAINER_MOUNT_PATH: string;
SparkSnail's avatar
SparkSnail committed
73
74
75
76
    private azureStorageClient?: azureStorage.FileService;
    private azureStorageShare?: string;
    private azureStorageSecretName?: string;
    private azureStorageAccountName?: string;
77
    private nniManagerIpConfig?: NNIManagerIpConfig;
78
79
80
81
82
    
    /**
     * Initializes the service's fields: logger, metric emitter, trial job map,
     * generic Kubernetes client, job info collector, local NFS staging folder,
     * experiment id, sequence id counter and the container mount path.
     */
    constructor() {
        this.log = getLogger();
        this.metricsEmitter = new EventEmitter();

        const jobsById: Map<string, KubeflowTrialJobDetail> = new Map<string, KubeflowTrialJobDetail>();
        this.trialJobsMap = jobsById;
        this.genericK8sClient = new GeneralK8sClient();
        // The collector is handed the very same map instance that this service fills in.
        this.kubeflowJobInfoCollector = new KubeflowJobInfoCollector(jobsById);

        const experimentRootDir: string = getExperimentRootDir();
        this.trialLocalNFSTempFolder = path.join(experimentRootDir, 'trials-nfs-tmp');
        this.experimentId = getExperimentId();
        // NOTE(review): -1 looks like a "not yet determined" sentinel for generateSequenceId — confirm there.
        this.nextTrialSequenceId = -1;
        this.CONTAINER_MOUNT_PATH = '/tmp/mount';
    }

    /**
     * Starts the Kubeflow job REST server, then keeps polling trial status through the
     * job info collector every 3 seconds until `stopping` is set (see cleanUp).
     */
    public async run(): Promise<void> {
        const pollIntervalMs: number = 3000;

        const jobRestServer: KubeflowJobRestServer = component.get(KubeflowJobRestServer);
        await jobRestServer.start();
        this.log.info(`Kubeflow Training service rest server listening on: ${jobRestServer.endPoint}`);

        for (;;) {
            if (this.stopping) {
                break;
            }
            // Wait first, then collect status for Kubeflow jobs via the operator client
            await delay(pollIntervalMs);
            await this.kubeflowJobInfoCollector.retrieveTrialStatus(this.operatorClient);
        }
    }

    /**
     * Submit one trial as a Kubeflow job.
     *
     * Steps, in order:
     *  1. copy the trial code dir into a local staging folder and add the generated files
     *     (install_nni.sh, run_worker.sh, run_ps.sh / run_master.sh, hyper-parameter file);
     *  2. compute pod resource requests/limits for the worker and the ps/master role;
     *  3. publish the staging folder to the configured storage (Azure file share, or the
     *     locally mounted NFS folder when storage is 'nfs' or not set);
     *  4. create the Kubeflow job through the operator client;
     *  5. record the trial in trialJobsMap (only once the job was created).
     *
     * @param form job application form; hyperParameters on it, if any, are written to the staging folder
     * @returns detail record of the new trial, created with status 'WAITING'
     * @throws Error if cluster config, trial config or operator client is missing, or the operator is unsupported
     */
    public async submitTrialJob(form: JobApplicationForm): Promise<TrialJobDetail> {
        if (!this.kubeflowClusterConfig) {
            throw new Error('Kubeflow Cluster config is not initialized');
        }
        if (!this.kubeflowTrialConfig) {
            throw new Error('Kubeflow trial config is not initialized');
        }
        if (!this.operatorClient) {
            throw new Error('Kubeflow job operator client is undefined');
        }

        // Pick up the REST server port the first time it is needed
        if (!this.kubeflowRestServerPort) {
            const jobRestServer: KubeflowJobRestServer = component.get(KubeflowJobRestServer);
            this.kubeflowRestServerPort = jobRestServer.clusterRestServerPort;
        }

        // View the trial config as the operator-specific type
        let tfTrialConfig: KubeflowTrialConfigTensorflow | undefined;
        let pytorchTrialConfig: KubeflowTrialConfigPytorch | undefined;
        let trialConfig: KubeflowTrialConfigTensorflow | KubeflowTrialConfigPytorch;
        switch (this.kubeflowClusterConfig.operator) {
            case 'tf-operator':
                tfTrialConfig = this.kubeflowTrialConfig as KubeflowTrialConfigTensorflow;
                trialConfig = tfTrialConfig;
                break;
            case 'pytorch-operator':
                pytorchTrialConfig = this.kubeflowTrialConfig as KubeflowTrialConfigPytorch;
                trialConfig = pytorchTrialConfig;
                break;
            default:
                throw Error(`operator ${this.kubeflowClusterConfig.operator} is invalid`);
        }

        const trialJobId: string = uniqueString(5);
        const curTrialSequenceId: number = this.generateSequenceId();
        const sequenceIdText: string = curTrialSequenceId.toString();

        // Working folder as seen from inside the container, and the local staging folder
        const trialWorkingFolder: string = path.join(this.CONTAINER_MOUNT_PATH, 'nni', getExperimentId(), trialJobId);
        const trialLocalTempFolder: string = path.join(getExperimentRootDir(), 'trials-local', trialJobId);

        // Small helper: write one generated text file into the local staging folder
        const writeStagedFile = (fileName: string, content: string): Promise<void> =>
            fs.promises.writeFile(path.join(trialLocalTempFolder, fileName), content, { encoding: 'utf8' });

        // Stage the user's code, then the NNI installation script
        await cpp.exec(`mkdir -p ${path.dirname(trialLocalTempFolder)}`);
        await cpp.exec(`cp -r ${trialConfig.codeDir} ${trialLocalTempFolder}`);
        await writeStagedFile('install_nni.sh', CONTAINER_INSTALL_NNI_SHELL_FORMAT);
        await cpp.exec(`mkdir -p ${trialLocalTempFolder}`);

        // run_worker.sh
        if (trialConfig.worker) {
            const workerScript: string = this.generateRunScript(trialJobId, trialWorkingFolder,
                trialConfig.worker.command, sequenceIdText, 'worker', trialConfig.worker.gpuNum);
            await writeStagedFile('run_worker.sh', workerScript);
        }

        // run_ps.sh (tf-operator) or run_master.sh (pytorch-operator)
        if (tfTrialConfig !== undefined) {
            if (tfTrialConfig.ps) {
                const psScript: string = this.generateRunScript(trialJobId, trialWorkingFolder,
                    tfTrialConfig.ps.command, sequenceIdText, 'ps', tfTrialConfig.ps.gpuNum);
                await writeStagedFile('run_ps.sh', psScript);
            }
        } else if (pytorchTrialConfig !== undefined) {
            if (pytorchTrialConfig.master) {
                const masterScript: string = this.generateRunScript(trialJobId, trialWorkingFolder,
                    pytorchTrialConfig.master.command, sequenceIdText, 'master', pytorchTrialConfig.master.gpuNum);
                await writeStagedFile('run_master.sh', masterScript);
            }
        }

        // Hyper-parameter file ( parameter.cfg )
        const trialForm: TrialJobApplicationForm = form as TrialJobApplicationForm;
        if (trialForm && trialForm.hyperParameters) {
            const parameterFile: string = path.join(trialLocalTempFolder, generateParamFileName(trialForm.hyperParameters));
            await fs.promises.writeFile(parameterFile, trialForm.hyperParameters.value, { encoding: 'utf8' });
        }

        const kubeflowJobName = `nni-exp-${this.experimentId}-trial-${trialJobId}`.toLowerCase();

        // Worker pod resources: limits mirror requests
        const workerPodResources: any = {};
        if (trialConfig.worker) {
            const { memoryMB, cpuNum, gpuNum } = trialConfig.worker;
            workerPodResources.requests = this.generatePodResource(memoryMB, cpuNum, gpuNum);
        }
        workerPodResources.limits = Object.assign({}, workerPodResources.requests);

        // ps / master pod resources: limits mirror requests
        const nonWorkerResources: any = {};
        if (tfTrialConfig !== undefined) {
            if (tfTrialConfig.ps) {
                const ps = tfTrialConfig.ps;
                nonWorkerResources.requests = this.generatePodResource(ps.memoryMB, ps.cpuNum, ps.gpuNum);
                nonWorkerResources.limits = Object.assign({}, nonWorkerResources.requests);
            }
        } else if (pytorchTrialConfig !== undefined) {
            const master = pytorchTrialConfig.master;
            nonWorkerResources.requests = this.generatePodResource(master.memoryMB, master.cpuNum, master.gpuNum);
            nonWorkerResources.limits = Object.assign({}, nonWorkerResources.requests);
        }

        // The output url used in trialJobDetail
        let trialJobOutputUrl: string = '';
        const storageKind = this.kubeflowClusterConfig.storage;
        assert(!storageKind || storageKind === 'azureStorage' || storageKind === 'nfs');

        if (storageKind === 'azureStorage') {
            try {
                // Upload the staged files to the Azure file share
                await AzureStorageClientUtility.uploadDirectory(this.azureStorageClient,
                    `nni/${getExperimentId()}/${trialJobId}`, this.azureStorageShare, trialLocalTempFolder);

                const outputPath: string = path.join('nni', getExperimentId(), trialJobId, 'output');
                trialJobOutputUrl = `https://${this.azureStorageAccountName}.file.core.windows.net/${this.azureStorageShare}/${outputPath}`;
            } catch (error) {
                this.log.error(error);
                throw error;
            }
        } else if (storageKind === 'nfs' || storageKind === undefined) {
            const nfsKubeflowClusterConfig: KubeflowClusterConfigNFS = this.kubeflowClusterConfig as KubeflowClusterConfigNFS;
            const nfsTrialFolder: string = `${this.trialLocalNFSTempFolder}/nni/${getExperimentId()}/${trialJobId}`;
            // Create the trial's work dir inside the locally mounted NFS directory
            await cpp.exec(`mkdir -p ${nfsTrialFolder}`);
            // Copy the staged files into the NFS mounted dir
            await cpp.exec(`cp -r ${trialLocalTempFolder}/* ${nfsTrialFolder}/.`);

            const nfsConfig: NFSConfig = nfsKubeflowClusterConfig.nfs;
            const outputPath: string = path.join(nfsConfig.path, 'nni', getExperimentId(), trialJobId, 'output');
            trialJobOutputUrl = `nfs://${nfsConfig.server}:${outputPath}`;
        }

        const trialJobDetail: KubeflowTrialJobDetail = new KubeflowTrialJobDetail(
            trialJobId,
            'WAITING',
            Date.now(),
            trialWorkingFolder,
            form,
            kubeflowJobName,
            curTrialSequenceId,
            trialJobOutputUrl
        );

        // Build the Kubeflow job resource object and create the job from it
        const kubeflowJobConfig: any = this.generateKubeflowJobConfig(
            trialJobId, trialWorkingFolder, kubeflowJobName, workerPodResources, nonWorkerResources);
        await this.operatorClient.createKubeflowJob(kubeflowJobConfig);

        // Register the trial only after the Kubeflow job was created successfully
        this.trialJobsMap.set(trialJobId, trialJobDetail);

        return trialJobDetail;
    }

250
251
252
253
254
255
256
257
    /**
     * Build a Kubernetes resource map (usable as `requests` or `limits`) for one pod.
     * @param memory memory amount; emitted with the `Mi` suffix
     * @param cpuNum number of CPUs, emitted as a string
     * @param gpuNum number of GPUs, emitted as a string under the `nvidia.com/gpu` key
     * @returns object with the keys `memory`, `cpu` and `nvidia.com/gpu`
     */
    public generatePodResource(memory: number, cpuNum: number, gpuNum: number) {
        const podResource = {
            'memory': `${memory}Mi`,
            'cpu': String(cpuNum),
            'nvidia.com/gpu': String(gpuNum)
        };

        return podResource;
    }

258
259
260
261
262
    /**
     * Updating a submitted trial is not supported by this training service.
     * @param trialJobId trial job id (unused)
     * @param form job application form (unused)
     * @throws MethodNotImplementedError always, synchronously
     */
    public updateTrialJob(trialJobId: string, form: JobApplicationForm): Promise<TrialJobDetail> {
        throw new MethodNotImplementedError();
    }

    /**
     * List every job in trialJobsMap whose form has jobType 'TRIAL'.
     *
     * Each lookup is awaited in sequence, so the returned array is complete when the
     * promise resolves and a failing getTrialJob rejects this promise instead of
     * surfacing as an unhandled rejection.
     *
     * @returns details of all trial jobs, in map insertion order
     */
    public async listTrialJobs(): Promise<TrialJobDetail[]> {
        const jobs: TrialJobDetail[] = [];

        for (const [trialJobId, trialJob] of this.trialJobsMap) {
            if (trialJob.form.jobType === 'TRIAL') {
                jobs.push(await this.getTrialJob(trialJobId));
            }
        }

        return jobs;
    }

    /**
     * Look up one trial job by id.
     * @param trialJobId trial job id
     * @returns promise resolving to the stored job detail; rejects with a message string
     *          when the id is not present in trialJobsMap
     * @throws Error synchronously when the Kubeflow cluster config has not been set
     */
    public getTrialJob(trialJobId: string): Promise<TrialJobDetail> {
        if (!this.kubeflowClusterConfig) {
            throw new Error('Kubeflow Cluster config is not initialized');
        }

        const storedJob: TrialJobDetail | undefined = this.trialJobsMap.get(trialJobId);

        return storedJob
            ? Promise.resolve(storedJob)
            : Promise.reject(`trial job ${trialJobId} not found`);
    }

    /**
     * Register a listener on the internal metrics emitter's 'metric' event.
     * @param listener callback that receives each emitted TrialJobMetric
     */
    public addTrialJobMetricListener(listener: (metric: TrialJobMetric) => void) {
        this.metricsEmitter.on('metric', listener);
    }

    /**
     * Unregister a listener previously added for the 'metric' event.
     * @param listener the same function reference that was passed to addTrialJobMetricListener
     */
    public removeTrialJobMetricListener(listener: (metric: TrialJobMetric) => void) {
        const metricEventName = 'metric';
        // EventEmitter.off is an alias of removeListener
        this.metricsEmitter.removeListener(metricEventName, listener);
    }
 
    /**
     * Whether multi-phase jobs are supported; this service always reports false.
     */
    public get isMultiPhaseJobSupported(): boolean {
        return false;
    }

QuanluZhang's avatar
QuanluZhang committed
300
    /**
     * Cancel one trial by deleting the Kubeflow job(s) labelled with this app, experiment
     * and trial id, then stamp the trial's end time and cancel status.
     *
     * Every failure is logged and the promise is rejected with the message string.
     *
     * @param trialJobId id of the trial to cancel
     * @param isEarlyStopped passed to getJobCancelStatus to pick the final status
     */
    public async cancelTrialJob(trialJobId: string, isEarlyStopped: boolean = false): Promise<void> {
        // Log the message and hand back a promise rejected with that same message
        const logAndReject = (message: string): Promise<void> => {
            this.log.error(message);

            return Promise.reject(message);
        };

        const jobToCancel: KubeflowTrialJobDetail | undefined = this.trialJobsMap.get(trialJobId);
        if (!jobToCancel) {
            return logAndReject(`CancelTrialJob: trial job id ${trialJobId} not found`);
        }
        if (!this.operatorClient) {
            return logAndReject(`CancelTrialJob: trial job id ${trialJobId} failed because operatorClient is undefined`);
        }

        try {
            const labelSelector: Map<string, string> = new Map<string, string>();
            labelSelector.set('app', this.NNI_KUBEFLOW_TRIAL_LABEL);
            labelSelector.set('expId', getExperimentId());
            labelSelector.set('trialId', trialJobId);
            await this.operatorClient.deleteKubeflowJob(labelSelector);
        } catch (err) {
            return logAndReject(`Delete trial ${trialJobId} failed: ${err}`);
        }

        jobToCancel.endTime = Date.now();
        jobToCancel.status = getJobCancelStatus(isEarlyStopped);
    }

    /**
     * Apply one piece of cluster metadata, selected by `key`; `value` is a JSON string.
     *
     *  - NNI_MANAGER_IP: stores the parsed NNI manager IP config.
     *  - KUBEFLOW_CLUSTER_CONFIG: builds the cluster config.
     *      - storage 'azureStorage': reads the storage account key from Azure Key Vault with
     *        the `az` CLI, creates the file service client and the share, and creates a
     *        Kubernetes Secret holding the account name and key.
     *      - storage 'nfs' or unset: mounts the NFS export on the local staging folder.
     *    Afterwards the operator client is generated from operator + apiVersion.
     *  - TRIAL_CONFIG: builds the operator-specific trial config (requires the cluster config
     *    to be set first) and validates its codeDir.
     *  - any other key: ignored.
     *
     * @param key metadata key (a TrialConfigMetadataKey value)
     * @param value JSON-encoded metadata value
     * @throws Error (as promise rejection) on key vault / secret / mount / validation failures or a
     *         malformed cluster config; a non-empty stderr from `az` rejects with that stderr string
     */
    public async setClusterMetadata(key: string, value: string): Promise<void> {
        // Re-throwable form of a caught value: keep real Error objects (and their stack) intact
        const toError = (caught: any): Error => (caught instanceof Error ? caught : new Error(String(caught)));

        switch (key) {
            case TrialConfigMetadataKey.NNI_MANAGER_IP:
                this.nniManagerIpConfig = <NNIManagerIpConfig>JSON.parse(value);
                break;

            case TrialConfigMetadataKey.KUBEFLOW_CLUSTER_CONFIG: {
                const kubeflowClusterJsonObject = JSON.parse(value);
                const kubeflowClusterConfigBase: KubeflowClusterConfigBase
                    = new KubeflowClusterConfigBase(kubeflowClusterJsonObject.operator,
                                                    kubeflowClusterJsonObject.apiVersion,
                                                    kubeflowClusterJsonObject.storage);

                if (kubeflowClusterConfigBase.storage === 'azureStorage') {
                    const azureKubeflowClusterConfig: KubeflowClusterConfigAzure =
                        new KubeflowClusterConfigAzure(kubeflowClusterJsonObject.operator,
                            kubeflowClusterJsonObject.apiVersion,
                            kubeflowClusterJsonObject.keyvault,
                            kubeflowClusterJsonObject.azureStorage, kubeflowClusterJsonObject.storage);

                    const vaultName = azureKubeflowClusterConfig.keyVault.vaultName;
                    const vaultKeyName = azureKubeflowClusterConfig.keyVault.name;
                    const storageAccountName = azureKubeflowClusterConfig.azureStorage.accountName;
                    this.azureStorageAccountName = storageAccountName;
                    this.azureStorageShare = azureKubeflowClusterConfig.azureStorage.azureShare;
                    try {
                        const result = await cpp.exec(`az keyvault secret show --name ${vaultKeyName} --vault-name ${vaultName}`);
                        if (result.stderr) {
                            const errorMessage: string = result.stderr;
                            this.log.error(errorMessage);

                            // Returned (not thrown) so the catch below does not wrap it: rejects with the stderr text
                            return Promise.reject(errorMessage);
                        }
                        const storageAccountKey = JSON.parse(result.stdout).value;
                        // Create storage client
                        this.azureStorageClient = azure.createFileService(storageAccountName, storageAccountKey);
                        await AzureStorageClientUtility.createShare(this.azureStorageClient, this.azureStorageShare);
                        // Create storage secret
                        this.azureStorageSecretName = 'nni-secret-' + uniqueString(8).toLowerCase();

                        await this.genericK8sClient.createSecret(
                            {
                                apiVersion: 'v1',
                                kind: 'Secret',
                                metadata: {
                                    name: this.azureStorageSecretName,
                                    namespace: 'default',
                                    labels: {
                                        app: this.NNI_KUBEFLOW_TRIAL_LABEL,
                                        expId: getExperimentId()
                                    }
                                },
                                type: 'Opaque',
                                // The Kubernetes API requires every value under `data` to be base64-encoded
                                data: {
                                    azurestorageaccountname: Buffer.from(storageAccountName).toString('base64'),
                                    azurestorageaccountkey: Buffer.from(storageAccountKey).toString('base64')
                                }
                            }
                        );
                    } catch (error) {
                        this.log.error(error);
                        throw toError(error);
                    }

                    this.kubeflowClusterConfig = azureKubeflowClusterConfig;
                } else if (kubeflowClusterConfigBase.storage === 'nfs' || kubeflowClusterConfigBase.storage === undefined) {
                    // Check and mount NFS mount point here
                    // If storage is undefined, the default value is nfs
                    const nfsKubeflowClusterConfig: KubeflowClusterConfigNFS =
                        new KubeflowClusterConfigNFS(kubeflowClusterJsonObject.operator,
                            kubeflowClusterJsonObject.apiVersion,
                            kubeflowClusterJsonObject.nfs,
                            kubeflowClusterJsonObject.storage);

                    await cpp.exec(`mkdir -p ${this.trialLocalNFSTempFolder}`);
                    const nfsServer: string = nfsKubeflowClusterConfig.nfs.server;
                    const nfsPath: string = nfsKubeflowClusterConfig.nfs.path;

                    try {
                        await cpp.exec(`sudo mount ${nfsServer}:${nfsPath} ${this.trialLocalNFSTempFolder}`);
                    } catch (error) {
                        const mountError: string = `Mount NFS ${nfsServer}:${nfsPath} to ${this.trialLocalNFSTempFolder} failed, error is ${error}`;
                        this.log.error(mountError);
                        throw new Error(mountError);
                    }
                    this.kubeflowClusterConfig = nfsKubeflowClusterConfig;
                } else {
                    const error: string = `kubeflowClusterConfig format error!`;
                    this.log.error(error);
                    throw new Error(error);
                }

                this.operatorClient = KubeflowOperatorClient.generateOperatorClient(this.kubeflowClusterConfig.operator,
                                                                                     this.kubeflowClusterConfig.apiVersion);
                break;
            }

            case TrialConfigMetadataKey.TRIAL_CONFIG: {
                // The trial config type depends on the operator, so the cluster config must come first
                if (!this.kubeflowClusterConfig) {
                    this.log.error('kubeflow cluster config is not initialized');
                    throw new Error('kubeflow cluster config is not initialized');
                }

                const kubeflowTrialJsonObject = JSON.parse(value);
                if (this.kubeflowClusterConfig.operator === 'tf-operator') {
                    this.kubeflowTrialConfig = new KubeflowTrialConfigTensorflow(kubeflowTrialJsonObject.codeDir,
                        kubeflowTrialJsonObject.worker, kubeflowTrialJsonObject.ps);
                } else if (this.kubeflowClusterConfig.operator === 'pytorch-operator') {
                    this.kubeflowTrialConfig = new KubeflowTrialConfigPytorch(kubeflowTrialJsonObject.codeDir,
                        kubeflowTrialJsonObject.master, kubeflowTrialJsonObject.worker);
                }

                if (!this.kubeflowTrialConfig) {
                    this.log.error('kubeflow kubeflow TrialConfig is not initialized');
                    throw new Error('kubeflow kubeflow TrialConfig is not initialized');
                }

                // Validate to make sure codeDir doesn't have too many files
                try {
                    await validateCodeDir(this.kubeflowTrialConfig.codeDir);
                } catch (error) {
                    this.log.error(error);
                    throw toError(error);
                }
                break;
            }

            default:
                break;
        }
    }

    /**
     * Read back cluster metadata. This implementation ignores `key` and always
     * resolves to an empty string.
     * @param key metadata key (unused)
     */
    public getClusterMetadata(key: string): Promise<string> {
        const noMetadata: string = '';

        return Promise.resolve(noMetadata);
    }

    /**
     * Best-effort shutdown of the training service. Every step logs its own failure and
     * the remaining steps still run; the returned promise resolves in all cases.
     *
     *  1. set `stopping` so the polling loop in run() ends;
     *  2. cancel every trial that is RUNNING / WAITING / UNKNOWN and mark it SYS_CANCELED;
     *  3. delete all Kubeflow jobs labelled with this app and experiment id;
     *  4. unmount the local NFS folder;
     *  5. stop the Kubeflow job REST server.
     */
    public async cleanUp(): Promise<void> {
        this.stopping = true;

        // First, cancel all running kubeflow jobs
        for (const [trialJobId, kubeflowTrialJob] of this.trialJobsMap) {
            if (['RUNNING', 'WAITING', 'UNKNOWN'].includes(kubeflowTrialJob.status)) {
                try {
                    await this.cancelTrialJob(trialJobId);
                } catch (error) {
                    // DONT throw error during cleanup; cancelTrialJob has already logged the failure
                }
                kubeflowTrialJob.status = 'SYS_CANCELED';
            }
        }

        // Delete all kubeflow jobs whose expId label is current experiment id
        try {
            if (this.operatorClient) {
                await this.operatorClient.deleteKubeflowJob(new Map(
                    [
                        ['app', this.NNI_KUBEFLOW_TRIAL_LABEL],
                        ['expId', getExperimentId()]
                    ]
                ));
            }
        } catch (error) {
            this.log.error(`Delete kubeflow job with label: app=${this.NNI_KUBEFLOW_TRIAL_LABEL},expId=${getExperimentId()} failed, error is ${error}`);
        }

        // Unmount NFS
        try {
            await cpp.exec(`sudo umount ${this.trialLocalNFSTempFolder}`);
        } catch (error) {
            this.log.error(`Unmount ${this.trialLocalNFSTempFolder} failed, error is ${error}`);
        }

        // Stop Kubeflow rest server
        const restServer: KubeflowJobRestServer = component.get(KubeflowJobRestServer);
        try {
            await restServer.stop();
            this.log.info('Kubeflow Training service rest server stopped successfully.');
        } catch (error) {
            // Log only: creating a rejected promise here without returning it would just
            // produce an unhandled rejection while this method resolved anyway.
            const reason = error instanceof Error ? error.message : error;
            this.log.error(`Kubeflow Training service rest server stopped failed, error: ${reason}`);
        }
    }

    /**
     * The EventEmitter instance that metric listeners are registered on.
     */
    public get MetricsEmitter() : EventEmitter {
        return this.metricsEmitter;
    }

516
517
518
519
520
521
    /**
     * Build the Kubeflow job resource object handed to the operator client.
     *
     * For 'tf-operator' the spec is `{ tfReplicaSpecs: { Worker, Ps? } }`; for
     * 'pytorch-operator' it is `{ pytorchReplicaSpecs: { Worker?, Master } }`. For any
     * other operator value the `spec` field is left undefined.
     *
     * @param trialJobId trial job id, used as the `trialId` label
     * @param trialWorkingFolder working folder passed on to each replica config
     * @param kubeflowJobName name of the Kubeflow job resource
     * @param workerPodResources pod resources for the worker replicas
     * @param nonWorkerPodResources pod resources for the non-worker replicas (ps or master)
     * @throws Error when cluster config, trial config or operator client is not initialized
     */
    private generateKubeflowJobConfig(trialJobId: string, trialWorkingFolder: string, kubeflowJobName : string, workerPodResources : any, nonWorkerPodResources?: any) : any {
        if (!this.kubeflowClusterConfig) {
            throw new Error('Kubeflow Cluster config is not initialized');
        }
        if (!this.kubeflowTrialConfig) {
            throw new Error('Kubeflow trial config is not initialized');
        }
        if (!this.operatorClient) {
            throw new Error('Kubeflow operator client is not initialized');
        }

        const replicaSpecs: any = {};
        // Stays undefined when the operator is neither of the two known ones
        let jobSpec: object | undefined;

        if (this.kubeflowClusterConfig.operator === 'tf-operator') {
            const tfConfig: KubeflowTrialConfigTensorflow = this.kubeflowTrialConfig as KubeflowTrialConfigTensorflow;
            const tfWorker = tfConfig.worker;
            replicaSpecs.Worker = this.generateReplicaConfig(
                trialWorkingFolder, tfWorker.replicas, tfWorker.image, 'run_worker.sh', workerPodResources);

            if (tfConfig.ps) {
                replicaSpecs.Ps = this.generateReplicaConfig(
                    trialWorkingFolder, tfConfig.ps.replicas, tfConfig.ps.image, 'run_ps.sh', nonWorkerPodResources);
            }
            jobSpec = { 'tfReplicaSpecs': replicaSpecs };
        } else if (this.kubeflowClusterConfig.operator === 'pytorch-operator') {
            const torchConfig: KubeflowTrialConfigPytorch = this.kubeflowTrialConfig as KubeflowTrialConfigPytorch;
            if (torchConfig.worker) {
                replicaSpecs.Worker = this.generateReplicaConfig(
                    trialWorkingFolder, torchConfig.worker.replicas, torchConfig.worker.image, 'run_worker.sh', workerPodResources);
            }
            const torchMaster = torchConfig.master;
            replicaSpecs.Master = this.generateReplicaConfig(
                trialWorkingFolder, torchMaster.replicas, torchMaster.image, 'run_master.sh', nonWorkerPodResources);

            jobSpec = { 'pytorchReplicaSpecs': replicaSpecs };
        }

        const jobLabels = {
            app: this.NNI_KUBEFLOW_TRIAL_LABEL,
            expId: getExperimentId(),
            trialId: trialJobId
        };

        return {
            apiVersion: `kubeflow.org/${this.operatorClient.apiVersion}`,
            kind: this.operatorClient.jobKind,
            metadata: {
                name: kubeflowJobName,
                namespace: 'default',
                labels: jobLabels
            },
            spec: jobSpec
        };
    }

579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
    /**
     * Generate the replica config section (replica count plus pod template) for one role of a Kubeflow job.
     * Used for every role of both the tensorflow and the pytorch operator.
     * @param trialWorkingFolder trial working folder; the run script is looked up inside it
     * @param replicaNumber replica number
     * @param replicaImage container image
     * @param runScriptFile script file name, executed with `sh` as the container's args
     * @param podResources pod resource config section, passed through unchanged
     * @returns replica spec object with `replicas` and `template` fields
     * @throws Error if the cluster config, trial config or operator client is not initialized
     */
    private generateReplicaConfig(trialWorkingFolder: string, replicaNumber: number, replicaImage: string, runScriptFile: string, podResources: any): any {
        if (!this.kubeflowClusterConfig) {
            throw new Error('Kubeflow Cluster config is not initialized');
        }

        if (!this.kubeflowTrialConfig) {
            throw new Error('Kubeflow trial config is not initialized');
        }

        if (!this.operatorClient) {
            throw new Error('Kubeflow operator client is not initialized');
        }

        // Volume mounted into the container at CONTAINER_MOUNT_PATH:
        // an Azure file share when storage is 'azureStorage', otherwise NFS.
        let nniVolume: object;
        if (this.kubeflowClusterConfig.storage === 'azureStorage') {
            nniVolume = {
                name: 'nni-vol',
                azureFile: {
                    secretName: `${this.azureStorageSecretName}`,
                    shareName: `${this.azureStorageShare}`,
                    // Kubernetes names this field 'readOnly' (camelCase); 'readonly' is not a known field
                    readOnly: false
                }
            };
        } else {
            const nfsKubeflowClusterConfig: KubeflowClusterConfigNFS = <KubeflowClusterConfigNFS>this.kubeflowClusterConfig;
            nniVolume = {
                name: 'nni-vol',
                nfs: {
                    server: `${nfsKubeflowClusterConfig.nfs.server}`,
                    path: `${nfsKubeflowClusterConfig.nfs.path}`
                }
            };
        }

        return {
            replicas: replicaNumber,
            template: {
                metadata: {
                    creationTimestamp: null
                },
                spec: {
                    containers: [
                    {
                        // Operators can require a specific container name (e.g. the tensorflow operator
                        // expects 'tensorflow'), so the name is taken from the operator client.
                        name: this.operatorClient.containerName,
                        image: replicaImage,
                        args: ["sh", `${path.join(trialWorkingFolder, runScriptFile)}`],
                        volumeMounts: [
                        {
                            name: 'nni-vol',
                            mountPath: this.CONTAINER_MOUNT_PATH
                        }],
                        resources: podResources
                    }],
                    restartPolicy: 'ExitCode',
                    volumes: [nniVolume]
                }
            }
        };
    }

    /**
     * Generate the run script for one distributed-training role (like worker or ps)
     * @param trialJobId trial job id
     * @param trialWorkingFolder working folder; exported as NNI_CODE_DIR and used as the base of NNI_OUTPUT_DIR
     * @param command trial command, handed to trial_keeper through --trial_command
     * @param trialSequenceId sequence id, exported as NNI_TRIAL_SEQ_ID
     * @param roleType role the script is generated for; names the output sub-folder `<roleType>_output`
     * @param gpuNum number of GPUs configured for this role; 0 makes the script set CUDA_VISIBLE_DEVICES to empty
     * @returns the script content, one statement per line joined with '\n'
     */
    private generateRunScript(trialJobId: string, trialWorkingFolder: string,
                command: string, trialSequenceId: string, roleType: DistTrainRole, gpuNum: number): string {
        const runScriptLines: string[] = [];

        runScriptLines.push('#!/bin/bash');
        runScriptLines.push('export NNI_PLATFORM=kubeflow');
        runScriptLines.push(`export NNI_SYS_DIR=$PWD/nni/${trialJobId}`);
        runScriptLines.push(`export NNI_OUTPUT_DIR=${path.join(trialWorkingFolder, 'output', `${roleType}_output`)}`);
        runScriptLines.push('export MULTI_PHASE=false');
        runScriptLines.push(`export NNI_TRIAL_JOB_ID=${trialJobId}`);
        runScriptLines.push(`export NNI_EXP_ID=${getExperimentId()}`);
        runScriptLines.push(`export NNI_CODE_DIR=${trialWorkingFolder}`);
        runScriptLines.push(`export NNI_TRIAL_SEQ_ID=${trialSequenceId}`);

        // Nvidia device plugin for K8S has a known issue that requesting zero GPUs allocates all GPUs
        // Refer https://github.com/NVIDIA/k8s-device-plugin/issues/61
        // So we have to explicitly set CUDA_VISIBLE_DEVICES to empty if user sets gpuNum to 0 in NNI config file
        if (gpuNum === 0) {
            runScriptLines.push(`export CUDA_VISIBLE_DEVICES=''`);
        }

        // Fall back to this machine's IPv4 address when no NNI manager IP is configured
        const nniManagerIp = this.nniManagerIpConfig ? this.nniManagerIpConfig.nniManagerIp : getIPV4Address();

        // The command is embedded in a single-quoted shell word, where a literal single quote
        // would terminate the word early. Rewrite each ' as '\'' (close quote, escaped quote,
        // reopen quote) so the shell passes the command to trial_keeper unchanged.
        const quotedCommand: string = command.replace(/'/g, `'\\''`);

        runScriptLines.push('mkdir -p $NNI_SYS_DIR');
        runScriptLines.push('mkdir -p $NNI_OUTPUT_DIR');
        runScriptLines.push('cp -rT $NNI_CODE_DIR $NNI_SYS_DIR');
        runScriptLines.push('cd $NNI_SYS_DIR');
        runScriptLines.push('sh install_nni.sh # Check and install NNI pkg');
        runScriptLines.push(`python3 -m nni_trial_tool.trial_keeper --trial_command '${quotedCommand}' `
        + `--nnimanager_ip '${nniManagerIp}' --nnimanager_port '${this.kubeflowRestServerPort}' `
        + `1>$NNI_OUTPUT_DIR/trialkeeper_stdout 2>$NNI_OUTPUT_DIR/trialkeeper_stderr`);

        return runScriptLines.join('\n');
    }

692
693
694
695
696
697
698
699
700
    /**
     * Hand out the next trial sequence id and advance the internal counter.
     * While the counter still holds the sentinel -1, it is first seeded from getInitTrialSequenceId().
     * @returns the sequence id to use for the next trial
     */
    private generateSequenceId(): number {
        const isUnseeded: boolean = this.nextTrialSequenceId === -1;
        const currentId: number = isUnseeded ? getInitTrialSequenceId() : this.nextTrialSequenceId;
        this.nextTrialSequenceId = currentId + 1;

        return currentId;
    }
}

QuanluZhang's avatar
QuanluZhang committed
701
export { KubeflowTrainingService }