nnimanager.ts 34.9 KB
Newer Older
liuzhe-lz's avatar
liuzhe-lz committed
1
2
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT license.
Deshui Yu's avatar
Deshui Yu committed
3
4
5
6

'use strict';

import * as assert from 'assert';
chicm-ms's avatar
chicm-ms committed
7
import { ChildProcess, StdioOptions } from 'child_process';
Deshui Yu's avatar
Deshui Yu committed
8
9
10
import { Deferred } from 'ts-deferred';
import * as component from '../common/component';
import { DataStore, MetricDataRecord, MetricType, TrialJobInfo } from '../common/datastore';
11
import { NNIError } from '../common/errors';
12
import { getExperimentId } from '../common/experimentStartupInfo';
Deshui Yu's avatar
Deshui Yu committed
13
14
import { getLogger, Logger } from '../common/log';
import {
chicm-ms's avatar
chicm-ms committed
15
    ExperimentParams, ExperimentProfile, Manager, ExperimentStatus,
16
    NNIManagerStatus, ProfileUpdateType, TrialJobStatistics
Deshui Yu's avatar
Deshui Yu committed
17
18
19
20
} from '../common/manager';
import {
    TrainingService, TrialJobApplicationForm, TrialJobDetail, TrialJobMetric, TrialJobStatus
} from '../common/trainingService';
21
import { delay, getCheckpointDir, getExperimentRootDir, getLogDir, getMsgDispatcherCommand, mkDirP, getTunerProc, getLogLevel, isAlive, killPid } from '../common/utils';
Deshui Yu's avatar
Deshui Yu committed
22
import {
chicm-ms's avatar
chicm-ms committed
23
    INITIALIZE, INITIALIZED, KILL_TRIAL_JOB, NEW_TRIAL_JOB, NO_MORE_TRIAL_JOBS, PING,
24
    REPORT_METRIC_DATA, REQUEST_TRIAL_JOBS, SEND_TRIAL_JOB_PARAMETER, TERMINATE, TRIAL_END, UPDATE_SEARCH_SPACE, IMPORT_DATA
Deshui Yu's avatar
Deshui Yu committed
25
} from './commands';
26
import { createDispatcherInterface, IpcInterface } from './ipcInterface';
Deshui Yu's avatar
Deshui Yu committed
27
28

/**
chicm-ms's avatar
chicm-ms committed
29
 * NNIManager which implements Manager interface
Deshui Yu's avatar
Deshui Yu committed
30
31
32
 */
class NNIManager implements Manager {
    // Backend used to submit, query and cancel trial jobs.
    private trainingService: TrainingService;
    // IPC channel to the dispatcher process; stays undefined until setupTuner() has run.
    private dispatcher: IpcInterface | undefined;
    private currSubmittedTrialNum: number;  // need to be recovered
    private trialConcurrencyChange: number; // >0: increase, <0: decrease
    private log: Logger;
    private dataStore: DataStore;
    private experimentProfile: ExperimentProfile;
    // PID of the dispatcher process; 0 until setupTuner() has spawned it.
    private dispatcherPid: number;
    private status: NNIManagerStatus;
    // Trial application forms queued for submission (customized trials are pushed here).
    private waitingTrials: TrialJobApplicationForm[];
    // Trial jobs currently tracked by the manager, keyed by trial job id.
    private trialJobs: Map<string, TrialJobDetail>;
    // JSON string of finished and imported trial data, built when an experiment is resumed.
    private trialDataForTuner: string;
    // When true, every state-changing public method rejects.
    private readonly: boolean;

    // Forwards trial metrics to onTrialJobMetrics() and escalates failures as critical errors.
    private trialJobMetricListener: (metric: TrialJobMetric) => void;
47

Deshui Yu's avatar
Deshui Yu committed
48
49
    /**
     * Resets all in-memory counters and collections, resolves the TrainingService and
     * DataStore components, starts from an empty experiment profile in 'INITIALIZED'
     * status, and prepares the trial metric listener.
     */
    constructor() {
        this.currSubmittedTrialNum = 0;
        this.trialConcurrencyChange = 0;
        this.trainingService = component.get(TrainingService);
        assert(this.trainingService);
        this.dispatcherPid = 0;
        this.waitingTrials = [];
        this.trialJobs = new Map<string, TrialJobDetail>();
        this.trialDataForTuner = '';
        this.readonly = false;

        this.log = getLogger();
        this.dataStore = component.get(DataStore);
        this.experimentProfile = this.createEmptyExperimentProfile();
        this.status = {
            status: 'INITIALIZED',
            errors: []
        };
        // The listener itself is synchronous, so a failure in the async handler
        // is reported through criticalError() instead of being left unhandled.
        this.trialJobMetricListener = (metric: TrialJobMetric): void => {
            this.onTrialJobMetrics(metric).catch((err: Error) => {
                this.criticalError(NNIError.FromError(err, 'Job metrics error: '));
            });
        };
    }

