nnimanager.ts 35.2 KB
Newer Older
liuzhe-lz's avatar
liuzhe-lz committed
1
2
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT license.
Deshui Yu's avatar
Deshui Yu committed
3
4
5
6

'use strict';

import * as assert from 'assert';
chicm-ms's avatar
chicm-ms committed
7
import { ChildProcess, StdioOptions } from 'child_process';
Deshui Yu's avatar
Deshui Yu committed
8
9
10
import { Deferred } from 'ts-deferred';
import * as component from '../common/component';
import { DataStore, MetricDataRecord, MetricType, TrialJobInfo } from '../common/datastore';
11
import { NNIError } from '../common/errors';
12
import { getExperimentId } from '../common/experimentStartupInfo';
Deshui Yu's avatar
Deshui Yu committed
13
14
import { getLogger, Logger } from '../common/log';
import {
chicm-ms's avatar
chicm-ms committed
15
    ExperimentParams, ExperimentProfile, Manager, ExperimentStatus,
16
    NNIManagerStatus, ProfileUpdateType, TrialJobStatistics
Deshui Yu's avatar
Deshui Yu committed
17
18
19
20
} from '../common/manager';
import {
    TrainingService, TrialJobApplicationForm, TrialJobDetail, TrialJobMetric, TrialJobStatus
} from '../common/trainingService';
21
import { delay, getCheckpointDir, getExperimentRootDir, getLogDir, getMsgDispatcherCommand, mkDirP, getTunerProc, getLogLevel, isAlive, killPid } from '../common/utils';
Deshui Yu's avatar
Deshui Yu committed
22
import {
chicm-ms's avatar
chicm-ms committed
23
    INITIALIZE, INITIALIZED, KILL_TRIAL_JOB, NEW_TRIAL_JOB, NO_MORE_TRIAL_JOBS, PING,
24
    REPORT_METRIC_DATA, REQUEST_TRIAL_JOBS, SEND_TRIAL_JOB_PARAMETER, TERMINATE, TRIAL_END, UPDATE_SEARCH_SPACE, IMPORT_DATA
Deshui Yu's avatar
Deshui Yu committed
25
} from './commands';
26
import { createDispatcherInterface, IpcInterface } from './ipcInterface';
Deshui Yu's avatar
Deshui Yu committed
27
28

/**
chicm-ms's avatar
chicm-ms committed
29
 * NNIManager which implements Manager interface
Deshui Yu's avatar
Deshui Yu committed
30
31
32
 */
class NNIManager implements Manager {
    private trainingService: TrainingService;
33
    private dispatcher: IpcInterface | undefined;
34
    private currSubmittedTrialNum: number;  // need to be recovered
QuanluZhang's avatar
QuanluZhang committed
35
    private trialConcurrencyChange: number; // >0: increase, <0: decrease
Deshui Yu's avatar
Deshui Yu committed
36
37
38
    private log: Logger;
    private dataStore: DataStore;
    private experimentProfile: ExperimentProfile;
39
    private dispatcherPid: number;
40
    private status: NNIManagerStatus;
41
    private waitingTrials: TrialJobApplicationForm[];
QuanluZhang's avatar
QuanluZhang committed
42
    private trialJobs: Map<string, TrialJobDetail>;
43
    private trialDataForTuner: string;
SparkSnail's avatar
SparkSnail committed
44
    private readonly: boolean;
45

46
    private trialJobMetricListener: (metric: TrialJobMetric) => void;
47

Deshui Yu's avatar
Deshui Yu committed
48
49
    constructor() {
        this.currSubmittedTrialNum = 0;
QuanluZhang's avatar
QuanluZhang committed
50
        this.trialConcurrencyChange = 0;
Deshui Yu's avatar
Deshui Yu committed
51
52
        this.trainingService = component.get(TrainingService);
        assert(this.trainingService);
53
        this.dispatcherPid = 0;
QuanluZhang's avatar
QuanluZhang committed
54
55
        this.waitingTrials = [];
        this.trialJobs = new Map<string, TrialJobDetail>();
56
        this.trialDataForTuner = '';
SparkSnail's avatar
SparkSnail committed
57
        this.readonly = false;
Deshui Yu's avatar
Deshui Yu committed
58
59
60

        this.log = getLogger();
        this.dataStore = component.get(DataStore);
61
62
63
64
        this.experimentProfile = this.createEmptyExperimentProfile();
        this.status = {
            status: 'INITIALIZED',
            errors: []
Deshui Yu's avatar
Deshui Yu committed
65
        };
chicm-ms's avatar
chicm-ms committed
66
        this.trialJobMetricListener = (metric: TrialJobMetric): void => {
67
68
69
70
            this.onTrialJobMetrics(metric).catch((err: Error) => {
                this.criticalError(NNIError.FromError(err, 'Job metrics error: '));
            });
        };
Deshui Yu's avatar
Deshui Yu committed
71
72
73
    }

