paiTrainingService.ts 27 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
/**
 * Copyright (c) Microsoft Corporation
 * All rights reserved.
 *
 * MIT License
 *
 * Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated
 * documentation files (the "Software"), to deal in the Software without restriction, including without limitation
 * the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and
 * to permit persons to whom the Software is furnished to do so, subject to the following conditions:
 * The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.
 *
 * THE SOFTWARE IS PROVIDED *AS IS*, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING
 * BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
 * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
 * DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
 */

20
'use strict';
21
22
23
24

import * as cpp from 'child-process-promise';
import * as fs from 'fs';
import * as path from 'path';
25
// tslint:disable-next-line:no-implicit-dependencies
26
import * as request from 'request';
27
import * as component from '../../common/component';
28
29

import { EventEmitter } from 'events';
30
31
import { Deferred } from 'ts-deferred';
import { String } from 'typescript-string-operations';
32
import { MethodNotImplementedError } from '../../common/errors';
33
import { getExperimentId, getInitTrialSequenceId } from '../../common/experimentStartupInfo';
34
35
import { getLogger, Logger } from '../../common/log';
import {
36
    HyperParameters, JobApplicationForm, NNIManagerIpConfig, TrainingService,
37
    TrialJobApplicationForm, TrialJobDetail, TrialJobMetric
38
} from '../../common/trainingService';
39
import { delay, generateParamFileName,
40
    getExperimentRootDir, getIPV4Address, getVersion, uniqueString, unixPathJoin } from '../../common/utils';
41
42
import { CONTAINER_INSTALL_NNI_SHELL_FORMAT } from '../common/containerJobData';
import { TrialConfigMetadataKey } from '../common/trialConfigMetadataKey';
43
import { execMkdir, validateCodeDir } from '../common/util';
44
45
import { HDFSClientUtility } from './hdfsClientUtility';
import { NNIPAITrialConfig, PAIClusterConfig, PAIJobConfig, PAITaskRole } from './paiConfig';
46
import { PAI_LOG_PATH_FORMAT, PAI_TRIAL_COMMAND_FORMAT, PAITrialJobDetail } from './paiData';
47
import { PAIJobInfoCollector } from './paiJobInfoCollector';
48
import { PAIJobRestServer, ParameterFileMeta } from './paiJobRestServer';
49

50
import * as WebHDFS from 'webhdfs';
51
52
53
54
55
56
57
58
59
60
61
62
63

/**
 * Training Service implementation for OpenPAI (Open Platform for AI)
 * Refer https://github.com/Microsoft/pai for more info about OpenPAI
 *
 * Trials handed to `submitTrialJob` are recorded in `trialJobsMap` and queued in
 * `jobQueue`; `submitJobLoop` drains the queue by uploading trial files to HDFS and
 * POSTing a job to the PAI REST server. `statusCheckingLoop` refreshes the PAI auth
 * token and delegates status polling to `PAIJobInfoCollector`. Both loops run until
 * `cleanUp` sets `stopping`.
 */
@component.Singleton
class PAITrainingService implements TrainingService {
    private readonly log!: Logger;
    private readonly metricsEmitter: EventEmitter;
    private readonly trialJobsMap: Map<string, PAITrialJobDetail>;
    // Experiment root dir on HDFS: /nni/experiments/<experimentId>
    private readonly expRootDir: string;
    private paiTrialConfig: NNIPAITrialConfig | undefined;
    private paiClusterConfig?: PAIClusterConfig;
    // Ids of trials still waiting to be submitted to PAI, oldest first
    private readonly jobQueue: string[];
    private stopping: boolean = false;
    // tslint:disable-next-line:no-any
    private hdfsClient: any;
    private paiToken? : string;
    // Timestamp (ms since epoch) of the last successful token refresh
    private paiTokenUpdateTime?: number;
    private readonly paiTokenUpdateInterval: number;
    private readonly experimentId! : string;
    private readonly paiJobCollector : PAIJobInfoCollector;
    private nextTrialSequenceId: number;
    private paiRestServerPort?: number;
    private nniManagerIpConfig?: NNIManagerIpConfig;
    private copyExpCodeDirPromise?: Promise<void>;
    private copyAuthFilePromise?: Promise<void>;
    private versionCheck: boolean = true;
    private logCollection: string;
    private isMultiPhase: boolean = false;
    private authFileHdfsPath: string | undefined = undefined;
    private portList?: string | undefined;

