paiTrainingService.ts 24.8 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20

/**
 * Copyright (c) Microsoft Corporation
 * All rights reserved.
 *
 * MIT License
 *
 * Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated
 * documentation files (the "Software"), to deal in the Software without restriction, including without limitation
 * the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and
 * to permit persons to whom the Software is furnished to do so, subject to the following conditions:
 * The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.
 *
 * THE SOFTWARE IS PROVIDED *AS IS*, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING
 * BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
 * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
 * DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
 */

21
'use strict';
22
23
24
25
26

import * as cpp from 'child-process-promise';
import * as fs from 'fs';
import * as path from 'path';
import * as request from 'request';
27
import * as component from '../../common/component';
28
29

import { EventEmitter } from 'events';
30
31
import { Deferred } from 'ts-deferred';
import { String } from 'typescript-string-operations';
32
import { MethodNotImplementedError } from '../../common/errors';
33
import { getExperimentId, getInitTrialSequenceId } from '../../common/experimentStartupInfo';
34
35
import { getLogger, Logger } from '../../common/log';
import {
36
37
    JobApplicationForm, NNIManagerIpConfig, TrainingService,
    TrialJobApplicationForm, TrialJobDetail, TrialJobMetric
38
} from '../../common/trainingService';
39
40
41
42
import { delay, generateParamFileName,
    getExperimentRootDir, getIPV4Address, getVersion, uniqueString } from '../../common/utils';
import { CONTAINER_INSTALL_NNI_SHELL_FORMAT } from '../common/containerJobData';
import { TrialConfigMetadataKey } from '../common/trialConfigMetadataKey';
43
import { validateCodeDir } from '../common/util';
44
45
46
47
48
import { HDFSClientUtility } from './hdfsClientUtility';
import { NNIPAITrialConfig, PAIClusterConfig, PAIJobConfig, PAITaskRole } from './paiConfig';
import { PAI_LOG_PATH_FORMAT, PAI_OUTPUT_DIR_FORMAT, PAI_TRIAL_COMMAND_FORMAT, PAITrialJobDetail } from './paiData';
import { PAIJobInfoCollector } from './paiJobInfoCollector';
import { PAIJobRestServer } from './paiJobRestServer';
49

50
const WebHDFS = require('webhdfs');
51
52
53
54
55
56
57
58
59
60
61
62
63

/**
 * Training Service implementation for OpenPAI (Open Platform for AI)
 * Refer https://github.com/Microsoft/pai for more info about OpenPAI
 */
@component.Singleton
class PAITrainingService implements TrainingService {
    private readonly log!: Logger;
    private readonly metricsEmitter: EventEmitter;
    private readonly trialJobsMap: Map<string, PAITrialJobDetail>;
    private readonly expRootDir: string;
    private paiTrialConfig: NNIPAITrialConfig | undefined;
    private paiClusterConfig?: PAIClusterConfig;
64
    private jobQueue: string[];
65
66
67
    private stopping: boolean = false;
    private hdfsClient: any;
    private paiToken? : string;
68
69
    private paiTokenUpdateTime?: number;
    private paiTokenUpdateInterval: number;
70
71
72
    private experimentId! : string;
    private readonly paiJobCollector : PAIJobInfoCollector;
    private readonly hdfsDirPattern: string;
fishyds's avatar
fishyds committed
73
74
    private hdfsBaseDir: string | undefined;
    private hdfsOutputHost: string | undefined;
75
    private nextTrialSequenceId: number;
76
    private paiRestServerPort?: number;
77
    private nniManagerIpConfig?: NNIManagerIpConfig;
78
    private copyExpCodeDirPromise?: Promise<void>;
79
    private versionCheck: boolean = true;
SparkSnail's avatar
SparkSnail committed
80
    private logCollection: string;
81
82
83
84
85