    /**
     * Applies one field of the supplied profile to the running experiment and persists
     * the updated profile.
     *
     * Declared `async` so that every failure (readonly mode, unknown update type, or an
     * error thrown by one of the update helpers) surfaces as a rejected promise rather
     * than a mix of synchronous throws and rejections.
     *
     * @param experimentProfile profile carrying the new value in its `params`
     * @param updateType which parameter of the profile should be updated
     * @returns a promise that settles once the profile has been stored
     */
    public async updateExperimentProfile(experimentProfile: ExperimentProfile, updateType: ProfileUpdateType): Promise<void> {
        if (this.readonly) {
            throw new Error('Error: can not update experiment profile in readonly mode!');
        }
        switch (updateType) {
            case 'TRIAL_CONCURRENCY':
                this.updateTrialConcurrency(experimentProfile.params.trialConcurrency);
                break;
            case 'MAX_EXEC_DURATION':
                this.updateMaxExecDuration(experimentProfile.params.maxExecDuration);
                break;
            case 'SEARCH_SPACE':
                this.updateSearchSpace(experimentProfile.params.searchSpace);
                break;
            case 'MAX_TRIAL_NUM':
                this.updateMaxTrialNum(experimentProfile.params.maxTrialNum);
                break;
            default:
                throw new Error('Error: unrecognized updateType');
        }

        return this.storeExperimentProfile();
    }

97
    /**
     * Sends the given trial data to the dispatcher and records the import in the data store.
     *
     * @param data payload forwarded unchanged with the IMPORT_DATA command
     * @returns a promise that settles when the import event has been stored; rejected in
     *          readonly mode or when the dispatcher has not been set up
     */
    public importData(data: string): Promise<void> {
        if (this.readonly) {
            return Promise.reject(new Error('Error: can not import data in readonly mode!'));
        }
        if (this.dispatcher === undefined) {
            return Promise.reject(
                new Error('tuner has not been setup')
            );
        }
        this.dispatcher.sendCommand(IMPORT_DATA, data);

        // No trial id is associated with imported data, hence the empty string.
        return this.dataStore.storeTrialJobEvent('IMPORT_DATA', '', data);
    }

111
112
113
114
    /**
     * Exports the trial hyper-parameter configurations kept by the data store.
     *
     * @returns the string produced by DataStore.exportTrialHpConfigs()
     */
    public async exportData(): Promise<string> {
        const exported: string = await this.dataStore.exportTrialHpConfigs();

        return exported;
    }

115
    /**
     * Queues a trial whose hyper-parameters are supplied by the user instead of the tuner.
     *
     * Declared `async` so that malformed JSON in `hyperParams` and data store failures are
     * reported as a rejected promise instead of a synchronous throw or an unhandled
     * rejection.
     *
     * @param hyperParams JSON text of the hyper-parameters for the new trial
     * @returns the sequence id assigned to the queued trial
     */
    public async addCustomizedTrialJob(hyperParams: string): Promise<number> {
        if (this.readonly) {
            throw new Error('Error: can not add customized trial job in readonly mode!');
        }
        if (this.currSubmittedTrialNum >= this.experimentProfile.params.maxTrialNum) {
            throw new Error('reach maxTrialNum');
        }

        // TODO: NNI manager should not peek tuner's internal protocol, let's refactor this later
        // Parsing happens before a sequence id is consumed, so invalid JSON leaves state untouched.
        const packedParameter = {
            parameter_id: null, // eslint-disable-line @typescript-eslint/camelcase
            parameter_source: 'customized', // eslint-disable-line @typescript-eslint/camelcase
            parameters: JSON.parse(hyperParams)
        };

        const form: TrialJobApplicationForm = {
            sequenceId: this.experimentProfile.nextSequenceId++,
            hyperParameters: {
                value: JSON.stringify(packedParameter),
                index: 0
            }
        };
        this.waitingTrials.push(form);

        // trial id has not been generated yet, thus use '' instead
        await this.dataStore.storeTrialJobEvent('ADD_CUSTOMIZED', '', hyperParams);

        return form.sequenceId;
    }

    /**
     * Cancels a trial job on behalf of the user and records the request in the data store.
     *
     * @param trialJobId id of the trial job to cancel
     * @returns a promise rejected in readonly mode or when cancelling/storing fails
     */
    public async cancelTrialJobByUser(trialJobId: string): Promise<void> {
        if (this.readonly) {
            throw new Error('Error: can not cancel trial job in readonly mode!');
        }
        this.log.info(`User cancelTrialJob: ${trialJobId}`);
        await this.trainingService.cancelTrialJob(trialJobId);
        await this.dataStore.storeTrialJobEvent('USER_TO_CANCEL', trialJobId, '');
    }

