paiTrainingService.ts 25.5 KB
Newer Older
liuzhe-lz's avatar
liuzhe-lz committed
1
2
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT license.
3

4
'use strict';
5
6
7

import * as fs from 'fs';
import * as path from 'path';
8
// tslint:disable-next-line:no-implicit-dependencies
9
import * as request from 'request';
10
import * as component from '../../common/component';
11
12

import { EventEmitter } from 'events';
13
14
import { Deferred } from 'ts-deferred';
import { String } from 'typescript-string-operations';
15
import { getExperimentId } from '../../common/experimentStartupInfo';
16
17
import { getLogger, Logger } from '../../common/log';
import {
18
    HyperParameters, NNIManagerIpConfig, TrainingService,
19
    TrialJobApplicationForm, TrialJobDetail, TrialJobMetric
20
} from '../../common/trainingService';
21
import { delay, generateParamFileName,
22
    getExperimentRootDir, getIPV4Address, getVersion, uniqueString, unixPathJoin } from '../../common/utils';
23
24
import { CONTAINER_INSTALL_NNI_SHELL_FORMAT } from '../common/containerJobData';
import { TrialConfigMetadataKey } from '../common/trialConfigMetadataKey';
25
import { execMkdir, validateCodeDir } from '../common/util';
26
27
import { HDFSClientUtility } from './hdfsClientUtility';
import { NNIPAITrialConfig, PAIClusterConfig, PAIJobConfig, PAITaskRole } from './paiConfig';
28
import { PAI_LOG_PATH_FORMAT, PAI_TRIAL_COMMAND_FORMAT, PAITrialJobDetail } from './paiData';
29
import { PAIJobInfoCollector } from './paiJobInfoCollector';
30
import { PAIJobRestServer, ParameterFileMeta } from './paiJobRestServer';
31

32
import * as WebHDFS from 'webhdfs';
33
34
35
36
37
38
39
40
41
42
43
44
45

/**
 * Training Service implementation for OpenPAI (Open Platform for AI)
 * Refer https://github.com/Microsoft/pai for more info about OpenPAI
 *
 * Trials accepted by submitTrialJob() are queued locally and pushed to the PAI REST server
 * by the submitJobLoop() background loop; statusCheckingLoop() refreshes the PAI token
 * (when a password is configured) and collects trial status until cleanUp() is called.
 */
@component.Singleton
class PAITrainingService implements TrainingService {
    private readonly log!: Logger;
    private readonly metricsEmitter: EventEmitter;
    private readonly trialJobsMap: Map<string, PAITrialJobDetail>;
    private readonly expRootDir: string;
    private paiTrialConfig: NNIPAITrialConfig | undefined;
    private paiClusterConfig?: PAIClusterConfig;
    // IDs of trials waiting to be submitted to PAI, oldest first.
    private readonly jobQueue: string[];
    // Set by cleanUp(); makes both background loops exit.
    private stopping: boolean = false;
    // tslint:disable-next-line:no-any
    private hdfsClient: any;
    private paiToken? : string;
    // Time (ms since epoch) of the last successful token refresh.
    private paiTokenUpdateTime?: number;
    // Minimum time (ms) between two token refreshes.
    private readonly paiTokenUpdateInterval: number;
    private readonly experimentId!: string;
    private readonly paiJobCollector: PAIJobInfoCollector;
    private paiRestServerPort?: number;
    private nniManagerIpConfig?: NNIManagerIpConfig;
    // Uploads started by setClusterMetadata(TRIAL_CONFIG); awaited before each submission.
    private copyExpCodeDirPromise?: Promise<void>;
    private copyAuthFilePromise?: Promise<void>;
    private versionCheck: boolean = true;
    private logCollection: string;
    private isMultiPhase: boolean = false;
    private authFileHdfsPath: string | undefined = undefined;
    private portList?: string | undefined;

