paiTrainingService.ts 26.7 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
/**
 * Copyright (c) Microsoft Corporation
 * All rights reserved.
 *
 * MIT License
 *
 * Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated
 * documentation files (the "Software"), to deal in the Software without restriction, including without limitation
 * the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and
 * to permit persons to whom the Software is furnished to do so, subject to the following conditions:
 * The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.
 *
 * THE SOFTWARE IS PROVIDED *AS IS*, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING
 * BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
 * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
 * DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
 */

20
'use strict';
21
22
23
24

import * as cpp from 'child-process-promise';
import * as fs from 'fs';
import * as path from 'path';
25
// tslint:disable-next-line:no-implicit-dependencies
26
import * as request from 'request';
27
import * as component from '../../common/component';
28
29

import { EventEmitter } from 'events';
30
31
import { Deferred } from 'ts-deferred';
import { String } from 'typescript-string-operations';
32
import { MethodNotImplementedError } from '../../common/errors';
33
import { getExperimentId } from '../../common/experimentStartupInfo';
34
35
import { getLogger, Logger } from '../../common/log';
import {
36
    HyperParameters, NNIManagerIpConfig, TrainingService,
37
    TrialJobApplicationForm, TrialJobDetail, TrialJobMetric
38
} from '../../common/trainingService';
39
import { delay, generateParamFileName,
40
    getExperimentRootDir, getIPV4Address, getVersion, uniqueString, unixPathJoin } from '../../common/utils';
41
42
import { CONTAINER_INSTALL_NNI_SHELL_FORMAT } from '../common/containerJobData';
import { TrialConfigMetadataKey } from '../common/trialConfigMetadataKey';
43
import { execMkdir, validateCodeDir } from '../common/util';
44
45
import { HDFSClientUtility } from './hdfsClientUtility';
import { NNIPAITrialConfig, PAIClusterConfig, PAIJobConfig, PAITaskRole } from './paiConfig';
46
import { PAI_LOG_PATH_FORMAT, PAI_TRIAL_COMMAND_FORMAT, PAITrialJobDetail } from './paiData';
47
import { PAIJobInfoCollector } from './paiJobInfoCollector';
48
import { PAIJobRestServer, ParameterFileMeta } from './paiJobRestServer';
49

50
import * as WebHDFS from 'webhdfs';
51
52
53
54
55
56
57
58
59
60
61
62
63

/**
 * Training Service implementation for OpenPAI (Open Platform for AI).
 * Refer to https://github.com/Microsoft/pai for more information about OpenPAI.
 */
@component.Singleton
class PAITrainingService implements TrainingService {
    private readonly log!: Logger;
    private readonly metricsEmitter: EventEmitter;
    private readonly trialJobsMap: Map<string, PAITrialJobDetail>;
    private readonly expRootDir: string;
    private paiTrialConfig: NNIPAITrialConfig | undefined;
    private paiClusterConfig?: PAIClusterConfig;
64
    private readonly jobQueue: string[];
65
    private stopping: boolean = false;
66
    // tslint:disable-next-line:no-any
67
68
    private hdfsClient: any;
    private paiToken? : string;
69
    private paiTokenUpdateTime?: number;
70
71
    private readonly paiTokenUpdateInterval: number;
    private readonly experimentId! : string;
72
    private readonly paiJobCollector : PAIJobInfoCollector;
73
    private paiRestServerPort?: number;
74
    private nniManagerIpConfig?: NNIManagerIpConfig;
75
    private copyExpCodeDirPromise?: Promise<void>;
76
    private copyAuthFilePromise?: Promise<void>;
77
    private versionCheck: boolean = true;
SparkSnail's avatar
SparkSnail committed
78
    private logCollection: string;
79
    private isMultiPhase: boolean = false;
80
    private authFileHdfsPath: string | undefined = undefined;
81
    private portList?: string | undefined;
82
83
84
85
86