    /**
     * Starts a new experiment: stores the parameters, pushes optional settings to the
     * training service, launches the dispatcher process and kicks off the main loop.
     *
     * The training service's setClusterMetadata() returns a promise, so each call is
     * awaited; a failure there now rejects this method instead of being lost.
     *
     * @param expParams parameters of the experiment to start
     * @returns the id of the experiment profile
     */
    public async startExperiment(expParams: ExperimentParams): Promise<string> {
        this.log.info(`Starting experiment: ${this.experimentProfile.id}`);
        this.experimentProfile.params = expParams;
        await this.storeExperimentProfile();
        this.log.debug('Setup tuner...');

        // Set up multiphase config
        if (expParams.multiPhase && this.trainingService.isMultiPhaseJobSupported) {
            await this.trainingService.setClusterMetadata('multiPhase', expParams.multiPhase.toString());
        }
        // Set up versionCheck config
        if (expParams.versionCheck !== undefined) {
            await this.trainingService.setClusterMetadata('version_check', expParams.versionCheck.toString());
        }
        // Set up logCollection config
        if (expParams.logCollection !== undefined) {
            await this.trainingService.setClusterMetadata('log_collection', expParams.logCollection.toString());
        }

        const dispatcherCommand: string = getMsgDispatcherCommand(expParams);
        this.log.debug(`dispatcher command: ${dispatcherCommand}`);
        const checkpointDir: string = await this.createCheckpointDir();
        this.setupTuner(
            dispatcherCommand,
            undefined,
            'start',
            checkpointDir);

        this.experimentProfile.startTime = Date.now();
        this.setStatus('RUNNING');
        await this.storeExperimentProfile();
        // The main loop runs in the background; its failure is escalated, not awaited.
        this.run().catch((err: Error) => {
            this.criticalError(err);
        });

        return this.experimentProfile.id;
    }

SparkSnail's avatar
SparkSnail committed
192
    /**
     * Restores a previously stored experiment.
     *
     * In readonly mode only the profile is loaded. Otherwise the training service is
     * reconfigured, the dispatcher is relaunched in 'resume' mode, trials that were still
     * WAITING or RUNNING are recorded as FAILED, finished and imported trial data is
     * collected for the tuner, and the main loop is started.
     *
     * The training service's setClusterMetadata() returns a promise, so each call is
     * awaited; a failure there now rejects this method instead of being lost.
     *
     * @param readonly when true, load the profile and reject all later modifications
     */
    public async resumeExperiment(readonly: boolean): Promise<void> {
        this.log.info(`Resuming experiment: ${this.experimentProfile.id}`);
        //Fetch back the experiment profile
        const experimentId: string = getExperimentId();
        this.experimentProfile = await this.dataStore.getExperimentProfile(experimentId);
        this.readonly = readonly;
        if (readonly) {
            return;
        }
        const expParams: ExperimentParams = this.experimentProfile.params;

        // Set up multiphase config
        if (expParams.multiPhase && this.trainingService.isMultiPhaseJobSupported) {
            await this.trainingService.setClusterMetadata('multiPhase', expParams.multiPhase.toString());
        }

        // Set up versionCheck config
        if (expParams.versionCheck !== undefined) {
            await this.trainingService.setClusterMetadata('version_check', expParams.versionCheck.toString());
        }

        const dispatcherCommand: string = getMsgDispatcherCommand(expParams);
        this.log.debug(`dispatcher command: ${dispatcherCommand}`);
        const checkpointDir: string = await this.createCheckpointDir();
        this.setupTuner(
            dispatcherCommand,
            undefined,
            'resume',
            checkpointDir);

        const allTrialJobs: TrialJobInfo[] = await this.dataStore.listTrialJobs();

        // Resume currSubmittedTrialNum
        this.currSubmittedTrialNum = allTrialJobs.length;

        // Check the final status for WAITING and RUNNING jobs
        await Promise.all(allTrialJobs
            .filter((job: TrialJobInfo) => job.status === 'WAITING' || job.status === 'RUNNING')
            .map((job: TrialJobInfo) => this.dataStore.storeTrialJobEvent('FAILED', job.id)));

        // Collect generated trials and imported trials
        const finishedTrialData: string = await this.exportData();
        const importedData: string[] = await this.dataStore.getImportedData();
        let trialData: Record<string, any>[] = JSON.parse(finishedTrialData);
        for (const oneImportedData of importedData) {
            // do not deduplicate
            trialData = trialData.concat(<Record<string, any>[]>JSON.parse(oneImportedData));
        }
        this.trialDataForTuner = JSON.stringify(trialData);

        // The experiment still has budget left, so a previously recorded end time no longer applies.
        if (this.experimentProfile.execDuration < this.experimentProfile.params.maxExecDuration &&
            this.currSubmittedTrialNum < this.experimentProfile.params.maxTrialNum &&
            this.experimentProfile.endTime) {
            delete this.experimentProfile.endTime;
        }
        this.setStatus('RUNNING');

        // TO DO: update database record for resume event
        // The main loop runs in the background; its failure is escalated, not awaited.
        this.run().catch((err: Error) => {
            this.criticalError(err);
        });
    }

255
256
    /**
     * Looks up a single trial job in the data store.
     *
     * @param trialJobId id of the trial job to fetch
     * @returns the promise returned by DataStore.getTrialJob()
     */
    public getTrialJob(trialJobId: string): Promise<TrialJobInfo> {
        return this.dataStore.getTrialJob(trialJobId);
    }

    /**
     * Forwards a cluster metadata entry to the training service, failing if the training
     * service does not answer within 10 seconds.
     *
     * @param key metadata key
     * @param value metadata value
     * @returns a promise rejected in readonly mode, on timeout, or when the training
     *          service rejects
     */
    public async setClusterMetadata(key: string, value: string): Promise<void> {
        if (this.readonly) {
            throw new Error('Error: can not set cluster metadata in readonly mode!');
        }
        this.log.info(`NNIManager setClusterMetadata, key: ${key}, value: ${value}`);
        // TO DO: move timeout value to constants file
        const timeoutMs = 10000;
        let timeoutId: ReturnType<typeof setTimeout> | undefined;
        // This promise never resolves; it only rejects once the timer fires.
        const timeout = new Promise<never>((_resolve, reject): void => {
            timeoutId = setTimeout(
                () => { reject(new Error('TrainingService setClusterMetadata timeout. Please check your config file.')); },
                timeoutMs);
        });
        try {
            await Promise.race([timeout, this.trainingService.setClusterMetadata(key, value)]);
        } finally {
            // Always cancel the timer so it cannot fire after the race has settled.
            if (timeoutId !== undefined) {
                clearTimeout(timeoutId);
            }
        }
    }

    /**
     * Reads a cluster metadata entry from the training service.
     *
     * @param key metadata key
     * @returns a promise resolved with whatever the training service returns for `key`
     */
    public getClusterMetadata(key: string): Promise<string> {
        const metadata = this.trainingService.getClusterMetadata(key);

        return Promise.resolve(metadata);
    }

    /**
     * Fetches the trial job statistics kept by the data store.
     *
     * @returns the statistics produced by DataStore.getTrialJobStatistics()
     */
    public async getTrialJobStatistics(): Promise<TrialJobStatistics[]> {
        const statistics: TrialJobStatistics[] = await this.dataStore.getTrialJobStatistics();

        return statistics;
    }

286
    /**
     * Stops the experiment: switches to 'STOPPING' and runs the clean-up routine.
     *
     * @returns a promise that settles once experimentDoneCleanUp() has finished
     */
    public async stopExperiment(): Promise<void> {
        this.setStatus('STOPPING');
        this.log.info('Stopping experiment, cleaning up ...');
        await this.experimentDoneCleanUp();
        this.log.info('Experiment stopped.');
    }

293
    /**
     * Fetches metric records from the data store, optionally restricted by trial and type.
     *
     * @param trialJobId optional trial job id, passed through to the data store
     * @param metricType optional metric type, passed through to the data store
     * @returns the records returned by DataStore.getMetricData()
     */
    public async getMetricData(trialJobId?: string, metricType?: MetricType): Promise<MetricDataRecord[]> {
        return this.dataStore.getMetricData(trialJobId, metricType);
    }

297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
    /**
     * Returns the metric records of every trial whose sequence id lies in
     * [minSeqId, maxSeqId] (both bounds inclusive).
     *
     * @param minSeqId lowest sequence id to include
     * @param maxSeqId highest sequence id to include
     * @returns all stored metric records that belong to the selected trials
     */
    public async getMetricDataByRange(minSeqId: number, maxSeqId: number): Promise<MetricDataRecord[]> {
        const wantedTrialIds = new Set<string>();
        for (const trial of await this.dataStore.listTrialJobs()) {
            // FIXME: can this be undefined?
            if (trial.sequenceId !== undefined && minSeqId <= trial.sequenceId && trial.sequenceId <= maxSeqId) {
                wantedTrialIds.add(trial.id);
            }
        }

        const everyMetric = await this.dataStore.getMetricData();

        return everyMetric.filter(record => wantedTrialIds.has(record.trialJobId));
    }

