nnimanager.ts 35.4 KB
Newer Older
liuzhe-lz's avatar
liuzhe-lz committed
1
2
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT license.
Deshui Yu's avatar
Deshui Yu committed
3
4
5
6

'use strict';

import * as assert from 'assert';
chicm-ms's avatar
chicm-ms committed
7
import { ChildProcess, StdioOptions } from 'child_process';
Deshui Yu's avatar
Deshui Yu committed
8
9
10
import { Deferred } from 'ts-deferred';
import * as component from '../common/component';
import { DataStore, MetricDataRecord, MetricType, TrialJobInfo } from '../common/datastore';
11
import { NNIError } from '../common/errors';
12
import { getExperimentId } from '../common/experimentStartupInfo';
Deshui Yu's avatar
Deshui Yu committed
13
14
import { getLogger, Logger } from '../common/log';
import {
chicm-ms's avatar
chicm-ms committed
15
    ExperimentParams, ExperimentProfile, Manager, ExperimentStatus,
16
    NNIManagerStatus, ProfileUpdateType, TrialJobStatistics
Deshui Yu's avatar
Deshui Yu committed
17
18
} from '../common/manager';
import {
19
    TrainingService, TrialJobApplicationForm, TrialJobDetail, TrialJobMetric, TrialJobStatus, LogType
Deshui Yu's avatar
Deshui Yu committed
20
} from '../common/trainingService';
21
import { delay, getCheckpointDir, getExperimentRootDir, getLogDir, getMsgDispatcherCommand, mkDirP, getTunerProc, getLogLevel, isAlive, killPid } from '../common/utils';
Deshui Yu's avatar
Deshui Yu committed
22
import {
chicm-ms's avatar
chicm-ms committed
23
    INITIALIZE, INITIALIZED, KILL_TRIAL_JOB, NEW_TRIAL_JOB, NO_MORE_TRIAL_JOBS, PING,
24
    REPORT_METRIC_DATA, REQUEST_TRIAL_JOBS, SEND_TRIAL_JOB_PARAMETER, TERMINATE, TRIAL_END, UPDATE_SEARCH_SPACE, IMPORT_DATA
Deshui Yu's avatar
Deshui Yu committed
25
} from './commands';
26
import { createDispatcherInterface, IpcInterface } from './ipcInterface';
Deshui Yu's avatar
Deshui Yu committed
27
28

/**
 * NNIManager, which implements the Manager interface.
 */
class NNIManager implements Manager {
    private trainingService: TrainingService;
33
    private dispatcher: IpcInterface | undefined;
34
    private currSubmittedTrialNum: number;  // need to be recovered
QuanluZhang's avatar
QuanluZhang committed
35
    private trialConcurrencyChange: number; // >0: increase, <0: decrease
Deshui Yu's avatar
Deshui Yu committed
36
37
38
    private log: Logger;
    private dataStore: DataStore;
    private experimentProfile: ExperimentProfile;
39
    private dispatcherPid: number;
40
    private status: NNIManagerStatus;
41
    private waitingTrials: TrialJobApplicationForm[];
QuanluZhang's avatar
QuanluZhang committed
42
    private trialJobs: Map<string, TrialJobDetail>;
43
    private trialDataForTuner: string;
SparkSnail's avatar
SparkSnail committed
44
    private readonly: boolean;
45

46
    private trialJobMetricListener: (metric: TrialJobMetric) => void;
47

Deshui Yu's avatar
Deshui Yu committed
48
49
    /**
     * Creates a manager in the 'INITIALIZED' state with empty trial bookkeeping.
     * The training service and data store are obtained through `component.get`.
     */
    constructor() {
        // Plain bookkeeping fields start out empty / zeroed.
        this.currSubmittedTrialNum = 0;
        this.trialConcurrencyChange = 0;
        this.dispatcherPid = 0;
        this.trialDataForTuner = '';
        this.readonly = false;
        this.waitingTrials = [];
        this.trialJobs = new Map<string, TrialJobDetail>();

        // Collaborators; the manager cannot work without a training service.
        this.trainingService = component.get(TrainingService);
        assert(this.trainingService);
        this.log = getLogger();
        this.dataStore = component.get(DataStore);

        this.experimentProfile = this.createEmptyExperimentProfile();
        this.status = { status: 'INITIALIZED', errors: [] };

        // A failure while handling a metric is escalated to a critical error
        // instead of being left as an unhandled rejection.
        this.trialJobMetricListener = (metric: TrialJobMetric): void => {
            this.onTrialJobMetrics(metric).catch((err: Error) => {
                this.criticalError(NNIError.FromError(err, 'Job metrics error: '));
            });
        };
    }