    public updateExperimentProfile(experimentProfile: ExperimentProfile, updateType: ProfileUpdateType): Promise<void> {
SparkSnail's avatar
SparkSnail committed
74
75
76
        if (this.readonly) {
            return Promise.reject(new Error('Error: can not update experiment profile in readonly mode!'));
        }
Deshui Yu's avatar
Deshui Yu committed
77
78
79
80
81
82
83
84
85
86
        switch (updateType) {
            case 'TRIAL_CONCURRENCY':
                this.updateTrialConcurrency(experimentProfile.params.trialConcurrency);
                break;
            case 'MAX_EXEC_DURATION':
                this.updateMaxExecDuration(experimentProfile.params.maxExecDuration);
                break;
            case 'SEARCH_SPACE':
                this.updateSearchSpace(experimentProfile.params.searchSpace);
                break;
QuanluZhang's avatar
QuanluZhang committed
87
88
89
            case 'MAX_TRIAL_NUM':
                this.updateMaxTrialNum(experimentProfile.params.maxTrialNum);
                break;
Deshui Yu's avatar
Deshui Yu committed
90
91
92
93
94
95
96
            default:
                throw new Error('Error: unrecognized updateType');
        }

        return this.storeExperimentProfile();
    }

97
    public importData(data: string): Promise<void> {
SparkSnail's avatar
SparkSnail committed
98
99
100
        if (this.readonly) {
            return Promise.reject(new Error('Error: can not import data in readonly mode!'));
        }
101
102
103
104
105
106
107
108
109
110
        if (this.dispatcher === undefined) {
            return Promise.reject(
                new Error('tuner has not been setup')
            );
        }
        this.dispatcher.sendCommand(IMPORT_DATA, data);

        return this.dataStore.storeTrialJobEvent('IMPORT_DATA', '', data);
    }

111
112
113
114
    public async exportData(): Promise<string> {
        return this.dataStore.exportTrialHpConfigs();
    }

115
    public addCustomizedTrialJob(hyperParams: string): Promise<number> {
SparkSnail's avatar
SparkSnail committed
116
117
118
        if (this.readonly) {
            return Promise.reject(new Error('Error: can not add customized trial job in readonly mode!'));
        }
Deshui Yu's avatar
Deshui Yu committed
119
        if (this.currSubmittedTrialNum >= this.experimentProfile.params.maxTrialNum) {
120
            return Promise.reject(new Error('reach maxTrialNum'));
Deshui Yu's avatar
Deshui Yu committed
121
        }
122
123
124

        // TODO: NNI manager should not peek tuner's internal protocol, let's refactor this later
        const packedParameter = {
chicm-ms's avatar
chicm-ms committed
125
126
            parameter_id: null, // eslint-disable-line @typescript-eslint/camelcase
            parameter_source: 'customized', // eslint-disable-line @typescript-eslint/camelcase
127
128
129
130
131
132
133
134
135
136
137
            parameters: JSON.parse(hyperParams)
        }

        const form: TrialJobApplicationForm = {
            sequenceId: this.experimentProfile.nextSequenceId++,
            hyperParameters: {
                value: JSON.stringify(packedParameter),
                index: 0
            }
        };
        this.waitingTrials.push(form);
Deshui Yu's avatar
Deshui Yu committed
138
139

        // trial id has not been generated yet, thus use '' instead
140
141
142
        this.dataStore.storeTrialJobEvent('ADD_CUSTOMIZED', '', hyperParams);

        return Promise.resolve(form.sequenceId);
Deshui Yu's avatar
Deshui Yu committed
143
144
145
    }

    public async cancelTrialJobByUser(trialJobId: string): Promise<void> {
SparkSnail's avatar
SparkSnail committed
146
147
148
        if (this.readonly) {
            return Promise.reject(new Error('Error: can not cancel trial job in readonly mode!'));
        }
chicm-ms's avatar
chicm-ms committed
149
        this.log.info(`User cancelTrialJob: ${trialJobId}`);
Deshui Yu's avatar
Deshui Yu committed
150
151
152
153
154
        await this.trainingService.cancelTrialJob(trialJobId);
        await this.dataStore.storeTrialJobEvent('USER_TO_CANCEL', trialJobId, '');
    }

