// paiTrainingService.ts
/**
 * Copyright (c) Microsoft Corporation
 * All rights reserved.
 *
 * MIT License
 *
 * Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated
 * documentation files (the "Software"), to deal in the Software without restriction, including without limitation
 * the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and
 * to permit persons to whom the Software is furnished to do so, subject to the following conditions:
 * The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.
 *
 * THE SOFTWARE IS PROVIDED *AS IS*, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING
 * BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
 * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
 * DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
 */

'use strict';

import * as cpp from 'child-process-promise';
import * as fs from 'fs';
import * as path from 'path';
// tslint:disable-next-line:no-implicit-dependencies
import * as request from 'request';
import * as component from '../../common/component';

import { EventEmitter } from 'events';
import { Deferred } from 'ts-deferred';
import { String } from 'typescript-string-operations';
import { MethodNotImplementedError } from '../../common/errors';
import { getExperimentId, getInitTrialSequenceId } from '../../common/experimentStartupInfo';
import { getLogger, Logger } from '../../common/log';
import {
    JobApplicationForm, NNIManagerIpConfig, TrainingService,
    TrialJobApplicationForm, TrialJobDetail, TrialJobMetric
} from '../../common/trainingService';
import { delay, generateParamFileName,
    getExperimentRootDir, getIPV4Address, getVersion, uniqueString, unixPathJoin } from '../../common/utils';
import { CONTAINER_INSTALL_NNI_SHELL_FORMAT } from '../common/containerJobData';
import { TrialConfigMetadataKey } from '../common/trialConfigMetadataKey';
import { execMkdir, validateCodeDir } from '../common/util';
import { HDFSClientUtility } from './hdfsClientUtility';
import { NNIPAITrialConfig, PAIClusterConfig, PAIJobConfig, PAITaskRole } from './paiConfig';
import { PAI_LOG_PATH_FORMAT, PAI_OUTPUT_DIR_FORMAT, PAI_TRIAL_COMMAND_FORMAT, PAITrialJobDetail } from './paiData';
import { PAIJobInfoCollector } from './paiJobInfoCollector';
import { PAIJobRestServer } from './paiJobRestServer';

import * as WebHDFS from 'webhdfs';

/**
 * Training Service implementation for OpenPAI (Open Platform for AI)
 * Refer https://github.com/Microsoft/pai for more info about OpenPAI
 *
 * Trials accepted by submitTrialJob() are recorded in an in-memory map and queued.
 * run() drives two loops until cleanUp() raises the stopping flag: one refreshes the
 * PAI token and polls trial status, the other submits queued trials to the PAI REST server.
 */
@component.Singleton
class PAITrainingService implements TrainingService {
    private readonly log!: Logger;
    // Emitter exposed through MetricsEmitter; listeners are attached to its 'metric' event
    private readonly metricsEmitter: EventEmitter;
    // Every trial job known to this service, keyed by trial job id
    private readonly trialJobsMap: Map<string, PAITrialJobDetail>;
    // Root dir on HDFS for this experiment
    private readonly expRootDir: string;
    private paiTrialConfig: NNIPAITrialConfig | undefined;
    private paiClusterConfig?: PAIClusterConfig;
    // Ids of trials accepted by submitTrialJob() and not yet processed by submitJobLoop()
    private readonly jobQueue: string[];
    // Set by cleanUp(); both background loops exit once it is true
    private stopping: boolean = false;
    // tslint:disable-next-line:no-any
    private hdfsClient: any;
    private paiToken? : string;
    // Epoch milliseconds of the last successful token fetch
    private paiTokenUpdateTime?: number;
    // Minimum age (milliseconds) of the token before updatePaiToken() fetches a new one
    private readonly paiTokenUpdateInterval: number;
    private readonly experimentId! : string;
    private readonly paiJobCollector : PAIJobInfoCollector;
    // Regular-expression source used to split the trial outputDir into 'host' and 'baseDir' groups
    private readonly hdfsDirPattern: string;
    private hdfsBaseDir: string | undefined;
    private hdfsOutputHost: string | undefined;
    // -1 means "not yet initialized"; see generateSequenceId()
    private nextTrialSequenceId: number;
    private paiRestServerPort?: number;
    private nniManagerIpConfig?: NNIManagerIpConfig;
    // Pending upload of the experiment code dir, started when the trial config is set
    private copyExpCodeDirPromise?: Promise<void>;
    private versionCheck: boolean = true;
    private logCollection: string;

