openPaiEnvironmentService.ts 14.6 KB
Newer Older
1
2
3
4
5
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT license.

'use strict';

6
import * as yaml from 'js-yaml';
7
8
9
import * as request from 'request';
import { Deferred } from 'ts-deferred';
import * as component from '../../../common/component';
10
import { ExperimentConfig, OpenpaiConfig, flattenConfig, toMegaBytes } from '../../../common/experimentConfig';
11
import { ExperimentStartupInfo } from '../../../common/experimentStartupInfo';
12
13
import { getLogger, Logger } from '../../../common/log';
import { PAIClusterConfig } from '../../pai/paiConfig';
SparkSnail's avatar
SparkSnail committed
14
import { NNIPAITrialConfig } from '../../pai/paiConfig';
15
import { EnvironmentInformation, EnvironmentService } from '../environment';
16
import { SharedStorageService } from '../sharedStorage';
17
import { MountedStorageService } from '../storages/mountedStorageService';
18

19
/**
 * Experiment configuration merged with the OpenPAI training-service section; this is the
 * type this file assigns the result of `flattenConfig(config, 'openpai')` to.
 */
interface FlattenOpenpaiConfig extends ExperimentConfig, OpenpaiConfig { }

/**
 * Environment service that runs NNI environments as OpenPAI jobs.
 *
 * Jobs are submitted to the OpenPAI REST server (`/rest-server/api/v2/jobs`), their states are
 * polled to update the local `EnvironmentInformation` objects, and they are stopped by setting
 * the job's execution type to `STOP`.
 */
@component.Singleton
export class OpenPaiEnvironmentService extends EnvironmentService {

    private readonly log: Logger = getLogger('OpenPaiEnvironmentService');
    // NOTE(review): the two fields below are not referenced anywhere in this class.
    private paiClusterConfig: PAIClusterConfig | undefined;
    private paiTrialConfig: NNIPAITrialConfig | undefined;
    private readonly paiToken: string;
    // 'http' or 'https'; derived from the configured host (also updated by formatPAIHost).
    private protocol: string;
    private readonly experimentId: string;
    private readonly config: FlattenOpenpaiConfig;

    /**
     * @param config Experiment configuration; flattened with `flattenConfig(config, 'openpai')`.
     * @param info   Startup information; supplies the experiment id.
     */
    constructor(config: ExperimentConfig, info: ExperimentStartupInfo) {
        super();
        this.experimentId = info.experimentId;
        this.config = flattenConfig(config, 'openpai');
        this.paiToken = this.config.token;
        this.protocol = this.config.host.toLowerCase().startsWith('https://') ? 'https' : 'http';

        // FIXME: only support MountedStorageService
        const storageService = new MountedStorageService();
        const remoteRoot = storageService.joinPath(this.config.localStorageMountPoint, this.experimentId);
        storageService.initialize(this.config.localStorageMountPoint, remoteRoot);
    }

    /** Interval of the environment maintenance loop (5000; presumably milliseconds — confirm against the base class). */
    public get environmentMaintenceLoopInterval(): number {
        return 5000;
    }

    /** Always true: this service works with a storage service. */
    public get hasStorageService(): boolean {
        return true;
    }

    /** Platform name of this environment service. */
    public get getName(): string {
        return 'pai';
    }

