/**
 * Copyright (c) Microsoft Corporation
 * All rights reserved.
 *
 * MIT License
 *
 * Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated
 * documentation files (the "Software"), to deal in the Software without restriction, including without limitation
 * the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and
 * to permit persons to whom the Software is furnished to do so, subject to the following conditions:
 * The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.
 *
 * THE SOFTWARE IS PROVIDED *AS IS*, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING
 * BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
 * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
 * DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
 */

'use strict';

import * as cpp from 'child-process-promise';
import * as fs from 'fs';
import * as path from 'path';
// tslint:disable-next-line:no-implicit-dependencies
import * as request from 'request';
import * as component from '../../common/component';

import { EventEmitter } from 'events';
import { Deferred } from 'ts-deferred';
import { String } from 'typescript-string-operations';
import { MethodNotImplementedError } from '../../common/errors';
import { getExperimentId, getInitTrialSequenceId } from '../../common/experimentStartupInfo';
import { getLogger, Logger } from '../../common/log';
import {
    HyperParameters, JobApplicationForm, NNIManagerIpConfig, TrainingService,
    TrialJobApplicationForm, TrialJobDetail, TrialJobMetric
} from '../../common/trainingService';
import { delay, generateParamFileName,
    getExperimentRootDir, getIPV4Address, getVersion, uniqueString, unixPathJoin } from '../../common/utils';
import { CONTAINER_INSTALL_NNI_SHELL_FORMAT } from '../common/containerJobData';
import { TrialConfigMetadataKey } from '../common/trialConfigMetadataKey';
import { execMkdir, validateCodeDir } from '../common/util';
import { HDFSClientUtility } from './hdfsClientUtility';
import { NNIPAITrialConfig, PAIClusterConfig, PAIJobConfig, PAITaskRole } from './paiConfig';
import { PAI_LOG_PATH_FORMAT, PAI_OUTPUT_DIR_FORMAT, PAI_TRIAL_COMMAND_FORMAT, PAITrialJobDetail } from './paiData';
import { PAIJobInfoCollector } from './paiJobInfoCollector';
import { PAIJobRestServer, ParameterFileMeta } from './paiJobRestServer';

import * as WebHDFS from 'webhdfs';

/**
 * Training Service implementation for OpenPAI (Open Platform for AI)
 * Refer https://github.com/Microsoft/pai for more info about OpenPAI
 *
 * submitTrialJob() only records a trial and enqueues its id; submitJobLoop() later stages the
 * trial's files on HDFS and creates the PAI job through the PAI REST API. statusCheckingLoop()
 * refreshes the PAI token and delegates status polling to PAIJobInfoCollector.
 */
@component.Singleton
class PAITrainingService implements TrainingService {
    private readonly log!: Logger;
    private readonly metricsEmitter: EventEmitter;
    private readonly trialJobsMap: Map<string, PAITrialJobDetail>;
    private readonly expRootDir: string;
    private paiTrialConfig: NNIPAITrialConfig | undefined;
    private paiClusterConfig?: PAIClusterConfig;
    // Ids of trials waiting to be submitted to PAI, in submission order.
    private readonly jobQueue: string[];
    private stopping: boolean = false;
    // tslint:disable-next-line:no-any
    private hdfsClient: any;
    private paiToken? : string;
    // Epoch milliseconds of the last successful token refresh.
    private paiTokenUpdateTime?: number;
    private readonly paiTokenUpdateInterval: number;
    private readonly experimentId! : string;
    private readonly paiJobCollector : PAIJobInfoCollector;
    private readonly hdfsDirPattern: string;
    private hdfsBaseDir: string | undefined;
    private hdfsOutputHost: string | undefined;
    private nextTrialSequenceId: number;
    private paiRestServerPort?: number;
    private nniManagerIpConfig?: NNIManagerIpConfig;
    // Pending copy of the experiment code dir to HDFS; awaited before each job submission.
    private copyExpCodeDirPromise?: Promise<void>;
    private versionCheck: boolean = true;
    private logCollection: string;
    private isMultiPhase: boolean = false;

