nnimanager.ts 34.9 KB
Newer Older
liuzhe-lz's avatar
liuzhe-lz committed
1
2
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT license.
Deshui Yu's avatar
Deshui Yu committed
3
4
5
6

'use strict';

import * as assert from 'assert';
chicm-ms's avatar
chicm-ms committed
7
import { ChildProcess, StdioOptions } from 'child_process';
Deshui Yu's avatar
Deshui Yu committed
8
9
10
import { Deferred } from 'ts-deferred';
import * as component from '../common/component';
import { DataStore, MetricDataRecord, MetricType, TrialJobInfo } from '../common/datastore';
11
import { NNIError } from '../common/errors';
12
import { getExperimentId } from '../common/experimentStartupInfo';
Deshui Yu's avatar
Deshui Yu committed
13
14
import { getLogger, Logger } from '../common/log';
import {
chicm-ms's avatar
chicm-ms committed
15
    ExperimentParams, ExperimentProfile, Manager, ExperimentStatus,
16
    NNIManagerStatus, ProfileUpdateType, TrialJobStatistics
Deshui Yu's avatar
Deshui Yu committed
17
18
19
20
} from '../common/manager';
import {
    TrainingService, TrialJobApplicationForm, TrialJobDetail, TrialJobMetric, TrialJobStatus
} from '../common/trainingService';
21
import { delay, getCheckpointDir, getExperimentRootDir, getLogDir, getMsgDispatcherCommand, mkDirP, getTunerProc, getLogLevel, isAlive, killPid } from '../common/utils';
Deshui Yu's avatar
Deshui Yu committed
22
import {
chicm-ms's avatar
chicm-ms committed
23
    INITIALIZE, INITIALIZED, KILL_TRIAL_JOB, NEW_TRIAL_JOB, NO_MORE_TRIAL_JOBS, PING,
24
    REPORT_METRIC_DATA, REQUEST_TRIAL_JOBS, SEND_TRIAL_JOB_PARAMETER, TERMINATE, TRIAL_END, UPDATE_SEARCH_SPACE, IMPORT_DATA
Deshui Yu's avatar
Deshui Yu committed
25
} from './commands';
26
import { createDispatcherInterface, IpcInterface } from './ipcInterface';
Deshui Yu's avatar
Deshui Yu committed
27
28

/**
 * NNIManager implements the Manager interface: it drives an experiment by coordinating the
 * training service, the tuner/assessor dispatcher process, and the datastore.
 */
class NNIManager implements Manager {
    // Platform that runs the trial jobs, resolved from the component container.
    private trainingService: TrainingService;
    // IPC channel to the tuner/assessor dispatcher process; undefined until setupTuner() runs.
    private dispatcher: IpcInterface | undefined;
    // Number of trials submitted so far; resumeExperiment() restores it from the datastore.
    private currSubmittedTrialNum: number;  // need to be recovered
    // Concurrency delta not yet consumed by the trial loop (>0: increase, <0: decrease).
    private trialConcurrencyChange: number;
    private log: Logger;
    private dataStore: DataStore;
    private experimentProfile: ExperimentProfile;
    // OS process id of the dispatcher; used to wait for and kill it during cleanup.
    private dispatcherPid: number;
    private status: NNIManagerStatus;
    // Trial application forms queued for submission (e.g. by addCustomizedTrialJob()).
    private waitingTrials: TrialJobApplicationForm[];
    // Trials whose status is polled by requestTrialJobsStatus(), keyed by trial job id.
    private trialJobs: Map<string, TrialJobDetail>;
    // JSON array of finished and imported trial data, collected by resumeExperiment().
    private trialDataForTuner: string;
    // True after a readonly resume; the guarded mutating APIs then reject.
    private readonly: boolean;

    // Callback for trial metrics; removed from the training service during cleanup.
    private trialJobMetricListener: (metric: TrialJobMetric) => void;

    /**
     * Build a manager in the INITIALIZED state with empty counters and queues.
     * The training service and datastore come from the component container.
     */
    constructor() {
        // Trial bookkeeping starts from zero; resumeExperiment() restores what it can.
        this.currSubmittedTrialNum = 0;
        this.trialConcurrencyChange = 0;

        // A training service must be registered; assert it is present.
        this.trainingService = component.get(TrainingService);
        assert(this.trainingService);

        this.dispatcherPid = 0;
        this.waitingTrials = [];
        this.trialJobs = new Map<string, TrialJobDetail>();
        this.trialDataForTuner = '';
        this.readonly = false;

        this.log = getLogger();
        this.dataStore = component.get(DataStore);
        this.experimentProfile = this.createEmptyExperimentProfile();
        this.status = {
            status: 'INITIALIZED',
            errors: []
        };

        // Metric handling is asynchronous; any failure is escalated as a critical error.
        this.trialJobMetricListener = (metric: TrialJobMetric): void => {
            const reportFailure = (err: Error): void => {
                this.criticalError(NNIError.FromError(err, 'Job metrics error: '));
            };
            this.onTrialJobMetrics(metric).catch(reportFailure);
        };
    }

    /**
     * Apply a single-field update of the experiment profile and persist the profile.
     *
     * @param experimentProfile profile whose `params` carries the new value
     * @param updateType which parameter to update
     * @returns a promise resolved once the profile is stored. Every failure is reported as a
     *          rejection, never a synchronous throw: readonly mode, an unrecognized update type,
     *          or a search-space update while the tuner is not set up.
     */
    public async updateExperimentProfile(experimentProfile: ExperimentProfile, updateType: ProfileUpdateType): Promise<void> {
        if (this.readonly) {
            throw new Error('Error: can not update experiment profile in readonly mode!');
        }
        // Inside an async method, errors thrown here (including those thrown by
        // updateSearchSpace) reject the returned promise, so callers chaining .catch() see them.
        switch (updateType) {
            case 'TRIAL_CONCURRENCY':
                this.updateTrialConcurrency(experimentProfile.params.trialConcurrency);
                break;
            case 'MAX_EXEC_DURATION':
                this.updateMaxExecDuration(experimentProfile.params.maxExecDuration);
                break;
            case 'SEARCH_SPACE':
                this.updateSearchSpace(experimentProfile.params.searchSpace);
                break;
            case 'MAX_TRIAL_NUM':
                this.updateMaxTrialNum(experimentProfile.params.maxTrialNum);
                break;
            default:
                throw new Error('Error: unrecognized updateType');
        }

        await this.storeExperimentProfile();
    }