    constructor() {
        this.log = getLogger();
        this.metricsEmitter = new EventEmitter();
        this.trialJobsMap = new Map<string, PAITrialJobDetail>();
        this.jobQueue = [];
        // Root dir on HDFS
        this.expRootDir = path.join('/nni', 'experiments', getExperimentId());
        this.experimentId = getExperimentId();
        this.paiJobCollector = new PAIJobInfoCollector(this.trialJobsMap);
        this.paiTokenUpdateInterval = 7200000; //2hours
        this.logCollection = 'none';
        this.log.info('Construct OpenPAI training service.');
    }

    /**
     * Starts the trial REST server, then runs the status-checking and job-submission loops
     * until both finish (they exit once cleanUp() sets the stopping flag).
     */
    public async run(): Promise<void> {
        this.log.info('Run PAI training service.');
        const restServer: PAIJobRestServer = component.get(PAIJobRestServer);
        await restServer.start();
        restServer.setEnableVersionCheck = this.versionCheck;
        this.log.info(`PAI Training service rest server listening on: ${restServer.endPoint}`);
        await Promise.all([
            this.statusCheckingLoop(),
            this.submitJobLoop()]);
        this.log.info('PAI training service exit.');
    }

    /**
     * Lists every trial known to this service.
     * @throws Error if the PAI cluster config has not been set yet.
     */
    public async listTrialJobs(): Promise<TrialJobDetail[]> {
        const jobs: TrialJobDetail[] = [];
        for (const trialJobId of this.trialJobsMap.keys()) {
            jobs.push(await this.getTrialJob(trialJobId));
        }

        return jobs;
    }

    /**
     * Looks up a trial by ID.
     * @throws Error if the PAI cluster config is not set or the trial is unknown.
     */
    public async getTrialJob(trialJobId: string): Promise<TrialJobDetail> {
        if (this.paiClusterConfig === undefined) {
            throw new Error('PAI Cluster config is not initialized');
        }

        const paiTrialJob: PAITrialJobDetail | undefined = this.trialJobsMap.get(trialJobId);
        if (paiTrialJob === undefined) {
            throw new Error(`trial job ${trialJobId} not found`);
        }

        return paiTrialJob;
    }

    public addTrialJobMetricListener(listener: (metric: TrialJobMetric) => void): void {
        this.metricsEmitter.on('metric', listener);
    }

    public removeTrialJobMetricListener(listener: (metric: TrialJobMetric) => void): void {
        this.metricsEmitter.off('metric', listener);
    }

    /**
     * Registers a new trial in 'WAITING' state and queues it for submission to PAI.
     * The actual PAI submission happens asynchronously in submitJobLoop().
     * @throws Error if the PAI cluster config has not been set yet.
     */
    public async submitTrialJob(form: TrialJobApplicationForm): Promise<TrialJobDetail> {
        if (this.paiClusterConfig === undefined) {
            throw new Error(`paiClusterConfig not initialized!`);
        }

        this.log.info(`submitTrialJob: form: ${JSON.stringify(form)}`);

        const trialJobId: string = uniqueString(5);
        //TODO: use HDFS working folder instead
        const trialWorkingFolder: string = path.join(this.expRootDir, 'trials', trialJobId);
        const paiJobName: string = `nni_exp_${this.experimentId}_trial_${trialJobId}`;
        const hdfsCodeDir: string = HDFSClientUtility.getHdfsTrialWorkDir(this.paiClusterConfig.userName, trialJobId);
        const hdfsOutputDir: string = unixPathJoin(hdfsCodeDir, 'nnioutput');
        const hdfsLogPath: string = String.Format(
            PAI_LOG_PATH_FORMAT,
            this.paiClusterConfig.host,
            hdfsOutputDir
        );

        const trialJobDetail: PAITrialJobDetail = new PAITrialJobDetail(
            trialJobId,
            'WAITING',
            paiJobName,
            Date.now(),
            trialWorkingFolder,
            form,
            hdfsLogPath);

        this.trialJobsMap.set(trialJobId, trialJobDetail);
        this.jobQueue.push(trialJobId);

        return trialJobDetail;
    }