    /**
     * Initializes the in-memory state of the OpenPAI training service.
     *
     * Cluster and trial settings are not read here; they are supplied later
     * through setClusterMetadata().
     */
    constructor() {
        this.log = getLogger();
        this.metricsEmitter = new EventEmitter();
        this.trialJobsMap = new Map<string, PAITrialJobDetail>();
        this.jobQueue = [];
        // Root dir on HDFS
        this.expRootDir = path.join('/nni', 'experiments', getExperimentId());
        this.experimentId = getExperimentId();
        this.paiJobCollector = new PAIJobInfoCollector(this.trialJobsMap);
        // Shape: hdfs://<dotted-quad host>[:port][/baseDir]
        // The dots between octets are escaped so that only a literal '.' is accepted;
        // an unescaped '.' would match any character (e.g. "10a20b30c40").
        this.hdfsDirPattern = 'hdfs://(?<host>([0-9]{1,3}\\.){3}[0-9]{1,3})(:[0-9]{2,5})?(?<baseDir>/.*)?';
        // -1 marks "not yet initialized"; generateSequenceId() loads the real start value lazily
        this.nextTrialSequenceId = -1;
        this.paiTokenUpdateInterval = 7200000; //2hours
        this.logCollection = 'none';
        this.log.info('Construct OpenPAI training service.');
    }

    /**
     * Starts the PAI job REST server, then runs the status-checking loop and the
     * job-submission loop side by side.
     *
     * @returns a promise that settles once both loops have finished (or one of them fails)
     */
    public async run(): Promise<void> {
        this.log.info('Run PAI training service.');

        const jobRestServer: PAIJobRestServer = component.get(PAIJobRestServer);
        await jobRestServer.start();
        jobRestServer.setEnableVersionCheck = this.versionCheck;
        this.log.info(`PAI Training service rest server listening on: ${jobRestServer.endPoint}`);

        // Both loops are kicked off before either one is awaited
        const statusLoop: Promise<void> = this.statusCheckingLoop();
        const submitLoop: Promise<void> = this.submitJobLoop();
        await Promise.all([statusLoop, submitLoop]);

        this.log.info('PAI training service exit.');
    }

    /**
     * Collects the details of every tracked job whose form type is 'TRIAL'.
     *
     * @returns the trial job details, in the iteration order of the internal job map
     */
    public async listTrialJobs(): Promise<TrialJobDetail[]> {
        const trialJobs: TrialJobDetail[] = [];

        for (const [jobId, jobDetail] of this.trialJobsMap) {
            if (jobDetail.form.jobType !== 'TRIAL') {
                continue;
            }
            trialJobs.push(await this.getTrialJob(jobId));
        }

        return trialJobs;
    }

122
    /**
     * Looks up a tracked trial job by id.
     *
     * @param trialJobId id of the trial job to look up
     * @returns the stored job detail
     * @throws Error (as a rejection) when the PAI cluster config has not been set,
     *         or when no job with that id is tracked
     */
    public async getTrialJob(trialJobId: string): Promise<TrialJobDetail> {
        if (!this.paiClusterConfig) {
            throw new Error('PAI Cluster config is not initialized');
        }

        const paiTrialJob: PAITrialJobDetail | undefined = this.trialJobsMap.get(trialJobId);

        if (!paiTrialJob) {
            // Reject with an Error rather than a bare string so callers can rely on `.message`
            throw new Error(`trial job ${trialJobId} not found`);
        }

        return paiTrialJob;
    }

136
    /**
     * Subscribes a callback to the 'metric' event of the internal metrics emitter.
     *
     * @param listener callback invoked with each emitted trial job metric
     */
    public addTrialJobMetricListener(listener: (metric: TrialJobMetric) => void): void {
        const emitter: EventEmitter = this.metricsEmitter;
        emitter.on('metric', listener);
    }

140
    /**
     * Unsubscribes a callback from the 'metric' event of the internal metrics emitter.
     *
     * @param listener the callback previously passed to addTrialJobMetricListener
     */
    public removeTrialJobMetricListener(listener: (metric: TrialJobMetric) => void): void {
        const emitter: EventEmitter = this.metricsEmitter;
        emitter.off('metric', listener);
    }

