nnimanager.ts 34.6 KB
Newer Older
liuzhe-lz's avatar
liuzhe-lz committed
1
2
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT license.
Deshui Yu's avatar
Deshui Yu committed
3
4
5
6
7

'use strict';

import * as assert from 'assert';
import * as cpp from 'child-process-promise';
goooxu's avatar
goooxu committed
8
import { ChildProcess, spawn, StdioOptions } from 'child_process';
Deshui Yu's avatar
Deshui Yu committed
9
10
11
import { Deferred } from 'ts-deferred';
import * as component from '../common/component';
import { DataStore, MetricDataRecord, MetricType, TrialJobInfo } from '../common/datastore';
12
import { NNIError } from '../common/errors';
13
import { getExperimentId } from '../common/experimentStartupInfo';
Deshui Yu's avatar
Deshui Yu committed
14
15
import { getLogger, Logger } from '../common/log';
import {
chicm-ms's avatar
chicm-ms committed
16
    ExperimentParams, ExperimentProfile, Manager, ExperimentStatus,
17
    NNIManagerStatus, ProfileUpdateType, TrialJobStatistics
Deshui Yu's avatar
Deshui Yu committed
18
19
20
21
} from '../common/manager';
import {
    TrainingService, TrialJobApplicationForm, TrialJobDetail, TrialJobMetric, TrialJobStatus
} from '../common/trainingService';
22
import { delay, getCheckpointDir, getExperimentRootDir, getLogDir, getMsgDispatcherCommand, mkDirP, getTunerProc, getLogLevel, isAlive, killPid } from '../common/utils';
Deshui Yu's avatar
Deshui Yu committed
23
import {
chicm-ms's avatar
chicm-ms committed
24
    ADD_CUSTOMIZED_TRIAL_JOB, INITIALIZE, INITIALIZED, KILL_TRIAL_JOB, NEW_TRIAL_JOB, NO_MORE_TRIAL_JOBS, PING,
25
    REPORT_METRIC_DATA, REQUEST_TRIAL_JOBS, SEND_TRIAL_JOB_PARAMETER, TERMINATE, TRIAL_END, UPDATE_SEARCH_SPACE, IMPORT_DATA
Deshui Yu's avatar
Deshui Yu committed
26
} from './commands';
27
import { createDispatcherInterface, IpcInterface } from './ipcInterface';
Deshui Yu's avatar
Deshui Yu committed
28
29

/**
chicm-ms's avatar
chicm-ms committed
30
 * NNIManager which implements Manager interface
Deshui Yu's avatar
Deshui Yu committed
31
32
33
 */
class NNIManager implements Manager {
    private trainingService: TrainingService;
34
    private dispatcher: IpcInterface | undefined;
35
    private currSubmittedTrialNum: number;  // need to be recovered
QuanluZhang's avatar
QuanluZhang committed
36
    private trialConcurrencyChange: number; // >0: increase, <0: decrease
Deshui Yu's avatar
Deshui Yu committed
37
38
39
    private log: Logger;
    private dataStore: DataStore;
    private experimentProfile: ExperimentProfile;
40
    private dispatcherPid: number;
41
    private status: NNIManagerStatus;
42
    private waitingTrials: TrialJobApplicationForm[];
QuanluZhang's avatar
QuanluZhang committed
43
    private trialJobs: Map<string, TrialJobDetail>;
44
    private trialDataForTuner: string;
SparkSnail's avatar
SparkSnail committed
45
    private readonly: boolean;
46

47
    private trialJobMetricListener: (metric: TrialJobMetric) => void;
48

Deshui Yu's avatar
Deshui Yu committed
49
50
    /**
     * Creates a manager in the 'INITIALIZED' state.
     *
     * Resolves the training service and data store through the component
     * container, resets all trial bookkeeping, and prepares the metric listener.
     */
    constructor() {
        // Plain bookkeeping: nothing submitted, nothing queued, nothing tracked yet.
        this.currSubmittedTrialNum = 0;
        this.trialConcurrencyChange = 0;
        this.dispatcherPid = 0;
        this.waitingTrials = [];
        this.trialJobs = new Map<string, TrialJobDetail>();
        this.trialDataForTuner = '';
        this.readonly = false;

        // Collaborators.
        this.trainingService = component.get(TrainingService);
        assert(this.trainingService);
        this.log = getLogger();
        this.dataStore = component.get(DataStore);

        this.experimentProfile = this.createEmptyExperimentProfile();
        this.status = { status: 'INITIALIZED', errors: [] };

        // A failure while handling a metric is reported as a critical error
        // instead of being left as an unhandled rejection.
        this.trialJobMetricListener = (metric: TrialJobMetric): void => {
            this.onTrialJobMetrics(metric).catch((err: Error) => {
                this.criticalError(NNIError.FromError(err, 'Job metrics error: '));
            });
        };
    }

