openPaiEnvironmentService.ts 14.7 KB
Newer Older
1
2
3
4
5
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT license.

'use strict';

6
import * as yaml from 'js-yaml';
7
import * as request from 'request';
SparkSnail's avatar
SparkSnail committed
8
import { Container, Scope } from 'typescript-ioc';
9
10
import { Deferred } from 'ts-deferred';
import * as component from '../../../common/component';
11
import { ExperimentConfig, OpenpaiConfig, flattenConfig, toMegaBytes } from '../../../common/experimentConfig';
12
import { ExperimentStartupInfo } from '../../../common/experimentStartupInfo';
13
14
import { getLogger, Logger } from '../../../common/log';
import { PAIClusterConfig } from '../../pai/paiConfig';
SparkSnail's avatar
SparkSnail committed
15
import { NNIPAITrialConfig } from '../../pai/paiConfig';
16
import { EnvironmentInformation, EnvironmentService } from '../environment';
17
import { SharedStorageService } from '../sharedStorage';
18
import { MountedStorageService } from '../storages/mountedStorageService';
SparkSnail's avatar
SparkSnail committed
19
import { StorageService } from '../storageService';
20

21
interface FlattenOpenpaiConfig extends ExperimentConfig, OpenpaiConfig { }
22
23
24
25
26
27
28

/**
 * Collector PAI jobs info from PAI cluster, and update pai job status locally
 */
@component.Singleton
export class OpenPaiEnvironmentService extends EnvironmentService {

liuzhe-lz's avatar
liuzhe-lz committed
29
    private readonly log: Logger = getLogger('OpenPaiEnvironmentService');
30
    private paiClusterConfig: PAIClusterConfig | undefined;
SparkSnail's avatar
SparkSnail committed
31
    private paiTrialConfig: NNIPAITrialConfig | undefined;
32
33
    private paiToken: string;
    private protocol: string;
34
    private experimentId: string;
35
    private config: FlattenOpenpaiConfig;
36

37
    constructor(config: ExperimentConfig, info: ExperimentStartupInfo) {
38
        super();
39
        this.experimentId = info.experimentId;
40
41
42
        this.config = flattenConfig(config, 'openpai');
        this.paiToken = this.config.token;
        this.protocol = this.config.host.toLowerCase().startsWith('https://') ? 'https' : 'http';
SparkSnail's avatar
SparkSnail committed
43
44
45
46
        Container.bind(StorageService)
          .to(MountedStorageService)
          .scope(Scope.Singleton);
        const storageService = component.get<StorageService>(StorageService)
47
48
        const remoteRoot = storageService.joinPath(this.config.localStorageMountPoint, this.experimentId);
        storageService.initialize(this.config.localStorageMountPoint, remoteRoot);
49
50
    }

51
52
53
54
    public get environmentMaintenceLoopInterval(): number {
        return 5000;
    }

55
56
57
58
    public get hasStorageService(): boolean {
        return true;
    }

59
60
61
62
    public get getName(): string {
        return 'pai';
    }

63
64
65
66
67
68
69
70
    public async refreshEnvironmentsStatus(environments: EnvironmentInformation[]): Promise<void> {
        const deferred: Deferred<void> = new Deferred<void>();

        if (this.paiToken === undefined) {
            throw new Error('PAI token is not initialized');
        }

        const getJobInfoRequest: request.Options = {
71
            uri: `${this.config.host}/rest-server/api/v2/jobs?username=${this.config.username}`,
72
73
74
75
76
77
78
79
80
            method: 'GET',
            json: true,
            headers: {
                'Content-Type': 'application/json',
                Authorization: `Bearer ${this.paiToken}`
            }
        };

        request(getJobInfoRequest, async (error: any, response: request.Response, body: any) => {
81
            // Status code 200 for success
82
            if ((error !== undefined && error !== null) || response.statusCode >= 400) {
83
                const errorMessage: string = (error !== undefined && error !== null) ? error.message :
liuzhe-lz's avatar
liuzhe-lz committed
84
                    `OpenPAI: get environment list from PAI Cluster failed!, http code:${response.statusCode}, http body:' ${JSON.stringify(body)}`;
85
86
                this.log.error(`${errorMessage}`);
                deferred.reject(errorMessage);
87
88
89
90
91
92
93
            } else {
                const jobInfos = new Map<string, any>();
                body.forEach((jobInfo: any) => {
                    jobInfos.set(jobInfo.name, jobInfo);
                });

                environments.forEach((environment) => {
94
95
                    if (jobInfos.has(environment.envId)) {
                        const jobResponse = jobInfos.get(environment.envId);
96
97
98
99
100
101
                        if (jobResponse && jobResponse.state) {
                            const oldEnvironmentStatus = environment.status;
                            switch (jobResponse.state) {
                                case 'RUNNING':
                                case 'WAITING':
                                case 'SUCCEEDED':
102
103
                                    environment.setStatus(jobResponse.state);
                                    break;
104
                                case 'FAILED':
105
                                    environment.setStatus(jobResponse.state);
106
                                    deferred.reject(`OpenPAI: job ${environment.envId} is failed!`);
107
108
109
                                    break;
                                case 'STOPPED':
                                case 'STOPPING':
110
                                    environment.setStatus('USER_CANCELED');
111
112
                                    break;
                                default:
113
114
                                    this.log.error(`OpenPAI: job ${environment.envId} returns unknown state ${jobResponse.state}.`);
                                    environment.setStatus('UNKNOWN');
115
116
                            }
                            if (oldEnvironmentStatus !== environment.status) {
117
                                this.log.debug(`OpenPAI: job ${environment.envId} change status ${oldEnvironmentStatus} to ${environment.status} due to job is ${jobResponse.state}.`)
118
119
                            }
                        } else {
liuzhe-lz's avatar
liuzhe-lz committed
120
                            this.log.error(`OpenPAI: job ${environment.envId} has no state returned. body:`, jobResponse);
121
122
123
124
                            // some error happens, and mark this environment
                            environment.status = 'FAILED';
                        }
                    } else {
125
                        this.log.error(`OpenPAI job ${environment.envId} is not found in job list.`);
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
                        environment.status = 'UNKNOWN';
                    }
                });
                deferred.resolve();
            }
        });
        return deferred.promise;
    }

