paiTrainingService.ts 26.2 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
/**
 * Copyright (c) Microsoft Corporation
 * All rights reserved.
 *
 * MIT License
 *
 * Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated
 * documentation files (the "Software"), to deal in the Software without restriction, including without limitation
 * the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and
 * to permit persons to whom the Software is furnished to do so, subject to the following conditions:
 * The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.
 *
 * THE SOFTWARE IS PROVIDED *AS IS*, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING
 * BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
 * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
 * DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
 */

20
'use strict';
21
22
23
24

import * as cpp from 'child-process-promise';
import * as fs from 'fs';
import * as path from 'path';
25
// tslint:disable-next-line:no-implicit-dependencies
26
import * as request from 'request';
27
import * as component from '../../common/component';
28
29

import { EventEmitter } from 'events';
30
31
import { Deferred } from 'ts-deferred';
import { String } from 'typescript-string-operations';
32
import { MethodNotImplementedError } from '../../common/errors';
33
import { getExperimentId, getInitTrialSequenceId } from '../../common/experimentStartupInfo';
34
35
import { getLogger, Logger } from '../../common/log';
import {
36
    HyperParameters, JobApplicationForm, NNIManagerIpConfig, TrainingService,
37
    TrialJobApplicationForm, TrialJobDetail, TrialJobMetric
38
} from '../../common/trainingService';
39
import { delay, generateParamFileName,
40
    getExperimentRootDir, getIPV4Address, getVersion, uniqueString, unixPathJoin } from '../../common/utils';
41
42
import { CONTAINER_INSTALL_NNI_SHELL_FORMAT } from '../common/containerJobData';
import { TrialConfigMetadataKey } from '../common/trialConfigMetadataKey';
43
import { execMkdir, validateCodeDir } from '../common/util';
44
45
import { HDFSClientUtility } from './hdfsClientUtility';
import { NNIPAITrialConfig, PAIClusterConfig, PAIJobConfig, PAITaskRole } from './paiConfig';
46
import { PAI_LOG_PATH_FORMAT, PAI_TRIAL_COMMAND_FORMAT, PAITrialJobDetail } from './paiData';
47
import { PAIJobInfoCollector } from './paiJobInfoCollector';
48
import { PAIJobRestServer, ParameterFileMeta } from './paiJobRestServer';
49

50
import * as WebHDFS from 'webhdfs';
51
52
53
54
55
56
57
58
59
60
61
62
63

/**
 * Training Service implementation for OpenPAI (Open Platform for AI)
 * Refer to https://github.com/Microsoft/pai for more info about OpenPAI
 */
@component.Singleton
class PAITrainingService implements TrainingService {
    private readonly log!: Logger;
    private readonly metricsEmitter: EventEmitter;
    private readonly trialJobsMap: Map<string, PAITrialJobDetail>;
    private readonly expRootDir: string;
    private paiTrialConfig: NNIPAITrialConfig | undefined;
    private paiClusterConfig?: PAIClusterConfig;
64
    private readonly jobQueue: string[];
65
    private stopping: boolean = false;
66
    // tslint:disable-next-line:no-any
67
68
    private hdfsClient: any;
    private paiToken? : string;
69
    private paiTokenUpdateTime?: number;
70
71
    private readonly paiTokenUpdateInterval: number;
    private readonly experimentId! : string;
72
    private readonly paiJobCollector : PAIJobInfoCollector;
73
    private nextTrialSequenceId: number;
74
    private paiRestServerPort?: number;
75
    private nniManagerIpConfig?: NNIManagerIpConfig;
76
    private copyExpCodeDirPromise?: Promise<void>;
77
    private versionCheck: boolean = true;
SparkSnail's avatar
SparkSnail committed
78
    private logCollection: string;
79
    private isMultiPhase: boolean = false;
80
81
82
83
84