    /**
     * Registers a new trial job in 'WAITING' state and appends it to the submission queue.
     * The job is not sent to PAI here; the submission loop picks it up from the queue.
     *
     * @param form the job application form to attach to the new trial job
     * @returns the detail record created for the queued job
     * @throws Error (as a rejection) when hdfsBaseDir has not been set yet
     */
    public async submitTrialJob(form: JobApplicationForm): Promise<TrialJobDetail> {
        if (!this.hdfsBaseDir) {
            throw new Error('hdfsBaseDir is not initialized');
        }

        this.log.info(`submitTrialJob: form: ${JSON.stringify(form)}`);

        const newJobId: string = uniqueString(5);
        const sequenceId: number = this.generateSequenceId();
        //TODO: use HDFS working folder instead
        const workingFolder: string = path.join(this.expRootDir, 'trials', newJobId);
        const jobName: string = `nni_exp_${this.experimentId}_trial_${newJobId}`;
        const outputDir: string = path.join(this.hdfsBaseDir, this.experimentId, newJobId);
        const logPath: string = String.Format(PAI_LOG_PATH_FORMAT, this.hdfsOutputHost, outputDir);

        const queuedJob: PAITrialJobDetail = new PAITrialJobDetail(
            newJobId,
            'WAITING',
            jobName,
            Date.now(),
            workingFolder,
            form,
            sequenceId,
            logPath);

        this.trialJobsMap.set(newJobId, queuedJob);
        this.jobQueue.push(newJobId);

        return queuedJob;
    }

    /**
     * Not supported by this training service.
     *
     * @param trialJobId unused
     * @param form unused
     * @throws MethodNotImplementedError always, synchronously
     */
    public updateTrialJob(trialJobId: string, form: JobApplicationForm): Promise<TrialJobDetail> {
        const notImplemented: MethodNotImplementedError = new MethodNotImplementedError();
        throw notImplemented;
    }