    /**
     * Delivers a new set of hyper-parameters to a running trial by uploading a parameter file
     * to its HDFS work dir and notifying the trial REST server.
     * @throws Error if the trial is unknown.
     */
    public async updateTrialJob(trialJobId: string, form: TrialJobApplicationForm): Promise<TrialJobDetail> {
        const trialJobDetail: undefined | TrialJobDetail = this.trialJobsMap.get(trialJobId);
        if (trialJobDetail === undefined) {
            throw new Error(`updateTrialJob failed: ${trialJobId} not found`);
        }
        await this.writeParameterFile(trialJobId, form.hyperParameters);

        return trialJobDetail;
    }

    public get isMultiPhaseJobSupported(): boolean {
        return true;
    }

    // tslint:disable:no-http-string
    /**
     * Asks PAI to stop the trial's job by PUTting executionType 'STOP'.
     *
     * @param isEarlyStopped recorded on the trial to mark where the cancellation came from.
     * @throws Error if the trial is unknown, the service is not configured, or the PAI
     *     request fails (transport error or HTTP status >= 400).
     */
    public async cancelTrialJob(trialJobId: string, isEarlyStopped: boolean = false): Promise<void> {
        const trialJobDetail: PAITrialJobDetail | undefined = this.trialJobsMap.get(trialJobId);
        if (trialJobDetail === undefined) {
            const notFoundMessage: string = `cancelTrialJob: trial job id ${trialJobId} not found`;
            this.log.error(notFoundMessage);
            throw new Error(notFoundMessage);
        }
        if (this.paiClusterConfig === undefined) {
            throw new Error('PAI Cluster config is not initialized');
        }
        if (this.paiToken === undefined) {
            throw new Error('PAI token is not initialized');
        }

        const userJobsUri: string =
            `http://${this.paiClusterConfig.host}/rest-server/api/v1/user/${this.paiClusterConfig.userName}/jobs`;
        const stopJobRequest: request.Options = {
            uri: `${userJobsUri}/${trialJobDetail.paiJobName}/executionType`,
            method: 'PUT',
            json: true,
            body: {value: 'STOP'},
            headers: {
                'Content-Type': 'application/json',
                Authorization: `Bearer ${this.paiToken}`
            }
        };

        // Set trialjobDetail's early stopped field, to mark the job's cancellation source
        trialJobDetail.isEarlyStopped = isEarlyStopped;

        const deferred: Deferred<void> = new Deferred<void>();
        request(stopJobRequest, (error: Error, response: request.Response) => {
            if ((error !== undefined && error !== null) || response.statusCode >= 400) {
                this.log.error(`PAI Training service: stop trial ${trialJobId} to PAI Cluster failed!`);
                deferred.reject(new Error((error !== undefined && error !== null) ? error.message :
                    `Stop trial failed, http code: ${response.statusCode}`));
            } else {
                deferred.resolve();
            }
        });

        return deferred.promise;
    }

