kubernetesTrainingService.ts 15.7 KB
Newer Older
liuzhe-lz's avatar
liuzhe-lz committed
1
2
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT license.
3

4
'use strict';
5
6
7
8

import * as cpp from 'child-process-promise';
import * as path from 'path';

9
import * as azureStorage from 'azure-storage';
10
11
12
13
14
15
import {EventEmitter} from 'events';
import {Base64} from 'js-base64';
import {String} from 'typescript-string-operations';
import {getExperimentId} from '../../common/experimentStartupInfo';
import {getLogger, Logger} from '../../common/log';
import {MethodNotImplementedError} from '../../common/errors';
16
import {
Yuge Zhang's avatar
Yuge Zhang committed
17
    NNIManagerIpConfig, TrialJobDetail, TrialJobMetric
18
} from '../../common/trainingService';
19
20
21
22
23
24
import {delay, getExperimentRootDir, getIPV4Address, getJobCancelStatus, getVersion, uniqueString} from '../../common/utils';
import {AzureStorageClientUtility} from './azureStorageClientUtils';
import {GeneralK8sClient, KubernetesCRDClient} from './kubernetesApiClient';
import {KubernetesClusterConfig} from './kubernetesConfig';
import {kubernetesScriptFormat, KubernetesTrialJobDetail} from './kubernetesData';
import {KubernetesJobRestServer} from './kubernetesJobRestServer';
25

chicm-ms's avatar
chicm-ms committed
26
const fs = require('fs');
27

28
29
30
/**
 * Training Service implementation for Kubernetes
 */
31
32
33
34
35
abstract class KubernetesTrainingService {
    protected readonly NNI_KUBERNETES_TRIAL_LABEL: string = 'nni-kubernetes-trial';
    protected readonly log!: Logger;
    protected readonly metricsEmitter: EventEmitter;
    protected readonly trialJobsMap: Map<string, KubernetesTrialJobDetail>;
    // Local path where the shared experiment storage is exposed
    // (NFS is mounted here by createNFSStorage, a PVC path is symlinked here by createPVCStorage).
    protected readonly trialLocalTempFolder: string;
    protected stopping: boolean = false;
    protected experimentId!: string;
    protected kubernetesRestServerPort?: number;
    protected readonly CONTAINER_MOUNT_PATH: string;
    protected azureStorageClient?: azureStorage.FileService;
    protected azureStorageShare?: string;
    protected azureStorageSecretName?: string;
    protected azureStorageAccountName?: string;
    protected nniManagerIpConfig?: NNIManagerIpConfig;
    protected readonly genericK8sClient: GeneralK8sClient;
    protected kubernetesCRDClient?: KubernetesCRDClient;
    protected kubernetesJobRestServer?: KubernetesJobRestServer;
    protected kubernetesClusterConfig?: KubernetesClusterConfig;
    protected versionCheck: boolean = true;
    protected logCollection: string;
    protected copyExpCodeDirPromise?: Promise<string>;
    protected expContainerCodeFolder: string;

    constructor() {
        this.log = getLogger('KubernetesTrainingService');
        this.metricsEmitter = new EventEmitter();
        this.trialJobsMap = new Map<string, KubernetesTrialJobDetail>();
        this.trialLocalTempFolder = path.join(getExperimentRootDir(), 'trials-nfs-tmp');
        this.experimentId = getExperimentId();
        this.CONTAINER_MOUNT_PATH = '/tmp/mount';
        // Experiment code folder as seen from inside the trial containers.
        this.expContainerCodeFolder = path.join(this.CONTAINER_MOUNT_PATH, 'nni', this.experimentId, 'nni-code');
        this.genericK8sClient = new GeneralK8sClient();
        this.logCollection = 'none';
    }

    /**
     * Build a Kubernetes container resource spec.
     * @param memory memory in MiB (rendered as `<memory>Mi`)
     * @param cpuNum number of CPUs
     * @param gpuNum number of GPUs; the `nvidia.com/gpu` entry is only added when non-zero
     */
    public generatePodResource(memory: number, cpuNum: number, gpuNum: number): any {
        const resources: { [key: string]: string } = {
            memory: `${memory}Mi`,
            cpu: `${cpuNum}`
        };

        if (gpuNum !== 0) {
            resources['nvidia.com/gpu'] = `${gpuNum}`;
        }

        return resources;
    }

    /**
     * List every trial job known to this service, resolved one by one through getTrialJob.
     */
    public async listTrialJobs(): Promise<TrialJobDetail[]> {
        const jobs: TrialJobDetail[] = [];

        for (const key of this.trialJobsMap.keys()) {
            jobs.push(await this.getTrialJob(key));
        }

        return jobs;
    }