    public async startExperiment(expParams: ExperimentParams): Promise<string> {
chicm-ms's avatar
chicm-ms committed
155
        this.log.info(`Starting experiment: ${this.experimentProfile.id}`);
Deshui Yu's avatar
Deshui Yu committed
156
157
158
        this.experimentProfile.params = expParams;
        await this.storeExperimentProfile();
        this.log.debug('Setup tuner...');
159

160
        // Set up multiphase config
161
        if (expParams.multiPhase && this.trainingService.isMultiPhaseJobSupported) {
162
163
            this.trainingService.setClusterMetadata('multiPhase', expParams.multiPhase.toString());
        }
164
165
166
167
        // Set up versionCheck config
        if (expParams.versionCheck !== undefined) {
            this.trainingService.setClusterMetadata('version_check', expParams.versionCheck.toString());
        }
SparkSnail's avatar
SparkSnail committed
168
169
170
171
        // Set up logCollection config
        if (expParams.logCollection !== undefined) {
            this.trainingService.setClusterMetadata('log_collection', expParams.logCollection.toString());
        }
172

chicm-ms's avatar
chicm-ms committed
173
        const dispatcherCommand: string = getMsgDispatcherCommand(expParams);
174
        this.log.debug(`dispatcher command: ${dispatcherCommand}`);
QuanluZhang's avatar
QuanluZhang committed
175
        const checkpointDir: string = await this.createCheckpointDir();
Deshui Yu's avatar
Deshui Yu committed
176
        this.setupTuner(
177
178
            dispatcherCommand,
            undefined,
Deshui Yu's avatar
Deshui Yu committed
179
            'start',
QuanluZhang's avatar
QuanluZhang committed
180
            checkpointDir);
Deshui Yu's avatar
Deshui Yu committed
181

182
        this.experimentProfile.startTime = Date.now();
chicm-ms's avatar
chicm-ms committed
183
        this.setStatus('RUNNING');
Deshui Yu's avatar
Deshui Yu committed
184
        await this.storeExperimentProfile();
185
186
        this.run().catch((err: Error) => {
            this.criticalError(err);
Deshui Yu's avatar
Deshui Yu committed
187
        });
188

Deshui Yu's avatar
Deshui Yu committed
189
190
191
        return this.experimentProfile.id;
    }

SparkSnail's avatar
SparkSnail committed
192
    public async resumeExperiment(readonly: boolean): Promise<void> {
chicm-ms's avatar
chicm-ms committed
193
        this.log.info(`Resuming experiment: ${this.experimentProfile.id}`);
Deshui Yu's avatar
Deshui Yu committed
194
195
196
        //Fetch back the experiment profile
        const experimentId: string = getExperimentId();
        this.experimentProfile = await this.dataStore.getExperimentProfile(experimentId);
SparkSnail's avatar
SparkSnail committed
197
198
199
200
        this.readonly = readonly;
        if (readonly) {
            return Promise.resolve();
        }
Deshui Yu's avatar
Deshui Yu committed
201
        const expParams: ExperimentParams = this.experimentProfile.params;
202

203
        // Set up multiphase config
204
        if (expParams.multiPhase && this.trainingService.isMultiPhaseJobSupported) {
205
206
207
            this.trainingService.setClusterMetadata('multiPhase', expParams.multiPhase.toString());
        }

208
209
        // Set up versionCheck config
        if (expParams.versionCheck !== undefined) {
SparkSnail's avatar
SparkSnail committed
210
            this.trainingService.setClusterMetadata('version_check', expParams.versionCheck.toString());
211
212
        }

chicm-ms's avatar
chicm-ms committed
213
        const dispatcherCommand: string = getMsgDispatcherCommand(expParams);
214
        this.log.debug(`dispatcher command: ${dispatcherCommand}`);
QuanluZhang's avatar
QuanluZhang committed
215
        const checkpointDir: string = await this.createCheckpointDir();
Deshui Yu's avatar
Deshui Yu committed
216
        this.setupTuner(
217
218
            dispatcherCommand,
            undefined,
Deshui Yu's avatar
Deshui Yu committed
219
            'resume',
QuanluZhang's avatar
QuanluZhang committed
220
            checkpointDir);
Deshui Yu's avatar
Deshui Yu committed
221
222
223
224
225
226
227
228
229
230
231

        const allTrialJobs: TrialJobInfo[] = await this.dataStore.listTrialJobs();

        // Resume currSubmittedTrialNum
        this.currSubmittedTrialNum = allTrialJobs.length;

        // Check the final status for WAITING and RUNNING jobs
        await Promise.all(allTrialJobs
            .filter((job: TrialJobInfo) => job.status === 'WAITING' || job.status === 'RUNNING')
            .map((job: TrialJobInfo) => this.dataStore.storeTrialJobEvent('FAILED', job.id)));

232
233
234
        // Collect generated trials and imported trials
        const finishedTrialData: string = await this.exportData();
        const importedData: string[] = await this.dataStore.getImportedData();
chicm-ms's avatar
chicm-ms committed
235
        let trialData: Record<string, any>[] = JSON.parse(finishedTrialData);
236
237
        for (const oneImportedData of importedData) {
            // do not deduplicate
chicm-ms's avatar
chicm-ms committed
238
            trialData = trialData.concat(<Record<string, any>[]>JSON.parse(oneImportedData));
239
240
241
        }
        this.trialDataForTuner = JSON.stringify(trialData);

chicm-ms's avatar
chicm-ms committed
242
243
244
245
246
        if (this.experimentProfile.execDuration < this.experimentProfile.params.maxExecDuration &&
            this.currSubmittedTrialNum < this.experimentProfile.params.maxTrialNum &&
            this.experimentProfile.endTime) {
            delete this.experimentProfile.endTime;
        }
chicm-ms's avatar
chicm-ms committed
247
        this.setStatus('RUNNING');
248

Deshui Yu's avatar
Deshui Yu committed
249
        // TO DO: update database record for resume event
250
251
252
        this.run().catch((err: Error) => {
            this.criticalError(err);
        });
Deshui Yu's avatar
Deshui Yu committed
253
254
    }

255
256
    public getTrialJob(trialJobId: string): Promise<TrialJobInfo> {
        return this.dataStore.getTrialJob(trialJobId);
Deshui Yu's avatar
Deshui Yu committed
257
258
259
    }

    public async setClusterMetadata(key: string, value: string): Promise<void> {
SparkSnail's avatar
SparkSnail committed
260
261
262
        if (this.readonly) {
            return Promise.reject(new Error('Error: can not set cluster metadata in readonly mode!'));
        }
chicm-ms's avatar
chicm-ms committed
263
        this.log.info(`NNIManager setClusterMetadata, key: ${key}, value: ${value}`);
Deshui Yu's avatar
Deshui Yu committed
264
265
266
267
        let timeoutId: NodeJS.Timer;
        // TO DO: move timeout value to constants file
        const delay1: Promise<{}> = new Promise((resolve: Function, reject: Function): void => {
            timeoutId = setTimeout(
268
                () => { reject(new Error('TrainingService setClusterMetadata timeout. Please check your config file.')); },
269
                30000);
Deshui Yu's avatar
Deshui Yu committed
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
        });
        await Promise.race([delay1, this.trainingService.setClusterMetadata(key, value)]).finally(() => {
            clearTimeout(timeoutId);
        });
    }

    public getClusterMetadata(key: string): Promise<string> {
        return Promise.resolve(
            this.trainingService.getClusterMetadata(key)
        );
    }

    public async getTrialJobStatistics(): Promise<TrialJobStatistics[]> {
        return this.dataStore.getTrialJobStatistics();
    }

286
    public async stopExperiment(): Promise<void> {
chicm-ms's avatar
chicm-ms committed
287
288
        this.setStatus('STOPPING');
        this.log.info('Stopping experiment, cleaning up ...');
289
        await this.experimentDoneCleanUp();
chicm-ms's avatar
chicm-ms committed
290
        this.log.info('Experiment stopped.');
Deshui Yu's avatar
Deshui Yu committed
291
292
    }

293
    public async getMetricData(trialJobId?: string, metricType?: MetricType): Promise<MetricDataRecord[]> {
Deshui Yu's avatar
Deshui Yu committed
294
295
296
        return this.dataStore.getMetricData(trialJobId, metricType);
    }

297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
    public async getMetricDataByRange(minSeqId: number, maxSeqId: number): Promise<MetricDataRecord[]> {
        const trialJobs = await this.dataStore.listTrialJobs();
        const targetTrials = trialJobs.filter(trial => (
            // FIXME: can this be undefined?
            trial.sequenceId !== undefined && minSeqId <= trial.sequenceId && trial.sequenceId <= maxSeqId
        ));
        const targetTrialIds = new Set(targetTrials.map(trial => trial.id));

        const allMetrics = await this.dataStore.getMetricData();
        return allMetrics.filter(metric => targetTrialIds.has(metric.trialJobId));
    }