    // tslint:disable: no-unsafe-any no-any
    /**
     * Applies one cluster/trial configuration entry.
     *
     * The returned promise settles for every key; unknown keys and invalid
     * configurations reject it with an Error.
     */
    // tslint:disable-next-line:max-func-body-length
    public async setClusterMetadata(key: string, value: string): Promise<void> {
        switch (key) {
            case TrialConfigMetadataKey.NNI_MANAGER_IP:
                this.nniManagerIpConfig = <NNIManagerIpConfig>JSON.parse(value);
                break;

            case TrialConfigMetadataKey.PAI_CLUSTER_CONFIG:
                this.paiClusterConfig = <PAIClusterConfig>JSON.parse(value);

                this.hdfsClient = WebHDFS.createClient({
                    user: this.paiClusterConfig.userName,
                    // Refer PAI document for Pylon mapping https://github.com/Microsoft/pai/tree/master/docs/pylon
                    port: 80,
                    path: '/webhdfs/api/v1',
                    host: this.paiClusterConfig.host
                });

                if (this.paiClusterConfig.passWord) {
                    // Get PAI authentication token
                    await this.updatePaiToken();
                } else if (this.paiClusterConfig.token) {
                    this.paiToken = this.paiClusterConfig.token;
                } else {
                    throw new Error('pai cluster config format error, please set password or token!');
                }
                break;

            case TrialConfigMetadataKey.TRIAL_CONFIG:
                if (this.paiClusterConfig === undefined) {
                    this.log.error('pai cluster config is not initialized');
                    throw new Error('pai cluster config is not initialized');
                }
                this.paiTrialConfig = <NNIPAITrialConfig>JSON.parse(value);

                // Validate to make sure codeDir doesn't have too many files
                try {
                    await validateCodeDir(this.paiTrialConfig.codeDir);
                } catch (error) {
                    this.log.error(error);
                    throw error instanceof Error ? error : new Error(`${error}`);
                }

                // Copy experiment files from local folder to HDFS; awaited later in submitTrialJobToPAI()
                this.copyExpCodeDirPromise = HDFSClientUtility.copyDirectoryToHdfs(
                    this.paiTrialConfig.codeDir,
                    HDFSClientUtility.getHdfsExpCodeDir(this.paiClusterConfig.userName),
                    this.hdfsClient
                );

                // Upload authFile to hdfs
                if (this.paiTrialConfig.authFile) {
                    this.authFileHdfsPath = unixPathJoin(HDFSClientUtility.hdfsExpRootDir(this.paiClusterConfig.userName), 'authFile');
                    this.copyAuthFilePromise = HDFSClientUtility.copyFileToHdfs(this.paiTrialConfig.authFile, this.authFileHdfsPath, this.hdfsClient);
                }
                break;

            case TrialConfigMetadataKey.VERSION_CHECK:
                this.versionCheck = (value === 'true' || value === 'True');
                break;

            case TrialConfigMetadataKey.LOG_COLLECTION:
                this.logCollection = value;
                break;

            case TrialConfigMetadataKey.MULTI_PHASE:
                this.isMultiPhase = (value === 'true' || value === 'True');
                break;

            default:
                //Reject for unknown keys
                throw new Error(`Unknown key: ${key}`);
        }
    }
    // tslint:enable: no-unsafe-any

    /**
     * Not supported by this service: always resolves without a value.
     */
    public getClusterMetadata(key: string): Promise<string> {
        const deferred: Deferred<string> = new Deferred<string>();

        deferred.resolve();

        return deferred.promise;
    }

    /**
     * Signals the background loops to stop and shuts down the trial REST server.
     * @throws the REST server's stop error, after logging it.
     */
    public async cleanUp(): Promise<void> {
        this.log.info('Stopping PAI training service...');
        this.stopping = true;

        const restServer: PAIJobRestServer = component.get(PAIJobRestServer);
        try {
            await restServer.stop();
            this.log.info('PAI Training service rest server stopped successfully.');
        } catch (error) {
            // tslint:disable-next-line: no-unsafe-any
            this.log.error(`PAI Training service rest server stopped failed, error: ${error.message}`);
            throw error;
        }
    }

    public get MetricsEmitter(): EventEmitter {
        return this.metricsEmitter;
    }