    /**
     * Whether multi-phase jobs are supported; this service always reports false.
     */
    public get isMultiPhaseJobSupported(): boolean {
        return false;
    }

QuanluZhang's avatar
QuanluZhang committed
189
    /**
     * Asks the PAI REST server to stop the PAI job that backs a trial.
     *
     * Every failure is reported as a rejection carrying an Error, including the
     * precondition failures, so callers only need one error path.
     *
     * @param trialJobId id of the trial job to cancel
     * @param isEarlyStopped recorded on the job detail to mark where the cancellation came from
     * @returns a promise resolved once the REST server accepts the stop request
     */
    public cancelTrialJob(trialJobId: string, isEarlyStopped: boolean = false): Promise<void> {
        const trialJobDetail: PAITrialJobDetail | undefined = this.trialJobsMap.get(trialJobId);
        if (!trialJobDetail) {
            const notFoundMessage: string = `cancelTrialJob: trial job id ${trialJobId} not found`;
            this.log.error(notFoundMessage);

            return Promise.reject(new Error(notFoundMessage));
        }

        if (!this.paiClusterConfig) {
            return Promise.reject(new Error('PAI Cluster config is not initialized'));
        }
        if (!this.paiToken) {
            return Promise.reject(new Error('PAI token is not initialized'));
        }

        const stopJobRequest: request.Options = {
            uri: `http://${this.paiClusterConfig.host}/rest-server/api/v1/user/${this.paiClusterConfig.userName}/jobs/${trialJobDetail.paiJobName}/executionType`,
            method: 'PUT',
            json: true,
            body: {value: 'STOP'},
            headers: {
                'Content-Type': 'application/json',
                Authorization: `Bearer ${this.paiToken}`
            }
        };

        // Set trialjobDetail's early stopped field, to mark the job's cancellation source
        trialJobDetail.isEarlyStopped = isEarlyStopped;

        const deferred: Deferred<void> = new Deferred<void>();
        request(stopJobRequest, (error: Error, response: request.Response, body: any) => {
            if (error || response.statusCode >= 400) {
                this.log.error(`PAI Training service: stop trial ${trialJobId} to PAI Cluster failed!`);
                deferred.reject(new Error(error ? error.message : `Stop trial failed, http code: ${response.statusCode}`));
            } else {
                deferred.resolve();
            }
        });

        return deferred.promise;
    }

231
    /**
     * Applies one configuration entry identified by `key`.
     *
     * - NNI_MANAGER_IP: stores the parsed NNI manager IP config.
     * - PAI_CLUSTER_CONFIG: stores the parsed cluster config, creates the WebHDFS client
     *   and fetches a PAI token.
     * - TRIAL_CONFIG: stores the parsed trial config, validates codeDir, derives the HDFS
     *   output host/base dir from outputDir, checks that the output HDFS is reachable and
     *   then starts copying codeDir to HDFS (the copy is not awaited here).
     * - VERSION_CHECK: enabled only when value is 'true' or 'True'.
     * - LOG_COLLECTION: stores the value as-is.
     *
     * @param key a TrialConfigMetadataKey value
     * @param value the setting; JSON text for the first three keys, a plain string otherwise
     * @returns a promise resolved once the entry has been applied
     * @throws Error (as a rejection) for unknown keys, for TRIAL_CONFIG arriving before
     *         PAI_CLUSTER_CONFIG, for an outputDir that does not match the expected HDFS
     *         pattern, for codeDir validation failures and for HDFS connectivity failures
     */
    // tslint:disable-next-line:max-func-body-length
    public async setClusterMetadata(key: string, value: string): Promise<void> {
        switch (key) {
            case TrialConfigMetadataKey.NNI_MANAGER_IP:
                this.nniManagerIpConfig = <NNIManagerIpConfig>JSON.parse(value);

                return;

            case TrialConfigMetadataKey.PAI_CLUSTER_CONFIG:
                this.paiClusterConfig = <PAIClusterConfig>JSON.parse(value);

                this.hdfsClient = WebHDFS.createClient({
                    user: this.paiClusterConfig.userName,
                    // Refer PAI document for Pylon mapping https://github.com/Microsoft/pai/tree/master/docs/pylon
                    port: 80,
                    path: '/webhdfs/api/v1',
                    host: this.paiClusterConfig.host
                });

                // Get PAI authentication token
                await this.updatePaiToken();

                return;

            case TrialConfigMetadataKey.TRIAL_CONFIG: {
                if (!this.paiClusterConfig) {
                    this.log.error('pai cluster config is not initialized');
                    throw new Error('pai cluster config is not initialized');
                }
                this.paiTrialConfig = <NNIPAITrialConfig>JSON.parse(value);
                //paiTrialConfig.outputDir could be null if it is not set in nnictl
                if (this.paiTrialConfig.outputDir === undefined || this.paiTrialConfig.outputDir === null) {
                    this.paiTrialConfig.outputDir = String.Format(
                        PAI_OUTPUT_DIR_FORMAT,
                        this.paiClusterConfig.host
                    ).replace(/\r\n|\n|\r/gm, '');
                }

                // Validate to make sure codeDir doesn't have too many files
                try {
                    await validateCodeDir(this.paiTrialConfig.codeDir);
                } catch (error) {
                    this.log.error(error);
                    // Keep the original Error (and its stack) when there is one
                    throw error instanceof Error ? error : new Error(`${error}`);
                }

                const hdfsDirContent = this.paiTrialConfig.outputDir.match(this.hdfsDirPattern);
                if (hdfsDirContent === null) {
                    throw new Error('Trial outputDir format Error');
                }
                const groups = hdfsDirContent.groups;
                if (groups === undefined) {
                    throw new Error('Trial outputDir format Error');
                }

                this.hdfsOutputHost = groups['host'];
                //TODO: choose to use /${username} as baseDir
                this.hdfsBaseDir = groups['baseDir'];
                if (this.hdfsBaseDir === undefined) {
                    this.hdfsBaseDir = '/';
                }

                // Reuse the cluster client when output goes to the cluster's own HDFS;
                // otherwise talk to the output host directly
                let dataOutputHdfsClient;
                if (this.paiClusterConfig.host === this.hdfsOutputHost && this.hdfsClient) {
                    dataOutputHdfsClient = this.hdfsClient;
                } else {
                    dataOutputHdfsClient = WebHDFS.createClient({
                        user: this.paiClusterConfig.userName,
                        port: 50070,
                        host: this.hdfsOutputHost
                    });
                }

                // Fail fast: do not start the upload below when the output HDFS is unreachable
                let rootExists: boolean;
                try {
                    rootExists = await HDFSClientUtility.pathExists('/', dataOutputHdfsClient);
                } catch (error) {
                    throw new Error(`HDFS encounters problem, error is ${error}. Please check hdfsOutputDir host!`);
                }
                if (!rootExists) {
                    throw new Error(`Please check hdfsOutputDir host!`);
                }

                // Copy experiment files from local folder to HDFS.
                // Deliberately not awaited here; submitTrialJobToPAI awaits this promise.
                this.copyExpCodeDirPromise = HDFSClientUtility.copyDirectoryToHdfs(
                    this.paiTrialConfig.codeDir,
                    HDFSClientUtility.getHdfsExpCodeDir(this.paiClusterConfig.userName),
                    this.hdfsClient
                );

                return;
            }

            case TrialConfigMetadataKey.VERSION_CHECK:
                this.versionCheck = (value === 'true' || value === 'True');

                return;

            case TrialConfigMetadataKey.LOG_COLLECTION:
                this.logCollection = value;

                return;

            default:
                //Reject for unknown keys
                throw new Error(`Uknown key: ${key}`);
        }
    }