    public async getLatestMetricData(): Promise<MetricDataRecord[]> {
        // FIXME: this can take a long time
        const allMetrics: MetricDataRecord[] = await this.dataStore.getMetricData();
        const finals: MetricDataRecord[] = [];
        const latestIntermediates: Map<string, MetricDataRecord> = new Map<string, MetricDataRecord>();
        for (const metric of allMetrics) {
            if (metric.type !== 'PERIODICAL') {
                finals.push(metric);
            } else {
                const old: MetricDataRecord | undefined = latestIntermediates.get(metric.trialJobId);
                if (old === undefined || old.sequence <= metric.sequence) {
                    latestIntermediates.set(metric.trialJobId, metric);
                }
            }
        }
        return finals.concat(Array.from(latestIntermediates.values()));
        // FIXME: unit test
    }

Deshui Yu's avatar
Deshui Yu committed
328
329
330
331
332
333
334
335
    public getExperimentProfile(): Promise<ExperimentProfile> {
        // TO DO: using Promise.resolve()
        const deferred: Deferred<ExperimentProfile> = new Deferred<ExperimentProfile>();
        deferred.resolve(this.experimentProfile);

        return deferred.promise;
    }

336
337
338
339
    public getStatus(): NNIManagerStatus {
        return this.status;
    }

Deshui Yu's avatar
Deshui Yu committed
340
341
342
343
    public async listTrialJobs(status?: TrialJobStatus): Promise<TrialJobInfo[]> {
        return this.dataStore.listTrialJobs(status);
    }

344
345
    private setupTuner(command: string, cwd: string | undefined, mode: 'start' | 'resume', dataDirectory: string): void {
        if (this.dispatcher !== undefined) {
Deshui Yu's avatar
Deshui Yu committed
346
347
            return;
        }
goooxu's avatar
goooxu committed
348
        const stdio: StdioOptions = ['ignore', process.stdout, process.stderr, 'pipe', 'pipe'];
Deshui Yu's avatar
Deshui Yu committed
349
350
351
352
353
354
355
        let newCwd: string;
        if (cwd === undefined || cwd === '') {
            newCwd = getLogDir();
        } else {
            newCwd = cwd;
        }
        // TO DO: add CUDA_VISIBLE_DEVICES
356
357
358
359
360
        let includeIntermediateResultsEnv: boolean | undefined = false;
        if (this.experimentProfile.params.tuner !== undefined) {
            includeIntermediateResultsEnv = this.experimentProfile.params.tuner.includeIntermediateResults;
        }

chicm-ms's avatar
chicm-ms committed
361
        const nniEnv = {
chicm-ms's avatar
chicm-ms committed
362
            SDK_PROCESS: 'dispatcher',
Zejun Lin's avatar
Zejun Lin committed
363
364
            NNI_MODE: mode,
            NNI_CHECKPOINT_DIRECTORY: dataDirectory,
365
            NNI_LOG_DIRECTORY: getLogDir(),
366
            NNI_LOG_LEVEL: getLogLevel(),
367
368
            NNI_INCLUDE_INTERMEDIATE_RESULTS: includeIntermediateResultsEnv,
            CUDA_VISIBLE_DEVICES: this.getGpuEnvvarValue()
Zejun Lin's avatar
Zejun Lin committed
369
        };
chicm-ms's avatar
chicm-ms committed
370
        const newEnv = Object.assign({}, process.env, nniEnv);
371
        const tunerProc: ChildProcess = getTunerProc(command, stdio, newCwd, newEnv);
372
373
        this.dispatcherPid = tunerProc.pid;
        this.dispatcher = createDispatcherInterface(tunerProc);
Deshui Yu's avatar
Deshui Yu committed
374
375
376
377

        return;
    }

378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
    private getGpuEnvvarValue(): string {
        let cudaDevices: string | undefined;

        if (this.experimentProfile.params.advisor !== undefined) {
            cudaDevices = this.experimentProfile.params.advisor.gpuIndices;
        } else if (this.experimentProfile.params.tuner !== undefined) {
            cudaDevices = this.experimentProfile.params.tuner.gpuIndices;
        }

        if (cudaDevices === undefined) {
            return '';
        } else {
            return cudaDevices;
        }
    }

Deshui Yu's avatar
Deshui Yu committed
394
    private updateTrialConcurrency(trialConcurrency: number): void {
QuanluZhang's avatar
QuanluZhang committed
395
396
        // we assume trialConcurrency >= 0, which is checked by restserver
        this.trialConcurrencyChange += (trialConcurrency - this.experimentProfile.params.trialConcurrency);
Deshui Yu's avatar
Deshui Yu committed
397
398
399
400
401
402
403
404
405
406
407
408
        this.experimentProfile.params.trialConcurrency = trialConcurrency;

        return;
    }

    private updateMaxExecDuration(duration: number): void {
        this.experimentProfile.params.maxExecDuration = duration;

        return;
    }