    /**
     * Applies one kind of update from `experimentProfile` to the current profile
     * and persists the result.
     *
     * Declared `async` so that every failure (readonly mode, an unrecognized
     * update type, or an error thrown by one of the update helpers) reaches the
     * caller as a rejected promise rather than as a synchronous exception.
     *
     * @param experimentProfile profile carrying the new parameter value
     * @param updateType which parameter of the profile to update
     * @returns a promise that settles once the profile has been stored
     */
    public async updateExperimentProfile(experimentProfile: ExperimentProfile, updateType: ProfileUpdateType): Promise<void> {
        if (this.readonly) {
            throw new Error('Error: can not update experiment profile in readonly mode!');
        }

        switch (updateType) {
            case 'TRIAL_CONCURRENCY':
                this.updateTrialConcurrency(experimentProfile.params.trialConcurrency);
                break;
            case 'MAX_EXEC_DURATION':
                this.updateMaxExecDuration(experimentProfile.params.maxExecDuration);
                break;
            case 'SEARCH_SPACE':
                this.updateSearchSpace(experimentProfile.params.searchSpace);
                break;
            case 'MAX_TRIAL_NUM':
                this.updateMaxTrialNum(experimentProfile.params.maxTrialNum);
                break;
            default:
                throw new Error('Error: unrecognized updateType');
        }

        return this.storeExperimentProfile();
    }

98
    /**
     * Sends `data` to the dispatcher with an IMPORT_DATA command and records an
     * 'IMPORT_DATA' event in the data store.
     *
     * @param data payload passed unchanged to the dispatcher and the data store
     * @returns the data store's promise; rejected in readonly mode or when the
     *          dispatcher has not been created
     */
    public importData(data: string): Promise<void> {
        if (this.readonly) {
            return Promise.reject(new Error('Error: can not import data in readonly mode!'));
        }

        const tuner: IpcInterface | undefined = this.dispatcher;
        if (tuner === undefined) {
            return Promise.reject(new Error('tuner has not been setup'));
        }

        tuner.sendCommand(IMPORT_DATA, data);

        return this.dataStore.storeTrialJobEvent('IMPORT_DATA', '', data);
    }

112
113
114
115
    /**
     * Returns the trial hyper-parameter configurations exported by the data store.
     */
    public async exportData(): Promise<string> {
        const exported: string = await this.dataStore.exportTrialHpConfigs();

        return exported;
    }

116
    /**
     * Queues a trial whose hyper-parameters were supplied by the user.
     *
     * Declared `async` so that malformed `hyperParams` (a `JSON.parse` failure)
     * or a failed data-store write rejects the returned promise, instead of
     * throwing synchronously or being lost as an unhandled rejection.
     *
     * @param hyperParams JSON text of the customized hyper-parameters
     * @returns the sequence id assigned to the queued trial
     */
    public async addCustomizedTrialJob(hyperParams: string): Promise<number> {
        if (this.readonly) {
            throw new Error('Error: can not add customized trial job in readonly mode!');
        }
        if (this.currSubmittedTrialNum >= this.experimentProfile.params.maxTrialNum) {
            throw new Error('reach maxTrialNum');
        }

        // TODO: NNI manager should not peek tuner's internal protocol, let's refactor this later
        const packedParameter = {
            parameter_id: null,
            parameter_source: 'customized',
            parameters: JSON.parse(hyperParams)
        };

        const form: TrialJobApplicationForm = {
            sequenceId: this.experimentProfile.nextSequenceId++,
            hyperParameters: {
                value: JSON.stringify(packedParameter),
                index: 0
            }
        };
        this.waitingTrials.push(form);

        // trial id has not been generated yet, thus use '' instead
        await this.dataStore.storeTrialJobEvent('ADD_CUSTOMIZED', '', hyperParams);

        return form.sequenceId;
    }

    /**
     * Cancels a trial at the user's request: asks the training service to cancel
     * it, then records a 'USER_TO_CANCEL' event for it in the data store.
     *
     * @param trialJobId id of the trial to cancel
     */
    public async cancelTrialJobByUser(trialJobId: string): Promise<void> {
        if (this.readonly) {
            throw new Error('Error: can not cancel trial job in readonly mode!');
        }

        this.log.info(`User cancelTrialJob: ${trialJobId}`);

        await this.trainingService.cancelTrialJob(trialJobId);
        await this.dataStore.storeTrialJobEvent('USER_TO_CANCEL', trialJobId, '');
    }

    /**
     * Starts a new experiment: stores the parameters, forwards optional settings
     * to the training service, launches the dispatcher in 'start' mode, marks the
     * experiment as RUNNING and kicks off the main loop in the background.
     *
     * @param expParams parameters of the experiment to start
     * @returns the id of the experiment profile
     */
    public async startExperiment(expParams: ExperimentParams): Promise<string> {
        this.log.info(`Starting experiment: ${this.experimentProfile.id}`);
        this.experimentProfile.params = expParams;
        await this.storeExperimentProfile();
        this.log.debug('Setup tuner...');

        // Multi-phase is forwarded only when requested AND supported by the training service.
        if (expParams.multiPhase && this.trainingService.isMultiPhaseJobSupported) {
            this.trainingService.setClusterMetadata('multiPhase', expParams.multiPhase.toString());
        }
        // Version check setting.
        if (expParams.versionCheck !== undefined) {
            this.trainingService.setClusterMetadata('version_check', expParams.versionCheck.toString());
        }
        // Log collection setting.
        if (expParams.logCollection !== undefined) {
            this.trainingService.setClusterMetadata('log_collection', expParams.logCollection.toString());
        }

        const tunerCommand: string = getMsgDispatcherCommand(
            expParams.tuner, expParams.assessor, expParams.advisor, expParams.multiPhase, expParams.multiThread);
        this.log.debug(`dispatcher command: ${tunerCommand}`);
        const ckptDir: string = await this.createCheckpointDir();
        this.setupTuner(tunerCommand, undefined, 'start', ckptDir);

        this.experimentProfile.startTime = Date.now();
        this.setStatus('RUNNING');
        await this.storeExperimentProfile();

        // The main loop is not awaited; a failure in it becomes a critical error.
        this.run().catch((err: Error) => {
            this.criticalError(err);
        });

        return this.experimentProfile.id;
    }

SparkSnail's avatar
SparkSnail committed
194
    /**
     * Resumes a stored experiment.
     *
     * Loads the profile from the data store. In readonly mode nothing else is
     * done. Otherwise the dispatcher is launched in 'resume' mode, trials that
     * were still WAITING or RUNNING are recorded as FAILED, finished and imported
     * trial data are collected for the tuner, and the main loop is started.
     *
     * @param readonly when true, only load the profile and block mutating calls
     */
    public async resumeExperiment(readonly: boolean): Promise<void> {
        this.log.info(`Resuming experiment: ${this.experimentProfile.id}`);

        // Fetch back the experiment profile
        const experimentId: string = getExperimentId();
        this.experimentProfile = await this.dataStore.getExperimentProfile(experimentId);
        this.readonly = readonly;
        if (readonly) {
            return;
        }

        const expParams: ExperimentParams = this.experimentProfile.params;

        // Multi-phase is forwarded only when requested AND supported by the training service.
        if (expParams.multiPhase && this.trainingService.isMultiPhaseJobSupported) {
            this.trainingService.setClusterMetadata('multiPhase', expParams.multiPhase.toString());
        }
        // Version check setting.
        if (expParams.versionCheck !== undefined) {
            this.trainingService.setClusterMetadata('version_check', expParams.versionCheck.toString());
        }

        const tunerCommand: string = getMsgDispatcherCommand(
            expParams.tuner, expParams.assessor, expParams.advisor, expParams.multiPhase, expParams.multiThread);
        this.log.debug(`dispatcher command: ${tunerCommand}`);
        const ckptDir: string = await this.createCheckpointDir();
        this.setupTuner(tunerCommand, undefined, 'resume', ckptDir);

        const storedJobs: TrialJobInfo[] = await this.dataStore.listTrialJobs();

        // Every stored trial counts as already submitted.
        this.currSubmittedTrialNum = storedJobs.length;

        // Trials still WAITING or RUNNING in the store are recorded as FAILED.
        const unfinished: TrialJobInfo[] = storedJobs.filter(
            (job: TrialJobInfo) => job.status === 'WAITING' || job.status === 'RUNNING');
        await Promise.all(unfinished.map((job: TrialJobInfo) => this.dataStore.storeTrialJobEvent('FAILED', job.id)));

        // Collect generated trials and imported trials (no de-duplication).
        const finishedTrialData: string = await this.exportData();
        const importedData: string[] = await this.dataStore.getImportedData();
        let collected: Object[] = JSON.parse(finishedTrialData);
        for (const importedChunk of importedData) {
            collected = collected.concat(<Object[]>JSON.parse(importedChunk));
        }
        this.trialDataForTuner = JSON.stringify(collected);

        // An experiment that still has both time and trial budget left is no longer "ended".
        const hasTimeLeft: boolean = this.experimentProfile.execDuration < this.experimentProfile.params.maxExecDuration;
        const hasTrialsLeft: boolean = this.currSubmittedTrialNum < this.experimentProfile.params.maxTrialNum;
        if (hasTimeLeft && hasTrialsLeft && this.experimentProfile.endTime) {
            delete this.experimentProfile.endTime;
        }
        this.setStatus('RUNNING');

        // TO DO: update database record for resume event
        // The main loop is not awaited; a failure in it becomes a critical error.
        this.run().catch((err: Error) => {
            this.criticalError(err);
        });
    }

258
259
    /**
     * Looks up a single trial job in the data store.
     *
     * @param trialJobId id of the trial to fetch
     */
    public getTrialJob(trialJobId: string): Promise<TrialJobInfo> {
        const lookup: Promise<TrialJobInfo> = this.dataStore.getTrialJob(trialJobId);

        return lookup;
    }

