paiK8STrainingService.ts 13.9 KB
Newer Older
1
2
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT license.
3
4
5
6
7
8
9
10
11
12
13
14

'use strict';

import * as fs from 'fs';
import * as path from 'path';
// tslint:disable-next-line:no-implicit-dependencies
import * as request from 'request';
import * as component from '../../../common/component';

import { Deferred } from 'ts-deferred';
import { String } from 'typescript-string-operations';
import {
15
16
    HyperParameters, NNIManagerIpConfig,
    TrialJobApplicationForm, TrialJobDetail
17
} from '../../../common/trainingService';
18
19
20
21
import {
    generateParamFileName,
    getIPV4Address, getVersion, uniqueString
} from '../../../common/utils';
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
import { CONTAINER_INSTALL_NNI_SHELL_FORMAT } from '../../common/containerJobData';
import { TrialConfigMetadataKey } from '../../common/trialConfigMetadataKey';
import { execMkdir, validateCodeDir, execCopydir } from '../../common/util';
import { PAI_K8S_TRIAL_COMMAND_FORMAT } from './paiK8SData';
import { NNIPAIK8STrialConfig } from './paiK8SConfig';
import { PAITrainingService } from '../paiTrainingService';
import { PAIClusterConfig, PAITrialJobDetail } from '../paiConfig';
import { PAIJobRestServer } from '../paiJobRestServer';

const yaml = require('js-yaml');

/**
 * Training Service implementation for OpenPAI (Open Platform for AI)
 * Refer to https://github.com/Microsoft/pai for more information about OpenPAI
 */
@component.Singleton
class PAIK8STrainingService extends PAITrainingService {
    protected paiTrialConfig: NNIPAIK8STrialConfig | undefined;
40
    private copyExpCodeDirPromise?: Promise<void>;
SparkSnail's avatar
SparkSnail committed
41
    private paiJobConfig: any;
42
    private nniVersion: string | undefined;
43
44
    /**
     * Creates the training service instance.
     * No state is set up here beyond what the base-class constructor does.
     */
    constructor() {
        super();
    }

    /**
     * Applies one cluster-metadata entry to this training service.
     *
     * Handled keys:
     *  - NNI_MANAGER_IP: parses `value` as JSON and stores it as the NNI manager IP config.
     *  - PAI_CLUSTER_CONFIG: creates the PAI job REST server, parses the cluster config,
     *    normalizes its host via `formatPAIHost` and stores the token.
     *  - TRIAL_CONFIG: parses the trial config, validates codeDir, creates the experiment
     *    code folder under the NFS mount, starts copying codeDir into it (not awaited here)
     *    and, when `paiConfigPath` is set, loads that YAML file as the base job config.
     *  - VERSION_CHECK / LOG_COLLECTION / MULTI_PHASE: store the corresponding settings.
     *
     * Unknown keys are only logged as errors; they do not throw.
     *
     * @param key metadata key, compared against TrialConfigMetadataKey values
     * @param value metadata payload (JSON text, or a plain string for flag-like keys)
     */
    public async setClusterMetadata(key: string, value: string): Promise<void> {
        switch (key) {
            case TrialConfigMetadataKey.NNI_MANAGER_IP:
                this.nniManagerIpConfig = <NNIManagerIpConfig>JSON.parse(value);
                break;

            case TrialConfigMetadataKey.PAI_CLUSTER_CONFIG:
                this.paiJobRestServer = new PAIJobRestServer(component.get(PAIK8STrainingService));
                this.paiClusterConfig = <PAIClusterConfig>JSON.parse(value);
                this.paiClusterConfig.host = this.formatPAIHost(this.paiClusterConfig.host);
                this.paiToken = this.paiClusterConfig.token;
                break;

            case TrialConfigMetadataKey.TRIAL_CONFIG: {
                // The trial config is only accepted after the cluster config has been set.
                if (this.paiClusterConfig === undefined) {
                    this.log.error('pai cluster config is not initialized');
                    break;
                }
                this.paiTrialConfig = <NNIPAIK8STrialConfig>JSON.parse(value);
                // Validate to make sure codeDir doesn't have too many files
                await validateCodeDir(this.paiTrialConfig.codeDir);

                const nniManagerNFSExpCodeDir = path.join(this.paiTrialConfig.nniManagerNFSMountPath, this.experimentId, 'nni-code');
                await execMkdir(nniManagerNFSExpCodeDir);
                // Copy codeDir files to the experiment's code folder under the NFS mount.
                // The copy is only started here; submitTrialJobToPAI awaits the stored promise.
                this.log.info(`Starting copy codeDir data from ${this.paiTrialConfig.codeDir} to ${nniManagerNFSExpCodeDir}`);
                this.copyExpCodeDirPromise = execCopydir(this.paiTrialConfig.codeDir, nniManagerNFSExpCodeDir);

                if (this.paiTrialConfig.paiConfigPath) {
                    // Read without blocking the event loop, consistent with the other file I/O in this class.
                    const rawJobConfig: string = await fs.promises.readFile(this.paiTrialConfig.paiConfigPath, 'utf8');
                    this.paiJobConfig = yaml.safeLoad(rawJobConfig);
                }
                break;
            }

            case TrialConfigMetadataKey.VERSION_CHECK:
                this.versionCheck = (value === 'true' || value === 'True');
                // The version is only resolved when the check is enabled; otherwise it is an empty string.
                this.nniVersion = this.versionCheck ? await getVersion() : '';
                break;

            case TrialConfigMetadataKey.LOG_COLLECTION:
                this.logCollection = value;
                break;

            case TrialConfigMetadataKey.MULTI_PHASE:
                this.isMultiPhase = (value === 'true' || value === 'True');
                break;

            default:
                // Unknown keys are reported but otherwise ignored
                this.log.error(`Unknown key: ${key}`);
        }
    }
94

95
    /**
     * Updates the parameters of an already-known trial (used for multi-phase).
     * Writes a new parameter file into the trial's log path.
     *
     * @param trialJobId id of the trial to update
     * @param form application form carrying the new hyper-parameters
     * @returns the trial job detail the parameters were written for
     * @throws Error when no trial with the given id is registered
     */
    public async updateTrialJob(trialJobId: string, form: TrialJobApplicationForm): Promise<TrialJobDetail> {
        const existingTrial: PAITrialJobDetail | undefined = this.trialJobsMap.get(trialJobId);
        if (existingTrial === undefined) {
            throw new Error(`updateTrialJob failed: ${trialJobId} not found`);
        }

        // Write the parameter file content into the trial's working folder
        const targetDirectory = existingTrial.logPath;
        await this.writeParameterFile(targetDirectory, form.hyperParameters);

        return existingTrial;
    }