    private updateSearchSpace(searchSpace: string): void {
409
        if (this.dispatcher === undefined) {
Deshui Yu's avatar
Deshui Yu committed
410
411
            throw new Error('Error: tuner has not been setup');
        }
412
        this.dispatcher.sendCommand(UPDATE_SEARCH_SPACE, searchSpace);
Deshui Yu's avatar
Deshui Yu committed
413
414
415
416
417
        this.experimentProfile.params.searchSpace = searchSpace;

        return;
    }

QuanluZhang's avatar
QuanluZhang committed
418
419
420
421
422
423
    private updateMaxTrialNum(maxTrialNum: number): void {
        this.experimentProfile.params.maxTrialNum = maxTrialNum;

        return;
    }

Deshui Yu's avatar
Deshui Yu committed
424
    private async experimentDoneCleanUp(): Promise<void> {
425
        if (this.dispatcher === undefined) {
Deshui Yu's avatar
Deshui Yu committed
426
427
            throw new Error('Error: tuner has not been setup');
        }
428
        this.trainingService.removeTrialJobMetricListener(this.trialJobMetricListener);
429
        this.dispatcher.sendCommand(TERMINATE);
Deshui Yu's avatar
Deshui Yu committed
430
431
432
        let tunerAlive: boolean = true;
        // gracefully terminate tuner and assessor here, wait at most 30 seconds.
        for (let i: number = 0; i < 30; i++) {
433
            if (!tunerAlive) { break; }
434
            tunerAlive = await isAlive(this.dispatcherPid);
Deshui Yu's avatar
Deshui Yu committed
435
436
            await delay(1000);
        }
437
        await killPid(this.dispatcherPid);
Deshui Yu's avatar
Deshui Yu committed
438
        const trialJobList: TrialJobDetail[] = await this.trainingService.listTrialJobs();
439
440
441

        // DON'T try to make it in parallel, the training service may not handle it well.
        // If there is performance concern, consider to support batch cancellation on training service.
Deshui Yu's avatar
Deshui Yu committed
442
443
444
445
        for (const trialJob of trialJobList) {
            if (trialJob.status === 'RUNNING' ||
                trialJob.status === 'WAITING') {
                try {
chicm-ms's avatar
chicm-ms committed
446
                    this.log.info(`cancelTrialJob: ${trialJob.id}`);
Deshui Yu's avatar
Deshui Yu committed
447
448
                    await this.trainingService.cancelTrialJob(trialJob.id);
                } catch (error) {
449
                    this.log.debug(`ignorable error on canceling trial ${trialJob.id}. ${error}`);
Deshui Yu's avatar
Deshui Yu committed
450
451
452
453
                }
            }
        }
        await this.trainingService.cleanUp();
454
        this.experimentProfile.endTime = Date.now();
Deshui Yu's avatar
Deshui Yu committed
455
        await this.storeExperimentProfile();
chicm-ms's avatar
chicm-ms committed
456
        this.setStatus('STOPPED');
Deshui Yu's avatar
Deshui Yu committed
457
458
459
    }

    private async periodicallyUpdateExecDuration(): Promise<void> {
460
        let count: number = 1;
461
        while (!['ERROR', 'STOPPING', 'STOPPED'].includes(this.status.status)) {
462
            await delay(1000 * 1); // 1 seconds
463
            if (this.status.status === 'RUNNING') {
464
465
466
467
468
469
                this.experimentProfile.execDuration += 1;
                if (count % 10 === 0) {
                    await this.storeExperimentProfile();
                }
            }
            count += 1;
Deshui Yu's avatar
Deshui Yu committed
470
471
472
        }
    }

chicm-ms's avatar
chicm-ms committed
473
474
475
476
477
478
    private async pingDispatcher(): Promise<void> {
        if (this.dispatcher === undefined) {
            throw new Error('Error: tuner has not been setup');
        }
        while (!['ERROR', 'STOPPING', 'STOPPED'].includes(this.status.status)) {
            this.dispatcher.sendCommand(PING);
chicm-ms's avatar
chicm-ms committed
479
            await delay(1000 * 5);
chicm-ms's avatar
chicm-ms committed
480
481
482
        }
    }

QuanluZhang's avatar
QuanluZhang committed
483
484
    private async requestTrialJobsStatus(): Promise<number> {
        let finishedTrialJobNum: number = 0;
QuanluZhang's avatar
QuanluZhang committed
485
486
487
        if (this.dispatcher === undefined) {
            throw new Error('Error: tuner has not been setup');
        }
QuanluZhang's avatar
QuanluZhang committed
488
489
490
491
        for (const trialJobId of Array.from(this.trialJobs.keys())) {
            const trialJobDetail: TrialJobDetail = await this.trainingService.getTrialJob(trialJobId);
            const oldTrialJobDetail: TrialJobDetail | undefined = this.trialJobs.get(trialJobId);
            if (oldTrialJobDetail !== undefined && oldTrialJobDetail.status !== trialJobDetail.status) {
chicm-ms's avatar
chicm-ms committed
492
                this.log.info(`Trial job ${trialJobDetail.id} status changed from ${oldTrialJobDetail.status} to ${trialJobDetail.status}`);
QuanluZhang's avatar
QuanluZhang committed
493
                this.trialJobs.set(trialJobId, Object.assign({}, trialJobDetail));
494
                await this.dataStore.storeTrialJobEvent(trialJobDetail.status, trialJobDetail.id, undefined, trialJobDetail);
QuanluZhang's avatar
QuanluZhang committed
495
            }
QuanluZhang's avatar
QuanluZhang committed
496
            let hyperParams: string | undefined = undefined;
QuanluZhang's avatar
QuanluZhang committed
497
498
499
            switch (trialJobDetail.status) {
                case 'SUCCEEDED':
                case 'USER_CANCELED':
QuanluZhang's avatar
QuanluZhang committed
500
                case 'EARLY_STOPPED':
QuanluZhang's avatar
QuanluZhang committed
501
502
                    this.trialJobs.delete(trialJobId);
                    finishedTrialJobNum++;
503
                    hyperParams = trialJobDetail.form.hyperParameters.value;
QuanluZhang's avatar
QuanluZhang committed
504
                    this.dispatcher.sendCommand(TRIAL_END, JSON.stringify({
chicm-ms's avatar
chicm-ms committed
505
                        trial_job_id: trialJobDetail.id, // eslint-disable-line @typescript-eslint/camelcase
QuanluZhang's avatar
QuanluZhang committed
506
                        event: trialJobDetail.status,
chicm-ms's avatar
chicm-ms committed
507
                        hyper_params: hyperParams // eslint-disable-line @typescript-eslint/camelcase
goooxu's avatar
goooxu committed
508
                    }));
QuanluZhang's avatar
QuanluZhang committed
509
510
511
512
513
514
515
                    break;
                case 'FAILED':
                case 'SYS_CANCELED':
                    // In the current version, we do not retry
                    // TO DO: push this job to queue for retry
                    this.trialJobs.delete(trialJobId);
                    finishedTrialJobNum++;
516
                    hyperParams = trialJobDetail.form.hyperParameters.value;
QuanluZhang's avatar
QuanluZhang committed
517
                    this.dispatcher.sendCommand(TRIAL_END, JSON.stringify({
chicm-ms's avatar
chicm-ms committed
518
                        trial_job_id: trialJobDetail.id, // eslint-disable-line @typescript-eslint/camelcase
QuanluZhang's avatar
QuanluZhang committed
519
                        event: trialJobDetail.status,
chicm-ms's avatar
chicm-ms committed
520
                        hyper_params: hyperParams // eslint-disable-line @typescript-eslint/camelcase
goooxu's avatar
goooxu committed
521
                    }));
QuanluZhang's avatar
QuanluZhang committed
522
523
524
525
526
527
528
529
530
531
                    break;
                case 'WAITING':
                case 'RUNNING':
                case 'UNKNOWN':
                    // Do nothing
                    break;
                default:
                // TO DO: add warning in log
            }
        }
goooxu's avatar
goooxu committed
532

Gems Guo's avatar
Gems Guo committed
533
        return finishedTrialJobNum;
QuanluZhang's avatar
QuanluZhang committed
534
535
536
537
538
539
    }