    /**
     * Look up a trial job by id.
     * Rejects with a message string when the id is unknown.
     */
    public async getTrialJob(trialJobId: string): Promise<TrialJobDetail> {
        const kubernetesTrialJob: TrialJobDetail | undefined = this.trialJobsMap.get(trialJobId);

        if (kubernetesTrialJob === undefined) {
            return Promise.reject(`trial job ${trialJobId} not found`);
        }

        return kubernetesTrialJob;
    }

    public async getTrialFile(_trialJobId: string, _filename: string): Promise<string | Buffer> {
        throw new MethodNotImplementedError();
    }

    public addTrialJobMetricListener(listener: (metric: TrialJobMetric) => void): void {
        this.metricsEmitter.on('metric', listener);
    }

    public removeTrialJobMetricListener(listener: (metric: TrialJobMetric) => void): void {
        this.metricsEmitter.off('metric', listener);
    }

    public get isMultiPhaseJobSupported(): boolean {
        return false;
    }

    public getClusterMetadata(_key: string): Promise<string> {
        return Promise.resolve('');
    }

    public get MetricsEmitter(): EventEmitter {
        return this.metricsEmitter;
    }

    /**
     * Delete the Kubernetes job(s) labelled with this trial id and record the cancellation.
     * @param trialJobId id of the trial to cancel
     * @param isEarlyStopped forwarded to getJobCancelStatus to choose the final status
     * Rejects with a message string if the trial is unknown, the CRD client is missing,
     * or the deletion fails.
     */
    public async cancelTrialJob(trialJobId: string, isEarlyStopped: boolean = false): Promise<void> {
        const trialJobDetail: KubernetesTrialJobDetail | undefined = this.trialJobsMap.get(trialJobId);
        if (trialJobDetail === undefined) {
            const errorMessage: string = `CancelTrialJob: trial job id ${trialJobId} not found`;
            this.log.error(errorMessage);

            return Promise.reject(errorMessage);
        }
        if (this.kubernetesCRDClient === undefined) {
            const errorMessage: string = `CancelTrialJob: trial job id ${trialJobId} failed because operatorClient is undefined`;
            this.log.error(errorMessage);

            return Promise.reject(errorMessage);
        }

        try {
            await this.kubernetesCRDClient.deleteKubernetesJob(new Map(
                [
                    ['app', this.NNI_KUBERNETES_TRIAL_LABEL],
                    ['expId', getExperimentId()],
                    ['trialId', trialJobId]
                ]
            ));
        } catch (err) {
            const errorMessage: string = `Delete trial ${trialJobId} failed: ${err}`;
            this.log.error(errorMessage);

            return Promise.reject(errorMessage);
        }

        trialJobDetail.endTime = Date.now();
        trialJobDetail.status = getJobCancelStatus(isEarlyStopped);
    }

    /**
     * Tear down the experiment.
     * 1. Cancel active trials.
     * 2. Delete all jobs labelled with this experiment.
     * 3. Unmount the NFS folder.
     * 4. Stop the job REST server.
     * Failures in steps 1-3 are logged and ignored. Step 4 throws if the REST server was
     * never initialized, and rejects if stopping it fails.
     */
    public async cleanUp(): Promise<void> {
        this.stopping = true;

        // First, cancel all running kubernetes jobs
        for (const [trialJobId, kubernetesTrialJob] of this.trialJobsMap) {
            if (['RUNNING', 'WAITING', 'UNKNOWN'].includes(kubernetesTrialJob.status)) {
                try {
                    await this.cancelTrialJob(trialJobId);
                } catch (error) {
                    // DONT throw error during cleanup
                }
                kubernetesTrialJob.status = 'SYS_CANCELED';
            }
        }

        // Delete all kubernetes jobs whose expId label is current experiment id
        try {
            if (this.kubernetesCRDClient !== undefined) {
                await this.kubernetesCRDClient.deleteKubernetesJob(new Map(
                    [
                        ['app', this.NNI_KUBERNETES_TRIAL_LABEL],
                        ['expId', getExperimentId()]
                    ]
                ));
            }
        } catch (error) {
            // Single-line template: a `\` continuation here would embed the next line's indentation.
            this.log.error(`Delete kubernetes job with label: app=${this.NNI_KUBERNETES_TRIAL_LABEL}, expId=${getExperimentId()} failed, error is ${error}`);
        }

        // Unmount NFS
        try {
            await cpp.exec(`sudo umount ${this.trialLocalTempFolder}`);
        } catch (error) {
            this.log.error(`Unmount ${this.trialLocalTempFolder} failed, error is ${error}`);
        }

        // Stop kubernetes rest server
        if (this.kubernetesJobRestServer === undefined) {
            throw new Error('kubernetesJobRestServer not initialized!');
        }
        try {
            await this.kubernetesJobRestServer.stop();
            this.log.info('Kubernetes Training service rest server stopped successfully.');
        } catch (error) {
            this.log.error(`Kubernetes Training service rest server stopped failed, error: ${error.message}`);

            return Promise.reject(error);
        }
    }