    /**
     * Sets up the in-memory state of the training service: logger, metric emitter,
     * trial bookkeeping, the PAI job status collector and default settings.
     */
    constructor() {
        // Logging and event plumbing
        this.log = getLogger();
        this.metricsEmitter = new EventEmitter();

        // Trial bookkeeping: details by trial id, plus ids waiting to be submitted
        this.trialJobsMap = new Map<string, PAITrialJobDetail>();
        this.jobQueue = [];

        // Root dir on HDFS
        this.expRootDir = path.join('/nni', 'experiments', getExperimentId());
        this.experimentId = getExperimentId();

        // The collector works on the same map instance that this service fills
        this.paiJobCollector = new PAIJobInfoCollector(this.trialJobsMap);

        // -1 means "not read yet"; generateSequenceId() loads the initial value on first use
        this.nextTrialSequenceId = -1;
        // Two hours, in milliseconds
        this.paiTokenUpdateInterval = 2 * 60 * 60 * 1000;
        this.logCollection = 'none';

        this.log.info('Construct OpenPAI training service.');
    }

    /**
     * Starts the PAI job REST server, hands it the version-check flag, then runs the
     * status-checking loop and the job-submission loop side by side until both finish.
     */
    public async run(): Promise<void> {
        this.log.info('Run PAI training service.');

        const jobRestServer: PAIJobRestServer = component.get(PAIJobRestServer);
        await jobRestServer.start();
        jobRestServer.setEnableVersionCheck = this.versionCheck;
        this.log.info(`PAI Training service rest server listening on: ${jobRestServer.endPoint}`);

        // Both loops are started before either one is awaited
        const statusLoop: Promise<void> = this.statusCheckingLoop();
        const submitLoop: Promise<void> = this.submitJobLoop();
        await Promise.all([statusLoop, submitLoop]);

        this.log.info('PAI training service exit.');
    }

    /**
     * Lists every tracked job whose form has jobType 'TRIAL'.
     *
     * @returns the details of those jobs, each fetched through getTrialJob()
     */
    public async listTrialJobs(): Promise<TrialJobDetail[]> {
        const trialJobs: TrialJobDetail[] = [];

        for (const [trialJobId, detail] of this.trialJobsMap) {
            if (detail.form.jobType !== 'TRIAL') {
                continue;
            }
            const trialJob: TrialJobDetail = await this.getTrialJob(trialJobId);
            trialJobs.push(trialJob);
        }

        return trialJobs;
    }

120
    /**
     * Looks up the detail record of one trial job.
     *
     * @param trialJobId id of the trial job to look up
     * @returns the stored detail for that id
     * @throws Error when the PAI cluster config has not been set yet; when the id is unknown
     * the returned promise rejects with a plain string message
     */
    public async getTrialJob(trialJobId: string): Promise<TrialJobDetail> {
        if (this.paiClusterConfig === undefined) {
            throw new Error('PAI Cluster config is not initialized');
        }

        const knownJob: PAITrialJobDetail | undefined = this.trialJobsMap.get(trialJobId);

        return knownJob !== undefined ? knownJob : Promise.reject(`trial job ${trialJobId} not found`);
    }

134
    /**
     * Subscribes a callback to the 'metric' events of this service's emitter.
     *
     * @param listener callback invoked with each emitted trial metric
     */
    public addTrialJobMetricListener(listener: (metric: TrialJobMetric) => void): void {
        // addListener is the long-form alias of EventEmitter.on
        this.metricsEmitter.addListener('metric', listener);
    }

138
    /**
     * Unsubscribes a callback from the 'metric' events of this service's emitter.
     *
     * @param listener the callback previously passed to addTrialJobMetricListener
     */
    public removeTrialJobMetricListener(listener: (metric: TrialJobMetric) => void): void {
        // removeListener is the long-form alias of EventEmitter.off
        this.metricsEmitter.removeListener('metric', listener);
    }