    constructor() {
        this.log = getLogger();
        this.metricsEmitter = new EventEmitter();
        this.trialJobsMap = new Map<string, PAITrialJobDetail>();
        this.jobQueue = [];
        this.experimentId = getExperimentId();
        // Root dir on HDFS: joined with POSIX separators so the result does not depend on the host OS
        this.expRootDir = path.posix.join('/nni', 'experiments', this.experimentId);
        this.paiJobCollector = new PAIJobInfoCollector(this.trialJobsMap);
        this.hdfsDirPattern = 'hdfs://(?<host>([0-9]{1,3}.){3}[0-9]{1,3})(:[0-9]{2,5})?(?<baseDir>/.*)?';
        this.nextTrialSequenceId = -1;
        this.paiTokenUpdateInterval = 7200000; //2hours
        this.logCollection = 'none';
        this.log.info('Construct OpenPAI training service.');
    }

    /**
     * Start the job REST server, then run the status-checking and job-submitting loops.
     * The returned promise settles when both loops have finished (or one of them fails).
     */
    public async run(): Promise<void> {
        this.log.info('Run PAI training service.');
        const restServer: PAIJobRestServer = component.get(PAIJobRestServer);
        await restServer.start();
        restServer.setEnableVersionCheck = this.versionCheck;
        this.log.info(`PAI Training service rest server listening on: ${restServer.endPoint}`);
        await Promise.all([
            this.statusCheckingLoop(),
            this.submitJobLoop()]);
        this.log.info('PAI training service exit.');
    }

    /**
     * List every recorded job whose form has jobType 'TRIAL'.
     */
    public async listTrialJobs(): Promise<TrialJobDetail[]> {
        const jobs: TrialJobDetail[] = [];

        for (const [key, value] of this.trialJobsMap) {
            if (value.form.jobType === 'TRIAL') {
                jobs.push(await this.getTrialJob(key));
            }
        }

        return jobs;
    }

    /**
     * Look up a trial job by id.
     * Rejects with an Error when the cluster config has not been set, and with a
     * message string when the id is unknown.
     */
    public async getTrialJob(trialJobId: string): Promise<TrialJobDetail> {
        if (this.paiClusterConfig === undefined) {
            throw new Error('PAI Cluster config is not initialized');
        }

        const paiTrialJob: PAITrialJobDetail | undefined = this.trialJobsMap.get(trialJobId);

        if (paiTrialJob === undefined) {
            return Promise.reject(`trial job ${trialJobId} not found`);
        }

        return paiTrialJob;
    }

    public addTrialJobMetricListener(listener: (metric: TrialJobMetric) => void): void {
        this.metricsEmitter.on('metric', listener);
    }

    public removeTrialJobMetricListener(listener: (metric: TrialJobMetric) => void): void {
        this.metricsEmitter.off('metric', listener);
    }

    /**
     * Register a new trial in 'WAITING' state and append it to the submission queue.
     * The job is not sent to PAI here; submitJobLoop() picks it up later.
     * Rejects when hdfsBaseDir has not been initialized (i.e. the trial config was never set).
     */
    public async submitTrialJob(form: JobApplicationForm): Promise<TrialJobDetail> {
        if (this.hdfsBaseDir === undefined) {
            throw new Error('hdfsBaseDir is not initialized');
        }

        this.log.info(`submitTrialJob: form: ${JSON.stringify(form)}`);

        const trialJobId: string = uniqueString(5);
        const trialSequenceId: number = this.generateSequenceId();
        //TODO: use HDFS working folder instead
        const trialWorkingFolder: string = path.posix.join(this.expRootDir, 'trials', trialJobId);
        const paiJobName: string = `nni_exp_${this.experimentId}_trial_${trialJobId}`;

        // HDFS paths always use '/', so join with the POSIX flavour regardless of the host OS
        const hdfsOutputDir : string = path.posix.join(this.hdfsBaseDir, this.experimentId, trialJobId);
        const hdfsLogPath : string = String.Format(
            PAI_LOG_PATH_FORMAT,
            this.hdfsOutputHost,
            hdfsOutputDir);

        const trialJobDetail: PAITrialJobDetail = new PAITrialJobDetail(
            trialJobId,
            'WAITING',
            paiJobName,
            Date.now(),
            trialWorkingFolder,
            form,
            trialSequenceId,
            hdfsLogPath);

        this.trialJobsMap.set(trialJobId, trialJobDetail);
        this.jobQueue.push(trialJobId);

        return trialJobDetail;
    }