    /**
     * Fetches the configured user's OpenPAI job list and updates each environment's status from
     * the job whose `name` equals `environment.envId`.
     *
     * All environments are processed even if some fail. The returned promise rejects when:
     * - the HTTP request fails;
     * - the response is not a job list;
     * - an unexpected error occurs while processing the list;
     * - any job is in the FAILED state (rejects with the first such job's message).
     *
     * @throws Error if the PAI token is not initialized.
     */
    public async refreshEnvironmentsStatus(environments: EnvironmentInformation[]): Promise<void> {
        const deferred: Deferred<void> = new Deferred<void>();

        if (this.paiToken === undefined) {
            throw new Error('PAI token is not initialized');
        }

        const getJobInfoRequest: request.Options = {
            uri: `${this.config.host}/rest-server/api/v2/jobs?username=${this.config.username}`,
            method: 'GET',
            json: true,
            headers: {
                'Content-Type': 'application/json',
                Authorization: `Bearer ${this.paiToken}`
            }
        };

        // The callback is guarded by try/catch: an exception thrown while processing the response
        // (e.g. an unexpected body shape) would otherwise leave `deferred` unsettled forever.
        request(getJobInfoRequest, (error: any, response: request.Response, body: any) => {
            try {
                // Status code 200 for success
                if ((error !== undefined && error !== null) || response.statusCode >= 400) {
                    const errorMessage: string = (error !== undefined && error !== null) ? error.message :
                        `OpenPAI: get environment list from PAI Cluster failed!, http code:${response.statusCode}, http body: ${JSON.stringify(body)}`;
                    this.log.error(`${errorMessage}`);
                    deferred.reject(errorMessage);
                    return;
                }
                if (!Array.isArray(body)) {
                    const errorMessage = `OpenPAI: unexpected job list returned from PAI Cluster, http body: ${JSON.stringify(body)}`;
                    this.log.error(errorMessage);
                    deferred.reject(errorMessage);
                    return;
                }

                const jobInfos = new Map<string, any>();
                for (const jobInfo of body) {
                    jobInfos.set(jobInfo.name, jobInfo);
                }

                for (const environment of environments) {
                    const failure = this.updateEnvironmentFromJob(environment, jobInfos);
                    if (failure !== undefined) {
                        deferred.reject(failure);
                    }
                }
                // No-op if a FAILED job already rejected the promise.
                deferred.resolve();
            } catch (err) {
                this.log.error(`OpenPAI: error when refreshing environments status: ${err}`);
                deferred.reject(err);
            }
        });
        return deferred.promise;
    }

    /**
     * Applies the state of the OpenPAI job named `environment.envId` to `environment`.
     *
     * - Job missing from `jobInfos`: status set to 'UNKNOWN'.
     * - Job has no `state`: status set to 'FAILED'.
     * - Otherwise the job state is mapped via `setStatus` (STOPPED/STOPPING -> USER_CANCELED,
     *   unrecognized states -> UNKNOWN).
     *
     * @returns an error message when the job state is FAILED, otherwise undefined.
     */
    private updateEnvironmentFromJob(environment: EnvironmentInformation, jobInfos: Map<string, any>): string | undefined {
        if (!jobInfos.has(environment.envId)) {
            this.log.error(`OpenPAI job ${environment.envId} is not found in job list.`);
            environment.status = 'UNKNOWN';
            return undefined;
        }

        const jobResponse = jobInfos.get(environment.envId);
        if (!(jobResponse && jobResponse.state)) {
            this.log.error(`OpenPAI: job ${environment.envId} has no state returned. body:`, jobResponse);
            // some error happens, and mark this environment
            environment.status = 'FAILED';
            return undefined;
        }

        let failure: string | undefined;
        const oldEnvironmentStatus = environment.status;
        switch (jobResponse.state) {
            case 'RUNNING':
            case 'WAITING':
            case 'SUCCEEDED':
                environment.setStatus(jobResponse.state);
                break;
            case 'FAILED':
                environment.setStatus(jobResponse.state);
                failure = `OpenPAI: job ${environment.envId} is failed!`;
                break;
            case 'STOPPED':
            case 'STOPPING':
                environment.setStatus('USER_CANCELED');
                break;
            default:
                this.log.error(`OpenPAI: job ${environment.envId} returns unknown state ${jobResponse.state}.`);
                environment.setStatus('UNKNOWN');
                break;
        }
        if (oldEnvironmentStatus !== environment.status) {
            this.log.debug(`OpenPAI: job ${environment.envId} change status ${oldEnvironmentStatus} to ${environment.status} due to job is ${jobResponse.state}.`);
        }
        return failure;
    }

