nnimanager.ts 42.2 KB
Newer Older
liuzhe-lz's avatar
liuzhe-lz committed
1
2
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT license.
Deshui Yu's avatar
Deshui Yu committed
3

4
import assert from 'assert';
chicm-ms's avatar
chicm-ms committed
5
import { ChildProcess, StdioOptions } from 'child_process';
Deshui Yu's avatar
Deshui Yu committed
6
7
8
import { Deferred } from 'ts-deferred';
import * as component from '../common/component';
import { DataStore, MetricDataRecord, MetricType, TrialJobInfo } from '../common/datastore';
9
import { NNIError } from '../common/errors';
10
import { getExperimentId } from '../common/experimentStartupInfo';
11
12
import globals from 'common/globals';
import { Logger, getLogger } from '../common/log';
Deshui Yu's avatar
Deshui Yu committed
13
import {
14
    ExperimentProfile, Manager, ExperimentStatus,
15
    NNIManagerStatus, ProfileUpdateType, TrialJobStatistics
Deshui Yu's avatar
Deshui Yu committed
16
} from '../common/manager';
liuzhe-lz's avatar
liuzhe-lz committed
17
import { ExperimentConfig, LocalConfig, toSeconds, toCudaVisibleDevices } from '../common/experimentConfig';
18
import { getExperimentsManager } from 'extensions/experiments_manager';
J-shang's avatar
J-shang committed
19
import { TensorboardManager } from '../common/tensorboardManager';
Deshui Yu's avatar
Deshui Yu committed
20
import {
QuanluZhang's avatar
QuanluZhang committed
21
    TrainingService, TrialJobApplicationForm, TrialJobDetail, TrialJobMetric, TrialJobStatus, TrialCommandContent, PlacementConstraint
Deshui Yu's avatar
Deshui Yu committed
22
} from '../common/trainingService';
23
import { delay, getCheckpointDir, getExperimentRootDir, getLogDir, getMsgDispatcherCommand, mkDirP, getTunerProc, getLogLevel, isAlive, killPid } from '../common/utils';
Deshui Yu's avatar
Deshui Yu committed
24
import {
chicm-ms's avatar
chicm-ms committed
25
    INITIALIZE, INITIALIZED, KILL_TRIAL_JOB, NEW_TRIAL_JOB, NO_MORE_TRIAL_JOBS, PING,
26
    REPORT_METRIC_DATA, REQUEST_TRIAL_JOBS, SEND_TRIAL_JOB_PARAMETER, TERMINATE, TRIAL_END, UPDATE_SEARCH_SPACE, IMPORT_DATA, ADD_CUSTOMIZED_TRIAL_JOB
Deshui Yu's avatar
Deshui Yu committed
27
} from './commands';
28
import { createDispatcherInterface, IpcInterface } from './ipcInterface';
Deshui Yu's avatar
Deshui Yu committed
29
30

/**
chicm-ms's avatar
chicm-ms committed
31
 * NNIManager which implements Manager interface
Deshui Yu's avatar
Deshui Yu committed
32
33
 */
class NNIManager implements Manager {
34
    // --- collaborators -------------------------------------------------
    // Created lazily by startExperiment / resumeExperiment / setClusterMetadata.
    private trainingService!: TrainingService;
    // Command channel to the tuner; undefined until setupTuner has run.
    private dispatcher: IpcInterface | undefined;

    // --- trial accounting ----------------------------------------------
    private currSubmittedTrialNum: number;  // need to be recovered
    private trialConcurrencyChange: number; // >0: increase, <0: decrease

    private log: Logger;
    private dataStore: DataStore;
    // Assigned by startExperiment (fresh profile) or resumeExperiment (loaded from the data store).
    private experimentProfile!: ExperimentProfile;
    // 0 until setupTuner spawns a tuner process and records its pid.
    private dispatcherPid: number;
    private status: NNIManagerStatus;

    // Trial forms queued by addCustomizedTrialJob / addRecoveredTrialJob.
    private waitingTrials: TrialJobApplicationForm[];
    // Trial details keyed by trial job id.
    private trialJobs: Map<string, TrialJobDetail>;
    // JSON strings assembled while resuming an experiment.
    private trialDataForTuner: string;
    private trialDataForResume: string;

    // When true, the mutating public APIs reject instead of acting.
    private readonly: boolean;
    private config!: ExperimentConfig;

    // Installed by the constructor; forwards metrics to onTrialJobMetrics.
    private trialJobMetricListener: (metric: TrialJobMetric) => void;
51

Deshui Yu's avatar
Deshui Yu committed
52
53
    /**
     * Set up empty in-memory state, resolve the logger and the data store,
     * install the metric listener and register `stopExperiment` as a
     * shutdown callback.
     */
    constructor() {
        // Counters and process bookkeeping start from zero.
        this.dispatcherPid = 0;
        this.currSubmittedTrialNum = 0;
        this.trialConcurrencyChange = 0;

        // Nothing queued, nothing tracked, nothing to replay yet.
        this.trialJobs = new Map<string, TrialJobDetail>();
        this.waitingTrials = [];
        this.trialDataForResume = '';
        this.trialDataForTuner = '';
        this.readonly = false;

        this.log = getLogger('NNIManager');
        this.dataStore = component.get(DataStore);
        this.status = { status: 'INITIALIZED', errors: [] };

        // Metric handling is asynchronous; a failure is escalated as a critical error.
        this.trialJobMetricListener = (metric: TrialJobMetric): void => {
            const handling = this.onTrialJobMetrics(metric);
            handling.catch((err: Error) => {
                this.criticalError(NNIError.FromError(err, 'Job metrics error: '));
            });
        };

        globals.shutdown.register('NniManager', this.stopExperiment.bind(this));
    }

