nnimanager.ts 35 KB
Newer Older
liuzhe-lz's avatar
liuzhe-lz committed
1
2
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT license.
Deshui Yu's avatar
Deshui Yu committed
3
4
5
6

'use strict';

import * as assert from 'assert';
chicm-ms's avatar
chicm-ms committed
7
import { ChildProcess, StdioOptions } from 'child_process';
Deshui Yu's avatar
Deshui Yu committed
8
9
10
import { Deferred } from 'ts-deferred';
import * as component from '../common/component';
import { DataStore, MetricDataRecord, MetricType, TrialJobInfo } from '../common/datastore';
11
import { NNIError } from '../common/errors';
12
import { getExperimentId } from '../common/experimentStartupInfo';
Deshui Yu's avatar
Deshui Yu committed
13
14
import { getLogger, Logger } from '../common/log';
import {
chicm-ms's avatar
chicm-ms committed
15
    ExperimentParams, ExperimentProfile, Manager, ExperimentStatus,
16
    NNIManagerStatus, ProfileUpdateType, TrialJobStatistics
Deshui Yu's avatar
Deshui Yu committed
17
18
19
20
} from '../common/manager';
import {
    TrainingService, TrialJobApplicationForm, TrialJobDetail, TrialJobMetric, TrialJobStatus
} from '../common/trainingService';
21
import { delay, getCheckpointDir, getExperimentRootDir, getLogDir, getMsgDispatcherCommand, mkDirP, getTunerProc, getLogLevel, isAlive, killPid } from '../common/utils';
Deshui Yu's avatar
Deshui Yu committed
22
import {
chicm-ms's avatar
chicm-ms committed
23
    INITIALIZE, INITIALIZED, KILL_TRIAL_JOB, NEW_TRIAL_JOB, NO_MORE_TRIAL_JOBS, PING,
24
    REPORT_METRIC_DATA, REQUEST_TRIAL_JOBS, SEND_TRIAL_JOB_PARAMETER, TERMINATE, TRIAL_END, UPDATE_SEARCH_SPACE, IMPORT_DATA
Deshui Yu's avatar
Deshui Yu committed
25
} from './commands';
26
import { createDispatcherInterface, IpcInterface } from './ipcInterface';
Deshui Yu's avatar
Deshui Yu committed
27
28

/**
 * NNIManager, which implements the Manager interface.
 */
class NNIManager implements Manager {
    // Training service resolved from the component container in the constructor.
    private trainingService: TrainingService;
    // IPC channel to the dispatcher (tuner/assessor) process; undefined until setupTuner() runs.
    private dispatcher: IpcInterface | undefined;
    // Number of submitted trials; restored from the data store on resume (need to be recovered).
    private currSubmittedTrialNum: number;
    // Pending trial-concurrency delta consumed by manageTrials (>0: increase, <0: decrease).
    private trialConcurrencyChange: number;
    private log: Logger;
    private dataStore: DataStore;
    private experimentProfile: ExperimentProfile;
    // Process id of the dispatcher, used to probe liveness and kill it on cleanup.
    private dispatcherPid: number;
    private status: NNIManagerStatus;
    // Application forms queued for submission (e.g. user-customized trials).
    private waitingTrials: TrialJobApplicationForm[];
    // Last known detail of each tracked trial, keyed by trial job id.
    private trialJobs: Map<string, TrialJobDetail>;
    // JSON array of finished + imported trial data, assembled in resumeExperiment.
    private trialDataForTuner: string;
    // When true (set by resumeExperiment), mutating operations are rejected.
    private readonly: boolean;

    // Metric listener; removed from the training service in experimentDoneCleanUp.
    private trialJobMetricListener: (metric: TrialJobMetric) => void;
47

Deshui Yu's avatar
Deshui Yu committed
48
49
    /**
     * Resolve collaborators from the component container and put the manager into the
     * INITIALIZED state with an empty experiment profile.
     */
    constructor() {
        // The training service is mandatory; fail fast if the container did not provide one.
        this.trainingService = component.get(TrainingService);
        assert(this.trainingService);

        // Bookkeeping starts empty / zeroed.
        this.currSubmittedTrialNum = 0;
        this.trialConcurrencyChange = 0;
        this.dispatcherPid = 0;
        this.waitingTrials = [];
        this.trialJobs = new Map<string, TrialJobDetail>();
        this.trialDataForTuner = '';
        this.readonly = false;

        this.log = getLogger();
        this.dataStore = component.get(DataStore);

        this.experimentProfile = this.createEmptyExperimentProfile();
        this.status = { status: 'INITIALIZED', errors: [] };

        // Forward every metric to onTrialJobMetrics; any failure there is treated as critical.
        this.trialJobMetricListener = (metric: TrialJobMetric): void => {
            this.onTrialJobMetrics(metric).catch((err: Error) => {
                this.criticalError(NNIError.FromError(err, 'Job metrics error: '));
            });
        };
    }