    /**
     * Applies one kind of change to the experiment profile and persists the result.
     *
     * The method is `async` so that every failure (readonly mode, an unrecognized
     * `updateType`, or an error thrown by the individual updater, e.g.
     * `updateSearchSpace` when the tuner has not been set up) reaches the caller
     * as a rejected promise rather than as a mix of rejections and synchronous throws.
     *
     * @param experimentProfile profile whose `params` carries the new value
     * @param updateType selects which field of `params` is applied
     * @returns a promise that resolves once the updated profile has been stored
     */
    public async updateExperimentProfile(experimentProfile: ExperimentProfile, updateType: ProfileUpdateType): Promise<void> {
        if (this.readonly) {
            throw new Error('Error: can not update experiment profile in readonly mode!');
        }

        switch (updateType) {
            case 'TRIAL_CONCURRENCY':
                this.updateTrialConcurrency(experimentProfile.params.trialConcurrency);
                break;
            case 'MAX_EXEC_DURATION':
                this.updateMaxExecDuration(experimentProfile.params.maxExecDuration);
                break;
            case 'SEARCH_SPACE':
                this.updateSearchSpace(experimentProfile.params.searchSpace);
                break;
            case 'MAX_TRIAL_NUM':
                this.updateMaxTrialNum(experimentProfile.params.maxTrialNum);
                break;
            default:
                throw new Error('Error: unrecognized updateType');
        }

        await this.storeExperimentProfile();
    }

97
    /**
     * Forwards user-supplied trial data to the tuner and records it in the data store.
     *
     * @param data payload sent with the IMPORT_DATA command and stored as an 'IMPORT_DATA' event
     * @returns the data store's promise; rejected in readonly mode or when no dispatcher exists
     */
    public importData(data: string): Promise<void> {
        if (this.readonly) {
            const readonlyError = new Error('Error: can not import data in readonly mode!');
            return Promise.reject(readonlyError);
        }

        const tuner = this.dispatcher;
        if (tuner === undefined) {
            return Promise.reject(new Error('tuner has not been setup'));
        }

        tuner.sendCommand(IMPORT_DATA, data);

        // No trial id is associated with imported data, hence the empty id.
        return this.dataStore.storeTrialJobEvent('IMPORT_DATA', '', data);
    }

111
112
113
114
    /**
     * Returns the trial hyper-parameter configurations exported by the data store.
     */
    public async exportData(): Promise<string> {
        const hpConfigs: string = await this.dataStore.exportTrialHpConfigs();

        return hpConfigs;
    }

115
    /**
     * Queues a trial whose hyper-parameters are supplied by the user instead of the tuner.
     *
     * The method is `async` so that malformed JSON in `hyperParams` and a failing
     * data-store write both surface as a rejected promise; previously the former was
     * a synchronous throw and the latter an unhandled rejection.
     *
     * @param hyperParams JSON text of the hyper-parameters for the new trial
     * @returns the sequence id assigned to the queued trial
     * @throws (as rejection) in readonly mode, when maxTrialNum has been reached,
     *         when `hyperParams` is not valid JSON, or when storing the event fails
     */
    public async addCustomizedTrialJob(hyperParams: string): Promise<number> {
        if (this.readonly) {
            throw new Error('Error: can not add customized trial job in readonly mode!');
        }
        if (this.currSubmittedTrialNum >= this.experimentProfile.params.maxTrialNum) {
            throw new Error('reach maxTrialNum');
        }

        // TODO: NNI manager should not peek tuner's internal protocol, let's refactor this later
        // Parsed before a sequence id is consumed, so bad input leaves the profile untouched.
        const packedParameter = {
            parameter_id: null, // eslint-disable-line @typescript-eslint/camelcase
            parameter_source: 'customized', // eslint-disable-line @typescript-eslint/camelcase
            parameters: JSON.parse(hyperParams)
        };

        const form: TrialJobApplicationForm = {
            sequenceId: this.experimentProfile.nextSequenceId++,
            hyperParameters: {
                value: JSON.stringify(packedParameter),
                index: 0
            }
        };
        this.waitingTrials.push(form);

        // trial id has not been generated yet, thus use '' instead
        await this.dataStore.storeTrialJobEvent('ADD_CUSTOMIZED', '', hyperParams);

        return form.sequenceId;
    }

    /**
     * Cancels a trial at the user's request and records a 'USER_TO_CANCEL' event for it.
     *
     * @param trialJobId id of the trial to cancel
     * @throws (as rejection) in readonly mode
     */
    public async cancelTrialJobByUser(trialJobId: string): Promise<void> {
        if (this.readonly) {
            throw new Error('Error: can not cancel trial job in readonly mode!');
        }

        this.log.info(`User cancelTrialJob: ${trialJobId}`);

        // The event is only stored after the training service accepted the cancellation.
        await this.trainingService.cancelTrialJob(trialJobId);
        await this.dataStore.storeTrialJobEvent('USER_TO_CANCEL', trialJobId, '');
    }

