paiTrainingService.ts 25 KB
Newer Older
liuzhe-lz's avatar
liuzhe-lz committed
1
2
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT license.
3

4
'use strict';
5
6
7
8

import * as fs from 'fs';
import * as path from 'path';
import * as request from 'request';
9
import * as component from '../../common/component';
10
11

import { EventEmitter } from 'events';
12
13
import { Deferred } from 'ts-deferred';
import { String } from 'typescript-string-operations';
14
import { getExperimentId } from '../../common/experimentStartupInfo';
15
16
import { getLogger, Logger } from '../../common/log';
import {
17
    HyperParameters, NNIManagerIpConfig, TrainingService,
18
    TrialJobApplicationForm, TrialJobDetail, TrialJobMetric
19
} from '../../common/trainingService';
20
import { delay, generateParamFileName,
21
    getExperimentRootDir, getIPV4Address, getVersion, uniqueString, unixPathJoin } from '../../common/utils';
22
23
import { CONTAINER_INSTALL_NNI_SHELL_FORMAT } from '../common/containerJobData';
import { TrialConfigMetadataKey } from '../common/trialConfigMetadataKey';
24
import { execMkdir, validateCodeDir } from '../common/util';
25
26
import { HDFSClientUtility } from './hdfsClientUtility';
import { NNIPAITrialConfig, PAIClusterConfig, PAIJobConfig, PAITaskRole } from './paiConfig';
27
import { PAI_LOG_PATH_FORMAT, PAI_TRIAL_COMMAND_FORMAT, PAITrialJobDetail } from './paiData';
28
import { PAIJobInfoCollector } from './paiJobInfoCollector';
29
import { PAIJobRestServer, ParameterFileMeta } from './paiJobRestServer';
30

31
import * as WebHDFS from 'webhdfs';
32
33
34
35
36
37
38
39
40
41
42
43
44

/**
 * Training Service implementation for OpenPAI (Open Platform for AI).
 * Refer to https://github.com/Microsoft/pai for more information about OpenPAI.
 */
@component.Singleton
class PAITrainingService implements TrainingService {
    private readonly log!: Logger;
    private readonly metricsEmitter: EventEmitter;
    private readonly trialJobsMap: Map<string, PAITrialJobDetail>;
    private readonly expRootDir: string;
    private paiTrialConfig: NNIPAITrialConfig | undefined;
    private paiClusterConfig?: PAIClusterConfig;
45
    private readonly jobQueue: string[];
46
47
48
    private stopping: boolean = false;
    private hdfsClient: any;
    private paiToken? : string;
49
    private paiTokenUpdateTime?: number;
50
    private readonly paiTokenUpdateInterval: number;
chicm-ms's avatar
chicm-ms committed
51
52
    private readonly experimentId!: string;
    private readonly paiJobCollector: PAIJobInfoCollector;
53
    private paiRestServerPort?: number;
54
    private nniManagerIpConfig?: NNIManagerIpConfig;
55
    private copyExpCodeDirPromise?: Promise<void>;
56
    private copyAuthFilePromise?: Promise<void>;
57
    private versionCheck: boolean = true;
SparkSnail's avatar
SparkSnail committed
58
    private logCollection: string;
59
    private isMultiPhase: boolean = false;
60
    private authFileHdfsPath: string | undefined = undefined;
61
    private portList?: string | undefined;
62
63
64
65
66

    /**
     * Sets up the in-memory state of the service: logger, metric emitter,
     * trial bookkeeping, experiment identifiers and the job status collector.
     */
    constructor() {
        const twoHoursInMs: number = 2 * 60 * 60 * 1000;

        this.log = getLogger();
        this.metricsEmitter = new EventEmitter();

        // Trial bookkeeping: details keyed by trial id, plus the ids queued for submission.
        const trialJobs: Map<string, PAITrialJobDetail> = new Map<string, PAITrialJobDetail>();
        this.trialJobsMap = trialJobs;
        this.jobQueue = [];

        // Experiment root directory on HDFS
        this.expRootDir = path.join('/nni', 'experiments', getExperimentId());
        this.experimentId = getExperimentId();

        // The collector shares the same map instance as this service.
        this.paiJobCollector = new PAIJobInfoCollector(trialJobs);
        this.paiTokenUpdateInterval = twoHoursInMs;
        this.logCollection = 'none';

        this.log.info('Construct OpenPAI training service.');
    }