    /**
     * Submits `environment` as an OpenPAI job.
     *
     * Before submission, sets on `environment`:
     * - `command`, prefixed with the shared-storage mount command (when shared storage is used)
     *   and a `cd` into the environment root;
     * - `runnerWorkingFolder`;
     * - `trackingUrl`;
     * - `useActiveGpu`;
     * - `maxTrialNumberPerGpu`.
     *
     * On submission failure, sets `environment.status` to 'FAILED' and rejects with the error message.
     *
     * @throws Error if the PAI token is not initialized.
     */
    public async startEnvironment(environment: EnvironmentInformation): Promise<void> {
        const deferred: Deferred<void> = new Deferred<void>();

        if (this.paiToken === undefined) {
            throw new Error('PAI token is not initialized');
        }
        // Step 1. Prepare PAI job configuration
        let environmentRoot: string;
        if (environment.useSharedStorage) {
            const sharedStorageService = component.get<SharedStorageService>(SharedStorageService);
            environmentRoot = sharedStorageService.remoteWorkingRoot;
            // Normalize every `echo` in the mount command to exactly `echo -e`.
            const mountCommand = sharedStorageService.remoteMountCommand.replace(/echo -e /g, `echo `).replace(/echo /g, `echo -e `);
            environment.command = `${mountCommand} && cd ${environmentRoot} && ${environment.command}`;
        } else {
            environmentRoot = `${this.config.containerStorageMountPoint}/${this.experimentId}`;
            environment.command = `cd ${environmentRoot} && ${environment.command}`;
        }
        environment.runnerWorkingFolder = `${environmentRoot}/envs/${environment.id}`;
        environment.trackingUrl = `${this.config.host}/job-detail.html?username=${this.config.username}&jobName=${environment.envId}`;
        // TODO: confirm whether OpenPAI supports active GPU usage / multiple trials per GPU.
        environment.useActiveGpu = false;
        environment.maxTrialNumberPerGpu = 1;

        // Step 2. Generate Job Configuration in yaml format
        const paiJobConfig = this.generateJobConfigInYamlFormat(environment);
        this.log.debug(`generated paiJobConfig: ${paiJobConfig}`);

        // Step 3. Submit PAI job via Rest call
        const submitJobRequest: request.Options = {
            uri: `${this.config.host}/rest-server/api/v2/jobs`,
            method: 'POST',
            body: paiJobConfig,
            followAllRedirects: true,
            headers: {
                'Content-Type': 'text/yaml',
                Authorization: `Bearer ${this.paiToken}`
            }
        };
        request(submitJobRequest, (error, response, body) => {
            // Status code 202 for success, refer https://github.com/microsoft/pai/blob/master/src/rest-server/docs/swagger.yaml
            if ((error !== undefined && error !== null) || response.statusCode >= 400) {
                const errorMessage: string = (error !== undefined && error !== null) ? error.message :
                    `start environment ${environment.envId} failed, http code:${response.statusCode}, http body: ${body}`;

                this.log.error(errorMessage);
                environment.status = 'FAILED';
                deferred.reject(errorMessage);
                return;
            }
            deferred.resolve();
        });

        return deferred.promise;
    }

    /**
     * Stops the OpenPAI job of `environment` by setting its execution type to STOP.
     *
     * Resolves immediately when `environment.isAlive` is false.
     *
     * @throws Error if the PAI token is not initialized.
     * Rejects with the request error, or with a message string on an HTTP error status.
     */
    public async stopEnvironment(environment: EnvironmentInformation): Promise<void> {
        if (environment.isAlive === false) {
            return;
        }

        if (this.paiToken === undefined) {
            throw new Error('PAI token is not initialized');
        }

        const stopJobRequest: request.Options = {
            uri: `${this.config.host}/rest-server/api/v2/jobs/${this.config.username}~${environment.envId}/executionType`,
            method: 'PUT',
            json: true,
            body: { value: 'STOP' },
            time: true,
            headers: {
                'Content-Type': 'application/json',
                Authorization: `Bearer ${this.paiToken}`
            }
        };

        this.log.debug(`stopping OpenPAI environment ${environment.envId}, ${stopJobRequest.uri}`);

        const deferred: Deferred<void> = new Deferred<void>();
        try {
            request(stopJobRequest, (error, response, body) => {
                try {
                    // Status code 202 for success.
                    if ((error !== undefined && error !== null) || (response && response.statusCode >= 400)) {
                        const errorMessage: string = (error !== undefined && error !== null) ? error.message :
                            `OpenPAI: stop job ${environment.envId} failed, http code:${response.statusCode}, http body: ${JSON.stringify(body)}`;
                        this.log.error(`${errorMessage}`);
                        deferred.reject((error !== undefined && error !== null) ? error :
                            `Stop trial failed, http code: ${response.statusCode}`);
                        return;
                    }
                    this.log.info(`OpenPAI job ${environment.envId} stopped.`);
                    deferred.resolve();
                } catch (err) {
                    this.log.error(`OpenPAI error when inner stopping environment ${err}`);
                    deferred.reject(err);
                }
            });
        } catch (error) {
            // `request` itself may throw synchronously (e.g. on invalid options).
            this.log.error(`OpenPAI error when stopping environment ${error}`);
            throw error;
        }

        return deferred.promise;
    }