    /**
     * Forwards a cluster metadata entry to the training service, failing if the
     * training service has not answered within 10 seconds.
     *
     * @param key metadata key
     * @param value metadata value
     */
    public async setClusterMetadata(key: string, value: string): Promise<void> {
        if (this.readonly) {
            throw new Error('Error: can not set cluster metadata in readonly mode!');
        }
        this.log.info(`NNIManager setClusterMetadata, key: ${key}, value: ${value}`);

        // TO DO: move timeout value to constants file
        const timeoutMs: number = 10000;
        let pendingTimer: NodeJS.Timer;
        // Never resolves; only rejects when the timer fires.
        const timeout: Promise<{}> = new Promise((resolve: Function, reject: Function): void => {
            pendingTimer = setTimeout(
                () => { reject(new Error('TrainingService setClusterMetadata timeout. Please check your config file.')); },
                timeoutMs);
        });

        // Whichever settles first wins; the timer is cleared in either case.
        await Promise.race([timeout, this.trainingService.setClusterMetadata(key, value)]).finally(() => {
            clearTimeout(pendingTimer);
        });
    }

    /**
     * Reads a cluster metadata entry from the training service.
     *
     * @param key metadata key
     * @returns a promise resolved with whatever the training service returns for `key`
     */
    public getClusterMetadata(key: string): Promise<string> {
        const metadata = this.trainingService.getClusterMetadata(key);

        return Promise.resolve(metadata);
    }

    /**
     * Returns the trial job statistics computed by the data store.
     */
    public async getTrialJobStatistics(): Promise<TrialJobStatistics[]> {
        const statistics: TrialJobStatistics[] = await this.dataStore.getTrialJobStatistics();

        return statistics;
    }

289
    /**
     * Stops the experiment: switches the status to STOPPING and then runs the
     * full clean-up, logging before and after.
     */
    public async stopExperiment(): Promise<void> {
        this.setStatus('STOPPING');
        this.log.info('Stopping experiment, cleaning up ...');

        await this.experimentDoneCleanUp();

        this.log.info('Experiment stopped.');
    }

296
    /**
     * Fetches metric records from the data store; both filters are passed
     * through unchanged.
     *
     * @param trialJobId optional trial id filter
     * @param metricType optional metric type filter
     */
    public async getMetricData(trialJobId?: string, metricType?: MetricType): Promise<MetricDataRecord[]> {
        const records: MetricDataRecord[] = await this.dataStore.getMetricData(trialJobId, metricType);

        return records;
    }

300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
    /**
     * Returns the metrics of all trials whose sequence id lies in the inclusive
     * range [minSeqId, maxSeqId]. Trials without a sequence id are ignored.
     *
     * @param minSeqId lowest sequence id to include
     * @param maxSeqId highest sequence id to include
     */
    public async getMetricDataByRange(minSeqId: number, maxSeqId: number): Promise<MetricDataRecord[]> {
        const wantedTrialIds = new Set<string>();
        for (const trial of await this.dataStore.listTrialJobs()) {
            const seqId = trial.sequenceId;
            // FIXME: can this be undefined?
            if (seqId !== undefined && minSeqId <= seqId && seqId <= maxSeqId) {
                wantedTrialIds.add(trial.id);
            }
        }

        const everyMetric = await this.dataStore.getMetricData();

        return everyMetric.filter(record => wantedTrialIds.has(record.trialJobId));
    }