    /**
     * Starts the PAI job REST server, then runs the status-checking loop and
     * the job-submission loop concurrently until both of them finish.
     */
    public async run(): Promise<void> {
        this.log.info('Run PAI training service.');

        const paiRestServer: PAIJobRestServer = component.get(PAIJobRestServer);
        await paiRestServer.start();
        paiRestServer.setEnableVersionCheck = this.versionCheck;
        this.log.info(`PAI Training service rest server listening on: ${paiRestServer.endPoint}`);

        const statusLoop: Promise<void> = this.statusCheckingLoop();
        const submitLoop: Promise<void> = this.submitJobLoop();
        await Promise.all([statusLoop, submitLoop]);

        this.log.info('PAI training service exit.');
    }

    /**
     * Returns the details of every trial job currently tracked by this service.
     * Each entry is looked up through getTrialJob, one after another.
     */
    public async listTrialJobs(): Promise<TrialJobDetail[]> {
        const details: TrialJobDetail[] = [];

        for (const trialJobId of this.trialJobsMap.keys()) {
            const detail: TrialJobDetail = await this.getTrialJob(trialJobId);
            details.push(detail);
        }

        return details;
    }

99
    /**
     * Looks up a single trial job by id.
     *
     * @param trialJobId id of the trial job to look up
     * @returns the tracked detail object for that trial
     * @throws Error when the PAI cluster config has not been set yet
     *
     * When the id is unknown the promise is rejected with a plain string
     * (not an Error instance).
     */
    public async getTrialJob(trialJobId: string): Promise<TrialJobDetail> {
        if (this.paiClusterConfig === undefined) {
            throw new Error('PAI Cluster config is not initialized');
        }

        const found: PAITrialJobDetail | undefined = this.trialJobsMap.get(trialJobId);
        if (found !== undefined) {
            return found;
        }

        return Promise.reject(`trial job ${trialJobId} not found`);
    }

113
    /**
     * Subscribes a callback to the 'metric' event of the internal emitter.
     *
     * @param listener callback invoked with each emitted trial job metric
     */
    public addTrialJobMetricListener(listener: (metric: TrialJobMetric) => void): void {
        // addListener is the EventEmitter alias of on()
        this.metricsEmitter.addListener('metric', listener);
    }

117
    /**
     * Unsubscribes a callback from the 'metric' event of the internal emitter.
     *
     * @param listener the callback previously passed to addTrialJobMetricListener
     */
    public removeTrialJobMetricListener(listener: (metric: TrialJobMetric) => void): void {
        // removeListener is the EventEmitter alias of off()
        this.metricsEmitter.removeListener('metric', listener);
    }

121
    /**
     * Registers a new trial job in 'WAITING' state and queues it for submission.
     * The job is not sent to PAI here; its id is only appended to the job queue.
     *
     * @param form application form describing the trial
     * @returns the newly created trial job detail
     * @throws Error when the PAI cluster config has not been set yet
     */
    public async submitTrialJob(form: TrialJobApplicationForm): Promise<TrialJobDetail> {
        const clusterConfig: PAIClusterConfig | undefined = this.paiClusterConfig;
        if (clusterConfig === undefined) {
            throw new Error(`paiClusterConfig not initialized!`);
        }

        this.log.info(`submitTrialJob: form: ${JSON.stringify(form)}`);

        const trialJobId: string = uniqueString(5);
        //TODO: use HDFS working folder instead
        const workingFolder: string = path.join(this.expRootDir, 'trials', trialJobId);
        const jobName: string = `nni_exp_${this.experimentId}_trial_${trialJobId}`;

        // Log path is derived from the trial's output directory on HDFS.
        const trialHdfsDir: string = HDFSClientUtility.getHdfsTrialWorkDir(clusterConfig.userName, trialJobId);
        const outputDir: string = unixPathJoin(trialHdfsDir, 'nnioutput');
        const logPath: string = String.Format(PAI_LOG_PATH_FORMAT, clusterConfig.host, outputDir);

        const detail: PAITrialJobDetail = new PAITrialJobDetail(
            trialJobId, 'WAITING', jobName, Date.now(), workingFolder, form, logPath);

        this.trialJobsMap.set(trialJobId, detail);
        this.jobQueue.push(trialJobId);

        return detail;
    }

158
    /**
     * Sends a new set of hyper-parameters to an existing trial job by writing
     * a parameter file for it.
     *
     * @param trialJobId id of the trial job to update
     * @param form form carrying the new hyper-parameters
     * @returns the tracked detail of the updated trial
     * @throws Error when the trial job id is unknown
     */
    public async updateTrialJob(trialJobId: string, form: TrialJobApplicationForm): Promise<TrialJobDetail> {
        const existing: TrialJobDetail | undefined = this.trialJobsMap.get(trialJobId);
        if (existing !== undefined) {
            await this.writeParameterFile(trialJobId, form.hyperParameters);

            return existing;
        }

        throw new Error(`updateTrialJob failed: ${trialJobId} not found`);
    }