    /**
     * Apply one kind of update from `experimentProfile` to the live profile
     * and persist the result.
     *
     * @param experimentProfile profile carrying the new value in `params`
     * @param updateType which field of `params` to take over
     * @returns a promise that settles once the profile has been stored.
     *          Every failure (readonly mode, unknown `updateType`, tuner not
     *          set up for a search-space update) is reported as a rejection,
     *          never as a synchronous throw, so `.catch()` handlers see it.
     */
    public updateExperimentProfile(experimentProfile: ExperimentProfile, updateType: ProfileUpdateType): Promise<void> {
        if (this.readonly) {
            return Promise.reject(new Error('Error: can not update experiment profile in readonly mode!'));
        }

        try {
            switch (updateType) {
                case 'TRIAL_CONCURRENCY':
                    this.updateTrialConcurrency(experimentProfile.params.trialConcurrency);
                    break;
                case 'MAX_EXEC_DURATION':
                    this.experimentProfile.params.maxExperimentDuration = experimentProfile.params.maxExperimentDuration;
                    break;
                case 'SEARCH_SPACE':
                    this.updateSearchSpace(experimentProfile.params.searchSpace);
                    break;
                case 'MAX_TRIAL_NUM':
                    this.experimentProfile.params.maxTrialNumber = experimentProfile.params.maxTrialNumber;
                    break;
                default:
                    throw new Error('Error: unrecognized updateType');
            }
        } catch (err) {
            // This method is not `async`, so a throw here would escape
            // synchronously; convert it into a rejection instead.
            return Promise.reject(err);
        }

        return this.storeExperimentProfile();
    }

101
    /**
     * Forward `data` to the tuner with an IMPORT_DATA command and record it
     * in the data store as an 'IMPORT_DATA' event.
     *
     * Rejects in readonly mode or when the tuner has not been set up.
     */
    public importData(data: string): Promise<void> {
        if (this.readonly) {
            const readonlyError = new Error('Error: can not import data in readonly mode!');
            return Promise.reject(readonlyError);
        }
        if (this.dispatcher === undefined) {
            const noTunerError = new Error('tuner has not been setup');
            return Promise.reject(noTunerError);
        }

        this.dispatcher.sendCommand(IMPORT_DATA, data);
        const stored: Promise<void> = this.dataStore.storeTrialJobEvent('IMPORT_DATA', '', data);
        return stored;
    }

115
116
117
118
    /**
     * Return the imported data held by the data store.
     * Delegates directly to `DataStore.getImportedData`.
     */
    public getImportedData(): Promise<string[]> {
        return this.dataStore.getImportedData();
    }

119
120
121
122
    /**
     * Export trial hyper-parameter configurations as a string.
     * Delegates directly to `DataStore.exportTrialHpConfigs`;
     * `resumeExperiment` parses the result with `JSON.parse`.
     */
    public async exportData(): Promise<string> {
        return this.dataStore.exportTrialHpConfigs();
    }

123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
    /**
     * Re-queue every trial that was WAITING or RUNNING when the experiment
     * was last persisted, and advance `nextSequenceId` past all known trials.
     *
     * For each recoverable trial the first stored hyper-parameter string is
     * parsed, re-packed with `parameter_source: 'resumed'`, pushed onto
     * `waitingTrials`, and an 'ADD_RESUMED' event is recorded. The packed
     * parameters are also serialised into `trialDataForResume`.
     *
     * @param allTrialJobs all trials known to the data store, in any status
     */
    public addRecoveredTrialJob(allTrialJobs: Array<TrialJobInfo>): void {
        const resumedParameters: any[] = [];
        // Starts at 0 so the result is never lower than "max + 1" over the recovered trials.
        let maxSequenceId = 0;

        for (const job of allTrialJobs) {
            // Track the highest id over ALL trials, not only the recoverable
            // ones; otherwise ids of finished trials could be handed out again.
            if (job.sequenceId !== undefined) {
                maxSequenceId = Math.max(maxSequenceId, job.sequenceId);
            }

            if (job.status !== 'WAITING' && job.status !== 'RUNNING') {
                continue;
            }
            if (job.sequenceId === undefined || job.hyperParameters === undefined) {
                this.log.warning('The trial to be recovered missing sequenceId and/or hyperParameters', job);
                continue;
            }

            const hyperParams = JSON.parse(job.hyperParameters[0]);
            const packedParameter = {
                parameter_id: hyperParams['parameter_id'], // eslint-disable-line @typescript-eslint/camelcase
                parameter_source: 'resumed', // eslint-disable-line @typescript-eslint/camelcase
                parameters: hyperParams['parameters'],
                parameter_index: hyperParams['parameter_index'], // eslint-disable-line @typescript-eslint/camelcase
            };
            const form: TrialJobApplicationForm = {
                id: job.trialJobId,
                sequenceId: job.sequenceId,
                hyperParameters: {
                    value: JSON.stringify(packedParameter),
                    index: 0
                },
            };

            this.waitingTrials.push(form);
            resumedParameters.push(packedParameter);

            // Fire-and-forget, but never leave the rejection unhandled.
            this.dataStore.storeTrialJobEvent('ADD_RESUMED', job.trialJobId, '').catch((err: Error) => {
                this.log.error(`Failed to store ADD_RESUMED event for trial ${job.trialJobId}: ${err}`);
            });
        }
        this.trialDataForResume = JSON.stringify(resumedParameters);

        // next sequenceId: never move backwards relative to the stored profile.
        const storedNext = this.experimentProfile.nextSequenceId;
        const lowerBound = Number.isFinite(storedNext) ? storedNext : 0;
        this.experimentProfile.nextSequenceId = Math.max(lowerBound, maxSequenceId + 1);
    }

162
    /**
     * Queue a user-supplied trial.
     *
     * @param hyperParams JSON text of the trial's parameters
     * @returns the sequence id assigned to the new trial. Rejects in
     *          readonly mode, when the submitted-trial limit is reached, or
     *          (with the original `SyntaxError`) when `hyperParams` is not
     *          valid JSON. No state is modified when the call rejects.
     */
    public addCustomizedTrialJob(hyperParams: string): Promise<number> {
        if (this.readonly) {
            return Promise.reject(new Error('Error: can not add customized trial job in readonly mode!'));
        }
        if (this.currSubmittedTrialNum >= this.maxTrialNum) {
            return Promise.reject(new Error('reach maxTrialNum'));
        }

        // Parse first: this method is not `async`, so an invalid payload must
        // be turned into a rejection explicitly rather than thrown.
        let parameters: any;
        try {
            parameters = JSON.parse(hyperParams);
        } catch (err) {
            return Promise.reject(err);
        }

        // TODO: NNI manager should not peek tuner's internal protocol, let's refactor this later
        const packedParameter = {
            parameter_id: null, // eslint-disable-line @typescript-eslint/camelcase
            parameter_source: 'customized', // eslint-disable-line @typescript-eslint/camelcase
            parameters: parameters
        };

        const form: TrialJobApplicationForm = {
            sequenceId: this.experimentProfile.nextSequenceId++,
            hyperParameters: {
                value: JSON.stringify(packedParameter),
                index: 0
            }
        };
        this.waitingTrials.push(form);

        // trial id has not been generated yet, thus use '' instead.
        // Fire-and-forget, but never leave the rejection unhandled.
        this.dataStore.storeTrialJobEvent('ADD_CUSTOMIZED', '', hyperParams).catch((err: Error) => {
            this.log.error(`Failed to store ADD_CUSTOMIZED event: ${err}`);
        });

        return Promise.resolve(form.sequenceId);
    }