    /**
     * Registers a new trial in 'WAITING' state and queues it for submission to PAI.
     * The actual submission happens later, in the job-submission loop.
     *
     * @param form application form describing the job
     * @returns the detail record created for the new trial
     * @throws Error when the PAI cluster config has not been set yet
     */
    public async submitTrialJob(form: JobApplicationForm): Promise<TrialJobDetail> {
        if (this.paiClusterConfig === undefined) {
            throw new Error(`paiClusterConfig not initialized!`);
        }

        this.log.info(`submitTrialJob: form: ${JSON.stringify(form)}`);

        const newJobId: string = uniqueString(5);
        const sequenceId: number = this.generateSequenceId();
        //TODO: use HDFS working folder instead
        const workingFolder: string = path.join(this.expRootDir, 'trials', newJobId);
        const jobNameOnPai: string = `nni_exp_${this.experimentId}_trial_${newJobId}`;

        // The log path points at the trial's 'nnioutput' folder under its HDFS work dir
        const trialHdfsDir: string = HDFSClientUtility.getHdfsTrialWorkDir(this.paiClusterConfig.userName, newJobId);
        const trialHdfsOutputDir: string = unixPathJoin(trialHdfsDir, 'nnioutput');
        const logPath: string = String.Format(PAI_LOG_PATH_FORMAT, this.paiClusterConfig.host, trialHdfsOutputDir);

        const newJob: PAITrialJobDetail = new PAITrialJobDetail(
            newJobId,
            'WAITING',
            jobNameOnPai,
            Date.now(),
            workingFolder,
            form,
            sequenceId,
            logPath
        );

        this.trialJobsMap.set(newJobId, newJob);
        this.jobQueue.push(newJobId);

        return newJob;
    }

181
182
183
184
185
186
187
188
189
190
191
192
    /**
     * Delivers a new set of hyper-parameters to an already known trial.
     *
     * @param trialJobId id of the trial to update
     * @param form form carrying the new hyper-parameters; only jobType 'TRIAL' is accepted
     * @returns the stored detail of the trial
     * @throws Error when the trial id is unknown or the form's jobType is not 'TRIAL'
     */
    public async updateTrialJob(trialJobId: string, form: JobApplicationForm): Promise<TrialJobDetail> {
        const knownJob: TrialJobDetail | undefined = this.trialJobsMap.get(trialJobId);
        if (knownJob === undefined) {
            throw new Error(`updateTrialJob failed: ${trialJobId} not found`);
        }
        if (form.jobType !== 'TRIAL') {
            throw new Error(`updateTrialJob failed: jobType ${form.jobType} not supported.`);
        }

        const nextParameters: HyperParameters = (<TrialJobApplicationForm>form).hyperParameters;
        await this.writeParameterFile(trialJobId, nextParameters);

        return knownJob;
    }

