paiTrainingService.ts 26.9 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
/**
 * Copyright (c) Microsoft Corporation
 * All rights reserved.
 *
 * MIT License
 *
 * Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated
 * documentation files (the "Software"), to deal in the Software without restriction, including without limitation
 * the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and
 * to permit persons to whom the Software is furnished to do so, subject to the following conditions:
 * The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.
 *
 * THE SOFTWARE IS PROVIDED *AS IS*, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING
 * BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
 * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
 * DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
 */

20
'use strict';
21
22
23
24

import * as cpp from 'child-process-promise';
import * as fs from 'fs';
import * as path from 'path';
25
// tslint:disable-next-line:no-implicit-dependencies
26
import * as request from 'request';
27
import * as component from '../../common/component';
28
29

import { EventEmitter } from 'events';
30
31
import { Deferred } from 'ts-deferred';
import { String } from 'typescript-string-operations';
32
import { MethodNotImplementedError } from '../../common/errors';
33
import { getExperimentId, getInitTrialSequenceId } from '../../common/experimentStartupInfo';
34
35
import { getLogger, Logger } from '../../common/log';
import {
36
    HyperParameters, JobApplicationForm, NNIManagerIpConfig, TrainingService,
37
    TrialJobApplicationForm, TrialJobDetail, TrialJobMetric
38
} from '../../common/trainingService';
39
import { delay, generateParamFileName,
40
    getExperimentRootDir, getIPV4Address, getVersion, uniqueString, unixPathJoin } from '../../common/utils';
41
42
import { CONTAINER_INSTALL_NNI_SHELL_FORMAT } from '../common/containerJobData';
import { TrialConfigMetadataKey } from '../common/trialConfigMetadataKey';
43
import { execMkdir, validateCodeDir } from '../common/util';
44
45
import { HDFSClientUtility } from './hdfsClientUtility';
import { NNIPAITrialConfig, PAIClusterConfig, PAIJobConfig, PAITaskRole } from './paiConfig';
46
import { PAI_LOG_PATH_FORMAT, PAI_TRIAL_COMMAND_FORMAT, PAITrialJobDetail } from './paiData';
47
import { PAIJobInfoCollector } from './paiJobInfoCollector';
48
import { PAIJobRestServer, ParameterFileMeta } from './paiJobRestServer';
49

50
import * as WebHDFS from 'webhdfs';
51
52
53
54
55
56
57
58
59
60
61
62
63

/**
 * Training Service implementation for OpenPAI (Open Platform for AI)
 * Refer https://github.com/Microsoft/pai for more info about OpenPAI
 */
@component.Singleton
class PAITrainingService implements TrainingService {
    // Logger obtained from getLogger() in the constructor.
    private readonly log!: Logger;
    // Emitter on which 'metric' listeners are registered and removed.
    private readonly metricsEmitter: EventEmitter;
    // Trial job id -> job detail for every trial submitted through this service.
    private readonly trialJobsMap: Map<string, PAITrialJobDetail>;
    // '/nni/experiments/<experimentId>', the experiment root dir on HDFS.
    private readonly expRootDir: string;
    // Parsed from the TRIAL_CONFIG cluster metadata value.
    private paiTrialConfig: NNIPAITrialConfig | undefined;
    // Parsed from the PAI_CLUSTER_CONFIG cluster metadata value.
    private paiClusterConfig?: PAIClusterConfig;
    // Ids of trial jobs waiting to be submitted by submitJobLoop(), oldest first.
    private readonly jobQueue: string[];
    // Set by cleanUp(); makes the background loops exit.
    private stopping: boolean = false;
    // WebHDFS client created when the PAI cluster config is set.
    // tslint:disable-next-line:no-any
    private hdfsClient: any;
    // Bearer token returned by the PAI REST server.
    private paiToken?: string;
    // Millisecond timestamp of the last successful token fetch.
    private paiTokenUpdateTime?: number;
    // Minimum number of milliseconds between two token fetches.
    private readonly paiTokenUpdateInterval: number;
    private readonly experimentId!: string;
    // Polled by statusCheckingLoop() to refresh trial status.
    private readonly paiJobCollector: PAIJobInfoCollector;
    // -1 until generateSequenceId() loads the initial value.
    private nextTrialSequenceId: number;
    // Cached from PAIJobRestServer.clusterRestServerPort on first submission.
    private paiRestServerPort?: number;
    // Parsed from the NNI_MANAGER_IP cluster metadata value.
    private nniManagerIpConfig?: NNIManagerIpConfig;
    // In-flight upload of the trial codeDir to HDFS; awaited before submitting.
    private copyExpCodeDirPromise?: Promise<void>;
    // In-flight upload of the auth file to HDFS; awaited before submitting.
    private copyAuthFilePromise?: Promise<void>;
    private versionCheck: boolean = true;
    private logCollection: string;
    private isMultiPhase: boolean = false;
    // HDFS location of the uploaded auth file, when one is configured.
    private authFileHdfsPath: string | undefined = undefined;
82
83
84
85
86