    constructor() {
        this.log = getLogger();
        this.metricsEmitter = new EventEmitter();
        this.trialJobsMap = new Map<string, PAITrialJobDetail>();
        this.jobQueue = [];
        // Root dir on HDFS
        this.expRootDir = path.join('/nni', 'experiments', getExperimentId());
        this.experimentId = getExperimentId();
        this.paiJobCollector = new PAIJobInfoCollector(this.trialJobsMap);
        this.nextTrialSequenceId = -1;
        this.paiTokenUpdateInterval = 7200000; //2hours
        this.logCollection = 'none';
        this.log.info('Construct OpenPAI training service.');
    }

    /**
     * Starts the trial-side REST server, then runs the status-checking and
     * job-submission loops until both finish (i.e. until `cleanUp` is called
     * or one of the loops throws).
     */
    public async run(): Promise<void> {
        this.log.info('Run PAI training service.');
        const restServer: PAIJobRestServer = component.get(PAIJobRestServer);
        await restServer.start();
        restServer.setEnableVersionCheck = this.versionCheck;
        this.log.info(`PAI Training service rest server listening on: ${restServer.endPoint}`);
        await Promise.all([
            this.statusCheckingLoop(),
            this.submitJobLoop()]);
        this.log.info('PAI training service exit.');
    }

    /**
     * Lists all known jobs whose form has jobType 'TRIAL'.
     */
    public async listTrialJobs(): Promise<TrialJobDetail[]> {
        const jobs: TrialJobDetail[] = [];

        for (const [key, value] of this.trialJobsMap) {
            if (value.form.jobType === 'TRIAL') {
                jobs.push(await this.getTrialJob(key));
            }
        }

        return jobs;
    }

    /**
     * Looks up a trial job by id.
     * @throws Error if the PAI cluster config has not been set.
     * Rejects with a string message if the id is unknown.
     */
    public async getTrialJob(trialJobId: string): Promise<TrialJobDetail> {
        if (this.paiClusterConfig === undefined) {
            throw new Error('PAI Cluster config is not initialized');
        }

        const paiTrialJob: PAITrialJobDetail | undefined = this.trialJobsMap.get(trialJobId);

        if (paiTrialJob === undefined) {
            return Promise.reject(`trial job ${trialJobId} not found`);
        }

        return paiTrialJob;
    }

    public addTrialJobMetricListener(listener: (metric: TrialJobMetric) => void): void {
        this.metricsEmitter.on('metric', listener);
    }

    public removeTrialJobMetricListener(listener: (metric: TrialJobMetric) => void): void {
        this.metricsEmitter.off('metric', listener);
    }

    /**
     * Registers a new trial in WAITING state and queues it for submission.
     * The actual PAI submission happens later in `submitJobLoop`.
     * @throws Error if the PAI cluster config has not been set.
     */
    public async submitTrialJob(form: JobApplicationForm): Promise<TrialJobDetail> {
        if (this.paiClusterConfig === undefined) {
            throw new Error(`paiClusterConfig not initialized!`);
        }

        this.log.info(`submitTrialJob: form: ${JSON.stringify(form)}`);

        const trialJobId: string = uniqueString(5);
        const trialSequenceId: number = this.generateSequenceId();
        //TODO: use HDFS working folder instead
        const trialWorkingFolder: string = path.join(this.expRootDir, 'trials', trialJobId);
        const paiJobName: string = `nni_exp_${this.experimentId}_trial_${trialJobId}`;
        const hdfsCodeDir: string = HDFSClientUtility.getHdfsTrialWorkDir(this.paiClusterConfig.userName, trialJobId);
        const hdfsOutputDir: string = unixPathJoin(hdfsCodeDir, 'nnioutput');

        const hdfsLogPath : string = String.Format(
            PAI_LOG_PATH_FORMAT,
            this.paiClusterConfig.host,
            hdfsOutputDir
        );

        const trialJobDetail: PAITrialJobDetail = new PAITrialJobDetail(
            trialJobId,
            'WAITING',
            paiJobName,
            Date.now(),
            trialWorkingFolder,
            form,
            trialSequenceId,
            hdfsLogPath);

        this.trialJobsMap.set(trialJobId, trialJobDetail);
        this.jobQueue.push(trialJobId);

        return trialJobDetail;
    }