    constructor() {
        this.log = getLogger();
        this.metricsEmitter = new EventEmitter();
        this.trialJobsMap = new Map<string, PAITrialJobDetail>();
        this.jobQueue = [];
        // Root dir on HDFS
        this.expRootDir = path.join('/nni', 'experiments', getExperimentId());
        this.experimentId = getExperimentId();
        this.paiJobCollector = new PAIJobInfoCollector(this.trialJobsMap);
        // The dots are escaped: an unescaped '.' would match any character, not just the IPv4 separator.
        this.hdfsDirPattern = 'hdfs://(?<host>([0-9]{1,3}\\.){3}[0-9]{1,3})(:[0-9]{2,5})?(?<baseDir>/.*)?';
        this.nextTrialSequenceId = -1;
        this.paiTokenUpdateInterval = 7200000; // 2 hours, in milliseconds
        this.logCollection = 'none';
        this.log.info('Construct OpenPAI training service.');
    }

    /**
     * Start the trial-facing REST server, then run the status-checking and job-submission loops
     * until cleanUp() sets the stopping flag.
     */
    public async run(): Promise<void> {
        this.log.info('Run PAI training service.');
        const restServer: PAIJobRestServer = component.get(PAIJobRestServer);
        await restServer.start();
        restServer.setEnableVersionCheck = this.versionCheck;
        this.log.info(`PAI Training service rest server listening on: ${restServer.endPoint}`);
        await Promise.all([
            this.statusCheckingLoop(),
            this.submitJobLoop()]);
        this.log.info('PAI training service exit.');
    }

    /**
     * List all trial jobs whose form type is 'TRIAL'.
     */
    public async listTrialJobs(): Promise<TrialJobDetail[]> {
        const jobs: TrialJobDetail[] = [];

        for (const [key, value] of this.trialJobsMap) {
            if (value.form.jobType === 'TRIAL') {
                jobs.push(await this.getTrialJob(key));
            }
        }

        return jobs;
    }

    /**
     * Look up a trial job by id.
     * @throws Error when the PAI cluster config has not been set.
     * Rejects with a message string when the trial id is unknown.
     */
    public async getTrialJob(trialJobId: string): Promise<TrialJobDetail> {
        if (this.paiClusterConfig === undefined) {
            throw new Error('PAI Cluster config is not initialized');
        }

        const paiTrialJob: PAITrialJobDetail | undefined = this.trialJobsMap.get(trialJobId);

        if (paiTrialJob === undefined) {
            return Promise.reject(`trial job ${trialJobId} not found`);
        }

        return paiTrialJob;
    }

    public addTrialJobMetricListener(listener: (metric: TrialJobMetric) => void): void {
        this.metricsEmitter.on('metric', listener);
    }

    public removeTrialJobMetricListener(listener: (metric: TrialJobMetric) => void): void {
        this.metricsEmitter.off('metric', listener);
    }

    /**
     * Register a new trial in the WAITING state and enqueue it for submission.
     * The actual PAI job is created later by submitJobLoop().
     * @throws Error when the trial config (and therefore hdfsBaseDir) has not been set.
     */
    public async submitTrialJob(form: JobApplicationForm): Promise<TrialJobDetail> {
        if (this.hdfsBaseDir === undefined) {
            throw new Error('hdfsBaseDir is not initialized');
        }

        this.log.info(`submitTrialJob: form: ${JSON.stringify(form)}`);

        const trialJobId: string = uniqueString(5);
        const trialSequenceId: number = this.generateSequenceId();
        //TODO: use HDFS working folder instead
        const trialWorkingFolder: string = path.join(this.expRootDir, 'trials', trialJobId);
        const paiJobName: string = `nni_exp_${this.experimentId}_trial_${trialJobId}`;

        // HDFS paths are POSIX-style regardless of the host OS; path.join would emit '\' on Windows.
        const hdfsOutputDir : string = path.posix.join(this.hdfsBaseDir, this.experimentId, trialJobId);
        const hdfsLogPath : string = String.Format(
            PAI_LOG_PATH_FORMAT,
            this.hdfsOutputHost,
            hdfsOutputDir);

        const trialJobDetail: PAITrialJobDetail = new PAITrialJobDetail(
            trialJobId,
            'WAITING',
            paiJobName,
            Date.now(),
            trialWorkingFolder,
            form,
            trialSequenceId,
            hdfsLogPath);

        this.trialJobsMap.set(trialJobId, trialJobDetail);
        this.jobQueue.push(trialJobId);

        return trialJobDetail;
    }