    /**
     * Stub: no metadata is looked up. The returned promise is resolved without a
     * value, whatever the key.
     *
     * @param key unused
     */
    public getClusterMetadata(key: string): Promise<string> {
        const pending: Deferred<string> = new Deferred<string>();
        const result: Promise<string> = pending.promise;
        pending.resolve();

        return result;
    }

    /**
     * Signals the background loops to stop and shuts down the PAI job REST server.
     *
     * @returns a promise resolved when the REST server has stopped, or rejected
     *          with the error raised while stopping it
     */
    public async cleanUp(): Promise<void> {
        this.log.info('Stopping PAI training service...');
        // Checked by statusCheckingLoop() and submitJobLoop() on each iteration
        this.stopping = true;

        const jobRestServer: PAIJobRestServer = component.get(PAIJobRestServer);
        const stopResult: Deferred<void> = new Deferred<void>();
        try {
            await jobRestServer.stop();
            stopResult.resolve();
            this.log.info('PAI Training service rest server stopped successfully.');
        } catch (stopError) {
            this.log.error(`PAI Training service rest server stopped failed, error: ${stopError.message}`);
            stopResult.reject(stopError);
        }

        return stopResult.promise;
    }

    /**
     * The emitter on which trial job metrics listeners are registered
     * (see addTrialJobMetricListener / removeTrialJobMetricListener).
     */
    public get MetricsEmitter() : EventEmitter {
        return this.metricsEmitter;
    }
370

371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
    /**
     * Submits one queued trial job to the PAI cluster.
     *
     * Steps: wait for the experiment code upload started by setClusterMetadata(),
     * stage the NNI install script and the parameter file in a local temp folder,
     * build the PAI job config, upload the temp folder to the trial's HDFS work dir,
     * then POST the job to the PAI REST server.
     *
     * @param trialJobId id of a job previously registered by submitTrialJob()
     * @returns a promise resolved with true once the REST server accepts the job
     * @throws Error (as a rejection) when the job is unknown, when required
     *         configuration is missing, when the HDFS upload fails, or when the REST
     *         call fails (in which case the job status is set to 'FAILED')
     */
    // tslint:disable-next-line:max-func-body-length
    private async submitTrialJobToPAI(trialJobId: string): Promise<boolean> {
        const trialJobDetail: PAITrialJobDetail | undefined = this.trialJobsMap.get(trialJobId);

        if (!trialJobDetail) {
            throw new Error(`Failed to find PAITrialJobDetail for job ${trialJobId}`);
        }

        if (!this.paiClusterConfig) {
            throw new Error('PAI Cluster config is not initialized');
        }
        if (!this.paiTrialConfig) {
            throw new Error('trial config is not initialized');
        }
        if (!this.paiToken) {
            throw new Error('PAI token is not initialized');
        }

        if (!this.hdfsBaseDir) {
            throw new Error('hdfsBaseDir is not initialized');
        }

        if (!this.hdfsOutputHost) {
            throw new Error('hdfsOutputHost is not initialized');
        }

        // Resolve the REST server port lazily, on first submission
        if (!this.paiRestServerPort) {
            const restServer: PAIJobRestServer = component.get(PAIJobRestServer);
            this.paiRestServerPort = restServer.clusterRestServerPort;
        }

        // Make sure experiment code files is copied from local to HDFS
        if (this.copyExpCodeDirPromise) {
            await this.copyExpCodeDirPromise;
        }

        // Step 1. Prepare PAI job configuration
        const hdfsOutputDir : string = path.join(this.hdfsBaseDir, this.experimentId, trialJobId);
        const hdfsCodeDir: string = HDFSClientUtility.getHdfsTrialWorkDir(this.paiClusterConfig.userName, trialJobId);

        const trialLocalTempFolder: string = path.join(getExperimentRootDir(), 'trials-local', trialJobId);
        //create tmp trial working folder locally.
        await cpp.exec(`mkdir -p ${trialLocalTempFolder}`);

        const runScriptContent : string = CONTAINER_INSTALL_NNI_SHELL_FORMAT;
        // Write NNI installation file to local tmp files
        await fs.promises.writeFile(path.join(trialLocalTempFolder, 'install_nni.sh'), runScriptContent, { encoding: 'utf8' });

        // Write file content ( parameter.cfg ) to local tmp folders
        const trialForm : TrialJobApplicationForm = (<TrialJobApplicationForm>trialJobDetail.form);
        if (trialForm) {
            await fs.promises.writeFile(
                path.join(trialLocalTempFolder, generateParamFileName(trialForm.hyperParameters)),
                trialForm.hyperParameters.value, { encoding: 'utf8' }
            );
        }

        const nniManagerIp: string = this.nniManagerIpConfig ? this.nniManagerIpConfig.nniManagerIp : getIPV4Address();
        // An empty version string is passed when the version check is disabled
        const version: string = this.versionCheck ? await getVersion() : '';
        const nniPaiTrialCommand : string = String.Format(
            PAI_TRIAL_COMMAND_FORMAT,
            // PAI will copy job's codeDir into /root directory
            `$PWD/${trialJobId}`,
            `$PWD/${trialJobId}/nnioutput`,
            trialJobId,
            this.experimentId,
            trialJobDetail.sequenceId,
            this.paiTrialConfig.command,
            nniManagerIp,
            this.paiRestServerPort,
            hdfsOutputDir,
            this.hdfsOutputHost,
            this.paiClusterConfig.userName,
            HDFSClientUtility.getHdfsExpCodeDir(this.paiClusterConfig.userName),
            version,
            this.logCollection
        ).replace(/\r\n|\n|\r/gm, '');

        // Go through the service logger (not console) so the command lands in the NNI log
        this.log.info(`nniPAItrial command is ${nniPaiTrialCommand.trim()}`);
        const paiTaskRoles : PAITaskRole[] = [
            new PAITaskRole(
                `nni_trail_${trialJobId}`,
                // Task role number
                1,
                // Task CPU number
                this.paiTrialConfig.cpuNum,
                // Task memory
                this.paiTrialConfig.memoryMB,
                // Task GPU number
                this.paiTrialConfig.gpuNum,
                // Task command
                nniPaiTrialCommand,
                // Task shared memory
                this.paiTrialConfig.shmMB
            )
        ];

        const paiJobConfig : PAIJobConfig = new PAIJobConfig(
            // Job name
            trialJobDetail.paiJobName,
            // Docker image
            this.paiTrialConfig.image,
            // dataDir
            this.paiTrialConfig.dataDir,
            // outputDir
            this.paiTrialConfig.outputDir,
            // codeDir
            `$PAI_DEFAULT_FS_URI${hdfsCodeDir}`,
            // PAI Task roles
            paiTaskRoles,
            // Add Virtual Cluster
            this.paiTrialConfig.virtualCluster === undefined ? 'default' : this.paiTrialConfig.virtualCluster.toString()
        );

        // Step 2. Upload the staged trial files (install script, parameter file) onto HDFS
        try {
            await HDFSClientUtility.copyDirectoryToHdfs(trialLocalTempFolder, hdfsCodeDir, this.hdfsClient);
        } catch (error) {
            // Name the folder that is actually being uploaded, not the experiment codeDir
            this.log.error(`PAI Training service: copy ${trialLocalTempFolder} to HDFS ${hdfsCodeDir} failed, error is ${error}`);
            throw new Error(error.message);
        }

        // Step 3. Submit PAI job via Rest call
        // Refer https://github.com/Microsoft/pai/blob/master/docs/rest-server/API.md for more detail about PAI Rest API
        const submitJobRequest: request.Options = {
            uri: `http://${this.paiClusterConfig.host}/rest-server/api/v1/user/${this.paiClusterConfig.userName}/jobs`,
            method: 'POST',
            json: true,
            body: paiJobConfig,
            headers: {
                'Content-Type': 'application/json',
                Authorization: `Bearer ${this.paiToken}`
            }
        };
        const deferred : Deferred<boolean> = new Deferred<boolean>();
        request(submitJobRequest, (error: Error, response: request.Response, body: any) => {
            if (error || response.statusCode >= 400) {
                // With json: true the body is a parsed object; serialize it so the log is readable
                const errorMessage : string = error ? error.message :
                    `Submit trial ${trialJobId} failed, http code:${response.statusCode}, http body: ${JSON.stringify(body)}`;
                this.log.error(errorMessage);
                trialJobDetail.status = 'FAILED';
                deferred.reject(new Error(errorMessage));
            } else {
                trialJobDetail.submitTime = Date.now();
                deferred.resolve(true);
            }
        });

        return deferred.promise;
    }

522
    /**
     * Hands out the next trial sequence id.
     * The counter starts at -1 and is seeded from getInitTrialSequenceId() on first use.
     *
     * @returns the id assigned to the caller; the stored counter is advanced by one
     */
    private generateSequenceId(): number {
        if (this.nextTrialSequenceId === -1) {
            this.nextTrialSequenceId = getInitTrialSequenceId();
        }

        const assignedId: number = this.nextTrialSequenceId;
        this.nextTrialSequenceId = assignedId + 1;

        return assignedId;
    }
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557