    public updateTrialJob(trialJobId: string, form: JobApplicationForm): Promise<TrialJobDetail> {
        throw new MethodNotImplementedError();
    }

    public get isMultiPhaseJobSupported(): boolean {
        return false;
    }

    // tslint:disable:no-http-string
    /**
     * Ask the PAI REST server to stop the job that backs the given trial.
     * Throws synchronously when the cluster config or token is missing; rejects when the
     * trial id is unknown or the REST call fails (request error or HTTP status >= 400).
     *
     * @param isEarlyStopped recorded on the trial detail to mark the source of the cancellation
     */
    public cancelTrialJob(trialJobId: string, isEarlyStopped: boolean = false): Promise<void> {
        const trialJobDetail : PAITrialJobDetail | undefined =  this.trialJobsMap.get(trialJobId);
        if (trialJobDetail === undefined) {
            const notFoundMessage: string = `cancelTrialJob: trial job id ${trialJobId} not found`;
            this.log.error(notFoundMessage);

            // Reject with a reason so callers do not receive a bare `undefined`
            return Promise.reject(new Error(notFoundMessage));
        }

        if (this.paiClusterConfig === undefined) {
            throw new Error('PAI Cluster config is not initialized');
        }
        if (this.paiToken === undefined) {
            throw new Error('PAI token is not initialized');
        }

        const stopJobRequest: request.Options = {
            uri: `http://${this.paiClusterConfig.host}/rest-server/api/v1/user/${this.paiClusterConfig.userName}\
/jobs/${trialJobDetail.paiJobName}/executionType`,
            method: 'PUT',
            json: true,
            body: {value: 'STOP'},
            headers: {
                'Content-Type': 'application/json',
                Authorization: `Bearer ${this.paiToken}`
            }
        };

        // Set trialjobDetail's early stopped field, to mark the job's cancellation source
        trialJobDetail.isEarlyStopped = isEarlyStopped;

        const deferred : Deferred<void> = new Deferred<void>();
        // tslint:disable-next-line:no-any
        request(stopJobRequest, (error: Error, response: request.Response, body: any) => {
            if ((error !== undefined && error !== null) || response.statusCode >= 400) {
                this.log.error(`PAI Training service: stop trial ${trialJobId} to PAI Cluster failed!`);
                deferred.reject((error !== undefined && error !== null) ? error.message :
                 `Stop trial failed, http code: ${response.statusCode}`);
            } else {
                deferred.resolve();
            }
        });

        return deferred.promise;
    }

    // tslint:disable: no-unsafe-any no-any
    /**
     * Apply one piece of cluster/trial configuration, selected by `key`; `value` is the
     * raw string (JSON for the config objects).
     * The promise resolves once the setting is applied and rejects on any failure,
     * including an unknown key.
     */
    public async setClusterMetadata(key: string, value: string): Promise<void> {
        switch (key) {
            case TrialConfigMetadataKey.NNI_MANAGER_IP:
                this.nniManagerIpConfig = <NNIManagerIpConfig>JSON.parse(value);
                break;

            case TrialConfigMetadataKey.PAI_CLUSTER_CONFIG:
                this.paiClusterConfig = <PAIClusterConfig>JSON.parse(value);

                this.hdfsClient = WebHDFS.createClient({
                    user: this.paiClusterConfig.userName,
                    // Refer PAI document for Pylon mapping https://github.com/Microsoft/pai/tree/master/docs/pylon
                    port: 80,
                    path: '/webhdfs/api/v1',
                    host: this.paiClusterConfig.host
                });

                // Get PAI authentication token
                await this.updatePaiToken();
                break;

            case TrialConfigMetadataKey.TRIAL_CONFIG:
                await this.applyTrialConfig(value);
                break;

            case TrialConfigMetadataKey.VERSION_CHECK:
                this.versionCheck = (value === 'true' || value === 'True');
                break;

            case TrialConfigMetadataKey.LOG_COLLECTION:
                this.logCollection = value;
                break;

            default:
                //Reject for unknown keys
                throw new Error(`Uknown key: ${key}`);
        }
    }

