openPaiEnvironmentService.ts 14.5 KB
Newer Older
1
2
3
4
5
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT license.

'use strict';

6
import * as yaml from 'js-yaml';
7
8
9
import * as request from 'request';
import { Deferred } from 'ts-deferred';
import * as component from '../../../common/component';
10
import { ExperimentConfig, OpenpaiConfig, flattenConfig, toMegaBytes } from '../../../common/experimentConfig';
11
12
import { getLogger, Logger } from '../../../common/log';
import { PAIClusterConfig } from '../../pai/paiConfig';
SparkSnail's avatar
SparkSnail committed
13
import { NNIPAITrialConfig } from '../../pai/paiConfig';
14
import { EnvironmentInformation, EnvironmentService } from '../environment';
15
import { SharedStorageService } from '../sharedStorage';
16
import { MountedStorageService } from '../storages/mountedStorageService';
17

18
/**
 * Configuration shape used by OpenPaiEnvironmentService: a single object that
 * carries both the ExperimentConfig fields and the OpenpaiConfig fields.
 * The service obtains it from `flattenConfig(config, 'openpai')`.
 */
interface FlattenOpenpaiConfig extends ExperimentConfig, OpenpaiConfig { }
19
20
21
22
23
24
25

/**
 * Environment service backed by an OpenPAI cluster.
 *
 * It submits environments as OpenPAI jobs, collects job information from the
 * cluster's REST server (`<host>/rest-server/api/v2/jobs`) to update the
 * status of local environments, and stops jobs on request.
 */
@component.Singleton
export class OpenPaiEnvironmentService extends EnvironmentService {

    private readonly log: Logger = getLogger('OpenPaiEnvironmentService');
    private paiClusterConfig: PAIClusterConfig | undefined;
    private paiTrialConfig: NNIPAITrialConfig | undefined;
    private paiToken: string;
    private protocol: string;
    private experimentId: string;
    private config: FlattenOpenpaiConfig;

    /**
     * @param _experimentRootDir unused by this service
     * @param experimentId used as the per-experiment sub-directory name on the mounted storage
     * @param config experiment configuration; flattened with the 'openpai' settings
     */
    constructor(_experimentRootDir: string, experimentId: string, config: ExperimentConfig) {
        super();
        this.experimentId = experimentId;
        this.config = flattenConfig(config, 'openpai');
        this.paiToken = this.config.token;
        this.protocol = this.config.host.toLowerCase().startsWith('https://') ? 'https' : 'http';

        // FIXME: only support MountedStorageService
        const storageService = new MountedStorageService();
        const remoteRoot = storageService.joinPath(this.config.localStorageMountPoint, this.experimentId);
        storageService.initialize(this.config.localStorageMountPoint, remoteRoot);
    }

    /** Interval of the environment maintenance loop (presumably milliseconds -- TODO confirm against the caller). */
    public get environmentMaintenceLoopInterval(): number {
        return 5000;
    }

    /** This service always reports that it has a storage service. */
    public get hasStorageService(): boolean {
        return true;
    }

    /** Name of this environment service. */
    public get getName(): string {
        return 'pai';
    }