    /**
     * Whether this training service supports multi-phase jobs.
     * Always reports true.
     */
    public get isMultiPhaseJobSupported(): boolean {
        return true;
    }

QuanluZhang's avatar
QuanluZhang committed
172
    /**
     * Asks the PAI REST server to stop the job that backs a trial.
     *
     * Every failure is reported as a rejected promise carrying an Error:
     * unknown trial id, missing cluster config, missing token, transport
     * error, or an HTTP status of 400 or above.
     *
     * @param trialJobId id of the trial job to cancel
     * @param isEarlyStopped recorded on the trial detail to mark the source of the cancellation
     * @returns a promise that resolves once PAI has accepted the stop request
     */
    public async cancelTrialJob(trialJobId: string, isEarlyStopped: boolean = false): Promise<void> {
        const trialJobDetail: PAITrialJobDetail | undefined = this.trialJobsMap.get(trialJobId);
        if (trialJobDetail === undefined) {
            const notFoundMessage: string = `cancelTrialJob: trial job id ${trialJobId} not found`;
            this.log.error(notFoundMessage);
            throw new Error(notFoundMessage);
        }
        if (this.paiClusterConfig === undefined) {
            throw new Error('PAI Cluster config is not initialized');
        }
        if (this.paiToken === undefined) {
            throw new Error('PAI token is not initialized');
        }

        const stopJobUri: string = `http://${this.paiClusterConfig.host}/rest-server/api/v1/user/`
            + `${this.paiClusterConfig.userName}/jobs/${trialJobDetail.paiJobName}/executionType`;
        const stopJobRequest: request.Options = {
            uri: stopJobUri,
            method: 'PUT',
            json: true,
            body: {value: 'STOP'},
            headers: {
                'Content-Type': 'application/json',
                Authorization: `Bearer ${this.paiToken}`
            }
        };

        // Set trialjobDetail's early stopped field, to mark the job's cancellation source
        trialJobDetail.isEarlyStopped = isEarlyStopped;

        await new Promise<void>((resolve, reject) => {
            request(stopJobRequest, (error: Error, response: request.Response) => {
                const hasTransportError: boolean = error !== undefined && error !== null;
                if (!hasTransportError && response.statusCode < 400) {
                    resolve();

                    return;
                }

                this.log.error(`PAI Training service: stop trial ${trialJobId} to PAI Cluster failed!`);
                const reason: string = hasTransportError
                    ? error.message
                    : `Stop trial failed, http code: ${response.statusCode}`;
                reject(new Error(reason));
            });
        });
    }

fishyds's avatar
fishyds committed
216
    /**
     * Applies one piece of cluster/trial configuration to this service.
     *
     * The returned promise settles for every recognised key. Failures are
     * reported as rejections carrying an Error.
     *
     * @param key one of the TrialConfigMetadataKey values handled below
     * @param value the setting; JSON text for the NNI manager IP, cluster config
     *              and trial config keys, a plain string for the others
     * @throws Error for an unknown key, for a cluster config without password or
     *               token, for a trial config set before the cluster config, or
     *               when code directory validation fails
     */
    public async setClusterMetadata(key: string, value: string): Promise<void> {
        switch (key) {
            case TrialConfigMetadataKey.NNI_MANAGER_IP:
                this.nniManagerIpConfig = <NNIManagerIpConfig>JSON.parse(value);
                break;

            case TrialConfigMetadataKey.PAI_CLUSTER_CONFIG:
                this.paiClusterConfig = <PAIClusterConfig>JSON.parse(value);

                this.hdfsClient = WebHDFS.createClient({
                    user: this.paiClusterConfig.userName,
                    // Refer PAI document for Pylon mapping https://github.com/Microsoft/pai/tree/master/docs/pylon
                    port: 80,
                    path: '/webhdfs/api/v1',
                    host: this.paiClusterConfig.host
                });

                if (this.paiClusterConfig.passWord) {
                    // Get PAI authentication token
                    await this.updatePaiToken();
                } else if (this.paiClusterConfig.token) {
                    this.paiToken = this.paiClusterConfig.token;
                } else {
                    throw new Error('pai cluster config format error, please set password or token!');
                }
                break;

            case TrialConfigMetadataKey.TRIAL_CONFIG:
                if (this.paiClusterConfig === undefined) {
                    this.log.error('pai cluster config is not initialized');
                    throw new Error('pai cluster config is not initialized');
                }
                this.paiTrialConfig = <NNIPAITrialConfig>JSON.parse(value);

                // Validate to make sure codeDir doesn't have too many files
                try {
                    await validateCodeDir(this.paiTrialConfig.codeDir);
                } catch (error) {
                    this.log.error(error);
                    // Re-throw the original Error rather than wrapping it in another one
                    throw (error instanceof Error ? error : new Error(`${error}`));
                }

                // Copy experiment files from local folder to HDFS.
                // The copy is started here and awaited later, before a trial is submitted.
                this.copyExpCodeDirPromise = HDFSClientUtility.copyDirectoryToHdfs(
                    this.paiTrialConfig.codeDir,
                    HDFSClientUtility.getHdfsExpCodeDir(this.paiClusterConfig.userName),
                    this.hdfsClient
                );

                // Upload authFile to hdfs
                if (this.paiTrialConfig.authFile) {
                    this.authFileHdfsPath = unixPathJoin(HDFSClientUtility.hdfsExpRootDir(this.paiClusterConfig.userName), 'authFile');
                    this.copyAuthFilePromise = HDFSClientUtility.copyFileToHdfs(this.paiTrialConfig.authFile, this.authFileHdfsPath, this.hdfsClient);
                }
                break;

            case TrialConfigMetadataKey.VERSION_CHECK:
                this.versionCheck = (value === 'true' || value === 'True');
                break;

            case TrialConfigMetadataKey.LOG_COLLECTION:
                this.logCollection = value;
                break;

            case TrialConfigMetadataKey.MULTI_PHASE:
                this.isMultiPhase = (value === 'true' || value === 'True');
                break;

            default:
                //Reject for unknown keys
                throw new Error(`Uknown key: ${key}`);
        }
    }