    /**
     * Send externally produced trial data to the tuner and record the import in the datastore.
     *
     * @param data raw payload forwarded unchanged with the IMPORT_DATA command
     * @returns the datastore write; rejects in readonly mode or when no tuner is set up
     */
    public importData(data: string): Promise<void> {
        if (this.readonly) {
            return Promise.reject(new Error('Error: can not import data in readonly mode!'));
        }
        const dispatcher: IpcInterface | undefined = this.dispatcher;
        if (dispatcher === undefined) {
            return Promise.reject(new Error('tuner has not been setup'));
        }

        dispatcher.sendCommand(IMPORT_DATA, data);
        // No trial id is associated with an import, hence the empty id.
        return this.dataStore.storeTrialJobEvent('IMPORT_DATA', '', data);
    }

    /** Export the trial hyper-parameter configurations serialized by the datastore. */
    public async exportData(): Promise<string> {
        const exported = await this.dataStore.exportTrialHpConfigs();
        return exported;
    }

    /**
     * Queue a user-supplied hyper-parameter set as a new trial.
     *
     * @param hyperParams JSON-encoded hyper-parameters
     * @returns the sequence id assigned to the queued trial
     *
     * Every failure is reported through the returned promise rather than thrown synchronously:
     * readonly mode, trial budget exhausted, malformed JSON, or a failed datastore write. The
     * trial is already queued when the datastore write happens.
     */
    public async addCustomizedTrialJob(hyperParams: string): Promise<number> {
        if (this.readonly) {
            throw new Error('Error: can not add customized trial job in readonly mode!');
        }
        if (this.currSubmittedTrialNum >= this.experimentProfile.params.maxTrialNum) {
            throw new Error('reach maxTrialNum');
        }

        // TODO: NNI manager should not peek tuner's internal protocol, let's refactor this later
        // JSON.parse runs before a sequence id is allocated, so malformed input does not consume one.
        const packedParameter = {
            parameter_id: null, // eslint-disable-line @typescript-eslint/camelcase
            parameter_source: 'customized', // eslint-disable-line @typescript-eslint/camelcase
            parameters: JSON.parse(hyperParams)
        };

        const form: TrialJobApplicationForm = {
            sequenceId: this.experimentProfile.nextSequenceId++,
            hyperParameters: {
                value: JSON.stringify(packedParameter),
                index: 0
            }
        };
        this.waitingTrials.push(form);

        // trial id has not been generated yet, thus use '' instead.
        // Awaited so a datastore failure rejects this call instead of becoming an unhandled rejection.
        await this.dataStore.storeTrialJobEvent('ADD_CUSTOMIZED', '', hyperParams);

        return form.sequenceId;
    }

    /**
     * Cancel a trial at the user's request and record a USER_TO_CANCEL event for it.
     *
     * @param trialJobId id of the trial to cancel
     * @returns rejects in readonly mode or when cancellation / recording fails
     */
    public async cancelTrialJobByUser(trialJobId: string): Promise<void> {
        if (this.readonly) {
            return Promise.reject(new Error('Error: can not cancel trial job in readonly mode!'));
        }
        this.log.info(`User cancelTrialJob: ${trialJobId}`);

        // Cancel on the platform first; the event is only recorded if that succeeds.
        await this.trainingService.cancelTrialJob(trialJobId);
        await this.dataStore.storeTrialJobEvent('USER_TO_CANCEL', trialJobId, '');
    }

    /**
     * Start a new experiment.
     *
     * The method:
     * - persists the parameters;
     * - forwards the multiPhase / version_check / log_collection settings to the training service;
     * - launches the dispatcher in 'start' mode;
     * - marks the experiment RUNNING and starts the main loop in the background.
     *
     * @param expParams experiment parameters
     * @returns the experiment id
     */
    public async startExperiment(expParams: ExperimentParams): Promise<string> {
        this.log.info(`Starting experiment: ${this.experimentProfile.id}`);
        this.experimentProfile.params = expParams;
        await this.storeExperimentProfile();
        this.log.debug('Setup tuner...');

        // trainingService.setClusterMetadata() returns a promise that used to be dropped.
        // A rejection then went unhandled, which terminates the process on recent Node.js
        // versions. Keep the calls non-blocking as before, but log failures instead.
        const applyClusterMetadata = (key: string, value: string): void => {
            this.trainingService.setClusterMetadata(key, value).catch((err: Error) => {
                this.log.error(`Failed to set cluster metadata ${key}: ${err}`);
            });
        };

        // Set up multiphase config
        if (expParams.multiPhase && this.trainingService.isMultiPhaseJobSupported) {
            applyClusterMetadata('multiPhase', expParams.multiPhase.toString());
        }

        // Set up versionCheck config
        if (expParams.versionCheck !== undefined) {
            applyClusterMetadata('version_check', expParams.versionCheck.toString());
        }

        // Set up logCollection config
        if (expParams.logCollection !== undefined) {
            applyClusterMetadata('log_collection', expParams.logCollection.toString());
        }

        const dispatcherCommand: string = getMsgDispatcherCommand(expParams.tuner, expParams.assessor, expParams.advisor,
            expParams.multiPhase, expParams.multiThread);
        this.log.debug(`dispatcher command: ${dispatcherCommand}`);
        const checkpointDir: string = await this.createCheckpointDir();
        this.setupTuner(
            dispatcherCommand,
            undefined,
            'start',
            checkpointDir);

        this.experimentProfile.startTime = Date.now();
        this.setStatus('RUNNING');
        await this.storeExperimentProfile();
        // The main loop runs in the background; any failure is escalated as a critical error.
        this.run().catch((err: Error) => {
            this.criticalError(err);
        });

        return this.experimentProfile.id;
    }