    /**
     * Sets up in-memory state only: logger, metric emitter, job bookkeeping,
     * experiment paths and default option values.
     */
    constructor() {
        this.log = getLogger();

        // Job bookkeeping
        this.trialJobsMap = new Map<string, PAITrialJobDetail>();
        this.jobQueue = [];
        this.nextTrialSequenceId = -1;
        this.paiJobCollector = new PAIJobInfoCollector(this.trialJobsMap);
        this.metricsEmitter = new EventEmitter();

        // Root dir on HDFS
        this.expRootDir = path.join('/nni', 'experiments', getExperimentId());
        this.experimentId = getExperimentId();

        // Defaults
        this.paiTokenUpdateInterval = 7200000; //2hours
        this.logCollection = 'none';

        this.log.info('Construct OpenPAI training service.');
    }

    /**
     * Starts the PAI job REST server, then runs the status-checking loop and
     * the job-submission loop until both finish.
     */
    public async run(): Promise<void> {
        this.log.info('Run PAI training service.');

        const jobRestServer: PAIJobRestServer = component.get(PAIJobRestServer);
        await jobRestServer.start();
        jobRestServer.setEnableVersionCheck = this.versionCheck;
        this.log.info(`PAI Training service rest server listening on: ${jobRestServer.endPoint}`);

        const statusLoop: Promise<void> = this.statusCheckingLoop();
        const submitLoop: Promise<void> = this.submitJobLoop();
        await Promise.all([statusLoop, submitLoop]);

        this.log.info('PAI training service exit.');
    }

    /**
     * Lists the details of every tracked job whose form has jobType 'TRIAL'.
     *
     * @returns the job details, in trialJobsMap iteration order
     */
    public async listTrialJobs(): Promise<TrialJobDetail[]> {
        const trialJobs: TrialJobDetail[] = [];

        for (const [jobId, detail] of this.trialJobsMap) {
            if (detail.form.jobType !== 'TRIAL') {
                continue;
            }
            trialJobs.push(await this.getTrialJob(jobId));
        }

        return trialJobs;
    }

122
    /**
     * Looks up one trial job by id.
     *
     * @param trialJobId id of the trial job
     * @returns the stored job detail
     * @throws Error when the PAI cluster config has not been set; rejects with
     *         a string message when the id is not tracked
     */
    public async getTrialJob(trialJobId: string): Promise<TrialJobDetail> {
        if (this.paiClusterConfig === undefined) {
            throw new Error('PAI Cluster config is not initialized');
        }

        const detail: PAITrialJobDetail | undefined = this.trialJobsMap.get(trialJobId);

        return detail === undefined
            ? Promise.reject(`trial job ${trialJobId} not found`)
            : detail;
    }

136
    /**
     * Registers a listener for 'metric' events on the metrics emitter.
     *
     * @param listener callback receiving each trial job metric
     */
    public addTrialJobMetricListener(listener: (metric: TrialJobMetric) => void): void {
        const emitter: EventEmitter = this.metricsEmitter;
        emitter.on('metric', listener);
    }

140
    /**
     * Unregisters a listener previously added for 'metric' events.
     *
     * @param listener the callback to remove
     */
    public removeTrialJobMetricListener(listener: (metric: TrialJobMetric) => void): void {
        const emitter: EventEmitter = this.metricsEmitter;
        emitter.off('metric', listener);
    }