    /**
     * Returns every non-'PERIODICAL' metric record plus, for each trial, the 'PERIODICAL'
     * record with the highest sequence number (the later record wins on a tie).
     *
     * @returns non-periodical records first, followed by one periodical record per trial
     */
    public async getLatestMetricData(): Promise<MetricDataRecord[]> {
        // FIXME: this can take a long time
        const records: MetricDataRecord[] = await this.dataStore.getMetricData();
        const nonPeriodical: MetricDataRecord[] = [];
        const newestPeriodical = new Map<string, MetricDataRecord>();
        records.forEach((record: MetricDataRecord): void => {
            if (record.type !== 'PERIODICAL') {
                nonPeriodical.push(record);

                return;
            }
            const previous: MetricDataRecord | undefined = newestPeriodical.get(record.trialJobId);
            if (previous === undefined || previous.sequence <= record.sequence) {
                newestPeriodical.set(record.trialJobId, record);
            }
        });

        return nonPeriodical.concat(Array.from(newestPeriodical.values()));
        // FIXME: unit test
    }

Deshui Yu's avatar
Deshui Yu committed
328
329
330
331
332
333
334
335
    /**
     * Returns the in-memory experiment profile.
     *
     * @returns a promise already resolved with the current profile object
     */
    public getExperimentProfile(): Promise<ExperimentProfile> {
        return Promise.resolve(this.experimentProfile);
    }

336
337
338
339
    /**
     * Returns the manager's current status object (not a copy).
     */
    public getStatus(): NNIManagerStatus {
        const current: NNIManagerStatus = this.status;

        return current;
    }

Deshui Yu's avatar
Deshui Yu committed
340
341
342
343
    /**
     * Lists trial jobs from the data store, optionally filtered by status.
     *
     * @param status optional status filter, passed through to the data store
     * @returns the trial jobs returned by DataStore.listTrialJobs()
     */
    public async listTrialJobs(status?: TrialJobStatus): Promise<TrialJobInfo[]> {
        const jobs: TrialJobInfo[] = await this.dataStore.listTrialJobs(status);

        return jobs;
    }

344
345
    /**
     * Spawns the dispatcher process and opens the IPC interface to it.
     * Does nothing when a dispatcher has already been set up.
     *
     * @param command command line used to launch the dispatcher
     * @param cwd working directory for the process; the log directory is used when
     *            undefined or empty
     * @param mode exported to the process as NNI_MODE
     * @param dataDirectory exported to the process as NNI_CHECKPOINT_DIRECTORY
     */
    private setupTuner(command: string, cwd: string | undefined, mode: 'start' | 'resume', dataDirectory: string): void {
        if (this.dispatcher !== undefined) {
            return;
        }
        // stdin is ignored, stdout/stderr are shared with this process, and two extra
        // pipes are opened for the dispatcher interface.
        const stdio: StdioOptions = ['ignore', process.stdout, process.stderr, 'pipe', 'pipe'];
        const newCwd: string = (cwd === undefined || cwd === '') ? getLogDir() : cwd;

        let includeIntermediateResultsEnv: boolean | undefined = false;
        if (this.experimentProfile.params.tuner !== undefined) {
            includeIntermediateResultsEnv = this.experimentProfile.params.tuner.includeIntermediateResults;
        }

        const nniEnv = {
            NNI_MODE: mode,
            NNI_CHECKPOINT_DIRECTORY: dataDirectory,
            NNI_LOG_DIRECTORY: getLogDir(),
            NNI_LOG_LEVEL: getLogLevel(),
            NNI_INCLUDE_INTERMEDIATE_RESULTS: includeIntermediateResultsEnv,
            CUDA_VISIBLE_DEVICES: this.getGpuEnvvarValue()
        };
        // NNI-specific variables override same-named variables inherited from this process.
        const newEnv = Object.assign({}, process.env, nniEnv);
        const tunerProc: ChildProcess = getTunerProc(command, stdio, newCwd, newEnv);
        this.dispatcherPid = tunerProc.pid;
        this.dispatcher = createDispatcherInterface(tunerProc);
    }

377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
    /**
     * Picks the GPU indices string to expose to the dispatcher process.
     * The advisor's setting is used when an advisor is configured, otherwise the tuner's.
     *
     * @returns the configured gpuIndices, or '' when none is configured
     */
    private getGpuEnvvarValue(): string {
        const params = this.experimentProfile.params;
        let gpuIndices: string | undefined;

        if (params.advisor !== undefined) {
            gpuIndices = params.advisor.gpuIndices;
        } else if (params.tuner !== undefined) {
            gpuIndices = params.tuner.gpuIndices;
        }

        return gpuIndices === undefined ? '' : gpuIndices;
    }

Deshui Yu's avatar
Deshui Yu committed
393
    /**
     * Records a new trial concurrency and accumulates the difference from the previous
     * value in trialConcurrencyChange.
     *
     * @param trialConcurrency new concurrency value
     */
    private updateTrialConcurrency(trialConcurrency: number): void {
        // we assume trialConcurrency >= 0, which is checked by restserver
        this.trialConcurrencyChange += (trialConcurrency - this.experimentProfile.params.trialConcurrency);
        this.experimentProfile.params.trialConcurrency = trialConcurrency;
    }

    /**
     * Overwrites the experiment's maximum execution duration.
     *
     * @param duration new value for params.maxExecDuration
     */
    private updateMaxExecDuration(duration: number): void {
        const params = this.experimentProfile.params;
        params.maxExecDuration = duration;
    }