    /**
     * Starts a new experiment: stores the parameters, configures the training service,
     * launches the tuner in 'start' mode and kicks off the main loop.
     *
     * @param expParams parameters of the experiment to start
     * @returns the id of the experiment profile
     */
    public async startExperiment(expParams: ExperimentParams): Promise<string> {
        this.log.info(`Starting experiment: ${this.experimentProfile.id}`);
        this.experimentProfile.params = expParams;
        await this.storeExperimentProfile();
        this.log.debug('Setup tuner...');

        // Multi-phase is only forwarded when requested and supported by the training service.
        if (expParams.multiPhase && this.trainingService.isMultiPhaseJobSupported) {
            this.trainingService.setClusterMetadata('multiPhase', expParams.multiPhase.toString());
        }

        // Optional settings are forwarded only when explicitly present.
        const { versionCheck, logCollection } = expParams;
        if (versionCheck !== undefined) {
            this.trainingService.setClusterMetadata('version_check', versionCheck.toString());
        }
        if (logCollection !== undefined) {
            this.trainingService.setClusterMetadata('log_collection', logCollection.toString());
        }

        const tunerCommand: string = getMsgDispatcherCommand(expParams);
        this.log.debug(`dispatcher command: ${tunerCommand}`);
        this.setupTuner(tunerCommand, undefined, 'start', await this.createCheckpointDir());

        this.experimentProfile.startTime = Date.now();
        this.setStatus('RUNNING');
        await this.storeExperimentProfile();

        // The main loop runs in the background; its failure becomes a critical error.
        this.run().catch((err: Error) => {
            this.criticalError(err);
        });

        return this.experimentProfile.id;
    }

SparkSnail's avatar
SparkSnail committed
192
    /**
     * Restores a stored experiment and, unless `readonly` is set, relaunches the tuner
     * in 'resume' mode and restarts the main loop.
     *
     * Trials recorded as WAITING or RUNNING get a 'FAILED' event, and the data of
     * finished and imported trials is collected into `trialDataForTuner`.
     *
     * NOTE(review): unlike startExperiment, 'log_collection' is not forwarded to the
     * training service here - confirm whether that is intentional.
     *
     * @param readonly when true only the profile is loaded and nothing is started
     */
    public async resumeExperiment(readonly: boolean): Promise<void> {
        this.log.info(`Resuming experiment: ${this.experimentProfile.id}`);

        // Fetch back the experiment profile
        this.experimentProfile = await this.dataStore.getExperimentProfile(getExperimentId());

        this.readonly = readonly;
        if (readonly) {
            return;
        }

        const expParams: ExperimentParams = this.experimentProfile.params;

        // Multi-phase is only forwarded when requested and supported by the training service.
        if (expParams.multiPhase && this.trainingService.isMultiPhaseJobSupported) {
            this.trainingService.setClusterMetadata('multiPhase', expParams.multiPhase.toString());
        }

        // Set up versionCheck config
        if (expParams.versionCheck !== undefined) {
            this.trainingService.setClusterMetadata('version_check', expParams.versionCheck.toString());
        }

        const tunerCommand: string = getMsgDispatcherCommand(expParams);
        this.log.debug(`dispatcher command: ${tunerCommand}`);
        this.setupTuner(tunerCommand, undefined, 'resume', await this.createCheckpointDir());

        const recordedJobs: TrialJobInfo[] = await this.dataStore.listTrialJobs();

        // Resume currSubmittedTrialNum
        this.currSubmittedTrialNum = recordedJobs.length;

        // Jobs still recorded as WAITING or RUNNING are given a final FAILED event.
        const interruptedJobs: TrialJobInfo[] = recordedJobs.filter(
            (job: TrialJobInfo) => job.status === 'WAITING' || job.status === 'RUNNING');
        await Promise.all(
            interruptedJobs.map((job: TrialJobInfo) => this.dataStore.storeTrialJobEvent('FAILED', job.id)));

        // Collect generated trials and imported trials
        const finishedTrialData: string = await this.exportData();
        const importedData: string[] = await this.dataStore.getImportedData();
        let mergedTrialData: Record<string, any>[] = JSON.parse(finishedTrialData);
        for (const importedChunk of importedData) {
            // do not deduplicate
            mergedTrialData = mergedTrialData.concat(<Record<string, any>[]>JSON.parse(importedChunk));
        }
        this.trialDataForTuner = JSON.stringify(mergedTrialData);

        // A stored end time is dropped when both the time and the trial budget still have room.
        const hasTimeLeft: boolean = this.experimentProfile.execDuration < this.experimentProfile.params.maxExecDuration;
        const hasTrialsLeft: boolean = this.currSubmittedTrialNum < this.experimentProfile.params.maxTrialNum;
        if (hasTimeLeft && hasTrialsLeft && this.experimentProfile.endTime) {
            delete this.experimentProfile.endTime;
        }
        this.setStatus('RUNNING');

        // TO DO: update database record for resume event
        this.run().catch((err: Error) => {
            this.criticalError(err);
        });
    }

255
256
    /**
     * Looks up a single trial job in the data store.
     *
     * @param trialJobId id of the trial job to fetch
     */
    public getTrialJob(trialJobId: string): Promise<TrialJobInfo> {
        const lookup: Promise<TrialJobInfo> = this.dataStore.getTrialJob(trialJobId);

        return lookup;
    }

    /**
     * Forwards a cluster metadata entry to the training service, failing if the
     * training service has not answered within 30 seconds.
     *
     * The timer is released in a `finally` block, so it is cleared on success, on
     * failure, and also when the training-service call throws synchronously; it can
     * therefore never fire later as an unhandled rejection.
     *
     * @param key metadata key
     * @param value metadata value
     * @throws (as rejection) in readonly mode, on timeout, or when the training service fails
     */
    public async setClusterMetadata(key: string, value: string): Promise<void> {
        if (this.readonly) {
            throw new Error('Error: can not set cluster metadata in readonly mode!');
        }
        this.log.info(`NNIManager setClusterMetadata, key: ${key}, value: ${value}`);

        // TO DO: move timeout value to constants file
        const timeoutMs = 30000;
        // Definitely assigned: a Promise executor runs synchronously inside the constructor.
        let timeoutId!: ReturnType<typeof setTimeout>;
        const timeout: Promise<never> = new Promise<never>((_resolve, reject): void => {
            timeoutId = setTimeout(
                () => { reject(new Error('TrainingService setClusterMetadata timeout. Please check your config file.')); },
                timeoutMs);
        });

        try {
            await Promise.race([timeout, this.trainingService.setClusterMetadata(key, value)]);
        } finally {
            clearTimeout(timeoutId);
        }
    }

    /**
     * Reads a cluster metadata entry from the training service.
     *
     * @param key metadata key
     */
    public getClusterMetadata(key: string): Promise<string> {
        const metadata = this.trainingService.getClusterMetadata(key);

        return Promise.resolve(metadata);
    }

    /**
     * Returns the trial job statistics reported by the data store.
     */
    public async getTrialJobStatistics(): Promise<TrialJobStatistics[]> {
        const statistics: TrialJobStatistics[] = await this.dataStore.getTrialJobStatistics();

        return statistics;
    }

286
    /**
     * Stops the experiment: switches the status to 'STOPPING' and then runs the
     * full clean-up (experimentDoneCleanUp).
     */
    public async stopExperiment(): Promise<void> {
        // The status changes first so that loops polling the status can wind down.
        this.setStatus('STOPPING');

        this.log.info('Stopping experiment, cleaning up ...');
        await this.experimentDoneCleanUp();
        this.log.info('Experiment stopped.');
    }

293
    /**
     * Fetches metric records from the data store; both filters are passed through as given.
     *
     * @param trialJobId optional trial job id filter
     * @param metricType optional metric type filter
     */
    public async getMetricData(trialJobId?: string, metricType?: MetricType): Promise<MetricDataRecord[]> {
        const records: MetricDataRecord[] = await this.dataStore.getMetricData(trialJobId, metricType);

        return records;
    }

297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
    /**
     * Returns the metrics of all trials whose sequence id lies in [minSeqId, maxSeqId].
     *
     * @param minSeqId lowest sequence id to include
     * @param maxSeqId highest sequence id to include
     */
    public async getMetricDataByRange(minSeqId: number, maxSeqId: number): Promise<MetricDataRecord[]> {
        const wantedTrialIds = new Set<string>();
        for (const trial of await this.dataStore.listTrialJobs()) {
            const seqId = trial.sequenceId;
            // FIXME: can this be undefined?
            if (seqId !== undefined && minSeqId <= seqId && seqId <= maxSeqId) {
                wantedTrialIds.add(trial.id);
            }
        }

        const everyMetric = await this.dataStore.getMetricData();

        return everyMetric.filter(record => wantedTrialIds.has(record.trialJobId));
    }