    /**
     * Cancel a trial on behalf of the user: ask the training service to
     * cancel it, then record a 'USER_TO_CANCEL' event.
     * Rejects in readonly mode.
     */
    public async cancelTrialJobByUser(trialJobId: string): Promise<void> {
        if (this.readonly) {
            // Inside an async function a throw surfaces as a rejected promise.
            throw new Error('Error: can not cancel trial job in readonly mode!');
        }

        this.log.info(`User cancelTrialJob: ${trialJobId}`);

        const service = this.trainingService;
        await service.cancelTrialJob(trialJobId);

        const store = this.dataStore;
        await store.storeTrialJobEvent('USER_TO_CANCEL', trialJobId, '');
    }

201
202
203
204
205
206
207
208
209
210
211
    /**
     * Start a new experiment from `config`.
     *
     * Creates and stores a fresh profile, initialises the training service
     * if none exists yet, sets up the tuner in 'start' mode, switches the
     * status to RUNNING and kicks off `run()` in the background.
     *
     * @returns the experiment id
     */
    public async startExperiment(config: ExperimentConfig): Promise<string> {
        const freshProfile: ExperimentProfile = {
            params: config,
            id: getExperimentId(),
            execDuration: 0,
            logDir: getExperimentRootDir(),
            startTime: Date.now(),
            endTime: undefined,
            nextSequenceId: 0,
            revision: 0
        };
        this.experimentProfile = freshProfile;
        this.config = config;

        this.log.info(`Starting experiment: ${this.experimentProfile.id}`);
        await this.storeExperimentProfile();

        // A training service may already have been installed (e.g. by setClusterMetadata).
        if (this.trainingService === undefined) {
            this.log.info('Setup training service...');
            this.trainingService = await this.initTrainingService(config);
        }

        this.log.info('Setup tuner...');
        const tunerCommand: string[] = getMsgDispatcherCommand(config);
        this.log.debug(`dispatcher command: ${tunerCommand}`);
        const checkpointPath: string = await this.createCheckpointDir();
        await this.setupTuner(tunerCommand, undefined, 'start', checkpointPath);

        this.setStatus('RUNNING');
        await this.storeExperimentProfile();

        // The main loop runs in the background; its failure is a critical error.
        const mainLoop = this.run();
        mainLoop.catch((err: Error) => {
            this.criticalError(err);
        });

        return this.experimentProfile.id;
    }

SparkSnail's avatar
SparkSnail committed
235
    /**
     * Resume (or, when `readonly` is true, merely view) a stored experiment.
     *
     * Loads the profile from the data store and initialises the training
     * service if needed. In readonly mode it then sets the status to VIEWED
     * and returns. Otherwise it sets up the tuner in 'resume' mode,
     * re-queues unfinished trials, gathers finished and imported trial data
     * for the tuner, and starts `run()` in the background.
     */
    public async resumeExperiment(readonly: boolean): Promise<void> {
        // Fetch back the experiment profile
        const resumedId: string = getExperimentId();
        this.log.info(`Resuming experiment: ${resumedId}`);
        this.experimentProfile = await this.dataStore.getExperimentProfile(resumedId);

        const storedConfig: ExperimentConfig = this.experimentProfile.params;
        this.config = storedConfig;
        if (this.trainingService === undefined) {
            this.log.info('Setup training service...');
            this.trainingService = await this.initTrainingService(storedConfig);
        }

        this.readonly = readonly;
        if (readonly) {
            this.setStatus('VIEWED');
            return;
        }

        this.log.info('Setup tuner...');
        const tunerCommand: string[] = getMsgDispatcherCommand(storedConfig);
        this.log.debug(`dispatcher command: ${tunerCommand}`);
        const checkpointPath: string = await this.createCheckpointDir();
        await this.setupTuner(tunerCommand, undefined, 'resume', checkpointPath);

        // Resume currSubmittedTrialNum and re-queue unfinished trials.
        const knownTrials: TrialJobInfo[] = await this.dataStore.listTrialJobs();
        this.currSubmittedTrialNum = knownTrials.length;
        this.addRecoveredTrialJob(knownTrials);

        // Collect generated trials and imported trials (do not deduplicate).
        const exported: string = await this.exportData();
        const importedChunks: string[] = await this.dataStore.getImportedData();
        const initialRecords: Record<string, any>[] = JSON.parse(exported);
        const mergedRecords = importedChunks.reduce(
            (acc: Record<string, any>[], chunk: string) => acc.concat(JSON.parse(chunk) as Record<string, any>[]),
            initialRecords
        );
        this.trialDataForTuner = JSON.stringify(mergedRecords);

        // If budget remains, clear a previously recorded end time.
        const hasTimeLeft = this.experimentProfile.execDuration < this.maxDuration;
        const hasTrialsLeft = this.currSubmittedTrialNum < this.maxTrialNum;
        if (hasTimeLeft && hasTrialsLeft && this.experimentProfile.endTime) {
            delete this.experimentProfile.endTime;
        }
        this.setStatus('RUNNING');

        // TO DO: update database record for resume event
        const mainLoop = this.run();
        mainLoop.catch((err: Error) => {
            this.criticalError(err);
        });
    }

289
290
    /**
     * Look up a single trial in the data store by its id.
     */
    public getTrialJob(trialJobId: string): Promise<TrialJobInfo> {
        const lookup: Promise<TrialJobInfo> = this.dataStore.getTrialJob(trialJobId);
        return lookup;
    }

liuzhe-lz's avatar
liuzhe-lz committed
293
    /**
     * Pass a cluster metadata key/value pair to the training service.
     *
     * If no training service exists yet, one is created first based on
     * `key` ('kubeflow_config', 'frameworkcontroller_config' or
     * 'adl_config'); any other key throws.
     */
    public async setClusterMetadata(key: string, value: string): Promise<void> {
        // Hack for supporting v2 config, need refactor
        if (this.trainingService === undefined) {
            this.log.info('Setup training service...');
            if (key === 'kubeflow_config') {
                const { KubeflowTrainingService } = await import('../training_service/kubernetes/kubeflow/kubeflowTrainingService');
                this.trainingService = new KubeflowTrainingService();
            } else if (key === 'frameworkcontroller_config') {
                const { FrameworkControllerTrainingService } = await import('../training_service/kubernetes/frameworkcontroller/frameworkcontrollerTrainingService');
                this.trainingService = new FrameworkControllerTrainingService();
            } else if (key === 'adl_config') {
                const { AdlTrainingService } = await import('../training_service/kubernetes/adl/adlTrainingService');
                this.trainingService = new AdlTrainingService();
            } else {
                throw new Error("Setup training service failed.");
            }
        }

        await this.trainingService.setClusterMetadata(key, value);
    }

liuzhe-lz's avatar
liuzhe-lz committed
320
321
    /**
     * Read a cluster metadata value from the training service.
     */
    public getClusterMetadata(key: string): Promise<string> {
        const metadata: Promise<string> = this.trainingService.getClusterMetadata(key);
        return metadata;
    }