    /**
     * Apply the field of `experimentProfile` selected by `updateType` to the running
     * experiment, then persist the profile.
     *
     * Every failure is reported as a rejected promise rather than a synchronous throw. This
     * covers readonly mode, an unrecognized update type, and an error from a per-field updater
     * (e.g. SEARCH_SPACE when the tuner is not set up). As a result, `.catch()`-style callers
     * see all errors.
     *
     * @param experimentProfile profile carrying the new value in `params`.
     * @param updateType which parameter to update.
     */
    public updateExperimentProfile(experimentProfile: ExperimentProfile, updateType: ProfileUpdateType): Promise<void> {
        if (this.readonly) {
            return Promise.reject(new Error('Error: can not update experiment profile in readonly mode!'));
        }
        try {
            switch (updateType) {
                case 'TRIAL_CONCURRENCY':
                    this.updateTrialConcurrency(experimentProfile.params.trialConcurrency);
                    break;
                case 'MAX_EXEC_DURATION':
                    this.updateMaxExecDuration(experimentProfile.params.maxExecDuration);
                    break;
                case 'SEARCH_SPACE':
                    this.updateSearchSpace(experimentProfile.params.searchSpace);
                    break;
                case 'MAX_TRIAL_NUM':
                    this.updateMaxTrialNum(experimentProfile.params.maxTrialNum);
                    break;
                default:
                    throw new Error('Error: unrecognized updateType');
            }
        } catch (err) {
            // Keep the promise contract: reject instead of throwing synchronously.
            return Promise.reject(err);
        }

        return this.storeExperimentProfile();
    }

97
    /**
     * Send externally produced trial data to the tuner and record it in the data store
     * (resumeExperiment later reads it back through getImportedData).
     * Rejects in readonly mode or when the tuner has not been set up.
     */
    public importData(data: string): Promise<void> {
        if (this.readonly) {
            return Promise.reject(new Error('Error: can not import data in readonly mode!'));
        }
        const dispatcher: IpcInterface | undefined = this.dispatcher;
        if (dispatcher === undefined) {
            return Promise.reject(new Error('tuner has not been setup'));
        }

        dispatcher.sendCommand(IMPORT_DATA, data);
        return this.dataStore.storeTrialJobEvent('IMPORT_DATA', '', data);
    }

111
112
113
114
    /** Export recorded trial hyper-parameter configurations (a JSON string) from the data store. */
    public async exportData(): Promise<string> {
        const exported: string = await this.dataStore.exportTrialHpConfigs();
        return exported;
    }

115
    /**
     * Queue a trial whose hyper-parameters were supplied by the user rather than the tuner.
     *
     * @param hyperParams JSON-encoded hyper-parameters for the trial.
     * @returns the sequence id assigned to the queued trial.
     *
     * Rejects in any of these cases:
     * - readonly mode;
     * - maxTrialNum has been reached;
     * - `hyperParams` is not valid JSON;
     * - the ADD_CUSTOMIZED event cannot be stored.
     */
    public async addCustomizedTrialJob(hyperParams: string): Promise<number> {
        if (this.readonly) {
            throw new Error('Error: can not add customized trial job in readonly mode!');
        }
        if (this.currSubmittedTrialNum >= this.experimentProfile.params.maxTrialNum) {
            throw new Error('reach maxTrialNum');
        }

        // TODO: NNI manager should not peek tuner's internal protocol, let's refactor this later
        const packedParameter = {
            parameter_id: null, // eslint-disable-line @typescript-eslint/camelcase
            parameter_source: 'customized', // eslint-disable-line @typescript-eslint/camelcase
            parameters: JSON.parse(hyperParams)
        };

        const form: TrialJobApplicationForm = {
            sequenceId: this.experimentProfile.nextSequenceId++,
            hyperParameters: {
                value: JSON.stringify(packedParameter),
                index: 0
            }
        };
        this.waitingTrials.push(form);

        // Trial id has not been generated yet, thus use '' instead.
        // Await the write so a data-store failure reaches the caller rather than becoming
        // an unhandled promise rejection.
        await this.dataStore.storeTrialJobEvent('ADD_CUSTOMIZED', '', hyperParams);

        return form.sequenceId;
    }

    /**
     * Cancel a trial at the user's request, then record a USER_TO_CANCEL event for it.
     * Rejects in readonly mode.
     */
    public async cancelTrialJobByUser(trialJobId: string): Promise<void> {
        if (this.readonly) {
            throw new Error('Error: can not cancel trial job in readonly mode!');
        }
        this.log.info(`User cancelTrialJob: ${trialJobId}`);

        await this.trainingService.cancelTrialJob(trialJobId);
        await this.dataStore.storeTrialJobEvent('USER_TO_CANCEL', trialJobId, '');
    }