    /**
     * Whether this training service supports multi-phase jobs; always true here.
     */
    public get isMultiPhaseJobSupported(): boolean {
        return true;
    }

199
    // tslint:disable:no-http-string
QuanluZhang's avatar
QuanluZhang committed
200
    /**
     * Asks the PAI REST server to stop the job that backs a trial.
     *
     * @param trialJobId id of the trial to cancel
     * @param isEarlyStopped stored on the trial detail to mark the cancellation source
     * @returns a promise that resolves once the stop request is answered with a status below 400;
     * it rejects with an Error when the trial id is unknown, and with a string message when the
     * request errors or PAI answers with a status of 400 or above
     * @throws Error (synchronously) when the PAI cluster config or the PAI token is not initialized
     */
    public cancelTrialJob(trialJobId: string, isEarlyStopped: boolean = false): Promise<void> {
        const trialJobDetail : PAITrialJobDetail | undefined =  this.trialJobsMap.get(trialJobId);
        if (trialJobDetail === undefined) {
            const notFoundMessage: string = `cancelTrialJob: trial job id ${trialJobId} not found`;
            this.log.error(notFoundMessage);

            // Reject with a reason: a bare reject() would hand callers `undefined` to report
            return Promise.reject(new Error(notFoundMessage));
        }

        if (this.paiClusterConfig === undefined) {
            throw new Error('PAI Cluster config is not initialized');
        }
        if (this.paiToken === undefined) {
            throw new Error('PAI token is not initialized');
        }

        const stopJobRequest: request.Options = {
            // The trailing backslash is a line continuation inside the template literal
            uri: `http://${this.paiClusterConfig.host}/rest-server/api/v1/user/${this.paiClusterConfig.userName}\
/jobs/${trialJobDetail.paiJobName}/executionType`,
            method: 'PUT',
            json: true,
            body: {value: 'STOP'},
            headers: {
                'Content-Type': 'application/json',
                Authorization: `Bearer ${this.paiToken}`
            }
        };

        // Set trialjobDetail's early stopped field, to mark the job's cancellation source
        trialJobDetail.isEarlyStopped = isEarlyStopped;

        const deferred : Deferred<void> = new Deferred<void>();
        // tslint:disable-next-line:no-any
        request(stopJobRequest, (error: Error, response: request.Response, body: any) => {
            // `response` is only inspected when no transport error was reported
            if ((error !== undefined && error !== null) || response.statusCode >= 400) {
                this.log.error(`PAI Training service: stop trial ${trialJobId} to PAI Cluster failed!`);
                deferred.reject((error !== undefined && error !== null) ? error.message :
                 `Stop trial failed, http code: ${response.statusCode}`);
            } else {
                deferred.resolve();
            }
        });

        return deferred.promise;
    }

245
    // tslint:disable: no-unsafe-any no-any
    /**
     * Applies one configuration entry to the training service.
     *
     * Handled keys:
     * - NNI_MANAGER_IP: stores the parsed NNI manager IP config.
     * - PAI_CLUSTER_CONFIG: stores the parsed cluster config, creates the WebHDFS client and
     *   fetches a PAI token.
     * - TRIAL_CONFIG: stores the parsed trial config, validates its codeDir and starts copying
     *   that directory to HDFS (the copy is awaited later, before the first trial is submitted).
     * - VERSION_CHECK / MULTI_PHASE: stored as booleans ('true' or 'True' mean true).
     * - LOG_COLLECTION: stored as given.
     *
     * @param key one of the TrialConfigMetadataKey values listed above
     * @param value the setting; JSON text for the config keys
     * @throws Error for an unknown key, when TRIAL_CONFIG arrives before PAI_CLUSTER_CONFIG,
     * when codeDir validation fails, or when fetching the PAI token fails
     */
    // tslint:disable-next-line:max-func-body-length
    public async setClusterMetadata(key: string, value: string): Promise<void> {
        // Every handled case simply falls out of the switch, which resolves the returned promise;
        // failures are thrown, which rejects it. No case can leave the promise pending.
        switch (key) {
            case TrialConfigMetadataKey.NNI_MANAGER_IP:
                this.nniManagerIpConfig = <NNIManagerIpConfig>JSON.parse(value);
                break;

            case TrialConfigMetadataKey.PAI_CLUSTER_CONFIG:
                this.paiClusterConfig = <PAIClusterConfig>JSON.parse(value);

                this.hdfsClient = WebHDFS.createClient({
                    user: this.paiClusterConfig.userName,
                    // Refer PAI document for Pylon mapping https://github.com/Microsoft/pai/tree/master/docs/pylon
                    port: 80,
                    path: '/webhdfs/api/v1',
                    host: this.paiClusterConfig.host
                });

                // Get PAI authentication token
                await this.updatePaiToken();
                break;

            case TrialConfigMetadataKey.TRIAL_CONFIG:
                if (this.paiClusterConfig === undefined) {
                    this.log.error('pai cluster config is not initialized');
                    throw new Error('pai cluster config is not initialized');
                }
                this.paiTrialConfig = <NNIPAITrialConfig>JSON.parse(value);

                // Validate to make sure codeDir doesn't have too many files
                try {
                    await validateCodeDir(this.paiTrialConfig.codeDir);
                } catch (error) {
                    this.log.error(error);
                    // Keep the original Error (and its stack) instead of wrapping it in a new one
                    throw error instanceof Error ? error : new Error(String(error));
                }

                // Copy experiment files from local folder to HDFS
                this.copyExpCodeDirPromise = HDFSClientUtility.copyDirectoryToHdfs(
                    this.paiTrialConfig.codeDir,
                    HDFSClientUtility.getHdfsExpCodeDir(this.paiClusterConfig.userName),
                    this.hdfsClient
                );
                break;

            case TrialConfigMetadataKey.VERSION_CHECK:
                this.versionCheck = (value === 'true' || value === 'True');
                break;

            case TrialConfigMetadataKey.LOG_COLLECTION:
                this.logCollection = value;
                break;

            case TrialConfigMetadataKey.MULTI_PHASE:
                this.isMultiPhase = (value === 'true' || value === 'True');
                break;

            default:
                //Reject for unknown keys
                throw new Error(`Unknown key: ${key}`);
        }
    }
314
    // tslint:enable: no-unsafe-any
315
316
317
318
319