    /**
     * Initializes the in-memory state of the training service: logger,
     * experiment identifiers, metric emitter, trial-job bookkeeping and the
     * PAI job-info collector (which shares the trial job map).
     */
    constructor() {
        this.log = getLogger();

        this.experimentId = getExperimentId();
        // Root dir on HDFS
        this.expRootDir = path.join('/nni', 'experiments', getExperimentId());

        this.metricsEmitter = new EventEmitter();
        this.jobQueue = [];
        this.trialJobsMap = new Map<string, PAITrialJobDetail>();
        this.paiJobCollector = new PAIJobInfoCollector(this.trialJobsMap);

        // Token refresh interval in milliseconds: 2 hours
        this.paiTokenUpdateInterval = 2 * 60 * 60 * 1000;
        this.logCollection = 'none';

        this.log.info('Construct OpenPAI training service.');
    }

    /**
     * Starts the PAI job REST server, then runs the status-checking loop and
     * the job-submission loop concurrently. Resolves once both loops have
     * finished; rejects if either loop rejects.
     */
    public async run(): Promise<void> {
        this.log.info('Run PAI training service.');

        const paiRestServer: PAIJobRestServer = component.get(PAIJobRestServer);
        await paiRestServer.start();
        paiRestServer.setEnableVersionCheck = this.versionCheck;
        this.log.info(`PAI Training service rest server listening on: ${paiRestServer.endPoint}`);

        const statusLoop: Promise<void> = this.statusCheckingLoop();
        const submitLoop: Promise<void> = this.submitJobLoop();
        await Promise.all([statusLoop, submitLoop]);

        this.log.info('PAI training service exit.');
    }

    /**
     * Returns the details of every trial job currently tracked by this service,
     * looked up one at a time through getTrialJob in map iteration order.
     */
    public async listTrialJobs(): Promise<TrialJobDetail[]> {
        const details: TrialJobDetail[] = [];
        for (const trialJobId of this.trialJobsMap.keys()) {
            const detail: TrialJobDetail = await this.getTrialJob(trialJobId);
            details.push(detail);
        }

        return details;
    }

119
    /**
     * Looks up a tracked trial job by id.
     *
     * @param trialJobId id of the trial job to look up
     * @returns the stored trial job detail
     * @throws Error if the PAI cluster config has not been set, or if no trial
     *         job with the given id is tracked (previously the latter rejected
     *         with a bare string, which has no `message` property)
     */
    public async getTrialJob(trialJobId: string): Promise<TrialJobDetail> {
        if (this.paiClusterConfig === undefined) {
            throw new Error('PAI Cluster config is not initialized');
        }

        const paiTrialJob: PAITrialJobDetail | undefined = this.trialJobsMap.get(trialJobId);
        if (paiTrialJob === undefined) {
            throw new Error(`trial job ${trialJobId} not found`);
        }

        return paiTrialJob;
    }

133
    /**
     * Subscribes a callback to the 'metric' event of the internal metrics emitter.
     *
     * @param listener callback invoked with each emitted trial job metric
     */
    public addTrialJobMetricListener(listener: (metric: TrialJobMetric) => void): void {
        const emitter: EventEmitter = this.metricsEmitter;
        emitter.on('metric', listener);
    }

137
    /**
     * Unsubscribes a callback from the 'metric' event of the internal metrics emitter.
     *
     * @param listener callback previously passed to addTrialJobMetricListener
     */
    public removeTrialJobMetricListener(listener: (metric: TrialJobMetric) => void): void {
        const emitter: EventEmitter = this.metricsEmitter;
        emitter.off('metric', listener);
    }

141
    /**
     * Registers a new trial job in WAITING state and appends it to the
     * submission queue. The job is not sent to PAI here; the submit loop picks
     * it up from the queue later.
     *
     * @param form application form describing the trial
     * @returns the newly created trial job detail
     * @throws Error if the PAI cluster config has not been set
     */
    public async submitTrialJob(form: TrialJobApplicationForm): Promise<TrialJobDetail> {
        const clusterConfig: PAIClusterConfig | undefined = this.paiClusterConfig;
        if (clusterConfig === undefined) {
            throw new Error(`paiClusterConfig not initialized!`);
        }

        this.log.info(`submitTrialJob: form: ${JSON.stringify(form)}`);

        const trialJobId: string = uniqueString(5);
        //TODO: use HDFS working folder instead
        const trialWorkingFolder: string = path.join(this.expRootDir, 'trials', trialJobId);
        const paiJobName: string = `nni_exp_${this.experimentId}_trial_${trialJobId}`;

        // Log location is derived from the trial's HDFS output directory
        const hdfsOutputDir: string = unixPathJoin(
            HDFSClientUtility.getHdfsTrialWorkDir(clusterConfig.userName, trialJobId),
            'nnioutput'
        );
        const hdfsLogPath: string = String.Format(PAI_LOG_PATH_FORMAT, clusterConfig.host, hdfsOutputDir);

        const newTrialJob: PAITrialJobDetail = new PAITrialJobDetail(
            trialJobId,
            'WAITING',
            paiJobName,
            Date.now(),
            trialWorkingFolder,
            form,
            hdfsLogPath
        );

        this.trialJobsMap.set(trialJobId, newTrialJob);
        this.jobQueue.push(trialJobId);

        return newTrialJob;
    }

178
    /**
     * Writes a new hyper-parameter file for an already tracked trial job.
     *
     * @param trialJobId id of the trial job to update
     * @param form application form carrying the new hyper-parameters
     * @returns the (unchanged) stored trial job detail
     * @throws Error if the trial job id is not tracked
     */
    public async updateTrialJob(trialJobId: string, form: TrialJobApplicationForm): Promise<TrialJobDetail> {
        const existingJob: undefined | TrialJobDetail = this.trialJobsMap.get(trialJobId);
        if (existingJob !== undefined) {
            await this.writeParameterFile(trialJobId, form.hyperParameters);

            return existingJob;
        }

        throw new Error(`updateTrialJob failed: ${trialJobId} not found`);
    }