    /**
     * Returns every non-'PERIODICAL' metric followed by, per trial, the 'PERIODICAL'
     * metric with the highest sequence (the later record wins on ties).
     */
    public async getLatestMetricData(): Promise<MetricDataRecord[]> {
        // FIXME: this can take a long time
        const records: MetricDataRecord[] = await this.dataStore.getMetricData();

        const nonPeriodical: MetricDataRecord[] = records.filter(
            (record: MetricDataRecord) => record.type !== 'PERIODICAL');

        const newestPeriodical = new Map<string, MetricDataRecord>();
        for (const record of records) {
            if (record.type !== 'PERIODICAL') {
                continue;
            }
            const previous: MetricDataRecord | undefined = newestPeriodical.get(record.trialJobId);
            if (previous === undefined || previous.sequence <= record.sequence) {
                newestPeriodical.set(record.trialJobId, record);
            }
        }

        return nonPeriodical.concat(Array.from(newestPeriodical.values()));
        // FIXME: unit test
    }

328
329
330
331
    /**
     * Retrieves a trial's log from the training service.
     *
     * @param trialJobId id of the trial whose log is requested
     * @param logType which log to retrieve
     */
    public async getTrialLog(trialJobId: string, logType: LogType): Promise<string> {
        const logText: string = await this.trainingService.getTrialLog(trialJobId, logType);

        return logText;
    }

Deshui Yu's avatar
Deshui Yu committed
332
333
334
335
336
337
338
339
    /**
     * Returns the in-memory experiment profile, wrapped in an already-resolved promise.
     */
    public getExperimentProfile(): Promise<ExperimentProfile> {
        return Promise.resolve(this.experimentProfile);
    }

340
341
342
343
    /**
     * Returns the manager's current status object (the live object, not a copy).
     */
    public getStatus(): NNIManagerStatus {
        const { status } = this;

        return status;
    }

Deshui Yu's avatar
Deshui Yu committed
344
345
346
347
    /**
     * Lists trial jobs from the data store.
     *
     * @param status optional status filter, passed through to the data store
     */
    public async listTrialJobs(status?: TrialJobStatus): Promise<TrialJobInfo[]> {
        const jobs: TrialJobInfo[] = await this.dataStore.listTrialJobs(status);

        return jobs;
    }

348
349
    /**
     * Launches the dispatcher (tuner) process and creates the IPC interface for it.
     * Does nothing when a dispatcher already exists.
     *
     * @param command command line used to start the dispatcher
     * @param cwd working directory; the log directory is used when undefined or empty
     * @param mode passed to the process as NNI_MODE
     * @param dataDirectory passed to the process as NNI_CHECKPOINT_DIRECTORY
     */
    private setupTuner(command: string, cwd: string | undefined, mode: 'start' | 'resume', dataDirectory: string): void {
        if (this.dispatcher !== undefined) {
            return;
        }

        // stdin is ignored, stdout/stderr are shared with this process, plus two extra pipes
        // (presumably the channels used by createDispatcherInterface - confirm there).
        const stdio: StdioOptions = ['ignore', process.stdout, process.stderr, 'pipe', 'pipe'];
        const workingDir: string = (cwd === undefined || cwd === '') ? getLogDir() : cwd;

        // Defaults to false only when no tuner section exists; a tuner section
        // without the flag yields undefined.
        const tunerConfig = this.experimentProfile.params.tuner;
        const includeIntermediateResults: boolean | undefined =
            tunerConfig !== undefined ? tunerConfig.includeIntermediateResults : false;

        const nniEnv = {
            SDK_PROCESS: 'dispatcher',
            NNI_MODE: mode,
            NNI_CHECKPOINT_DIRECTORY: dataDirectory,
            NNI_LOG_DIRECTORY: getLogDir(),
            NNI_LOG_LEVEL: getLogLevel(),
            NNI_INCLUDE_INTERMEDIATE_RESULTS: includeIntermediateResults,
            CUDA_VISIBLE_DEVICES: this.getGpuEnvvarValue()
        };
        // NNI-specific variables override anything inherited from this process.
        const childEnv = Object.assign({}, process.env, nniEnv);

        const tunerProc: ChildProcess = getTunerProc(command, stdio, workingDir, childEnv);
        this.dispatcherPid = tunerProc.pid;
        this.dispatcher = createDispatcherInterface(tunerProc);
    }

382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
    /**
     * Determines the value for CUDA_VISIBLE_DEVICES: the advisor's `gpuIndices` when an
     * advisor is configured, otherwise the tuner's; an empty string when neither gives one.
     */
    private getGpuEnvvarValue(): string {
        const { advisor, tuner } = this.experimentProfile.params;

        let gpuIndices: string | undefined;
        if (advisor !== undefined) {
            // A configured advisor takes precedence, even if it has no gpuIndices.
            gpuIndices = advisor.gpuIndices;
        } else if (tuner !== undefined) {
            gpuIndices = tuner.gpuIndices;
        }

        return gpuIndices === undefined ? '' : gpuIndices;
    }

Deshui Yu's avatar
Deshui Yu committed
398
    /**
     * Stores a new trial concurrency and accumulates the difference to the previous
     * value in `trialConcurrencyChange` (>0: increase, <0: decrease).
     *
     * @param trialConcurrency new concurrency; assumed >= 0, which is checked by restserver
     */
    private updateTrialConcurrency(trialConcurrency: number): void {
        const delta: number = trialConcurrency - this.experimentProfile.params.trialConcurrency;
        this.trialConcurrencyChange += delta;
        this.experimentProfile.params.trialConcurrency = trialConcurrency;
    }