    /**
     * Placeholder getter for cluster metadata: the key is not consulted and the returned
     * promise is resolved right away without a value.
     *
     * @param key metadata key (unused)
     * @returns a promise that is already resolved, carrying no value
     */
    public getClusterMetadata(key: string): Promise<string> {
        const pending: Deferred<string> = new Deferred<string>();
        pending.resolve();

        return pending.promise;
    }

    /**
     * Signals the background loops to stop and shuts down the PAI job REST server.
     *
     * @throws whatever the REST server's stop() rejects with, after logging it
     */
    public async cleanUp(): Promise<void> {
        this.log.info('Stopping PAI training service...');
        // The status-checking and submission loops poll this flag
        this.stopping = true;

        const jobRestServer: PAIJobRestServer = component.get(PAIJobRestServer);
        try {
            await jobRestServer.stop();
        } catch (error) {
            // tslint:disable-next-line: no-unsafe-any
            this.log.error(`PAI Training service rest server stopped failed, error: ${error.message}`);
            throw error;
        }
        this.log.info('PAI Training service rest server stopped successfully.');
    }

    /**
     * The emitter on which this service's 'metric' listeners are registered.
     */
    public get MetricsEmitter() : EventEmitter {
        return this.metricsEmitter;
    }
346

347
348
349
350
351
    /**
     * Submits one queued trial to the PAI cluster.
     *
     * Steps: wait for the experiment code copy started by setClusterMetadata, write the NNI
     * install script and the parameter file into a local temp folder, build the PAI job config,
     * upload the temp folder to the trial's HDFS work dir, then POST the job to the PAI REST server.
     * A failed upload or a failed POST marks the trial 'FAILED'; a successful POST records the
     * submit time.
     *
     * @param trialJobId id of a trial previously registered by submitTrialJob()
     * @returns always true once the submission attempt has finished, whether it succeeded or not
     * @throws Error when the trial id is unknown, or when the cluster config, trial config or
     * PAI token is not initialized
     */
    // tslint:disable-next-line:max-func-body-length
    private async submitTrialJobToPAI(trialJobId: string): Promise<boolean> {
        const trialJobDetail: PAITrialJobDetail | undefined = this.trialJobsMap.get(trialJobId);

        if (trialJobDetail === undefined) {
            throw new Error(`Failed to find PAITrialJobDetail for job ${trialJobId}`);
        }

        if (this.paiClusterConfig === undefined) {
            throw new Error('PAI Cluster config is not initialized');
        }
        if (this.paiTrialConfig === undefined) {
            throw new Error('trial config is not initialized');
        }
        if (this.paiToken === undefined) {
            throw new Error('PAI token is not initialized');
        }

        if (this.paiRestServerPort === undefined) {
            const restServer: PAIJobRestServer = component.get(PAIJobRestServer);
            this.paiRestServerPort = restServer.clusterRestServerPort;
        }

        // Make sure experiment code files is copied from local to HDFS
        if (this.copyExpCodeDirPromise !== undefined) {
            await this.copyExpCodeDirPromise;
        }

        // Step 1. Prepare PAI job configuration

        const trialLocalTempFolder: string = path.join(getExperimentRootDir(), 'trials-local', trialJobId);
        //create tmp trial working folder locally.
        await execMkdir(trialLocalTempFolder);

        const runScriptContent : string = CONTAINER_INSTALL_NNI_SHELL_FORMAT;
        // Write NNI installation file to local tmp files
        await fs.promises.writeFile(path.join(trialLocalTempFolder, 'install_nni.sh'), runScriptContent, { encoding: 'utf8' });

        // Write file content ( parameter.cfg ) to local tmp folders
        const trialForm : TrialJobApplicationForm = (<TrialJobApplicationForm>trialJobDetail.form);
        if (trialForm !== undefined) {
            await fs.promises.writeFile(
                path.join(trialLocalTempFolder, generateParamFileName(trialForm.hyperParameters)),
                trialForm.hyperParameters.value, { encoding: 'utf8' }
            );
        }
        const hdfsCodeDir: string = HDFSClientUtility.getHdfsTrialWorkDir(this.paiClusterConfig.userName, trialJobId);
        const hdfsOutputDir: string = unixPathJoin(hdfsCodeDir, 'nnioutput');
        // Fall back to this machine's IPv4 address when no NNI manager IP was configured
        // tslint:disable-next-line: strict-boolean-expressions
        const nniManagerIp: string = this.nniManagerIpConfig ? this.nniManagerIpConfig.nniManagerIp : getIPV4Address();
        const version: string = this.versionCheck ? await getVersion() : '';
        const nniPaiTrialCommand : string = String.Format(
            PAI_TRIAL_COMMAND_FORMAT,
            // PAI will copy job's codeDir into /root directory
            `$PWD/${trialJobId}`,
            `$PWD/${trialJobId}/nnioutput`,
            trialJobId,
            this.experimentId,
            trialJobDetail.sequenceId,
            this.isMultiPhase,
            this.paiTrialConfig.command,
            nniManagerIp,
            this.paiRestServerPort,
            hdfsOutputDir,
            this.paiClusterConfig.host,
            this.paiClusterConfig.userName,
            HDFSClientUtility.getHdfsExpCodeDir(this.paiClusterConfig.userName),
            version,
            this.logCollection
        )
        // Collapse the formatted command onto a single line
        .replace(/\r\n|\n|\r/gm, '');

        // tslint:disable-next-line:no-console
        console.log(`nniPAItrial command is ${nniPaiTrialCommand.trim()}`);
        const paiTaskRoles : PAITaskRole[] = [
            new PAITaskRole(
                `nni_trail_${trialJobId}`,
                // Task role number
                1,
                // Task CPU number
                this.paiTrialConfig.cpuNum,
                // Task memory
                this.paiTrialConfig.memoryMB,
                // Task GPU number
                this.paiTrialConfig.gpuNum,
                // Task command
                nniPaiTrialCommand,
                // Task shared memory
                this.paiTrialConfig.shmMB
            )
        ];

        const paiJobConfig : PAIJobConfig = new PAIJobConfig(
            // Job name
            trialJobDetail.paiJobName,
            // Docker image
            this.paiTrialConfig.image,
            // codeDir
            `$PAI_DEFAULT_FS_URI${hdfsCodeDir}`,
            // PAI Task roles
            paiTaskRoles,
            // Add Virtual Cluster
            this.paiTrialConfig.virtualCluster === undefined ? 'default' : this.paiTrialConfig.virtualCluster.toString(),
            //Task auth File
            this.paiTrialConfig.authFile
        );

        // Step 2. Upload code files in codeDir onto HDFS
        try {
            await HDFSClientUtility.copyDirectoryToHdfs(trialLocalTempFolder, hdfsCodeDir, this.hdfsClient);
        } catch (error) {
            // Name the folder that was actually being copied (the trial's local temp folder)
            this.log.error(`PAI Training service: copy ${trialLocalTempFolder} to HDFS ${hdfsCodeDir} failed, error is ${error}`);
            trialJobDetail.status = 'FAILED';

            return true;
        }

        // Step 3. Submit PAI job via Rest call
        // Refer https://github.com/Microsoft/pai/blob/master/docs/rest-server/API.md for more detail about PAI Rest API
        const submitJobRequest: request.Options = {
            uri: `http://${this.paiClusterConfig.host}/rest-server/api/v1/user/${this.paiClusterConfig.userName}/jobs`,
            method: 'POST',
            json: true,
            body: paiJobConfig,
            headers: {
                'Content-Type': 'application/json',
                Authorization: `Bearer ${this.paiToken}`
            }
        };
        const deferred : Deferred<boolean> = new Deferred<boolean>();
        // tslint:disable:no-any no-unsafe-any
        request(submitJobRequest, (error: Error, response: request.Response, body: any) => {
            let errorMessage: string | undefined;
            if (error !== undefined && error !== null) {
                errorMessage = error.message;
            } else if (response.statusCode >= 400) {
                // With `json: true` the body may already be a parsed object; serialize it so the
                // log shows its content rather than "[object Object]"
                const httpBody: string = typeof response.body === 'string' ? response.body : JSON.stringify(response.body);
                errorMessage = `Submit trial ${trialJobId} failed, http code:${response.statusCode}, http body: ${httpBody}`;
            }

            if (errorMessage !== undefined) {
                this.log.error(errorMessage);
                trialJobDetail.status = 'FAILED';
            } else {
                trialJobDetail.submitTime = Date.now();
            }
            deferred.resolve(true);
        });

        return deferred.promise;
    }

495
    /**
     * Hands out the next trial sequence id.
     * On first use the counter is loaded from getInitTrialSequenceId().
     *
     * @returns the id allocated to the caller; the counter then advances by one
     */
    private generateSequenceId(): number {
        // -1 is the "not loaded yet" marker set by the constructor
        if (this.nextTrialSequenceId === -1) {
            this.nextTrialSequenceId = getInitTrialSequenceId();
        }

        const allocatedId: number = this.nextTrialSequenceId;
        this.nextTrialSequenceId += 1;

        return allocatedId;
    }
502
503
504