    /**
     * Return trial job statistics from the data store.
     * Delegates directly to `DataStore.getTrialJobStatistics`.
     */
    public async getTrialJobStatistics(): Promise<TrialJobStatistics[]> {
        return this.dataStore.getTrialJobStatistics();
    }

328
    /**
     * Stop the experiment in two sequential phases: first the tuner side
     * (top half), then trials, training service and persistence (bottom half).
     */
    private async stopExperiment(): Promise<void> {
        const phases: Array<() => Promise<void>> = [
            (): Promise<void> => this.stopExperimentTopHalf(),
            (): Promise<void> => this.stopExperimentBottomHalf(),
        ];
        for (const phase of phases) {
            await phase();
        }
    }

333
    /**
     * First phase of shutdown: mark the experiment STOPPING, detach the
     * metric listener, tell the dispatcher to TERMINATE and, if this manager
     * recorded a dispatcher pid, wait up to 30 seconds for it to exit before
     * killing it. Returns early (after logging) if no tuner was set up.
     */
    private async stopExperimentTopHalf(): Promise<void> {
        this.setStatus('STOPPING');
        this.log.info('Stopping experiment, cleaning up ...');

        if (this.dispatcher === undefined) {
            this.log.error('Tuner has not been setup');
            return;
        }

        this.trainingService.removeTrialJobMetricListener(this.trialJobMetricListener);

        // NOTE: TERMINATE is sent regardless of dispatcherPid, because when
        // the python dispatcher is started before nnimanager,
        // this.dispatcherPid would not have a valid value (i.e., not >0).
        this.dispatcher.sendCommand(TERMINATE);

        if (this.dispatcherPid > 0) {
            // gracefully terminate tuner and assessor here, wait at most 30 seconds.
            let secondsWaited = 0;
            while (secondsWaited < 30) {
                const stillRunning = await isAlive(this.dispatcherPid);
                if (!stillRunning) {
                    break;
                }
                await delay(1000);
                secondsWaited += 1;
            }
            await killPid(this.dispatcherPid);
        }

        this.dispatcher = undefined;
    }

360
    /**
     * Second phase of shutdown: cancel RUNNING/WAITING trials one at a time,
     * clean up the training service, record the end time if none is set,
     * persist the profile, mark the experiment STOPPED, then stop the
     * tensorboard manager and close the data store.
     *
     * Errors from listing/cancelling/cleaning up are logged, not rethrown.
     */
    private async stopExperimentBottomHalf(): Promise<void> {
        try {
            const jobs: TrialJobDetail[] = await this.trainingService.listTrialJobs();

            // DON'T try to make it in parallel, the training service may not handle it well.
            // If there is performance concern, consider to support batch cancellation on training service.
            for (const job of jobs) {
                const isActive = job.status === 'RUNNING' || job.status === 'WAITING';
                if (!isActive) {
                    continue;
                }
                try {
                    this.log.info(`cancelTrialJob: ${job.id}`);
                    await this.trainingService.cancelTrialJob(job.id);
                } catch (error) {
                    this.log.debug(`ignorable error on canceling trial ${job.id}. ${error}`);
                }
            }

            await this.trainingService.cleanUp();
        } catch (err) {
            this.log.error(`${err.stack}`);
        }

        const endTimeMissing = this.experimentProfile.endTime === undefined;
        if (endTimeMissing) {
            this.setEndtime();
        }
        await this.storeExperimentProfile();
        this.setStatus('STOPPED');
        this.log.info('Experiment stopped.');

        const tensorboard = component.get<TensorboardManager>(TensorboardManager);
        await tensorboard.stop();
        await this.dataStore.close();
    }

392
    /**
     * Fetch metric records from the data store, optionally restricted to a
     * trial and/or a metric type.
     */
    public async getMetricData(trialJobId?: string, metricType?: MetricType): Promise<MetricDataRecord[]> {
        const records: Promise<MetricDataRecord[]> = this.dataStore.getMetricData(trialJobId, metricType);
        return records;
    }

396
397
398
399
400
401
    /**
     * Return all metric records belonging to trials whose sequence id lies
     * in the inclusive range [minSeqId, maxSeqId]. Trials without a sequence
     * id are ignored.
     */
    public async getMetricDataByRange(minSeqId: number, maxSeqId: number): Promise<MetricDataRecord[]> {
        // FIXME: can sequenceId be undefined?
        const withinRange = (seq: number | undefined): boolean =>
            seq !== undefined && minSeqId <= seq && seq <= maxSeqId;

        const everyTrial = await this.dataStore.listTrialJobs();
        const wantedIds = new Set(
            everyTrial
                .filter(trial => withinRange(trial.sequenceId))
                .map(trial => trial.trialJobId)
        );

        const everyMetric = await this.dataStore.getMetricData();
        return everyMetric.filter(record => wantedIds.has(record.trialJobId));
    }

    /**
     * Return every non-'PERIODICAL' metric followed by, for each trial, the
     * 'PERIODICAL' metric with the highest `sequence` (the later record wins
     * on ties).
     */
    public async getLatestMetricData(): Promise<MetricDataRecord[]> {
        // FIXME: this can take a long time
        const records: MetricDataRecord[] = await this.dataStore.getMetricData();

        const nonPeriodical: MetricDataRecord[] = [];
        const newestPeriodical = new Map<string, MetricDataRecord>();

        for (const record of records) {
            if (record.type !== 'PERIODICAL') {
                nonPeriodical.push(record);
                continue;
            }
            const seen = newestPeriodical.get(record.trialJobId);
            const isNewer = seen === undefined || seen.sequence <= record.sequence;
            if (isNewer) {
                newestPeriodical.set(record.trialJobId, record);
            }
        }

        const periodicalTail: MetricDataRecord[] = Array.from(newestPeriodical.values());
        return nonPeriodical.concat(periodicalTail);
        // FIXME: unit test
    }

Yuge Zhang's avatar
Yuge Zhang committed
427
428
    /**
     * Fetch a file belonging to a trial from the training service.
     */
    public async getTrialFile(trialJobId: string, fileName: string): Promise<Buffer | string> {
        const content: Promise<Buffer | string> = this.trainingService.getTrialFile(trialJobId, fileName);
        return content;
    }

Deshui Yu's avatar
Deshui Yu committed
431
432
433
434
435
436
437
438
    /**
     * Return the current in-memory experiment profile, wrapped in an
     * already-resolved promise.
     */
    public getExperimentProfile(): Promise<ExperimentProfile> {
        return Promise.resolve(this.experimentProfile);
    }

439
440
441
442
    /**
     * Return the manager's current status object.
     * This is the live object, not a copy.
     */
    public getStatus(): NNIManagerStatus {
        return this.status;
    }

Deshui Yu's avatar
Deshui Yu committed
443
444
445
446
    /**
     * List trials from the data store, passing the optional `status`
     * argument through to `DataStore.listTrialJobs`.
     */
    public async listTrialJobs(status?: TrialJobStatus): Promise<TrialJobInfo[]> {
        return this.dataStore.listTrialJobs(status);
    }

447
448
449
450
451
452
453
454
455
456
    /**
     * Maximum experiment duration in seconds; `Infinity` when
     * `maxExperimentDuration` is not configured.
     */
    private get maxDuration(): number {
        const configured = this.experimentProfile.params.maxExperimentDuration;
        if (configured === undefined) {
            return Infinity;
        }
        return toSeconds(configured);
    }