    /**
     * Records a new trial job in 'WAITING' state and queues it for submission.
     * The PAI submission itself is done later by submitJobLoop().
     *
     * @param form job application form of the trial
     * @returns the detail object created for the new trial job
     * @throws Error when the PAI cluster config has not been set
     */
    public async submitTrialJob(form: JobApplicationForm): Promise<TrialJobDetail> {
        if (this.paiClusterConfig === undefined) {
            throw new Error(`paiClusterConfig not initialized!`);
        }

        this.log.info(`submitTrialJob: form: ${JSON.stringify(form)}`);

        const trialJobId: string = uniqueString(5);
        const trialSequenceId: number = this.generateSequenceId();
        const paiJobName: string = `nni_exp_${this.experimentId}_trial_${trialJobId}`;
        //TODO: use HDFS working folder instead
        const trialWorkingFolder: string = path.join(this.expRootDir, 'trials', trialJobId);

        // The log path points at the trial's 'nnioutput' dir under its HDFS work dir.
        const hdfsCodeDir: string = HDFSClientUtility.getHdfsTrialWorkDir(this.paiClusterConfig.userName, trialJobId);
        const hdfsOutputDir: string = unixPathJoin(hdfsCodeDir, 'nnioutput');
        const hdfsLogPath: string = String.Format(PAI_LOG_PATH_FORMAT, this.paiClusterConfig.host, hdfsOutputDir);

        const newJob: PAITrialJobDetail = new PAITrialJobDetail(
            trialJobId, 'WAITING', paiJobName, Date.now(), trialWorkingFolder, form, trialSequenceId, hdfsLogPath
        );

        this.trialJobsMap.set(trialJobId, newJob);
        this.jobQueue.push(trialJobId);

        return newJob;
    }

183
184
185
186
187
188
189
190
191
192
193
194
    /**
     * Sends a new set of hyper-parameters to an existing trial job by writing
     * a parameter file for it.
     *
     * @param trialJobId id of the trial job to update
     * @param form form carrying the new hyper-parameters; jobType must be 'TRIAL'
     * @returns the stored detail of the updated job
     * @throws Error when the job is unknown or the job type is not 'TRIAL'
     */
    public async updateTrialJob(trialJobId: string, form: JobApplicationForm): Promise<TrialJobDetail> {
        const existingJob: undefined | TrialJobDetail = this.trialJobsMap.get(trialJobId);
        if (existingJob === undefined) {
            throw new Error(`updateTrialJob failed: ${trialJobId} not found`);
        }
        if (form.jobType !== 'TRIAL') {
            throw new Error(`updateTrialJob failed: jobType ${form.jobType} not supported.`);
        }

        await this.writeParameterFile(trialJobId, (<TrialJobApplicationForm>form).hyperParameters);

        return existingJob;
    }