    /**
     * Prepares a queued trial and submits it to PAI.
     *
     * Steps:
     * 1. Write the NNI install script and the parameter file into a local temp folder.
     * 2. Upload that folder to the trial's HDFS work dir.
     * 3. POST the PAI job config to the PAI REST server.
     * An upload or submission failure marks the trial 'FAILED'.
     *
     * @returns true once the trial has been handled (submitted or marked FAILED), meaning it
     *     can be removed from the job queue.
     * @throws Error if the trial is unknown or the service is not fully configured.
     */
    // tslint:disable-next-line:max-func-body-length
    private async submitTrialJobToPAI(trialJobId: string): Promise<boolean> {
        const trialJobDetail: PAITrialJobDetail | undefined = this.trialJobsMap.get(trialJobId);
        if (trialJobDetail === undefined) {
            throw new Error(`Failed to find PAITrialJobDetail for job ${trialJobId}`);
        }
        if (this.paiClusterConfig === undefined) {
            throw new Error('PAI Cluster config is not initialized');
        }
        if (this.paiTrialConfig === undefined) {
            throw new Error('trial config is not initialized');
        }
        if (this.paiToken === undefined) {
            throw new Error('PAI token is not initialized');
        }

        if (this.paiRestServerPort === undefined) {
            const restServer: PAIJobRestServer = component.get(PAIJobRestServer);
            this.paiRestServerPort = restServer.clusterRestServerPort;
        }

        // Make sure experiment code files is copied from local to HDFS
        if (this.copyExpCodeDirPromise !== undefined) {
            await this.copyExpCodeDirPromise;
        }

        // Make sure authFile is copied from local to HDFS
        if (this.paiTrialConfig.authFile) {
            await this.copyAuthFilePromise;
        }

        // Step 1. Prepare PAI job configuration
        const trialLocalTempFolder: string = path.join(getExperimentRootDir(), 'trials-local', trialJobId);
        // Create tmp trial working folder locally.
        await execMkdir(trialLocalTempFolder);

        const runScriptContent: string = CONTAINER_INSTALL_NNI_SHELL_FORMAT;
        // Write NNI installation file to local tmp files
        await fs.promises.writeFile(path.join(trialLocalTempFolder, 'install_nni.sh'), runScriptContent, { encoding: 'utf8' });

        // Write file content ( parameter.cfg ) to local tmp folders
        if (trialJobDetail.form !== undefined) {
            await fs.promises.writeFile(
                path.join(trialLocalTempFolder, generateParamFileName(trialJobDetail.form.hyperParameters)),
                trialJobDetail.form.hyperParameters.value, { encoding: 'utf8' }
            );
        }
        const hdfsCodeDir: string = HDFSClientUtility.getHdfsTrialWorkDir(this.paiClusterConfig.userName, trialJobId);
        const hdfsOutputDir: string = unixPathJoin(hdfsCodeDir, 'nnioutput');
        // tslint:disable-next-line: strict-boolean-expressions
        const nniManagerIp: string = this.nniManagerIpConfig ? this.nniManagerIpConfig.nniManagerIp : getIPV4Address();
        const version: string = this.versionCheck ? await getVersion() : '';
        const nniPaiTrialCommand: string = String.Format(
            PAI_TRIAL_COMMAND_FORMAT,
            // PAI will copy job's codeDir into /root directory
            `$PWD/${trialJobId}`,
            `$PWD/${trialJobId}/nnioutput`,
            trialJobId,
            this.experimentId,
            trialJobDetail.form.sequenceId,
            this.isMultiPhase,
            this.paiTrialConfig.command,
            nniManagerIp,
            this.paiRestServerPort,
            hdfsOutputDir,
            this.paiClusterConfig.host,
            this.paiClusterConfig.userName,
            HDFSClientUtility.getHdfsExpCodeDir(this.paiClusterConfig.userName),
            version,
            this.logCollection
        )
        // The PAI task command is a single line.
        .replace(/\r\n|\n|\r/gm, '');

        this.log.info(`nniPAItrial command is ${nniPaiTrialCommand.trim()}`);
        const paiTaskRoles: PAITaskRole[] = [
            new PAITaskRole(
                `nni_trail_${trialJobId}`,
                // Task role number
                1,
                // Task CPU number
                this.paiTrialConfig.cpuNum,
                // Task memory
                this.paiTrialConfig.memoryMB,
                // Task GPU number
                this.paiTrialConfig.gpuNum,
                // Task command
                nniPaiTrialCommand,
                // Task shared memory
                this.paiTrialConfig.shmMB,
                // Task portList
                this.paiTrialConfig.portList
            )
        ];

        const paiJobConfig: PAIJobConfig = new PAIJobConfig(
            // Job name
            trialJobDetail.paiJobName,
            // Docker image
            this.paiTrialConfig.image,
            // codeDir
            `$PAI_DEFAULT_FS_URI${hdfsCodeDir}`,
            // PAI Task roles
            paiTaskRoles,
            // Add Virtual Cluster
            this.paiTrialConfig.virtualCluster === undefined ? 'default' : this.paiTrialConfig.virtualCluster.toString(),
            // Task auth File
            this.authFileHdfsPath
        );

        // Step 2. Upload the prepared local trial folder onto HDFS
        try {
            await HDFSClientUtility.copyDirectoryToHdfs(trialLocalTempFolder, hdfsCodeDir, this.hdfsClient);
        } catch (error) {
            this.log.error(`PAI Training service: copy ${trialLocalTempFolder} to HDFS ${hdfsCodeDir} failed, error is ${error}`);
            trialJobDetail.status = 'FAILED';

            return true;
        }

        // Step 3. Submit PAI job via Rest call
        // Refer https://github.com/Microsoft/pai/blob/master/docs/rest-server/API.md for more detail about PAI Rest API
        const submitJobRequest: request.Options = {
            uri: `http://${this.paiClusterConfig.host}/rest-server/api/v1/user/${this.paiClusterConfig.userName}/jobs`,
            method: 'POST',
            json: true,
            body: paiJobConfig,
            headers: {
                'Content-Type': 'application/json',
                Authorization: `Bearer ${this.paiToken}`
            }
        };
        const deferred: Deferred<boolean> = new Deferred<boolean>();
        // tslint:disable:no-any no-unsafe-any
        request(submitJobRequest, (error: Error, response: request.Response, body: any) => {
            if ((error !== undefined && error !== null) || response.statusCode >= 400) {
                // The body may be missing or non-JSON on failure; don't dereference it blindly.
                const bodyMessage: any = (body !== undefined && body !== null) ? body.message : body;
                const errorMessage: string = (error !== undefined && error !== null) ? error.message :
                    `Submit trial ${trialJobId} failed, http code:${response.statusCode}, http body: ${bodyMessage}`;
                this.log.error(errorMessage);
                trialJobDetail.status = 'FAILED';
            } else {
                trialJobDetail.submitTime = Date.now();
            }
            // Either way the trial has been handled and can leave the queue.
            deferred.resolve(true);
        });

        return deferred.promise;
    }