    /**
     * Maximum number of trials; `Infinity` when `maxTrialNumber` is not
     * configured.
     */
    private get maxTrialNum(): number {
        const configured = this.experimentProfile.params.maxTrialNumber;
        if (configured === undefined) {
            return Infinity;
        }
        return configured;
    }

Ni Hao's avatar
Ni Hao committed
457
458
459
460
461
    /**
     * Maximum duration of a single trial in seconds; `Infinity` when
     * `maxTrialDuration` is not configured.
     */
    private get maxTrialDuration(): number {
        const configured = this.experimentProfile.params.maxTrialDuration;
        if (configured === undefined) {
            return Infinity;
        }
        return toSeconds(configured);
    }

462
    /**
     * Create the training service matching `config`.
     *
     * The platform is 'hybrid' when `config.trainingService` is an array,
     * otherwise its `platform` field, falling back to the legacy
     * `trainingServicePlatform` field. Reuse mode (array config or a truthy
     * `reuseMode`) and every platform without a dedicated branch are served
     * by `RouterTrainingService`.
     *
     * @throws Error when no platform can be determined
     */
    private async initTrainingService(config: ExperimentConfig): Promise<TrainingService> {
        const isHybrid = Array.isArray(config.trainingService);

        let platform: string;
        if (isHybrid) {
            platform = 'hybrid';
        } else {
            platform = (config.trainingService as any).platform || (config as any).trainingServicePlatform;
        }
        if (!platform) {
            throw new Error('Cannot detect training service platform');
        }

        const reuseMode = isHybrid || (config.trainingService as any).reuseMode;

        if (!reuseMode) {
            switch (platform) {
                case 'local': {
                    const localModule = await import('../training_service/local/localTrainingService');
                    return new localModule.LocalTrainingService(config.trainingService as LocalConfig);
                }
                case 'kubeflow': {
                    const kubeflowModule = await import('../training_service/kubernetes/kubeflow/kubeflowTrainingService');
                    return new kubeflowModule.KubeflowTrainingService();
                }
                case 'frameworkcontroller': {
                    const fcModule = await import('../training_service/kubernetes/frameworkcontroller/frameworkcontrollerTrainingService');
                    return new fcModule.FrameworkControllerTrainingService();
                }
                case 'adl': {
                    const adlModule = await import('../training_service/kubernetes/adl/adlTrainingService');
                    return new adlModule.AdlTrainingService();
                }
                default:
                    // fall through to the router below
                    break;
            }
        }

        const routerModule = await import('../training_service/reusable/routerTrainingService');
        return await routerModule.RouterTrainingService.construct(config);
    }

497
    /**
     * Connect to the tuner (dispatcher), spawning it first unless an
     * external command channel was supplied in `globals.args`.
     *
     * No-op when a dispatcher is already set.
     *
     * @param command command line used to spawn the tuner process
     * @param cwd working directory for the process; the log directory is used when undefined or empty
     * @param mode passed to the process as NNI_MODE
     * @param dataDirectory passed to the process as NNI_CHECKPOINT_DIRECTORY
     */
    private async setupTuner(command: string[], cwd: string | undefined, mode: 'start' | 'resume', dataDirectory: string): Promise<void> {
        if (this.dispatcher !== undefined) {
            return;
        }

        // WebSocket URL on which the tuner is expected to connect back.
        const tunerWs: string = globals.args.urlPrefix
            ? `ws://localhost:${globals.args.port}/${globals.args.urlPrefix}/tuner`
            : `ws://localhost:${globals.args.port}/tuner`;

        if (globals.args.tunerCommandChannel) {
            // TODO: this will become configurable after refactoring rest handler interface
            assert.equal(tunerWs, globals.args.tunerCommandChannel);
            this.dispatcher = await createDispatcherInterface();
            return;
        }

        const stdio: StdioOptions = ['ignore', process.stdout, process.stderr];
        const workingDir: string = (cwd === undefined || cwd === '') ? getLogDir() : cwd;

        const deprecatedOptions = this.config.deprecated;
        const includeIntermediateResultsEnv = !!(deprecatedOptions && deprecatedOptions.includeIntermediateResults);

        // Variables handed to the tuner process on top of the current environment.
        const nniEnv = {
            SDK_PROCESS: 'dispatcher',
            NNI_MODE: mode,
            NNI_CHECKPOINT_DIRECTORY: dataDirectory,
            NNI_LOG_DIRECTORY: getLogDir(),
            NNI_LOG_LEVEL: getLogLevel(),
            NNI_INCLUDE_INTERMEDIATE_RESULTS: includeIntermediateResultsEnv,
            NNI_TUNER_COMMAND_CHANNEL: tunerWs,
            CUDA_VISIBLE_DEVICES: toCudaVisibleDevices(this.experimentProfile.params.tunerGpuIndices)
        };
        const mergedEnv = Object.assign({}, process.env, nniEnv);

        const spawned: ChildProcess = getTunerProc(command, stdio, workingDir, mergedEnv);
        this.dispatcherPid = spawned.pid!;
        this.dispatcher = await createDispatcherInterface();
    }

    /**
     * Record a new trial concurrency: accumulate the difference from the
     * current value into `trialConcurrencyChange`, then store the new value
     * in the profile.
     */
    private updateTrialConcurrency(trialConcurrency: number): void {
        // we assume trialConcurrency >= 0, which is checked by restserver
        const params = this.experimentProfile.params;
        const delta = trialConcurrency - params.trialConcurrency;
        this.trialConcurrencyChange += delta;
        params.trialConcurrency = trialConcurrency;
    }

552
    /**
     * Send a new search space to the tuner and store it in the profile.
     *
     * @param searchSpace the new search space; it is sent to the tuner as JSON
     * @throws Error when the tuner has not been set up
     */
    private updateSearchSpace(searchSpace: object): void {
        if (this.dispatcher === undefined) {
            throw new Error('Error: tuner has not been setup');
        }

        // Serialise once: interpolating the object itself into the log
        // message would only print "[object Object]".
        const serialized: string = JSON.stringify(searchSpace);
        this.log.info(`Updated search space ${serialized}`);
        this.dispatcher.sendCommand(UPDATE_SEARCH_SPACE, serialized);
        this.experimentProfile.params.searchSpace = searchSpace;
    }