    /**
     * Start a new experiment. In order, this:
     * - persists `expParams`;
     * - pushes cluster-level settings to the training service;
     * - launches the dispatcher (tuner/assessor) in 'start' mode;
     * - marks the experiment RUNNING and starts the main loop in the background.
     *
     * @param expParams experiment parameters to run with.
     * @returns the experiment id.
     */
    public async startExperiment(expParams: ExperimentParams): Promise<string> {
        this.log.info(`Starting experiment: ${this.experimentProfile.id}`);
        this.experimentProfile.params = expParams;
        await this.storeExperimentProfile();
        this.log.debug('Setup tuner...');

        // trainingService.setClusterMetadata returns a promise (this class's public
        // setClusterMetadata races it against a timeout). Await each call so a failure
        // rejects startExperiment instead of becoming an unhandled rejection.

        // Set up multiphase config
        if (expParams.multiPhase && this.trainingService.isMultiPhaseJobSupported) {
            await this.trainingService.setClusterMetadata('multiPhase', expParams.multiPhase.toString());
        }

        // Set up versionCheck config
        if (expParams.versionCheck !== undefined) {
            await this.trainingService.setClusterMetadata('version_check', expParams.versionCheck.toString());
        }

        // Set up logCollection config
        if (expParams.logCollection !== undefined) {
            await this.trainingService.setClusterMetadata('log_collection', expParams.logCollection.toString());
        }

        const dispatcherCommand: string = getMsgDispatcherCommand(expParams);
        this.log.debug(`dispatcher command: ${dispatcherCommand}`);
        const checkpointDir: string = await this.createCheckpointDir();
        this.setupTuner(
            dispatcherCommand,
            undefined,
            'start',
            checkpointDir);

        this.experimentProfile.startTime = Date.now();
        this.setStatus('RUNNING');
        await this.storeExperimentProfile();
        // The main loop runs in the background; its failures are routed to criticalError.
        this.run().catch((err: Error) => {
            this.criticalError(err);
        });

        return this.experimentProfile.id;
    }

SparkSnail's avatar
SparkSnail committed
192
    /**
     * Reload the persisted experiment profile and, unless `readonly`, restart the experiment.
     *
     * In readonly mode only the profile is loaded; mutating APIs reject afterwards.
     * Otherwise:
     * - the same cluster-level settings as startExperiment are re-applied;
     * - the dispatcher is relaunched in 'resume' mode;
     * - trials recorded as WAITING or RUNNING are marked FAILED;
     * - finished plus imported trial data is collected into `trialDataForTuner`;
     * - the main loop is restarted in the background.
     *
     * @param readonly when true, load the experiment for viewing only.
     */
    public async resumeExperiment(readonly: boolean): Promise<void> {
        this.log.info(`Resuming experiment: ${this.experimentProfile.id}`);
        //Fetch back the experiment profile
        const experimentId: string = getExperimentId();
        this.experimentProfile = await this.dataStore.getExperimentProfile(experimentId);
        this.readonly = readonly;
        if (readonly) {
            return;
        }
        const expParams: ExperimentParams = this.experimentProfile.params;

        // Re-apply the same cluster settings as startExperiment, including logCollection so
        // that resumed experiments honor it too. Await each call so a training-service
        // failure rejects resumeExperiment instead of going unhandled.

        // Set up multiphase config
        if (expParams.multiPhase && this.trainingService.isMultiPhaseJobSupported) {
            await this.trainingService.setClusterMetadata('multiPhase', expParams.multiPhase.toString());
        }

        // Set up versionCheck config
        if (expParams.versionCheck !== undefined) {
            await this.trainingService.setClusterMetadata('version_check', expParams.versionCheck.toString());
        }

        // Set up logCollection config
        if (expParams.logCollection !== undefined) {
            await this.trainingService.setClusterMetadata('log_collection', expParams.logCollection.toString());
        }

        const dispatcherCommand: string = getMsgDispatcherCommand(expParams);
        this.log.debug(`dispatcher command: ${dispatcherCommand}`);
        const checkpointDir: string = await this.createCheckpointDir();
        this.setupTuner(
            dispatcherCommand,
            undefined,
            'resume',
            checkpointDir);

        const allTrialJobs: TrialJobInfo[] = await this.dataStore.listTrialJobs();

        // Resume currSubmittedTrialNum
        this.currSubmittedTrialNum = allTrialJobs.length;

        // Trials that were WAITING or RUNNING when the experiment stopped are recorded as FAILED.
        await Promise.all(allTrialJobs
            .filter((job: TrialJobInfo) => job.status === 'WAITING' || job.status === 'RUNNING')
            .map((job: TrialJobInfo) => this.dataStore.storeTrialJobEvent('FAILED', job.id)));

        // Collect generated trials and imported trials
        const finishedTrialData: string = await this.exportData();
        const importedData: string[] = await this.dataStore.getImportedData();
        let trialData: Record<string, any>[] = JSON.parse(finishedTrialData);
        for (const oneImportedData of importedData) {
            // do not deduplicate
            trialData = trialData.concat(<Record<string, any>[]>JSON.parse(oneImportedData));
        }
        this.trialDataForTuner = JSON.stringify(trialData);

        // Drop a stale endTime when both the time budget and the trial budget still have room.
        if (this.experimentProfile.execDuration < this.experimentProfile.params.maxExecDuration &&
            this.currSubmittedTrialNum < this.experimentProfile.params.maxTrialNum &&
            this.experimentProfile.endTime) {
            delete this.experimentProfile.endTime;
        }
        this.setStatus('RUNNING');

        // TO DO: update database record for resume event
        this.run().catch((err: Error) => {
            this.criticalError(err);
        });
    }

255
256
    /** Look up a single trial job record in the data store. */
    public getTrialJob(trialJobId: string): Promise<TrialJobInfo> {
        const record: Promise<TrialJobInfo> = this.dataStore.getTrialJob(trialJobId);
        return record;
    }

    /**
     * Forward a cluster-metadata key/value pair to the training service.
     *
     * Rejects in readonly mode, or if the training service takes longer than 10 seconds.
     * The timeout timer is always cleared, including when the training service throws
     * synchronously. This prevents a stray timer from later rejecting with no handler attached.
     */
    public async setClusterMetadata(key: string, value: string): Promise<void> {
        if (this.readonly) {
            return Promise.reject(new Error('Error: can not set cluster metadata in readonly mode!'));
        }
        this.log.info(`NNIManager setClusterMetadata, key: ${key}, value: ${value}`);

        // TO DO: move timeout value to constants file
        const timeoutMs: number = 10000;
        let timeoutId: ReturnType<typeof setTimeout> | undefined;
        const timeout: Promise<never> = new Promise<never>((_resolve: (value: never) => void, reject: (reason: Error) => void): void => {
            timeoutId = setTimeout(
                () => { reject(new Error('TrainingService setClusterMetadata timeout. Please check your config file.')); },
                timeoutMs);
        });
        try {
            await Promise.race([timeout, this.trainingService.setClusterMetadata(key, value)]);
        } finally {
            if (timeoutId !== undefined) {
                clearTimeout(timeoutId);
            }
        }
    }

    /** Read a cluster-metadata value from the training service, always returned as a promise. */
    public getClusterMetadata(key: string): Promise<string> {
        const value = this.trainingService.getClusterMetadata(key);
        return Promise.resolve(value);
    }

    /** Trial job statistics as computed by the data store. */
    public async getTrialJobStatistics(): Promise<TrialJobStatistics[]> {
        const statistics: TrialJobStatistics[] = await this.dataStore.getTrialJobStatistics();
        return statistics;
    }

286
    /**
     * Stop the experiment: switch to STOPPING, then run the shared teardown
     * (experimentDoneCleanUp), which ends in the STOPPED state.
     */
    public async stopExperiment(): Promise<void> {
        this.setStatus('STOPPING');
        this.log.info('Stopping experiment, cleaning up ...');

        await this.experimentDoneCleanUp();

        this.log.info('Experiment stopped.');
    }

293
    /** Fetch metric records from the data store; both arguments are forwarded unchanged. */
    public async getMetricData(trialJobId?: string, metricType?: MetricType): Promise<MetricDataRecord[]> {
        const records: MetricDataRecord[] = await this.dataStore.getMetricData(trialJobId, metricType);
        return records;
    }

297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
    /**
     * All metric records belonging to trials whose sequence id lies in the inclusive
     * range [minSeqId, maxSeqId]. Trials without a sequence id are ignored.
     */
    public async getMetricDataByRange(minSeqId: number, maxSeqId: number): Promise<MetricDataRecord[]> {
        // FIXME: can sequenceId be undefined?
        const inRange = (seq: number | undefined): boolean =>
            seq !== undefined && seq >= minSeqId && seq <= maxSeqId;

        const wantedIds: Set<string> = new Set<string>();
        for (const trial of await this.dataStore.listTrialJobs()) {
            if (inRange(trial.sequenceId)) {
                wantedIds.add(trial.id);
            }
        }

        const metrics: MetricDataRecord[] = await this.dataStore.getMetricData();
        return metrics.filter((metric: MetricDataRecord) => wantedIds.has(metric.trialJobId));
    }