    /**
     * Polls PAI until the service is stopping: refreshes the PAI token when due, asks the
     * job collector to refresh trial statuses, and aborts when the REST server reports an
     * error message. Pauses three seconds between rounds.
     *
     * @throws the token error when no token has ever been obtained, or an Error carrying the
     * REST server's error message
     */
    private async statusCheckingLoop(): Promise<void> {
        while (!this.stopping) {
            try {
                await this.updatePaiToken();
            } catch (error) {
                this.log.error(`${error}`);
                // Only throw when initializing the PAI token for the first time; with an older
                // token still stored, the refresh failure is logged and the loop carries on
                if (this.paiToken === undefined) {
                    // Rethrow the original Error so its message and stack are preserved
                    throw error instanceof Error ? error : new Error(String(error));
                }
            }
            await this.paiJobCollector.retrieveTrialStatus(this.paiToken, this.paiClusterConfig);
            const restServer: PAIJobRestServer = component.get(PAIJobRestServer);
            if (restServer.getErrorMessage !== undefined) {
                throw new Error(restServer.getErrorMessage);
            }
            await delay(3000);
        }
    }

    /**
     * Drains the job queue until the service is stopping: the trial at the head of the queue
     * is submitted to PAI and removed once the submission reports true. Pauses three seconds
     * between rounds.
     */
    private async submitJobLoop(): Promise<void> {
        while (!this.stopping) {
            while (!this.stopping && this.jobQueue.length > 0) {
                const headJobId: string = this.jobQueue[0];
                const handled: boolean = await this.submitTrialJobToPAI(headJobId);
                if (!handled) {
                    // Leave the trial at the head of the queue and try again after the pause
                    break;
                }
                // Remove the trial that was just handled from the job queue
                this.jobQueue.shift();
            }
            await delay(3000);
        }
    }

539
540
541
542
543
    /**
     * Update pai token by the interval time or initialize the pai token.
     *
     * Does nothing when a token was obtained less than paiTokenUpdateInterval milliseconds ago.
     * Otherwise posts the configured user name and password to the PAI REST server and stores
     * the returned token together with the time it was obtained. The token and its timestamp
     * are only updated on an HTTP 200 answer, so a failed attempt is retried on the next call.
     *
     * @throws Error when the PAI cluster config is not initialized, when the request fails or is
     * answered with a status other than 200, or when no answer arrives within 5 seconds
     */
    private async updatePaiToken(): Promise<void> {
        const currentTime: number = new Date().getTime();
        //If pai token initialized and not reach the interval time, do not update
        if (this.paiTokenUpdateTime !== undefined && (currentTime - this.paiTokenUpdateTime) < this.paiTokenUpdateInterval) {
            return;
        }

        if (this.paiClusterConfig === undefined) {
            const paiClusterConfigError: string = `pai cluster config not initialized!`;
            this.log.error(`${paiClusterConfigError}`);
            throw Error(`${paiClusterConfigError}`);
        }

        const deferred : Deferred<void> = new Deferred<void>();
        const authenticationReq: request.Options = {
            uri: `http://${this.paiClusterConfig.host}/rest-server/api/v1/token`,
            method: 'POST',
            json: true,
            body: {
                username: this.paiClusterConfig.userName,
                password: this.paiClusterConfig.passWord
            }
        };

        request(authenticationReq, (error: Error, response: request.Response, body: any) => {
            if (error !== undefined && error !== null) {
                this.log.error(`Get PAI token failed: ${error.message}`);
                deferred.reject(new Error(`Get PAI token failed: ${error.message}`));

                return;
            }
            if (response.statusCode !== 200) {
                this.log.error(`Get PAI token failed: get PAI Rest return code ${response.statusCode}`);
                // With `json: true` the body may already be a parsed object; serialize it so the
                // message shows its content rather than "[object Object]"
                const httpBody: string = typeof response.body === 'string' ? response.body : JSON.stringify(response.body);
                deferred.reject(new Error(`Get PAI token failed: ${httpBody}, please check paiConfig username or password`));

                // Stop here: a rejected login must not store a token or stamp the update time,
                // otherwise the next attempt would be skipped until the interval elapses
                return;
            }
            this.paiToken = body.token;
            this.paiTokenUpdateTime = new Date().getTime();
            deferred.resolve();
        });

        let timeoutId: NodeJS.Timer;
        const timeoutDelay: Promise<void> = new Promise<void>((resolve: Function, reject: Function): void => {
            // Set timeout and reject the promise once reach timeout (5 seconds)
            timeoutId = setTimeout(
                () => reject(new Error('Get PAI token timeout. Please check your PAI cluster.')),
                5000);
        });

        // Whichever settles first wins; the timer is cleared either way
        return Promise.race([timeoutDelay, deferred.promise])
            .finally(() => { clearTimeout(timeoutId); });
    }
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636