    /**
     * Builds the OpenPAI (protocol version 2) job specification for `environment` and
     * serializes it to YAML.
     *
     * When `openpaiConfig` is given, a deep copy of it is used:
     * - the job name is replaced with `environment.envId`;
     * - `environment.nodeCount` is set to the total instance count over all task roles
     *   (a role without `instances` counts as 1);
     * - each task role's commands are wrapped into a single NNI runner command.
     *
     * Otherwise a one-task-role job is generated from the trial resource settings.
     *
     * @throws Error if a task role in `openpaiConfig` has no `commands` array.
     */
    private generateJobConfigInYamlFormat(environment: EnvironmentInformation): string {
        const jobName = environment.envId;

        let nniJobConfig: any = undefined;
        if (this.config.openpaiConfig !== undefined) {
            nniJobConfig = JSON.parse(JSON.stringify(this.config.openpaiConfig)); // deep clone
            nniJobConfig.name = jobName;
            if (nniJobConfig.taskRoles) {
                const taskRoleNames = Object.keys(nniJobConfig.taskRoles);

                // count instance
                environment.nodeCount = 0;
                for (const taskRoleName of taskRoleNames) {
                    const taskRole = nniJobConfig.taskRoles[taskRoleName];
                    environment.nodeCount += taskRole.instances ? taskRole.instances : 1;
                }

                // Each taskRole's commands are replaced by one command in NNI's runner format.
                for (const taskRoleName of taskRoleNames) {
                    const taskRole = nniJobConfig.taskRoles[taskRoleName];
                    if (!Array.isArray(taskRole.commands)) {
                        throw new Error(`OpenPAI: taskRole ${taskRoleName} in openpaiConfig has no commands`);
                    }
                    // Escape EVERY single quote (' -> '\'') so the joined command can be wrapped in '...'.
                    const joinedCommand = taskRole.commands.join(" && ").replace(/'/g, "'\\''").trim();
                    const nniTrialCommand = `${environment.command} --node_count ${environment.nodeCount} --trial_command '${joinedCommand}'`;
                    this.log.debug(`replace command ${taskRole.commands} to ${[nniTrialCommand]}`);
                    taskRole.commands = [nniTrialCommand];
                }
            }
        } else {
            nniJobConfig = {
                protocolVersion: 2,
                name: jobName,
                type: 'job',
                jobRetryCount: 0,
                prerequisites: [
                    {
                        type: 'dockerimage',
                        uri: this.config.dockerImage,
                        name: 'docker_image_0'
                    }
                ],
                taskRoles: {
                    taskrole: {
                        instances: 1,
                        completion: {
                            minFailedInstances: 1,
                            minSucceededInstances: -1
                        },
                        taskRetryCount: 0,
                        dockerImage: 'docker_image_0',
                        resourcePerInstance: {
                            gpu: this.config.trialGpuNumber,
                            cpu: this.config.trialCpuNumber,
                            memoryMB: toMegaBytes(this.config.trialMemorySize)
                        },
                        commands: [
                            environment.command
                        ]
                    }
                },
                extras: {
                    'storages': [
                        {
                            name: this.config.storageConfigName
                        }
                    ],
                    submitFrom: 'submit-job-v2'
                }
            };
            if (this.config.deprecated && this.config.deprecated.virtualCluster) {
                nniJobConfig.defaults = {
                    virtualCluster: this.config.deprecated.virtualCluster
                };
            }
        }
        return yaml.safeDump(nniJobConfig);
    }

    /**
     * Strips a leading `http://` or `https://` from `host` and records the matching protocol
     * in `this.protocol`. A host without either (case-sensitive) prefix is returned unchanged
     * and `this.protocol` is left as is.
     */
    protected formatPAIHost(host: string): string {
        if (host.startsWith('http://')) {
            this.protocol = 'http';
            return host.substring('http://'.length);
        }
        if (host.startsWith('https://')) {
            this.protocol = 'https';
            return host.substring('https://'.length);
        }
        return host;
    }
}