    /**
     * Sends a new search space to the dispatcher and stores it in the experiment profile.
     *
     * @param searchSpace new search space, forwarded unchanged
     * @throws Error when the dispatcher has not been set up
     */
    private updateSearchSpace(searchSpace: string): void {
        if (this.dispatcher === undefined) {
            throw new Error('Error: tuner has not been setup');
        }
        this.dispatcher.sendCommand(UPDATE_SEARCH_SPACE, searchSpace);
        this.experimentProfile.params.searchSpace = searchSpace;
    }

QuanluZhang's avatar
QuanluZhang committed
417
418
419
420
421
422
    /**
     * Overwrites the experiment's maximum number of trials.
     *
     * @param maxTrialNum new value for params.maxTrialNum
     */
    private updateMaxTrialNum(maxTrialNum: number): void {
        const params = this.experimentProfile.params;
        params.maxTrialNum = maxTrialNum;
    }

Deshui Yu's avatar
Deshui Yu committed
423
    /**
     * Shuts the experiment down: detaches the metric listener, asks the dispatcher to
     * terminate (killing it after a grace period), cancels trials that are still RUNNING
     * or WAITING, cleans up the training service, stores the end time and switches to
     * 'STOPPED'.
     *
     * @throws Error when the dispatcher has not been set up
     */
    private async experimentDoneCleanUp(): Promise<void> {
        if (this.dispatcher === undefined) {
            throw new Error('Error: tuner has not been setup');
        }
        this.trainingService.removeTrialJobMetricListener(this.trialJobMetricListener);
        this.dispatcher.sendCommand(TERMINATE);
        // gracefully terminate tuner and assessor here, wait at most 30 seconds.
        // Stop polling as soon as the process is gone instead of sleeping one more second.
        for (let i = 0; i < 30; i++) {
            if (!(await isAlive(this.dispatcherPid))) {
                break;
            }
            await delay(1000);
        }
        await killPid(this.dispatcherPid);
        const trialJobList: TrialJobDetail[] = await this.trainingService.listTrialJobs();
        // TO DO: to promise all
        for (const trialJob of trialJobList) {
            if (trialJob.status === 'RUNNING' ||
                trialJob.status === 'WAITING') {
                try {
                    this.log.info(`cancelTrialJob: ${trialJob.id}`);
                    await this.trainingService.cancelTrialJob(trialJob.id);
                } catch (error) {
                    // pid does not exist; clean-up is best effort, so only leave a trace
                    this.log.debug(`cancelTrialJob failed for ${trialJob.id}: ${error}`);
                }
            }
        }
        await this.trainingService.cleanUp();
        this.experimentProfile.endTime = Date.now();
        await this.storeExperimentProfile();
        this.setStatus('STOPPED');
    }

    /**
     * Ticks once per second until the status becomes 'ERROR', 'STOPPING' or 'STOPPED'.
     * While the status is 'RUNNING' each tick adds one to execDuration, and the profile
     * is stored whenever the tick counter is a multiple of 10.
     */
    private async periodicallyUpdateExecDuration(): Promise<void> {
        let count: number = 1;
        while (!['ERROR', 'STOPPING', 'STOPPED'].includes(this.status.status)) {
            await delay(1000 * 1); // 1 seconds
            if (this.status.status === 'RUNNING') {
                this.experimentProfile.execDuration += 1;
                if (count % 10 === 0) {
                    await this.storeExperimentProfile();
                }
            }
            // The counter advances on every tick, whether or not the experiment is RUNNING.
            count += 1;
        }
    }

chicm-ms's avatar
chicm-ms committed
470
471
472
473
474
475
    /**
     * Sends a PING command to the dispatcher every 5 seconds until the status becomes
     * 'ERROR', 'STOPPING' or 'STOPPED'.
     *
     * @throws Error when the dispatcher has not been set up
     */
    private async pingDispatcher(): Promise<void> {
        if (this.dispatcher === undefined) {
            throw new Error('Error: tuner has not been setup');
        }
        while (!['ERROR', 'STOPPING', 'STOPPED'].includes(this.status.status)) {
            this.dispatcher.sendCommand(PING);
            await delay(1000 * 5);
        }
    }

QuanluZhang's avatar
QuanluZhang committed
480
481
    /**
     * Polls the training service for every tracked trial job, records status changes in
     * the data store, and reports finished trials to the dispatcher with TRIAL_END.
     *
     * @returns the number of trial jobs found in a terminal status during this poll
     * @throws Error when the dispatcher has not been set up
     */
    private async requestTrialJobsStatus(): Promise<number> {
        let finishedTrialJobNum: number = 0;
        if (this.dispatcher === undefined) {
            throw new Error('Error: tuner has not been setup');
        }
        // Iterate over a snapshot of the keys because finished jobs are deleted inside the loop.
        for (const trialJobId of Array.from(this.trialJobs.keys())) {
            const trialJobDetail: TrialJobDetail = await this.trainingService.getTrialJob(trialJobId);
            const oldTrialJobDetail: TrialJobDetail | undefined = this.trialJobs.get(trialJobId);
            if (oldTrialJobDetail !== undefined && oldTrialJobDetail.status !== trialJobDetail.status) {
                this.log.info(`Trial job ${trialJobDetail.id} status changed from ${oldTrialJobDetail.status} to ${trialJobDetail.status}`);
                this.trialJobs.set(trialJobId, Object.assign({}, trialJobDetail));
                await this.dataStore.storeTrialJobEvent(trialJobDetail.status, trialJobDetail.id, undefined, trialJobDetail);
            }
            switch (trialJobDetail.status) {
                case 'SUCCEEDED':
                case 'USER_CANCELED':
                case 'EARLY_STOPPED':
                case 'FAILED':
                case 'SYS_CANCELED':
                    // In the current version, we do not retry FAILED / SYS_CANCELED jobs
                    // TO DO: push this job to queue for retry
                    this.trialJobs.delete(trialJobId);
                    finishedTrialJobNum++;
                    this.dispatcher.sendCommand(TRIAL_END, JSON.stringify({
                        trial_job_id: trialJobDetail.id, // eslint-disable-line @typescript-eslint/camelcase
                        event: trialJobDetail.status,
                        hyper_params: trialJobDetail.form.hyperParameters.value // eslint-disable-line @typescript-eslint/camelcase
                    }));
                    break;
                case 'WAITING':
                case 'RUNNING':
                case 'UNKNOWN':
                    // Do nothing
                    break;
                default:
                // TO DO: add warning in log
            }
        }

        return finishedTrialJobNum;
    }