    /**
     * Whether this training service supports multi-phase jobs; always true.
     */
    public get isMultiPhaseJobSupported(): boolean {
        const supported: boolean = true;

        return supported;
    }

201
    // tslint:disable:no-http-string
QuanluZhang's avatar
QuanluZhang committed
202
    /**
     * Asks the PAI REST server to stop the PAI job backing a trial.
     *
     * @param trialJobId id of the trial job to cancel
     * @param isEarlyStopped stored on the job detail to mark the cancellation source
     * @returns a promise resolved once PAI accepts the stop request. It is
     *          rejected with an Error when the trial id is unknown, and with a
     *          message string when the request fails or returns HTTP >= 400.
     * @throws Error (synchronously) when the cluster config or the PAI token is not initialized
     */
    public cancelTrialJob(trialJobId: string, isEarlyStopped: boolean = false): Promise<void> {
        const trialJobDetail: PAITrialJobDetail | undefined = this.trialJobsMap.get(trialJobId);
        if (trialJobDetail === undefined) {
            const notFoundMessage: string = `cancelTrialJob: trial job id ${trialJobId} not found`;
            this.log.error(notFoundMessage);

            // Reject with a reason so callers do not receive a bare `undefined`.
            return Promise.reject(new Error(notFoundMessage));
        }

        if (this.paiClusterConfig === undefined) {
            throw new Error('PAI Cluster config is not initialized');
        }
        if (this.paiToken === undefined) {
            throw new Error('PAI token is not initialized');
        }

        const deferred: Deferred<void> = new Deferred<void>();
        const stopJobRequest: request.Options = {
            uri: `http://${this.paiClusterConfig.host}/rest-server/api/v1/user/${this.paiClusterConfig.userName}/jobs/${trialJobDetail.paiJobName}/executionType`,
            method: 'PUT',
            json: true,
            body: {value: 'STOP'},
            headers: {
                'Content-Type': 'application/json',
                Authorization: `Bearer ${this.paiToken}`
            }
        };

        // Set trialjobDetail's early stopped field, to mark the job's cancellation source
        trialJobDetail.isEarlyStopped = isEarlyStopped;

        // tslint:disable-next-line:no-any
        request(stopJobRequest, (error: Error, response: request.Response, body: any) => {
            if ((error !== undefined && error !== null) || response.statusCode >= 400) {
                this.log.error(`PAI Training service: stop trial ${trialJobId} to PAI Cluster failed!`);
                deferred.reject((error !== undefined && error !== null) ? error.message :
                 `Stop trial failed, http code: ${response.statusCode}`);
            } else {
                deferred.resolve();
            }
        });

        return deferred.promise;
    }

247
    /**
     * Applies one cluster metadata entry to this service.
     *
     * - NNI_MANAGER_IP: stores the parsed NNI manager IP config.
     * - PAI_CLUSTER_CONFIG: stores the parsed cluster config, creates the
     *   WebHDFS client and fetches the PAI token.
     * - TRIAL_CONFIG: stores the parsed trial config, validates its codeDir and
     *   starts copying codeDir (and the auth file, if any) to HDFS. The copies
     *   are not awaited here; submitTrialJobToPAI() awaits them.
     * - VERSION_CHECK / LOG_COLLECTION / MULTI_PHASE: store the option value.
     *
     * The returned promise settles for every key. Previously the three option
     * keys left it pending forever.
     *
     * @param key one of the TrialConfigMetadataKey values
     * @param value string value for the key (JSON for the config keys)
     * @throws Error for an unknown key, when TRIAL_CONFIG is set before the
     *         cluster config, or when codeDir validation or the token fetch fails
     */
    // tslint:disable: no-unsafe-any no-any
    // tslint:disable-next-line:max-func-body-length
    public async setClusterMetadata(key: string, value: string): Promise<void> {
        switch (key) {
            case TrialConfigMetadataKey.NNI_MANAGER_IP:
                this.nniManagerIpConfig = <NNIManagerIpConfig>JSON.parse(value);
                break;

            case TrialConfigMetadataKey.PAI_CLUSTER_CONFIG:
                this.paiClusterConfig = <PAIClusterConfig>JSON.parse(value);

                this.hdfsClient = WebHDFS.createClient({
                    user: this.paiClusterConfig.userName,
                    // Refer PAI document for Pylon mapping https://github.com/Microsoft/pai/tree/master/docs/pylon
                    port: 80,
                    path: '/webhdfs/api/v1',
                    host: this.paiClusterConfig.host
                });

                // Get PAI authentication token
                await this.updatePaiToken();
                break;

            case TrialConfigMetadataKey.TRIAL_CONFIG:
                if (this.paiClusterConfig === undefined) {
                    this.log.error('pai cluster config is not initialized');
                    throw new Error('pai cluster config is not initialized');
                }
                this.paiTrialConfig = <NNIPAITrialConfig>JSON.parse(value);

                // Validate to make sure codeDir doesn't have too many files
                try {
                    await validateCodeDir(this.paiTrialConfig.codeDir);
                } catch (error) {
                    this.log.error(error);
                    throw new Error(error);
                }

                // Copy experiment files from local folder to HDFS
                this.copyExpCodeDirPromise = HDFSClientUtility.copyDirectoryToHdfs(
                    this.paiTrialConfig.codeDir,
                    HDFSClientUtility.getHdfsExpCodeDir(this.paiClusterConfig.userName),
                    this.hdfsClient
                );

                // Upload authFile to hdfs
                if (this.paiTrialConfig.authFile) {
                    this.authFileHdfsPath = unixPathJoin(HDFSClientUtility.hdfsExpRootDir(this.paiClusterConfig.userName), 'authFile');
                    this.copyAuthFilePromise = HDFSClientUtility.copyFileToHdfs(this.paiTrialConfig.authFile, this.authFileHdfsPath, this.hdfsClient);
                }
                break;

            case TrialConfigMetadataKey.VERSION_CHECK:
                this.versionCheck = (value === 'true' || value === 'True');
                break;

            case TrialConfigMetadataKey.LOG_COLLECTION:
                this.logCollection = value;
                break;

            case TrialConfigMetadataKey.MULTI_PHASE:
                this.isMultiPhase = (value === 'true' || value === 'True');
                break;

            default:
                //Reject for unknown keys
                throw new Error(`Uknown key: ${key}`);
        }
    }
322
    // tslint:enable: no-unsafe-any
323
324
325
326
327