    /**
     * Writes a new hyper-parameter file for an existing trial (multi-phase support).
     * @throws Error if the trial is unknown or the form's jobType is not 'TRIAL'.
     */
    public async updateTrialJob(trialJobId: string, form: JobApplicationForm): Promise<TrialJobDetail> {
        const trialJobDetail: undefined | TrialJobDetail = this.trialJobsMap.get(trialJobId);
        if (trialJobDetail === undefined) {
            throw new Error(`updateTrialJob failed: ${trialJobId} not found`);
        }
        if (form.jobType === 'TRIAL') {
            await this.writeParameterFile(trialJobId, (<TrialJobApplicationForm>form).hyperParameters);
        } else {
            throw new Error(`updateTrialJob failed: jobType ${form.jobType} not supported.`);
        }

        return trialJobDetail;
    }

    public get isMultiPhaseJobSupported(): boolean {
        return true;
    }

    // tslint:disable:no-http-string
    /**
     * Asks the PAI REST server to STOP the trial's PAI job.
     * Rejects (with an Error) if the trial id is unknown; throws synchronously if the
     * cluster config or token is missing; rejects with a message string if the HTTP
     * call fails or returns a status >= 400.
     */
    public cancelTrialJob(trialJobId: string, isEarlyStopped: boolean = false): Promise<void> {
        const trialJobDetail : PAITrialJobDetail | undefined =  this.trialJobsMap.get(trialJobId);
        const deferred : Deferred<void> = new Deferred<void>();
        if (trialJobDetail === undefined) {
            this.log.error(`cancelTrialJob: trial job id ${trialJobId} not found`);

            return Promise.reject(new Error(`cancelTrialJob: trial job id ${trialJobId} not found`));
        }

        if (this.paiClusterConfig === undefined) {
            throw new Error('PAI Cluster config is not initialized');
        }
        if (this.paiToken === undefined) {
            throw new Error('PAI token is not initialized');
        }

        const stopJobRequest: request.Options = {
            uri: `http://${this.paiClusterConfig.host}/rest-server/api/v1/user/${this.paiClusterConfig.userName}\
/jobs/${trialJobDetail.paiJobName}/executionType`,
            method: 'PUT',
            json: true,
            body: {value: 'STOP'},
            headers: {
                'Content-Type': 'application/json',
                Authorization: `Bearer ${this.paiToken}`
            }
        };

        // Set trialjobDetail's early stopped field, to mark the job's cancellation source
        trialJobDetail.isEarlyStopped = isEarlyStopped;

        // tslint:disable-next-line:no-any
        request(stopJobRequest, (error: Error, response: request.Response, body: any) => {
            if ((error !== undefined && error !== null) || response.statusCode >= 400) {
                this.log.error(`PAI Training service: stop trial ${trialJobId} to PAI Cluster failed!`);
                deferred.reject((error !== undefined && error !== null) ? error.message :
                 `Stop trial failed, http code: ${response.statusCode}`);
            } else {
                deferred.resolve();
            }
        });

        return deferred.promise;
    }