    /**
     * Push a new hyper-parameter file to a running trial (multi-phase support).
     * @throws Error when the trial is unknown or the form is not a 'TRIAL' form.
     */
    public async updateTrialJob(trialJobId: string, form: JobApplicationForm): Promise<TrialJobDetail> {
        const trialJobDetail: undefined | TrialJobDetail = this.trialJobsMap.get(trialJobId);
        if (trialJobDetail === undefined) {
            throw new Error(`updateTrialJob failed: ${trialJobId} not found`);
        }
        if (form.jobType === 'TRIAL') {
            await this.writeParameterFile(trialJobId, (<TrialJobApplicationForm>form).hyperParameters);
        } else {
            throw new Error(`updateTrialJob failed: jobType ${form.jobType} not supported.`);
        }

        return trialJobDetail;
    }

    public get isMultiPhaseJobSupported(): boolean {
        return true;
    }

    // tslint:disable:no-http-string
    /**
     * Ask PAI to stop a trial's job by setting its executionType to STOP.
     * @param isEarlyStopped recorded on the trial detail to mark where the cancellation came from.
     * Rejects when the trial is unknown or the PAI REST call fails; throws synchronously when the
     * cluster config or token has not been initialized.
     */
    public cancelTrialJob(trialJobId: string, isEarlyStopped: boolean = false): Promise<void> {
        const trialJobDetail : PAITrialJobDetail | undefined =  this.trialJobsMap.get(trialJobId);
        const deferred : Deferred<void> = new Deferred<void>();
        if (trialJobDetail === undefined) {
            this.log.error(`cancelTrialJob: trial job id ${trialJobId} not found`);

            return Promise.reject(new Error(`cancelTrialJob: trial job id ${trialJobId} not found`));
        }

        if (this.paiClusterConfig === undefined) {
            throw new Error('PAI Cluster config is not initialized');
        }
        if (this.paiToken === undefined) {
            throw new Error('PAI token is not initialized');
        }

        const stopJobRequest: request.Options = {
            uri: `http://${this.paiClusterConfig.host}/rest-server/api/v1/user/${this.paiClusterConfig.userName}\
/jobs/${trialJobDetail.paiJobName}/executionType`,
            method: 'PUT',
            json: true,
            body: {value: 'STOP'},
            headers: {
                'Content-Type': 'application/json',
                Authorization: `Bearer ${this.paiToken}`
            }
        };

        // Set trialjobDetail's early stopped field, to mark the job's cancellation source
        trialJobDetail.isEarlyStopped = isEarlyStopped;

        // tslint:disable-next-line:no-any
        request(stopJobRequest, (error: Error, response: request.Response, body: any) => {
            if ((error !== undefined && error !== null) || response.statusCode >= 400) {
                this.log.error(`PAI Training service: stop trial ${trialJobId} to PAI Cluster failed!`);
                deferred.reject((error !== undefined && error !== null) ? error.message :
                 `Stop trial failed, http code: ${response.statusCode}`);
            } else {
                deferred.resolve();
            }
        });

        return deferred.promise;
    }