    /**
     * Resume the experiment stored in the datastore under the current experiment id.
     *
     * In readonly mode only the profile is loaded. Otherwise the method:
     * - relaunches the dispatcher in 'resume' mode;
     * - records trials left WAITING/RUNNING by the previous run as FAILED;
     * - collects finished and imported trial data for the tuner;
     * - restarts the main loop.
     *
     * @param readonly when true, only load the profile; guarded mutating APIs will reject
     */
    public async resumeExperiment(readonly: boolean): Promise<void> {
        this.log.info(`Resuming experiment: ${this.experimentProfile.id}`);
        //Fetch back the experiment profile
        const experimentId: string = getExperimentId();
        this.experimentProfile = await this.dataStore.getExperimentProfile(experimentId);
        this.readonly = readonly;
        if (readonly) {
            return;
        }
        const expParams: ExperimentParams = this.experimentProfile.params;

        // trainingService.setClusterMetadata() returns a promise that used to be dropped.
        // A rejection then went unhandled, which terminates the process on recent Node.js
        // versions. Keep the calls non-blocking as before, but log failures instead.
        const applyClusterMetadata = (key: string, value: string): void => {
            this.trainingService.setClusterMetadata(key, value).catch((err: Error) => {
                this.log.error(`Failed to set cluster metadata ${key}: ${err}`);
            });
        };

        // Set up multiphase config
        if (expParams.multiPhase && this.trainingService.isMultiPhaseJobSupported) {
            applyClusterMetadata('multiPhase', expParams.multiPhase.toString());
        }

        // Set up versionCheck config
        if (expParams.versionCheck !== undefined) {
            applyClusterMetadata('version_check', expParams.versionCheck.toString());
        }
        // NOTE(review): unlike startExperiment, 'log_collection' is not re-applied on resume —
        // confirm whether that is intentional.

        const dispatcherCommand: string = getMsgDispatcherCommand(expParams.tuner, expParams.assessor, expParams.advisor,
            expParams.multiPhase, expParams.multiThread);
        this.log.debug(`dispatcher command: ${dispatcherCommand}`);
        const checkpointDir: string = await this.createCheckpointDir();
        this.setupTuner(
            dispatcherCommand,
            undefined,
            'resume',
            checkpointDir);

        const allTrialJobs: TrialJobInfo[] = await this.dataStore.listTrialJobs();

        // Resume currSubmittedTrialNum
        this.currSubmittedTrialNum = allTrialJobs.length;

        // Jobs still WAITING or RUNNING from the previous run are recorded as FAILED
        await Promise.all(allTrialJobs
            .filter((job: TrialJobInfo) => job.status === 'WAITING' || job.status === 'RUNNING')
            .map((job: TrialJobInfo) => this.dataStore.storeTrialJobEvent('FAILED', job.id)));

        // Collect generated trials and imported trials
        const finishedTrialData: string = await this.exportData();
        const importedData: string[] = await this.dataStore.getImportedData();
        let trialData: Record<string, any>[] = JSON.parse(finishedTrialData);
        for (const oneImportedData of importedData) {
            // do not deduplicate
            trialData = trialData.concat(<Record<string, any>[]>JSON.parse(oneImportedData));
        }
        this.trialDataForTuner = JSON.stringify(trialData);

        // Reopen an experiment previously marked as ended if it still has duration and trial budget left
        if (this.experimentProfile.execDuration < this.experimentProfile.params.maxExecDuration &&
            this.currSubmittedTrialNum < this.experimentProfile.params.maxTrialNum &&
            this.experimentProfile.endTime) {
            delete this.experimentProfile.endTime;
        }
        this.setStatus('RUNNING');

        // TO DO: update database record for resume event
        this.run().catch((err: Error) => {
            this.criticalError(err);
        });
    }

    /** Look up one trial job record in the datastore by its id. */
    public getTrialJob(trialJobId: string): Promise<TrialJobInfo> {
        const lookup: Promise<TrialJobInfo> = this.dataStore.getTrialJob(trialJobId);
        return lookup;
    }

    /**
     * Forward a cluster metadata key/value pair to the training service.
     *
     * The training service call is raced against a 10 second timeout, so a call that never
     * settles is reported as an error instead of hanging the caller.
     *
     * @param key metadata key
     * @param value metadata value
     * @returns rejects in readonly mode, on timeout, or when the training service fails
     */
    public async setClusterMetadata(key: string, value: string): Promise<void> {
        if (this.readonly) {
            return Promise.reject(new Error('Error: can not set cluster metadata in readonly mode!'));
        }
        this.log.info(`NNIManager setClusterMetadata, key: ${key}, value: ${value}`);
        // TO DO: move timeout value to constants file
        const timeoutMs: number = 10000;
        let timeoutId: ReturnType<typeof setTimeout> | undefined;
        const timeout: Promise<never> = new Promise<never>((_resolve, reject): void => {
            timeoutId = setTimeout(
                () => { reject(new Error('TrainingService setClusterMetadata timeout. Please check your config file.')); },
                timeoutMs);
        });
        // try/finally (rather than .finally on the race) also clears the timer when the training
        // service throws synchronously; otherwise the timer would later fire an unhandled rejection.
        try {
            await Promise.race([timeout, this.trainingService.setClusterMetadata(key, value)]);
        } finally {
            if (timeoutId !== undefined) {
                clearTimeout(timeoutId);
            }
        }
    }

    /** Read a cluster metadata value from the training service. */
    public getClusterMetadata(key: string): Promise<string> {
        const pending = this.trainingService.getClusterMetadata(key);
        return Promise.resolve(pending);
    }

    /** Fetch the trial job statistics computed by the datastore. */
    public async getTrialJobStatistics(): Promise<TrialJobStatistics[]> {
        const statistics: TrialJobStatistics[] = await this.dataStore.getTrialJobStatistics();
        return statistics;
    }