    // tslint:disable: no-unsafe-any no-any
    // tslint:disable-next-line:max-func-body-length
    /**
     * Applies one piece of cluster metadata.
     *
     * Every recognised key now settles the returned promise; previously the
     * VERSION_CHECK / LOG_COLLECTION / MULTI_PHASE cases never resolved it.
     * @throws Error for unknown keys, for TRIAL_CONFIG before PAI_CLUSTER_CONFIG,
     *         when codeDir validation fails, or when fetching the PAI token fails.
     */
    public async setClusterMetadata(key: string, value: string): Promise<void> {
        switch (key) {
            case TrialConfigMetadataKey.NNI_MANAGER_IP:
                this.nniManagerIpConfig = <NNIManagerIpConfig>JSON.parse(value);
                break;

            case TrialConfigMetadataKey.PAI_CLUSTER_CONFIG:
                this.paiClusterConfig = <PAIClusterConfig>JSON.parse(value);

                this.hdfsClient = WebHDFS.createClient({
                    user: this.paiClusterConfig.userName,
                    // Refer PAI document for Pylon mapping https://github.com/Microsoft/pai/tree/master/docs/pylon
                    port: 80,
                    path: '/webhdfs/api/v1',
                    host: this.paiClusterConfig.host
                });

                // Get PAI authentication token
                await this.updatePaiToken();
                break;

            case TrialConfigMetadataKey.TRIAL_CONFIG:
                if (this.paiClusterConfig === undefined) {
                    this.log.error('pai cluster config is not initialized');
                    throw new Error('pai cluster config is not initialized');
                }
                this.paiTrialConfig = <NNIPAITrialConfig>JSON.parse(value);

                // Validate to make sure codeDir doesn't have too many files
                try {
                    await validateCodeDir(this.paiTrialConfig.codeDir);
                } catch (error) {
                    this.log.error(error);
                    throw new Error(error);
                }

                // Copy experiment files from local folder to HDFS; awaited lazily before the first submission
                this.copyExpCodeDirPromise = HDFSClientUtility.copyDirectoryToHdfs(
                    this.paiTrialConfig.codeDir,
                    HDFSClientUtility.getHdfsExpCodeDir(this.paiClusterConfig.userName),
                    this.hdfsClient
                );

                // Upload authFile to hdfs
                if (this.paiTrialConfig.authFile) {
                    this.authFileHdfsPath = unixPathJoin(HDFSClientUtility.hdfsExpRootDir(this.paiClusterConfig.userName), 'authFile');
                    this.copyAuthFilePromise = HDFSClientUtility.copyFileToHdfs(this.paiTrialConfig.authFile, this.authFileHdfsPath, this.hdfsClient);
                }
                break;

            case TrialConfigMetadataKey.VERSION_CHECK:
                this.versionCheck = (value === 'true' || value === 'True');
                break;

            case TrialConfigMetadataKey.LOG_COLLECTION:
                this.logCollection = value;
                break;

            case TrialConfigMetadataKey.MULTI_PHASE:
                this.isMultiPhase = (value === 'true' || value === 'True');
                break;

            default:
                //Reject for unknown keys
                throw new Error(`Uknown key: ${key}`);
        }
    }
    // tslint:enable: no-unsafe-any

    /**
     * Cluster metadata cannot be read back from this service; the returned
     * promise resolves without a value for every key.
     */
    public getClusterMetadata(key: string): Promise<string> {
        const deferred : Deferred<string> = new Deferred<string>();

        deferred.resolve();

        return deferred.promise;
    }

    /**
     * Signals both background loops to stop and shuts down the trial-side REST server.
     * @throws whatever `restServer.stop()` throws, after logging it.
     */
    public async cleanUp(): Promise<void> {
        this.log.info('Stopping PAI training service...');
        this.stopping = true;

        const restServer: PAIJobRestServer = component.get(PAIJobRestServer);
        try {
            await restServer.stop();
            this.log.info('PAI Training service rest server stopped successfully.');
        } catch (error) {
            // tslint:disable-next-line: no-unsafe-any
            this.log.error(`PAI Training service rest server stopped failed, error: ${error.message}`);
            throw error;
        }
    }

    public get MetricsEmitter() : EventEmitter {
        return this.metricsEmitter;
    }