    private async manageTrials(): Promise<void> {
        if (this.dispatcher === undefined) {
            throw new Error('Error: tuner has not been setup');
        }
QuanluZhang's avatar
QuanluZhang committed
537
        let allFinishedTrialJobNum: number = this.currSubmittedTrialNum;
QuanluZhang's avatar
QuanluZhang committed
538
        let waitSubmittedToFinish: number;
539
        while (!['ERROR', 'STOPPING', 'STOPPED'].includes(this.status.status)) {
QuanluZhang's avatar
QuanluZhang committed
540
541
542
543
544
545
546
            const finishedTrialJobNum: number = await this.requestTrialJobsStatus();
            allFinishedTrialJobNum += finishedTrialJobNum;

            // requestTrialNum is the number of trials that will be requested from tuner.
            // If trialConcurrency does not change, requestTrialNum equals finishedTrialJobNum.
            // If trialConcurrency changes, for example, trialConcurrency increases by 2 (trialConcurrencyChange=2), then
            // requestTrialNum equals 2 + finishedTrialJobNum and trialConcurrencyChange becomes 0.
547
            // If trialConcurrency changes, for example, trialConcurrency decreases by 4 (trialConcurrencyChange=-4) and
QuanluZhang's avatar
QuanluZhang committed
548
549
550
551
552
553
554
555
            // finishedTrialJobNum is 2, then requestTrialNum becomes -2. No trial will be requested from tuner,
            // and trialConcurrencyChange becomes -2.
            const requestTrialNum: number = this.trialConcurrencyChange + finishedTrialJobNum;
            if (requestTrialNum >= 0) {
                this.trialConcurrencyChange = 0;
            } else {
                this.trialConcurrencyChange = requestTrialNum;
            }
chicm-ms's avatar
chicm-ms committed
556

557
            this.requestTrialJobs(requestTrialNum);
chicm-ms's avatar
chicm-ms committed
558

QuanluZhang's avatar
QuanluZhang committed
559
            // check maxtrialnum and maxduration here
560
            // NO_MORE_TRIAL is more like a subset of RUNNING, because during RUNNING tuner
561
            // might tell nnimanager that this is no more trials. In NO_MORE_TRIAL state, the experiment is viewed
562
563
            // as still running. DONE could be transfered from RUNNING or NO_MORE_TRIAL.
            assert(this.status.status === 'RUNNING' ||
564
                this.status.status === 'DONE' ||
QuanluZhang's avatar
QuanluZhang committed
565
566
                this.status.status === 'NO_MORE_TRIAL' ||
                this.status.status === 'TUNER_NO_MORE_TRIAL');
567
            if (this.experimentProfile.execDuration > this.experimentProfile.params.maxExecDuration ||
QuanluZhang's avatar
QuanluZhang committed
568
                this.currSubmittedTrialNum >= this.experimentProfile.params.maxTrialNum) {
QuanluZhang's avatar
QuanluZhang committed
569
                if (this.status.status !== 'DONE') {
chicm-ms's avatar
chicm-ms committed
570
                    this.setStatus('NO_MORE_TRIAL');
QuanluZhang's avatar
QuanluZhang committed
571
572
573
574
                    waitSubmittedToFinish = this.currSubmittedTrialNum;

                    assert(allFinishedTrialJobNum <= waitSubmittedToFinish);
                    if (allFinishedTrialJobNum >= waitSubmittedToFinish) {
chicm-ms's avatar
chicm-ms committed
575
                        this.setStatus('DONE');
QuanluZhang's avatar
QuanluZhang committed
576
577
578
579
580
                        this.experimentProfile.endTime = Date.now();
                        await this.storeExperimentProfile();
                        // write this log for travis CI
                        this.log.info('Experiment done.');
                    }
QuanluZhang's avatar
QuanluZhang committed
581
582
583
                }
            } else {
                if (this.status.status === 'DONE') {
584
585
                    delete this.experimentProfile.endTime;
                    await this.storeExperimentProfile();
QuanluZhang's avatar
QuanluZhang committed
586
                }
QuanluZhang's avatar
QuanluZhang committed
587
                if (this.status.status !== 'TUNER_NO_MORE_TRIAL') {
chicm-ms's avatar
chicm-ms committed
588
                    this.setStatus('RUNNING');
589
                }
QuanluZhang's avatar
QuanluZhang committed
590
591
592
593
594
                for (let i: number = this.trialJobs.size; i < this.experimentProfile.params.trialConcurrency; i++) {
                    if (this.waitingTrials.length === 0 ||
                        this.currSubmittedTrialNum >= this.experimentProfile.params.maxTrialNum) {
                        break;
                    }
595
                    const form = this.waitingTrials.shift() as TrialJobApplicationForm;
QuanluZhang's avatar
QuanluZhang committed
596
                    this.currSubmittedTrialNum++;
597
598
                    this.log.info(`submitTrialJob: form: ${JSON.stringify(form)}`);
                    const trialJobDetail: TrialJobDetail = await this.trainingService.submitTrialJob(form);
599
                    await this.storeExperimentProfile();
QuanluZhang's avatar
QuanluZhang committed
600
601
602
603
                    this.trialJobs.set(trialJobDetail.id, Object.assign({}, trialJobDetail));
                    const trialJobDetailSnapshot: TrialJobDetail | undefined = this.trialJobs.get(trialJobDetail.id);
                    if (trialJobDetailSnapshot != undefined) {
                        await this.dataStore.storeTrialJobEvent(
604
                            trialJobDetailSnapshot.status, trialJobDetailSnapshot.id, form.hyperParameters.value, trialJobDetailSnapshot);
QuanluZhang's avatar
QuanluZhang committed
605
606
607
608
609
610
611
612
613
                    } else {
                        assert(false, `undefined trialJobDetail in trialJobs: ${trialJobDetail.id}`);
                    }
                }
            }
            await delay(1000 * 5); // 5 seconds
        }
    }

Deshui Yu's avatar
Deshui Yu committed
614
615
616
617
618
619
    /**
     * Bump the revision counter of the in-memory experiment profile and
     * hand the profile to the data store for persistence.
     *
     * @returns the promise returned by the data store's storeExperimentProfile call
     */
    private storeExperimentProfile(): Promise<void> {
        const profile = this.experimentProfile;
        // Every persisted snapshot gets a new, strictly increasing revision number.
        profile.revision = profile.revision + 1;

        return this.dataStore.storeExperimentProfile(profile);
    }

620
    /**
     * Start the experiment: register event listeners, send the initial
     * commands to the tuner, then run the long-lived background loops
     * concurrently until all of them finish or one of them fails.
     *
     * A failure in the dispatcher ping, training service, or trial management
     * loop is re-thrown as an NNIError whose message is prefixed with the
     * name of the failing component.
     */
    private async run(): Promise<void> {
        assert(this.dispatcher !== undefined);

        this.addEventListeners();
        this.sendInitTunerCommands();

        // Builds a rejection handler that re-throws the error tagged with its origin.
        const rethrowWithPrefix = (prefix: string) => (err: Error): never => {
            throw NNIError.FromError(err, prefix);
        };

        // The loops are started in this order, without awaiting in between.
        const execDurationLoop = this.periodicallyUpdateExecDuration();
        const dispatcherLoop = this.pingDispatcher().catch(rethrowWithPrefix('Dispatcher error: '));
        const trainingServiceLoop = this.trainingService.run().catch(rethrowWithPrefix('Training service error: '));
        const trialLoop = this.manageTrials().catch(rethrowWithPrefix('Job management error: '));

        await Promise.all([execDurationLoop, dispatcherLoop, trainingServiceLoop, trialLoop]);
    }

QuanluZhang's avatar
QuanluZhang committed
640
    /**
     * Hook up the callbacks that connect the training service and the
     * dispatcher to this manager.
     *
     * @throws Error when the dispatcher has not been created yet
     */
    private addEventListeners(): void {
        this.log.info('Add event listeners');
        // TO DO: cannot run this method more than once in one NNIManager instance
        const dispatcher = this.dispatcher;
        if (dispatcher === undefined) {
            throw new Error('Error: tuner or job maintainer have not been setup');
        }

        // Metrics reported by trials are delivered through the training service.
        this.trainingService.addTrialJobMetricListener(this.trialJobMetricListener);

        // Command handling is asynchronous; a rejected handler is escalated as a critical error.
        const handleCommand = (commandType: string, content: string): void => {
            this.onTunerCommand(commandType, content).catch((err: Error) => {
                this.criticalError(NNIError.FromError(err, 'Tuner command event error: '));
            });
        };
        // The original stream error is only logged; a generic error is what gets escalated.
        const handleStreamError = (error: Error): void => {
            this.log.error(`Dispatcher error: ${error.message}`);
            this.criticalError(new Error('Dispatcher stream error, tuner may have crashed.'));
        };

        dispatcher.onCommand(handleCommand);
        dispatcher.onError(handleStreamError);
    }