    public async startEnvironment(environment: EnvironmentInformation): Promise<void> {
        const deferred: Deferred<void> = new Deferred<void>();

        if (this.paiToken === undefined) {
            throw new Error('PAI token is not initialized');
        }
        // Step 1. Prepare PAI job configuration
142
143
144
        let environmentRoot: string;
        if (environment.useSharedStorage) {
            environmentRoot = component.get<SharedStorageService>(SharedStorageService).remoteWorkingRoot;
J-shang's avatar
J-shang committed
145
            environment.command = `${component.get<SharedStorageService>(SharedStorageService).remoteMountCommand.replace(/echo -e /g, `echo `).replace(/echo /g, `echo -e `)} && cd ${environmentRoot} && ${environment.command}`;
146
        } else {
147
            environmentRoot = `${this.config.containerStorageMountPoint}/${this.experimentId}`;
148
149
            environment.command = `cd ${environmentRoot} && ${environment.command}`;
        }
SparkSnail's avatar
SparkSnail committed
150
        environment.runnerWorkingFolder = `${environmentRoot}/envs/${environment.id}`;
151
152
153
        environment.trackingUrl = `${this.config.host}/job-detail.html?username=${this.config.username}&jobName=${environment.envId}`;
        environment.useActiveGpu = false;  // does openpai supports these?
        environment.maxTrialNumberPerGpu = 1;
154
155
156
157
158
159
160

        // Step 2. Generate Job Configuration in yaml format
        const paiJobConfig = this.generateJobConfigInYamlFormat(environment);
        this.log.debug(`generated paiJobConfig: ${paiJobConfig}`);

        // Step 3. Submit PAI job via Rest call
        const submitJobRequest: request.Options = {
161
            uri: `${this.config.host}/rest-server/api/v2/jobs`,
162
163
            method: 'POST',
            body: paiJobConfig,
164
            followAllRedirects: true,
165
166
167
168
169
170
            headers: {
                'Content-Type': 'text/yaml',
                Authorization: `Bearer ${this.paiToken}`
            }
        };
        request(submitJobRequest, (error, response, body) => {
171
            // Status code 202 for success, refer https://github.com/microsoft/pai/blob/master/src/rest-server/docs/swagger.yaml
172
173
            if ((error !== undefined && error !== null) || response.statusCode >= 400) {
                const errorMessage: string = (error !== undefined && error !== null) ? error.message :
174
                    `start environment ${environment.envId} failed, http code:${response.statusCode}, http body: ${body}`;
175
176
177

                this.log.error(errorMessage);
                environment.status = 'FAILED';
178
                deferred.reject(errorMessage);
179
180
181
182
183
184
185
186
187
188
            }
            deferred.resolve();
        });

        return deferred.promise;
    }