    /**
     * Prepares local trial files, uploads them to HDFS and submits the PAI job.
     *
     * Resolves `true` once the job has been dealt with (including when it was marked
     * FAILED), which tells `submitJobLoop` to drop it from the queue.
     * @throws Error when the trial or required configuration is missing.
     */
    // tslint:disable-next-line:max-func-body-length
    private async submitTrialJobToPAI(trialJobId: string): Promise<boolean> {
        const deferred : Deferred<boolean> = new Deferred<boolean>();
        const trialJobDetail: PAITrialJobDetail | undefined = this.trialJobsMap.get(trialJobId);

        if (trialJobDetail === undefined) {
            throw new Error(`Failed to find PAITrialJobDetail for job ${trialJobId}`);
        }

        if (this.paiClusterConfig === undefined) {
            throw new Error('PAI Cluster config is not initialized');
        }
        if (this.paiTrialConfig === undefined) {
            throw new Error('trial config is not initialized');
        }
        if (this.paiToken === undefined) {
            throw new Error('PAI token is not initialized');
        }

        if (this.paiRestServerPort === undefined) {
            const restServer: PAIJobRestServer = component.get(PAIJobRestServer);
            this.paiRestServerPort = restServer.clusterRestServerPort;
        }

        // Make sure experiment code files is copied from local to HDFS
        if (this.copyExpCodeDirPromise !== undefined) {
            await this.copyExpCodeDirPromise;
        }

        //Make sure authFile is copied from local to HDFS
        if (this.paiTrialConfig.authFile) {
            await this.copyAuthFilePromise;
        }

        // Step 1. Prepare PAI job configuration
        const trialLocalTempFolder: string = path.join(getExperimentRootDir(), 'trials-local', trialJobId);
        //create tmp trial working folder locally.
        await execMkdir(trialLocalTempFolder);

        const runScriptContent : string = CONTAINER_INSTALL_NNI_SHELL_FORMAT;
        // Write NNI installation file to local tmp files
        await fs.promises.writeFile(path.join(trialLocalTempFolder, 'install_nni.sh'), runScriptContent, { encoding: 'utf8' });

        // Write file content ( parameter.cfg ) to local tmp folders
        const trialForm : TrialJobApplicationForm = (<TrialJobApplicationForm>trialJobDetail.form);
        if (trialForm !== undefined) {
            await fs.promises.writeFile(
                path.join(trialLocalTempFolder, generateParamFileName(trialForm.hyperParameters)),
                trialForm.hyperParameters.value, { encoding: 'utf8' }
            );
        }

        const hdfsCodeDir: string = HDFSClientUtility.getHdfsTrialWorkDir(this.paiClusterConfig.userName, trialJobId);
        const hdfsOutputDir: string = unixPathJoin(hdfsCodeDir, 'nnioutput');
        // tslint:disable-next-line: strict-boolean-expressions
        const nniManagerIp: string = this.nniManagerIpConfig ? this.nniManagerIpConfig.nniManagerIp : getIPV4Address();
        const version: string = this.versionCheck ? await getVersion() : '';
        const nniPaiTrialCommand : string = String.Format(
            PAI_TRIAL_COMMAND_FORMAT,
            // PAI will copy job's codeDir into /root directory
            `$PWD/${trialJobId}`,
            `$PWD/${trialJobId}/nnioutput`,
            trialJobId,
            this.experimentId,
            trialJobDetail.sequenceId,
            this.isMultiPhase,
            this.paiTrialConfig.command,
            nniManagerIp,
            this.paiRestServerPort,
            hdfsOutputDir,
            this.paiClusterConfig.host,
            this.paiClusterConfig.userName,
            HDFSClientUtility.getHdfsExpCodeDir(this.paiClusterConfig.userName),
            version,
            this.logCollection
        )
        // The PAI task command must be a single line
        .replace(/\r\n|\n|\r/gm, '');

        this.log.info(`nniPAItrial command is ${nniPaiTrialCommand.trim()}`);
        const paiTaskRoles : PAITaskRole[] = [
            new PAITaskRole(
                `nni_trail_${trialJobId}`,
                // Task role number
                1,
                // Task CPU number
                this.paiTrialConfig.cpuNum,
                // Task memory
                this.paiTrialConfig.memoryMB,
                // Task GPU number
                this.paiTrialConfig.gpuNum,
                // Task command
                nniPaiTrialCommand,
                // Task shared memory
                this.paiTrialConfig.shmMB,
                // Task portList
                this.paiTrialConfig.portList
            )
        ];

        const paiJobConfig : PAIJobConfig = new PAIJobConfig(
            // Job name
            trialJobDetail.paiJobName,
            // Docker image
            this.paiTrialConfig.image,
            // codeDir
            `$PAI_DEFAULT_FS_URI${hdfsCodeDir}`,
            // PAI Task roles
            paiTaskRoles,
            // Add Virutal Cluster
            this.paiTrialConfig.virtualCluster === undefined ? 'default' : this.paiTrialConfig.virtualCluster.toString(),
            //Task auth File
            this.authFileHdfsPath
        );

        // Step 2. Upload code files in codeDir onto HDFS
        try {
            await HDFSClientUtility.copyDirectoryToHdfs(trialLocalTempFolder, hdfsCodeDir, this.hdfsClient);
        } catch (error) {
            this.log.error(`PAI Training service: copy ${trialLocalTempFolder} to HDFS ${hdfsCodeDir} failed, error is ${error}`);
            trialJobDetail.status = 'FAILED';

            return true;
        }

        // Step 3. Submit PAI job via Rest call
        // Refer https://github.com/Microsoft/pai/blob/master/docs/rest-server/API.md for more detail about PAI Rest API
        const submitJobRequest: request.Options = {
            uri: `http://${this.paiClusterConfig.host}/rest-server/api/v1/user/${this.paiClusterConfig.userName}/jobs`,
            method: 'POST',
            json: true,
            body: paiJobConfig,
            headers: {
                'Content-Type': 'application/json',
                Authorization: `Bearer ${this.paiToken}`
            }
        };
        // tslint:disable:no-any no-unsafe-any
        request(submitJobRequest, (error: Error, response: request.Response, body: any) => {
            if ((error !== undefined && error !== null) || response.statusCode >= 400) {
                const errorMessage : string = (error !== undefined && error !== null) ? error.message :
                    `Submit trial ${trialJobId} failed, http code:${response.statusCode}, http body: ${response.body}`;
                this.log.error(errorMessage);
                trialJobDetail.status = 'FAILED';
                deferred.resolve(true);
            } else {
                trialJobDetail.submitTime = Date.now();
                deferred.resolve(true);
            }
        });

        return deferred.promise;
    }