    /**
     * Handle the TRIAL_CONFIG key: parse the trial config, validate the code dir, derive
     * the HDFS output host / base dir from outputDir, verify the output HDFS is reachable,
     * and start uploading the experiment code dir.
     * Throws (so the caller's promise rejects) at the first failing step.
     */
    private async applyTrialConfig(value: string): Promise<void> {
        const clusterConfig: PAIClusterConfig | undefined = this.paiClusterConfig;
        if (clusterConfig === undefined) {
            this.log.error('pai cluster config is not initialized');
            throw new Error('pai cluster config is not initialized');
        }

        const trialConfig: NNIPAITrialConfig = <NNIPAITrialConfig>JSON.parse(value);
        this.paiTrialConfig = trialConfig;
        //paiTrialConfig.outputDir could be null if it is not set in nnictl
        if (trialConfig.outputDir === undefined || trialConfig.outputDir === null) {
            trialConfig.outputDir = String.Format(
                PAI_OUTPUT_DIR_FORMAT,
                clusterConfig.host
            )
            .replace(/\r\n|\n|\r/gm, '');
        }

        // Validate to make sure codeDir doesn't have too many files
        try {
            await validateCodeDir(trialConfig.codeDir);
        } catch (error) {
            this.log.error(error);
            throw new Error(error);
        }

        const hdfsDirContent: any = trialConfig.outputDir.match(this.hdfsDirPattern);
        if (hdfsDirContent === null) {
            throw new Error('Trial outputDir format Error');
        }
        const groups: any = hdfsDirContent.groups;
        if (groups === undefined) {
            throw new Error('Trial outputDir format Error');
        }
        this.hdfsOutputHost = groups.host;
        //TODO: choose to use /${username} as baseDir
        this.hdfsBaseDir = groups.baseDir;
        if (this.hdfsBaseDir === undefined) {
            this.hdfsBaseDir = '/';
        }

        // Reuse the cluster client when the output lives on the same host, otherwise open a dedicated one
        let dataOutputHdfsClient: any;
        if (clusterConfig.host === this.hdfsOutputHost && this.hdfsClient) {
            dataOutputHdfsClient = this.hdfsClient;
        } else {
            dataOutputHdfsClient = WebHDFS.createClient({
                user: clusterConfig.userName,
                port: 50070,
                host: this.hdfsOutputHost
            });
        }

        let exist : boolean;
        try {
            exist = await HDFSClientUtility.pathExists('/', dataOutputHdfsClient);
        } catch (error) {
            throw new Error(`HDFS encounters problem, error is ${error}. Please check hdfsOutputDir host!`);
        }
        if (!exist) {
            // Stop here: do not start the upload when the output HDFS is unusable
            throw new Error(`Please check hdfsOutputDir host!`);
        }

        // Copy experiment files from local folder to HDFS
        // The promise is stored, not awaited; submitTrialJobToPAI() waits for it before submitting
        this.copyExpCodeDirPromise = HDFSClientUtility.copyDirectoryToHdfs(
            trialConfig.codeDir,
            HDFSClientUtility.getHdfsExpCodeDir(clusterConfig.userName),
            this.hdfsClient
        );
    }
    // tslint:enable: no-unsafe-any