    private async manageTrials(): Promise<void> {
        if (this.dispatcher === undefined) {
            throw new Error('Error: tuner has not been setup');
        }
QuanluZhang's avatar
QuanluZhang committed
540
        let allFinishedTrialJobNum: number = this.currSubmittedTrialNum;
QuanluZhang's avatar
QuanluZhang committed
541
        let waitSubmittedToFinish: number;
542
        while (!['ERROR', 'STOPPING', 'STOPPED'].includes(this.status.status)) {
QuanluZhang's avatar
QuanluZhang committed
543
544
545
546
547
548
549
            const finishedTrialJobNum: number = await this.requestTrialJobsStatus();
            allFinishedTrialJobNum += finishedTrialJobNum;

            // requestTrialNum is the number of trials that will be requested from tuner.
            // If trialConcurrency does not change, requestTrialNum equals finishedTrialJobNum.
            // If trialConcurrency changes, for example, trialConcurrency increases by 2 (trialConcurrencyChange=2), then
            // requestTrialNum equals 2 + finishedTrialJobNum and trialConcurrencyChange becomes 0.
550
            // If trialConcurrency changes, for example, trialConcurrency decreases by 4 (trialConcurrencyChange=-4) and
QuanluZhang's avatar
QuanluZhang committed
551
552
553
554
555
556
557
558
            // finishedTrialJobNum is 2, then requestTrialNum becomes -2. No trial will be requested from tuner,
            // and trialConcurrencyChange becomes -2.
            const requestTrialNum: number = this.trialConcurrencyChange + finishedTrialJobNum;
            if (requestTrialNum >= 0) {
                this.trialConcurrencyChange = 0;
            } else {
                this.trialConcurrencyChange = requestTrialNum;
            }
chicm-ms's avatar
chicm-ms committed
559

560
            this.requestTrialJobs(requestTrialNum);
chicm-ms's avatar
chicm-ms committed
561

QuanluZhang's avatar
QuanluZhang committed
562
            // check maxtrialnum and maxduration here
563
            // NO_MORE_TRIAL is more like a subset of RUNNING, because during RUNNING tuner
564
            // might tell nnimanager that this is no more trials. In NO_MORE_TRIAL state, the experiment is viewed
565
566
            // as still running. DONE could be transfered from RUNNING or NO_MORE_TRIAL.
            assert(this.status.status === 'RUNNING' ||
567
                this.status.status === 'DONE' ||
QuanluZhang's avatar
QuanluZhang committed
568
569
                this.status.status === 'NO_MORE_TRIAL' ||
                this.status.status === 'TUNER_NO_MORE_TRIAL');
570
            if (this.experimentProfile.execDuration > this.experimentProfile.params.maxExecDuration ||
QuanluZhang's avatar
QuanluZhang committed
571
                this.currSubmittedTrialNum >= this.experimentProfile.params.maxTrialNum) {
QuanluZhang's avatar
QuanluZhang committed
572
                if (this.status.status !== 'DONE') {
chicm-ms's avatar
chicm-ms committed
573
                    this.setStatus('NO_MORE_TRIAL');
QuanluZhang's avatar
QuanluZhang committed
574
575
576
577
                    waitSubmittedToFinish = this.currSubmittedTrialNum;

                    assert(allFinishedTrialJobNum <= waitSubmittedToFinish);
                    if (allFinishedTrialJobNum >= waitSubmittedToFinish) {
chicm-ms's avatar
chicm-ms committed
578
                        this.setStatus('DONE');
QuanluZhang's avatar
QuanluZhang committed
579
580
581
582
583
                        this.experimentProfile.endTime = Date.now();
                        await this.storeExperimentProfile();
                        // write this log for travis CI
                        this.log.info('Experiment done.');
                    }
QuanluZhang's avatar
QuanluZhang committed
584
585
586
                }
            } else {
                if (this.status.status === 'DONE') {
587
588
                    delete this.experimentProfile.endTime;
                    await this.storeExperimentProfile();
QuanluZhang's avatar
QuanluZhang committed
589
                }
QuanluZhang's avatar
QuanluZhang committed
590
                if (this.status.status !== 'TUNER_NO_MORE_TRIAL') {
chicm-ms's avatar
chicm-ms committed
591
                    this.setStatus('RUNNING');
592
                }
QuanluZhang's avatar
QuanluZhang committed
593
594
595
596
597
                for (let i: number = this.trialJobs.size; i < this.experimentProfile.params.trialConcurrency; i++) {
                    if (this.waitingTrials.length === 0 ||
                        this.currSubmittedTrialNum >= this.experimentProfile.params.maxTrialNum) {
                        break;
                    }
598
                    const form = this.waitingTrials.shift() as TrialJobApplicationForm;
QuanluZhang's avatar
QuanluZhang committed
599
                    this.currSubmittedTrialNum++;
600
601
                    this.log.info(`submitTrialJob: form: ${JSON.stringify(form)}`);
                    const trialJobDetail: TrialJobDetail = await this.trainingService.submitTrialJob(form);
602
                    await this.storeExperimentProfile();
QuanluZhang's avatar
QuanluZhang committed
603
604
605
606
                    this.trialJobs.set(trialJobDetail.id, Object.assign({}, trialJobDetail));
                    const trialJobDetailSnapshot: TrialJobDetail | undefined = this.trialJobs.get(trialJobDetail.id);
                    if (trialJobDetailSnapshot != undefined) {
                        await this.dataStore.storeTrialJobEvent(
607
                            trialJobDetailSnapshot.status, trialJobDetailSnapshot.id, form.hyperParameters.value, trialJobDetailSnapshot);
QuanluZhang's avatar
QuanluZhang committed
608
609
610
611
612
613
614
615
616
                    } else {
                        assert(false, `undefined trialJobDetail in trialJobs: ${trialJobDetail.id}`);
                    }
                }
            }
            await delay(1000 * 5); // 5 seconds
        }
    }

Deshui Yu's avatar
Deshui Yu committed
617
618
619
620
621
622
    private storeExperimentProfile(): Promise<void> {
        this.experimentProfile.revision += 1;

        return this.dataStore.storeExperimentProfile(this.experimentProfile);
    }

623
    private async run(): Promise<void> {
QuanluZhang's avatar
QuanluZhang committed
624
        assert(this.dispatcher !== undefined);
625
626
627
628
629
630
631

        this.addEventListeners();

        this.sendInitTunerCommands();

        await Promise.all([
            this.periodicallyUpdateExecDuration(),
chicm-ms's avatar
chicm-ms committed
632
            this.pingDispatcher().catch((err: Error) => {
chicm-ms's avatar
chicm-ms committed
633
                throw NNIError.FromError(err, 'Dispatcher error: ');
chicm-ms's avatar
chicm-ms committed
634
            }),
635
            this.trainingService.run().catch((err: Error) => {
chicm-ms's avatar
chicm-ms committed
636
                throw NNIError.FromError(err, 'Training service error: ');
637
            }),
QuanluZhang's avatar
QuanluZhang committed
638
            this.manageTrials().catch((err: Error) => {
chicm-ms's avatar
chicm-ms committed
639
                throw NNIError.FromError(err, 'Job management error: ');
640
            })]);
641
642
    }

QuanluZhang's avatar
QuanluZhang committed
643
    private addEventListeners(): void {
chicm-ms's avatar
chicm-ms committed
644
        this.log.info('Add event listeners');
645
        // TO DO: cannot run this method more than once in one NNIManager instance
QuanluZhang's avatar
QuanluZhang committed
646
        if (this.dispatcher === undefined) {
647
648
            throw new Error('Error: tuner or job maintainer have not been setup');
        }
649
        this.trainingService.addTrialJobMetricListener(this.trialJobMetricListener);
650
651
652

        this.dispatcher.onCommand((commandType: string, content: string) => {
            this.onTunerCommand(commandType, content).catch((err: Error) => {
chicm-ms's avatar
chicm-ms committed
653
                this.criticalError(NNIError.FromError(err, 'Tuner command event error: '));
654
655
            });
        });
656
657
658
659
        this.dispatcher.onError((error: Error) => {
            this.log.error(`Dispatcher error: ${error.message}`);
            this.criticalError(new Error('Dispatcher stream error, tuner may have crashed.'));
        });
660
661
662
663
    }