    /**
     * Stop the experiment. The status moves to STOPPING before cleanup so the background loops,
     * which exit on ERROR/STOPPING/STOPPED, wind down.
     */
    public async stopExperiment(): Promise<void> {
        this.setStatus('STOPPING');
        this.log.info('Stopping experiment, cleaning up ...');

        await this.experimentDoneCleanUp();

        this.log.info('Experiment stopped.');
    }

    /** Fetch metric records from the datastore, forwarding the optional trial id / metric type filters. */
    public async getMetricData(trialJobId?: string, metricType?: MetricType): Promise<MetricDataRecord[]> {
        const records: MetricDataRecord[] = await this.dataStore.getMetricData(trialJobId, metricType);
        return records;
    }

    /**
     * Collect the metric records of every trial whose sequence id lies in the inclusive range
     * [minSeqId, maxSeqId]. Trials without a sequence id are ignored.
     */
    public async getMetricDataByRange(minSeqId: number, maxSeqId: number): Promise<MetricDataRecord[]> {
        const wantedTrialIds = new Set<string>();
        for (const trial of await this.dataStore.listTrialJobs()) {
            // FIXME: can this be undefined?
            const seq = trial.sequenceId;
            if (seq !== undefined && seq >= minSeqId && seq <= maxSeqId) {
                wantedTrialIds.add(trial.id);
            }
        }

        const everyMetric: MetricDataRecord[] = await this.dataStore.getMetricData();
        return everyMetric.filter((record: MetricDataRecord) => wantedTrialIds.has(record.trialJobId));
    }

    /**
     * Return every non-PERIODICAL metric record, followed by the PERIODICAL record with the
     * highest `sequence` for each trial. On equal sequence numbers, the record seen later wins.
     */
    public async getLatestMetricData(): Promise<MetricDataRecord[]> {
        // FIXME: this can take a long time
        const allMetrics: MetricDataRecord[] = await this.dataStore.getMetricData();
        const finals: MetricDataRecord[] = allMetrics.filter((record: MetricDataRecord) => record.type !== 'PERIODICAL');

        const newestIntermediate: Map<string, MetricDataRecord> = new Map<string, MetricDataRecord>();
        for (const record of allMetrics) {
            if (record.type !== 'PERIODICAL') {
                continue;
            }
            const current: MetricDataRecord | undefined = newestIntermediate.get(record.trialJobId);
            if (current === undefined || current.sequence <= record.sequence) {
                newestIntermediate.set(record.trialJobId, record);
            }
        }

        // FIXME: unit test
        return finals.concat(Array.from(newestIntermediate.values()));
    }

    /** Return the live in-memory experiment profile through an already-resolved promise. */
    public getExperimentProfile(): Promise<ExperimentProfile> {
        // TO DO: using Promise.resolve()
        const ready: Deferred<ExperimentProfile> = new Deferred<ExperimentProfile>();
        ready.resolve(this.experimentProfile);
        return ready.promise;
    }

    /** Return the manager's live status object (not a copy). */
    public getStatus(): NNIManagerStatus {
        const current: NNIManagerStatus = this.status;
        return current;
    }

    /** List trial job records from the datastore, forwarding the optional status filter. */
    public async listTrialJobs(status?: TrialJobStatus): Promise<TrialJobInfo[]> {
        const jobs: TrialJobInfo[] = await this.dataStore.listTrialJobs(status);
        return jobs;
    }

    /**
     * Spawn the dispatcher process and open the IPC interface to it.
     * Only the first call has an effect; later calls return immediately.
     *
     * @param command command line handed to getTunerProc()
     * @param cwd working directory; the log directory is used when undefined or ''
     * @param mode exported to the process as NNI_MODE
     * @param dataDirectory exported to the process as NNI_CHECKPOINT_DIRECTORY
     */
    private setupTuner(command: string, cwd: string | undefined, mode: 'start' | 'resume', dataDirectory: string): void {
        if (this.dispatcher !== undefined) {
            return;
        }
        // stdin ignored, stdout/stderr shared with this process, plus two extra pipes
        // (presumably the channel used by createDispatcherInterface — confirm).
        const stdio: StdioOptions = ['ignore', process.stdout, process.stderr, 'pipe', 'pipe'];
        const workingDir: string = (cwd === undefined || cwd === '') ? getLogDir() : cwd;
        // TO DO: add CUDA_VISIBLE_DEVICES

        const tunerConfig = this.experimentProfile.params.tuner;
        const includeIntermediateResultsEnv: boolean | undefined =
            tunerConfig !== undefined ? tunerConfig.includeIntermediateResults : false;

        const nniEnv = {
            NNI_MODE: mode,
            NNI_CHECKPOINT_DIRECTORY: dataDirectory,
            NNI_LOG_DIRECTORY: getLogDir(),
            NNI_LOG_LEVEL: getLogLevel(),
            NNI_INCLUDE_INTERMEDIATE_RESULTS: includeIntermediateResultsEnv,
            CUDA_VISIBLE_DEVICES: this.getGpuEnvvarValue()
        };
        // The child inherits this process's environment, overlaid with the NNI-specific variables.
        const tunerProc: ChildProcess = getTunerProc(command, stdio, workingDir, Object.assign({}, process.env, nniEnv));
        this.dispatcherPid = tunerProc.pid;
        this.dispatcher = createDispatcherInterface(tunerProc);
    }

    /**
     * Value for the dispatcher's CUDA_VISIBLE_DEVICES. It is the advisor's gpuIndices when an
     * advisor is configured, otherwise the tuner's, and '' when none is specified.
     */
    private getGpuEnvvarValue(): string {
        const { advisor, tuner } = this.experimentProfile.params;
        let cudaDevices: string | undefined;
        if (advisor !== undefined) {
            cudaDevices = advisor.gpuIndices;
        } else if (tuner !== undefined) {
            cudaDevices = tuner.gpuIndices;
        }
        return cudaDevices === undefined ? '' : cudaDevices;
    }