    /**
     * Background loop (every 3 s until stopping):
     * - refreshes the PAI token when a password is configured;
     * - collects trial status;
     * - surfaces REST server errors.
     *
     * @throws Error when the first token fetch fails, or when the REST server reports an error.
     */
    private async statusCheckingLoop(): Promise<void> {
        while (!this.stopping) {
            if (this.paiClusterConfig && this.paiClusterConfig.passWord) {
                try {
                    await this.updatePaiToken();
                } catch (error) {
                    this.log.error(`${error}`);
                    // Only throw when the token could never be initialized; otherwise keep the old token.
                    if (this.paiToken === undefined) {
                        throw error instanceof Error ? error : new Error(`${error}`);
                    }
                }
            }
            await this.paiJobCollector.retrieveTrialStatus(this.paiToken, this.paiClusterConfig);
            const restServer: PAIJobRestServer = component.get(PAIJobRestServer);
            if (restServer.getErrorMessage !== undefined) {
                throw new Error(restServer.getErrorMessage);
            }
            await delay(3000);
        }
    }

    /**
     * Background loop: submits queued trials in FIFO order, polling the queue every 3 s
     * until stopping.
     */
    private async submitJobLoop(): Promise<void> {
        while (!this.stopping) {
            while (!this.stopping && this.jobQueue.length > 0) {
                const trialJobId: string = this.jobQueue[0];
                if (await this.submitTrialJobToPAI(trialJobId)) {
                    // Remove trial job with trialJobId from job queue
                    this.jobQueue.shift();
                } else {
                    // Break the while loop since failed to submitJob
                    break;
                }
            }
            await delay(3000);
        }
    }