    /**
     * Registers a new trial and puts it on the job queue.
     * The trial is only recorded here in state 'WAITING'; nothing is sent to PAI by this method.
     *
     * @param form application form of the trial
     * @returns the newly created trial job detail
     * @throws Error when the cluster config or the trial config has not been set
     */
    public async submitTrialJob(form: TrialJobApplicationForm): Promise<TrialJobDetail> {
        const clusterConfig = this.paiClusterConfig;
        if (clusterConfig === undefined) {
            throw new Error(`paiClusterConfig not initialized!`);
        }
        const trialConfig = this.paiTrialConfig;
        if (trialConfig === undefined) {
            throw new Error(`paiTrialConfig not initialized!`);
        }

        this.log.info(`submitTrialJob: form: ${JSON.stringify(form)}`);

        const newTrialId: string = uniqueString(5);
        //TODO: use HDFS working folder instead
        const workingFolder = path.join(this.expRootDir, 'trials', newTrialId);
        const jobName = `nni_exp_${this.experimentId}_trial_${newTrialId}`;
        // Per-trial folder under the NFS mount as seen from the NNI manager
        const trialLogPath = path.join(trialConfig.nniManagerNFSMountPath, this.experimentId, newTrialId);
        const jobDetailUrl = `${this.protocol}://${clusterConfig.host}/job-detail.html?username=${clusterConfig.userName}&jobName=${jobName}`;

        const createdTrial: PAITrialJobDetail = new PAITrialJobDetail(
            newTrialId,
            'WAITING',
            jobName,
            Date.now(),
            workingFolder,
            form,
            trialLogPath,
            jobDetailUrl);

        this.trialJobsMap.set(newTrialId, createdTrial);
        this.jobQueue.push(newTrialId);

        return createdTrial;
    }
138
139