    /**
     * Store a new trial concurrency. The difference from the previous value is accumulated in
     * trialConcurrencyChange, which the trial loop uses to adjust how many trials it requests.
     */
    private updateTrialConcurrency(trialConcurrency: number): void {
        // we assume trialConcurrency >= 0, which is checked by restserver
        const params = this.experimentProfile.params;
        const delta: number = trialConcurrency - params.trialConcurrency;
        this.trialConcurrencyChange += delta;
        params.trialConcurrency = trialConcurrency;
    }

    /** Replace the maximum execution duration in the in-memory profile. */
    private updateMaxExecDuration(duration: number): void {
        const params = this.experimentProfile.params;
        params.maxExecDuration = duration;
    }

    /**
     * Send a new search space to the tuner, then store it in the in-memory profile.
     *
     * @throws Error if the tuner has not been set up
     */
    private updateSearchSpace(searchSpace: string): void {
        const dispatcher: IpcInterface | undefined = this.dispatcher;
        if (dispatcher === undefined) {
            throw new Error('Error: tuner has not been setup');
        }
        dispatcher.sendCommand(UPDATE_SEARCH_SPACE, searchSpace);
        this.experimentProfile.params.searchSpace = searchSpace;
    }

    /** Replace the maximum number of trials in the in-memory profile. */
    private updateMaxTrialNum(maxTrialNum: number): void {
        const params = this.experimentProfile.params;
        params.maxTrialNum = maxTrialNum;
    }

    /**
     * Tear down the experiment:
     * - detach the metric listener;
     * - send TERMINATE to the dispatcher, wait up to about 30 seconds for it to exit, then killPid it;
     * - cancel RUNNING/WAITING trials;
     * - clean up the training service;
     * - persist the end time and mark the experiment STOPPED.
     *
     * @throws Error if the dispatcher was never set up
     */
    private async experimentDoneCleanUp(): Promise<void> {
        if (this.dispatcher === undefined) {
            throw new Error('Error: tuner has not been setup');
        }
        this.trainingService.removeTrialJobMetricListener(this.trialJobMetricListener);
        this.dispatcher.sendCommand(TERMINATE);

        // gracefully terminate tuner and assessor here, wait at most 30 seconds.
        // Stop polling as soon as the process is gone, instead of sleeping one more second after noticing it.
        for (let attempt: number = 0; attempt < 30; attempt++) {
            if (!(await isAlive(this.dispatcherPid))) {
                break;
            }
            await delay(1000);
        }
        await killPid(this.dispatcherPid);

        const trialJobList: TrialJobDetail[] = await this.trainingService.listTrialJobs();
        // TO DO: to promise all
        for (const trialJob of trialJobList) {
            if (trialJob.status !== 'RUNNING' && trialJob.status !== 'WAITING') {
                continue;
            }
            try {
                this.log.info(`cancelTrialJob: ${trialJob.id}`);
                await this.trainingService.cancelTrialJob(trialJob.id);
            } catch (error) {
                // Best effort (e.g. pid no longer exists): keep going, but leave a trace.
                this.log.debug(`cancelTrialJob ${trialJob.id} failed, ignored: ${error}`);
            }
        }
        await this.trainingService.cleanUp();
        this.experimentProfile.endTime = Date.now();
        await this.storeExperimentProfile();
        this.setStatus('STOPPED');
    }

    /**
     * Background ticker that runs once per second until the status is ERROR, STOPPING or STOPPED.
     * Each tick adds one second to execDuration while the status is RUNNING. On every 10th tick
     * (ticks are counted whether or not the experiment is running), a running experiment's
     * profile is persisted.
     */
    private async periodicallyUpdateExecDuration(): Promise<void> {
        const stopStates: string[] = ['ERROR', 'STOPPING', 'STOPPED'];
        for (let tick: number = 1; !stopStates.includes(this.status.status); tick++) {
            await delay(1000 * 1); // 1 seconds
            if (this.status.status !== 'RUNNING') {
                continue;
            }
            this.experimentProfile.execDuration += 1;
            if (tick % 10 === 0) {
                await this.storeExperimentProfile();
            }
        }
    }

    /**
     * Send PING to the dispatcher every 5 seconds until the status is ERROR, STOPPING or STOPPED.
     *
     * @throws Error if the dispatcher was never set up
     */
    private async pingDispatcher(): Promise<void> {
        const dispatcher: IpcInterface | undefined = this.dispatcher;
        if (dispatcher === undefined) {
            throw new Error('Error: tuner has not been setup');
        }
        const stopStates: string[] = ['ERROR', 'STOPPING', 'STOPPED'];
        while (stopStates.indexOf(this.status.status) < 0) {
            dispatcher.sendCommand(PING);
            await delay(5000);
        }
    }

    /**
     * Poll the training service for every tracked trial. On each poll it:
     * - records status changes in the datastore;
     * - stops tracking trials that reached a terminal state, notifying the dispatcher with TRIAL_END.
     *
     * @returns number of tracked trials found in a terminal state during this poll
     * @throws Error if the dispatcher was never set up
     */
    private async requestTrialJobsStatus(): Promise<number> {
        let finishedCount: number = 0;
        if (this.dispatcher === undefined) {
            throw new Error('Error: tuner has not been setup');
        }

        for (const trialJobId of Array.from(this.trialJobs.keys())) {
            const latest: TrialJobDetail = await this.trainingService.getTrialJob(trialJobId);
            const previous: TrialJobDetail | undefined = this.trialJobs.get(trialJobId);
            if (previous !== undefined && previous.status !== latest.status) {
                this.log.info(`Trial job ${latest.id} status changed from ${previous.status} to ${latest.status}`);
                this.trialJobs.set(trialJobId, Object.assign({}, latest));
                await this.dataStore.storeTrialJobEvent(latest.status, latest.id, undefined, latest);
            }

            switch (latest.status) {
                case 'SUCCEEDED':
                case 'USER_CANCELED':
                case 'EARLY_STOPPED':
                case 'FAILED':
                case 'SYS_CANCELED':
                    // All terminal states are handled alike. In the current version FAILED and
                    // SYS_CANCELED jobs are not retried.
                    // TO DO: push FAILED / SYS_CANCELED jobs to queue for retry
                    this.trialJobs.delete(trialJobId);
                    finishedCount++;
                    this.dispatcher.sendCommand(TRIAL_END, JSON.stringify({
                        trial_job_id: latest.id, // eslint-disable-line @typescript-eslint/camelcase
                        event: latest.status,
                        hyper_params: latest.form.hyperParameters.value // eslint-disable-line @typescript-eslint/camelcase
                    }));
                    break;
                case 'WAITING':
                case 'RUNNING':
                case 'UNKNOWN':
                    // Do nothing
                    break;
                default:
                // TO DO: add warning in log
            }
        }

        return finishedCount;
    }