    /**
     * Send the INITIALIZE command, carrying the experiment's search space,
     * to the dispatcher.
     *
     * @throws Error when the dispatcher has not been created yet
     */
    private sendInitTunerCommands(): void {
        const dispatcher = this.dispatcher;
        if (dispatcher === undefined) {
            throw new Error('Dispatcher error: tuner has not been setup');
        }

        const searchSpace = this.experimentProfile.params.searchSpace;
        this.log.debug(`Send tuner command: INITIALIZE: ${searchSpace}`);
        // Tuner need to be initialized with search space before generating any hyper parameters
        dispatcher.sendCommand(INITIALIZE, searchSpace);
    }

    /**
     * Handle a metric reported by a trial job: persist it in the data store,
     * then forward the metric payload to the dispatcher.
     *
     * @param metric metric record reported by a trial (its `id` and `data` fields are used)
     * @throws Error when the dispatcher has not been created yet; the metric
     *         has already been stored at that point
     */
    private async onTrialJobMetrics(metric: TrialJobMetric): Promise<void> {
        // Serialize explicitly: interpolating the object directly would log "[object Object]".
        this.log.debug(`NNIManager received trial job metrics: ${JSON.stringify(metric)}`);
        await this.dataStore.storeMetricData(metric.id, metric.data);
        if (this.dispatcher === undefined) {
            throw new Error('Error: tuner has not been setup');
        }
        this.dispatcher.sendCommand(REPORT_METRIC_DATA, metric.data);
    }

chicm-ms's avatar
chicm-ms committed
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
    /**
     * Ask the dispatcher to generate hyper parameters for new trial jobs.
     *
     * @param jobNum number of trial jobs to request; values below 1 are ignored
     * @throws Error when the dispatcher has not been created yet
     */
    private requestTrialJobs(jobNum: number): void {
        if (jobNum < 1) {
            return;
        }
        const dispatcher = this.dispatcher;
        if (dispatcher === undefined) {
            throw new Error('Dispatcher error: tuner has not been setup');
        }

        if (!this.experimentProfile.params.multiThread) {
            // One request carrying the total count.
            dispatcher.sendCommand(REQUEST_TRIAL_JOBS, String(jobNum));

            return;
        }

        // Send multiple requests to ensure multiple hyper parameters are generated in non-blocking way.
        // For a single REQUEST_TRIAL_JOBS request, hyper parameters are generated one by one
        // sequentially.
        for (let sent: number = 0; sent < jobNum; sent += 1) {
            dispatcher.sendCommand(REQUEST_TRIAL_JOBS, '1');
        }
    }

696
    /**
     * Handle a single command received from the dispatcher.
     *
     * Supported commands:
     *  - INITIALIZED: optionally send previously collected trial data (IMPORT_DATA),
     *    then request the initial batch of trial jobs.
     *  - NEW_TRIAL_JOB: queue a new trial job form built from the received hyper parameters.
     *  - SEND_TRIAL_JOB_PARAMETER: pass an additional parameter set to an existing trial job.
     *  - NO_MORE_TRIAL_JOBS: switch the status to TUNER_NO_MORE_TRIAL unless the experiment
     *    is in ERROR, STOPPING or STOPPED status.
     *  - KILL_TRIAL_JOB: cancel the trial job named in the command content.
     *
     * @param commandType command identifier sent by the dispatcher
     * @param content command payload as a string (JSON for some commands)
     * @throws Error when the command type is not one of the above
     */
    private async onTunerCommand(commandType: string, content: string): Promise<void> {
        this.log.info(`NNIManager received command from dispatcher: ${commandType}, ${content}`);

        if (commandType === INITIALIZED) {
            // Tuner is initialized, search space is set, request tuner to generate hyper parameters
            if (this.trialDataForTuner.length > 0) {
                const dispatcher = this.dispatcher;
                if (dispatcher === undefined) {
                    throw new Error('Dispatcher error: tuner has not been setup');
                }
                dispatcher.sendCommand(IMPORT_DATA, this.trialDataForTuner);
            }
            this.requestTrialJobs(this.experimentProfile.params.trialConcurrency);

            return;
        }

        if (commandType === NEW_TRIAL_JOB) {
            if (this.status.status === 'TUNER_NO_MORE_TRIAL') {
                this.log.warning('It is not supposed to receive more trials after NO_MORE_TRIAL is set');
                this.setStatus('RUNNING');
            }
            // Each queued trial consumes the next sequence id of the experiment.
            const newTrialForm: TrialJobApplicationForm = {
                sequenceId: this.experimentProfile.nextSequenceId++,
                hyperParameters: {
                    value: content,
                    index: 0
                }
            };
            this.waitingTrials.push(newTrialForm);

            return;
        }

        if (commandType === SEND_TRIAL_JOB_PARAMETER) {
            const request: any = JSON.parse(content);
            assert(request.parameter_index >= 0);
            assert(request.trial_job_id !== undefined);

            const updateForm: TrialJobApplicationForm = {
                sequenceId: -1,  // FIXME: multi-phase tuner should use sequence ID instead of trial job ID
                hyperParameters: {
                    value: content,
                    index: request.parameter_index
                }
            };
            this.log.info(`updateTrialJob: job id: ${request.trial_job_id}, form: ${JSON.stringify(updateForm)}`);
            await this.trainingService.updateTrialJob(request.trial_job_id, updateForm);
            if (request.parameters !== null) {
                // parameters field is set as empty string if no more hyper parameter can be generated by tuner.
                await this.dataStore.storeTrialJobEvent(
                    'ADD_HYPERPARAMETER', request.trial_job_id, content, undefined);
            }

            return;
        }

        if (commandType === NO_MORE_TRIAL_JOBS) {
            // These statuses are left untouched by this command.
            const protectedStatuses: string[] = ['ERROR', 'STOPPING', 'STOPPED'];
            if (!protectedStatuses.includes(this.status.status)) {
                this.setStatus('TUNER_NO_MORE_TRIAL');
            }

            return;
        }

        if (commandType === KILL_TRIAL_JOB) {
            const trialJobId = JSON.parse(content);
            this.log.info(`cancelTrialJob: ${trialJobId}`);
            await this.trainingService.cancelTrialJob(trialJobId, true);

            return;
        }

        throw new Error('Error: unsupported command type from tuner');
    }

762
763
764
765
766
767
768
769
770
771
    /**
     * Report an error that the manager treats as critical: it is recorded
     * through logError and additionally printed with console.error.
     *
     * @param err the error to report
     */
    private criticalError(err: Error): void {
        this.logError(err);
        console.error(err);
    }