    /**
     * Return all non-PERIODICAL metric records, followed by the newest PERIODICAL
     * record of each trial (highest `sequence`; on ties the later record wins).
     */
    public async getLatestMetricData(): Promise<MetricDataRecord[]> {
        // FIXME: this can take a long time
        const records: MetricDataRecord[] = await this.dataStore.getMetricData();

        const nonPeriodical: MetricDataRecord[] = records.filter(
            (record: MetricDataRecord) => record.type !== 'PERIODICAL');

        const newestPeriodical: Map<string, MetricDataRecord> = new Map<string, MetricDataRecord>();
        for (const record of records) {
            if (record.type !== 'PERIODICAL') {
                continue;
            }
            const current: MetricDataRecord | undefined = newestPeriodical.get(record.trialJobId);
            if (current === undefined || record.sequence >= current.sequence) {
                newestPeriodical.set(record.trialJobId, record);
            }
        }

        // FIXME: unit test
        return nonPeriodical.concat(Array.from(newestPeriodical.values()));
    }

Deshui Yu's avatar
Deshui Yu committed
328
329
330
331
332
333
334
335
    /** The current experiment profile (resolved by reference, not copied). */
    public getExperimentProfile(): Promise<ExperimentProfile> {
        return Promise.resolve(this.experimentProfile);
    }

336
337
338
339
    /** The manager's current status object (returned by reference, not copied). */
    public getStatus(): NNIManagerStatus {
        const current: NNIManagerStatus = this.status;
        return current;
    }

Deshui Yu's avatar
Deshui Yu committed
340
341
342
343
    /** List trial job records from the data store; `status` is forwarded to it unchanged. */
    public async listTrialJobs(status?: TrialJobStatus): Promise<TrialJobInfo[]> {
        const jobs: TrialJobInfo[] = await this.dataStore.listTrialJobs(status);
        return jobs;
    }

344
345
    /**
     * Spawn the dispatcher (tuner/assessor) process and open the IPC interface to it.
     * Does nothing if a dispatcher already exists.
     *
     * @param command command line used to launch the dispatcher.
     * @param cwd working directory; the log directory is used when undefined or empty.
     * @param mode exported to the dispatcher as NNI_MODE.
     * @param dataDirectory exported to the dispatcher as NNI_CHECKPOINT_DIRECTORY.
     */
    private setupTuner(command: string, cwd: string | undefined, mode: 'start' | 'resume', dataDirectory: string): void {
        if (this.dispatcher !== undefined) {
            return;
        }

        // stdin ignored, stdout/stderr inherited, fds 3 and 4 piped
        // (presumably the channel used by createDispatcherInterface — confirm in ipcInterface).
        const stdio: StdioOptions = ['ignore', process.stdout, process.stderr, 'pipe', 'pipe'];
        const workingDir: string = (cwd === undefined || cwd === '') ? getLogDir() : cwd;
        // TO DO: add CUDA_VISIBLE_DEVICES

        const tunerConfig = this.experimentProfile.params.tuner;
        const includeIntermediateResultsEnv: boolean | undefined =
            tunerConfig === undefined ? false : tunerConfig.includeIntermediateResults;

        // Inherit this process's environment and overlay the NNI-specific variables.
        const childEnv = Object.assign({}, process.env, {
            SDK_PROCESS: 'dispatcher',
            NNI_MODE: mode,
            NNI_CHECKPOINT_DIRECTORY: dataDirectory,
            NNI_LOG_DIRECTORY: getLogDir(),
            NNI_LOG_LEVEL: getLogLevel(),
            NNI_INCLUDE_INTERMEDIATE_RESULTS: includeIntermediateResultsEnv,
            CUDA_VISIBLE_DEVICES: this.getGpuEnvvarValue()
        });

        const tunerProc: ChildProcess = getTunerProc(command, stdio, workingDir, childEnv);
        this.dispatcherPid = tunerProc.pid;
        this.dispatcher = createDispatcherInterface(tunerProc);
    }

378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
    /**
     * Value for the dispatcher's CUDA_VISIBLE_DEVICES. Uses the advisor's gpuIndices when an
     * advisor is configured, otherwise the tuner's. Returns '' when neither provides one.
     */
    private getGpuEnvvarValue(): string {
        const { advisor, tuner } = this.experimentProfile.params;

        let cudaDevices: string | undefined = undefined;
        if (advisor !== undefined) {
            cudaDevices = advisor.gpuIndices;
        } else if (tuner !== undefined) {
            cudaDevices = tuner.gpuIndices;
        }

        return cudaDevices === undefined ? '' : cudaDevices;
    }

Deshui Yu's avatar
Deshui Yu committed
394
    /**
     * Record a new trial concurrency. The difference from the current value is accumulated
     * into trialConcurrencyChange, which manageTrials uses to size its next request.
     */
    private updateTrialConcurrency(trialConcurrency: number): void {
        // we assume trialConcurrency >= 0, which is checked by restserver
        const delta: number = trialConcurrency - this.experimentProfile.params.trialConcurrency;
        this.trialConcurrencyChange += delta;
        this.experimentProfile.params.trialConcurrency = trialConcurrency;
    }

    /** Overwrite the experiment's maximum execution duration in the in-memory profile. */
    private updateMaxExecDuration(duration: number): void {
        const params: ExperimentParams = this.experimentProfile.params;
        params.maxExecDuration = duration;
    }