    /**
     * Stub: ignores `key` and returns a promise that is resolved
     * immediately without a value.
     *
     * @param key metadata key (unused)
     */
    public getClusterMetadata(key: string): Promise<string> {
        const pending: Deferred<string> = new Deferred<string>();
        pending.resolve();

        return pending.promise;
    }

    /**
     * Flags the service as stopping (which ends the polling loops on their next
     * check) and shuts down the PAI job REST server.
     *
     * @throws whatever error the REST server raised while stopping, after logging it
     */
    public async cleanUp(): Promise<void> {
        this.log.info('Stopping PAI training service...');
        this.stopping = true;

        const paiRestServer: PAIJobRestServer = component.get(PAIJobRestServer);
        try {
            await paiRestServer.stop();
        } catch (error) {
            this.log.error(`PAI Training service rest server stopped failed, error: ${error.message}`);
            throw error;
        }

        this.log.info('PAI Training service rest server stopped successfully.');
    }

chicm-ms's avatar
chicm-ms committed
322
    /**
     * The emitter on which 'metric' listeners are registered.
     */
    public get MetricsEmitter(): EventEmitter {
        return this.metricsEmitter;
    }
325

326
    /**
     * Prepares one queued trial and submits it to PAI as a job.
     *
     * Steps: wait for the experiment code (and auth file, if any) to reach HDFS,
     * write the install script and parameter file into a local temp folder,
     * upload that folder to the trial's HDFS directory, then POST the job
     * configuration to the PAI REST server.
     *
     * Upload and submission failures are logged and recorded by setting the
     * trial status to 'FAILED'; they do not reject the promise.
     *
     * @param trialJobId id of a trial present in trialJobsMap
     * @returns true once the trial has been handled, whether it was submitted or marked failed
     * @throws Error when the trial is unknown, or the cluster config, trial config
     *               or PAI token has not been initialized
     */
    private async submitTrialJobToPAI(trialJobId: string): Promise<boolean> {
        const trialJobDetail: PAITrialJobDetail | undefined = this.trialJobsMap.get(trialJobId);

        if (trialJobDetail === undefined) {
            throw new Error(`Failed to find PAITrialJobDetail for job ${trialJobId}`);
        }
        if (this.paiClusterConfig === undefined) {
            throw new Error('PAI Cluster config is not initialized');
        }
        if (this.paiTrialConfig === undefined) {
            throw new Error('trial config is not initialized');
        }
        if (this.paiToken === undefined) {
            throw new Error('PAI token is not initialized');
        }

        // The REST server port is looked up on first use and cached afterwards
        if (this.paiRestServerPort === undefined) {
            const restServer: PAIJobRestServer = component.get(PAIJobRestServer);
            this.paiRestServerPort = restServer.clusterRestServerPort;
        }

        // Make sure experiment code files is copied from local to HDFS
        if (this.copyExpCodeDirPromise !== undefined) {
            await this.copyExpCodeDirPromise;
        }

        //Make sure authFile is copied from local to HDFS
        if (this.paiTrialConfig.authFile) {
            await this.copyAuthFilePromise;
        }

        // Step 1. Prepare PAI job configuration

        const trialLocalTempFolder: string = path.join(getExperimentRootDir(), 'trials-local', trialJobId);
        //create tmp trial working folder locally.
        await execMkdir(trialLocalTempFolder);

        const runScriptContent: string = CONTAINER_INSTALL_NNI_SHELL_FORMAT;
        // Write NNI installation file to local tmp files
        await fs.promises.writeFile(path.join(trialLocalTempFolder, 'install_nni.sh'), runScriptContent, { encoding: 'utf8' });

        // Write file content ( parameter.cfg ) to local tmp folders
        if (trialJobDetail.form !== undefined) {
            await fs.promises.writeFile(
                path.join(trialLocalTempFolder, generateParamFileName(trialJobDetail.form.hyperParameters)),
                trialJobDetail.form.hyperParameters.value, { encoding: 'utf8' }
            );
        }

        const hdfsCodeDir: string = HDFSClientUtility.getHdfsTrialWorkDir(this.paiClusterConfig.userName, trialJobId);
        const hdfsOutputDir: string = unixPathJoin(hdfsCodeDir, 'nnioutput');
        const nniManagerIp: string = this.nniManagerIpConfig ? this.nniManagerIpConfig.nniManagerIp : getIPV4Address();
        const version: string = this.versionCheck ? await getVersion() : '';
        const nniPaiTrialCommand: string = String.Format(
            PAI_TRIAL_COMMAND_FORMAT,
            // PAI will copy job's codeDir into /root directory
            `$PWD/${trialJobId}`,
            `$PWD/${trialJobId}/nnioutput`,
            trialJobId,
            this.experimentId,
            trialJobDetail.form.sequenceId,
            this.isMultiPhase,
            this.paiTrialConfig.command,
            nniManagerIp,
            this.paiRestServerPort,
            hdfsOutputDir,
            this.paiClusterConfig.host,
            this.paiClusterConfig.userName,
            HDFSClientUtility.getHdfsExpCodeDir(this.paiClusterConfig.userName),
            version,
            this.logCollection
        )
        // The command is flattened onto a single line
        .replace(/\r\n|\n|\r/gm, '');

        this.log.info(`nniPAItrial command is ${nniPaiTrialCommand.trim()}`);
        const paiTaskRoles: PAITaskRole[] = [
            new PAITaskRole(
                `nni_trail_${trialJobId}`,
                // Task role number
                1,
                // Task CPU number
                this.paiTrialConfig.cpuNum,
                // Task memory
                this.paiTrialConfig.memoryMB,
                // Task GPU number
                this.paiTrialConfig.gpuNum,
                // Task command
                nniPaiTrialCommand,
                // Task shared memory
                this.paiTrialConfig.shmMB,
                // Task portList
                this.paiTrialConfig.portList
            )
        ];

        const paiJobConfig: PAIJobConfig = new PAIJobConfig(
            // Job name
            trialJobDetail.paiJobName,
            // Docker image
            this.paiTrialConfig.image,
            // codeDir
            `$PAI_DEFAULT_FS_URI${hdfsCodeDir}`,
            // PAI Task roles
            paiTaskRoles,
            // Add Virtual Cluster
            this.paiTrialConfig.virtualCluster === undefined ? 'default' : this.paiTrialConfig.virtualCluster.toString(),
            //Task auth File
            this.authFileHdfsPath
        );

        // Step 2. Upload the local trial folder onto HDFS
        try {
            await HDFSClientUtility.copyDirectoryToHdfs(trialLocalTempFolder, hdfsCodeDir, this.hdfsClient);
        } catch (error) {
            // Report the directory that was actually being uploaded
            this.log.error(`PAI Training service: copy ${trialLocalTempFolder} to HDFS ${hdfsCodeDir} failed, error is ${error}`);
            trialJobDetail.status = 'FAILED'; // eslint-disable-line require-atomic-updates

            return true;
        }

        // Step 3. Submit PAI job via Rest call
        // Refer https://github.com/Microsoft/pai/blob/master/docs/rest-server/API.md for more detail about PAI Rest API
        const submitJobRequest: request.Options = {
            uri: `http://${this.paiClusterConfig.host}/rest-server/api/v1/user/${this.paiClusterConfig.userName}/jobs`,
            method: 'POST',
            json: true,
            body: paiJobConfig,
            headers: {
                'Content-Type': 'application/json',
                Authorization: `Bearer ${this.paiToken}`
            }
        };

        await new Promise<void>((resolve) => {
            request(submitJobRequest, (error: Error, response: request.Response) => {
                if ((error !== undefined && error !== null) || response.statusCode >= 400) {
                    const errorMessage: string = (error !== undefined && error !== null) ? error.message :
                        `Submit trial ${trialJobId} failed, http code:${response.statusCode}, http body: ${response.body.message}`;
                    // Surface the reason; otherwise the trial just turns FAILED with no trace
                    this.log.error(errorMessage);
                    trialJobDetail.status = 'FAILED';
                } else {
                    trialJobDetail.submitTime = Date.now();
                }
                resolve();
            });
        });

        return true;
    }