    /**
     * Once per second, while the status is not ERROR/STOPPING/STOPPED, add
     * one second to `execDuration` if the experiment is in an active state,
     * and persist the profile on every tenth tick that falls in an active
     * state.
     */
    private async periodicallyUpdateExecDuration(): Promise<void> {
        const terminalStates: string[] = ['ERROR', 'STOPPING', 'STOPPED'];
        const activeStates: string[] = ['RUNNING', 'NO_MORE_TRIAL', 'TUNER_NO_MORE_TRIAL'];

        for (let tick = 1; !terminalStates.includes(this.status.status); tick += 1) {
            await delay(1000 * 1); // 1 seconds
            if (!activeStates.includes(this.status.status)) {
                continue;
            }
            this.experimentProfile.execDuration += 1;
            if (tick % 10 === 0) {
                await this.storeExperimentProfile();
            }
        }
    }

chicm-ms's avatar
chicm-ms committed
577
578
579
580
581
582
    /**
     * Send a PING command to the dispatcher every five seconds until the
     * status becomes ERROR, STOPPING or STOPPED.
     *
     * @throws Error when the tuner has not been set up
     */
    private async pingDispatcher(): Promise<void> {
        if (this.dispatcher === undefined) {
            throw new Error('Error: tuner has not been setup');
        }

        const terminalStates: string[] = ['ERROR', 'STOPPING', 'STOPPED'];
        for (;;) {
            if (terminalStates.includes(this.status.status)) {
                break;
            }
            this.dispatcher.sendCommand(PING);
            await delay(1000 * 5);
        }
    }

587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
    /**
     * Cancel every RUNNING trial whose elapsed time since `startTime`
     * exceeds `maxTrialDuration` (in seconds), flagging the cancellation as
     * an early stop. Does nothing when no per-trial limit is configured.
     */
    private async stopTrialIfOverMaxDurationLimit(): Promise<void> {
        if (this.maxTrialDuration === Infinity) {
            return;
        }

        // Iterate over a snapshot of the ids; details are looked up per id.
        const trackedIds: string[] = Array.from(this.trialJobs.keys());
        for (const trialJobId of trackedIds) {
            const detail: TrialJobDetail | undefined = this.trialJobs.get(trialJobId);
            if (detail === undefined || detail.status !== 'RUNNING' || detail.startTime === undefined) {
                continue;
            }

            const elapsedSeconds = (new Date().getTime() - detail.startTime) / 1000;
            if (elapsedSeconds > this.maxTrialDuration) {
                const isEarlyStopped = true;
                await this.trainingService.cancelTrialJob(trialJobId, isEarlyStopped);
                this.log.info(`Trial job ${detail.id} has been canceled because it is over max trial duration.`);
            }
        }
    }

QuanluZhang's avatar
QuanluZhang committed
607
608
    /**
     * Queries the training service for each tracked trial job and reconciles the local
     * `trialJobs` map with the reported details.
     *
     * For each job:
     *  - a status change is logged, mirrored into the map as a shallow copy, and written
     *    to the data store;
     *  - the reported message is copied onto the tracked entry;
     *  - if the reported status is SUCCEEDED, USER_CANCELED, EARLY_STOPPED, FAILED or
     *    SYS_CANCELED, the job is removed from the map and a TRIAL_END command carrying
     *    its id, status and hyper-parameter value is sent to the dispatcher.
     *
     * @returns how many jobs were found in one of those five finished states.
     * @throws Error if the dispatcher is undefined.
     */
    private async requestTrialJobsStatus(): Promise<number> {
        if (this.dispatcher === undefined) {
            throw new Error('Error: tuner has not been setup');
        }

        let finishedCount = 0;
        for (const trialJobId of Array.from(this.trialJobs.keys())) {
            const latest: TrialJobDetail = await this.trainingService.getTrialJob(trialJobId);
            const previous: TrialJobDetail | undefined = this.trialJobs.get(trialJobId);

            const statusChanged = previous !== undefined && previous.status !== latest.status;
            if (previous !== undefined && statusChanged) {
                this.log.info(`Trial job ${latest.id} status changed from ${previous.status} to ${latest.status}`);
                this.trialJobs.set(trialJobId, Object.assign({}, latest));
                await this.dataStore.storeTrialJobEvent(latest.status, latest.id, undefined, latest);
            }

            // Keep the tracked entry's message in sync even when the status is unchanged.
            const tracked: TrialJobDetail | undefined = this.trialJobs.get(trialJobId);
            if (tracked !== undefined) {
                tracked.message = latest.message;
            }

            switch (latest.status) {
                case 'SUCCEEDED':
                case 'USER_CANCELED':
                case 'EARLY_STOPPED':
                case 'FAILED':
                case 'SYS_CANCELED':
                    // In the current version, failed / system-canceled jobs are not retried.
                    // TO DO: push this job to queue for retry
                    this.trialJobs.delete(trialJobId);
                    finishedCount++;
                    this.dispatcher.sendCommand(TRIAL_END, JSON.stringify({
                        trial_job_id: latest.id, // eslint-disable-line @typescript-eslint/camelcase
                        event: latest.status,
                        hyper_params: latest.form.hyperParameters.value // eslint-disable-line @typescript-eslint/camelcase
                    }));
                    break;
                case 'WAITING':
                case 'RUNNING':
                case 'UNKNOWN':
                    // Nothing to do while the job is still in flight.
                    break;
                default:
                    // TO DO: add warning in log
                    break;
            }
        }

        return finishedCount;
    }

    /**
     * Trial scheduling loop; repeats until the manager status becomes ERROR, STOPPING or
     * STOPPED, waiting `delay(1000 * 5)` at the end of every pass.
     *
     * Each pass:
     *  1. cancels trials that are over the max trial duration;
     *  2. polls trial statuses and adds the finished count to a running total;
     *  3. folds `trialConcurrencyChange` into the number of trials to request;
     *  4. if `execDuration` exceeds `maxDuration` or the submitted count has reached
     *     `maxTrialNum`, switches to NO_MORE_TRIAL and, once the finished total has caught
     *     up with the submitted count, to DONE;
     *  5. otherwise requests trials from the tuner and submits waiting trials until the
     *     number of tracked jobs reaches `trialConcurrency`.
     *
     * @throws Error if the dispatcher is undefined when the method is entered.
     */
    private async manageTrials(): Promise<void> {
        if (this.dispatcher === undefined) {
            throw new Error('Error: tuner has not been setup');
        }

        const haltStates: string[] = ['ERROR', 'STOPPING', 'STOPPED'];
        const expectedStates: string[] = ['RUNNING', 'DONE', 'NO_MORE_TRIAL', 'TUNER_NO_MORE_TRIAL'];
        let totalFinished: number = this.currSubmittedTrialNum;

        while (!haltStates.includes(this.status.status)) {
            await this.stopTrialIfOverMaxDurationLimit();

            const finishedThisPass: number = await this.requestTrialJobsStatus();
            totalFinished += finishedThisPass;

            // requestTrialNum is the number of trials that will be requested from the tuner.
            //  - With no concurrency change it equals the number finished in this pass.
            //  - After an increase (e.g. trialConcurrencyChange = 2) it is 2 + finished, and
            //    trialConcurrencyChange is reset to 0.
            //  - After a decrease (e.g. trialConcurrencyChange = -4) with 2 finished it is -2:
            //    nothing is requested and the remaining -2 is carried to the next pass.
            const requestTrialNum: number = this.trialConcurrencyChange + finishedThisPass;
            this.trialConcurrencyChange = requestTrialNum >= 0 ? 0 : requestTrialNum;

            // Check maxTrialNum and maxDuration here.
            // NO_MORE_TRIAL is more like a subset of RUNNING: while RUNNING the tuner might
            // tell the manager that there are no more trials, and the experiment is still
            // viewed as running. DONE can be reached from RUNNING or NO_MORE_TRIAL.
            assert(expectedStates.includes(this.status.status), `Actual status: ${this.status.status}`);

            const limitReached: boolean =
                this.experimentProfile.execDuration > this.maxDuration ||
                this.currSubmittedTrialNum >= this.maxTrialNum;

            if (limitReached) {
                if (this.status.status !== 'DONE') {
                    this.setStatus('NO_MORE_TRIAL');
                    const submittedTotal: number = this.currSubmittedTrialNum;

                    assert(totalFinished <= submittedTotal);
                    if (totalFinished >= submittedTotal) {
                        this.setStatus('DONE');
                        this.setEndtime();
                        await this.storeExperimentProfile();
                        // write this log for travis CI
                        this.log.info('Experiment done.');
                    }
                }
            } else {
                this.requestTrialJobs(requestTrialNum);

                // Leaving DONE: the previously recorded end time no longer applies.
                if (this.status.status === 'DONE') {
                    delete this.experimentProfile.endTime;
                    await this.storeExperimentProfile();
                }
                if (this.status.status !== 'TUNER_NO_MORE_TRIAL') {
                    this.setStatus('RUNNING');
                }

                for (let slot: number = this.trialJobs.size; slot < this.experimentProfile.params.trialConcurrency; slot++) {
                    const nothingToSubmit: boolean =
                        this.waitingTrials.length === 0 ||
                        this.currSubmittedTrialNum >= this.maxTrialNum;
                    if (nothingToSubmit) {
                        break;
                    }

                    const form = this.waitingTrials.shift() as TrialJobApplicationForm;
                    this.currSubmittedTrialNum++;
                    this.log.info('submitTrialJob: form:', form);

                    const submitted: TrialJobDetail = await this.trainingService.submitTrialJob(form);
                    const snapshot: TrialJobDetail = Object.assign({}, submitted);
                    await this.storeExperimentProfile();
                    this.trialJobs.set(submitted.id, snapshot);

                    const stored: TrialJobDetail | undefined = this.trialJobs.get(submitted.id);
                    assert(stored !== undefined, `undefined trialJobDetail in trialJobs: ${submitted.id}`);
                    await this.dataStore.storeTrialJobEvent(
                        stored.status, stored.id, form.hyperParameters.value, stored);
                }
            }

            await delay(1000 * 5); // 5 seconds
        }
    }

Deshui Yu's avatar
Deshui Yu committed
748
749
750
751
752
753
    /**
     * Increments the experiment profile's `revision` by one and hands the profile to the
     * data store.
     *
     * @returns the promise returned by `dataStore.storeExperimentProfile`.
     */
    private storeExperimentProfile(): Promise<void> {
        const profile = this.experimentProfile;
        profile.revision += 1;

        return this.dataStore.storeExperimentProfile(profile);
    }

754
    /**
     * Initializes the dispatcher, registers event listeners, sends the initial tuner
     * command, and then awaits four concurrent tasks: the exec-duration updater, the
     * dispatcher ping loop, the training service's `run()`, and the trial manager loop.
     *
     * A rejection from any of the last three is rethrown as `NNIError.FromError(err, prefix)`
     * with a prefix identifying which task failed.
     */
    private async run(): Promise<void> {
        assert(this.dispatcher !== undefined);
        await this.dispatcher.init();

        this.addEventListeners();
        this.sendInitTunerCommands();

        // Builds a rejection handler that rethrows the error tagged with `prefix`.
        const rethrowAs = (prefix: string) => (err: Error): never => {
            throw NNIError.FromError(err, prefix);
        };

        await Promise.all([
            this.periodicallyUpdateExecDuration(),
            this.pingDispatcher().catch(rethrowAs('Dispatcher error: ')),
            // FIXME: The error handling here could crash when err is undefined.
            this.trainingService.run().catch(rethrowAs('Training service error: ')),
            this.manageTrials().catch(rethrowAs('Job management error: '))
        ]);
    }

QuanluZhang's avatar
QuanluZhang committed
776
    /**
     * Registers the trial-metric listener on the training service and the command and
     * error handlers on the dispatcher.
     *
     * A rejected `onTunerCommand` call and any dispatcher error are both routed to
     * `criticalError`.
     *
     * @throws Error if the dispatcher is undefined.
     */
    private addEventListeners(): void {
        this.log.info('Add event listeners');
        // TO DO: cannot run this method more than once in one NNIManager instance
        const dispatcher = this.dispatcher;
        if (dispatcher === undefined) {
            throw new Error('Error: tuner or job maintainer have not been setup');
        }

        const handleCommand = (commandType: string, content: string): void => {
            this.onTunerCommand(commandType, content).catch((err: Error) => {
                this.criticalError(NNIError.FromError(err, 'Tuner command event error: '));
            });
        };
        const handleError = (error: Error): void => {
            this.log.error(`Dispatcher error: ${error.message}`);
            this.criticalError(new Error('Dispatcher stream error, tuner may have crashed.'));
        };

        this.trainingService.addTrialJobMetricListener(this.trialJobMetricListener);
        dispatcher.onCommand(handleCommand);
        dispatcher.onError(handleError);
    }