    /**
     * Update pai token by the interval time or initialize the pai token.
     *
     * Does nothing if the token was refreshed less than paiTokenUpdateInterval ms ago.
     * The token and its refresh time are only updated on an HTTP 200 response.
     *
     * @throws Error if the cluster config is missing, the request fails or returns non-200,
     *     or no response arrives within 5 seconds.
     */
    private async updatePaiToken(): Promise<void> {
        const currentTime: number = new Date().getTime();
        // If pai token initialized and not reach the interval time, do not update
        if (this.paiTokenUpdateTime !== undefined && (currentTime - this.paiTokenUpdateTime) < this.paiTokenUpdateInterval) {
            return;
        }

        if (this.paiClusterConfig === undefined) {
            const paiClusterConfigError: string = `pai cluster config not initialized!`;
            this.log.error(`${paiClusterConfigError}`);
            throw new Error(`${paiClusterConfigError}`);
        }

        const authenticationReq: request.Options = {
            uri: `http://${this.paiClusterConfig.host}/rest-server/api/v1/token`,
            method: 'POST',
            json: true,
            body: {
                username: this.paiClusterConfig.userName,
                password: this.paiClusterConfig.passWord
            }
        };

        const deferred: Deferred<void> = new Deferred<void>();
        request(authenticationReq, (error: Error, response: request.Response, body: any) => {
            if (error !== undefined && error !== null) {
                this.log.error(`Get PAI token failed: ${error.message}`);
                deferred.reject(new Error(`Get PAI token failed: ${error.message}`));
            } else if (response.statusCode !== 200) {
                // Keep the previous token and refresh time so the next loop iteration retries.
                this.log.error(`Get PAI token failed: get PAI Rest return code ${response.statusCode}`);
                deferred.reject(new Error(`Get PAI token failed: ${JSON.stringify(response.body)}, please check paiConfig username or password`));
            } else {
                this.paiToken = body.token;
                this.paiTokenUpdateTime = new Date().getTime();
                deferred.resolve();
            }
        });

        let timeoutId: NodeJS.Timer;
        const timeoutDelay: Promise<void> = new Promise<void>((resolve: Function, reject: Function): void => {
            // Set timeout and reject the promise once reach timeout (5 seconds)
            timeoutId = setTimeout(
                () => reject(new Error('Get PAI token timeout. Please check your PAI cluster.')),
                5000);
        });

        return Promise.race([timeoutDelay, deferred.promise])
            .finally(() => { clearTimeout(timeoutId); });
    }

    /**
     * Writes a parameter file for the trial locally, uploads it to the trial's HDFS work dir,
     * and posts its location to the trial REST server.
     * @throws Error if the cluster or trial config is missing, or any step fails.
     */
    private async writeParameterFile(trialJobId: string, hyperParameters: HyperParameters): Promise<void> {
        if (this.paiClusterConfig === undefined) {
            throw new Error('PAI Cluster config is not initialized');
        }
        if (this.paiTrialConfig === undefined) {
            throw new Error('PAI trial config is not initialized');
        }

        const trialLocalTempFolder: string = path.join(getExperimentRootDir(), 'trials-local', trialJobId);
        const hpFileName: string = generateParamFileName(hyperParameters);
        const localFilepath: string = path.join(trialLocalTempFolder, hpFileName);
        await fs.promises.writeFile(localFilepath, hyperParameters.value, { encoding: 'utf8' });
        const hdfsCodeDir: string = HDFSClientUtility.getHdfsTrialWorkDir(this.paiClusterConfig.userName, trialJobId);
        // HDFS paths are POSIX-style regardless of the host OS, so don't use path.join here.
        const hdfsHpFilePath: string = unixPathJoin(hdfsCodeDir, hpFileName);

        await HDFSClientUtility.copyFileToHdfs(localFilepath, hdfsHpFilePath, this.hdfsClient);

        await this.postParameterFileMeta({
            experimentId: this.experimentId,
            trialId: trialJobId,
            filePath: hdfsHpFilePath
        });
    }

    /**
     * POSTs parameter-file metadata to this service's trial REST server.
     * @throws the transport error, or an Error when the server answers with HTTP status >= 400.
     */
    private postParameterFileMeta(parameterFileMeta: ParameterFileMeta): Promise<void> {
        const deferred: Deferred<void> = new Deferred<void>();
        const restServer: PAIJobRestServer = component.get(PAIJobRestServer);
        const req: request.Options = {
            uri: `${restServer.endPoint}${restServer.apiRootUrl}/parameter-file-meta`,
            method: 'POST',
            json: true,
            body: parameterFileMeta
        };
        request(req, (err: Error, res: request.Response) => {
            if (err) {
                deferred.reject(err);
            } else if (res.statusCode >= 400) {
                deferred.reject(new Error(`Post parameter file meta failed, http code: ${res.statusCode}`));
            } else {
                deferred.resolve();
            }
        });

        return deferred.promise;
    }
}

export { PAITrainingService };