    /**
     * Push a new search space to the tuner and store it in the profile.
     * Throws if the tuner (dispatcher) has not been set up.
     */
    private updateSearchSpace(searchSpace: string): void {
        const dispatcher: IpcInterface | undefined = this.dispatcher;
        if (dispatcher === undefined) {
            throw new Error('Error: tuner has not been setup');
        }
        dispatcher.sendCommand(UPDATE_SEARCH_SPACE, searchSpace);
        this.experimentProfile.params.searchSpace = searchSpace;
    }

QuanluZhang's avatar
QuanluZhang committed
418
419
420
421
422
423
    /** Overwrite the experiment's maximum trial count in the in-memory profile. */
    private updateMaxTrialNum(maxTrialNum: number): void {
        const params: ExperimentParams = this.experimentProfile.params;
        params.maxTrialNum = maxTrialNum;
    }

Deshui Yu's avatar
Deshui Yu committed
424
    /**
     * Tear the experiment down. In order, this:
     * - stops listening for trial metrics;
     * - sends TERMINATE to the dispatcher, polls its pid for up to ~30 s, then kills the pid;
     * - cancels trials still RUNNING or WAITING;
     * - cleans up the training service;
     * - records the end time and sets the status to STOPPED.
     *
     * @throws Error if the dispatcher (tuner) was never set up.
     */
    private async experimentDoneCleanUp(): Promise<void> {
        if (this.dispatcher === undefined) {
            throw new Error('Error: tuner has not been setup');
        }
        this.trainingService.removeTrialJobMetricListener(this.trialJobMetricListener);
        this.dispatcher.sendCommand(TERMINATE);
        let tunerAlive: boolean = true;
        // gracefully terminate tuner and assessor here, wait at most 30 seconds.
        for (let i: number = 0; i < 30; i++) {
            if (!tunerAlive) { break; }
            tunerAlive = await isAlive(this.dispatcherPid);
            await delay(1000);
        }
        // Kill unconditionally in case the dispatcher ignored TERMINATE.
        await killPid(this.dispatcherPid);
        const trialJobList: TrialJobDetail[] = await this.trainingService.listTrialJobs();
        // TO DO: to promise all
        for (const trialJob of trialJobList) {
            if (trialJob.status === 'RUNNING' ||
                trialJob.status === 'WAITING') {
                try {
                    this.log.info(`cancelTrialJob: ${trialJob.id}`);
                    await this.trainingService.cancelTrialJob(trialJob.id);
                } catch (error) {
                    // Best effort: one failed cancellation (e.g. the trial process is already
                    // gone) must not abort cleanup, but record it instead of hiding it.
                    this.log.info(`cancelTrialJob failed for ${trialJob.id}: ${error}`);
                }
            }
        }
        await this.trainingService.cleanUp();
        this.experimentProfile.endTime = Date.now();
        await this.storeExperimentProfile();
        this.setStatus('STOPPED');
    }

    /**
     * Once per second, until the status is ERROR/STOPPING/STOPPED, add one second to
     * execDuration while the status is RUNNING. The profile is persisted on every 10th tick
     * that falls in the RUNNING state.
     */
    private async periodicallyUpdateExecDuration(): Promise<void> {
        const stopStates: string[] = ['ERROR', 'STOPPING', 'STOPPED'];
        for (let tick: number = 1; !stopStates.includes(this.status.status); tick++) {
            await delay(1000 * 1); // 1 seconds
            if (this.status.status !== 'RUNNING') {
                continue;
            }
            this.experimentProfile.execDuration += 1;
            if (tick % 10 === 0) {
                await this.storeExperimentProfile();
            }
        }
    }

chicm-ms's avatar
chicm-ms committed
471
472
473
474
475
476
    /**
     * Send PING to the dispatcher every 5 seconds until the status is ERROR/STOPPING/STOPPED.
     * Throws if the dispatcher has not been set up.
     */
    private async pingDispatcher(): Promise<void> {
        if (this.dispatcher === undefined) {
            throw new Error('Error: tuner has not been setup');
        }
        const stopStates: string[] = ['ERROR', 'STOPPING', 'STOPPED'];
        while (!stopStates.includes(this.status.status)) {
            this.dispatcher.sendCommand(PING);
            await delay(5 * 1000);
        }
    }

QuanluZhang's avatar
QuanluZhang committed
481
482
    /**
     * Poll the training service for every tracked trial and act on status changes:
     * - changes are logged, cached and stored as trial events;
     * - trials in a terminal state are untracked and reported to the dispatcher via TRIAL_END.
     *
     * @returns the number of trials that reached a terminal state in this pass.
     * @throws Error if the dispatcher has not been set up.
     */
    private async requestTrialJobsStatus(): Promise<number> {
        if (this.dispatcher === undefined) {
            throw new Error('Error: tuner has not been setup');
        }
        let finishedCount: number = 0;

        // Snapshot the ids first, because finished trials are deleted from the map inside the loop.
        const trackedIds: string[] = Array.from(this.trialJobs.keys());
        for (const trialJobId of trackedIds) {
            const latest: TrialJobDetail = await this.trainingService.getTrialJob(trialJobId);
            const previous: TrialJobDetail | undefined = this.trialJobs.get(trialJobId);
            if (previous !== undefined && previous.status !== latest.status) {
                this.log.info(`Trial job ${latest.id} status changed from ${previous.status} to ${latest.status}`);
                this.trialJobs.set(trialJobId, Object.assign({}, latest));
                await this.dataStore.storeTrialJobEvent(latest.status, latest.id, undefined, latest);
            }

            switch (latest.status) {
                case 'SUCCEEDED':
                case 'USER_CANCELED':
                case 'EARLY_STOPPED':
                case 'FAILED':
                case 'SYS_CANCELED': {
                    // Terminal state. In the current version FAILED / SYS_CANCELED trials are not retried.
                    // TO DO: push this job to queue for retry
                    this.trialJobs.delete(trialJobId);
                    finishedCount++;
                    this.dispatcher.sendCommand(TRIAL_END, JSON.stringify({
                        trial_job_id: latest.id, // eslint-disable-line @typescript-eslint/camelcase
                        event: latest.status,
                        hyper_params: latest.form.hyperParameters.value // eslint-disable-line @typescript-eslint/camelcase
                    }));
                    break;
                }
                case 'WAITING':
                case 'RUNNING':
                case 'UNKNOWN':
                    // Do nothing
                    break;
                default:
                // TO DO: add warning in log
            }
        }

        return finishedCount;
    }