    /**
     * Whether this training service supports multi-phase jobs; always true.
     */
    public get isMultiPhaseJobSupported(): boolean {
        const supported: boolean = true;

        return supported;
    }

192
    // tslint:disable:no-http-string
QuanluZhang's avatar
QuanluZhang committed
193
    /**
     * Asks the PAI REST server to stop the PAI job backing a trial.
     *
     * Every failure is reported through the returned promise as an Error.
     * Earlier code rejected with no reason when the trial was unknown, threw
     * synchronously for missing config/token, and rejected with bare strings
     * on request failure.
     *
     * @param trialJobId id of the trial job to cancel
     * @param isEarlyStopped recorded on the trial job detail to mark the source of the cancellation
     * @returns a promise resolved when the REST server answers with a status code below 400
     */
    public cancelTrialJob(trialJobId: string, isEarlyStopped: boolean = false): Promise<void> {
        const trialJobDetail: PAITrialJobDetail | undefined = this.trialJobsMap.get(trialJobId);
        if (trialJobDetail === undefined) {
            const notFoundMessage: string = `cancelTrialJob: trial job id ${trialJobId} not found`;
            this.log.error(notFoundMessage);

            return Promise.reject(new Error(notFoundMessage));
        }

        if (this.paiClusterConfig === undefined) {
            return Promise.reject(new Error('PAI Cluster config is not initialized'));
        }
        if (this.paiToken === undefined) {
            return Promise.reject(new Error('PAI token is not initialized'));
        }

        const jobsUri: string = `http://${this.paiClusterConfig.host}/rest-server/api/v1/user/${this.paiClusterConfig.userName}`;
        const stopJobRequest: request.Options = {
            uri: `${jobsUri}/jobs/${trialJobDetail.paiJobName}/executionType`,
            method: 'PUT',
            json: true,
            body: {value: 'STOP'},
            headers: {
                'Content-Type': 'application/json',
                Authorization: `Bearer ${this.paiToken}`
            }
        };

        // Set trialjobDetail's early stopped field, to mark the job's cancellation source
        trialJobDetail.isEarlyStopped = isEarlyStopped;

        const deferred: Deferred<void> = new Deferred<void>();
        // tslint:disable-next-line:no-any
        request(stopJobRequest, (error: Error, response: request.Response, body: any) => {
            const hasTransportError: boolean = error !== undefined && error !== null;
            // `response` is only inspected when there is no transport error
            if (hasTransportError || response.statusCode >= 400) {
                this.log.error(`PAI Training service: stop trial ${trialJobId} to PAI Cluster failed!`);
                deferred.reject(new Error(
                    hasTransportError ? error.message : `Stop trial failed, http code: ${response.statusCode}`
                ));
            } else {
                deferred.resolve();
            }
        });

        return deferred.promise;
    }

238
    // tslint:disable: no-unsafe-any no-any
239
    // tslint:disable-next-line:max-func-body-length
fishyds's avatar
fishyds committed
240
    /**
     * Applies one piece of cluster/trial configuration, selected by key.
     *
     * Failures are reported by throwing, which rejects the returned promise.
     * Earlier code used a Deferred that the VERSION_CHECK, LOG_COLLECTION and
     * MULTI_PHASE cases never settled, so the promise hung forever for those
     * keys; the missing-credentials branch also rejected and then resolved.
     *
     * @param key one of the TrialConfigMetadataKey values handled below
     * @param value JSON text for the config keys, or a plain string for the flag keys
     * @throws Error for an unknown key, missing password/token, a trial config
     *         set before the cluster config, or a codeDir that fails validation
     */
    public async setClusterMetadata(key: string, value: string): Promise<void> {
        switch (key) {
            case TrialConfigMetadataKey.NNI_MANAGER_IP:
                this.nniManagerIpConfig = <NNIManagerIpConfig>JSON.parse(value);
                break;

            case TrialConfigMetadataKey.PAI_CLUSTER_CONFIG:
                this.paiClusterConfig = <PAIClusterConfig>JSON.parse(value);

                this.hdfsClient = WebHDFS.createClient({
                    user: this.paiClusterConfig.userName,
                    // Refer PAI document for Pylon mapping https://github.com/Microsoft/pai/tree/master/docs/pylon
                    port: 80,
                    path: '/webhdfs/api/v1',
                    host: this.paiClusterConfig.host
                });

                if (this.paiClusterConfig.passWord) {
                    // Get PAI authentication token
                    await this.updatePaiToken();
                } else if (this.paiClusterConfig.token) {
                    this.paiToken = this.paiClusterConfig.token;
                } else {
                    throw new Error('pai cluster config format error, please set password or token!');
                }
                break;

            case TrialConfigMetadataKey.TRIAL_CONFIG:
                if (this.paiClusterConfig === undefined) {
                    this.log.error('pai cluster config is not initialized');
                    throw new Error('pai cluster config is not initialized');
                }
                this.paiTrialConfig = <NNIPAITrialConfig>JSON.parse(value);

                // Validate to make sure codeDir doesn't have too many files
                try {
                    await validateCodeDir(this.paiTrialConfig.codeDir);
                } catch (error) {
                    this.log.error(error);
                    // Re-throw Errors as-is instead of wrapping them in a second Error
                    throw error instanceof Error ? error : new Error(`${error}`);
                }

                // Copy experiment files from local folder to HDFS.
                // Started here, awaited later when a trial is submitted.
                this.copyExpCodeDirPromise = HDFSClientUtility.copyDirectoryToHdfs(
                    this.paiTrialConfig.codeDir,
                    HDFSClientUtility.getHdfsExpCodeDir(this.paiClusterConfig.userName),
                    this.hdfsClient
                );

                // Upload authFile to hdfs
                if (this.paiTrialConfig.authFile) {
                    this.authFileHdfsPath = unixPathJoin(HDFSClientUtility.hdfsExpRootDir(this.paiClusterConfig.userName), 'authFile');
                    this.copyAuthFilePromise = HDFSClientUtility.copyFileToHdfs(this.paiTrialConfig.authFile, this.authFileHdfsPath, this.hdfsClient);
                }
                break;

            case TrialConfigMetadataKey.VERSION_CHECK:
                this.versionCheck = (value === 'true' || value === 'True');
                break;

            case TrialConfigMetadataKey.LOG_COLLECTION:
                this.logCollection = value;
                break;

            case TrialConfigMetadataKey.MULTI_PHASE:
                this.isMultiPhase = (value === 'true' || value === 'True');
                break;

            default:
                //Reject for unknown keys
                throw new Error(`Unknown key: ${key}`);
        }
    }
319
    // tslint:enable: no-unsafe-any
320
321
322
323
324