    /**
     * Prepare Azure file storage for trials.
     * 1. Read the storage account key from Azure Key Vault using the `az` CLI.
     * 2. Create the file-service client and the share.
     * 3. Save the account name and key in an Opaque Kubernetes secret
     *    (its name goes to azureStorageSecretName).
     * @param vaultName Key Vault name
     * @param valutKeyName name of the secret in the vault that holds the storage account key
     * Any failure is logged and re-raised as a rejection.
     */
    protected async createAzureStorage(vaultName: string, valutKeyName: string): Promise<void> {
        try {
            const result: any = await cpp.exec(`az keyvault secret show --name ${valutKeyName} --vault-name ${vaultName}`);
            if (result.stderr) {
                const errorMessage: string = result.stderr;
                this.log.error(errorMessage);

                return Promise.reject(errorMessage);
            }
            const storageAccountKey: any = JSON.parse(result.stdout).value;
            const accountName: string | undefined = this.azureStorageAccountName;
            if (accountName === undefined) {
                throw new Error('azureStorageAccountName not initialized!');
            }
            // Create storage client and share
            this.azureStorageClient = azureStorage.createFileService(accountName, storageAccountKey);
            await AzureStorageClientUtility.createShare(this.azureStorageClient, this.azureStorageShare);
            // Create storage secret
            this.azureStorageSecretName = `nni-secret-${uniqueString(8).toLowerCase()}`;

            const namespace = this.genericK8sClient.getNamespace ? this.genericK8sClient.getNamespace : 'default';
            await this.genericK8sClient.createSecret(
                {
                    apiVersion: 'v1',
                    kind: 'Secret',
                    metadata: {
                        name: this.azureStorageSecretName,
                        namespace: namespace,
                        labels: {
                            app: this.NNI_KUBERNETES_TRIAL_LABEL,
                            expId: getExperimentId()
                        }
                    },
                    type: 'Opaque',
                    data: {
                        azurestorageaccountname: Base64.encode(accountName),
                        azurestorageaccountkey: Base64.encode(storageAccountKey)
                    }
                }
            );
        } catch (error) {
            this.log.error(error);

            return Promise.reject(error);
        }
    }

    /**
     * Generate run script for different roles (like worker or ps).
     * @param platform platform name substituted into kubernetesScriptFormat
     * @param trialJobId trial job id
     * @param trialWorkingFolder working folder
     * @param command command
     * @param trialSequenceId sequence id
     * @param roleName role name; output goes to `<trialWorkingFolder>/output/<roleName>_output`
     * @param gpuNum requested GPUs; 0 makes the script clear CUDA_VISIBLE_DEVICES
     */
    protected async generateRunScript(platform: string, trialJobId: string, trialWorkingFolder: string,
        command: string, trialSequenceId: string, roleName: string, gpuNum: number): Promise<string> {
        let nvidiaScript: string = '';
        // Nvidia device plugin for K8S has a known issue that requesting zero GPUs allocates all GPUs
        // Refer https://github.com/NVIDIA/k8s-device-plugin/issues/61
        // So we have to explicitly set CUDA_VISIBLE_DEVICES to empty if user sets gpuNum to 0 in NNI config file
        if (gpuNum === 0) {
            nvidiaScript = 'export CUDA_VISIBLE_DEVICES=';
        }
        const nniManagerIp: string = this.nniManagerIpConfig ? this.nniManagerIpConfig.nniManagerIp : getIPV4Address();
        const version: string = this.versionCheck ? await getVersion() : '';

        return String.Format(
            kubernetesScriptFormat,
            platform,
            trialWorkingFolder,
            path.join(trialWorkingFolder, 'output', `${roleName}_output`),
            trialJobId,
            getExperimentId(),
            this.expContainerCodeFolder,
            trialSequenceId,
            nvidiaScript,
            command,
            nniManagerIp,
            this.kubernetesRestServerPort,
            version,
            this.logCollection
        );
    }