    /**
     * Record an error: write it to the log, append its message to the
     * status error list, and switch the manager status to ERROR.
     *
     * @param err the error to record
     */
    private logError(err: Error): void {
        // Prefer the stack trace; fall back to the message so that an error
        // without a stack is still written to the log instead of being dropped.
        this.log.error(err.stack !== undefined ? err.stack : err.message);
        this.status.errors.push(err.message);
        this.setStatus('ERROR');
    }

    /**
     * Move the manager to a new status. Nothing happens (and nothing is
     * logged) when the requested status equals the current one.
     *
     * @param status the status to switch to
     */
    private setStatus(status: ExperimentStatus): void {
        const previous = this.status.status;
        if (status === previous) {
            return;
        }

        this.log.info(`Change NNIManager status from: ${previous} to: ${status}`);
        this.status.status = status;
    }

    /**
     * Build a blank experiment profile: id and log directory come from the
     * experiment helpers, counters start at zero, and all experiment
     * parameters are empty strings or zero.
     *
     * @returns a new ExperimentProfile object
     */
    private createEmptyExperimentProfile(): ExperimentProfile {
        const emptyProfile: ExperimentProfile = {
            id: getExperimentId(),
            revision: 0,
            execDuration: 0,
            logDir: getExperimentRootDir(),
            nextSequenceId: 0,
            params: {
                authorName: '',
                experimentName: '',
                trialConcurrency: 0,
                maxExecDuration: 0, // unit: second
                maxTrialNum: 0, // maxTrialNum includes all the submitted trial jobs
                trainingServicePlatform: '',
                searchSpace: ''
            }
        };

        return emptyProfile;
    }
800

QuanluZhang's avatar
QuanluZhang committed
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
    /**
     * Create the checkpoint directory and record its path on each of the
     * advisor, tuner and assessor settings that is present in the
     * experiment parameters.
     *
     * @returns the checkpoint directory path
     */
    private async createCheckpointDir(): Promise<string> {
        // TODO: test
        const checkpointPath: string = getCheckpointDir();
        await mkDirP(checkpointPath);

        // Only components that are actually configured receive the directory.
        const { advisor, tuner, assessor } = this.experimentProfile.params;
        if (advisor) {
            advisor.checkpointDir = checkpointPath;
        }
        if (tuner) {
            tuner.checkpointDir = checkpointPath;
        }
        if (assessor) {
            assessor.checkpointDir = checkpointPath;
        }

        return checkpointPath;
    }
Deshui Yu's avatar
Deshui Yu committed
819
820
821
}

export { NNIManager };