    /**
     * Replaces the experiment's maximum execution duration.
     *
     * @param duration new value for `params.maxExecDuration`
     */
    private updateMaxExecDuration(duration: number): void {
        const { params } = this.experimentProfile;
        params.maxExecDuration = duration;
    }

    /**
     * Sends the new search space to the tuner and then records it in the profile.
     *
     * @param searchSpace new search space
     * @throws Error when the dispatcher has not been set up
     */
    private updateSearchSpace(searchSpace: string): void {
        const tuner = this.dispatcher;
        if (tuner === undefined) {
            throw new Error('Error: tuner has not been setup');
        }

        // The profile is only touched once the command has been handed to the dispatcher.
        tuner.sendCommand(UPDATE_SEARCH_SPACE, searchSpace);
        this.experimentProfile.params.searchSpace = searchSpace;
    }

QuanluZhang's avatar
QuanluZhang committed
422
423
424
425
426
427
    /**
     * Replaces the experiment's maximum number of trials.
     *
     * @param maxTrialNum new value for `params.maxTrialNum`
     */
    private updateMaxTrialNum(maxTrialNum: number): void {
        const { params } = this.experimentProfile;
        params.maxTrialNum = maxTrialNum;
    }

Deshui Yu's avatar
Deshui Yu committed
428
    /**
     * Shuts the experiment down: detaches the metric listener, asks the dispatcher to
     * terminate, cancels every RUNNING or WAITING trial, cleans up the training service,
     * stores the end time and finally sets the status to 'STOPPED'.
     *
     * @throws Error when the dispatcher has not been set up
     */
    private async experimentDoneCleanUp(): Promise<void> {
        if (this.dispatcher === undefined) {
            throw new Error('Error: tuner has not been setup');
        }
        this.trainingService.removeTrialJobMetricListener(this.trialJobMetricListener);
        this.dispatcher.sendCommand(TERMINATE);

        // gracefully terminate tuner and assessor here, wait at most 30 seconds.
        // The loop exits as soon as the process is seen to be gone, without a further sleep.
        for (let attempt = 0; attempt < 30; attempt++) {
            const tunerAlive: boolean = await isAlive(this.dispatcherPid);
            if (!tunerAlive) {
                break;
            }
            await delay(1000);
        }
        // Issued unconditionally, whether or not the process was seen to exit.
        await killPid(this.dispatcherPid);

        const trialJobList: TrialJobDetail[] = await this.trainingService.listTrialJobs();

        // DON'T try to make it in parallel, the training service may not handle it well.
        // If there is performance concern, consider to support batch cancellation on training service.
        for (const trialJob of trialJobList) {
            if (trialJob.status === 'RUNNING' ||
                trialJob.status === 'WAITING') {
                try {
                    this.log.info(`cancelTrialJob: ${trialJob.id}`);
                    await this.trainingService.cancelTrialJob(trialJob.id);
                } catch (error) {
                    // Best effort: one failed cancellation must not abort the clean-up.
                    this.log.debug(`ignorable error on canceling trial ${trialJob.id}. ${error}`);
                }
            }
        }
        await this.trainingService.cleanUp();
        this.experimentProfile.endTime = Date.now();
        await this.storeExperimentProfile();
        this.setStatus('STOPPED');
    }

    /**
     * Once per second, while the status is 'RUNNING', adds one to `execDuration`;
     * the profile is stored on every tenth tick that happens while running.
     * Ends when the status becomes 'ERROR', 'STOPPING' or 'STOPPED'.
     */
    private async periodicallyUpdateExecDuration(): Promise<void> {
        const haltStates: string[] = ['ERROR', 'STOPPING', 'STOPPED'];

        // Ticks are counted whether or not the experiment is running.
        for (let tick = 1; !haltStates.includes(this.status.status); tick++) {
            await delay(1000 * 1); // 1 seconds
            if (this.status.status !== 'RUNNING') {
                continue;
            }
            this.experimentProfile.execDuration += 1;
            if (tick % 10 === 0) {
                await this.storeExperimentProfile();
            }
        }
    }

chicm-ms's avatar
chicm-ms committed
477
478
479
480
481
482
    /**
     * Sends a PING command to the dispatcher every five seconds until the status
     * becomes 'ERROR', 'STOPPING' or 'STOPPED'.
     *
     * @throws Error when the dispatcher has not been set up
     */
    private async pingDispatcher(): Promise<void> {
        if (this.dispatcher === undefined) {
            throw new Error('Error: tuner has not been setup');
        }

        const haltStates: string[] = ['ERROR', 'STOPPING', 'STOPPED'];
        const pingIntervalMs: number = 1000 * 5;
        while (!haltStates.includes(this.status.status)) {
            this.dispatcher.sendCommand(PING);
            await delay(pingIntervalMs);
        }
    }

QuanluZhang's avatar
QuanluZhang committed
487
488
    /**
     * Polls the training service for every tracked trial, stores status changes, and
     * reports trials that reached a final status to the dispatcher with TRIAL_END.
     *
     * @returns the number of trials that reached a final status during this poll
     * @throws Error when the dispatcher has not been set up
     */
    private async requestTrialJobsStatus(): Promise<number> {
        if (this.dispatcher === undefined) {
            throw new Error('Error: tuner has not been setup');
        }

        let finishedCount: number = 0;
        // The ids are snapshotted because entries are deleted from the map inside the loop.
        const trackedIds: string[] = Array.from(this.trialJobs.keys());
        for (const trialJobId of trackedIds) {
            const latest: TrialJobDetail = await this.trainingService.getTrialJob(trialJobId);
            const previous: TrialJobDetail | undefined = this.trialJobs.get(trialJobId);
            if (previous !== undefined && previous.status !== latest.status) {
                this.log.info(`Trial job ${latest.id} status changed from ${previous.status} to ${latest.status}`);
                this.trialJobs.set(trialJobId, Object.assign({}, latest));
                await this.dataStore.storeTrialJobEvent(latest.status, latest.id, undefined, latest);
            }

            switch (latest.status) {
                case 'SUCCEEDED':
                case 'USER_CANCELED':
                case 'EARLY_STOPPED':
                case 'FAILED':
                case 'SYS_CANCELED': {
                    // In the current version, failed or system-canceled jobs are not retried
                    // TO DO: push this job to queue for retry
                    this.trialJobs.delete(trialJobId);
                    finishedCount++;
                    const trialEndPayload = {
                        trial_job_id: latest.id, // eslint-disable-line @typescript-eslint/camelcase
                        event: latest.status,
                        hyper_params: latest.form.hyperParameters.value // eslint-disable-line @typescript-eslint/camelcase
                    };
                    this.dispatcher.sendCommand(TRIAL_END, JSON.stringify(trialEndPayload));
                    break;
                }
                default:
                    // 'WAITING', 'RUNNING', 'UNKNOWN' and any other status: nothing to do
                    // TO DO: add warning in log for unexpected statuses
                    break;
            }
        }

        return finishedCount;
    }