    // tslint:disable: no-unsafe-any no-any
    /**
     * Apply one piece of cluster metadata sent by the NNI manager.
     * Every recognised key now settles the returned promise. Previously VERSION_CHECK, LOG_COLLECTION
     * and MULTI_PHASE never resolved, so callers awaiting them hung.
     * @param key one of the TrialConfigMetadataKey values handled below.
     * @param value raw value; JSON for the IP / cluster / trial configs.
     * @throws Error for unknown keys or when the supplied configuration is invalid.
     */
    public async setClusterMetadata(key: string, value: string): Promise<void> {
        switch (key) {
            case TrialConfigMetadataKey.NNI_MANAGER_IP:
                this.nniManagerIpConfig = <NNIManagerIpConfig>JSON.parse(value);
                break;

            case TrialConfigMetadataKey.PAI_CLUSTER_CONFIG:
                this.paiClusterConfig = <PAIClusterConfig>JSON.parse(value);

                this.hdfsClient = WebHDFS.createClient({
                    user: this.paiClusterConfig.userName,
                    // Refer PAI document for Pylon mapping https://github.com/Microsoft/pai/tree/master/docs/pylon
                    port: 80,
                    path: '/webhdfs/api/v1',
                    host: this.paiClusterConfig.host
                });

                // Get PAI authentication token
                await this.updatePaiToken();
                break;

            case TrialConfigMetadataKey.TRIAL_CONFIG:
                await this.setTrialConfig(value);
                break;

            case TrialConfigMetadataKey.VERSION_CHECK:
                this.versionCheck = (value === 'true' || value === 'True');
                break;

            case TrialConfigMetadataKey.LOG_COLLECTION:
                this.logCollection = value;
                break;

            case TrialConfigMetadataKey.MULTI_PHASE:
                this.isMultiPhase = (value === 'true' || value === 'True');
                break;

            default:
                //Reject for unknown keys
                throw new Error(`Unknown key: ${key}`);
        }
    }

    /**
     * Apply the TRIAL_CONFIG metadata:
     * - parse the trial config and derive the HDFS output host/base dir from outputDir;
     * - check that the output HDFS is reachable;
     * - start copying the experiment code dir to HDFS.
     * If the reachability check fails, the copy is no longer started.
     * @param value JSON-encoded NNIPAITrialConfig.
     * @throws Error when the cluster config is missing, codeDir fails validation, outputDir is
     *         malformed, or the output HDFS cannot be reached.
     */
    private async setTrialConfig(value: string): Promise<void> {
        if (this.paiClusterConfig === undefined) {
            this.log.error('pai cluster config is not initialized');
            throw new Error('pai cluster config is not initialized');
        }
        this.paiTrialConfig = <NNIPAITrialConfig>JSON.parse(value);
        //paiTrialConfig.outputDir could be null if it is not set in nnictl
        if (this.paiTrialConfig.outputDir === undefined || this.paiTrialConfig.outputDir === null) {
            this.paiTrialConfig.outputDir = String.Format(
                PAI_OUTPUT_DIR_FORMAT,
                this.paiClusterConfig.host
            )
            .replace(/\r\n|\n|\r/gm, '');
        }

        // Validate to make sure codeDir doesn't have too many files
        try {
            await validateCodeDir(this.paiTrialConfig.codeDir);
        } catch (error) {
            this.log.error(error);
            throw new Error(error);
        }

        const hdfsDirContent: any = this.paiTrialConfig.outputDir.match(this.hdfsDirPattern);
        if (hdfsDirContent === null) {
            throw new Error('Trial outputDir format Error');
        }
        const groups: any = hdfsDirContent.groups;
        if (groups === undefined) {
            throw new Error('Trial outputDir format Error');
        }
        this.hdfsOutputHost = groups.host;
        //TODO: choose to use /${username} as baseDir
        this.hdfsBaseDir = groups.baseDir;
        if (this.hdfsBaseDir === undefined) {
            this.hdfsBaseDir = '/';
        }

        // Reuse the Pylon-backed client when output goes to the PAI cluster's own HDFS;
        // otherwise talk to the output host's WebHDFS port directly.
        let dataOutputHdfsClient: any;
        if (this.paiClusterConfig.host === this.hdfsOutputHost && this.hdfsClient) {
            dataOutputHdfsClient = this.hdfsClient;
        } else {
            dataOutputHdfsClient = WebHDFS.createClient({
                user: this.paiClusterConfig.userName,
                port: 50070,
                host: this.hdfsOutputHost
            });
        }

        let exist : boolean;
        try {
            exist = await HDFSClientUtility.pathExists('/', dataOutputHdfsClient);
        } catch (error) {
            throw new Error(`HDFS encounters problem, error is ${error}. Please check hdfsOutputDir host!`);
        }
        if (!exist) {
            throw new Error(`Please check hdfsOutputDir host!`);
        }

        // Copy experiment files from local folder to HDFS; submitTrialJobToPAI awaits this promise.
        this.copyExpCodeDirPromise = HDFSClientUtility.copyDirectoryToHdfs(
            this.paiTrialConfig.codeDir,
            HDFSClientUtility.getHdfsExpCodeDir(this.paiClusterConfig.userName),
            this.hdfsClient
        );
    }
    // tslint:enable: no-unsafe-any