    /**
     * Returns every non-'PERIODICAL' metric, followed by, for each trial, the
     * 'PERIODICAL' metric with the highest sequence (the later record wins ties).
     */
    public async getLatestMetricData(): Promise<MetricDataRecord[]> {
        // FIXME: this can take a long time
        const everyMetric: MetricDataRecord[] = await this.dataStore.getMetricData();
        const nonPeriodical: MetricDataRecord[] = [];
        const newestPeriodical: Map<string, MetricDataRecord> = new Map<string, MetricDataRecord>();

        for (const record of everyMetric) {
            if (record.type !== 'PERIODICAL') {
                nonPeriodical.push(record);
                continue;
            }
            const current: MetricDataRecord | undefined = newestPeriodical.get(record.trialJobId);
            if (current === undefined || current.sequence <= record.sequence) {
                newestPeriodical.set(record.trialJobId, record);
            }
        }

        return nonPeriodical.concat(Array.from(newestPeriodical.values()));
        // FIXME: unit test
    }

Deshui Yu's avatar
Deshui Yu committed
331
332
333
334
335
336
337
338
    /**
     * Returns a promise resolved with the manager's current experiment profile
     * object (the same object, not a copy).
     */
    public getExperimentProfile(): Promise<ExperimentProfile> {
        return Promise.resolve(this.experimentProfile);
    }

339
340
341
342
    /**
     * Returns the manager's current status object (status value plus errors).
     */
    public getStatus(): NNIManagerStatus {
        const current: NNIManagerStatus = this.status;

        return current;
    }

Deshui Yu's avatar
Deshui Yu committed
343
344
345
346
    /**
     * Lists trial jobs from the data store; the status filter is passed through
     * unchanged.
     *
     * @param status optional status filter
     */
    public async listTrialJobs(status?: TrialJobStatus): Promise<TrialJobInfo[]> {
        const jobs: TrialJobInfo[] = await this.dataStore.listTrialJobs(status);

        return jobs;
    }

347
348
    /**
     * Launches the dispatcher process and connects to it. Does nothing when a
     * dispatcher already exists.
     *
     * @param command command line used to start the dispatcher
     * @param cwd working directory; the log directory is used when undefined or empty
     * @param mode exported to the child as NNI_MODE
     * @param dataDirectory exported to the child as NNI_CHECKPOINT_DIRECTORY
     */
    private setupTuner(command: string, cwd: string | undefined, mode: 'start' | 'resume', dataDirectory: string): void {
        if (this.dispatcher !== undefined) {
            return;
        }

        // Child stdout/stderr are wired to this process; two extra pipes are opened after them.
        const stdio: StdioOptions = ['ignore', process.stdout, process.stderr, 'pipe', 'pipe'];
        const workingDir: string = (cwd === undefined || cwd === '') ? getLogDir() : cwd;

        // TO DO: add CUDA_VISIBLE_DEVICES
        // Defaults to false without a tuner; otherwise takes the tuner's setting as-is.
        const tunerParams = this.experimentProfile.params.tuner;
        const includeIntermediateResults: boolean | undefined =
            tunerParams === undefined ? false : tunerParams.includeIntermediateResults;

        const tunerEnv = {
            NNI_MODE: mode,
            NNI_CHECKPOINT_DIRECTORY: dataDirectory,
            NNI_LOG_DIRECTORY: getLogDir(),
            NNI_LOG_LEVEL: getLogLevel(),
            NNI_INCLUDE_INTERMEDIATE_RESULTS: includeIntermediateResults,
            CUDA_VISIBLE_DEVICES: this.getGpuEnvvarValue()
        };
        // NNI variables are layered on top of the current process environment.
        const childEnv = Object.assign({}, process.env, tunerEnv);

        const tunerProc: ChildProcess = getTunerProc(command, stdio, workingDir, childEnv);
        this.dispatcherPid = tunerProc.pid;
        this.dispatcher = createDispatcherInterface(tunerProc);
    }

380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
    /**
     * Returns the GPU indices configured for the advisor or, when no advisor is
     * configured, for the tuner. Returns '' when neither provides a value.
     */
    private getGpuEnvvarValue(): string {
        const params = this.experimentProfile.params;

        let gpuIndices: string | undefined;
        if (params.advisor !== undefined) {
            // A configured advisor takes precedence, even if its gpuIndices is undefined.
            gpuIndices = params.advisor.gpuIndices;
        } else if (params.tuner !== undefined) {
            gpuIndices = params.tuner.gpuIndices;
        }

        return gpuIndices === undefined ? '' : gpuIndices;
    }

Deshui Yu's avatar
Deshui Yu committed
396
    /**
     * Stores a new trial concurrency and accumulates the difference from the old
     * value into `trialConcurrencyChange` (>0: increase, <0: decrease).
     *
     * @param trialConcurrency new concurrency; assumed >= 0 (checked by restserver)
     */
    private updateTrialConcurrency(trialConcurrency: number): void {
        const delta: number = trialConcurrency - this.experimentProfile.params.trialConcurrency;
        this.trialConcurrencyChange += delta;
        this.experimentProfile.params.trialConcurrency = trialConcurrency;
    }

    /**
     * Replaces the experiment's maximum execution duration.
     *
     * @param duration new value for `params.maxExecDuration`
     */
    private updateMaxExecDuration(duration: number): void {
        const params = this.experimentProfile.params;
        params.maxExecDuration = duration;
    }