    /**
     * Polls trial status every 3 seconds until the service is stopping.
     *
     * On each round the PAI token is refreshed first when the cluster config
     * carries a password; a refresh failure is fatal only if no token has ever
     * been obtained. The loop also aborts when the REST server reports an
     * error message.
     */
    private async statusCheckingLoop(): Promise<void> {
        for (;;) {
            if (this.stopping) {
                break;
            }

            const usesPassword: boolean = Boolean(this.paiClusterConfig && this.paiClusterConfig.passWord);
            if (usesPassword) {
                try {
                    await this.updatePaiToken();
                } catch (error) {
                    this.log.error(`${error}`);
                    //only throw error when initializing paiToken for the first time
                    if (this.paiToken === undefined) {
                        throw new Error(error);
                    }
                }
            }

            await this.paiJobCollector.retrieveTrialStatus(this.paiToken, this.paiClusterConfig);

            const paiRestServer: PAIJobRestServer = component.get(PAIJobRestServer);
            const serverError = paiRestServer.getErrorMessage;
            if (serverError !== undefined) {
                throw new Error(serverError);
            }

            await delay(3000);
        }
    }

    /**
     * Drains the job queue in order, submitting one trial at a time, and then
     * sleeps 3 seconds before looking at the queue again. Runs until the
     * service is stopping.
     */
    private async submitJobLoop(): Promise<void> {
        while (!this.stopping) {
            while (!this.stopping && this.jobQueue.length > 0) {
                const headTrialId: string = this.jobQueue[0];
                const handled: boolean = await this.submitTrialJobToPAI(headTrialId);
                if (!handled) {
                    // Leave the trial at the head of the queue and retry after the delay
                    break;
                }
                // Remove the handled trial from the head of the queue
                this.jobQueue.shift();
            }
            await delay(3000);
        }
    }

512
513
514
515
    /**
     * Obtains a PAI token on first use, or refreshes it once the update
     * interval has elapsed since the last successful update.
     *
     * The token and its update time are stored only after a successful
     * (HTTP 200) response that carries a token; a failed attempt leaves the
     * previous token and timestamp untouched, so the next call retries.
     *
     * @throws Error when the cluster config is missing, the request fails,
     *               the server answers with a non-200 status or without a token,
     *               or no answer arrives within 5 seconds
     */
    private async updatePaiToken(): Promise<void> {
        const currentTime: number = Date.now();
        //If pai token initialized and not reach the interval time, do not update
        if (this.paiTokenUpdateTime !== undefined && (currentTime - this.paiTokenUpdateTime) < this.paiTokenUpdateInterval) {
            return;
        }

        if (this.paiClusterConfig === undefined) {
            const paiClusterConfigError: string = `pai cluster config not initialized!`;
            this.log.error(`${paiClusterConfigError}`);
            throw new Error(`${paiClusterConfigError}`);
        }

        const authenticationReq: request.Options = {
            uri: `http://${this.paiClusterConfig.host}/rest-server/api/v1/token`,
            method: 'POST',
            json: true,
            body: {
                username: this.paiClusterConfig.userName,
                password: this.paiClusterConfig.passWord
            }
        };

        const tokenRequest: Promise<void> = new Promise<void>((resolve, reject) => {
            request(authenticationReq, (error: Error, response: request.Response, body: any) => {
                if (error !== undefined && error !== null) {
                    this.log.error(`Get PAI token failed: ${error.message}`);
                    reject(new Error(`Get PAI token failed: ${error.message}`));

                    return;
                }
                if (response.statusCode !== 200) {
                    this.log.error(`Get PAI token failed: get PAI Rest return code ${response.statusCode}`);
                    reject(new Error(`Get PAI token failed: ${response.body}, please check paiConfig username or password`));

                    // Do not fall through: a failed login must not overwrite the token or its timestamp
                    return;
                }
                if (body === undefined || body === null || body.token === undefined || body.token === null) {
                    this.log.error('Get PAI token failed: response does not contain a token');
                    reject(new Error('Get PAI token failed: response does not contain a token'));

                    return;
                }

                this.paiToken = body.token;
                this.paiTokenUpdateTime = Date.now();
                resolve();
            });
        });

        let timeoutId: ReturnType<typeof setTimeout>;
        const timeoutDelay: Promise<void> = new Promise<void>((_resolve, reject): void => {
            // Set timeout and reject the promise once reach timeout (5 seconds)
            timeoutId = setTimeout(
                () => reject(new Error('Get PAI token timeout. Please check your PAI cluster.')),
                5000);
        });

        return Promise.race([timeoutDelay, tokenRequest])
            .finally(() => { clearTimeout(timeoutId); });
    }
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591