    /**
     * Sends the INITIALIZE command to the dispatcher with the JSON-serialized search
     * space from the experiment profile.
     *
     * @throws Error if the dispatcher is undefined.
     */
    private sendInitTunerCommands(): void {
        if (this.dispatcher === undefined) {
            throw new Error('Dispatcher error: tuner has not been setup');
        }
        const searchSpace = this.experimentProfile.params.searchSpace;
        this.log.debug(`Send tuner command: INITIALIZE: ${searchSpace}`);
        // The tuner needs the search space before it can generate any hyper parameters.
        this.dispatcher.sendCommand(INITIALIZE, JSON.stringify(searchSpace));
    }

    /**
     * Handles a metric reported for a trial job.
     *
     * If the job id is tracked in `trialJobs`, the metric data is written to the data
     * store and then forwarded to the dispatcher as REPORT_METRIC_DATA. Otherwise only a
     * warning is logged.
     *
     * @param metric the reported metric; its `id` and `data` fields are used.
     * @throws Error if the job is tracked but the dispatcher is undefined (raised after
     *         the metric has been stored).
     */
    private async onTrialJobMetrics(metric: TrialJobMetric): Promise<void> {
        this.log.debug('NNIManager received trial job metrics:', metric);

        if (!this.trialJobs.has(metric.id)) {
            this.log.warning('NNIManager received non-existent trial job metrics:', metric);
            return;
        }

        await this.dataStore.storeMetricData(metric.id, metric.data);
        if (this.dispatcher === undefined) {
            throw new Error('Error: tuner has not been setup');
        }
        this.dispatcher.sendCommand(REPORT_METRIC_DATA, metric.data);
    }

chicm-ms's avatar
chicm-ms committed
817
818
819
820
821
822
823
    /**
     * Asks the dispatcher for `jobNum` new trial jobs via REQUEST_TRIAL_JOBS.
     *
     * Does nothing when `jobNum` is less than 1. When `config.deprecated.multiThread` is
     * truthy, `jobNum` separate requests for one job each are sent; otherwise a single
     * request carrying `jobNum` is sent.
     *
     * @param jobNum number of trial jobs to request.
     * @throws Error if `jobNum` is at least 1 and the dispatcher is undefined.
     */
    private requestTrialJobs(jobNum: number): void {
        if (jobNum < 1) {
            return;
        }
        if (this.dispatcher === undefined) {
            throw new Error('Dispatcher error: tuner has not been setup');
        }

        const multiThread = this.config.deprecated && this.config.deprecated.multiThread;
        if (!multiThread) {
            this.dispatcher.sendCommand(REQUEST_TRIAL_JOBS, String(jobNum));
            return;
        }

        // Send multiple requests so that hyper parameters are generated in a non-blocking
        // way; within a single REQUEST_TRIAL_JOBS request they are generated one by one,
        // sequentially.
        for (let sent = 0; sent < jobNum; sent++) {
            this.dispatcher.sendCommand(REQUEST_TRIAL_JOBS, '1');
        }
    }

836
    /**
     * Dispatches a command received from the tuner/dispatcher.
     *
     *  - INITIALIZED: forwards any pending import data and resume data to the dispatcher,
     *    then requests `trialConcurrency` trial jobs.
     *  - NEW_TRIAL_JOB: builds an application form from `content` and queues it in
     *    `waitingTrials`.
     *  - SEND_TRIAL_JOB_PARAMETER: updates an existing trial job with a new parameter set
     *    and records an ADD_HYPERPARAMETER event unless `parameters` is null.
     *  - NO_MORE_TRIAL_JOBS: sets status TUNER_NO_MORE_TRIAL unless the manager is in
     *    ERROR, STOPPING or STOPPED.
     *  - KILL_TRIAL_JOB: cancels the trial job whose id is the JSON-parsed `content`.
     *
     * @param commandType one of the command constants listed above.
     * @param content command payload; parsed as JSON for all but INITIALIZED and
     *                NO_MORE_TRIAL_JOBS.
     * @throws Error for an unsupported command type, or when the dispatcher is needed but
     *         undefined.
     */
    private async onTunerCommand(commandType: string, content: string): Promise<void> {
        this.log.info(`NNIManager received command from dispatcher: ${commandType}, ${content}`);
        switch (commandType) {
            case INITIALIZED: {
                // Tuner is initialized and the search space is set: replay stored data,
                // then ask the tuner to generate hyper parameters.
                const hasImportData = this.trialDataForTuner.length > 0;
                if (hasImportData) {
                    if (this.dispatcher === undefined) {
                        throw new Error('Dispatcher error: tuner has not been setup');
                    }
                    this.dispatcher.sendCommand(IMPORT_DATA, this.trialDataForTuner);
                }
                const hasResumeData = this.trialDataForResume.length > 0;
                if (hasResumeData) {
                    if (this.dispatcher === undefined) {
                        throw new Error('Dispatcher error: tuner has not been setup');
                    }
                    this.dispatcher.sendCommand(ADD_CUSTOMIZED_TRIAL_JOB, this.trialDataForResume);
                }
                this.requestTrialJobs(this.experimentProfile.params.trialConcurrency);
                break;
            }
            case NEW_TRIAL_JOB: {
                if (this.status.status === 'TUNER_NO_MORE_TRIAL') {
                    this.log.warning('It is not supposed to receive more trials after NO_MORE_TRIAL is set');
                    this.setStatus('RUNNING');
                }
                const request: TrialCommandContent = JSON.parse(content);
                const fallbackConstraint: PlacementConstraint = {type: 'None', gpus: []};
                const newForm: TrialJobApplicationForm = {
                    sequenceId: this.experimentProfile.nextSequenceId++,
                    hyperParameters: {
                        value: content,
                        index: 0
                    },
                    placementConstraint: request.placement_constraint || fallbackConstraint
                };
                this.waitingTrials.push(newForm);
                break;
            }
            case SEND_TRIAL_JOB_PARAMETER: {
                const parsed: any = JSON.parse(content);
                assert(parsed.parameter_index >= 0);
                assert(parsed.trial_job_id !== undefined);

                const updateForm: TrialJobApplicationForm = {
                    sequenceId: -1,  // FIXME: multi-phase tuner should use sequence ID instead of trial job ID
                    hyperParameters: {
                        value: content,
                        index: parsed.parameter_index
                    }
                };
                this.log.info('updateTrialJob: job id:', parsed.trial_job_id, 'form:', updateForm);
                await this.trainingService.updateTrialJob(parsed.trial_job_id, updateForm);

                // The original note says the parameters field is set as an empty string when
                // the tuner cannot generate more hyper parameters; only null skips the event.
                if (parsed.parameters !== null) {
                    await this.dataStore.storeTrialJobEvent(
                        'ADD_HYPERPARAMETER', parsed.trial_job_id, content, undefined);
                }
                break;
            }
            case NO_MORE_TRIAL_JOBS: {
                const halted = ['ERROR', 'STOPPING', 'STOPPED'].includes(this.status.status);
                if (!halted) {
                    this.setStatus('TUNER_NO_MORE_TRIAL');
                }
                break;
            }
            case KILL_TRIAL_JOB: {
                this.log.info('cancelTrialJob:', content);
                await this.trainingService.cancelTrialJob(JSON.parse(content), true);
                break;
            }
            default:
                throw new Error('Error: unsupported command type from tuner');
        }
    }

911
912
913
914
915
916
    /**
     * Records `err` through `logError` and then also prints it with `console.error`.
     *
     * @param err the error to report.
     */
    private criticalError(err: Error): void {
        this.logError(err);
        // Echo to stderr in addition to what logError records.
        console.error(err);
    }