    /**
     * Not backed by any stored metadata: the key is ignored and the returned
     * promise resolves immediately without a value.
     *
     * @param key metadata key (unused)
     */
    public getClusterMetadata(key: string): Promise<string> {
        const pending: Deferred<string> = new Deferred<string>();
        pending.resolve();

        return pending.promise;
    }

    /**
     * Flags the service as stopping, so the background loops exit, and stops
     * the PAI job REST server.
     *
     * @throws whatever error the REST server raises while stopping
     */
    public async cleanUp(): Promise<void> {
        this.log.info('Stopping PAI training service...');
        this.stopping = true;

        const jobRestServer: PAIJobRestServer = component.get(PAIJobRestServer);
        try {
            await jobRestServer.stop();
        } catch (error) {
            // tslint:disable-next-line: no-unsafe-any
            this.log.error(`PAI Training service rest server stopped failed, error: ${error.message}`);
            throw error;
        }
        this.log.info('PAI Training service rest server stopped successfully.');
    }

    /**
     * The emitter on which trial 'metric' listeners are registered.
     */
    public get MetricsEmitter(): EventEmitter {
        const emitter: EventEmitter = this.metricsEmitter;

        return emitter;
    }
354

355
356
357
358
359
    /**
     * Prepares and submits one queued trial job to the PAI cluster.
     *
     * Steps: wait for the pending codeDir / auth file uploads, write the NNI
     * install script and the parameter file to a local temp folder, build the
     * PAI job config, upload the temp folder to the trial's HDFS work dir, then
     * POST the job to the PAI REST server.
     *
     * An upload or submission failure is logged and marks the job 'FAILED';
     * the promise still resolves to true in that case.
     *
     * @param trialJobId id of a job present in trialJobsMap
     * @returns a promise resolving to true once the job was submitted or marked failed
     * @throws Error when the job is unknown, or when the cluster config, trial
     *         config or PAI token is not initialized
     */
    // tslint:disable-next-line:max-func-body-length
    private async submitTrialJobToPAI(trialJobId: string): Promise<boolean> {
        const deferred : Deferred<boolean> = new Deferred<boolean>();
        const trialJobDetail: PAITrialJobDetail | undefined = this.trialJobsMap.get(trialJobId);

        if (trialJobDetail === undefined) {
            throw new Error(`Failed to find PAITrialJobDetail for job ${trialJobId}`);
        }

        if (this.paiClusterConfig === undefined) {
            throw new Error('PAI Cluster config is not initialized');
        }
        if (this.paiTrialConfig === undefined) {
            throw new Error('trial config is not initialized');
        }
        if (this.paiToken === undefined) {
            throw new Error('PAI token is not initialized');
        }

        // Look the REST server port up once and cache it for later submissions.
        if (this.paiRestServerPort === undefined) {
            const restServer: PAIJobRestServer = component.get(PAIJobRestServer);
            this.paiRestServerPort = restServer.clusterRestServerPort;
        }

        // Make sure experiment code files is copied from local to HDFS
        if (this.copyExpCodeDirPromise !== undefined) {
            await this.copyExpCodeDirPromise;
        }

        //Make sure authFile is copied from local to HDFS
        if (this.paiTrialConfig.authFile) {
            await this.copyAuthFilePromise;
        }

        // Step 1. Prepare PAI job configuration

        const trialLocalTempFolder: string = path.join(getExperimentRootDir(), 'trials-local', trialJobId);
        //create tmp trial working folder locally.
        await execMkdir(trialLocalTempFolder);

        const runScriptContent : string = CONTAINER_INSTALL_NNI_SHELL_FORMAT;
        // Write NNI installation file to local tmp files
        await fs.promises.writeFile(path.join(trialLocalTempFolder, 'install_nni.sh'), runScriptContent, { encoding: 'utf8' });

        // Write file content ( parameter.cfg ) to local tmp folders
        const trialForm : TrialJobApplicationForm = (<TrialJobApplicationForm>trialJobDetail.form);
        if (trialForm !== undefined) {
            await fs.promises.writeFile(
                path.join(trialLocalTempFolder, generateParamFileName(trialForm.hyperParameters)),
                trialForm.hyperParameters.value, { encoding: 'utf8' }
            );
        }
        const hdfsCodeDir: string = HDFSClientUtility.getHdfsTrialWorkDir(this.paiClusterConfig.userName, trialJobId);
        const hdfsOutputDir: string = unixPathJoin(hdfsCodeDir, 'nnioutput');
        // tslint:disable-next-line: strict-boolean-expressions
        const nniManagerIp: string = this.nniManagerIpConfig ? this.nniManagerIpConfig.nniManagerIp : getIPV4Address();
        const version: string = this.versionCheck ? await getVersion() : '';
        // The argument order below must match the placeholders of PAI_TRIAL_COMMAND_FORMAT.
        const nniPaiTrialCommand : string = String.Format(
            PAI_TRIAL_COMMAND_FORMAT,
            // PAI will copy job's codeDir into /root directory
            `$PWD/${trialJobId}`,
            `$PWD/${trialJobId}/nnioutput`,
            trialJobId,
            this.experimentId,
            trialJobDetail.sequenceId,
            this.isMultiPhase,
            this.paiTrialConfig.command,
            nniManagerIp,
            this.paiRestServerPort,
            hdfsOutputDir,
            this.paiClusterConfig.host,
            this.paiClusterConfig.userName,
            HDFSClientUtility.getHdfsExpCodeDir(this.paiClusterConfig.userName),
            version,
            this.logCollection
        )
        // Strip every line break so the command is a single line.
        .replace(/\r\n|\n|\r/gm, '');

        // tslint:disable-next-line:no-console
        console.log(`nniPAItrial command is ${nniPaiTrialCommand.trim()}`);
        const paiTaskRoles : PAITaskRole[] = [
            new PAITaskRole(
                `nni_trail_${trialJobId}`,
                // Task role number
                1,
                // Task CPU number
                this.paiTrialConfig.cpuNum,
                // Task memory
                this.paiTrialConfig.memoryMB,
                // Task GPU number
                this.paiTrialConfig.gpuNum,
                // Task command
                nniPaiTrialCommand,
                // Task shared memory
                this.paiTrialConfig.shmMB,
            )
        ];

        const paiJobConfig : PAIJobConfig = new PAIJobConfig(
            // Job name
            trialJobDetail.paiJobName,
            // Docker image
            this.paiTrialConfig.image,
            // codeDir
            `$PAI_DEFAULT_FS_URI${hdfsCodeDir}`,
            // PAI Task roles
            paiTaskRoles,
            // Add Virutal Cluster
            this.paiTrialConfig.virtualCluster === undefined ? 'default' : this.paiTrialConfig.virtualCluster.toString(),
            //Task auth File
            this.authFileHdfsPath
        );

        // Step 2. Upload the locally prepared trial folder onto HDFS
        try {
            await HDFSClientUtility.copyDirectoryToHdfs(trialLocalTempFolder, hdfsCodeDir, this.hdfsClient);
        } catch (error) {
            // Name the folder that was actually being copied (the local temp folder, not codeDir).
            this.log.error(`PAI Training service: copy ${trialLocalTempFolder} to HDFS ${hdfsCodeDir} failed, error is ${error}`);
            trialJobDetail.status = 'FAILED';

            return true;
        }

        // Step 3. Submit PAI job via Rest call
        // Refer https://github.com/Microsoft/pai/blob/master/docs/rest-server/API.md for more detail about PAI Rest API
        const submitJobRequest: request.Options = {
            uri: `http://${this.paiClusterConfig.host}/rest-server/api/v1/user/${this.paiClusterConfig.userName}/jobs`,
            method: 'POST',
            json: true,
            body: paiJobConfig,
            headers: {
                'Content-Type': 'application/json',
                Authorization: `Bearer ${this.paiToken}`
            }
        };
        // tslint:disable:no-any no-unsafe-any
        request(submitJobRequest, (error: Error, response: request.Response, body: any) => {
            if ((error !== undefined && error !== null) || response.statusCode >= 400) {
                let errorMessage: string;
                if (error !== undefined && error !== null) {
                    errorMessage = error.message;
                } else {
                    // With `json: true` the body may already be a parsed object; serialize it
                    // so the log does not show "[object Object]".
                    const httpBody: string = typeof response.body === 'string' ? response.body : JSON.stringify(response.body);
                    errorMessage = `Submit trial ${trialJobId} failed, http code:${response.statusCode}, http body: ${httpBody}`;
                }
                this.log.error(errorMessage);
                trialJobDetail.status = 'FAILED';
                deferred.resolve(true);
            } else {
                trialJobDetail.submitTime = Date.now();
                deferred.resolve(true);
            }
        });

        return deferred.promise;
    }

507
    /**
     * Hands out consecutive trial sequence ids. The counter is loaded from
     * getInitTrialSequenceId() on first use (while it still holds -1).
     *
     * @returns the next sequence id
     */
    private generateSequenceId(): number {
        if (this.nextTrialSequenceId === -1) {
            this.nextTrialSequenceId = getInitTrialSequenceId();
        }

        const assignedId: number = this.nextTrialSequenceId;
        this.nextTrialSequenceId += 1;

        return assignedId;
    }
514
515
516