    public getClusterMetadata(key: string): Promise<string> {
        const deferred : Deferred<string> = new Deferred<string>();

        deferred.resolve();

        return deferred.promise;
    }

    /**
     * Stop both background loops and shut down the trial-facing REST server.
     * Rejects with the REST server's error if it fails to stop.
     */
    public async cleanUp(): Promise<void> {
        this.log.info('Stopping PAI training service...');
        this.stopping = true;

        const deferred : Deferred<void> = new Deferred<void>();
        const restServer: PAIJobRestServer = component.get(PAIJobRestServer);
        try {
            await restServer.stop();
            deferred.resolve();
            this.log.info('PAI Training service rest server stopped successfully.');
        } catch (error) {
            // tslint:disable-next-line: no-unsafe-any
            this.log.error(`PAI Training service rest server stopped failed, error: ${error.message}`);
            deferred.reject(error);
        }

        return deferred.promise;
    }

    public get MetricsEmitter() : EventEmitter {
        return this.metricsEmitter;
    }

    /**
     * Stage a queued trial's files on HDFS and create its PAI job.
     * Resolves to true in every handled case (including marking the trial FAILED), so
     * submitJobLoop always dequeues the trial; throws when required config is missing.
     */
    // tslint:disable-next-line:max-func-body-length
    private async submitTrialJobToPAI(trialJobId: string): Promise<boolean> {
        const deferred : Deferred<boolean> = new Deferred<boolean>();
        const trialJobDetail: PAITrialJobDetail | undefined = this.trialJobsMap.get(trialJobId);

        if (trialJobDetail === undefined) {
            throw new Error(`Failed to find PAITrialJobDetail for job ${trialJobId}`);
        }

        if (this.paiClusterConfig === undefined) {
            throw new Error('PAI Cluster config is not initialized');
        }
        if (this.paiTrialConfig === undefined) {
            throw new Error('trial config is not initialized');
        }
        if (this.paiToken === undefined) {
            throw new Error('PAI token is not initialized');
        }

        if (this.hdfsBaseDir === undefined) {
            throw new Error('hdfsBaseDir is not initialized');
        }

        if (this.hdfsOutputHost === undefined) {
            throw new Error('hdfsOutputHost is not initialized');
        }

        if (this.paiRestServerPort === undefined) {
            const restServer: PAIJobRestServer = component.get(PAIJobRestServer);
            this.paiRestServerPort = restServer.clusterRestServerPort;
        }

        // Make sure the experiment code files are copied from local to HDFS
        if (this.copyExpCodeDirPromise !== undefined) {
            await this.copyExpCodeDirPromise;
        }

        // Step 1. Prepare PAI job configuration
        const hdfsOutputDir : string = unixPathJoin(this.hdfsBaseDir, this.experimentId, trialJobId);
        const hdfsCodeDir: string = HDFSClientUtility.getHdfsTrialWorkDir(this.paiClusterConfig.userName, trialJobId);

        const trialLocalTempFolder: string = path.join(getExperimentRootDir(), 'trials-local', trialJobId);
        //create tmp trial working folder locally.
        await execMkdir(trialLocalTempFolder);

        const runScriptContent : string = CONTAINER_INSTALL_NNI_SHELL_FORMAT;
        // Write NNI installation file to local tmp files
        await fs.promises.writeFile(path.join(trialLocalTempFolder, 'install_nni.sh'), runScriptContent, { encoding: 'utf8' });

        // Write file content ( parameter.cfg ) to local tmp folders
        const trialForm : TrialJobApplicationForm = (<TrialJobApplicationForm>trialJobDetail.form);
        if (trialForm !== undefined) {
            await fs.promises.writeFile(
                path.join(trialLocalTempFolder, generateParamFileName(trialForm.hyperParameters)),
                trialForm.hyperParameters.value, { encoding: 'utf8' }
            );
        }

        // tslint:disable-next-line: strict-boolean-expressions
        const nniManagerIp: string = this.nniManagerIpConfig ? this.nniManagerIpConfig.nniManagerIp : getIPV4Address();
        const version: string = this.versionCheck ? await getVersion() : '';
        const nniPaiTrialCommand : string = String.Format(
            PAI_TRIAL_COMMAND_FORMAT,
            // PAI will copy job's codeDir into /root directory
            `$PWD/${trialJobId}`,
            `$PWD/${trialJobId}/nnioutput`,
            trialJobId,
            this.experimentId,
            trialJobDetail.sequenceId,
            this.isMultiPhase,
            this.paiTrialConfig.command,
            nniManagerIp,
            this.paiRestServerPort,
            hdfsOutputDir,
            this.hdfsOutputHost,
            this.paiClusterConfig.userName,
            HDFSClientUtility.getHdfsExpCodeDir(this.paiClusterConfig.userName),
            version,
            this.logCollection
        )
        .replace(/\r\n|\n|\r/gm, '');

        this.log.info(`nniPAItrial command is ${nniPaiTrialCommand.trim()}`);
        const paiTaskRoles : PAITaskRole[] = [
            new PAITaskRole(
                `nni_trail_${trialJobId}`,
                // Task role number
                1,
                // Task CPU number
                this.paiTrialConfig.cpuNum,
                // Task memory
                this.paiTrialConfig.memoryMB,
                // Task GPU number
                this.paiTrialConfig.gpuNum,
                // Task command
                nniPaiTrialCommand,
                // Task shared memory
                this.paiTrialConfig.shmMB
            )
        ];

        const paiJobConfig : PAIJobConfig = new PAIJobConfig(
            // Job name
            trialJobDetail.paiJobName,
            // Docker image
            this.paiTrialConfig.image,
            // dataDir
            this.paiTrialConfig.dataDir,
            // outputDir
            this.paiTrialConfig.outputDir,
            // codeDir
            `$PAI_DEFAULT_FS_URI${hdfsCodeDir}`,
            // PAI Task roles
            paiTaskRoles,
            // Add Virutal Cluster
            this.paiTrialConfig.virtualCluster === undefined ? 'default' : this.paiTrialConfig.virtualCluster.toString()
        );

        // Step 2. Upload the trial's local temp folder (install script + parameter file) onto HDFS
        try {
            await HDFSClientUtility.copyDirectoryToHdfs(trialLocalTempFolder, hdfsCodeDir, this.hdfsClient);
        } catch (error) {
            this.log.error(`PAI Training service: copy ${trialLocalTempFolder} to HDFS ${hdfsCodeDir} failed, error is ${error}`);
            trialJobDetail.status = 'FAILED';

            return true;
        }

        // Step 3. Submit PAI job via Rest call
        // Refer https://github.com/Microsoft/pai/blob/master/docs/rest-server/API.md for more detail about PAI Rest API
        const submitJobRequest: request.Options = {
            uri: `http://${this.paiClusterConfig.host}/rest-server/api/v1/user/${this.paiClusterConfig.userName}/jobs`,
            method: 'POST',
            json: true,
            body: paiJobConfig,
            headers: {
                'Content-Type': 'application/json',
                Authorization: `Bearer ${this.paiToken}`
            }
        };
        // tslint:disable:no-any no-unsafe-any
        request(submitJobRequest, (error: Error, response: request.Response, body: any) => {
            if ((error !== undefined && error !== null) || response.statusCode >= 400) {
                const errorMessage : string = (error !== undefined && error !== null) ? error.message :
                    `Submit trial ${trialJobId} failed, http code:${response.statusCode}, http body: ${response.body}`;
                this.log.error(errorMessage);
                trialJobDetail.status = 'FAILED';
                deferred.resolve(true);
            } else {
                trialJobDetail.submitTime = Date.now();
                deferred.resolve(true);
            }
        });

        return deferred.promise;
    }