    /**
     * Builds the single-line shell command for a trial by filling
     * PAI_K8S_TRIAL_COMMAND_FORMAT and stripping all line breaks from the result.
     *
     * @param trialJobDetail trial the command is generated for
     * @param command user command to embed into the NNI command template
     * @returns the formatted command without any CR/LF characters
     * @throws Error when the trial config has not been set
     */
    private generateNNITrialCommand(trialJobDetail: PAITrialJobDetail, command: string): string {
        const trialConfig = this.paiTrialConfig;
        if (trialConfig === undefined) {
            throw new Error('trial config is not initialized');
        }

        // Paths below are built from the container-side NFS mount path
        const codeDirInContainer = `${trialConfig.containerNFSMountPath}/${this.experimentId}/nni-code`;
        const workingDirInContainer = `${trialConfig.containerNFSMountPath}/${this.experimentId}/${trialJobDetail.id}`;
        const outputDirInContainer = `${workingDirInContainer}/nnioutput`;

        // Prefer the explicitly configured manager IP; otherwise fall back to getIPV4Address()
        let managerIp: string;
        if (this.nniManagerIpConfig) {
            managerIp = this.nniManagerIpConfig.nniManagerIp;
        } else {
            managerIp = getIPV4Address();
        }

        const formattedCommand: string = String.Format(
            PAI_K8S_TRIAL_COMMAND_FORMAT,
            workingDirInContainer,
            outputDirInContainer,
            trialJobDetail.id,
            this.experimentId,
            trialJobDetail.form.sequenceId,
            this.isMultiPhase,
            codeDirInContainer,
            command,
            managerIp,
            this.paiRestServerPort,
            this.nniVersion,
            this.logCollection
        );

        // Collapse the template onto one line
        return formattedCommand.replace(/\r\n|\n|\r/gm, '');
    }

167
    /**
     * Generates the PAI job configuration for a trial and serializes it with `yaml.safeDump`.
     *
     * Two sources are supported:
     *  - when the trial config has `paiConfigPath`, the previously loaded job config is
     *    deep-cloned, renamed, and every task role's commands are joined and wrapped into
     *    a single NNI trial command;
     *  - otherwise a job config with one task role is built from the trial config fields
     *    (image, gpuNum, cpuNum, memoryMB, command, paiStorageConfigName, virtualCluster).
     *
     * @param trialJobDetail trial the job configuration is generated for
     * @returns the value returned by `yaml.safeDump` for the job configuration
     * @throws Error when the trial config is not set, when `paiConfigPath` is set but the
     *         loaded job config is empty, or when a task role has no `commands` array
     */
    private generateJobConfigInYamlFormat(trialJobDetail: PAITrialJobDetail): any {
        if (this.paiTrialConfig === undefined) {
            throw new Error('trial config is not initialized');
        }
        const jobName = `nni_exp_${this.experimentId}_trial_${trialJobDetail.id}`;

        let nniJobConfig: any = undefined;
        if (this.paiTrialConfig.paiConfigPath) {
            // An empty or missing job config would otherwise fail below with a cryptic
            // SyntaxError/TypeError from the JSON round trip; report the real cause instead.
            if (this.paiJobConfig === undefined || this.paiJobConfig === null) {
                throw new Error(`PAI job config loaded from ${this.paiTrialConfig.paiConfigPath} is empty or not loaded`);
            }
            nniJobConfig = JSON.parse(JSON.stringify(this.paiJobConfig)); //Trick for deep clone in Typescript
            nniJobConfig.name = jobName;
            // Each taskRole will generate new command in NNI's command format
            // Each command will be formatted to NNI style
            for (const taskRoleName in nniJobConfig.taskRoles) {
                const taskRole = nniJobConfig.taskRoles[taskRoleName];
                const commands = taskRole.commands;
                if (!Array.isArray(commands)) {
                    throw new Error(`taskRole ${taskRoleName} in PAI job config must define a list of commands`);
                }
                // Escape quotes, `$`, backticks and backslashes before embedding the user
                // commands into the NNI command template.
                const escapedUserCommand: string = commands.join(" && ").replace(/(["'$`\\])/g, '\\$1');
                const nniTrialCommand = this.generateNNITrialCommand(trialJobDetail, escapedUserCommand);
                taskRole.commands = [nniTrialCommand];
            }

        } else {
            nniJobConfig = {
                protocolVersion: 2,
                name: jobName,
                type: 'job',
                jobRetryCount: 0,
                prerequisites: [
                    {
                        type: 'dockerimage',
                        uri: this.paiTrialConfig.image,
                        name: 'docker_image_0'
                    }
                ],
                taskRoles: {
                    taskrole: {
                        instances: 1,
                        completion: {
                            minFailedInstances: 1,
                            minSucceededInstances: -1
                        },
                        taskRetryCount: 0,
                        dockerImage: 'docker_image_0',
                        resourcePerInstance: {
                            gpu: this.paiTrialConfig.gpuNum,
                            cpu: this.paiTrialConfig.cpuNum,
                            memoryMB: this.paiTrialConfig.memoryMB
                        },
                        commands: [
                            this.generateNNITrialCommand(trialJobDetail, this.paiTrialConfig.command)
                        ]
                    }
                },
                extras: {
                    'storages': [
                        {
                            name: this.paiTrialConfig.paiStorageConfigName
                        }
                    ],
                    submitFrom: 'submit-job-v2'
                }
            };
            // `defaults` is only emitted when a virtual cluster is configured
            if (this.paiTrialConfig.virtualCluster) {
                nniJobConfig.defaults = {
                    virtualCluster: this.paiTrialConfig.virtualCluster
                };
            }
        }

        return yaml.safeDump(nniJobConfig);
    }
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256

    /**
     * Prepares the files of a registered trial and submits its job to the PAI REST server.
     *
     * Steps: wait for the pending codeDir copy (if any), create the trial's log folder,
     * write `install_nni.sh` and the parameter file into it, generate the job config and
     * POST it to `/rest-server/api/v2/jobs`.
     *
     * @param trialJobId id of a trial previously registered in `trialJobsMap`
     * @returns a promise resolved with `true` once the submission is accepted; it is rejected
     *          with the error message (a string) when the request fails or the HTTP status
     *          is >= 400, in which case the trial status is set to 'FAILED'
     * @throws Error when the trial is unknown, or when the cluster config, trial config,
     *         PAI token or job REST server has not been initialized
     */
    protected async submitTrialJobToPAI(trialJobId: string): Promise<boolean> {
        const deferred: Deferred<boolean> = new Deferred<boolean>();
        const trialJobDetail: PAITrialJobDetail | undefined = this.trialJobsMap.get(trialJobId);

        if (trialJobDetail === undefined) {
            throw new Error(`Failed to find PAITrialJobDetail for job ${trialJobId}`);
        }

        if (this.paiClusterConfig === undefined) {
            throw new Error('PAI Cluster config is not initialized');
        }
        if (this.paiTrialConfig === undefined) {
            throw new Error('trial config is not initialized');
        }
        if (this.paiToken === undefined) {
            throw new Error('PAI token is not initialized');
        }

        if (this.paiJobRestServer === undefined) {
            throw new Error('paiJobRestServer is not initialized');
        }

        // Make sure experiment code files are copied to NFS before the first trial starts
        if (this.copyExpCodeDirPromise !== undefined) {
            await this.copyExpCodeDirPromise;
            this.log.info(`Copy codeDir data finished.`);
            // All trials share same destination NFS code folder, only copy codeDir once for an experiment.
            // After copy data finished, set copyExpCodeDirPromise be undefined to avoid log content duplicated.
            this.copyExpCodeDirPromise = undefined;
        }

        this.paiRestServerPort = this.paiJobRestServer.clusterRestServerPort;

        // Step 1. Prepare PAI job configuration
        // Create the trial's folder (trialJobDetail.logPath)
        await execMkdir(trialJobDetail.logPath);
        // Write the NNI installation script into the trial's folder
        await fs.promises.writeFile(path.join(trialJobDetail.logPath, 'install_nni.sh'), CONTAINER_INSTALL_NNI_SHELL_FORMAT, { encoding: 'utf8' });

        // Write file content ( parameter.cfg ) into the trial's folder
        if (trialJobDetail.form !== undefined) {
            await this.writeParameterFile(trialJobDetail.logPath, trialJobDetail.form.hyperParameters);
        }

        //Generate Job Configuration in yaml format
        const paiJobConfig = this.generateJobConfigInYamlFormat(trialJobDetail);
        this.log.debug(paiJobConfig);

        // Step 2. Submit PAI job via Rest call
        // Refer https://github.com/Microsoft/pai/blob/master/docs/rest-server/API.md for more detail about PAI Rest API
        const submitJobRequest: request.Options = {
            uri: `${this.protocol}://${this.paiClusterConfig.host}/rest-server/api/v2/jobs`,
            method: 'POST',
            body: paiJobConfig,
            followAllRedirects: true,
            headers: {
                'Content-Type': 'text/yaml',
                Authorization: `Bearer ${this.paiToken}`
            }
        };
        request(submitJobRequest, (error: Error, response: request.Response, body: any) => {
            // If submit success, will get status code 202. refer: https://github.com/microsoft/pai/blob/master/src/rest-server/docs/swagger.yaml
            if ((error !== undefined && error !== null) || response.statusCode >= 400) {
                const errorMessage: string = (error !== undefined && error !== null) ? error.message :
                    `Submit trial ${trialJobId} failed, http code:${response.statusCode}, http body: ${body}`;
                this.log.error(errorMessage);
                trialJobDetail.status = 'FAILED';
                // The rejection value stays a plain string so existing callers see the same value.
                deferred.reject(errorMessage);

                // Settle the promise exactly once: never fall through to resolve() after a failure.
                return;
            }

            trialJobDetail.submitTime = Date.now();
            deferred.resolve(true);
        });

        return deferred.promise;
    }
310
311
312
313
314

    /**
     * Writes the value of the given hyper-parameters into a parameter file.
     * The file name comes from `generateParamFileName(hyperParameters)`.
     *
     * @param directory folder the parameter file is written into
     * @param hyperParameters hyper-parameters whose `value` becomes the file content
     */
    private async writeParameterFile(directory: string, hyperParameters: HyperParameters): Promise<void> {
        const paramFileName = generateParamFileName(hyperParameters);
        const targetPath = path.join(directory, paramFileName);
        await fs.promises.writeFile(targetPath, hyperParameters.value, { encoding: 'utf8' });
    }
315
316
317
}

export { PAIK8STrainingService };