    /**
     * Main scheduling loop of the experiment.
     *
     * Every 5 seconds, until the manager status becomes ERROR, STOPPING or STOPPED, it:
     *  1. refreshes trial job status and counts the newly finished trials,
     *  2. asks the tuner for new trials (adjusted by any pending concurrency change),
     *  3. moves the status between RUNNING / NO_MORE_TRIAL / DONE according to
     *     maxExecDuration and maxTrialNum,
     *  4. submits waiting trials while fewer than trialConcurrency jobs are tracked.
     *
     * @throws Error if the dispatcher has not been set up.
     */
    private async manageTrials(): Promise<void> {
        if (this.dispatcher === undefined) {
            throw new Error('Error: tuner has not been setup');
        }

        // Running total of finished trials, seeded with the number already submitted.
        let totalFinished: number = this.currSubmittedTrialNum;

        while (!['ERROR', 'STOPPING', 'STOPPED'].includes(this.status.status)) {
            const newlyFinished: number = await this.requestTrialJobsStatus();
            totalFinished += newlyFinished;

            // requestTrialNum is the number of trials that will be requested from tuner.
            // If trialConcurrency does not change, requestTrialNum equals the newly finished count.
            // If trialConcurrency increases by 2 (trialConcurrencyChange=2), requestTrialNum equals
            // 2 + newly finished count and trialConcurrencyChange becomes 0.
            // If trialConcurrency decreases by 4 (trialConcurrencyChange=-4) and 2 trials finished,
            // requestTrialNum becomes -2: no trial is requested and trialConcurrencyChange becomes -2.
            const requestTrialNum: number = this.trialConcurrencyChange + newlyFinished;
            this.trialConcurrencyChange = requestTrialNum >= 0 ? 0 : requestTrialNum;

            this.requestTrialJobs(requestTrialNum);

            // Check maxTrialNum and maxExecDuration here.
            // NO_MORE_TRIAL is more like a subset of RUNNING, because during RUNNING the tuner
            // might tell nnimanager that there are no more trials. In NO_MORE_TRIAL state the
            // experiment is viewed as still running. DONE can be reached from RUNNING or NO_MORE_TRIAL.
            assert(this.status.status === 'RUNNING' ||
                this.status.status === 'DONE' ||
                this.status.status === 'NO_MORE_TRIAL' ||
                this.status.status === 'TUNER_NO_MORE_TRIAL', `Actual status: ${this.status.status}`);

            const limitReached: boolean =
                this.experimentProfile.execDuration > this.experimentProfile.params.maxExecDuration ||
                this.currSubmittedTrialNum >= this.experimentProfile.params.maxTrialNum;

            if (limitReached) {
                if (this.status.status !== 'DONE') {
                    this.setStatus('NO_MORE_TRIAL');
                    const submittedSoFar: number = this.currSubmittedTrialNum;

                    assert(totalFinished <= submittedSoFar);
                    if (totalFinished >= submittedSoFar) {
                        // Every submitted trial has finished: the experiment is complete.
                        this.setStatus('DONE');
                        this.experimentProfile.endTime = Date.now();
                        await this.storeExperimentProfile();
                        // write this log for travis CI
                        this.log.info('Experiment done.');
                    }
                }
            } else {
                if (this.status.status === 'DONE') {
                    // Limits no longer apply, so the experiment is running again: drop the end time.
                    delete this.experimentProfile.endTime;
                    await this.storeExperimentProfile();
                }
                if (this.status.status !== 'TUNER_NO_MORE_TRIAL') {
                    this.setStatus('RUNNING');
                }

                // Fill free slots with waiting trials, one submission per free slot.
                for (let slot: number = this.trialJobs.size; slot < this.experimentProfile.params.trialConcurrency; slot++) {
                    if (this.waitingTrials.length === 0) {
                        break;
                    }
                    if (this.currSubmittedTrialNum >= this.experimentProfile.params.maxTrialNum) {
                        break;
                    }

                    const form = this.waitingTrials.shift() as TrialJobApplicationForm;
                    this.currSubmittedTrialNum++;
                    this.log.info(`submitTrialJob: form: ${JSON.stringify(form)}`);
                    const trialJobDetail: TrialJobDetail = await this.trainingService.submitTrialJob(form);
                    await this.storeExperimentProfile();

                    // Keep a shallow copy of the job detail and record the event from that copy.
                    this.trialJobs.set(trialJobDetail.id, Object.assign({}, trialJobDetail));
                    const storedDetail: TrialJobDetail | undefined = this.trialJobs.get(trialJobDetail.id);
                    if (storedDetail === undefined) {
                        assert(false, `undefined trialJobDetail in trialJobs: ${trialJobDetail.id}`);
                    } else {
                        await this.dataStore.storeTrialJobEvent(
                            storedDetail.status, storedDetail.id, form.hyperParameters.value, storedDetail);
                    }
                }
            }
            await delay(1000 * 5); // 5 seconds
        }
    }

Deshui Yu's avatar
Deshui Yu committed
621
622
623
624
625
626
    /**
     * Bumps the profile revision by one and persists the profile to the data store.
     *
     * @returns the promise returned by the data store's storeExperimentProfile call.
     */
    private storeExperimentProfile(): Promise<void> {
        const profile = this.experimentProfile;
        profile.revision++;

        return this.dataStore.storeExperimentProfile(profile);
    }

627
    /**
     * Wires up event listeners, sends the initial tuner command, then runs the
     * long-lived tasks (exec-duration updater, dispatcher ping, training service,
     * trial management) concurrently and waits for all of them.
     *
     * Failures from the last three tasks are re-thrown as NNIError with a prefix
     * naming the failing component.
     */
    private async run(): Promise<void> {
        assert(this.dispatcher !== undefined);

        this.addEventListeners();
        this.sendInitTunerCommands();

        // Builds a catch handler that re-throws the error wrapped with the given prefix.
        const rethrowWithPrefix = (prefix: string) => (err: Error): never => {
            throw NNIError.FromError(err, prefix);
        };

        // Tasks are started in this exact order before being awaited together.
        const execDurationTask = this.periodicallyUpdateExecDuration();
        const pingTask = this.pingDispatcher().catch(rethrowWithPrefix('Dispatcher error: '));
        const trainingServiceTask = this.trainingService.run().catch(rethrowWithPrefix('Training service error: '));
        const manageTrialsTask = this.manageTrials().catch(rethrowWithPrefix('Job management error: '));

        await Promise.all([execDurationTask, pingTask, trainingServiceTask, manageTrialsTask]);
    }

QuanluZhang's avatar
QuanluZhang committed
647
    /**
     * Registers the metric listener on the training service and the command / error
     * handlers on the dispatcher.
     *
     * @throws Error if the dispatcher has not been set up.
     */
    private addEventListeners(): void {
        this.log.info('Add event listeners');
        // TO DO: cannot run this method more than once in one NNIManager instance
        if (this.dispatcher === undefined) {
            throw new Error('Error: tuner or job maintainer have not been setup');
        }

        // Forwards each dispatcher command to onTunerCommand; failures become critical errors.
        const handleCommand = (commandType: string, content: string): void => {
            this.onTunerCommand(commandType, content).catch((err: Error) => {
                this.criticalError(NNIError.FromError(err, 'Tuner command event error: '));
            });
        };

        // Logs the stream error and reports it as a critical error.
        const handleStreamError = (error: Error): void => {
            this.log.error(`Dispatcher error: ${error.message}`);
            this.criticalError(new Error('Dispatcher stream error, tuner may have crashed.'));
        };

        this.trainingService.addTrialJobMetricListener(this.trialJobMetricListener);
        this.dispatcher.onCommand(handleCommand);
        this.dispatcher.onError(handleStreamError);
    }