    /**
     * Return the next trial sequence id, lazily seeding the counter from the experiment startup info.
     */
    private generateSequenceId(): number {
        if (this.nextTrialSequenceId === -1) {
            this.nextTrialSequenceId = getInitTrialSequenceId();
        }

        return this.nextTrialSequenceId++;
    }

    /**
     * Every 3 seconds:
     * - refresh the PAI token when due;
     * - poll trial status through the collector;
     * - surface any error reported by the REST server.
     * A token refresh failure is fatal only while no token has ever been obtained.
     */
    private async statusCheckingLoop(): Promise<void> {
        while (!this.stopping) {
            try {
                await this.updatePaiToken();
            } catch (error) {
                this.log.error(`${error}`);
                //only throw error when initlize paiToken first time
                if (this.paiToken === undefined) {
                    throw new Error(error);
                }
            }
            await this.paiJobCollector.retrieveTrialStatus(this.paiToken, this.paiClusterConfig);
            const restServer: PAIJobRestServer = component.get(PAIJobRestServer);
            if (restServer.getErrorMessage !== undefined) {
                throw new Error(restServer.getErrorMessage);
            }
            await delay(3000);
        }
    }

    /**
     * Drain the job queue in FIFO order, then sleep 3 seconds, until stopping.
     */
    private async submitJobLoop(): Promise<void> {
        while (!this.stopping) {
            while (!this.stopping && this.jobQueue.length > 0) {
                const trialJobId: string = this.jobQueue[0];
                if (await this.submitTrialJobToPAI(trialJobId)) {
                    // Remove trial job with trialJobId from job queue
                    this.jobQueue.shift();
                } else {
                    // Break the while loop since failed to submitJob
                    break;
                }
            }
            await delay(3000);
        }
    }