    private async manageTrials(): Promise<void> {
        if (this.dispatcher === undefined) {
            throw new Error('Error: tuner has not been setup');
        }
QuanluZhang's avatar
QuanluZhang committed
539
        let allFinishedTrialJobNum: number = this.currSubmittedTrialNum;
QuanluZhang's avatar
QuanluZhang committed
540
        let waitSubmittedToFinish: number;
541
        while (!['ERROR', 'STOPPING', 'STOPPED'].includes(this.status.status)) {
QuanluZhang's avatar
QuanluZhang committed
542
543
544
545
546
547
548
            const finishedTrialJobNum: number = await this.requestTrialJobsStatus();
            allFinishedTrialJobNum += finishedTrialJobNum;

            // requestTrialNum is the number of trials that will be requested from tuner.
            // If trialConcurrency does not change, requestTrialNum equals finishedTrialJobNum.
            // If trialConcurrency changes, for example, trialConcurrency increases by 2 (trialConcurrencyChange=2), then
            // requestTrialNum equals 2 + finishedTrialJobNum and trialConcurrencyChange becomes 0.
549
            // If trialConcurrency changes, for example, trialConcurrency decreases by 4 (trialConcurrencyChange=-4) and
QuanluZhang's avatar
QuanluZhang committed
550
551
552
553
554
555
556
557
            // finishedTrialJobNum is 2, then requestTrialNum becomes -2. No trial will be requested from tuner,
            // and trialConcurrencyChange becomes -2.
            const requestTrialNum: number = this.trialConcurrencyChange + finishedTrialJobNum;
            if (requestTrialNum >= 0) {
                this.trialConcurrencyChange = 0;
            } else {
                this.trialConcurrencyChange = requestTrialNum;
            }
chicm-ms's avatar
chicm-ms committed
558

559
            this.requestTrialJobs(requestTrialNum);
chicm-ms's avatar
chicm-ms committed
560

QuanluZhang's avatar
QuanluZhang committed
561
            // check maxtrialnum and maxduration here
562
            // NO_MORE_TRIAL is more like a subset of RUNNING, because during RUNNING tuner
563
            // might tell nnimanager that this is no more trials. In NO_MORE_TRIAL state, the experiment is viewed
564
565
            // as still running. DONE could be transfered from RUNNING or NO_MORE_TRIAL.
            assert(this.status.status === 'RUNNING' ||
566
                this.status.status === 'DONE' ||
QuanluZhang's avatar
QuanluZhang committed
567
568
                this.status.status === 'NO_MORE_TRIAL' ||
                this.status.status === 'TUNER_NO_MORE_TRIAL');
569
            if (this.experimentProfile.execDuration > this.experimentProfile.params.maxExecDuration ||
QuanluZhang's avatar
QuanluZhang committed
570
                this.currSubmittedTrialNum >= this.experimentProfile.params.maxTrialNum) {
QuanluZhang's avatar
QuanluZhang committed
571
                if (this.status.status !== 'DONE') {
chicm-ms's avatar
chicm-ms committed
572
                    this.setStatus('NO_MORE_TRIAL');
QuanluZhang's avatar
QuanluZhang committed
573
574
575
576
                    waitSubmittedToFinish = this.currSubmittedTrialNum;

                    assert(allFinishedTrialJobNum <= waitSubmittedToFinish);
                    if (allFinishedTrialJobNum >= waitSubmittedToFinish) {
chicm-ms's avatar
chicm-ms committed
577
                        this.setStatus('DONE');
QuanluZhang's avatar
QuanluZhang committed
578
579
580
581
582
                        this.experimentProfile.endTime = Date.now();
                        await this.storeExperimentProfile();
                        // write this log for travis CI
                        this.log.info('Experiment done.');
                    }
QuanluZhang's avatar
QuanluZhang committed
583
584
585
                }
            } else {
                if (this.status.status === 'DONE') {
586
587
                    delete this.experimentProfile.endTime;
                    await this.storeExperimentProfile();
QuanluZhang's avatar
QuanluZhang committed
588
                }
QuanluZhang's avatar
QuanluZhang committed
589
                if (this.status.status !== 'TUNER_NO_MORE_TRIAL') {
chicm-ms's avatar
chicm-ms committed
590
                    this.setStatus('RUNNING');
591
                }
QuanluZhang's avatar
QuanluZhang committed
592
593
594
595
596
                for (let i: number = this.trialJobs.size; i < this.experimentProfile.params.trialConcurrency; i++) {
                    if (this.waitingTrials.length === 0 ||
                        this.currSubmittedTrialNum >= this.experimentProfile.params.maxTrialNum) {
                        break;
                    }
597
                    const form = this.waitingTrials.shift() as TrialJobApplicationForm;
QuanluZhang's avatar
QuanluZhang committed
598
                    this.currSubmittedTrialNum++;
599
600
                    this.log.info(`submitTrialJob: form: ${JSON.stringify(form)}`);
                    const trialJobDetail: TrialJobDetail = await this.trainingService.submitTrialJob(form);
601
                    await this.storeExperimentProfile();
QuanluZhang's avatar
QuanluZhang committed
602
603
604
605
                    this.trialJobs.set(trialJobDetail.id, Object.assign({}, trialJobDetail));
                    const trialJobDetailSnapshot: TrialJobDetail | undefined = this.trialJobs.get(trialJobDetail.id);
                    if (trialJobDetailSnapshot != undefined) {
                        await this.dataStore.storeTrialJobEvent(
606
                            trialJobDetailSnapshot.status, trialJobDetailSnapshot.id, form.hyperParameters.value, trialJobDetailSnapshot);
QuanluZhang's avatar
QuanluZhang committed
607
608
609
610
611
612
613
614
615
                    } else {
                        assert(false, `undefined trialJobDetail in trialJobs: ${trialJobDetail.id}`);
                    }
                }
            }
            await delay(1000 * 5); // 5 seconds
        }
    }

Deshui Yu's avatar
Deshui Yu committed
616
617
618
619
620
621
    /**
     * Persists the current experiment profile to the data store.
     * The revision counter is incremented before writing, so each stored snapshot
     * carries a new revision number.
     */
    private storeExperimentProfile(): Promise<void> {
        const profile: ExperimentProfile = this.experimentProfile;
        profile.revision++;

        return this.dataStore.storeExperimentProfile(profile);
    }

622
    /**
     * Main driver of the experiment once the dispatcher is available.
     *
     * Registers event listeners and sends INITIALIZE to the tuner, then runs four tasks
     * concurrently: exec-duration tracking, dispatcher pinging, the training service, and
     * trial management. A rejection from any of the last three is re-thrown as an NNIError
     * whose message prefix identifies its origin.
     */
    private async run(): Promise<void> {
        assert(this.dispatcher !== undefined);

        this.addEventListeners();
        this.sendInitTunerCommands();

        // Builds a rejection handler that tags the failure with where it came from.
        const rethrowAs = (prefix: string) => (err: Error): never => {
            throw NNIError.FromError(err, prefix);
        };

        await Promise.all([
            this.periodicallyUpdateExecDuration(),
            this.pingDispatcher().catch(rethrowAs('Dispatcher error: ')),
            this.trainingService.run().catch(rethrowAs('Training service error: ')),
            this.manageTrials().catch(rethrowAs('Job management error: '))
        ]);
    }

QuanluZhang's avatar
QuanluZhang committed
642
    /**
     * Wires up the manager's inbound event sources:
     * - metric events from the training service go to `trialJobMetricListener`;
     * - commands from the dispatcher go to `onTunerCommand`, and any rejection from that
     *   handler is reported through `criticalError`.
     *
     * @throws Error if the dispatcher has not been set up.
     */
    private addEventListeners(): void {
        this.log.info('Add event listeners');
        // TO DO: cannot run this method more than once in one NNIManager instance
        const dispatcher = this.dispatcher;
        if (dispatcher === undefined) {
            throw new Error('Error: tuner or job maintainer have not been setup');
        }
        this.trainingService.addTrialJobMetricListener(this.trialJobMetricListener);

        const handleCommand = (commandType: string, content: string): void => {
            this.onTunerCommand(commandType, content).catch((err: Error) => {
                this.criticalError(NNIError.FromError(err, 'Tuner command event error: '));
            });
        };
        dispatcher.onCommand(handleCommand);
    }