    /**
     * Writes a hyper-parameter file for a trial into its local temp folder, copies it to the
     * trial's HDFS work dir and announces the HDFS path to the PAI job REST server.
     *
     * @param trialJobId id of the trial the parameters belong to
     * @param hyperParameters parameters to write; the file name comes from generateParamFileName()
     * @throws Error when the PAI cluster config or the PAI trial config is not initialized
     */
    private async writeParameterFile(trialJobId: string, hyperParameters: HyperParameters): Promise<void> {
        if (this.paiClusterConfig === undefined) {
            throw new Error('PAI Cluster config is not initialized');
        }
        if (this.paiTrialConfig === undefined) {
            throw new Error('PAI trial config is not initialized');
        }

        const trialLocalTempFolder: string = path.join(getExperimentRootDir(), 'trials-local', trialJobId);
        const hpFileName: string = generateParamFileName(hyperParameters);
        const localFilepath: string = path.join(trialLocalTempFolder, hpFileName);
        await fs.promises.writeFile(localFilepath, hyperParameters.value, { encoding: 'utf8' });
        const hdfsCodeDir: string = HDFSClientUtility.getHdfsTrialWorkDir(this.paiClusterConfig.userName, trialJobId);
        // HDFS paths use '/' on every platform, so join with unixPathJoin (as done for the other
        // HDFS paths in this file) rather than the platform-dependent path.join
        const hdfsHpFilePath: string = unixPathJoin(hdfsCodeDir, hpFileName);

        await HDFSClientUtility.copyFileToHdfs(localFilepath, hdfsHpFilePath, this.hdfsClient);

        await this.postParameterFileMeta({
            experimentId: this.experimentId,
            trialId: trialJobId,
            filePath: hdfsHpFilePath
        });
    }

    /**
     * Posts the description of a parameter file to the 'parameter-file-meta' endpoint of the
     * PAI job REST server.
     *
     * @param parameterFileMeta description of the parameter file, sent as the JSON body
     * @returns a promise that resolves when the request completes without a transport error
     * and rejects with that error otherwise; the HTTP status is not inspected
     */
    private postParameterFileMeta(parameterFileMeta: ParameterFileMeta): Promise<void> {
        const jobRestServer: PAIJobRestServer = component.get(PAIJobRestServer);
        const metaEndpoint: string = `${jobRestServer.endPoint}${jobRestServer.apiRootUrl}/parameter-file-meta`;
        const postOptions: request.Options = {
            uri: metaEndpoint,
            method: 'POST',
            json: true,
            body: parameterFileMeta
        };

        const posted: Deferred<void> = new Deferred<void>();
        request(postOptions, (err: Error, res: request.Response) => {
            if (!err) {
                posted.resolve();

                return;
            }
            posted.reject(err);
        });

        return posted.promise;
    }
637
638
}

639
export { PAITrainingService };