    private async manageTrials(): Promise<void> {
        if (this.dispatcher === undefined) {
            throw new Error('Error: tuner has not been setup');
        }
QuanluZhang's avatar
QuanluZhang committed
538
        let allFinishedTrialJobNum: number = this.currSubmittedTrialNum;
QuanluZhang's avatar
QuanluZhang committed
539
        let waitSubmittedToFinish: number;
540
        while (!['ERROR', 'STOPPING', 'STOPPED'].includes(this.status.status)) {
QuanluZhang's avatar
QuanluZhang committed
541
542
543
544
545
546
547
            const finishedTrialJobNum: number = await this.requestTrialJobsStatus();
            allFinishedTrialJobNum += finishedTrialJobNum;

            // requestTrialNum is the number of trials that will be requested from tuner.
            // If trialConcurrency does not change, requestTrialNum equals finishedTrialJobNum.
            // If trialConcurrency changes, for example, trialConcurrency increases by 2 (trialConcurrencyChange=2), then
            // requestTrialNum equals 2 + finishedTrialJobNum and trialConcurrencyChange becomes 0.
548
            // If trialConcurrency changes, for example, trialConcurrency decreases by 4 (trialConcurrencyChange=-4) and
QuanluZhang's avatar
QuanluZhang committed
549
550
551
552
553
554
555
556
            // finishedTrialJobNum is 2, then requestTrialNum becomes -2. No trial will be requested from tuner,
            // and trialConcurrencyChange becomes -2.
            const requestTrialNum: number = this.trialConcurrencyChange + finishedTrialJobNum;
            if (requestTrialNum >= 0) {
                this.trialConcurrencyChange = 0;
            } else {
                this.trialConcurrencyChange = requestTrialNum;
            }
chicm-ms's avatar
chicm-ms committed
557

558
            this.requestTrialJobs(requestTrialNum);
chicm-ms's avatar
chicm-ms committed
559

QuanluZhang's avatar
QuanluZhang committed
560
            // check maxtrialnum and maxduration here
561
            // NO_MORE_TRIAL is more like a subset of RUNNING, because during RUNNING tuner
562
            // might tell nnimanager that this is no more trials. In NO_MORE_TRIAL state, the experiment is viewed
563
564
            // as still running. DONE could be transfered from RUNNING or NO_MORE_TRIAL.
            assert(this.status.status === 'RUNNING' ||
565
                this.status.status === 'DONE' ||
QuanluZhang's avatar
QuanluZhang committed
566
567
                this.status.status === 'NO_MORE_TRIAL' ||
                this.status.status === 'TUNER_NO_MORE_TRIAL');
568
            if (this.experimentProfile.execDuration > this.experimentProfile.params.maxExecDuration ||
QuanluZhang's avatar
QuanluZhang committed
569
                this.currSubmittedTrialNum >= this.experimentProfile.params.maxTrialNum) {
QuanluZhang's avatar
QuanluZhang committed
570
                if (this.status.status !== 'DONE') {
chicm-ms's avatar
chicm-ms committed
571
                    this.setStatus('NO_MORE_TRIAL');
QuanluZhang's avatar
QuanluZhang committed
572
573
574
575
                    waitSubmittedToFinish = this.currSubmittedTrialNum;

                    assert(allFinishedTrialJobNum <= waitSubmittedToFinish);
                    if (allFinishedTrialJobNum >= waitSubmittedToFinish) {
chicm-ms's avatar
chicm-ms committed
576
                        this.setStatus('DONE');
QuanluZhang's avatar
QuanluZhang committed
577
578
579
580
581
                        this.experimentProfile.endTime = Date.now();
                        await this.storeExperimentProfile();
                        // write this log for travis CI
                        this.log.info('Experiment done.');
                    }
QuanluZhang's avatar
QuanluZhang committed
582
583
584
                }
            } else {
                if (this.status.status === 'DONE') {
585
586
                    delete this.experimentProfile.endTime;
                    await this.storeExperimentProfile();
QuanluZhang's avatar
QuanluZhang committed
587
                }
QuanluZhang's avatar
QuanluZhang committed
588
                if (this.status.status !== 'TUNER_NO_MORE_TRIAL') {
chicm-ms's avatar
chicm-ms committed
589
                    this.setStatus('RUNNING');
590
                }
QuanluZhang's avatar
QuanluZhang committed
591
592
593
594
595
                for (let i: number = this.trialJobs.size; i < this.experimentProfile.params.trialConcurrency; i++) {
                    if (this.waitingTrials.length === 0 ||
                        this.currSubmittedTrialNum >= this.experimentProfile.params.maxTrialNum) {
                        break;
                    }
596
                    const form = this.waitingTrials.shift() as TrialJobApplicationForm;
QuanluZhang's avatar
QuanluZhang committed
597
                    this.currSubmittedTrialNum++;
598
599
                    this.log.info(`submitTrialJob: form: ${JSON.stringify(form)}`);
                    const trialJobDetail: TrialJobDetail = await this.trainingService.submitTrialJob(form);
600
                    await this.storeExperimentProfile();
QuanluZhang's avatar
QuanluZhang committed
601
602
603
604
                    this.trialJobs.set(trialJobDetail.id, Object.assign({}, trialJobDetail));
                    const trialJobDetailSnapshot: TrialJobDetail | undefined = this.trialJobs.get(trialJobDetail.id);
                    if (trialJobDetailSnapshot != undefined) {
                        await this.dataStore.storeTrialJobEvent(
605
                            trialJobDetailSnapshot.status, trialJobDetailSnapshot.id, form.hyperParameters.value, trialJobDetailSnapshot);
QuanluZhang's avatar
QuanluZhang committed
606
607
608
609
610
611
612
613
614
                    } else {
                        assert(false, `undefined trialJobDetail in trialJobs: ${trialJobDetail.id}`);
                    }
                }
            }
            await delay(1000 * 5); // 5 seconds
        }
    }

Deshui Yu's avatar
Deshui Yu committed
615
616
617
618
619
620
    /**
     * Bumps the experiment profile's revision counter, then persists the whole profile.
     *
     * @returns the promise produced by the data store's storeExperimentProfile call.
     */
    private storeExperimentProfile(): Promise<void> {
        const profile: ExperimentProfile = this.experimentProfile;
        profile.revision++;

        return this.dataStore.storeExperimentProfile(profile);
    }

621
    /**
     * Wires the event listeners, sends the initial tuner command, then runs four tasks
     * concurrently: exec-duration updates, dispatcher pings, the training service and
     * trial management.
     *
     * Resolves when all four tasks resolve; rejects on the first failure. A failure from the
     * dispatcher ping, the training service or trial management is re-thrown as an NNIError
     * whose message carries a component-specific prefix.
     */
    private async run(): Promise<void> {
        assert(this.dispatcher !== undefined);

        this.addEventListeners();
        this.sendInitTunerCommands();

        // Attach a handler that converts a rejection into an NNIError with the given prefix.
        const rethrowWithPrefix = <T>(task: Promise<T>, prefix: string): Promise<T> =>
            task.catch((err: Error) => {
                throw NNIError.FromError(err, prefix);
            });

        const execDurationTask = this.periodicallyUpdateExecDuration();
        const dispatcherTask = rethrowWithPrefix(this.pingDispatcher(), 'Dispatcher error: ');
        const trainingServiceTask = rethrowWithPrefix(this.trainingService.run(), 'Training service error: ');
        const trialManagementTask = rethrowWithPrefix(this.manageTrials(), 'Job management error: ');

        await Promise.all([execDurationTask, dispatcherTask, trainingServiceTask, trialManagementTask]);
    }

QuanluZhang's avatar
QuanluZhang committed
641
    /**
     * Registers the manager's callbacks:
     * - the trial-metric listener on the training service;
     * - a command handler on the dispatcher;
     * - a stream-error handler on the dispatcher.
     *
     * A failure while handling a dispatcher command, or a dispatcher stream error, is
     * reported through criticalError.
     *
     * @throws Error if the dispatcher has not been created yet.
     */
    private addEventListeners(): void {
        this.log.info('Add event listeners');
        // TO DO: cannot run this method more than once in one NNIManager instance
        const dispatcher = this.dispatcher;
        if (dispatcher === undefined) {
            throw new Error('Error: tuner or job maintainer have not been setup');
        }

        this.trainingService.addTrialJobMetricListener(this.trialJobMetricListener);

        const handleCommand = (commandType: string, content: string): void => {
            this.onTunerCommand(commandType, content).catch((err: Error) => {
                this.criticalError(NNIError.FromError(err, 'Tuner command event error: '));
            });
        };
        const handleStreamError = (error: Error): void => {
            this.log.error(`Dispatcher error: ${error.message}`);
            this.criticalError(new Error('Dispatcher stream error, tuner may have crashed.'));
        };

        dispatcher.onCommand(handleCommand);
        dispatcher.onError(handleStreamError);
    }