    /**
     * Sends INITIALIZE, carrying the experiment's search space, to the dispatcher.
     * The tuner must receive its search space before it can generate hyper-parameters.
     *
     * @throws Error if the dispatcher has not been set up.
     */
    private sendInitTunerCommands(): void {
        const dispatcher = this.dispatcher;
        if (dispatcher === undefined) {
            throw new Error('Dispatcher error: tuner has not been setup');
        }
        const searchSpace = this.experimentProfile.params.searchSpace;
        this.log.debug(`Send tuner command: INITIALIZE: ${searchSpace}`);
        dispatcher.sendCommand(INITIALIZE, searchSpace);
    }

    /**
     * Handles a metric reported by the training service.
     *
     * Stores the metric (keyed by `metric.id`) in the data store, then forwards
     * `metric.data` to the dispatcher as REPORT_METRIC_DATA.
     *
     * @param metric metric event received from the training service
     * @throws Error if the dispatcher has not been set up. The check runs after the metric
     *         has already been stored.
     */
    private async onTrialJobMetrics(metric: TrialJobMetric): Promise<void> {
        // Interpolating the object directly would log "[object Object]"; serialize it instead.
        this.log.debug(`NNIManager received trial job metrics: ${JSON.stringify(metric)}`);
        await this.dataStore.storeMetricData(metric.id, metric.data);
        if (this.dispatcher === undefined) {
            throw new Error('Error: tuner has not been setup');
        }
        this.dispatcher.sendCommand(REPORT_METRIC_DATA, metric.data);
    }

chicm-ms's avatar
chicm-ms committed
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
    /**
     * Asks the dispatcher for `jobNum` new trial configurations. Counts below 1 are ignored.
     *
     * With `multiThread` enabled, one REQUEST_TRIAL_JOBS command with count '1' is sent per
     * trial. Otherwise a single command carries the whole count.
     *
     * @throws Error if the dispatcher has not been set up (and jobNum >= 1).
     */
    private requestTrialJobs(jobNum: number): void {
        if (jobNum < 1) {
            return;
        }
        const dispatcher = this.dispatcher;
        if (dispatcher === undefined) {
            throw new Error('Dispatcher error: tuner has not been setup');
        }
        if (!this.experimentProfile.params.multiThread) {
            dispatcher.sendCommand(REQUEST_TRIAL_JOBS, String(jobNum));
            return;
        }
        // A single batched request has its hyper-parameters generated one after another;
        // separate requests let them be generated without blocking on each other.
        for (let sent: number = 0; sent < jobNum; sent++) {
            dispatcher.sendCommand(REQUEST_TRIAL_JOBS, '1');
        }
    }

694
    /**
     * Handles one command received from the dispatcher (tuner/advisor process).
     *
     * @param commandType command code, compared against the constants imported from './commands'
     * @param content raw string payload that accompanied the command
     * @throws Error for command types not handled here, or if IMPORT_DATA must be sent
     *         while the dispatcher is not set up
     */
    private async onTunerCommand(commandType: string, content: string): Promise<void> {
        this.log.info(`NNIManager received command from dispatcher: ${commandType}, ${content}`);
        switch (commandType) {
            case INITIALIZED: {
                // The tuner has its search space. First replay any trial data waiting to be
                // imported, then request one trial per concurrency slot.
                if (this.trialDataForTuner.length > 0) {
                    const dispatcher = this.dispatcher;
                    if (dispatcher === undefined) {
                        throw new Error('Dispatcher error: tuner has not been setup');
                    }
                    dispatcher.sendCommand(IMPORT_DATA, this.trialDataForTuner);
                }
                this.requestTrialJobs(this.experimentProfile.params.trialConcurrency);
                break;
            }
            case NEW_TRIAL_JOB: {
                if (this.status.status === 'TUNER_NO_MORE_TRIAL') {
                    this.log.warning('It is not supposed to receive more trials after NO_MORE_TRIAL is set');
                    this.setStatus('RUNNING');
                }
                // Queue the trial. The trial-management loop submits queued forms as slots free up.
                const queuedForm: TrialJobApplicationForm = {
                    sequenceId: this.experimentProfile.nextSequenceId++,
                    hyperParameters: { value: content, index: 0 }
                };
                this.waitingTrials.push(queuedForm);
                break;
            }
            case SEND_TRIAL_JOB_PARAMETER: {
                const command: any = JSON.parse(content);
                assert(command.parameter_index >= 0);
                assert(command.trial_job_id !== undefined);

                const updateForm: TrialJobApplicationForm = {
                    sequenceId: -1,  // FIXME: multi-phase tuner should use sequence ID instead of trial job ID
                    hyperParameters: {
                        value: content,
                        index: command.parameter_index
                    }
                };
                this.log.info(`updateTrialJob: job id: ${command.trial_job_id}, form: ${JSON.stringify(updateForm)}`);
                await this.trainingService.updateTrialJob(command.trial_job_id, updateForm);
                // NOTE(review): an earlier comment said 'parameters' is set to an empty string
                // when the tuner cannot generate more hyper-parameters, but this guard only
                // excludes null. Confirm which sentinel the tuner actually sends.
                if (command['parameters'] !== null) {
                    await this.dataStore.storeTrialJobEvent(
                        'ADD_HYPERPARAMETER', command.trial_job_id, content, undefined);
                }
                break;
            }
            case NO_MORE_TRIAL_JOBS: {
                // Ignore the signal once the experiment is failing or shutting down.
                const isTerminating: boolean = ['ERROR', 'STOPPING', 'STOPPED'].includes(this.status.status);
                if (!isTerminating) {
                    this.setStatus('TUNER_NO_MORE_TRIAL');
                }
                break;
            }
            case KILL_TRIAL_JOB: {
                const trialJobId = JSON.parse(content);
                this.log.info(`cancelTrialJob: ${trialJobId}`);
                await this.trainingService.cancelTrialJob(trialJobId, true);
                break;
            }
            default:
                throw new Error('Error: unsupported command type from tuner');
        }
    }

760
761
762
763
764
765
766
767
768
769
    /**
     * Reports an unrecoverable error. It is recorded through `logError`, which logs the stack,
     * appends the message to the status errors and sets status to ERROR. It is then also
     * written to the console's error stream.
     */
    private criticalError(err: Error): void {
        const failure: Error = err;
        this.logError(failure);
        console.error(failure);
    }