    /**
     * Returns the next trial sequence id, seeding the counter from the
     * experiment startup info on first use.
     */
    private generateSequenceId(): number {
        if (this.nextTrialSequenceId === -1) {
            this.nextTrialSequenceId = getInitTrialSequenceId();
        }

        return this.nextTrialSequenceId++;
    }

    /**
     * Every 3 seconds: refresh the PAI token if due, poll trial status, and
     * surface any error reported by the trial-side REST server.
     */
    private async statusCheckingLoop(): Promise<void> {
        while (!this.stopping) {
            try {
                await this.updatePaiToken();
            } catch (error) {
                this.log.error(`${error}`);
                //only throw error when initialize paiToken first time
                if (this.paiToken === undefined) {
                    throw new Error(error);
                }
            }
            await this.paiJobCollector.retrieveTrialStatus(this.paiToken, this.paiClusterConfig);
            const restServer: PAIJobRestServer = component.get(PAIJobRestServer);
            if (restServer.getErrorMessage !== undefined) {
                throw new Error(restServer.getErrorMessage);
            }
            await delay(3000);
        }
    }

    /**
     * Every 3 seconds: submit queued trials in FIFO order, stopping the drain
     * early if a submission reports failure.
     */
    private async submitJobLoop(): Promise<void> {
        while (!this.stopping) {
            while (!this.stopping && this.jobQueue.length > 0) {
                const trialJobId: string = this.jobQueue[0];
                if (await this.submitTrialJobToPAI(trialJobId)) {
                    // Remove trial job with trialJobId from job queue
                    this.jobQueue.shift();
                } else {
                    // Break the while loop since failed to submitJob
                    break;
                }
            }
            await delay(3000);
        }
    }