    /**
     * Metadata read-back is not implemented: the returned promise is resolved
     * immediately without a value, regardless of the key.
     *
     * @param key ignored
     */
    public getClusterMetadata(key: string): Promise<string> {
        const pending: Deferred<string> = new Deferred<string>();
        pending.resolve();

        return pending.promise;
    }

    /**
     * Signals the background loops to stop and shuts down the PAI job REST server.
     *
     * @throws whatever the REST server's stop() rejected with, after logging it
     */
    public async cleanUp(): Promise<void> {
        this.log.info('Stopping PAI training service...');
        // Both polling loops check this flag at the top of each iteration
        this.stopping = true;

        const restServer: PAIJobRestServer = component.get(PAIJobRestServer);
        try {
            await restServer.stop();
        } catch (error) {
            // The thrown value is not guaranteed to be an Error instance
            const reason: string = error instanceof Error ? error.message : `${error}`;
            this.log.error(`PAI Training service rest server stopped failed, error: ${reason}`);
            throw error;
        }
        // Logged outside the try block so a logging failure is not reported as a stop failure
        this.log.info('PAI Training service rest server stopped successfully.');
    }

    /**
     * The emitter on which 'metric' listeners are registered.
     */
    public get MetricsEmitter() : EventEmitter {
        const emitter: EventEmitter = this.metricsEmitter;

        return emitter;
    }
351

352
353
354
355
356
    // tslint:disable-next-line:max-func-body-length
    /**
     * Prepares one queued trial job and submits it to the PAI cluster.
     *
     * Steps: wait for the experiment code (and auth file, if any) to reach HDFS,
     * write the install script and parameter file to a local temp folder, build
     * the PAI job config, upload the temp folder to the trial's HDFS work dir,
     * then POST the job to the PAI REST server.
     *
     * Resolves to true on every non-throwing path, including upload and submit
     * failures. In those cases the trial's status is set to 'FAILED' and the
     * reason is logged (the submit failure was previously computed but never
     * logged).
     *
     * @param trialJobId id of a trial job present in the trial job map
     * @throws Error if the trial job is unknown or if the cluster config, trial
     *         config or PAI token is not initialized
     */
    private async submitTrialJobToPAI(trialJobId: string): Promise<boolean> {
        const trialJobDetail: PAITrialJobDetail | undefined = this.trialJobsMap.get(trialJobId);

        if (trialJobDetail === undefined) {
            throw new Error(`Failed to find PAITrialJobDetail for job ${trialJobId}`);
        }
        if (this.paiClusterConfig === undefined) {
            throw new Error('PAI Cluster config is not initialized');
        }
        if (this.paiTrialConfig === undefined) {
            throw new Error('trial config is not initialized');
        }
        if (this.paiToken === undefined) {
            throw new Error('PAI token is not initialized');
        }

        if (this.paiRestServerPort === undefined) {
            const restServer: PAIJobRestServer = component.get(PAIJobRestServer);
            this.paiRestServerPort = restServer.clusterRestServerPort;
        }

        // Make sure experiment code files is copied from local to HDFS
        if (this.copyExpCodeDirPromise !== undefined) {
            await this.copyExpCodeDirPromise;
        }

        //Make sure authFile is copied from local to HDFS
        if (this.paiTrialConfig.authFile) {
            await this.copyAuthFilePromise;
        }

        // Step 1. Prepare PAI job configuration

        const trialLocalTempFolder: string = path.join(getExperimentRootDir(), 'trials-local', trialJobId);
        //create tmp trial working folder locally.
        await execMkdir(trialLocalTempFolder);

        const runScriptContent : string = CONTAINER_INSTALL_NNI_SHELL_FORMAT;
        // Write NNI installation file to local tmp files
        await fs.promises.writeFile(path.join(trialLocalTempFolder, 'install_nni.sh'), runScriptContent, { encoding: 'utf8' });

        // Write file content ( parameter.cfg ) to local tmp folders
        if (trialJobDetail.form !== undefined) {
            await fs.promises.writeFile(
                path.join(trialLocalTempFolder, generateParamFileName(trialJobDetail.form.hyperParameters)),
                trialJobDetail.form.hyperParameters.value, { encoding: 'utf8' }
            );
        }

        const hdfsCodeDir: string = HDFSClientUtility.getHdfsTrialWorkDir(this.paiClusterConfig.userName, trialJobId);
        const hdfsOutputDir: string = unixPathJoin(hdfsCodeDir, 'nnioutput');
        // tslint:disable-next-line: strict-boolean-expressions
        const nniManagerIp: string = this.nniManagerIpConfig ? this.nniManagerIpConfig.nniManagerIp : getIPV4Address();
        const version: string = this.versionCheck ? await getVersion() : '';
        const nniPaiTrialCommand : string = String.Format(
            PAI_TRIAL_COMMAND_FORMAT,
            // PAI will copy job's codeDir into /root directory
            `$PWD/${trialJobId}`,
            `$PWD/${trialJobId}/nnioutput`,
            trialJobId,
            this.experimentId,
            trialJobDetail.form.sequenceId,
            this.isMultiPhase,
            this.paiTrialConfig.command,
            nniManagerIp,
            this.paiRestServerPort,
            hdfsOutputDir,
            this.paiClusterConfig.host,
            this.paiClusterConfig.userName,
            HDFSClientUtility.getHdfsExpCodeDir(this.paiClusterConfig.userName),
            version,
            this.logCollection
        )
        // The formatted command has all line breaks stripped
        .replace(/\r\n|\n|\r/gm, '');

        this.log.info(`nniPAItrial command is ${nniPaiTrialCommand.trim()}`);

        const paiTaskRoles : PAITaskRole[] = [
            new PAITaskRole(
                `nni_trail_${trialJobId}`,
                // Task role number
                1,
                // Task CPU number
                this.paiTrialConfig.cpuNum,
                // Task memory
                this.paiTrialConfig.memoryMB,
                // Task GPU number
                this.paiTrialConfig.gpuNum,
                // Task command
                nniPaiTrialCommand,
                // Task shared memory
                this.paiTrialConfig.shmMB,
                // Task portList
                this.paiTrialConfig.portList
            )
        ];

        const paiJobConfig : PAIJobConfig = new PAIJobConfig(
            // Job name
            trialJobDetail.paiJobName,
            // Docker image
            this.paiTrialConfig.image,
            // codeDir
            `$PAI_DEFAULT_FS_URI${hdfsCodeDir}`,
            // PAI Task roles
            paiTaskRoles,
            // Add Virtual Cluster
            this.paiTrialConfig.virtualCluster === undefined ? 'default' : this.paiTrialConfig.virtualCluster.toString(),
            //Task auth File
            this.authFileHdfsPath
        );

        // Step 2. Upload the local trial temp folder (install script + parameter file) onto HDFS
        try {
            await HDFSClientUtility.copyDirectoryToHdfs(trialLocalTempFolder, hdfsCodeDir, this.hdfsClient);
        } catch (error) {
            this.log.error(`PAI Training service: copy ${trialLocalTempFolder} to HDFS ${hdfsCodeDir} failed, error is ${error}`);
            trialJobDetail.status = 'FAILED';

            // true lets the submit loop dequeue this trial instead of retrying it
            return true;
        }

        // Step 3. Submit PAI job via Rest call
        // Refer https://github.com/Microsoft/pai/blob/master/docs/rest-server/API.md for more detail about PAI Rest API
        const submitJobRequest: request.Options = {
            uri: `http://${this.paiClusterConfig.host}/rest-server/api/v1/user/${this.paiClusterConfig.userName}/jobs`,
            method: 'POST',
            json: true,
            body: paiJobConfig,
            headers: {
                'Content-Type': 'application/json',
                Authorization: `Bearer ${this.paiToken}`
            }
        };

        const deferred : Deferred<boolean> = new Deferred<boolean>();
        // tslint:disable:no-any no-unsafe-any
        request(submitJobRequest, (error: Error, response: request.Response, body: any) => {
            const hasTransportError: boolean = error !== undefined && error !== null;
            if (hasTransportError || response.statusCode >= 400) {
                let errorMessage: string;
                if (hasTransportError) {
                    errorMessage = error.message;
                } else {
                    // The response body may be absent, so guard before reading its message
                    const responseBody: any = response.body;
                    const bodyMessage: any = (responseBody !== undefined && responseBody !== null) ? responseBody.message : undefined;
                    errorMessage = `Submit trial ${trialJobId} failed, http code:${response.statusCode}, http body: ${bodyMessage}`;
                }
                this.log.error(errorMessage);
                trialJobDetail.status = 'FAILED';
                deferred.resolve(true);
            } else {
                trialJobDetail.submitTime = Date.now();
                deferred.resolve(true);
            }
        });

        return deferred.promise;
    }