    /**
     * Records an error in the manager state:
     * - logs its stack trace, when one is available;
     * - appends its message to `status.errors`;
     * - switches the manager status to ERROR.
     */
    private logError(err: Error): void {
        const stackTrace: string | undefined = err.stack;
        if (stackTrace !== undefined) {
            this.log.error(stackTrace);
        }
        this.status.errors.push(err.message);
        this.setStatus('ERROR');
    }

    /**
     * Moves the manager to `status` and logs the transition.
     * Re-setting the current status is a no-op and produces no log line.
     */
    private setStatus(status: ExperimentStatus): void {
        const previous = this.status.status;
        if (previous === status) {
            return;
        }
        this.log.info(`Change NNIManager status from: ${previous} to: ${status}`);
        this.status.status = status;
    }

    /**
     * Builds a blank experiment profile for the current experiment:
     * - revision 0, zero elapsed execution time, sequence IDs starting at 0;
     * - the experiment root dir as the log dir;
     * - placeholder (empty or zero) parameters.
     */
    private createEmptyExperimentProfile(): ExperimentProfile {
        const emptyParams: ExperimentParams = {
            authorName: '',
            experimentName: '',
            trialConcurrency: 0,
            maxExecDuration: 0, // unit: second
            maxTrialNum: 0, // maxTrialNum includes all the submitted trial jobs
            trainingServicePlatform: '',
            searchSpace: ''
        };
        const profile: ExperimentProfile = {
            id: getExperimentId(),
            revision: 0,
            execDuration: 0,
            logDir: getExperimentRootDir(),
            nextSequenceId: 0,
            params: emptyParams
        };

        return profile;
    }
798

QuanluZhang's avatar
QuanluZhang committed
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
    /**
     * Creates the experiment's checkpoint directory. It then records the directory's path as
     * `checkpointDir` on each of the advisor, tuner and assessor configs present in the
     * experiment params.
     *
     * @returns the checkpoint directory path
     */
    private async createCheckpointDir(): Promise<string> {
        // TODO: test
        const chkpDir: string = getCheckpointDir();
        await mkDirP(chkpDir);
        // Point every configured component at the shared checkpoint directory.
        if (this.experimentProfile.params.advisor) {
            this.experimentProfile.params.advisor.checkpointDir = chkpDir;
        }
        if (this.experimentProfile.params.tuner) {
            this.experimentProfile.params.tuner.checkpointDir = chkpDir;
        }
        if (this.experimentProfile.params.assessor) {
            this.experimentProfile.params.assessor.checkpointDir = chkpDir;
        }

        // An async function already wraps its return value in a Promise; no Promise.resolve needed.
        return chkpDir;
    }
Deshui Yu's avatar
Deshui Yu committed
817
818
819
}

export { NNIManager };