    /**
     * Sends the new search space to the dispatcher and stores it in the profile.
     *
     * @param searchSpace new search space, forwarded unchanged
     * @throws Error when the dispatcher has not been created
     */
    private updateSearchSpace(searchSpace: string): void {
        const tuner: IpcInterface | undefined = this.dispatcher;
        if (tuner === undefined) {
            throw new Error('Error: tuner has not been setup');
        }

        tuner.sendCommand(UPDATE_SEARCH_SPACE, searchSpace);
        this.experimentProfile.params.searchSpace = searchSpace;
    }

QuanluZhang's avatar
QuanluZhang committed
420
421
422
423
424
425
    /**
     * Replaces the experiment's maximum number of trials.
     *
     * @param maxTrialNum new value for `params.maxTrialNum`
     */
    private updateMaxTrialNum(maxTrialNum: number): void {
        const params = this.experimentProfile.params;
        params.maxTrialNum = maxTrialNum;
    }

Deshui Yu's avatar
Deshui Yu committed
426
    /**
     * Tears the experiment down: detaches the metric listener, asks the
     * dispatcher to terminate (waiting up to about 30 seconds before killing
     * it), cancels every RUNNING or WAITING trial, cleans up the training
     * service, stores the end time and sets the status to STOPPED.
     *
     * @throws Error when the dispatcher has not been created
     */
    private async experimentDoneCleanUp(): Promise<void> {
        if (this.dispatcher === undefined) {
            throw new Error('Error: tuner has not been setup');
        }
        this.trainingService.removeTrialJobMetricListener(this.trialJobMetricListener);
        this.dispatcher.sendCommand(TERMINATE);

        // gracefully terminate tuner and assessor here, wait at most 30 seconds.
        // The liveness check comes before the delay so that no extra second is
        // spent sleeping once the dispatcher has already exited.
        for (let i: number = 0; i < 30; i++) {
            const tunerAlive: boolean = await isAlive(this.dispatcherPid);
            if (!tunerAlive) { break; }
            await delay(1000);
        }
        await killPid(this.dispatcherPid);

        const trialJobList: TrialJobDetail[] = await this.trainingService.listTrialJobs();
        // TO DO: to promise all
        for (const trialJob of trialJobList) {
            if (trialJob.status === 'RUNNING' ||
                trialJob.status === 'WAITING') {
                try {
                    this.log.info(`cancelTrialJob: ${trialJob.id}`);
                    await this.trainingService.cancelTrialJob(trialJob.id);
                } catch (error) {
                    // Best effort: the trial may already be gone (e.g. pid does not exist).
                    // Keep going, but leave a trace instead of dropping the error silently.
                    this.log.debug(`cancelTrialJob ${trialJob.id} failed: ${error}`);
                }
            }
        }
        await this.trainingService.cleanUp();
        this.experimentProfile.endTime = Date.now();
        await this.storeExperimentProfile();
        this.setStatus('STOPPED');
    }

    /**
     * Once per second, until the status becomes ERROR, STOPPING or STOPPED:
     * while the status is RUNNING, adds one to `execDuration`, and persists the
     * profile on every tenth tick. Ticks are counted even when not RUNNING.
     */
    private async periodicallyUpdateExecDuration(): Promise<void> {
        for (let tick: number = 1; !['ERROR', 'STOPPING', 'STOPPED'].includes(this.status.status); tick++) {
            await delay(1000 * 1); // 1 seconds
            if (this.status.status !== 'RUNNING') {
                continue;
            }
            this.experimentProfile.execDuration += 1;
            if (tick % 10 === 0) {
                await this.storeExperimentProfile();
            }
        }
    }

chicm-ms's avatar
chicm-ms committed
473
474
475
476
477
478
    /**
     * Sends a PING command to the dispatcher every 5 seconds until the status
     * becomes ERROR, STOPPING or STOPPED.
     *
     * @throws Error when the dispatcher has not been created
     */
    private async pingDispatcher(): Promise<void> {
        if (this.dispatcher === undefined) {
            throw new Error('Error: tuner has not been setup');
        }

        while (true) {
            if (['ERROR', 'STOPPING', 'STOPPED'].includes(this.status.status)) {
                break;
            }
            this.dispatcher.sendCommand(PING);
            await delay(1000 * 5);
        }
    }

QuanluZhang's avatar
QuanluZhang committed
483
484
    /**
     * Polls the training service for every tracked trial.
     *
     * A status change is logged, cached and stored as an event. A trial that has
     * reached a terminal status (SUCCEEDED, USER_CANCELED, EARLY_STOPPED, FAILED
     * or SYS_CANCELED) is dropped from the tracked set and reported to the
     * dispatcher with a TRIAL_END command.
     *
     * @returns the number of trials found in a terminal status during this poll
     * @throws Error when the dispatcher has not been created
     */
    private async requestTrialJobsStatus(): Promise<number> {
        if (this.dispatcher === undefined) {
            throw new Error('Error: tuner has not been setup');
        }

        let finishedCount: number = 0;
        // Iterate over a snapshot of the keys, since entries are deleted inside the loop.
        for (const trialJobId of Array.from(this.trialJobs.keys())) {
            const latest: TrialJobDetail = await this.trainingService.getTrialJob(trialJobId);
            const known: TrialJobDetail | undefined = this.trialJobs.get(trialJobId);
            if (known !== undefined && known.status !== latest.status) {
                this.log.info(`Trial job ${latest.id} status changed from ${known.status} to ${latest.status}`);
                this.trialJobs.set(trialJobId, Object.assign({}, latest));
                await this.dataStore.storeTrialJobEvent(latest.status, latest.id, undefined, latest);
            }

            switch (latest.status) {
                case 'SUCCEEDED':
                case 'USER_CANCELED':
                case 'EARLY_STOPPED':
                case 'FAILED':
                case 'SYS_CANCELED':
                    // In the current version, failed or system-cancelled trials are not retried.
                    // TO DO: push this job to queue for retry
                    this.trialJobs.delete(trialJobId);
                    finishedCount++;
                    this.dispatcher.sendCommand(TRIAL_END, JSON.stringify({
                        trial_job_id: latest.id,
                        event: latest.status,
                        hyper_params: latest.form.hyperParameters.value
                    }));
                    break;
                default:
                // WAITING, RUNNING, UNKNOWN and any other status: nothing to do.
                // TO DO: add warning in log for unexpected statuses
            }
        }

        return finishedCount;
    }