    /**
     * Polling loop: refreshes the PAI token when due, asks the job collector to
     * retrieve trial statuses, then sleeps for 3 seconds. Runs until `stopping` is set.
     *
     * @throws Error (as a rejection) carrying the REST server's error message when it reports one
     */
    private async statusCheckingLoop(): Promise<void> {
        for (;;) {
            if (this.stopping) {
                return;
            }

            await this.updatePaiToken();
            await this.paiJobCollector.retrieveTrialStatus(this.paiToken, this.paiClusterConfig);

            const jobRestServer: PAIJobRestServer = component.get(PAIJobRestServer);
            if (jobRestServer.getErrorMessage) {
                throw new Error(jobRestServer.getErrorMessage);
            }

            await delay(3000);
        }
    }

    /**
     * Submission loop: every 3 seconds, submits queued trial jobs to PAI in FIFO order.
     * A job leaves the queue only after its submission returned true. Runs until
     * `stopping` is set.
     *
     * NOTE(review): submitTrialJobToPAI appears to either resolve true or reject, so the
     * "stop draining" branch looks unreachable and a rejection ends this loop — confirm
     * that this is intended.
     */
    private async submitJobLoop(): Promise<void> {
        while (!this.stopping) {
            while (!this.stopping && this.jobQueue.length > 0) {
                const headJobId: string = this.jobQueue[0];
                const submitted: boolean = await this.submitTrialJobToPAI(headJobId);
                if (!submitted) {
                    // Stop draining the queue since submitting the head job failed
                    break;
                }
                // The head job was submitted; drop it from the queue
                this.jobQueue.shift();
            }
            await delay(3000);
        }
    }

558
559
560
561
562
    /**
     * Update pai token by the interval time or initialize the pai token.
     *
     * Does nothing while the current token is younger than paiTokenUpdateInterval.
     * Otherwise requests a new token from the PAI REST server with the configured
     * user name and password, giving up after 5 seconds.
     *
     * The token and its timestamp are stored only on a 200 response, so a failed
     * request is retried on the next call instead of being suppressed for a whole interval.
     *
     * @throws Error (as a rejection) when the cluster config is missing, the request
     *         fails, the server answers with a non-200 status, or the request times out
     */
    private async updatePaiToken(): Promise<void> {
        const currentTime: number = new Date().getTime();
        //If pai token initialized and not reach the interval time, do not update
        if (this.paiTokenUpdateTime && (currentTime - this.paiTokenUpdateTime) < this.paiTokenUpdateInterval) {
            return;
        }

        if (!this.paiClusterConfig) {
            const paiClusterConfigError: string = `pai cluster config not initialized!`;
            this.log.error(`${paiClusterConfigError}`);
            throw Error(`${paiClusterConfigError}`);
        }

        const deferred : Deferred<void> = new Deferred<void>();
        const authentication_req: request.Options = {
            uri: `http://${this.paiClusterConfig.host}/rest-server/api/v1/token`,
            method: 'POST',
            json: true,
            body: {
                username: this.paiClusterConfig.userName,
                password: this.paiClusterConfig.passWord
            }
        };

        request(authentication_req, (error: Error, response: request.Response, body: any) => {
            if (error) {
                this.log.error(`Get PAI token failed: ${error.message}`);
                deferred.reject(new Error(`Get PAI token failed: ${error.message}`));

                return;
            }
            if (response.statusCode !== 200) {
                this.log.error(`Get PAI token failed: get PAI Rest return code ${response.statusCode}`);
                // With json: true the body is a parsed object; serialize it so the message is readable
                deferred.reject(new Error(`Get PAI token failed: ${JSON.stringify(body)}, please check paiConfig username or password`));

                // Do not fall through: keep the previous token and timestamp untouched
                return;
            }
            this.paiToken = body.token;
            this.paiTokenUpdateTime = new Date().getTime();
            deferred.resolve();
        });

        let timeoutId: NodeJS.Timer;
        const timeoutDelay: Promise<void> = new Promise<void>((resolve: Function, reject: Function): void => {
            // Set timeout and reject the promise once reach timeout (5 seconds)
            timeoutId = setTimeout(
                () => reject(new Error('Get PAI token timeout. Please check your PAI cluster.')),
                5000);
        });

        // Whichever settles first wins; the timer is always cleared afterwards
        return Promise.race([timeoutDelay, deferred.promise])
            .finally(() => clearTimeout(timeoutId));
    }
612
613
}

614
export { PAITrainingService };