    private sendInitTunerCommands(): void {
        if (this.dispatcher === undefined) {
664
            throw new Error('Dispatcher error: tuner has not been setup');
665
        }
chicm-ms's avatar
chicm-ms committed
666
667
668
        this.log.debug(`Send tuner command: INITIALIZE: ${this.experimentProfile.params.searchSpace}`);
        // Tuner need to be initialized with search space before generating any hyper parameters
        this.dispatcher.sendCommand(INITIALIZE, this.experimentProfile.params.searchSpace);
669
670
671
    }

    private async onTrialJobMetrics(metric: TrialJobMetric): Promise<void> {
chicm-ms's avatar
chicm-ms committed
672
        this.log.debug(`NNIManager received trial job metrics: ${metric}`);
673
674
675
676
677
678
679
        await this.dataStore.storeMetricData(metric.id, metric.data);
        if (this.dispatcher === undefined) {
            throw new Error('Error: tuner has not been setup');
        }
        this.dispatcher.sendCommand(REPORT_METRIC_DATA, metric.data);
    }

chicm-ms's avatar
chicm-ms committed
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
    private requestTrialJobs(jobNum: number): void {
        if (jobNum < 1) {
            return;
        }
        if (this.dispatcher === undefined) {
            throw new Error('Dispatcher error: tuner has not been setup');
        }
        if (this.experimentProfile.params.multiThread) {
            // Send multiple requests to ensure multiple hyper parameters are generated in non-blocking way.
            // For a single REQUEST_TRIAL_JOBS request, hyper parameters are generated one by one
            // sequentially.
            for (let i: number = 0; i < jobNum; i++) {
                this.dispatcher.sendCommand(REQUEST_TRIAL_JOBS, '1');
            }
        } else {
            this.dispatcher.sendCommand(REQUEST_TRIAL_JOBS, String(jobNum));
        }
    }

699
    private async onTunerCommand(commandType: string, content: string): Promise<void> {
horizon365's avatar
horizon365 committed
700
        this.log.info(`NNIManager received command from dispatcher: ${commandType}, ${content}`);
701
        switch (commandType) {
chicm-ms's avatar
chicm-ms committed
702
            case INITIALIZED: {
chicm-ms's avatar
chicm-ms committed
703
                // Tuner is intialized, search space is set, request tuner to generate hyper parameters
704
705
706
707
708
709
                if (this.trialDataForTuner.length > 0) {
                    if (this.dispatcher === undefined) {
                        throw new Error('Dispatcher error: tuner has not been setup');
                    }
                    this.dispatcher.sendCommand(IMPORT_DATA, this.trialDataForTuner);
                }
chicm-ms's avatar
chicm-ms committed
710
711
                this.requestTrialJobs(this.experimentProfile.params.trialConcurrency);
                break;
chicm-ms's avatar
chicm-ms committed
712
713
            }
            case NEW_TRIAL_JOB: {
QuanluZhang's avatar
QuanluZhang committed
714
                if (this.status.status === 'TUNER_NO_MORE_TRIAL') {
715
                    this.log.warning('It is not supposed to receive more trials after NO_MORE_TRIAL is set');
chicm-ms's avatar
chicm-ms committed
716
                    this.setStatus('RUNNING');
717
                }
718
719
720
721
722
723
724
725
                const form: TrialJobApplicationForm = {
                    sequenceId: this.experimentProfile.nextSequenceId++,
                    hyperParameters: {
                        value: content,
                        index: 0
                    }
                };
                this.waitingTrials.push(form);
726
                break;
chicm-ms's avatar
chicm-ms committed
727
728
            }
            case SEND_TRIAL_JOB_PARAMETER: {
chicm-ms's avatar
chicm-ms committed
729
730
731
732
733
                const tunerCommand: any = JSON.parse(content);
                assert(tunerCommand.parameter_index >= 0);
                assert(tunerCommand.trial_job_id !== undefined);

                const trialJobForm: TrialJobApplicationForm = {
734
                    sequenceId: -1,  // FIXME: multi-phase tuner should use sequence ID instead of trial job ID
chicm-ms's avatar
chicm-ms committed
735
736
737
738
739
                    hyperParameters: {
                        value: content,
                        index: tunerCommand.parameter_index
                    }
                };
chicm-ms's avatar
chicm-ms committed
740
                this.log.info(`updateTrialJob: job id: ${tunerCommand.trial_job_id}, form: ${JSON.stringify(trialJobForm)}`);
chicm-ms's avatar
chicm-ms committed
741
                await this.trainingService.updateTrialJob(tunerCommand.trial_job_id, trialJobForm);
742
743
744
745
746
                if (tunerCommand['parameters'] !== null) {
                    // parameters field is set as empty string if no more hyper parameter can be generated by tuner.
                    await this.dataStore.storeTrialJobEvent(
                        'ADD_HYPERPARAMETER', tunerCommand.trial_job_id, content, undefined);
                }
chicm-ms's avatar
chicm-ms committed
747
                break;
chicm-ms's avatar
chicm-ms committed
748
749
            }
            case NO_MORE_TRIAL_JOBS: {
750
751
752
                if (!['ERROR', 'STOPPING', 'STOPPED'].includes(this.status.status)) {
                    this.setStatus('TUNER_NO_MORE_TRIAL');
                }
753
                break;
chicm-ms's avatar
chicm-ms committed
754
755
            }
            case KILL_TRIAL_JOB: {
chicm-ms's avatar
chicm-ms committed
756
                this.log.info(`cancelTrialJob: ${JSON.parse(content)}`);
QuanluZhang's avatar
QuanluZhang committed
757
                await this.trainingService.cancelTrialJob(JSON.parse(content), true);
758
                break;
chicm-ms's avatar
chicm-ms committed
759
            }
760
761
762
            default:
                throw new Error('Error: unsupported command type from tuner');
        }
Deshui Yu's avatar
Deshui Yu committed
763
764
    }

765
766
767
768
769
770
771
772
773
774
    private criticalError(err: Error): void {
        this.logError(err);
        console.error(err);
    }