    /**
     * Currently resolves without a value for every key.
     */
    public getClusterMetadata(key: string): Promise<string> {
        const deferred : Deferred<string> = new Deferred<string>();

        deferred.resolve();

        return deferred.promise;
    }

    /**
     * Signal both background loops to stop and shut down the job REST server.
     * Rejects with the underlying error when the REST server fails to stop.
     */
    public async cleanUp(): Promise<void> {
        this.log.info('Stopping PAI training service...');
        this.stopping = true;

        const restServer: PAIJobRestServer = component.get(PAIJobRestServer);
        try {
            await restServer.stop();
            this.log.info('PAI Training service rest server stopped successfully.');
        } catch (error) {
            // tslint:disable-next-line: no-unsafe-any
            this.log.error(`PAI Training service rest server stopped failed, error: ${error.message}`);
            throw error;
        }
    }

    public get MetricsEmitter() : EventEmitter {
        return this.metricsEmitter;
    }

    /**
     * Prepare and submit one queued trial to PAI.
     *
     * Steps: write the NNI install script and the parameter file into a local temp folder,
     * build the PAI job config, upload the temp folder to HDFS, then POST the job.
     * Upload or submission failures mark the trial 'FAILED' and are logged, not thrown.
     *
     * @returns always true once the trial has been processed (submitted or marked FAILED),
     *          which tells submitJobLoop() to drop it from the queue
     * @throws when the trial id is unknown or required configuration is missing
     */
    // tslint:disable-next-line:max-func-body-length
    private async submitTrialJobToPAI(trialJobId: string): Promise<boolean> {
        const deferred : Deferred<boolean> = new Deferred<boolean>();
        const trialJobDetail: PAITrialJobDetail | undefined = this.trialJobsMap.get(trialJobId);

        if (trialJobDetail === undefined) {
            throw new Error(`Failed to find PAITrialJobDetail for job ${trialJobId}`);
        }

        if (this.paiClusterConfig === undefined) {
            throw new Error('PAI Cluster config is not initialized');
        }
        if (this.paiTrialConfig === undefined) {
            throw new Error('trial config is not initialized');
        }
        if (this.paiToken === undefined) {
            throw new Error('PAI token is not initialized');
        }

        if (this.hdfsBaseDir === undefined) {
            throw new Error('hdfsBaseDir is not initialized');
        }

        if (this.hdfsOutputHost === undefined) {
            throw new Error('hdfsOutputHost is not initialized');
        }

        if (this.paiRestServerPort === undefined) {
            const restServer: PAIJobRestServer = component.get(PAIJobRestServer);
            this.paiRestServerPort = restServer.clusterRestServerPort;
        }

        // Make sure experiment code files is copied from local to HDFS
        if (this.copyExpCodeDirPromise !== undefined) {
            await this.copyExpCodeDirPromise;
        }

        // Step 1. Prepare PAI job configuration
        const hdfsOutputDir : string = unixPathJoin(this.hdfsBaseDir, this.experimentId, trialJobId);
        const hdfsCodeDir: string = HDFSClientUtility.getHdfsTrialWorkDir(this.paiClusterConfig.userName, trialJobId);

        const trialLocalTempFolder: string = path.join(getExperimentRootDir(), 'trials-local', trialJobId);
        //create tmp trial working folder locally.
        await execMkdir(trialLocalTempFolder);

        const runScriptContent : string = CONTAINER_INSTALL_NNI_SHELL_FORMAT;
        // Write NNI installation file to local tmp files
        await fs.promises.writeFile(path.join(trialLocalTempFolder, 'install_nni.sh'), runScriptContent, { encoding: 'utf8' });

        // Write file content ( parameter.cfg ) to local tmp folders
        const trialForm : TrialJobApplicationForm = (<TrialJobApplicationForm>trialJobDetail.form);
        if (trialForm !== undefined) {
            await fs.promises.writeFile(
                path.join(trialLocalTempFolder, generateParamFileName(trialForm.hyperParameters)),
                trialForm.hyperParameters.value, { encoding: 'utf8' }
            );
        }

        // tslint:disable-next-line: strict-boolean-expressions
        const nniManagerIp: string = this.nniManagerIpConfig ? this.nniManagerIpConfig.nniManagerIp : getIPV4Address();
        const version: string = this.versionCheck ? await getVersion() : '';
        const nniPaiTrialCommand : string = String.Format(
            PAI_TRIAL_COMMAND_FORMAT,
            // PAI will copy job's codeDir into /root directory
            `$PWD/${trialJobId}`,
            `$PWD/${trialJobId}/nnioutput`,
            trialJobId,
            this.experimentId,
            trialJobDetail.sequenceId,
            this.paiTrialConfig.command,
            nniManagerIp,
            this.paiRestServerPort,
            hdfsOutputDir,
            this.hdfsOutputHost,
            this.paiClusterConfig.userName,
            HDFSClientUtility.getHdfsExpCodeDir(this.paiClusterConfig.userName),
            version,
            this.logCollection
        )
        .replace(/\r\n|\n|\r/gm, '');

        // tslint:disable-next-line:no-console
        console.log(`nniPAItrial command is ${nniPaiTrialCommand.trim()}`);
        const paiTaskRoles : PAITaskRole[] = [
            new PAITaskRole(
                `nni_trail_${trialJobId}`,
                // Task role number
                1,
                // Task CPU number
                this.paiTrialConfig.cpuNum,
                // Task memory
                this.paiTrialConfig.memoryMB,
                // Task GPU number
                this.paiTrialConfig.gpuNum,
                // Task command
                nniPaiTrialCommand,
                // Task shared memory
                this.paiTrialConfig.shmMB
            )
        ];

        const paiJobConfig : PAIJobConfig = new PAIJobConfig(
            // Job name
            trialJobDetail.paiJobName,
            // Docker image
            this.paiTrialConfig.image,
            // dataDir
            this.paiTrialConfig.dataDir,
            // outputDir
            this.paiTrialConfig.outputDir,
            // codeDir
            `$PAI_DEFAULT_FS_URI${hdfsCodeDir}`,
            // PAI Task roles
            paiTaskRoles,
            // Add Virutal Cluster
            this.paiTrialConfig.virtualCluster === undefined ? 'default' : this.paiTrialConfig.virtualCluster.toString()
        );

        // Step 2. Upload code files in codeDir onto HDFS
        try {
            await HDFSClientUtility.copyDirectoryToHdfs(trialLocalTempFolder, hdfsCodeDir, this.hdfsClient);
        } catch (error) {
            // Report the folder that was actually being uploaded (the local temp folder)
            this.log.error(`PAI Training service: copy ${trialLocalTempFolder} to HDFS ${hdfsCodeDir} failed, error is ${error}`);
            trialJobDetail.status = 'FAILED';

            return true;
        }

        // Step 3. Submit PAI job via Rest call
        // Refer https://github.com/Microsoft/pai/blob/master/docs/rest-server/API.md for more detail about PAI Rest API
        const submitJobRequest: request.Options = {
            uri: `http://${this.paiClusterConfig.host}/rest-server/api/v1/user/${this.paiClusterConfig.userName}/jobs`,
            method: 'POST',
            json: true,
            body: paiJobConfig,
            headers: {
                'Content-Type': 'application/json',
                Authorization: `Bearer ${this.paiToken}`
            }
        };
        // tslint:disable:no-any no-unsafe-any
        request(submitJobRequest, (error: Error, response: request.Response, body: any) => {
            if ((error !== undefined && error !== null) || response.statusCode >= 400) {
                const errorMessage : string = (error !== undefined && error !== null) ? error.message :
                    `Submit trial ${trialJobId} failed, http code:${response.statusCode}, http body: ${response.body}`;
                this.log.error(errorMessage);
                trialJobDetail.status = 'FAILED';
                deferred.resolve(true);
            } else {
                trialJobDetail.submitTime = Date.now();
                deferred.resolve(true);
            }
        });

        return deferred.promise;
    }