    /**
     * Polls trial status every 3 seconds until the service is asked to stop.
     *
     * On each pass: refreshes the PAI token when password authentication is
     * configured, asks the job collector to retrieve trial status, and aborts
     * the loop with an Error if the REST server reports an error message.
     */
    private async statusCheckingLoop(): Promise<void> {
        while (!this.stopping) {
            const clusterConfig: PAIClusterConfig | undefined = this.paiClusterConfig;
            if (clusterConfig && clusterConfig.passWord) {
                try {
                    await this.updatePaiToken();
                } catch (tokenError) {
                    this.log.error(`${tokenError}`);
                    // A refresh failure is tolerated while an earlier token exists;
                    // only throw error when initializing paiToken for the first time
                    if (this.paiToken === undefined) {
                        throw new Error(tokenError);
                    }
                }
            }

            await this.paiJobCollector.retrieveTrialStatus(this.paiToken, this.paiClusterConfig);

            const paiRestServer: PAIJobRestServer = component.get(PAIJobRestServer);
            if (paiRestServer.getErrorMessage !== undefined) {
                throw new Error(paiRestServer.getErrorMessage);
            }

            await delay(3000);
        }
    }

    /**
     * Drains the job queue every 3 seconds until the service is asked to stop.
     * The head of the queue is only removed after submitTrialJobToPAI reports
     * true for it; a false result leaves it queued for the next pass.
     */
    private async submitJobLoop(): Promise<void> {
        while (!this.stopping) {
            while (!this.stopping && this.jobQueue.length > 0) {
                const headTrialJobId: string = this.jobQueue[0];
                const handled: boolean = await this.submitTrialJobToPAI(headTrialJobId);
                if (!handled) {
                    // Stop draining since submitting the job failed
                    break;
                }
                // Remove the handled trial job from the job queue
                this.jobQueue.shift();
            }
            await delay(3000);
        }
    }

542
543
544
545
546
    /**
     * Fetches a PAI token with the configured user name and password, unless a
     * token was already obtained less than paiTokenUpdateInterval ago.
     *
     * The token and its update time are only stored after a 200 response.
     * Earlier code fell through after rejecting on a non-200 response, which
     * overwrote the token and postponed the next refresh attempt.
     *
     * @throws Error if the cluster config is not initialized, the request fails,
     *         the REST server answers with a non-200 status, or no answer
     *         arrives within 5 seconds
     */
    private async updatePaiToken(): Promise<void> {
        const currentTime: number = new Date().getTime();
        //If pai token initialized and not reach the interval time, do not update
        if (this.paiTokenUpdateTime !== undefined && (currentTime - this.paiTokenUpdateTime) < this.paiTokenUpdateInterval) {
            return;
        }

        if (this.paiClusterConfig === undefined) {
            const paiClusterConfigError: string = `pai cluster config not initialized!`;
            this.log.error(`${paiClusterConfigError}`);
            throw Error(`${paiClusterConfigError}`);
        }

        const authenticationReq: request.Options = {
            uri: `http://${this.paiClusterConfig.host}/rest-server/api/v1/token`,
            method: 'POST',
            json: true,
            body: {
                username: this.paiClusterConfig.userName,
                password: this.paiClusterConfig.passWord
            }
        };

        const deferred : Deferred<void> = new Deferred<void>();
        request(authenticationReq, (error: Error, response: request.Response, body: any) => {
            if (error !== undefined && error !== null) {
                this.log.error(`Get PAI token failed: ${error.message}`);
                deferred.reject(new Error(`Get PAI token failed: ${error.message}`));

                return;
            }
            if (response.statusCode !== 200) {
                this.log.error(`Get PAI token failed: get PAI Rest return code ${response.statusCode}`);
                // The body is serialized so a parsed JSON body does not print as "[object Object]"
                deferred.reject(new Error(
                    `Get PAI token failed: ${JSON.stringify(response.body)}, please check paiConfig username or password`
                ));

                // Do not touch paiToken / paiTokenUpdateTime on failure so the next call retries
                return;
            }
            this.paiToken = body.token;
            this.paiTokenUpdateTime = new Date().getTime();
            deferred.resolve();
        });

        let timeoutId: NodeJS.Timer;
        const timeoutDelay: Promise<void> = new Promise<void>((resolve: Function, reject: Function): void => {
            // Set timeout and reject the promise once reach timeout (5 seconds)
            timeoutId = setTimeout(
                () => reject(new Error('Get PAI token timeout. Please check your PAI cluster.')),
                5000);
        });

        // Whichever settles first wins; the timer is always cleared afterwards
        return Promise.race([timeoutDelay, deferred.promise])
            .finally(() => { clearTimeout(timeoutId); });
    }
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639