    private async manageTrials(): Promise<void> {
        if (this.dispatcher === undefined) {
            throw new Error('Error: tuner has not been setup');
        }
QuanluZhang's avatar
QuanluZhang committed
540
        let allFinishedTrialJobNum: number = this.currSubmittedTrialNum;
QuanluZhang's avatar
QuanluZhang committed
541
        let waitSubmittedToFinish: number;
542
        while (!['ERROR', 'STOPPING', 'STOPPED'].includes(this.status.status)) {
QuanluZhang's avatar
QuanluZhang committed
543
544
545
546
547
548
549
            const finishedTrialJobNum: number = await this.requestTrialJobsStatus();
            allFinishedTrialJobNum += finishedTrialJobNum;

            // requestTrialNum is the number of trials that will be requested from tuner.
            // If trialConcurrency does not change, requestTrialNum equals finishedTrialJobNum.
            // If trialConcurrency changes, for example, trialConcurrency increases by 2 (trialConcurrencyChange=2), then
            // requestTrialNum equals 2 + finishedTrialJobNum and trialConcurrencyChange becomes 0.
550
            // If trialConcurrency changes, for example, trialConcurrency decreases by 4 (trialConcurrencyChange=-4) and
QuanluZhang's avatar
QuanluZhang committed
551
552
553
554
555
556
557
558
            // finishedTrialJobNum is 2, then requestTrialNum becomes -2. No trial will be requested from tuner,
            // and trialConcurrencyChange becomes -2.
            const requestTrialNum: number = this.trialConcurrencyChange + finishedTrialJobNum;
            if (requestTrialNum >= 0) {
                this.trialConcurrencyChange = 0;
            } else {
                this.trialConcurrencyChange = requestTrialNum;
            }
chicm-ms's avatar
chicm-ms committed
559

560
            this.requestTrialJobs(requestTrialNum);
chicm-ms's avatar
chicm-ms committed
561

QuanluZhang's avatar
QuanluZhang committed
562
            // check maxtrialnum and maxduration here
563
            // NO_MORE_TRIAL is more like a subset of RUNNING, because during RUNNING tuner
564
            // might tell nnimanager that this is no more trials. In NO_MORE_TRIAL state, the experiment is viewed
565
566
            // as still running. DONE could be transfered from RUNNING or NO_MORE_TRIAL.
            assert(this.status.status === 'RUNNING' ||
567
                this.status.status === 'DONE' ||
QuanluZhang's avatar
QuanluZhang committed
568
569
                this.status.status === 'NO_MORE_TRIAL' ||
                this.status.status === 'TUNER_NO_MORE_TRIAL');
570
            if (this.experimentProfile.execDuration > this.experimentProfile.params.maxExecDuration ||
QuanluZhang's avatar
QuanluZhang committed
571
                this.currSubmittedTrialNum >= this.experimentProfile.params.maxTrialNum) {
QuanluZhang's avatar
QuanluZhang committed
572
                if (this.status.status !== 'DONE') {
chicm-ms's avatar
chicm-ms committed
573
                    this.setStatus('NO_MORE_TRIAL');
QuanluZhang's avatar
QuanluZhang committed
574
575
576
577
                    waitSubmittedToFinish = this.currSubmittedTrialNum;

                    assert(allFinishedTrialJobNum <= waitSubmittedToFinish);
                    if (allFinishedTrialJobNum >= waitSubmittedToFinish) {
chicm-ms's avatar
chicm-ms committed
578
                        this.setStatus('DONE');
QuanluZhang's avatar
QuanluZhang committed
579
580
581
582
583
                        this.experimentProfile.endTime = Date.now();
                        await this.storeExperimentProfile();
                        // write this log for travis CI
                        this.log.info('Experiment done.');
                    }
QuanluZhang's avatar
QuanluZhang committed
584
585
586
                }
            } else {
                if (this.status.status === 'DONE') {
587
588
                    delete this.experimentProfile.endTime;
                    await this.storeExperimentProfile();
QuanluZhang's avatar
QuanluZhang committed
589
                }
QuanluZhang's avatar
QuanluZhang committed
590
                if (this.status.status !== 'TUNER_NO_MORE_TRIAL') {
chicm-ms's avatar
chicm-ms committed
591
                    this.setStatus('RUNNING');
592
                }
QuanluZhang's avatar
QuanluZhang committed
593
594
595
596
597
                for (let i: number = this.trialJobs.size; i < this.experimentProfile.params.trialConcurrency; i++) {
                    if (this.waitingTrials.length === 0 ||
                        this.currSubmittedTrialNum >= this.experimentProfile.params.maxTrialNum) {
                        break;
                    }
598
                    const form = this.waitingTrials.shift() as TrialJobApplicationForm;
QuanluZhang's avatar
QuanluZhang committed
599
                    this.currSubmittedTrialNum++;
600
601
                    this.log.info(`submitTrialJob: form: ${JSON.stringify(form)}`);
                    const trialJobDetail: TrialJobDetail = await this.trainingService.submitTrialJob(form);
602
                    await this.storeExperimentProfile();
QuanluZhang's avatar
QuanluZhang committed
603
604
605
606
                    this.trialJobs.set(trialJobDetail.id, Object.assign({}, trialJobDetail));
                    const trialJobDetailSnapshot: TrialJobDetail | undefined = this.trialJobs.get(trialJobDetail.id);
                    if (trialJobDetailSnapshot != undefined) {
                        await this.dataStore.storeTrialJobEvent(
607
                            trialJobDetailSnapshot.status, trialJobDetailSnapshot.id, form.hyperParameters.value, trialJobDetailSnapshot);
QuanluZhang's avatar
QuanluZhang committed
608
609
610
611
612
613
614
615
616
                    } else {
                        assert(false, `undefined trialJobDetail in trialJobs: ${trialJobDetail.id}`);
                    }
                }
            }
            await delay(1000 * 5); // 5 seconds
        }
    }

Deshui Yu's avatar
Deshui Yu committed
617
618
619
620
621
622
    /**
     * Increments the revision of the in-memory experiment profile and hands
     * the profile to the data store for persistence.
     *
     * @returns the promise produced by the data store's storeExperimentProfile call
     */
    private storeExperimentProfile(): Promise<void> {
        const profile: ExperimentProfile = this.experimentProfile;
        profile.revision += 1;

        return this.dataStore.storeExperimentProfile(profile);
    }

623
    /**
     * Wires up the manager and then waits on its background tasks.
     *
     * Steps, in order:
     *  1. assert that the dispatcher has been created,
     *  2. register the event listeners,
     *  3. send the initial command(s) to the tuner,
     *  4. await, concurrently, the exec-duration updater, the dispatcher ping,
     *     the training service and the trial management task.
     *
     * A rejection from the dispatcher ping, the training service or the trial
     * management task is rethrown through NNIError.FromError with a prefix that
     * names the failing part.
     */
    private async run(): Promise<void> {
        assert(this.dispatcher !== undefined);

        this.addEventListeners();
        this.sendInitTunerCommands();

        // Builds a rejection handler that rethrows the error tagged with `prefix`.
        const rethrowWithPrefix = (prefix: string) => (err: Error): never => {
            throw NNIError.FromError(err, prefix);
        };

        // The array elements are evaluated in order, so the tasks start in the order listed.
        await Promise.all([
            this.periodicallyUpdateExecDuration(),
            this.pingDispatcher().catch(rethrowWithPrefix('Dispatcher error: ')),
            this.trainingService.run().catch(rethrowWithPrefix('Training service error: ')),
            this.manageTrials().catch(rethrowWithPrefix('Job management error: '))
        ]);
    }

QuanluZhang's avatar
QuanluZhang committed
643
    /**
     * Registers the trial-metric listener on the training service and the
     * command handler on the dispatcher.
     *
     * Errors raised while handling a dispatcher command are passed to
     * criticalError, prefixed with 'Tuner command event error: '.
     *
     * @throws Error if the dispatcher has not been created yet
     */
    private addEventListeners(): void {
        this.log.info('Add event listeners');
        // TO DO: cannot run this method more than once in one NNIManager instance
        const dispatcher = this.dispatcher;
        if (dispatcher === undefined) {
            throw new Error('Error: tuner or job maintainer have not been setup');
        }
        this.trainingService.addTrialJobMetricListener(this.trialJobMetricListener);

        // Shared rejection handler for every command received from the dispatcher.
        const reportCommandFailure = (err: Error): void => {
            this.criticalError(NNIError.FromError(err, 'Tuner command event error: '));
        };
        dispatcher.onCommand((commandType: string, content: string) => {
            this.onTunerCommand(commandType, content).catch(reportCommandFailure);
        });
    }