    /**
     * Sends INITIALIZE, carrying the experiment's search space, to the dispatcher.
     * The tuner needs the search space before it can generate any hyper-parameters.
     *
     * @throws Error if the dispatcher has not been set up.
     */
    private sendInitTunerCommands(): void {
        const dispatcher = this.dispatcher;
        if (dispatcher === undefined) {
            throw new Error('Dispatcher error: tuner has not been setup');
        }

        const searchSpace = this.experimentProfile.params.searchSpace;
        this.log.debug(`Send tuner command: INITIALIZE: ${searchSpace}`);
        dispatcher.sendCommand(INITIALIZE, searchSpace);
    }

    /**
     * Persists a metric reported by a trial, then forwards the raw metric data to the
     * tuner as REPORT_METRIC_DATA.
     *
     * @param metric metric event received from the training service.
     * @throws Error if the dispatcher has not been set up. The metric has already been
     *   stored by the time this check runs.
     */
    private async onTrialJobMetrics(metric: TrialJobMetric): Promise<void> {
        // Serialize explicitly: interpolating the object directly would log "[object Object]".
        this.log.debug(`NNIManager received trial job metrics: ${JSON.stringify(metric)}`);
        await this.dataStore.storeMetricData(metric.id, metric.data);
        if (this.dispatcher === undefined) {
            throw new Error('Error: tuner has not been setup');
        }
        this.dispatcher.sendCommand(REPORT_METRIC_DATA, metric.data);
    }

chicm-ms's avatar
chicm-ms committed
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
    /**
     * Asks the tuner for `jobNum` new trials. Does nothing when `jobNum` is below 1.
     *
     * With `multiThread` enabled, one REQUEST_TRIAL_JOBS command asking for '1' is sent
     * per trial. The tuner then generates hyper-parameters in a non-blocking way instead
     * of one by one inside a single request. Otherwise one command carries the full count.
     *
     * @param jobNum number of trials to request.
     * @throws Error if the dispatcher has not been set up.
     */
    private requestTrialJobs(jobNum: number): void {
        if (jobNum < 1) {
            return;
        }
        const dispatcher = this.dispatcher;
        if (dispatcher === undefined) {
            throw new Error('Dispatcher error: tuner has not been setup');
        }

        if (!this.experimentProfile.params.multiThread) {
            dispatcher.sendCommand(REQUEST_TRIAL_JOBS, String(jobNum));
            return;
        }
        for (let sent: number = 0; sent < jobNum; sent += 1) {
            dispatcher.sendCommand(REQUEST_TRIAL_JOBS, '1');
        }
    }

697
    /**
     * Handles one command received from the dispatcher.
     *
     * - INITIALIZED: replays `trialDataForTuner` via IMPORT_DATA when it is non-empty,
     *   then requests `trialConcurrency` trials.
     * - NEW_TRIAL_JOB: queues a trial application form, with `content` as its
     *   hyper-parameters, on `waitingTrials` for the trial-management loop to submit.
     *   Leaves TUNER_NO_MORE_TRIAL (with a warning) if that status was set.
     * - SEND_TRIAL_JOB_PARAMETER: pushes an extra hyper-parameter set to an existing trial
     *   and records an ADD_HYPERPARAMETER event.
     * - NO_MORE_TRIAL_JOBS: switches to TUNER_NO_MORE_TRIAL unless the experiment is in
     *   ERROR, STOPPING or STOPPED.
     * - KILL_TRIAL_JOB: cancels the trial whose id is JSON-encoded in `content`.
     *
     * @param commandType command constant received from the dispatcher.
     * @param content command payload as sent by the dispatcher.
     * @throws Error for an unsupported command type, or when IMPORT_DATA must be sent but
     *   the dispatcher has not been set up.
     */
    private async onTunerCommand(commandType: string, content: string): Promise<void> {
        this.log.info(`NNIManager received command from dispatcher: ${commandType}, ${content}`);
        switch (commandType) {
            case INITIALIZED: {
                // Tuner is initialized and the search space is set; request tuner to generate hyper parameters.
                if (this.trialDataForTuner.length > 0) {
                    if (this.dispatcher === undefined) {
                        throw new Error('Dispatcher error: tuner has not been setup');
                    }
                    this.dispatcher.sendCommand(IMPORT_DATA, this.trialDataForTuner);
                }
                this.requestTrialJobs(this.experimentProfile.params.trialConcurrency);
                break;
            }
            case NEW_TRIAL_JOB: {
                if (this.status.status === 'TUNER_NO_MORE_TRIAL') {
                    this.log.warning('It is not supposed to receive more trials after NO_MORE_TRIAL is set');
                    this.setStatus('RUNNING');
                }
                const form: TrialJobApplicationForm = {
                    sequenceId: this.experimentProfile.nextSequenceId++,
                    hyperParameters: {
                        value: content,
                        index: 0
                    }
                };
                this.waitingTrials.push(form);
                break;
            }
            case SEND_TRIAL_JOB_PARAMETER: {
                const tunerCommand: any = JSON.parse(content);
                assert(tunerCommand.parameter_index >= 0);
                assert(tunerCommand.trial_job_id !== undefined);

                const trialJobForm: TrialJobApplicationForm = {
                    sequenceId: -1,  // FIXME: multi-phase tuner should use sequence ID instead of trial job ID
                    hyperParameters: {
                        value: content,
                        index: tunerCommand.parameter_index
                    }
                };
                this.log.info(`updateTrialJob: job id: ${tunerCommand.trial_job_id}, form: ${JSON.stringify(trialJobForm)}`);
                await this.trainingService.updateTrialJob(tunerCommand.trial_job_id, trialJobForm);
                // The event is skipped only when `parameters` is exactly null.
                // NOTE(review): a previous comment here said the tuner sends an empty string when
                // it has no more hyper-parameters. An empty string would NOT be skipped by this
                // check. Confirm what the dispatcher actually sends.
                if (tunerCommand['parameters'] !== null) {
                    await this.dataStore.storeTrialJobEvent(
                        'ADD_HYPERPARAMETER', tunerCommand.trial_job_id, content, undefined);
                }
                break;
            }
            case NO_MORE_TRIAL_JOBS: {
                if (!['ERROR', 'STOPPING', 'STOPPED'].includes(this.status.status)) {
                    this.setStatus('TUNER_NO_MORE_TRIAL');
                }
                break;
            }
            case KILL_TRIAL_JOB: {
                // The payload is the JSON-encoded id of the trial to cancel; decode it once.
                const trialJobId: string = JSON.parse(content);
                this.log.info(`cancelTrialJob: ${trialJobId}`);
                await this.trainingService.cancelTrialJob(trialJobId, true);
                break;
            }
            default:
                throw new Error('Error: unsupported command type from tuner');
        }
    }

763
764
765
766
767
768
769
770
771
772
    /**
     * Reports an unrecoverable error.
     * First records it through logError, which logs it, stores its message and sets the
     * ERROR status. Then prints the error object to the console error stream.
     *
     * @param err the error to report.
     */
    private criticalError(err: Error): void {
        this.logError(err);
        console.error(err);
    }

