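// paiTrainingService.ts (25.6 KB)
// NOTE(review): this file was pasted from a blame view; bare line numbers and author annotations
// (e.g. "liuzhe-lz committed", "Newer Older") scattered below are extraction residue, not code.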
2
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT license.
3

4
'use strict';
5
6
7
8

import * as cpp from 'child-process-promise';
import * as fs from 'fs';
import * as path from 'path';
9
// tslint:disable-next-line:no-implicit-dependencies
10
import * as request from 'request';
11
import * as component from '../../common/component';
12
13

import { EventEmitter } from 'events';
14
15
import { Deferred } from 'ts-deferred';
import { String } from 'typescript-string-operations';
16
import { MethodNotImplementedError } from '../../common/errors';
17
import { getExperimentId } from '../../common/experimentStartupInfo';
18
19
import { getLogger, Logger } from '../../common/log';
import {
20
    HyperParameters, NNIManagerIpConfig, TrainingService,
21
    TrialJobApplicationForm, TrialJobDetail, TrialJobMetric
22
} from '../../common/trainingService';
23
import { delay, generateParamFileName,
24
    getExperimentRootDir, getIPV4Address, getVersion, uniqueString, unixPathJoin } from '../../common/utils';
25
26
import { CONTAINER_INSTALL_NNI_SHELL_FORMAT } from '../common/containerJobData';
import { TrialConfigMetadataKey } from '../common/trialConfigMetadataKey';
27
import { execMkdir, validateCodeDir } from '../common/util';
28
29
import { HDFSClientUtility } from './hdfsClientUtility';
import { NNIPAITrialConfig, PAIClusterConfig, PAIJobConfig, PAITaskRole } from './paiConfig';
30
import { PAI_LOG_PATH_FORMAT, PAI_TRIAL_COMMAND_FORMAT, PAITrialJobDetail } from './paiData';
31
import { PAIJobInfoCollector } from './paiJobInfoCollector';
32
import { PAIJobRestServer, ParameterFileMeta } from './paiJobRestServer';
33

34
import * as WebHDFS from 'webhdfs';
35
36
37
38
39
40
41
42
43
44
45
46
47

/**
 * Training Service implementation for OpenPAI (Open Platform for AI)
 * Refer https://github.com/Microsoft/pai for more info about OpenPAI
 */
@component.Singleton
class PAITrainingService implements TrainingService {
    private readonly log: Logger;
    private readonly metricsEmitter: EventEmitter;
    // Every trial created by submitTrialJob, keyed by trial job id.
    private readonly trialJobsMap: Map<string, PAITrialJobDetail>;
    // Experiment root dir, used to build each trial's working-folder path.
    private readonly expRootDir: string;
    private paiTrialConfig: NNIPAITrialConfig | undefined;
    private paiClusterConfig?: PAIClusterConfig;
    // Ids of trials waiting to be submitted to PAI, consumed front-first by submitJobLoop.
    private readonly jobQueue: string[];
    // Set by cleanUp; makes run()'s loops exit.
    private stopping: boolean = false;
    // tslint:disable-next-line:no-any
    private hdfsClient: any;
    private paiToken? : string;
    // Time (epoch ms) of the last successful token refresh; undefined until the first one.
    private paiTokenUpdateTime?: number;
    // Minimum interval (ms) between token refreshes when authenticating with a password.
    private readonly paiTokenUpdateInterval: number;
    private readonly experimentId: string;
    private readonly paiJobCollector : PAIJobInfoCollector;
    private paiRestServerPort?: number;
    private nniManagerIpConfig?: NNIManagerIpConfig;
    // Pending uploads started by setClusterMetadata(TRIAL_CONFIG); awaited before each submission.
    private copyExpCodeDirPromise?: Promise<void>;
    private copyAuthFilePromise?: Promise<void>;
    private versionCheck: boolean = true;
    private logCollection: string;
    private isMultiPhase: boolean = false;
    private authFileHdfsPath: string | undefined = undefined;

    constructor() {
        this.log = getLogger();
        this.metricsEmitter = new EventEmitter();
        this.trialJobsMap = new Map<string, PAITrialJobDetail>();
        this.jobQueue = [];
        // Root dir on HDFS
        this.expRootDir = path.join('/nni', 'experiments', getExperimentId());
        this.experimentId = getExperimentId();
        this.paiJobCollector = new PAIJobInfoCollector(this.trialJobsMap);
        this.paiTokenUpdateInterval = 7200000; //2hours
        this.logCollection = 'none';
        this.log.info('Construct OpenPAI training service.');
    }

    /**
     * Start the PAI job rest server, then run the status-checking and job-submission
     * loops until cleanUp() sets `stopping`.
     */
    public async run(): Promise<void> {
        this.log.info('Run PAI training service.');
        const restServer: PAIJobRestServer = component.get(PAIJobRestServer);
        await restServer.start();
        restServer.setEnableVersionCheck = this.versionCheck;
        this.log.info(`PAI Training service rest server listening on: ${restServer.endPoint}`);
        await Promise.all([
            this.statusCheckingLoop(),
            this.submitJobLoop()]);
        this.log.info('PAI training service exit.');
    }

    /**
     * List every trial known to this service, in insertion order.
     * @throws Error when the PAI cluster config has not been set yet.
     */
    public async listTrialJobs(): Promise<TrialJobDetail[]> {
        const jobs: TrialJobDetail[] = [];
        for (const trialJobId of this.trialJobsMap.keys()) {
            jobs.push(await this.getTrialJob(trialJobId));
        }

        return jobs;
    }

    /**
     * Look up one trial by id.
     * @throws Error when the PAI cluster config is not set or the trial id is unknown.
     */
    public async getTrialJob(trialJobId: string): Promise<TrialJobDetail> {
        if (this.paiClusterConfig === undefined) {
            throw new Error('PAI Cluster config is not initialized');
        }

        const paiTrialJob: PAITrialJobDetail | undefined = this.trialJobsMap.get(trialJobId);
        if (paiTrialJob === undefined) {
            throw new Error(`trial job ${trialJobId} not found`);
        }

        return paiTrialJob;
    }

    public addTrialJobMetricListener(listener: (metric: TrialJobMetric) => void): void {
        this.metricsEmitter.on('metric', listener);
    }

    public removeTrialJobMetricListener(listener: (metric: TrialJobMetric) => void): void {
        this.metricsEmitter.off('metric', listener);
    }

    /**
     * Register a new trial in WAITING state and enqueue it. The actual PAI submission
     * happens asynchronously in submitJobLoop.
     * @throws Error when the PAI cluster config has not been set yet.
     */
    public async submitTrialJob(form: TrialJobApplicationForm): Promise<TrialJobDetail> {
        if (this.paiClusterConfig === undefined) {
            throw new Error(`paiClusterConfig not initialized!`);
        }

        this.log.info(`submitTrialJob: form: ${JSON.stringify(form)}`);

        const trialJobId: string = uniqueString(5);
        //TODO: use HDFS working folder instead
        const trialWorkingFolder: string = path.join(this.expRootDir, 'trials', trialJobId);
        const paiJobName: string = `nni_exp_${this.experimentId}_trial_${trialJobId}`;
        const hdfsCodeDir: string = HDFSClientUtility.getHdfsTrialWorkDir(this.paiClusterConfig.userName, trialJobId);
        const hdfsOutputDir: string = unixPathJoin(hdfsCodeDir, 'nnioutput');

        const hdfsLogPath : string = String.Format(
            PAI_LOG_PATH_FORMAT,
            this.paiClusterConfig.host,
            hdfsOutputDir
        );

        const trialJobDetail: PAITrialJobDetail = new PAITrialJobDetail(
            trialJobId,
            'WAITING',
            paiJobName,
            Date.now(),
            trialWorkingFolder,
            form,
            hdfsLogPath);

        this.trialJobsMap.set(trialJobId, trialJobDetail);
        this.jobQueue.push(trialJobId);

        return trialJobDetail;
    }

    /**
     * Send a new set of hyper-parameters to an existing trial by uploading a parameter file.
     * @throws Error when the trial id is unknown.
     */
    public async updateTrialJob(trialJobId: string, form: TrialJobApplicationForm): Promise<TrialJobDetail> {
        const trialJobDetail: undefined | TrialJobDetail = this.trialJobsMap.get(trialJobId);
        if (trialJobDetail === undefined) {
            throw new Error(`updateTrialJob failed: ${trialJobId} not found`);
        }
        await this.writeParameterFile(trialJobId, form.hyperParameters);

        return trialJobDetail;
    }

    public get isMultiPhaseJobSupported(): boolean {
        return true;
    }

    // tslint:disable:no-http-string
    /**
     * Ask the PAI rest server to STOP a trial's job.
     * Every failure (unknown trial, missing config/token, HTTP error) is reported as a
     * rejected promise; previously some were thrown synchronously.
     * @param isEarlyStopped recorded on the trial to mark the cancellation source.
     */
    public async cancelTrialJob(trialJobId: string, isEarlyStopped: boolean = false): Promise<void> {
        const trialJobDetail : PAITrialJobDetail | undefined = this.trialJobsMap.get(trialJobId);
        if (trialJobDetail === undefined) {
            const notFoundMessage: string = `cancelTrialJob: trial job id ${trialJobId} not found`;
            this.log.error(notFoundMessage);
            throw new Error(notFoundMessage);
        }

        if (this.paiClusterConfig === undefined) {
            throw new Error('PAI Cluster config is not initialized');
        }
        if (this.paiToken === undefined) {
            throw new Error('PAI token is not initialized');
        }

        const stopJobRequest: request.Options = {
            uri: `http://${this.paiClusterConfig.host}/rest-server/api/v1/user/${this.paiClusterConfig.userName}\
/jobs/${trialJobDetail.paiJobName}/executionType`,
            method: 'PUT',
            json: true,
            body: {value: 'STOP'},
            headers: {
                'Content-Type': 'application/json',
                Authorization: `Bearer ${this.paiToken}`
            }
        };

        // Set trialjobDetail's early stopped field, to mark the job's cancellation source
        trialJobDetail.isEarlyStopped = isEarlyStopped;

        const deferred : Deferred<void> = new Deferred<void>();
        request(stopJobRequest, (error: Error, response: request.Response) => {
            if ((error !== undefined && error !== null) || response.statusCode >= 400) {
                this.log.error(`PAI Training service: stop trial ${trialJobId} to PAI Cluster failed!`);
                deferred.reject((error !== undefined && error !== null) ? error.message :
                 `Stop trial failed, http code: ${response.statusCode}`);
            } else {
                deferred.resolve();
            }
        });

        return deferred.promise;
    }

    // tslint:disable: no-unsafe-any no-any
    // tslint:disable-next-line:max-func-body-length
    /**
     * Apply one piece of cluster/trial metadata.
     * Every recognised key now settles the returned promise. Previously VERSION_CHECK,
     * LOG_COLLECTION and MULTI_PHASE left it pending forever.
     * @throws Error for unknown keys, malformed config, or TRIAL_CONFIG sent before PAI_CLUSTER_CONFIG.
     */
    public async setClusterMetadata(key: string, value: string): Promise<void> {
        switch (key) {
            case TrialConfigMetadataKey.NNI_MANAGER_IP:
                this.nniManagerIpConfig = <NNIManagerIpConfig>JSON.parse(value);
                break;

            case TrialConfigMetadataKey.PAI_CLUSTER_CONFIG:
                this.paiClusterConfig = <PAIClusterConfig>JSON.parse(value);

                this.hdfsClient = WebHDFS.createClient({
                    user: this.paiClusterConfig.userName,
                    // Refer PAI document for Pylon mapping https://github.com/Microsoft/pai/tree/master/docs/pylon
                    port: 80,
                    path: '/webhdfs/api/v1',
                    host: this.paiClusterConfig.host
                });

                if (this.paiClusterConfig.passWord) {
                    // Get PAI authentication token
                    await this.updatePaiToken();
                } else if (this.paiClusterConfig.token) {
                    this.paiToken = this.paiClusterConfig.token;
                } else {
                    throw new Error('pai cluster config format error, please set password or token!');
                }
                break;

            case TrialConfigMetadataKey.TRIAL_CONFIG:
                if (this.paiClusterConfig === undefined) {
                    this.log.error('pai cluster config is not initialized');
                    throw new Error('pai cluster config is not initialized');
                }
                this.paiTrialConfig = <NNIPAITrialConfig>JSON.parse(value);

                // Validate to make sure codeDir doesn't have too many files
                try {
                    await validateCodeDir(this.paiTrialConfig.codeDir);
                } catch (error) {
                    this.log.error(error);
                    throw new Error(error);
                }

                // Copy experiment files from local folder to HDFS; awaited later in submitTrialJobToPAI.
                this.copyExpCodeDirPromise = HDFSClientUtility.copyDirectoryToHdfs(
                    this.paiTrialConfig.codeDir,
                    HDFSClientUtility.getHdfsExpCodeDir(this.paiClusterConfig.userName),
                    this.hdfsClient
                );

                // Upload authFile to hdfs
                if (this.paiTrialConfig.authFile) {
                    this.authFileHdfsPath = unixPathJoin(HDFSClientUtility.hdfsExpRootDir(this.paiClusterConfig.userName), 'authFile');
                    this.copyAuthFilePromise = HDFSClientUtility.copyFileToHdfs(this.paiTrialConfig.authFile, this.authFileHdfsPath, this.hdfsClient);
                }
                break;

            case TrialConfigMetadataKey.VERSION_CHECK:
                this.versionCheck = (value === 'true' || value === 'True');
                break;

            case TrialConfigMetadataKey.LOG_COLLECTION:
                this.logCollection = value;
                break;

            case TrialConfigMetadataKey.MULTI_PHASE:
                this.isMultiPhase = (value === 'true' || value === 'True');
                break;

            default:
                //Reject for unknown keys
                throw new Error(`Uknown key: ${key}`);
        }
    }
    // tslint:enable: no-unsafe-any

    /**
     * No cluster metadata can be read back from this service; the returned promise
     * resolves with no value for every key.
     */
    public getClusterMetadata(key: string): Promise<string> {
        const deferred : Deferred<string> = new Deferred<string>();

        deferred.resolve();

        return deferred.promise;
    }

    /**
     * Stop both background loops and the PAI job rest server.
     * @throws the rest server's stop error after logging it.
     */
    public async cleanUp(): Promise<void> {
        this.log.info('Stopping PAI training service...');
        this.stopping = true;

        const restServer: PAIJobRestServer = component.get(PAIJobRestServer);
        try {
            await restServer.stop();
            this.log.info('PAI Training service rest server stopped successfully.');
        } catch (error) {
            // tslint:disable-next-line: no-unsafe-any
            this.log.error(`PAI Training service rest server stopped failed, error: ${error.message}`);
            throw error;
        }
    }

    public get MetricsEmitter() : EventEmitter {
        return this.metricsEmitter;
    }

    // tslint:disable-next-line:max-func-body-length
    /**
     * Prepare a trial's files on HDFS and submit its job to the PAI rest server.
     * Resolves true once the trial has been handled: either submitted (submitTime set)
     * or marked FAILED after a failed upload/submission. The queue then moves on either way.
     * @throws Error when the trial or any required config/token is missing.
     */
    private async submitTrialJobToPAI(trialJobId: string): Promise<boolean> {
        const deferred : Deferred<boolean> = new Deferred<boolean>();
        const trialJobDetail: PAITrialJobDetail | undefined = this.trialJobsMap.get(trialJobId);

        if (trialJobDetail === undefined) {
            throw new Error(`Failed to find PAITrialJobDetail for job ${trialJobId}`);
        }
        if (this.paiClusterConfig === undefined) {
            throw new Error('PAI Cluster config is not initialized');
        }
        if (this.paiTrialConfig === undefined) {
            throw new Error('trial config is not initialized');
        }
        if (this.paiToken === undefined) {
            throw new Error('PAI token is not initialized');
        }

        if (this.paiRestServerPort === undefined) {
            const restServer: PAIJobRestServer = component.get(PAIJobRestServer);
            this.paiRestServerPort = restServer.clusterRestServerPort;
        }

        // Make sure experiment code files is copied from local to HDFS
        if (this.copyExpCodeDirPromise !== undefined) {
            await this.copyExpCodeDirPromise;
        }

        //Make sure authFile is copied from local to HDFS
        if (this.paiTrialConfig.authFile) {
            await this.copyAuthFilePromise;
        }

        // Step 1. Prepare PAI job configuration
        const trialLocalTempFolder: string = path.join(getExperimentRootDir(), 'trials-local', trialJobId);
        //create tmp trial working folder locally.
        await execMkdir(trialLocalTempFolder);

        const runScriptContent : string = CONTAINER_INSTALL_NNI_SHELL_FORMAT;
        // Write NNI installation file to local tmp files
        await fs.promises.writeFile(path.join(trialLocalTempFolder, 'install_nni.sh'), runScriptContent, { encoding: 'utf8' });

        // Write file content ( parameter.cfg ) to local tmp folders
        if (trialJobDetail.form !== undefined) {
            await fs.promises.writeFile(
                path.join(trialLocalTempFolder, generateParamFileName(trialJobDetail.form.hyperParameters)),
                trialJobDetail.form.hyperParameters.value, { encoding: 'utf8' }
            );
        }
        const hdfsCodeDir: string = HDFSClientUtility.getHdfsTrialWorkDir(this.paiClusterConfig.userName, trialJobId);
        const hdfsOutputDir: string = unixPathJoin(hdfsCodeDir, 'nnioutput');
        // tslint:disable-next-line: strict-boolean-expressions
        const nniManagerIp: string = this.nniManagerIpConfig ? this.nniManagerIpConfig.nniManagerIp : getIPV4Address();
        const version: string = this.versionCheck ? await getVersion() : '';
        const nniPaiTrialCommand : string = String.Format(
            PAI_TRIAL_COMMAND_FORMAT,
            // PAI will copy job's codeDir into /root directory
            `$PWD/${trialJobId}`,
            `$PWD/${trialJobId}/nnioutput`,
            trialJobId,
            this.experimentId,
            trialJobDetail.form.sequenceId,
            this.isMultiPhase,
            this.paiTrialConfig.command,
            nniManagerIp,
            this.paiRestServerPort,
            hdfsOutputDir,
            this.paiClusterConfig.host,
            this.paiClusterConfig.userName,
            HDFSClientUtility.getHdfsExpCodeDir(this.paiClusterConfig.userName),
            version,
            this.logCollection
        )
        // The command is sent as a single line to PAI.
        .replace(/\r\n|\n|\r/gm, '');

        this.log.info(`nniPAItrial command is ${nniPaiTrialCommand.trim()}`);
        const paiTaskRoles : PAITaskRole[] = [
            new PAITaskRole(
                `nni_trail_${trialJobId}`,
                // Task role number
                1,
                // Task CPU number
                this.paiTrialConfig.cpuNum,
                // Task memory
                this.paiTrialConfig.memoryMB,
                // Task GPU number
                this.paiTrialConfig.gpuNum,
                // Task command
                nniPaiTrialCommand,
                // Task shared memory
                this.paiTrialConfig.shmMB,
                // Task portList
                this.paiTrialConfig.portList
            )
        ];

        const paiJobConfig : PAIJobConfig = new PAIJobConfig(
            // Job name
            trialJobDetail.paiJobName,
            // Docker image
            this.paiTrialConfig.image,
            // codeDir
            `$PAI_DEFAULT_FS_URI${hdfsCodeDir}`,
            // PAI Task roles
            paiTaskRoles,
            // Add Virutal Cluster
            this.paiTrialConfig.virtualCluster === undefined ? 'default' : this.paiTrialConfig.virtualCluster.toString(),
            //Task auth File
            this.authFileHdfsPath
        );

        // Step 2. Upload the trial's local temp folder (install script, parameter file) onto HDFS
        try {
            await HDFSClientUtility.copyDirectoryToHdfs(trialLocalTempFolder, hdfsCodeDir, this.hdfsClient);
        } catch (error) {
            this.log.error(`PAI Training service: copy ${trialLocalTempFolder} to HDFS ${hdfsCodeDir} failed, error is ${error}`);
            trialJobDetail.status = 'FAILED';
            deferred.resolve(true);

            return deferred.promise;
        }

        // Step 3. Submit PAI job via Rest call
        // Refer https://github.com/Microsoft/pai/blob/master/docs/rest-server/API.md for more detail about PAI Rest API
        const submitJobRequest: request.Options = {
            uri: `http://${this.paiClusterConfig.host}/rest-server/api/v1/user/${this.paiClusterConfig.userName}/jobs`,
            method: 'POST',
            json: true,
            body: paiJobConfig,
            headers: {
                'Content-Type': 'application/json',
                Authorization: `Bearer ${this.paiToken}`
            }
        };
        // tslint:disable:no-any no-unsafe-any
        request(submitJobRequest, (error: Error, response: request.Response) => {
            if ((error !== undefined && error !== null) || response.statusCode >= 400) {
                // The error body is not guaranteed to exist; dereferencing it blindly would
                // throw inside this callback and leave the deferred unsettled.
                const responseBody: any = response !== undefined ? response.body : undefined;
                const httpMessage: any = (responseBody !== undefined && responseBody !== null) ? responseBody.message : undefined;
                const errorMessage : string = (error !== undefined && error !== null) ? error.message :
                    `Submit trial ${trialJobId} failed, http code:${response.statusCode}, http body: ${httpMessage}`;
                this.log.error(errorMessage);
                trialJobDetail.status = 'FAILED';
            } else {
                trialJobDetail.submitTime = Date.now();
            }
            deferred.resolve(true);
        });

        return deferred.promise;
    }

    /**
     * Every 3 seconds until stopping:
     * - refresh the token (password auth only);
     * - poll trial statuses;
     * - surface any error recorded by the rest server.
     * A token refresh failure is fatal only if no token has ever been obtained.
     */
    private async statusCheckingLoop(): Promise<void> {
        while (!this.stopping) {
            if (this.paiClusterConfig && this.paiClusterConfig.passWord) {
                try {
                    await this.updatePaiToken();
                } catch (error) {
                    this.log.error(`${error}`);
                    //only throw error when initlize paiToken first time
                    if (this.paiToken === undefined) {
                        throw new Error(error);
                    }
                }
            }
            await this.paiJobCollector.retrieveTrialStatus(this.paiToken, this.paiClusterConfig);
            const restServer: PAIJobRestServer = component.get(PAIJobRestServer);
            if (restServer.getErrorMessage !== undefined) {
                throw new Error(restServer.getErrorMessage);
            }
            await delay(3000);
        }
    }

    /**
     * Drain jobQueue in FIFO order every 3 seconds until stopping. A trial is removed from the
     * queue only after submitTrialJobToPAI resolves true.
     */
    private async submitJobLoop(): Promise<void> {
        while (!this.stopping) {
            while (!this.stopping && this.jobQueue.length > 0) {
                const trialJobId: string = this.jobQueue[0];
                if (await this.submitTrialJobToPAI(trialJobId)) {
                    // Remove trial job with trialJobId from job queue
                    this.jobQueue.shift();
                } else {
                    // Break the while loop since failed to submitJob
                    break;
                }
            }
            await delay(3000);
        }
    }

    /**
     * Update pai token by the interval time or initialize the pai token.
     * The token and its timestamp are updated only on an HTTP 200 response. Previously a non-200
     * reply overwrote the token with an undefined value and postponed the next refresh.
     * The request is abandoned with an error after 5 seconds.
     */
    private async updatePaiToken(): Promise<void> {
        const currentTime: number = new Date().getTime();
        //If pai token initialized and not reach the interval time, do not update
        if (this.paiTokenUpdateTime !== undefined && (currentTime - this.paiTokenUpdateTime) < this.paiTokenUpdateInterval) {
            return;
        }

        if (this.paiClusterConfig === undefined) {
            const paiClusterConfigError: string = `pai cluster config not initialized!`;
            this.log.error(`${paiClusterConfigError}`);
            throw new Error(`${paiClusterConfigError}`);
        }

        const authenticationReq: request.Options = {
            uri: `http://${this.paiClusterConfig.host}/rest-server/api/v1/token`,
            method: 'POST',
            json: true,
            body: {
                username: this.paiClusterConfig.userName,
                password: this.paiClusterConfig.passWord
            }
        };

        const deferred : Deferred<void> = new Deferred<void>();
        request(authenticationReq, (error: Error, response: request.Response, body: any) => {
            if (error !== undefined && error !== null) {
                this.log.error(`Get PAI token failed: ${error.message}`);
                deferred.reject(new Error(`Get PAI token failed: ${error.message}`));
            } else if (response.statusCode !== 200) {
                this.log.error(`Get PAI token failed: get PAI Rest return code ${response.statusCode}`);
                deferred.reject(new Error(`Get PAI token failed: ${response.body}, please check paiConfig username or password`));
            } else {
                this.paiToken = body.token;
                this.paiTokenUpdateTime = new Date().getTime();
                deferred.resolve();
            }
        });

        let timeoutId: NodeJS.Timer;
        const timeoutDelay: Promise<void> = new Promise<void>((_resolve, reject): void => {
            // Set timeout and reject the promise once reach timeout (5 seconds)
            timeoutId = setTimeout(
                () => reject(new Error('Get PAI token timeout. Please check your PAI cluster.')),
                5000);
        });

        return Promise.race([timeoutDelay, deferred.promise])
            .finally(() => { clearTimeout(timeoutId); });
    }

    /**
     * Write a trial's new hyper-parameter file locally, upload it to the trial's HDFS dir,
     * and register its location with the PAI job rest server.
     */
    private async writeParameterFile(trialJobId: string, hyperParameters: HyperParameters): Promise<void> {
        if (this.paiClusterConfig === undefined) {
            throw new Error('PAI Cluster config is not initialized');
        }
        if (this.paiTrialConfig === undefined) {
            throw new Error('PAI trial config is not initialized');
        }

        const trialLocalTempFolder: string = path.join(getExperimentRootDir(), 'trials-local', trialJobId);
        const hpFileName: string = generateParamFileName(hyperParameters);
        const localFilepath: string = path.join(trialLocalTempFolder, hpFileName);
        await fs.promises.writeFile(localFilepath, hyperParameters.value, { encoding: 'utf8' });
        const hdfsCodeDir: string = HDFSClientUtility.getHdfsTrialWorkDir(this.paiClusterConfig.userName, trialJobId);
        // HDFS paths are always '/'-separated; path.join would emit '\' on Windows hosts.
        const hdfsHpFilePath: string = unixPathJoin(hdfsCodeDir, hpFileName);

        await HDFSClientUtility.copyFileToHdfs(localFilepath, hdfsHpFilePath, this.hdfsClient);

        await this.postParameterFileMeta({
            experimentId: this.experimentId,
            trialId: trialJobId,
            filePath: hdfsHpFilePath
        });
    }

    /**
     * POST a parameter-file location to the local PAI job rest server.
     * Rejects on transport errors and on HTTP status >= 400.
     */
    private postParameterFileMeta(parameterFileMeta: ParameterFileMeta): Promise<void> {
        const deferred : Deferred<void> = new Deferred<void>();
        const restServer: PAIJobRestServer = component.get(PAIJobRestServer);
        const req: request.Options = {
            uri: `${restServer.endPoint}${restServer.apiRootUrl}/parameter-file-meta`,
            method: 'POST',
            json: true,
            body: parameterFileMeta
        };
        request(req, (err: Error, res: request.Response) => {
            if (err !== undefined && err !== null) {
                deferred.reject(err);
            } else if (res.statusCode >= 400) {
                deferred.reject(new Error(`Post parameter file meta failed, http code: ${res.statusCode}`));
            } else {
                deferred.resolve();
            }
        });

        return deferred.promise;
    }
}

626
export { PAITrainingService };