    /**
     * Queries the job list of the configured user from the OpenPAI REST server
     * and updates the status of every given environment from the job whose
     * name equals the environment's `envId`.
     *
     * @param environments environments to refresh; they are updated in place
     * @returns a promise that resolves once all environments were processed
     * @throws Error (as a rejection) when the PAI token is not initialized
     *
     * The promise is rejected with a string message when the request fails,
     * the server answers with status >= 400, the body is not a job list, or
     * any of the jobs is in state FAILED.
     */
    public async refreshEnvironmentsStatus(environments: EnvironmentInformation[]): Promise<void> {
        const deferred: Deferred<void> = new Deferred<void>();

        if (this.paiToken === undefined) {
            throw new Error('PAI token is not initialized');
        }

        const getJobInfoRequest: request.Options = {
            uri: `${this.config.host}/rest-server/api/v2/jobs?username=${this.config.username}`,
            method: 'GET',
            json: true,
            headers: {
                'Content-Type': 'application/json',
                Authorization: `Bearer ${this.paiToken}`
            }
        };

        request(getJobInfoRequest, (error: any, response: request.Response, body: any) => {
            // Everything runs inside try/catch so that an unexpected response
            // shape rejects the promise instead of leaving it pending forever.
            try {
                // Status code 200 for success
                if ((error !== undefined && error !== null) || response.statusCode >= 400) {
                    const errorMessage: string = (error !== undefined && error !== null) ? error.message :
                        `OpenPAI: get environment list from PAI Cluster failed!, http code:${response.statusCode}, http body:' ${JSON.stringify(body)}`;
                    this.log.error(`${errorMessage}`);
                    deferred.reject(errorMessage);
                    return;
                }

                if (!Array.isArray(body)) {
                    const errorMessage = `OpenPAI: get environment list from PAI Cluster failed!, unexpected http body: ${JSON.stringify(body)}`;
                    this.log.error(errorMessage);
                    deferred.reject(errorMessage);
                    return;
                }

                // Index the returned jobs by name for lookup by envId.
                const jobInfos = new Map<string, any>();
                for (const jobInfo of body) {
                    jobInfos.set(jobInfo.name, jobInfo);
                }

                for (const environment of environments) {
                    if (!jobInfos.has(environment.envId)) {
                        this.log.error(`OpenPAI job ${environment.envId} is not found in job list.`);
                        environment.status = 'UNKNOWN';
                        continue;
                    }

                    const jobResponse = jobInfos.get(environment.envId);
                    if (!(jobResponse && jobResponse.state)) {
                        this.log.error(`OpenPAI: job ${environment.envId} has no state returned. body:`, jobResponse);
                        // some error happens, and mark this environment
                        environment.status = 'FAILED';
                        continue;
                    }

                    const oldEnvironmentStatus = environment.status;
                    switch (jobResponse.state) {
                        case 'RUNNING':
                        case 'WAITING':
                        case 'SUCCEEDED':
                            environment.setStatus(jobResponse.state);
                            break;
                        case 'FAILED':
                            environment.setStatus(jobResponse.state);
                            // A failed job fails the whole refresh; the remaining
                            // environments are still updated below, and the final
                            // resolve() has no effect on an already rejected promise.
                            deferred.reject(`OpenPAI: job ${environment.envId} is failed!`);
                            break;
                        case 'STOPPED':
                        case 'STOPPING':
                            environment.setStatus('USER_CANCELED');
                            break;
                        default:
                            this.log.error(`OpenPAI: job ${environment.envId} returns unknown state ${jobResponse.state}.`);
                            environment.setStatus('UNKNOWN');
                    }
                    if (oldEnvironmentStatus !== environment.status) {
                        this.log.debug(`OpenPAI: job ${environment.envId} change status ${oldEnvironmentStatus} to ${environment.status} due to job is ${jobResponse.state}.`);
                    }
                }
                deferred.resolve();
            } catch (refreshError) {
                this.log.error(`OpenPAI: error when refreshing environments status ${refreshError}`);
                deferred.reject(refreshError);
            }
        });
        return deferred.promise;
    }