    /**
     * Records an error in three steps:
     * - writes it to the manager log;
     * - appends its message to `status.errors`;
     * - switches the manager status to ERROR.
     *
     * @param err the error to record.
     */
    private logError(err: Error): void {
        // Prefer the stack trace, which normally includes the message. Without this fallback,
        // an error lacking a stack would never be written to the log at all.
        this.log.error(err.stack !== undefined ? err.stack : err.message);
        this.status.errors.push(err.message);
        this.setStatus('ERROR');
    }

    /**
     * Moves the manager to `status` and logs the transition.
     * Calling it with the current status is a no-op and logs nothing.
     *
     * @param status the new experiment status.
     */
    private setStatus(status: ExperimentStatus): void {
        const previous = this.status.status;
        if (previous === status) {
            return;
        }
        this.log.info(`Change NNIManager status from: ${previous} to: ${status}`);
        this.status.status = status;
    }

    /**
     * Builds a placeholder profile for a new experiment:
     * - id from getExperimentId() and logDir from getExperimentRootDir();
     * - revision, execDuration and nextSequenceId set to 0;
     * - parameters with empty strings and zero limits.
     */
    private createEmptyExperimentProfile(): ExperimentProfile {
        const params: ExperimentProfile['params'] = {
            authorName: '',
            experimentName: '',
            trialConcurrency: 0,
            maxExecDuration: 0, // unit: second
            maxTrialNum: 0, // maxTrialNum includes all the submitted trial jobs
            trainingServicePlatform: '',
            searchSpace: ''
        };

        return {
            id: getExperimentId(),
            revision: 0,
            execDuration: 0,
            logDir: getExperimentRootDir(),
            nextSequenceId: 0,
            params
        };
    }
801

QuanluZhang's avatar
QuanluZhang committed
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
    /**
     * Creates the checkpoint directory given by getCheckpointDir() using mkDirP.
     * Records it as `checkpointDir` on each configured advisor, tuner or assessor entry in
     * the experiment parameters.
     *
     * @returns the checkpoint directory path.
     */
    private async createCheckpointDir(): Promise<string> {
        // TODO: test
        const chkpDir: string = getCheckpointDir();
        await mkDirP(chkpDir);

        // Point every configured algorithm at the shared checkpoint directory.
        const { advisor, tuner, assessor } = this.experimentProfile.params;
        if (advisor) {
            advisor.checkpointDir = chkpDir;
        }
        if (tuner) {
            tuner.checkpointDir = chkpDir;
        }
        if (assessor) {
            assessor.checkpointDir = chkpDir;
        }

        // An async function already wraps its return value in a promise.
        return chkpDir;
    }
Deshui Yu's avatar
Deshui Yu committed
820
821
822
}

export { NNIManager };