    /**
     * Sends the INITIALIZE command, carrying the experiment's search space, to the dispatcher.
     *
     * @throws Error if the dispatcher has not been set up.
     */
    private sendInitTunerCommands(): void {
        if (this.dispatcher === undefined) {
            throw new Error('Dispatcher error: tuner has not been setup');
        }

        const searchSpace = this.experimentProfile.params.searchSpace;
        this.log.debug(`Send tuner command: INITIALIZE: ${searchSpace}`);
        // Tuner needs to be initialized with the search space before generating any hyper parameters
        this.dispatcher.sendCommand(INITIALIZE, searchSpace);
    }

    /**
     * Handles a metric reported by a trial job: stores it in the data store, then
     * forwards the metric data to the dispatcher with REPORT_METRIC_DATA.
     *
     * @param metric metric record; its `id` and `data` fields are stored and forwarded.
     * @throws Error if the dispatcher has not been set up (checked after the metric is stored).
     */
    private async onTrialJobMetrics(metric: TrialJobMetric): Promise<void> {
        // Serialize explicitly: interpolating the object directly would log "[object Object]".
        this.log.debug(`NNIManager received trial job metrics: ${JSON.stringify(metric)}`);
        await this.dataStore.storeMetricData(metric.id, metric.data);
        if (this.dispatcher === undefined) {
            throw new Error('Error: tuner has not been setup');
        }
        this.dispatcher.sendCommand(REPORT_METRIC_DATA, metric.data);
    }

chicm-ms's avatar
chicm-ms committed
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
    /**
     * Asks the dispatcher to generate `jobNum` new trials.
     *
     * Does nothing when `jobNum` is less than 1. In multi-thread mode one request is
     * sent per trial; otherwise a single request carries the whole count.
     *
     * @param jobNum number of trials to request.
     * @throws Error if the dispatcher has not been set up.
     */
    private requestTrialJobs(jobNum: number): void {
        if (jobNum < 1) {
            return;
        }
        if (this.dispatcher === undefined) {
            throw new Error('Dispatcher error: tuner has not been setup');
        }

        if (!this.experimentProfile.params.multiThread) {
            this.dispatcher.sendCommand(REQUEST_TRIAL_JOBS, String(jobNum));

            return;
        }

        // Send multiple requests to ensure multiple hyper parameters are generated in a
        // non-blocking way. For a single REQUEST_TRIAL_JOBS request, hyper parameters are
        // generated one by one sequentially.
        for (let sent: number = 0; sent < jobNum; sent++) {
            this.dispatcher.sendCommand(REQUEST_TRIAL_JOBS, '1');
        }
    }

703
    /**
     * Dispatches a command received from the tuner/dispatcher process.
     *
     * @param commandType one of the command constants handled below.
     * @param content raw command payload; parsed as JSON for SEND_TRIAL_JOB_PARAMETER and KILL_TRIAL_JOB.
     * @throws Error on an unsupported command type, or when the dispatcher is missing
     *         while import data has to be sent.
     */
    private async onTunerCommand(commandType: string, content: string): Promise<void> {
        this.log.info(`NNIManager received command from dispatcher: ${commandType}, ${content}`);
        switch (commandType) {
            case INITIALIZED: {
                // Tuner is initialized and the search space is set: feed it any imported
                // trial data, then request the first batch of hyper parameters.
                if (this.trialDataForTuner.length > 0) {
                    if (this.dispatcher === undefined) {
                        throw new Error('Dispatcher error: tuner has not been setup');
                    }
                    this.dispatcher.sendCommand(IMPORT_DATA, this.trialDataForTuner);
                }
                this.requestTrialJobs(this.experimentProfile.params.trialConcurrency);
                break;
            }
            case NEW_TRIAL_JOB: {
                if (this.status.status === 'TUNER_NO_MORE_TRIAL') {
                    this.log.warning('It is not supposed to receive more trials after NO_MORE_TRIAL is set');
                    this.setStatus('RUNNING');
                }
                // Queue the new trial; manageTrials submits it when a slot is free.
                const sequenceId: number = this.experimentProfile.nextSequenceId++;
                this.waitingTrials.push({
                    sequenceId,
                    hyperParameters: { value: content, index: 0 }
                });
                break;
            }
            case SEND_TRIAL_JOB_PARAMETER: {
                const paramCommand: any = JSON.parse(content);
                assert(paramCommand.parameter_index >= 0);
                assert(paramCommand.trial_job_id !== undefined);

                const updateForm: TrialJobApplicationForm = {
                    sequenceId: -1,  // FIXME: multi-phase tuner should use sequence ID instead of trial job ID
                    hyperParameters: {
                        value: content,
                        index: paramCommand.parameter_index
                    }
                };
                this.log.info(`updateTrialJob: job id: ${paramCommand.trial_job_id}, form: ${JSON.stringify(updateForm)}`);
                await this.trainingService.updateTrialJob(paramCommand.trial_job_id, updateForm);
                // parameters field is set as empty string if no more hyper parameter can be generated by tuner.
                // NOTE(review): the check below is against null, not empty string - confirm which the tuner sends.
                if (paramCommand.parameters !== null) {
                    await this.dataStore.storeTrialJobEvent(
                        'ADD_HYPERPARAMETER', paramCommand.trial_job_id, content, undefined);
                }
                break;
            }
            case NO_MORE_TRIAL_JOBS: {
                // Do not override a terminal / stopping status.
                const isShuttingDown: boolean = ['ERROR', 'STOPPING', 'STOPPED'].includes(this.status.status);
                if (!isShuttingDown) {
                    this.setStatus('TUNER_NO_MORE_TRIAL');
                }
                break;
            }
            case KILL_TRIAL_JOB: {
                const trialJobId: any = JSON.parse(content);
                this.log.info(`cancelTrialJob: ${trialJobId}`);
                await this.trainingService.cancelTrialJob(trialJobId, true);
                break;
            }
            default:
                throw new Error('Error: unsupported command type from tuner');
        }
    }

769
770
771
772
773
774
775
776
777
778
    /**
     * Reports an unrecoverable error: records it through logError and also prints
     * it to the console's error stream.
     *
     * @param err the error to report.
     */
    private criticalError(err: Error): void {
        this.logError(err);
        // Also emit on the console in addition to the manager log.
        console.error(err);
    }