    /**
     * Create trialLocalTempFolder and mount `nfsServer:nfsPath` onto it with `sudo mount`.
     * A mount failure is logged and rejected with a message string.
     */
    protected async createNFSStorage(nfsServer: string, nfsPath: string): Promise<void> {
        await cpp.exec(`mkdir -p ${this.trialLocalTempFolder}`);
        try {
            await cpp.exec(`sudo mount ${nfsServer}:${nfsPath} ${this.trialLocalTempFolder}`);
        } catch (error) {
            const mountError: string = `Mount NFS ${nfsServer}:${nfsPath} to ${this.trialLocalTempFolder} failed, error is ${error}`;
            this.log.error(mountError);

            return Promise.reject(mountError);
        }
    }

    /**
     * Create pvcPath and symlink it to trialLocalTempFolder with `sudo ln -s`.
     * A failure is logged and rejected with a message string.
     */
    protected async createPVCStorage(pvcPath: string): Promise<void> {
        try {
            await cpp.exec(`mkdir -p ${pvcPath}`);
            await cpp.exec(`sudo ln -s ${pvcPath} ${this.trialLocalTempFolder}`);
        } catch (error) {
            const linkError: string = `Linking ${pvcPath} to ${this.trialLocalTempFolder} failed, error is ${error}`;
            this.log.error(linkError);

            return Promise.reject(linkError);
        }
    }

    /**
     * Create a `kubernetes.io/dockerconfigjson` secret from a local docker config file.
     * @param filePath path to the docker config JSON; undefined or '' means "no registry secret"
     * @returns the generated secret name, or undefined when no file was given
     */
    protected async createRegistrySecret(filePath: string | undefined): Promise<string | undefined> {
        if (filePath === undefined || filePath === '') {
            return undefined;
        }
        const body = fs.readFileSync(filePath).toString('base64');
        const registrySecretName = `nni-secret-${uniqueString(8).toLowerCase()}`;
        const namespace = this.genericK8sClient.getNamespace ? this.genericK8sClient.getNamespace : 'default';
        await this.genericK8sClient.createSecret(
            {
                apiVersion: 'v1',
                kind: 'Secret',
                metadata: {
                    name: registrySecretName,
                    namespace: namespace,
                    labels: {
                        app: this.NNI_KUBERNETES_TRIAL_LABEL,
                        expId: getExperimentId()
                    }
                },
                type: 'kubernetes.io/dockerconfigjson',
                data: {
                    '.dockerconfigjson': body
                }
            }
        );

        return registrySecretName;
    }

    /**
     * Upload a local directory to Azure file storage.
     * @param srcDirectory the source directory of local folder
     * @param destDirectory the target directory in azure
     * @param uploadRetryCount number of retries after the first failed attempt.
     *   undefined (or NaN) means 1. Negative values are treated as 0.
     * @returns the folder URI in Azure on success, or '' when every attempt failed or an error was thrown
     */
    protected async uploadFolderToAzureStorage(srcDirectory: string, destDirectory: string, uploadRetryCount: number | undefined): Promise<string> {
        const storageClient = this.azureStorageClient;
        if (storageClient === undefined) {
            throw new Error('azureStorageClient is not initialized');
        }
        // An explicit 0 means "no retries". The old truthiness check turned 0 into the default of 1.
        let retryCount: number = 1;
        if (uploadRetryCount !== undefined && !Number.isNaN(uploadRetryCount)) {
            retryCount = Math.max(0, Math.floor(uploadRetryCount));
        }

        try {
            // One initial attempt plus `retryCount` retries.
            for (let attempt: number = 0; attempt <= retryCount; attempt++) {
                const uploadSuccess: boolean = await AzureStorageClientUtility.uploadDirectory(
                    storageClient,
                    `${destDirectory}`,
                    this.azureStorageShare,
                    `${srcDirectory}`);
                if (uploadSuccess) {
                    return `https://${this.azureStorageAccountName}.file.core.windows.net/${this.azureStorageShare}/${destDirectory}`;
                }
                if (attempt < retryCount) {
                    // Only wait when another attempt will actually follow.
                    this.log.info('Upload failed, Retry: upload files to azure-storage');
                    await delay(5000);
                }
            }
        } catch (error) {
            this.log.error(error);
            // Return an empty url when an error occurs
            return '';
        }

        this.log.error(`Upload ${srcDirectory} to azure-storage failed after ${retryCount + 1} attempt(s)`);
        return '';
    }

    public getTrialOutputLocalPath(_trialJobId: string): Promise<string> {
        throw new MethodNotImplementedError();
    }

    public fetchTrialOutput(_trialJobId: string, _subpath: string): Promise<void> {
        throw new MethodNotImplementedError();
    }
}
405
export {KubernetesTrainingService};