    /**
     * Prepares the environment's command and folders, generates the OpenPAI
     * job configuration in YAML, and submits it to the REST server.
     *
     * @param environment environment to start; its command, runnerWorkingFolder,
     *        trackingUrl, useActiveGpu and maxTrialNumberPerGpu are overwritten
     * @returns a promise that resolves when the job submission was accepted
     * @throws Error (as a rejection) when the PAI token is not initialized
     *
     * On a request error or status >= 400 the environment is marked FAILED and
     * the promise is rejected with a string message.
     */
    public async startEnvironment(environment: EnvironmentInformation): Promise<void> {
        const deferred: Deferred<void> = new Deferred<void>();

        if (this.paiToken === undefined) {
            throw new Error('PAI token is not initialized');
        }
        // Step 1. Prepare PAI job configuration
        let environmentRoot: string;
        if (environment.useSharedStorage) {
            environmentRoot = component.get<SharedStorageService>(SharedStorageService).remoteWorkingRoot;
            // Normalize every `echo` of the mount command to `echo -e` (first strip
            // existing `-e` so it is not doubled), then chain it before the command.
            environment.command = `${component.get<SharedStorageService>(SharedStorageService).remoteMountCommand.replace(/echo -e /g, `echo `).replace(/echo /g, `echo -e `)} && cd ${environmentRoot} && ${environment.command}`;
        } else {
            environmentRoot = `${this.config.containerStorageMountPoint}/${this.experimentId}`;
            environment.command = `cd ${environmentRoot} && ${environment.command}`;
        }
        environment.runnerWorkingFolder = `${environmentRoot}/envs/${environment.id}`;
        environment.trackingUrl = `${this.config.host}/job-detail.html?username=${this.config.username}&jobName=${environment.envId}`;
        environment.useActiveGpu = false;  // does openpai supports these?
        environment.maxTrialNumberPerGpu = 1;

        // Step 2. Generate Job Configuration in yaml format
        const paiJobConfig = this.generateJobConfigInYamlFormat(environment);
        this.log.debug(`generated paiJobConfig: ${paiJobConfig}`);

        // Step 3. Submit PAI job via Rest call
        const submitJobRequest: request.Options = {
            uri: `${this.config.host}/rest-server/api/v2/jobs`,
            method: 'POST',
            body: paiJobConfig,
            followAllRedirects: true,
            headers: {
                'Content-Type': 'text/yaml',
                Authorization: `Bearer ${this.paiToken}`
            }
        };
        request(submitJobRequest, (error, response, body) => {
            // Status code 202 for success, refer https://github.com/microsoft/pai/blob/master/src/rest-server/docs/swagger.yaml
            if ((error !== undefined && error !== null) || response.statusCode >= 400) {
                const errorMessage: string = (error !== undefined && error !== null) ? error.message :
                    `start environment ${environment.envId} failed, http code:${response.statusCode}, http body: ${body}`;

                this.log.error(errorMessage);
                environment.status = 'FAILED';
                deferred.reject(errorMessage);
                return;
            }
            deferred.resolve();
        });

        return deferred.promise;
    }

    /**
     * Asks the OpenPAI REST server to stop the job of the given environment.
     *
     * @param environment environment to stop; nothing is done when it is not alive
     * @returns a promise that resolves when the stop request was accepted
     *
     * The promise is rejected when the PAI token is not initialized, when the
     * request cannot be issued, or when the server reports an error (with the
     * request error object, or a string message for status >= 400).
     */
    public async stopEnvironment(environment: EnvironmentInformation): Promise<void> {
        const deferred: Deferred<void> = new Deferred<void>();

        if (environment.isAlive === false) {
            return;
        }
        if (this.paiToken === undefined) {
            throw Error('PAI token is not initialized');
        }

        const stopJobRequest: request.Options = {
            uri: `${this.config.host}/rest-server/api/v2/jobs/${this.config.username}~${environment.envId}/executionType`,
            method: 'PUT',
            json: true,
            body: { value: 'STOP' },
            time: true,
            headers: {
                'Content-Type': 'application/json',
                Authorization: `Bearer ${this.paiToken}`
            }
        };

        this.log.debug(`stopping OpenPAI environment ${environment.envId}, ${stopJobRequest.uri}`);

        try {
            request(stopJobRequest, (error, response, _body) => {
                try {
                    const hasRequestError = error !== undefined && error !== null;
                    // Status code 202 for success.
                    if (hasRequestError || (response && response.statusCode >= 400)) {
                        const errorMessage: string = hasRequestError ? error.message :
                            `OpenPAI: stop job ${environment.envId} failed, http code:${response.statusCode}, http body: ${_body}`;
                        this.log.error(`${errorMessage}`);
                        deferred.reject(hasRequestError ? error :
                            `Stop trial failed, http code: ${response.statusCode}`);
                        return;
                    }
                    this.log.info(`OpenPAI job ${environment.envId} stopped.`);
                    deferred.resolve();
                } catch (error) {
                    this.log.error(`OpenPAI error when inner stopping environment ${error}`);
                    deferred.reject(error);
                }
            });
        } catch (error) {
            this.log.error(`OpenPAI error when stopping environment ${error}`);
            throw error;
        }

        return deferred.promise;
    }