    /**
     * Records an error on the manager status, stamps the end time, and moves the manager
     * to the ERROR state.
     *
     * When `err` has a stack, the stack is logged and `err.message` is appended to
     * `status.errors`. When `err` is defined but has no stack, a placeholder containing
     * the current call stack is appended instead. When `err` itself is undefined, nothing
     * is appended.
     *
     * @param err the error to record; may be undefined at runtime despite its type.
     */
    private logError(err: Error): void {
        // FIXME: I don't know why, but in some cases err could be undefined.
        if (err !== undefined) {
            const stack = err.stack;
            if (stack === undefined) {
                this.status.errors.push(`Undefined error, stack: ${new Error().stack}`);
            } else {
                this.log.error(stack);
                this.status.errors.push(err.message);
            }
        }

        this.setEndtime();
        this.setStatus('ERROR');
    }

    /**
     * Moves the manager to `status`. If it differs from the current status, the
     * transition is logged, the status is stored, and the experiments manager is informed
     * via `setExperimentInfo`. Setting the current status again is a no-op.
     *
     * @param status the status to switch to.
     */
    private setStatus(status: ExperimentStatus): void {
        if (status === this.status.status) {
            return;
        }
        this.log.info(`Change NNIManager status from: ${this.status.status} to: ${status}`);
        this.status.status = status;
        getExperimentsManager().setExperimentInfo(this.experimentProfile.id, 'status', this.status.status);
    }

938
939
    /**
     * Stamps the experiment profile's `endTime` with `Date.now()` and reports the same
     * value to the experiments manager via `setExperimentInfo`.
     */
    private setEndtime(): void {
        const finishedAt = Date.now();
        this.experimentProfile.endTime = finishedAt;
        getExperimentsManager().setExperimentInfo(this.experimentProfile.id, 'endTime', finishedAt);
    }

QuanluZhang's avatar
QuanluZhang committed
943
944
945
946
    /**
     * Obtains the checkpoint directory path from `getCheckpointDir()`, calls `mkDirP` on
     * it, and returns the path.
     *
     * @returns the checkpoint directory path.
     */
    private async createCheckpointDir(): Promise<string> {
        // TODO: test
        const checkpointDir: string = getCheckpointDir();
        await mkDirP(checkpointDir);

        return checkpointDir;
    }
J-shang's avatar
J-shang committed
949
950
951
952
953
954
955
956

    /**
     * Delegates to the training service to get the local output path of a trial job.
     *
     * @param trialJobId id of the trial job.
     * @returns the path reported by `trainingService.getTrialOutputLocalPath`.
     */
    public async getTrialOutputLocalPath(trialJobId: string): Promise<string> {
        const localPath: string = await this.trainingService.getTrialOutputLocalPath(trialJobId);
        return localPath;
    }

    /**
     * Delegates to the training service to fetch a trial job's output.
     *
     * @param trialJobId id of the trial job.
     * @param subpath passed through unchanged to `trainingService.fetchTrialOutput`.
     */
    public async fetchTrialOutput(trialJobId: string, subpath: string): Promise<void> {
        await this.trainingService.fetchTrialOutput(trialJobId, subpath);
    }
Deshui Yu's avatar
Deshui Yu committed
957
958
959
}

export { NNIManager };