    /**
     * Logs the error's stack trace (when present), appends its message to the
     * status error list and switches the manager status to ERROR.
     *
     * @param err the error to record.
     */
    private logError(err: Error): void {
        const { stack, message } = err;
        if (stack !== undefined) {
            this.log.error(stack);
        }
        this.status.errors.push(message);
        this.setStatus('ERROR');
    }

    /**
     * Updates the manager status, logging the transition. No-op (and no log line)
     * when the requested status equals the current one.
     *
     * @param status the status to switch to.
     */
    private setStatus(status: ExperimentStatus): void {
        const previous = this.status.status;
        if (previous === status) {
            return;
        }
        this.log.info(`Change NNIManager status from: ${previous} to: ${status}`);
        this.status.status = status;
    }

    /**
     * Builds a blank experiment profile: current experiment id, experiment root
     * directory as log dir, zeroed counters and empty / zero parameters.
     *
     * @returns a freshly created profile object.
     */
    private createEmptyExperimentProfile(): ExperimentProfile {
        const emptyParams: ExperimentProfile['params'] = {
            authorName: '',
            experimentName: '',
            trialConcurrency: 0,
            maxExecDuration: 0, // unit: second
            maxTrialNum: 0, // maxTrialNum includes all the submitted trial jobs
            trainingServicePlatform: '',
            searchSpace: ''
        };

        return {
            id: getExperimentId(),
            revision: 0,
            execDuration: 0,
            logDir: getExperimentRootDir(),
            nextSequenceId: 0,
            params: emptyParams
        };
    }
807

QuanluZhang's avatar
QuanluZhang committed
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
    /**
     * Creates the checkpoint directory and records its path on whichever of the
     * advisor / tuner / assessor configurations are present in the experiment params.
     *
     * @returns the checkpoint directory path.
     */
    private async createCheckpointDir(): Promise<string> {
        // TODO: test
        const chkpDir: string = getCheckpointDir();
        // create checkpoint directory
        await mkDirP(chkpDir);

        // assign this directory to exp profile's checkpointDir
        const params = this.experimentProfile.params;
        if (params.advisor) {
            params.advisor.checkpointDir = chkpDir;
        }
        if (params.tuner) {
            params.tuner.checkpointDir = chkpDir;
        }
        if (params.assessor) {
            params.assessor.checkpointDir = chkpDir;
        }

        // An async function already wraps its return value in a promise.
        return chkpDir;
    }
Deshui Yu's avatar
Deshui Yu committed
826
827
828
}

export { NNIManager };