    /**
     * Sends the INITIALIZE command, carrying the experiment's search space,
     * to the dispatcher.
     *
     * @throws Error if the dispatcher has not been created yet
     */
    private sendInitTunerCommands(): void {
        const dispatcher = this.dispatcher;
        if (dispatcher === undefined) {
            throw new Error('Dispatcher error: tuner has not been setup');
        }

        const searchSpace = this.experimentProfile.params.searchSpace;
        this.log.debug(`Send tuner command: INITIALIZE: ${searchSpace}`);
        // Tuner need to be initialized with search space before generating any hyper parameters
        dispatcher.sendCommand(INITIALIZE, searchSpace);
    }

    /**
     * Handles a metric reported by a trial job: stores it in the data store
     * and then forwards the metric data to the dispatcher.
     *
     * @param metric metric record; its `id` and `data` fields are persisted and
     *               `data` is sent with the REPORT_METRIC_DATA command
     * @throws Error if the dispatcher has not been created yet (checked after
     *               the metric has been stored)
     */
    private async onTrialJobMetrics(metric: TrialJobMetric): Promise<void> {
        // Serialize explicitly: interpolating the object itself would log "[object Object]".
        this.log.debug(`NNIManager received trial job metrics: ${JSON.stringify(metric)}`);
        await this.dataStore.storeMetricData(metric.id, metric.data);
        if (this.dispatcher === undefined) {
            throw new Error('Error: tuner has not been setup');
        }
        this.dispatcher.sendCommand(REPORT_METRIC_DATA, metric.data);
    }

chicm-ms's avatar
chicm-ms committed
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
    /**
     * Asks the dispatcher for new trial jobs.
     *
     * Does nothing when fewer than one job is requested. In multi-thread mode
     * one REQUEST_TRIAL_JOBS command is sent per job; otherwise a single
     * command carrying the whole count is sent.
     *
     * @param jobNum number of trial jobs to request
     * @throws Error if the dispatcher has not been created yet
     */
    private requestTrialJobs(jobNum: number): void {
        if (jobNum < 1) {
            return;
        }
        const dispatcher = this.dispatcher;
        if (dispatcher === undefined) {
            throw new Error('Dispatcher error: tuner has not been setup');
        }

        if (!this.experimentProfile.params.multiThread) {
            dispatcher.sendCommand(REQUEST_TRIAL_JOBS, String(jobNum));

            return;
        }

        // Send multiple requests to ensure multiple hyper parameters are generated in non-blocking way.
        // For a single REQUEST_TRIAL_JOBS request, hyper parameters are generated one by one
        // sequentially.
        for (let remaining: number = jobNum; remaining > 0; remaining--) {
            dispatcher.sendCommand(REQUEST_TRIAL_JOBS, '1');
        }
    }

695
    /**
     * Dispatches a command received from the dispatcher (tuner side).
     *
     * Handled command types:
     *  - INITIALIZED: optionally sends IMPORT_DATA, then requests the first batch of trials.
     *  - NEW_TRIAL_JOB: queues a new trial form built from `content`.
     *  - SEND_TRIAL_JOB_PARAMETER: updates an existing trial job with a new parameter set.
     *  - NO_MORE_TRIAL_JOBS: switches to TUNER_NO_MORE_TRIAL unless the status is
     *    ERROR, STOPPING or STOPPED.
     *  - KILL_TRIAL_JOB: cancels the trial job named in `content`.
     *
     * @param commandType command identifier
     * @param content command payload; parsed as JSON for SEND_TRIAL_JOB_PARAMETER and KILL_TRIAL_JOB
     * @throws Error for an unknown command type, or when the dispatcher is
     *               needed for IMPORT_DATA but has not been created
     */
    private async onTunerCommand(commandType: string, content: string): Promise<void> {
        this.log.info(`NNIManager received command from dispatcher: ${commandType}, ${content}`);
        switch (commandType) {
            case INITIALIZED: {
                // Tuner is initialized, search space is set, request tuner to generate hyper parameters
                if (this.trialDataForTuner.length > 0) {
                    if (this.dispatcher === undefined) {
                        throw new Error('Dispatcher error: tuner has not been setup');
                    }
                    this.dispatcher.sendCommand(IMPORT_DATA, this.trialDataForTuner);
                }
                this.requestTrialJobs(this.experimentProfile.params.trialConcurrency);
                break;
            }
            case NEW_TRIAL_JOB: {
                if (this.status.status === 'TUNER_NO_MORE_TRIAL') {
                    this.log.warning('It is not supposed to receive more trials after NO_MORE_TRIAL is set');
                    this.setStatus('RUNNING');
                }
                const queuedForm: TrialJobApplicationForm = {
                    sequenceId: this.experimentProfile.nextSequenceId++,
                    hyperParameters: {
                        value: content,
                        index: 0
                    }
                };
                this.waitingTrials.push(queuedForm);
                break;
            }
            case SEND_TRIAL_JOB_PARAMETER: {
                const parsedCommand: any = JSON.parse(content);
                assert(parsedCommand.parameter_index >= 0);
                assert(parsedCommand.trial_job_id !== undefined);
                const targetJobId: any = parsedCommand.trial_job_id;

                const updateForm: TrialJobApplicationForm = {
                    sequenceId: -1,  // FIXME: multi-phase tuner should use sequence ID instead of trial job ID
                    hyperParameters: {
                        value: content,
                        index: parsedCommand.parameter_index
                    }
                };
                this.log.info(`updateTrialJob: job id: ${targetJobId}, form: ${JSON.stringify(updateForm)}`);
                await this.trainingService.updateTrialJob(targetJobId, updateForm);
                if (parsedCommand.parameters !== null) {
                    // parameters field is set as empty string if no more hyper parameter can be generated by tuner.
                    await this.dataStore.storeTrialJobEvent(
                        'ADD_HYPERPARAMETER', targetJobId, content, undefined);
                }
                break;
            }
            case NO_MORE_TRIAL_JOBS: {
                // Statuses that must not be replaced by TUNER_NO_MORE_TRIAL.
                const protectedStatuses: string[] = ['ERROR', 'STOPPING', 'STOPPED'];
                if (!protectedStatuses.includes(this.status.status)) {
                    this.setStatus('TUNER_NO_MORE_TRIAL');
                }
                break;
            }
            case KILL_TRIAL_JOB: {
                const jobToCancel: any = JSON.parse(content);
                this.log.info(`cancelTrialJob: ${jobToCancel}`);
                await this.trainingService.cancelTrialJob(jobToCancel, true);
                break;
            }
            default:
                throw new Error('Error: unsupported command type from tuner');
        }
    }

756
757
758
759
760
761
762
763
764
765
    /**
     * Reports an error through logError and additionally prints it to the
     * console's error stream.
     *
     * @param err the error to report
     */
    private criticalError(err: Error): void {
        this.logError(err);
        console.error(err);
    }