    /**
     * Update pai token by the interval time or initialize the pai token.
     *
     * On any failure the existing token and refresh timestamp are left untouched,
     * so the next call retries immediately instead of waiting a full interval.
     * Rejects if the request errors, returns a non-200 status, or takes longer than 5 seconds.
     */
    private async updatePaiToken(): Promise<void> {
        const deferred : Deferred<void> = new Deferred<void>();

        const currentTime: number = new Date().getTime();
        //If pai token initialized and not reach the interval time, do not update
        if (this.paiTokenUpdateTime !== undefined && (currentTime - this.paiTokenUpdateTime) < this.paiTokenUpdateInterval) {
            return Promise.resolve();
        }

        if (this.paiClusterConfig === undefined) {
            const paiClusterConfigError: string = `pai cluster config not initialized!`;
            this.log.error(`${paiClusterConfigError}`);
            throw Error(`${paiClusterConfigError}`);
        }

        const authenticationReq: request.Options = {
            uri: `http://${this.paiClusterConfig.host}/rest-server/api/v1/token`,
            method: 'POST',
            json: true,
            body: {
                username: this.paiClusterConfig.userName,
                password: this.paiClusterConfig.passWord
            }
        };

        request(authenticationReq, (error: Error, response: request.Response, body: any) => {
            if (error !== undefined && error !== null) {
                this.log.error(`Get PAI token failed: ${error.message}`);
                deferred.reject(new Error(`Get PAI token failed: ${error.message}`));

                return;
            }
            if (response.statusCode !== 200) {
                this.log.error(`Get PAI token failed: get PAI Rest return code ${response.statusCode}`);
                deferred.reject(new Error(`Get PAI token failed: ${response.body}, please check paiConfig username or password`));

                // Do not fall through: a failed response must not overwrite the current token
                return;
            }
            this.paiToken = body.token;
            this.paiTokenUpdateTime = new Date().getTime();
            deferred.resolve();
        });

        let timeoutId: NodeJS.Timer;
        const timeoutDelay: Promise<void> = new Promise<void>((resolve: Function, reject: Function): void => {
            // Set timeout and reject the promise once reach timeout (5 seconds)
            timeoutId = setTimeout(
                () => reject(new Error('Get PAI token timeout. Please check your PAI cluster.')),
                5000);
        });

        return Promise.race([timeoutDelay, deferred.promise])
            .finally(() => { clearTimeout(timeoutId); });
    }

    /**
     * Writes a hyper-parameter file locally, uploads it into the trial's HDFS
     * working dir, and notifies the trial-side REST server of its location.
     * @throws Error if the cluster or trial config has not been set.
     */
    private async writeParameterFile(trialJobId: string, hyperParameters: HyperParameters): Promise<void> {
        if (this.paiClusterConfig === undefined) {
            throw new Error('PAI Cluster config is not initialized');
        }
        if (this.paiTrialConfig === undefined) {
            throw new Error('PAI trial config is not initialized');
        }

        const trialLocalTempFolder: string = path.join(getExperimentRootDir(), 'trials-local', trialJobId);
        const hpFileName: string = generateParamFileName(hyperParameters);
        const localFilepath: string = path.join(trialLocalTempFolder, hpFileName);
        await fs.promises.writeFile(localFilepath, hyperParameters.value, { encoding: 'utf8' });
        const hdfsCodeDir: string = HDFSClientUtility.getHdfsTrialWorkDir(this.paiClusterConfig.userName, trialJobId);
        // HDFS paths are always '/'-separated; path.join would emit '\' on Windows
        const hdfsHpFilePath: string = unixPathJoin(hdfsCodeDir, hpFileName);

        await HDFSClientUtility.copyFileToHdfs(localFilepath, hdfsHpFilePath, this.hdfsClient);

        await this.postParameterFileMeta({
            experimentId: this.experimentId,
            trialId: trialJobId,
            filePath: hdfsHpFilePath
        });
    }

    /**
     * POSTs parameter-file metadata to the trial-side REST server.
     * Rejects only on transport errors; the HTTP status code is not inspected.
     */
    private postParameterFileMeta(parameterFileMeta: ParameterFileMeta): Promise<void> {
        const deferred : Deferred<void> = new Deferred<void>();
        const restServer: PAIJobRestServer = component.get(PAIJobRestServer);
        const req: request.Options = {
            uri: `${restServer.endPoint}${restServer.apiRootUrl}/parameter-file-meta`,
            method: 'POST',
            json: true,
            body: parameterFileMeta
        };
        request(req, (err: Error, res: request.Response) => {
            if (err) {
                deferred.reject(err);
            } else {
                deferred.resolve();
            }
        });

        return deferred.promise;
    }
}

654
export { PAITrainingService };