    /**
     * Builds the OpenPAI job configuration for an environment and serializes
     * it to YAML.
     *
     * When the user supplied `openpaiConfig`, a deep copy of it is used: the
     * job name is replaced by the environment's `envId`, `environment.nodeCount`
     * is set to the total instance count of all task roles, and every task
     * role's commands are wrapped into the environment command. Otherwise a
     * single-task-role job is generated from the experiment configuration.
     *
     * @param environment environment the job is generated for
     * @returns the job configuration as YAML text
     */
    private generateJobConfigInYamlFormat(environment: EnvironmentInformation): string {
        const jobName = environment.envId;

        let nniJobConfig: any = undefined;
        if (this.config.openpaiConfig !== undefined) {
            nniJobConfig = JSON.parse(JSON.stringify(this.config.openpaiConfig)); //Trick for deep clone in Typescript
            nniJobConfig.name = jobName;
            if (nniJobConfig.taskRoles) {
                const taskRoles: any[] = Object.values(nniJobConfig.taskRoles);

                // count instance; a task role without `instances` counts as one
                environment.nodeCount = 0;
                for (const taskRole of taskRoles) {
                    environment.nodeCount += taskRole.instances ? taskRole.instances : 1;
                }

                // Each taskRole will generate new command in NNI's command format
                // Each command will be formatted to NNI style
                for (const taskRole of taskRoles) {
                    // replace every ' with '\'' so the command survives the single
                    // quotes around --trial_command (a string pattern would only
                    // replace the first occurrence)
                    const joinedCommand = taskRole.commands.join(" && ").replace(/'/g, "'\\''").trim();
                    const nniTrialCommand = `${environment.command} --node_count ${environment.nodeCount} --trial_command '${joinedCommand}'`;
                    this.log.debug(`replace command ${taskRole.commands} to ${[nniTrialCommand]}`);
                    taskRole.commands = [nniTrialCommand];
                }
            }

        } else {
            nniJobConfig = {
                protocolVersion: 2,
                name: jobName,
                type: 'job',
                jobRetryCount: 0,
                prerequisites: [
                    {
                        type: 'dockerimage',
                        uri: this.config.dockerImage,
                        name: 'docker_image_0'
                    }
                ],
                taskRoles: {
                    taskrole: {
                        instances: 1,
                        completion: {
                            minFailedInstances: 1,
                            minSucceededInstances: -1
                        },
                        taskRetryCount: 0,
                        dockerImage: 'docker_image_0',
                        resourcePerInstance: {
                            gpu: this.config.trialGpuNumber,
                            cpu: this.config.trialCpuNumber,
                            memoryMB: toMegaBytes(this.config.trialMemorySize)
                        },
                        commands: [
                            environment.command
                        ]
                    }
                },
                extras: {
                    'storages': [
                        {
                            name: this.config.storageConfigName
                        }
                    ],
                    submitFrom: 'submit-job-v2'
                }
            };
            if (this.config.deprecated && this.config.deprecated.virtualCluster) {
                nniJobConfig.defaults = {
                    virtualCluster: this.config.deprecated.virtualCluster
                };
            }
        }
        return yaml.safeDump(nniJobConfig);
    }

    /**
     * Strips a leading 'http://' or 'https://' from the host and records the
     * scheme in `this.protocol`. The scheme is matched case-insensitively,
     * like in the constructor. A host without either prefix is returned
     * unchanged and `this.protocol` is left as it is.
     *
     * @param host host as configured by the user
     * @returns the host without its scheme prefix
     */
    protected formatPAIHost(host: string): string {
        const loweredHost = host.toLowerCase();
        for (const scheme of ['http', 'https']) {
            const prefix = `${scheme}://`;
            if (loweredHost.startsWith(prefix)) {
                this.protocol = scheme;
                return host.slice(prefix.length);
            }
        }
        return host;
    }
}