    /**
     * Every 3 seconds, until the service is stopping: refresh the PAI token,
     * let the job collector update trial status, and check the REST server
     * for a reported error.
     *
     * @throws Error when the first token fetch fails, or when the REST server
     *         reports an error message
     */
    private async statusCheckingLoop(): Promise<void> {
        while (!this.stopping) {
            try {
                await this.updatePaiToken();
            } catch (tokenError) {
                this.log.error(`${tokenError}`);
                //only throw error when initlize paiToken first time
                if (this.paiToken === undefined) {
                    throw new Error(tokenError);
                }
            }

            await this.paiJobCollector.retrieveTrialStatus(this.paiToken, this.paiClusterConfig);

            const jobRestServer: PAIJobRestServer = component.get(PAIJobRestServer);
            if (jobRestServer.getErrorMessage !== undefined) {
                throw new Error(jobRestServer.getErrorMessage);
            }

            await delay(3000);
        }
    }

    /**
     * Every 3 seconds, until the service is stopping, submits the queued trial
     * jobs in order. A job leaves the queue only after submitTrialJobToPAI()
     * returns true; a false result ends the pass and the job is retried later.
     */
    private async submitJobLoop(): Promise<void> {
        while (!this.stopping) {
            while (!this.stopping && this.jobQueue.length > 0) {
                const headJobId: string = this.jobQueue[0];
                const handled: boolean = await this.submitTrialJobToPAI(headJobId);
                if (!handled) {
                    // Break the while loop since failed to submitJob
                    break;
                }
                // Remove trial job with trialJobId from job queue
                this.jobQueue.shift();
            }
            await delay(3000);
        }
    }

551
552
553
554
555
    /**
     * Update pai token by the interval time or initialize the pai token.
     *
     * Does nothing when a token was fetched less than paiTokenUpdateInterval
     * milliseconds ago. Otherwise it POSTs the configured user name and
     * password to the PAI REST server. The token and its update time are
     * stored only when the server answers with HTTP 200, so a failed attempt
     * neither discards the current token nor postpones the next retry.
     *
     * @throws Error when the cluster config is not initialized, the request
     *         fails, the server answers with a non-200 status, or no answer
     *         arrives within 5 seconds
     */
    private async updatePaiToken(): Promise<void> {
        const currentTime: number = new Date().getTime();
        //If pai token initialized and not reach the interval time, do not update
        if (this.paiTokenUpdateTime !== undefined && (currentTime - this.paiTokenUpdateTime) < this.paiTokenUpdateInterval) {
            return;
        }

        if (this.paiClusterConfig === undefined) {
            const paiClusterConfigError: string = `pai cluster config not initialized!`;
            this.log.error(`${paiClusterConfigError}`);
            throw Error(`${paiClusterConfigError}`);
        }

        const deferred : Deferred<void> = new Deferred<void>();
        const authenticationReq: request.Options = {
            uri: `http://${this.paiClusterConfig.host}/rest-server/api/v1/token`,
            method: 'POST',
            json: true,
            body: {
                username: this.paiClusterConfig.userName,
                password: this.paiClusterConfig.passWord
            }
        };

        request(authenticationReq, (error: Error, response: request.Response, body: any) => {
            if (error !== undefined && error !== null) {
                this.log.error(`Get PAI token failed: ${error.message}`);
                deferred.reject(new Error(`Get PAI token failed: ${error.message}`));
            } else if (response.statusCode !== 200) {
                // Do not fall through: a rejected login must not overwrite the token or its timestamp.
                this.log.error(`Get PAI token failed: get PAI Rest return code ${response.statusCode}`);
                deferred.reject(new Error(`Get PAI token failed: ${response.body}, please check paiConfig username or password`));
            } else {
                this.paiToken = body.token;
                this.paiTokenUpdateTime = new Date().getTime();
                deferred.resolve();
            }
        });

        let timeoutId: NodeJS.Timer;
        const timeoutDelay: Promise<void> = new Promise<void>((resolve: Function, reject: Function): void => {
            // Set timeout and reject the promise once reach timeout (5 seconds)
            timeoutId = setTimeout(
                () => reject(new Error('Get PAI token timeout. Please check your PAI cluster.')),
                5000);
        });

        // Whichever settles first wins; the timer is always cleared afterwards.
        return Promise.race([timeoutDelay, deferred.promise])
            .finally(() => { clearTimeout(timeoutId); });
    }
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648