    private logError(err: Error): void {
        if (err.stack !== undefined) {
            this.log.error(err.stack);
        }
        this.status.errors.push(err.message);
chicm-ms's avatar
chicm-ms committed
775
776
777
778
779
780
781
782
        this.setStatus('ERROR');
    }

    private setStatus(status: ExperimentStatus): void {
        if (status !== this.status.status) {
            this.log.info(`Change NNIManager status from: ${this.status.status} to: ${status}`);
            this.status.status = status;
        }
783
784
785
786
787
788
789
    }

    private createEmptyExperimentProfile(): ExperimentProfile {
        return {
            id: getExperimentId(),
            revision: 0,
            execDuration: 0,
790
            logDir: getExperimentRootDir(),
791
            nextSequenceId: 0,
792
793
794
795
796
797
            params: {
                authorName: '',
                experimentName: '',
                trialConcurrency: 0,
                maxExecDuration: 0, // unit: second
                maxTrialNum: 0, // maxTrialNum includes all the submitted trial jobs
798
                trainingServicePlatform: '',
QuanluZhang's avatar
QuanluZhang committed
799
                searchSpace: ''
800
801
            }
        };
Deshui Yu's avatar
Deshui Yu committed
802
    }
803

QuanluZhang's avatar
QuanluZhang committed
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
    private async createCheckpointDir(): Promise<string> {
        // TODO: test
        const chkpDir: string = getCheckpointDir();
        // create checkpoint directory
        await mkDirP(chkpDir);
        // assign this directory to exp profile's checkpointDir
        if (this.experimentProfile.params.advisor) {
            this.experimentProfile.params.advisor.checkpointDir = chkpDir;
        }
        if (this.experimentProfile.params.tuner) {
            this.experimentProfile.params.tuner.checkpointDir = chkpDir;
        }
        if (this.experimentProfile.params.assessor) {
            this.experimentProfile.params.assessor.checkpointDir = chkpDir;
        }

        return Promise.resolve(chkpDir);
    }
Deshui Yu's avatar
Deshui Yu committed
822
823
824
}

export { NNIManager };