    public async stopEnvironment(environment: EnvironmentInformation): Promise<void> {
        const deferred: Deferred<void> = new Deferred<void>();

189
190
191
        if (environment.isAlive === false) {
            return Promise.resolve();
        }
192
193
194
195
196
        if (this.paiToken === undefined) {
            return Promise.reject(Error('PAI token is not initialized'));
        }

        const stopJobRequest: request.Options = {
197
            uri: `${this.config.host}/rest-server/api/v2/jobs/${this.config.username}~${environment.envId}/executionType`,
198
199
200
201
202
203
204
205
206
207
            method: 'PUT',
            json: true,
            body: { value: 'STOP' },
            time: true,
            headers: {
                'Content-Type': 'application/json',
                Authorization: `Bearer ${this.paiToken}`
            }
        };

208
        this.log.debug(`stopping OpenPAI environment ${environment.envId}, ${stopJobRequest.uri}`);
209
210
211
212

        try {
            request(stopJobRequest, (error, response, _body) => {
                try {
213
                    // Status code 202 for success.
214
                    if ((error !== undefined && error !== null) || (response && response.statusCode >= 400)) {
215
216
217
                        const errorMessage: string = (error !== undefined && error !== null) ? error.message :
                            `OpenPAI: stop job ${environment.envId} failed, http code:${response.statusCode}, http body: ${_body}`;
                        this.log.error(`${errorMessage}`);
218
219
220
                        deferred.reject((error !== undefined && error !== null) ? error :
                            `Stop trial failed, http code: ${response.statusCode}`);
                    } else {
221
                        this.log.info(`OpenPAI job ${environment.envId} stopped.`);
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
                    }
                    deferred.resolve();
                } catch (error) {
                    this.log.error(`OpenPAI error when inner stopping environment ${error}`);
                    deferred.reject(error);
                }
            });
        } catch (error) {
            this.log.error(`OpenPAI error when stopping environment ${error}`);
            return Promise.reject(error);
        }

        return deferred.promise;
    }

    private generateJobConfigInYamlFormat(environment: EnvironmentInformation): any {
238
        const jobName = environment.envId;
239
240

        let nniJobConfig: any = undefined;
241
242
        if (this.config.openpaiConfig !== undefined) {
            nniJobConfig = JSON.parse(JSON.stringify(this.config.openpaiConfig)); //Trick for deep clone in Typescript
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
            nniJobConfig.name = jobName;
            if (nniJobConfig.taskRoles) {

                environment.nodeCount = 0;
                // count instance
                for (const taskRoleName in nniJobConfig.taskRoles) {
                    const taskRole = nniJobConfig.taskRoles[taskRoleName];
                    let instanceCount = 1;
                    if (taskRole.instances) {
                        instanceCount = taskRole.instances;
                    }
                    environment.nodeCount += instanceCount;
                }

                // Each taskRole will generate new command in NNI's command format
                // Each command will be formatted to NNI style
                for (const taskRoleName in nniJobConfig.taskRoles) {
                    const taskRole = nniJobConfig.taskRoles[taskRoleName];
                    // replace ' to '\''
                    const joinedCommand = taskRole.commands.join(" && ").replace("'", "'\\''").trim();
                    const nniTrialCommand = `${environment.command} --node_count ${environment.nodeCount} --trial_command '${joinedCommand}'`;
                    this.log.debug(`replace command ${taskRole.commands} to ${[nniTrialCommand]}`);
                    taskRole.commands = [nniTrialCommand];
                }
            }

        } else {
            nniJobConfig = {
                protocolVersion: 2,
                name: jobName,
                type: 'job',
                jobRetryCount: 0,
                prerequisites: [
                    {
                        type: 'dockerimage',
278
                        uri: this.config.dockerImage,
279
280
281
282
283
284
285
286
287
288
289
290
291
                        name: 'docker_image_0'
                    }
                ],
                taskRoles: {
                    taskrole: {
                        instances: 1,
                        completion: {
                            minFailedInstances: 1,
                            minSucceededInstances: -1
                        },
                        taskRetryCount: 0,
                        dockerImage: 'docker_image_0',
                        resourcePerInstance: {
SparkSnail's avatar
SparkSnail committed
292
                            gpu: this.config.trialGpuNumber === undefined? 0: this.config.trialGpuNumber,
293
294
                            cpu: this.config.trialCpuNumber,
                            memoryMB: toMegaBytes(this.config.trialMemorySize)
295
296
297
298
299
300
301
302
303
                        },
                        commands: [
                            environment.command
                        ]
                    }
                },
                extras: {
                    'storages': [
                        {
304
                            name: this.config.storageConfigName
305
306
307
308
309
                        }
                    ],
                    submitFrom: 'submit-job-v2'
                }
            }
SparkSnail's avatar
SparkSnail committed
310
            if (this.config.virtualCluster) {
311
                nniJobConfig.defaults = {
SparkSnail's avatar
SparkSnail committed
312
                    virtualCluster: this.config.virtualCluster
313
314
315
                }
            }
        }
316
        return yaml.dump(nniJobConfig);
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
    }

    protected formatPAIHost(host: string): string {
        // If users' host start with 'http://' or 'https://', use the original host,
        // or format to 'http//${host}'
        if (host.startsWith('http://')) {
            this.protocol = 'http';
            return host.replace('http://', '');
        } else if (host.startsWith('https://')) {
            this.protocol = 'https';
            return host.replace('https://', '');
        } else {
            return host;
        }
    }
}