    /**
     * Writes a hyper-parameter file into the trial's local temp folder, copies
     * it to the trial's HDFS work dir, and posts the file's HDFS location to
     * the PAI job REST server.
     *
     * @param trialJobId id of the trial the parameters belong to
     * @param hyperParameters parameters to write; `value` is the file content
     * @throws Error when the cluster config or trial config is not initialized
     */
    private async writeParameterFile(trialJobId: string, hyperParameters: HyperParameters): Promise<void> {
        if (this.paiClusterConfig === undefined) {
            throw new Error('PAI Cluster config is not initialized');
        }
        if (this.paiTrialConfig === undefined) {
            throw new Error('PAI trial config is not initialized');
        }

        const trialLocalTempFolder: string = path.join(getExperimentRootDir(), 'trials-local', trialJobId);
        const hpFileName: string = generateParamFileName(hyperParameters);
        const localFilepath: string = path.join(trialLocalTempFolder, hpFileName);
        await fs.promises.writeFile(localFilepath, hyperParameters.value, { encoding: 'utf8' });
        const hdfsCodeDir: string = HDFSClientUtility.getHdfsTrialWorkDir(this.paiClusterConfig.userName, trialJobId);
        // HDFS paths always use '/', so join with POSIX rules even when running on Windows.
        const hdfsHpFilePath: string = path.posix.join(hdfsCodeDir, hpFileName);

        await HDFSClientUtility.copyFileToHdfs(localFilepath, hdfsHpFilePath, this.hdfsClient);

        await this.postParameterFileMeta({
            experimentId: this.experimentId,
            trialId: trialJobId,
            filePath: hdfsHpFilePath
        });
    }

    /**
     * POSTs parameter file metadata to the PAI job REST server's
     * 'parameter-file-meta' endpoint.
     *
     * @param parameterFileMeta metadata sent as the JSON request body
     * @returns a promise rejected only when the request itself reports an
     *          error; the HTTP status code is not inspected
     */
    private postParameterFileMeta(parameterFileMeta: ParameterFileMeta): Promise<void> {
        const jobRestServer: PAIJobRestServer = component.get(PAIJobRestServer);
        const endpointUri: string = `${jobRestServer.endPoint}${jobRestServer.apiRootUrl}/parameter-file-meta`;
        const postOptions: request.Options = {
            uri: endpointUri,
            method: 'POST',
            json: true,
            body: parameterFileMeta
        };

        const done: Deferred<void> = new Deferred<void>();
        request(postOptions, (err: Error, res: request.Response) => {
            if (!err) {
                done.resolve();

                return;
            }
            done.reject(err);
        });

        return done.promise;
    }
649
650
}

651
export { PAITrainingService };