    /**
     * Return the next trial sequence id, seeding the counter from
     * getInitTrialSequenceId() on first use.
     */
    private generateSequenceId(): number {
        if (this.nextTrialSequenceId === -1) {
            this.nextTrialSequenceId = getInitTrialSequenceId();
        }

        return this.nextTrialSequenceId++;
    }

    /**
     * Every 3 seconds until stopping: refresh the PAI token if due, pull trial status from
     * PAI, and fail if the REST server has recorded an error message.
     */
    private async statusCheckingLoop(): Promise<void> {
        while (!this.stopping) {
            try {
                await this.updatePaiToken();
            } catch (error) {
                this.log.error(`${error}`);
                //only throw error when initlize paiToken first time
                if (this.paiToken === undefined) {
                    throw new Error(error);
                }
            }
            await this.paiJobCollector.retrieveTrialStatus(this.paiToken, this.paiClusterConfig);
            const restServer: PAIJobRestServer = component.get(PAIJobRestServer);
            if (restServer.getErrorMessage !== undefined) {
                throw new Error(restServer.getErrorMessage);
            }
            await delay(3000);
        }
    }

    /**
     * Every 3 seconds until stopping: drain the job queue in order, submitting the trial
     * at the head and removing it once submitTrialJobToPAI() reports it processed.
     */
    private async submitJobLoop(): Promise<void> {
        while (!this.stopping) {
            while (!this.stopping && this.jobQueue.length > 0) {
                const trialJobId: string = this.jobQueue[0];
                if (await this.submitTrialJobToPAI(trialJobId)) {
                    // Remove trial job with trialJobId from job queue
                    this.jobQueue.shift();
                } else {
                    // Break the while loop since failed to submitJob
                    break;
                }
            }
            await delay(3000);
        }
    }