    /**
     * Writes a hyper-parameter file into the trial's local temp folder, copies
     * it to the trial's HDFS work dir, and posts the file's HDFS location to the
     * PAI job REST server.
     *
     * @param trialJobId id of the trial the parameters belong to
     * @param hyperParameters parameters to write; `value` is the file content
     * @throws Error if the cluster config or trial config is not initialized
     */
    private async writeParameterFile(trialJobId: string, hyperParameters: HyperParameters): Promise<void> {
        if (this.paiClusterConfig === undefined) {
            throw new Error('PAI Cluster config is not initialized');
        }
        if (this.paiTrialConfig === undefined) {
            throw new Error('PAI trial config is not initialized');
        }

        const trialLocalTempFolder: string = path.join(getExperimentRootDir(), 'trials-local', trialJobId);
        const hpFileName: string = generateParamFileName(hyperParameters);
        const localFilepath: string = path.join(trialLocalTempFolder, hpFileName);
        await fs.promises.writeFile(localFilepath, hyperParameters.value, { encoding: 'utf8' });

        const hdfsCodeDir: string = HDFSClientUtility.getHdfsTrialWorkDir(this.paiClusterConfig.userName, trialJobId);
        // HDFS paths are joined with unixPathJoin, as elsewhere in this class;
        // path.join would use the local OS separator
        const hdfsHpFilePath: string = unixPathJoin(hdfsCodeDir, hpFileName);

        await HDFSClientUtility.copyFileToHdfs(localFilepath, hdfsHpFilePath, this.hdfsClient);

        await this.postParameterFileMeta({
            experimentId: this.experimentId,
            trialId: trialJobId,
            filePath: hdfsHpFilePath
        });
    }

    /**
     * POSTs parameter-file metadata to the PAI job REST server's
     * `parameter-file-meta` endpoint.
     *
     * @param parameterFileMeta metadata sent as the JSON request body
     * @returns a promise resolved when the request completes without a
     *          transport error; the HTTP status code is not inspected
     */
    private postParameterFileMeta(parameterFileMeta: ParameterFileMeta): Promise<void> {
        const completion : Deferred<void> = new Deferred<void>();
        const paiRestServer: PAIJobRestServer = component.get(PAIJobRestServer);
        const endpointUri: string = `${paiRestServer.endPoint}${paiRestServer.apiRootUrl}/parameter-file-meta`;
        const postOptions: request.Options = {
            uri: endpointUri,
            method: 'POST',
            json: true,
            body: parameterFileMeta
        };

        request(postOptions, (err: Error, res: request.Response) => {
            if (!err) {
                completion.resolve();

                return;
            }
            completion.reject(err);
        });

        return completion.promise;
    }
640
641
}

642
export { PAITrainingService };