    /**
     * Records an error: writes it to the log, appends its message to the
     * status error list and switches the manager status to 'ERROR'.
     *
     * @param err the error to record
     */
    private logError(err: Error): void {
        // Fall back to the message so an error without a stack trace still reaches the log
        // instead of being silently skipped.
        this.log.error(err.stack !== undefined ? err.stack : err.message);
        this.status.errors.push(err.message);
        this.setStatus('ERROR');
    }

    /**
     * Updates the manager status and logs the transition.
     * Setting the status it already has is a no-op (nothing is logged).
     *
     * @param status the status to switch to
     */
    private setStatus(status: ExperimentStatus): void {
        const previous = this.status.status;
        if (previous === status) {
            return;
        }
        this.log.info(`Change NNIManager status from: ${previous} to: ${status}`);
        this.status.status = status;
    }

    /**
     * Builds a fresh experiment profile with zeroed counters and empty
     * parameters. The id comes from getExperimentId() and the log directory
     * from getExperimentRootDir().
     *
     * @returns a new ExperimentProfile with default values
     */
    private createEmptyExperimentProfile(): ExperimentProfile {
        const blankParams: ExperimentProfile['params'] = {
            authorName: '',
            experimentName: '',
            trialConcurrency: 0,
            maxExecDuration: 0, // unit: second
            maxTrialNum: 0, // maxTrialNum includes all the submitted trial jobs
            trainingServicePlatform: '',
            searchSpace: ''
        };

        const profile: ExperimentProfile = {
            id: getExperimentId(),
            revision: 0,
            execDuration: 0,
            logDir: getExperimentRootDir(),
            nextSequenceId: 0,
            params: blankParams
        };

        return profile;
    }
794

QuanluZhang's avatar
QuanluZhang committed
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
    /**
     * Creates the checkpoint directory (path taken from getCheckpointDir())
     * and records that path as `checkpointDir` on whichever of the advisor,
     * tuner and assessor parameters are present in the experiment profile.
     *
     * @returns the checkpoint directory path
     */
    private async createCheckpointDir(): Promise<string> {
        // TODO: test
        const chkpDir: string = getCheckpointDir();
        // create checkpoint directory
        await mkDirP(chkpDir);
        // assign this directory to exp profile's checkpointDir
        if (this.experimentProfile.params.advisor) {
            this.experimentProfile.params.advisor.checkpointDir = chkpDir;
        }
        if (this.experimentProfile.params.tuner) {
            this.experimentProfile.params.tuner.checkpointDir = chkpDir;
        }
        if (this.experimentProfile.params.assessor) {
            this.experimentProfile.params.assessor.checkpointDir = chkpDir;
        }

        // An async function already wraps its return value in a promise.
        return chkpDir;
    }
Deshui Yu's avatar
Deshui Yu committed
813
814
815
}

export { NNIManager };