    /**
     * Update pai token by the interval time or initialize the pai token
     *
     * Does nothing when a token was fetched less than paiTokenUpdateInterval ago.
     * Rejects when the cluster config is missing, the request fails, the REST server
     * answers with a non-200 status, or no answer arrives within 5 seconds.
     * The stored token and its timestamp are only touched on a 200 response.
     */
    private async updatePaiToken(): Promise<void> {
        const currentTime: number = new Date().getTime();
        //If pai token initialized and not reach the interval time, do not update
        if (this.paiTokenUpdateTime !== undefined && (currentTime - this.paiTokenUpdateTime) < this.paiTokenUpdateInterval) {
            return;
        }

        if (this.paiClusterConfig === undefined) {
            const paiClusterConfigError: string = `pai cluster config not initialized!`;
            this.log.error(`${paiClusterConfigError}`);
            throw new Error(`${paiClusterConfigError}`);
        }

        const authenticationReq: request.Options = {
            uri: `http://${this.paiClusterConfig.host}/rest-server/api/v1/token`,
            method: 'POST',
            json: true,
            body: {
                username: this.paiClusterConfig.userName,
                password: this.paiClusterConfig.passWord
            }
        };

        const deferred : Deferred<void> = new Deferred<void>();
        request(authenticationReq, (error: Error, response: request.Response, body: any) => {
            if (error !== undefined && error !== null) {
                this.log.error(`Get PAI token failed: ${error.message}`);
                deferred.reject(new Error(`Get PAI token failed: ${error.message}`));
            } else if (response.statusCode !== 200) {
                // Leave the token and its timestamp untouched so the next call retries
                this.log.error(`Get PAI token failed: get PAI Rest return code ${response.statusCode}`);
                deferred.reject(new Error(`Get PAI token failed: ${response.body}, please check paiConfig username or password`));
            } else {
                this.paiToken = body.token;
                this.paiTokenUpdateTime = new Date().getTime();
                deferred.resolve();
            }
        });

        let timeoutId: NodeJS.Timer;
        const timeoutDelay: Promise<void> = new Promise<void>((resolve: Function, reject: Function): void => {
            // Set timeout and reject the promise once reach timeout (5 seconds)
            timeoutId = setTimeout(
                () => reject(new Error('Get PAI token timeout. Please check your PAI cluster.')),
                5000);
        });

        return Promise.race([timeoutDelay, deferred.promise])
            .finally(() => { clearTimeout(timeoutId); });
    }
    // tslint:enable:no-any no-unsafe-any no-http-string
}

export { PAITrainingService };