    /**
     * Update pai token by the interval time or initialize the pai token.
     * On failure the previous token and update time are left untouched, so the next call retries
     * immediately instead of waiting a full interval with an invalid token. Rejects after 5 seconds
     * if PAI does not answer.
     */
    private async updatePaiToken(): Promise<void> {
        const deferred : Deferred<void> = new Deferred<void>();

        const currentTime: number = new Date().getTime();
        //If pai token initialized and not reach the interval time, do not update
        if (this.paiTokenUpdateTime !== undefined && (currentTime - this.paiTokenUpdateTime) < this.paiTokenUpdateInterval) {
            return Promise.resolve();
        }

        if (this.paiClusterConfig === undefined) {
            const paiClusterConfigError: string = `pai cluster config not initialized!`;
            this.log.error(`${paiClusterConfigError}`);
            throw Error(`${paiClusterConfigError}`);
        }

        const authenticationReq: request.Options = {
            uri: `http://${this.paiClusterConfig.host}/rest-server/api/v1/token`,
            method: 'POST',
            json: true,
            body: {
                username: this.paiClusterConfig.userName,
                password: this.paiClusterConfig.passWord
            }
        };

        request(authenticationReq, (error: Error, response: request.Response, body: any) => {
            if (error !== undefined && error !== null) {
                this.log.error(`Get PAI token failed: ${error.message}`);
                deferred.reject(new Error(`Get PAI token failed: ${error.message}`));

                return;
            }
            if (response.statusCode !== 200) {
                this.log.error(`Get PAI token failed: get PAI Rest return code ${response.statusCode}`);
                deferred.reject(new Error(`Get PAI token failed: ${response.body}, please check paiConfig username or password`));

                // Do not fall through: the error body carries no token.
                return;
            }
            this.paiToken = body.token;
            this.paiTokenUpdateTime = new Date().getTime();
            deferred.resolve();
        });

        let timeoutId: NodeJS.Timer;
        const timeoutDelay: Promise<void> = new Promise<void>((resolve: Function, reject: Function): void => {
            // Set timeout and reject the promise once reach timeout (5 seconds)
            timeoutId = setTimeout(
                () => reject(new Error('Get PAI token timeout. Please check your PAI cluster.')),
                5000);
        });

        return Promise.race([timeoutDelay, deferred.promise])
            .finally(() => { clearTimeout(timeoutId); });
    }