    /**
     * Writes a hyper-parameter file for a trial into its local temp folder,
     * uploads it to the trial's HDFS directory, and posts the file's metadata
     * to the PAI job REST server.
     *
     * @param trialJobId id of the trial receiving the parameters
     * @param hyperParameters parameters whose `value` becomes the file content
     * @throws Error when the cluster config or trial config has not been initialized
     */
    private async writeParameterFile(trialJobId: string, hyperParameters: HyperParameters): Promise<void> {
        if (this.paiClusterConfig === undefined) {
            throw new Error('PAI Cluster config is not initialized');
        }
        if (this.paiTrialConfig === undefined) {
            throw new Error('PAI trial config is not initialized');
        }

        const trialLocalTempFolder: string = path.join(getExperimentRootDir(), 'trials-local', trialJobId);
        const hpFileName: string = generateParamFileName(hyperParameters);
        const localFilepath: string = path.join(trialLocalTempFolder, hpFileName);
        await fs.promises.writeFile(localFilepath, hyperParameters.value, { encoding: 'utf8' });

        const hdfsCodeDir: string = HDFSClientUtility.getHdfsTrialWorkDir(this.paiClusterConfig.userName, trialJobId);
        // HDFS paths always use '/', so join with unixPathJoin like the other HDFS
        // paths in this file; path.join would emit '\' on Windows.
        const hdfsHpFilePath: string = unixPathJoin(hdfsCodeDir, hpFileName);

        await HDFSClientUtility.copyFileToHdfs(localFilepath, hdfsHpFilePath, this.hdfsClient);

        await this.postParameterFileMeta({
            experimentId: this.experimentId,
            trialId: trialJobId,
            filePath: hdfsHpFilePath
        });
    }

    /**
     * POSTs parameter-file metadata to the PAI job REST server's
     * `parameter-file-meta` endpoint.
     *
     * Only a transport-level error rejects the promise; the HTTP status of the
     * response is not inspected.
     *
     * @param parameterFileMeta metadata sent as the JSON request body
     */
    private postParameterFileMeta(parameterFileMeta: ParameterFileMeta): Promise<void> {
        const done: Deferred<void> = new Deferred<void>();
        const paiRestServer: PAIJobRestServer = component.get(PAIJobRestServer);

        const metaRequest: request.Options = {
            uri: `${paiRestServer.endPoint}${paiRestServer.apiRootUrl}/parameter-file-meta`,
            method: 'POST',
            json: true,
            body: parameterFileMeta
        };

        request(metaRequest, (err: Error) => {
            if (!err) {
                done.resolve();

                return;
            }
            done.reject(err);
        });

        return done.promise;
    }
610
611
}

612
export { PAITrainingService };