    /**
     * Write a hyper-parameter file for a trial locally, upload it to the trial's HDFS work dir,
     * and notify the trial-facing REST server of its location.
     * @throws Error when the cluster or trial config has not been set.
     */
    private async writeParameterFile(trialJobId: string, hyperParameters: HyperParameters): Promise<void> {
        if (this.paiClusterConfig === undefined) {
            throw new Error('PAI Cluster config is not initialized');
        }
        if (this.paiTrialConfig === undefined) {
            throw new Error('PAI trial config is not initialized');
        }

        const trialLocalTempFolder: string = path.join(getExperimentRootDir(), 'trials-local', trialJobId);
        const hpFileName: string = generateParamFileName(hyperParameters);
        const localFilepath: string = path.join(trialLocalTempFolder, hpFileName);
        await fs.promises.writeFile(localFilepath, hyperParameters.value, { encoding: 'utf8' });
        const hdfsCodeDir: string = HDFSClientUtility.getHdfsTrialWorkDir(this.paiClusterConfig.userName, trialJobId);
        // HDFS paths are POSIX-style regardless of the host OS.
        const hdfsHpFilePath: string = path.posix.join(hdfsCodeDir, hpFileName);

        await HDFSClientUtility.copyFileToHdfs(localFilepath, hdfsHpFilePath, this.hdfsClient);

        await this.postParameterFileMeta({
            experimentId: this.experimentId,
            trialId: trialJobId,
            filePath: hdfsHpFilePath
        });
    }

    /**
     * POST the parameter file metadata to this service's own REST server.
     */
    private postParameterFileMeta(parameterFileMeta: ParameterFileMeta): Promise<void> {
        const deferred : Deferred<void> = new Deferred<void>();
        const restServer: PAIJobRestServer = component.get(PAIJobRestServer);
        const req: request.Options = {
            uri: `${restServer.endPoint}${restServer.apiRootUrl}/parameter-file-meta`,
            method: 'POST',
            json: true,
            body: parameterFileMeta
        };
        request(req, (err: Error, res: request.Response) => {
            if (err) {
                deferred.reject(err);
            } else {
                deferred.resolve();
            }
        });

        return deferred.promise;
    }
}

export { PAITrainingService };