nnimanager.ts 37.4 KB
Newer Older
liuzhe-lz's avatar
liuzhe-lz committed
1
2
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT license.
Deshui Yu's avatar
Deshui Yu committed
3
4
5
6

'use strict';

import * as assert from 'assert';
chicm-ms's avatar
chicm-ms committed
7
import { ChildProcess, StdioOptions } from 'child_process';
Deshui Yu's avatar
Deshui Yu committed
8
9
10
import { Deferred } from 'ts-deferred';
import * as component from '../common/component';
import { DataStore, MetricDataRecord, MetricType, TrialJobInfo } from '../common/datastore';
11
import { NNIError } from '../common/errors';
12
import { getExperimentId, getDispatcherPipe } from '../common/experimentStartupInfo';
Deshui Yu's avatar
Deshui Yu committed
13
14
import { getLogger, Logger } from '../common/log';
import {
chicm-ms's avatar
chicm-ms committed
15
    ExperimentParams, ExperimentProfile, Manager, ExperimentStatus,
16
    NNIManagerStatus, ProfileUpdateType, TrialJobStatistics
Deshui Yu's avatar
Deshui Yu committed
17
} from '../common/manager';
18
import { ExperimentManager } from '../common/experimentManager';
Deshui Yu's avatar
Deshui Yu committed
19
import {
20
    TrainingService, TrialJobApplicationForm, TrialJobDetail, TrialJobMetric, TrialJobStatus, LogType
Deshui Yu's avatar
Deshui Yu committed
21
} from '../common/trainingService';
22
import { delay, getCheckpointDir, getExperimentRootDir, getLogDir, getMsgDispatcherCommand, mkDirP, getTunerProc, getLogLevel, isAlive, killPid } from '../common/utils';
Deshui Yu's avatar
Deshui Yu committed
23
import {
chicm-ms's avatar
chicm-ms committed
24
    INITIALIZE, INITIALIZED, KILL_TRIAL_JOB, NEW_TRIAL_JOB, NO_MORE_TRIAL_JOBS, PING,
25
    REPORT_METRIC_DATA, REQUEST_TRIAL_JOBS, SEND_TRIAL_JOB_PARAMETER, TERMINATE, TRIAL_END, UPDATE_SEARCH_SPACE, IMPORT_DATA
Deshui Yu's avatar
Deshui Yu committed
26
} from './commands';
27
import { createDispatcherInterface, createDispatcherPipeInterface, IpcInterface } from './ipcInterface';
28
import { NNIRestServer } from '../rest_server/nniRestServer';
Deshui Yu's avatar
Deshui Yu committed
29
30

/**
 * NNIManager, which implements the Manager interface. It coordinates the training service,
 * the tuner/assessor dispatcher process and the data store for a single experiment.
 */
class NNIManager implements Manager {
    // Launches, cancels and queries trials; obtained from the component registry.
    private trainingService: TrainingService;
    // IPC channel to the tuner/assessor dispatcher. Undefined until one is attached
    // (constructor pipe or setupTuner); reset to undefined when the experiment stops.
    private dispatcher: IpcInterface | undefined;
    // Stopped during shutdown in stopExperimentBottomHalf.
    private experimentManager: ExperimentManager;
    // Number of trials submitted so far; restored from the data store on resume.
    private currSubmittedTrialNum: number;  // need to be recovered
    // Pending change of trial concurrency that manageTrials has not yet applied.
    private trialConcurrencyChange: number; // >0: increase, <0: decrease
    private log: Logger;
    // Persistence for the experiment profile, trial events, metrics and imported data.
    private dataStore: DataStore;
    private experimentProfile: ExperimentProfile;
    // PID of the dispatcher process spawned by setupTuner; 0 when none was spawned.
    private dispatcherPid: number;
    private status: NNIManagerStatus;
    // Trial application forms queued for submission (e.g. customized trials).
    private waitingTrials: TrialJobApplicationForm[];
    // Trials currently being tracked, keyed by trial job id.
    private trialJobs: Map<string, TrialJobDetail>;
    // JSON of finished + imported trial data gathered on resume; presumably handed to the
    // tuner later (the consumer is outside this view - confirm).
    private trialDataForTuner: string;
    // When true (set by resumeExperiment), mutating operations are rejected.
    private readonly: boolean;
    // Callback registered with the training service for trial metrics; removed when stopping.
    private trialJobMetricListener: (metric: TrialJobMetric) => void;
50

Deshui Yu's avatar
Deshui Yu committed
51
52
    /**
     * Wire up collaborators from the component registry and reset all bookkeeping.
     * When getDispatcherPipe() yields a pipe, the dispatcher IPC interface is attached to it
     * here, and setupTuner will then skip spawning a process because a dispatcher exists.
     */
    constructor() {
        // Trial bookkeeping starts empty.
        this.currSubmittedTrialNum = 0;
        this.trialConcurrencyChange = 0;

        // The training service is mandatory.
        this.trainingService = component.get(TrainingService);
        this.experimentManager = component.get(ExperimentManager);
        assert(this.trainingService);

        this.dispatcherPid = 0; // no dispatcher process spawned yet
        this.waitingTrials = [];
        this.trialJobs = new Map<string, TrialJobDetail>();
        this.trialDataForTuner = '';
        this.readonly = false;

        this.log = getLogger();
        this.dataStore = component.get(DataStore);
        this.experimentProfile = this.createEmptyExperimentProfile();
        this.status = { status: 'INITIALIZED', errors: [] };

        // Metrics are processed asynchronously; failures are reported through criticalError.
        this.trialJobMetricListener = (metric: TrialJobMetric): void => {
            const handling = this.onTrialJobMetrics(metric);
            handling.catch((err: Error) => {
                this.criticalError(NNIError.FromError(err, 'Job metrics error: '));
            });
        };

        const dispatcherPipe = getDispatcherPipe();
        if (dispatcherPipe !== null) {
            this.dispatcher = createDispatcherPipeInterface(dispatcherPipe);
        }
    }

    /**
     * Apply one field of `experimentProfile` to the running experiment, then persist the profile.
     *
     * Only the field selected by `updateType` is read from `experimentProfile`.
     * Declared `async` so that every failure surfaces as a rejected promise. This covers
     * readonly mode, an unrecognized update type, and updateSearchSpace failing because the
     * tuner is not set up. Previously the last two were thrown synchronously and bypassed a
     * caller's `.catch()`.
     *
     * @param experimentProfile profile carrying the new value
     * @param updateType which parameter to update
     */
    public async updateExperimentProfile(experimentProfile: ExperimentProfile, updateType: ProfileUpdateType): Promise<void> {
        if (this.readonly) {
            throw new Error('Error: can not update experiment profile in readonly mode!');
        }
        switch (updateType) {
            case 'TRIAL_CONCURRENCY':
                this.updateTrialConcurrency(experimentProfile.params.trialConcurrency);
                break;
            case 'MAX_EXEC_DURATION':
                this.updateMaxExecDuration(experimentProfile.params.maxExecDuration);
                break;
            case 'SEARCH_SPACE':
                this.updateSearchSpace(experimentProfile.params.searchSpace);
                break;
            case 'MAX_TRIAL_NUM':
                this.updateMaxTrialNum(experimentProfile.params.maxTrialNum);
                break;
            default:
                throw new Error('Error: unrecognized updateType');
        }

        await this.storeExperimentProfile();
    }

106
    /**
     * Forward externally supplied trial data to the tuner (IMPORT_DATA command) and record an
     * IMPORT_DATA event in the data store.
     * Rejects in readonly mode or when no dispatcher is connected.
     */
    public importData(data: string): Promise<void> {
        if (this.readonly) {
            return Promise.reject(new Error('Error: can not import data in readonly mode!'));
        }
        const dispatcher: IpcInterface | undefined = this.dispatcher;
        if (dispatcher === undefined) {
            return Promise.reject(new Error('tuner has not been setup'));
        }
        dispatcher.sendCommand(IMPORT_DATA, data);
        return this.dataStore.storeTrialJobEvent('IMPORT_DATA', '', data);
    }

120
121
122
123
    /** Imported-data payloads kept by the data store (presumably those recorded by importData). */
    public getImportedData(): Promise<string[]> {
        const store: DataStore = this.dataStore;
        return store.getImportedData();
    }

124
125
126
127
    /** Trial hyper-parameter configs as JSON text (resumeExperiment parses it as an array). */
    public async exportData(): Promise<string> {
        const exported: string = await this.dataStore.exportTrialHpConfigs();
        return exported;
    }

128
    /**
     * Queue a user-defined trial with the given hyper-parameters.
     *
     * @param hyperParams JSON text of the hyper-parameter object
     * @returns the sequence id assigned to the queued trial
     *
     * Rejects in readonly mode, when maxTrialNum has been reached, when `hyperParams` is not
     * valid JSON, or when the ADD_CUSTOMIZED event cannot be stored. Because the method is
     * `async`, none of these throw synchronously.
     */
    public async addCustomizedTrialJob(hyperParams: string): Promise<number> {
        if (this.readonly) {
            throw new Error('Error: can not add customized trial job in readonly mode!');
        }
        if (this.currSubmittedTrialNum >= this.experimentProfile.params.maxTrialNum) {
            throw new Error('reach maxTrialNum');
        }

        // TODO: NNI manager should not peek tuner's internal protocol, let's refactor this later
        const packedParameter = {
            parameter_id: null, // eslint-disable-line @typescript-eslint/camelcase
            parameter_source: 'customized', // eslint-disable-line @typescript-eslint/camelcase
            parameters: JSON.parse(hyperParams)
        };

        const form: TrialJobApplicationForm = {
            sequenceId: this.experimentProfile.nextSequenceId++,
            hyperParameters: {
                value: JSON.stringify(packedParameter),
                index: 0
            }
        };
        this.waitingTrials.push(form);

        // trial id has not been generated yet, thus use '' instead.
        // Awaited so that a storage failure reaches the caller instead of becoming an
        // unhandled promise rejection.
        await this.dataStore.storeTrialJobEvent('ADD_CUSTOMIZED', '', hyperParams);

        return form.sequenceId;
    }

    /**
     * Cancel a trial at the user's request and record a USER_TO_CANCEL event.
     * Rejects in readonly mode.
     */
    public async cancelTrialJobByUser(trialJobId: string): Promise<void> {
        if (this.readonly) {
            throw new Error('Error: can not cancel trial job in readonly mode!');
        }
        this.log.info(`User cancelTrialJob: ${trialJobId}`);
        await this.trainingService.cancelTrialJob(trialJobId);
        await this.dataStore.storeTrialJobEvent('USER_TO_CANCEL', trialJobId, '');
    }

    /**
     * Start a new experiment with the given parameters.
     *
     * Steps, in order:
     * 1. Persist the params.
     * 2. Push params-derived cluster metadata to the training service (not awaited).
     * 3. Spawn the dispatcher in 'start' mode.
     * 4. Stamp startTime, switch to RUNNING and persist again.
     * 5. Launch the main loop in the background; its failures go to criticalError.
     *
     * @returns the experiment id
     */
    public async startExperiment(expParams: ExperimentParams): Promise<string> {
        this.log.info(`Starting experiment: ${this.experimentProfile.id}`);
        this.experimentProfile.params = expParams;
        await this.storeExperimentProfile();
        this.log.debug('Setup tuner...');

        // Gather the params-derived cluster metadata, then hand it over in order:
        // multiPhase (only if the training service supports it), version_check, log_collection.
        const clusterMetadata: [string, string][] = [];
        if (expParams.multiPhase && this.trainingService.isMultiPhaseJobSupported) {
            clusterMetadata.push(['multiPhase', expParams.multiPhase.toString()]);
        }
        if (expParams.versionCheck !== undefined) {
            clusterMetadata.push(['version_check', expParams.versionCheck.toString()]);
        }
        if (expParams.logCollection !== undefined) {
            clusterMetadata.push(['log_collection', expParams.logCollection.toString()]);
        }
        for (const [key, value] of clusterMetadata) {
            this.trainingService.setClusterMetadata(key, value);
        }

        const dispatcherCommand: string = getMsgDispatcherCommand(expParams);
        this.log.debug(`dispatcher command: ${dispatcherCommand}`);
        const checkpointDir: string = await this.createCheckpointDir();
        this.setupTuner(dispatcherCommand, undefined, 'start', checkpointDir);

        this.experimentProfile.startTime = Date.now();
        this.setStatus('RUNNING');
        await this.storeExperimentProfile();

        this.run().catch((err: Error) => this.criticalError(err));

        return this.experimentProfile.id;
    }

SparkSnail's avatar
SparkSnail committed
205
    /**
     * Resume a previously started experiment from the data store.
     *
     * In readonly mode only the stored profile is loaded. Otherwise the method:
     * - restores the training-service cluster metadata;
     * - respawns the dispatcher in 'resume' mode;
     * - records trials that were still WAITING/RUNNING as FAILED;
     * - prepares the finished + imported trial data for the tuner;
     * - clears endTime if budget remains, switches to RUNNING and starts the main loop in the
     *   background.
     *
     * @param readonly when true, the experiment is opened for viewing only
     */
    public async resumeExperiment(readonly: boolean): Promise<void> {
        // Fetch back the experiment profile. Log the id being resumed, not the placeholder
        // profile's id.
        const experimentId: string = getExperimentId();
        this.log.info(`Resuming experiment: ${experimentId}`);
        this.experimentProfile = await this.dataStore.getExperimentProfile(experimentId);
        this.readonly = readonly;
        if (readonly) {
            return;
        }

        const expParams: ExperimentParams = this.experimentProfile.params;

        // Restore the same cluster metadata that startExperiment applies, including
        // log_collection, so a resumed experiment behaves like the original run.
        // As in startExperiment, these calls are not awaited.
        if (expParams.multiPhase && this.trainingService.isMultiPhaseJobSupported) {
            this.trainingService.setClusterMetadata('multiPhase', expParams.multiPhase.toString());
        }
        if (expParams.versionCheck !== undefined) {
            this.trainingService.setClusterMetadata('version_check', expParams.versionCheck.toString());
        }
        if (expParams.logCollection !== undefined) {
            this.trainingService.setClusterMetadata('log_collection', expParams.logCollection.toString());
        }

        const dispatcherCommand: string = getMsgDispatcherCommand(expParams);
        this.log.debug(`dispatcher command: ${dispatcherCommand}`);
        const checkpointDir: string = await this.createCheckpointDir();
        this.setupTuner(dispatcherCommand, undefined, 'resume', checkpointDir);

        const allTrialJobs: TrialJobInfo[] = await this.dataStore.listTrialJobs();

        // Resume currSubmittedTrialNum
        this.currSubmittedTrialNum = allTrialJobs.length;

        // Trials still WAITING or RUNNING from the previous run are recorded as FAILED;
        // they are not resubmitted here.
        await Promise.all(allTrialJobs
            .filter((job: TrialJobInfo) => job.status === 'WAITING' || job.status === 'RUNNING')
            .map((job: TrialJobInfo) => this.dataStore.storeTrialJobEvent('FAILED', job.trialJobId)));

        // Collect generated trials and imported trials
        const finishedTrialData: string = await this.exportData();
        const importedData: string[] = await this.dataStore.getImportedData();
        let trialData: Record<string, any>[] = JSON.parse(finishedTrialData);
        for (const oneImportedData of importedData) {
            // do not deduplicate
            trialData = trialData.concat(<Record<string, any>[]>JSON.parse(oneImportedData));
        }
        this.trialDataForTuner = JSON.stringify(trialData);

        // Re-open a finished experiment (drop endTime) only when duration and trial budget remain.
        if (this.experimentProfile.execDuration < this.experimentProfile.params.maxExecDuration &&
            this.currSubmittedTrialNum < this.experimentProfile.params.maxTrialNum &&
            this.experimentProfile.endTime) {
            delete this.experimentProfile.endTime;
        }
        this.setStatus('RUNNING');

        // TO DO: update database record for resume event
        this.run().catch((err: Error) => {
            this.criticalError(err);
        });
    }

268
269
    /** Fetch the stored record of a single trial from the data store. */
    public getTrialJob(trialJobId: string): Promise<TrialJobInfo> {
        const store: DataStore = this.dataStore;
        return store.getTrialJob(trialJobId);
    }

    /**
     * Forward a cluster-metadata key/value pair to the training service.
     * Rejects if the training service does not settle within 30 seconds, if it fails, or if
     * the manager is in readonly mode. The timeout timer is always cleared afterwards.
     */
    public async setClusterMetadata(key: string, value: string): Promise<void> {
        if (this.readonly) {
            throw new Error('Error: can not set cluster metadata in readonly mode!');
        }
        this.log.info(`NNIManager setClusterMetadata, key: ${key}, value: ${value}`);

        // TO DO: move timeout value to constants file
        const timeoutMs: number = 30000;
        let timer: NodeJS.Timer | undefined;
        const timeoutGuard: Promise<void> = new Promise<void>((_resolve, reject): void => {
            timer = setTimeout(() => {
                reject(new Error('TrainingService setClusterMetadata timeout. Please check your config file.'));
            }, timeoutMs);
        });

        try {
            await Promise.race([timeoutGuard, this.trainingService.setClusterMetadata(key, value)]);
        } finally {
            if (timer !== undefined) {
                clearTimeout(timer);
            }
        }
    }

    /** Look up a cluster-metadata value held by the training service. */
    public getClusterMetadata(key: string): Promise<string> {
        const pending = this.trainingService.getClusterMetadata(key);
        return Promise.resolve(pending);
    }

    /** Per-status trial statistics computed by the data store. */
    public async getTrialJobStatistics(): Promise<TrialJobStatistics[]> {
        const statistics: TrialJobStatistics[] = await this.dataStore.getTrialJobStatistics();
        return statistics;
    }

299
    /**
     * Stop the experiment in two stages: shut down the dispatcher (top half), then cancel
     * trials, persist state and exit the process (bottom half).
     */
    public async stopExperiment(): Promise<void> {
        await this.stopExperimentTopHalf();
        return this.stopExperimentBottomHalf();
    }

    /**
     * First stage of stopping.
     *
     * Switches to STOPPING and detaches the metric listener. If a dispatcher process was
     * spawned, it sends TERMINATE, polls about once a second (up to 30 checks) for the process
     * to exit, then kills the pid regardless. Finally it drops the dispatcher reference.
     * Logs an error and returns early if no dispatcher was ever set up.
     */
    public async stopExperimentTopHalf(): Promise<void> {
        this.setStatus('STOPPING');
        this.log.info('Stopping experiment, cleaning up ...');

        if (this.dispatcher === undefined) {
            this.log.error('Tuner has not been setup');
            return;
        }

        this.trainingService.removeTrialJobMetricListener(this.trialJobMetricListener);

        if (this.dispatcherPid > 0) {
            this.dispatcher.sendCommand(TERMINATE);
            // gracefully terminate tuner and assessor here, wait at most 30 seconds.
            let secondsWaited: number = 0;
            while (secondsWaited < 30 && await isAlive(this.dispatcherPid)) {
                await delay(1000);
                secondsWaited++;
            }
            await killPid(this.dispatcherPid);
        }

        this.dispatcher = undefined;
    }

    /**
     * Second stage of stopping.
     *
     * Steps, in order:
     * 1. Cancel every WAITING/RUNNING trial one at a time; cancellation errors are only logged.
     * 2. Clean up the training service.
     * 3. Stamp the end time if it is missing, persist the profile and mark STOPPED.
     * 4. Stop the experiment manager, data store and REST server, then exit the process:
     *    exit code 1 if that shutdown threw, 0 otherwise.
     */
    public async stopExperimentBottomHalf(): Promise<void> {
        const trialJobList: TrialJobDetail[] = await this.trainingService.listTrialJobs();

        // DON'T try to make it in parallel, the training service may not handle it well.
        // If there is performance concern, consider to support batch cancellation on training service.
        for (const job of trialJobList) {
            if (job.status !== 'RUNNING' && job.status !== 'WAITING') {
                continue;
            }
            try {
                this.log.info(`cancelTrialJob: ${job.id}`);
                await this.trainingService.cancelTrialJob(job.id);
            } catch (error) {
                this.log.debug(`ignorable error on canceling trial ${job.id}. ${error}`);
            }
        }

        await this.trainingService.cleanUp();
        if (this.experimentProfile.endTime === undefined) {
            this.setEndtime();
        }
        await this.storeExperimentProfile();
        this.setStatus('STOPPED');
        this.log.info('Experiment stopped.');

        let exitCode: number = 0;
        try {
            this.experimentManager.stop();
            this.dataStore.close();
            await component.get<NNIRestServer>(NNIRestServer).stop();
        } catch (err) {
            exitCode = 1;
            this.log.error(`${err.stack}`);
        } finally {
            this.log.close();
            process.exit(exitCode);
        }
    }

366
    /** Metric records from the data store, optionally filtered by trial id and/or metric type. */
    public async getMetricData(trialJobId?: string, metricType?: MetricType): Promise<MetricDataRecord[]> {
        const records: MetricDataRecord[] = await this.dataStore.getMetricData(trialJobId, metricType);
        return records;
    }

370
371
372
373
374
375
    /**
     * All metric records of trials whose sequenceId lies in [minSeqId, maxSeqId] (inclusive).
     * Trials without a sequenceId are excluded.
     */
    public async getMetricDataByRange(minSeqId: number, maxSeqId: number): Promise<MetricDataRecord[]> {
        const trialJobs: TrialJobInfo[] = await this.dataStore.listTrialJobs();
        const targetTrialIds: Set<string> = new Set<string>();
        for (const trial of trialJobs) {
            // FIXME: can this be undefined?
            const seq = trial.sequenceId;
            if (seq !== undefined && minSeqId <= seq && seq <= maxSeqId) {
                targetTrialIds.add(trial.trialJobId);
            }
        }

        const allMetrics: MetricDataRecord[] = await this.dataStore.getMetricData();
        return allMetrics.filter((metric: MetricDataRecord) => targetTrialIds.has(metric.trialJobId));
    }

    /**
     * Every non-PERIODICAL metric (in store order), followed by the PERIODICAL metric with the
     * highest sequence for each trial. On ties, the later record wins.
     */
    public async getLatestMetricData(): Promise<MetricDataRecord[]> {
        // FIXME: this can take a long time
        const allMetrics: MetricDataRecord[] = await this.dataStore.getMetricData();
        const finals: MetricDataRecord[] = allMetrics.filter((m: MetricDataRecord) => m.type !== 'PERIODICAL');

        const newestPeriodical: Map<string, MetricDataRecord> = new Map<string, MetricDataRecord>();
        for (const m of allMetrics) {
            if (m.type !== 'PERIODICAL') {
                continue;
            }
            const previous: MetricDataRecord | undefined = newestPeriodical.get(m.trialJobId);
            if (previous === undefined || previous.sequence <= m.sequence) {
                newestPeriodical.set(m.trialJobId, m);
            }
        }
        return finals.concat(Array.from(newestPeriodical.values()));
        // FIXME: unit test
    }

401
402
403
404
    /** Log content of one trial, as provided by the training service. */
    public async getTrialLog(trialJobId: string, logType: LogType): Promise<string> {
        const content: string = await this.trainingService.getTrialLog(trialJobId, logType);
        return content;
    }

Deshui Yu's avatar
Deshui Yu committed
405
406
407
408
409
410
411
412
    /** Resolve with the in-memory experiment profile (the object itself, not a copy). */
    public getExperimentProfile(): Promise<ExperimentProfile> {
        // TO DO: using Promise.resolve()
        const settled: Deferred<ExperimentProfile> = new Deferred<ExperimentProfile>();
        settled.resolve(this.experimentProfile);
        return settled.promise;
    }

413
414
415
416
    /** Current manager status (the live object, not a snapshot). */
    public getStatus(): NNIManagerStatus {
        const current: NNIManagerStatus = this.status;
        return current;
    }

Deshui Yu's avatar
Deshui Yu committed
417
418
419
420
    /** Trials from the data store, optionally restricted to one status. */
    public async listTrialJobs(status?: TrialJobStatus): Promise<TrialJobInfo[]> {
        const jobs: TrialJobInfo[] = await this.dataStore.listTrialJobs(status);
        return jobs;
    }

421
422
    /**
     * Spawn the dispatcher (tuner/assessor) process and attach an IPC interface to it.
     * This is a no-op when a dispatcher already exists, e.g. one attached via pipe in the
     * constructor.
     *
     * @param command dispatcher command line
     * @param cwd working directory; the log directory is used when undefined or ''
     * @param mode exported to the process as NNI_MODE
     * @param dataDirectory exported to the process as NNI_CHECKPOINT_DIRECTORY
     */
    private setupTuner(command: string, cwd: string | undefined, mode: 'start' | 'resume', dataDirectory: string): void {
        if (this.dispatcher !== undefined) {
            return;
        }
        // stdin ignored, stdout/stderr inherited, fds 3 and 4 piped
        // (presumably the IPC channel used by createDispatcherInterface - confirm in ipcInterface).
        const stdio: StdioOptions = ['ignore', process.stdout, process.stderr, 'pipe', 'pipe'];
        const workingDir: string = (cwd === undefined || cwd === '') ? getLogDir() : cwd;

        const tunerParams = this.experimentProfile.params.tuner;
        const includeIntermediateResultsEnv: boolean | undefined =
            tunerParams === undefined ? false : tunerParams.includeIntermediateResults;

        const newEnv = Object.assign({}, process.env, {
            SDK_PROCESS: 'dispatcher',
            NNI_MODE: mode,
            NNI_CHECKPOINT_DIRECTORY: dataDirectory,
            NNI_LOG_DIRECTORY: getLogDir(),
            NNI_LOG_LEVEL: getLogLevel(),
            NNI_INCLUDE_INTERMEDIATE_RESULTS: includeIntermediateResultsEnv,
            CUDA_VISIBLE_DEVICES: this.getGpuEnvvarValue()
        });
        const tunerProc: ChildProcess = getTunerProc(command, stdio, workingDir, newEnv);
        this.dispatcherPid = tunerProc.pid;
        this.dispatcher = createDispatcherInterface(tunerProc);
    }

455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
    /**
     * Value for the dispatcher's CUDA_VISIBLE_DEVICES. The advisor's gpuIndices are used when
     * an advisor is configured, otherwise the tuner's; '' when neither provides any.
     */
    private getGpuEnvvarValue(): string {
        const params: ExperimentParams = this.experimentProfile.params;
        const cudaDevices: string | undefined =
            params.advisor !== undefined ? params.advisor.gpuIndices :
                params.tuner !== undefined ? params.tuner.gpuIndices :
                    undefined;
        return cudaDevices !== undefined ? cudaDevices : '';
    }

Deshui Yu's avatar
Deshui Yu committed
471
    /**
     * Record a new trial concurrency. The difference from the current value is accumulated in
     * trialConcurrencyChange, which manageTrials consumes when deciding how many trials to request.
     */
    private updateTrialConcurrency(trialConcurrency: number): void {
        // we assume trialConcurrency >= 0, which is checked by restserver
        const previous: number = this.experimentProfile.params.trialConcurrency;
        this.trialConcurrencyChange += trialConcurrency - previous;
        this.experimentProfile.params.trialConcurrency = trialConcurrency;
    }

    /** Replace maxExecDuration in the in-memory profile; persisting is left to the caller. */
    private updateMaxExecDuration(duration: number): void {
        const params: ExperimentParams = this.experimentProfile.params;
        params.maxExecDuration = duration;
    }

    /**
     * Send the new search space to the tuner, then store it in the in-memory profile.
     * Throws if no dispatcher is connected.
     */
    private updateSearchSpace(searchSpace: string): void {
        const dispatcher: IpcInterface | undefined = this.dispatcher;
        if (dispatcher === undefined) {
            throw new Error('Error: tuner has not been setup');
        }
        dispatcher.sendCommand(UPDATE_SEARCH_SPACE, searchSpace);
        this.experimentProfile.params.searchSpace = searchSpace;
    }

QuanluZhang's avatar
QuanluZhang committed
495
496
497
498
499
500
    /** Replace maxTrialNum in the in-memory profile; persisting is left to the caller. */
    private updateMaxTrialNum(maxTrialNum: number): void {
        const params: ExperimentParams = this.experimentProfile.params;
        params.maxTrialNum = maxTrialNum;
    }

Deshui Yu's avatar
Deshui Yu committed
501
    /**
     * Runs about once per second until the status becomes ERROR, STOPPING or STOPPED.
     *
     * On each tick where the status is RUNNING, NO_MORE_TRIAL or TUNER_NO_MORE_TRIAL, it adds
     * one second to execDuration. If that tick is also a 10th tick, it persists the profile.
     */
    private async periodicallyUpdateExecDuration(): Promise<void> {
        const stopStates: string[] = ['ERROR', 'STOPPING', 'STOPPED'];
        const activeStates: string[] = ['RUNNING', 'NO_MORE_TRIAL', 'TUNER_NO_MORE_TRIAL'];
        for (let tick: number = 1; !stopStates.includes(this.status.status); tick++) {
            await delay(1000 * 1); // 1 seconds
            if (!activeStates.includes(this.status.status)) {
                continue;
            }
            this.experimentProfile.execDuration += 1;
            if (tick % 10 === 0) {
                await this.storeExperimentProfile();
            }
        }
    }

chicm-ms's avatar
chicm-ms committed
515
516
517
518
519
520
    /**
     * Send PING to the dispatcher every 5 seconds until the status becomes ERROR, STOPPING or
     * STOPPED. Throws if no dispatcher is connected.
     */
    private async pingDispatcher(): Promise<void> {
        if (this.dispatcher === undefined) {
            throw new Error('Error: tuner has not been setup');
        }
        const stopStates: string[] = ['ERROR', 'STOPPING', 'STOPPED'];
        const pingIntervalMs: number = 1000 * 5;
        while (!stopStates.includes(this.status.status)) {
            this.dispatcher.sendCommand(PING);
            await delay(pingIntervalMs);
        }
    }

QuanluZhang's avatar
QuanluZhang committed
525
526
    /**
     * Poll the training service for every tracked trial.
     *
     * Status transitions are persisted and the cached message is refreshed. A trial that
     * reached a terminal state (SUCCEEDED, USER_CANCELED, EARLY_STOPPED, FAILED, SYS_CANCELED)
     * is untracked and reported to the dispatcher with TRIAL_END.
     * Throws if no dispatcher is connected.
     *
     * @returns how many tracked trials were found finished in this poll
     */
    private async requestTrialJobsStatus(): Promise<number> {
        if (this.dispatcher === undefined) {
            throw new Error('Error: tuner has not been setup');
        }
        let finishedCount: number = 0;

        for (const trialJobId of Array.from(this.trialJobs.keys())) {
            const latest: TrialJobDetail = await this.trainingService.getTrialJob(trialJobId);
            const previous: TrialJobDetail | undefined = this.trialJobs.get(trialJobId);
            if (previous !== undefined && previous.status !== latest.status) {
                this.log.info(`Trial job ${latest.id} status changed from ${previous.status} to ${latest.status}`);
                this.trialJobs.set(trialJobId, Object.assign({}, latest));
                await this.dataStore.storeTrialJobEvent(latest.status, latest.id, undefined, latest);
            }

            const tracked: TrialJobDetail | undefined = this.trialJobs.get(trialJobId);
            if (tracked !== undefined) {
                tracked.message = latest.message;
            }

            switch (latest.status) {
                case 'SUCCEEDED':
                case 'USER_CANCELED':
                case 'EARLY_STOPPED':
                case 'FAILED':
                case 'SYS_CANCELED': {
                    // For FAILED / SYS_CANCELED: in the current version, we do not retry
                    // TO DO: push this job to queue for retry
                    this.trialJobs.delete(trialJobId);
                    finishedCount++;
                    const hyperParams = latest.form.hyperParameters.value;
                    this.dispatcher.sendCommand(TRIAL_END, JSON.stringify({
                        trial_job_id: latest.id, // eslint-disable-line @typescript-eslint/camelcase
                        event: latest.status,
                        hyper_params: hyperParams // eslint-disable-line @typescript-eslint/camelcase
                    }));
                    break;
                }
                default:
                    // WAITING, RUNNING, UNKNOWN: nothing to do yet.
                    // TO DO: add warning in log for unexpected statuses
                    break;
            }
        }

        return finishedCount;
    }

    /**
     * Main trial-management loop; runs until the status becomes ERROR, STOPPING or STOPPED,
     * sleeping 5 seconds between iterations.
     *
     * Each iteration:
     *  1. polls the training service for finished trials and asks the tuner for replacements,
     *     adjusted by any pending change of `trialConcurrency`;
     *  2. checks the `maxExecDuration` / `maxTrialNum` budgets and moves the status between
     *     RUNNING, NO_MORE_TRIAL and DONE accordingly;
     *  3. while budget remains, submits queued trials from `waitingTrials` until
     *     `trialConcurrency` trials are tracked in `trialJobs`.
     *
     * @throws Error if the dispatcher (tuner) has not been set up.
     */
    private async manageTrials(): Promise<void> {
        if (this.dispatcher === undefined) {
            throw new Error('Error: tuner has not been setup');
        }
        // Trials already counted as submitted when the loop starts are counted as finished.
        // NOTE(review): presumably this covers trials restored on resume — confirm.
        let allFinishedTrialJobNum: number = this.currSubmittedTrialNum;
        while (!['ERROR', 'STOPPING', 'STOPPED'].includes(this.status.status)) {
            const finishedTrialJobNum: number = await this.requestTrialJobsStatus();
            allFinishedTrialJobNum += finishedTrialJobNum;

            // requestTrialNum is the number of trials that will be requested from tuner.
            // If trialConcurrency does not change, requestTrialNum equals finishedTrialJobNum.
            // If trialConcurrency changes, for example, trialConcurrency increases by 2 (trialConcurrencyChange=2), then
            // requestTrialNum equals 2 + finishedTrialJobNum and trialConcurrencyChange becomes 0.
            // If trialConcurrency changes, for example, trialConcurrency decreases by 4 (trialConcurrencyChange=-4) and
            // finishedTrialJobNum is 2, then requestTrialNum becomes -2. No trial will be requested from tuner,
            // and trialConcurrencyChange becomes -2.
            const requestTrialNum: number = this.trialConcurrencyChange + finishedTrialJobNum;
            if (requestTrialNum >= 0) {
                this.trialConcurrencyChange = 0;
            } else {
                this.trialConcurrencyChange = requestTrialNum;
            }

            this.requestTrialJobs(requestTrialNum);

            // Check maxTrialNum and maxExecDuration here.
            // NO_MORE_TRIAL is more like a subset of RUNNING, because during RUNNING the tuner
            // might tell nnimanager that there are no more trials. In NO_MORE_TRIAL state, the experiment is viewed
            // as still running. DONE can be transferred from RUNNING or NO_MORE_TRIAL.
            assert(this.status.status === 'RUNNING' ||
                this.status.status === 'DONE' ||
                this.status.status === 'NO_MORE_TRIAL' ||
                this.status.status === 'TUNER_NO_MORE_TRIAL', `Actual status: ${this.status.status}`);
            if (this.experimentProfile.execDuration > this.experimentProfile.params.maxExecDuration ||
                this.currSubmittedTrialNum >= this.experimentProfile.params.maxTrialNum) {
                if (this.status.status !== 'DONE') {
                    this.setStatus('NO_MORE_TRIAL');
                    // The experiment is DONE only once every submitted trial has finished.
                    const waitSubmittedToFinish: number = this.currSubmittedTrialNum;

                    assert(allFinishedTrialJobNum <= waitSubmittedToFinish);
                    if (allFinishedTrialJobNum >= waitSubmittedToFinish) {
                        this.setStatus('DONE');
                        this.setEndtime();
                        await this.storeExperimentProfile();
                        // write this log for travis CI
                        this.log.info('Experiment done.');
                    }
                }
            } else {
                // Budget is available again (e.g. limits were raised), so leave DONE.
                if (this.status.status === 'DONE') {
                    delete this.experimentProfile.endTime;
                    await this.storeExperimentProfile();
                }
                if (this.status.status !== 'TUNER_NO_MORE_TRIAL') {
                    this.setStatus('RUNNING');
                }
                for (let i: number = this.trialJobs.size; i < this.experimentProfile.params.trialConcurrency; i++) {
                    if (this.waitingTrials.length === 0 ||
                        this.currSubmittedTrialNum >= this.experimentProfile.params.maxTrialNum) {
                        break;
                    }
                    const form = this.waitingTrials.shift() as TrialJobApplicationForm;
                    this.currSubmittedTrialNum++;
                    this.log.info(`submitTrialJob: form: ${JSON.stringify(form)}`);
                    const trialJobDetail: TrialJobDetail = await this.trainingService.submitTrialJob(form);
                    // Track a shallow copy, not the object returned by the training service.
                    const trialJobDetailSnapshot: TrialJobDetail = Object.assign({}, trialJobDetail);
                    await this.storeExperimentProfile();
                    this.trialJobs.set(trialJobDetail.id, trialJobDetailSnapshot);
                    await this.dataStore.storeTrialJobEvent(
                        trialJobDetailSnapshot.status, trialJobDetailSnapshot.id, form.hyperParameters.value, trialJobDetailSnapshot);
                }
            }
            await delay(1000 * 5); // 5 seconds
        }
    }

Deshui Yu's avatar
Deshui Yu committed
664
665
666
667
668
669
    /**
     * Increments the experiment profile's revision counter and persists the
     * profile through the data store.
     *
     * @returns the data store's completion promise.
     */
    private storeExperimentProfile(): Promise<void> {
        const profile: ExperimentProfile = this.experimentProfile;
        profile.revision++;
        return this.dataStore.storeExperimentProfile(profile);
    }

670
    /**
     * Starts the manager.
     *
     * First it registers the listeners and sends INITIALIZE to the tuner. It then runs these
     * tasks concurrently: the exec-duration updater, the dispatcher pinger, the training
     * service and the trial-management loop. Failures from the last three are re-thrown as
     * NNIError with a source-specific prefix.
     */
    private async run(): Promise<void> {
        assert(this.dispatcher !== undefined);

        this.addEventListeners();
        this.sendInitTunerCommands();

        // Re-throw a task's failure as an NNIError tagged with where it came from.
        const withPrefix = <T>(task: Promise<T>, prefix: string): Promise<T> =>
            task.catch((err: Error) => {
                throw NNIError.FromError(err, prefix);
            });

        await Promise.all([
            this.periodicallyUpdateExecDuration(),
            withPrefix(this.pingDispatcher(), 'Dispatcher error: '),
            withPrefix(this.trainingService.run(), 'Training service error: '),
            withPrefix(this.manageTrials(), 'Job management error: ')
        ]);
    }

QuanluZhang's avatar
QuanluZhang committed
690
    /**
     * Wires up the listeners:
     * - trial metrics from the training service go to `trialJobMetricListener`;
     * - dispatcher commands go to `onTunerCommand`;
     * - dispatcher stream errors become critical errors.
     *
     * @throws Error if the dispatcher has not been set up.
     */
    private addEventListeners(): void {
        this.log.info('Add event listeners');
        // TO DO: cannot run this method more than once in one NNIManager instance
        const dispatcher = this.dispatcher;
        if (dispatcher === undefined) {
            throw new Error('Error: tuner or job maintainer have not been setup');
        }
        this.trainingService.addTrialJobMetricListener(this.trialJobMetricListener);

        // Command handling is async; any rejection is reported as a critical error.
        const handleCommand = (commandType: string, content: string): void => {
            this.onTunerCommand(commandType, content).catch((err: Error) => {
                this.criticalError(NNIError.FromError(err, 'Tuner command event error: '));
            });
        };
        const handleError = (error: Error): void => {
            this.log.error(`Dispatcher error: ${error.message}`);
            this.criticalError(new Error('Dispatcher stream error, tuner may have crashed.'));
        };

        dispatcher.onCommand(handleCommand);
        dispatcher.onError(handleError);
    }

    /**
     * Sends INITIALIZE with the experiment's search space to the tuner. This must happen
     * before the tuner generates any hyper parameters.
     *
     * @throws Error if the dispatcher has not been set up.
     */
    private sendInitTunerCommands(): void {
        const dispatcher = this.dispatcher;
        if (dispatcher === undefined) {
            throw new Error('Dispatcher error: tuner has not been setup');
        }
        const searchSpace = this.experimentProfile.params.searchSpace;
        this.log.debug(`Send tuner command: INITIALIZE: ${searchSpace}`);
        dispatcher.sendCommand(INITIALIZE, searchSpace);
    }

    /**
     * Handles a metric reported by the training service.
     *
     * Metrics for trials tracked in `trialJobs` are stored in the data store and forwarded
     * to the dispatcher as REPORT_METRIC_DATA. Metrics for unknown trials are logged and dropped.
     *
     * @param metric - metric reported by the training service.
     * @throws Error if the dispatcher has not been set up (checked after the metric is stored).
     */
    private async onTrialJobMetrics(metric: TrialJobMetric): Promise<void> {
        this.log.debug(`NNIManager received trial job metrics: ${JSON.stringify(metric)}`);
        if (!this.trialJobs.has(metric.id)) {
            // Interpolating the object directly would log "[object Object]".
            this.log.warning(`NNIManager received non-existent trial job metrics: ${JSON.stringify(metric)}`);
            return;
        }
        await this.dataStore.storeMetricData(metric.id, metric.data);
        if (this.dispatcher === undefined) {
            throw new Error('Error: tuner has not been setup');
        }
        this.dispatcher.sendCommand(REPORT_METRIC_DATA, metric.data);
    }

chicm-ms's avatar
chicm-ms committed
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
    /**
     * Asks the tuner for `jobNum` new trials. Does nothing when `jobNum` is below 1.
     *
     * With `multiThread` enabled, one REQUEST_TRIAL_JOBS('1') is sent per trial so the
     * hyper parameters are generated in a non-blocking way. Otherwise a single request
     * carries the whole count, and the tuner generates them one by one.
     *
     * @param jobNum - number of trials to request.
     * @throws Error if the dispatcher has not been set up (and jobNum >= 1).
     */
    private requestTrialJobs(jobNum: number): void {
        if (jobNum < 1) {
            return;
        }
        const dispatcher = this.dispatcher;
        if (dispatcher === undefined) {
            throw new Error('Dispatcher error: tuner has not been setup');
        }
        if (!this.experimentProfile.params.multiThread) {
            dispatcher.sendCommand(REQUEST_TRIAL_JOBS, String(jobNum));
            return;
        }
        for (let sent: number = 0; sent < jobNum; sent++) {
            dispatcher.sendCommand(REQUEST_TRIAL_JOBS, '1');
        }
    }

750
    /**
     * Dispatches a command received from the tuner/advisor process.
     *
     * - INITIALIZED: sends any pending imported trial data (IMPORT_DATA), then requests
     *   the initial batch of `trialConcurrency` trials.
     * - NEW_TRIAL_JOB: queues a trial form with the next sequence id in `waitingTrials`.
     *   If the status was TUNER_NO_MORE_TRIAL, it is set back to RUNNING.
     * - SEND_TRIAL_JOB_PARAMETER: pushes an extra parameter set to an existing trial through
     *   the training service and records an ADD_HYPERPARAMETER event.
     * - NO_MORE_TRIAL_JOBS: sets TUNER_NO_MORE_TRIAL unless the status is already
     *   ERROR/STOPPING/STOPPED.
     * - KILL_TRIAL_JOB: cancels the trial whose id is JSON-encoded in `content`.
     *
     * @param commandType - one of the command constants imported from './commands'.
     * @param content - the command payload as received from the dispatcher.
     * @throws Error for an unsupported command type, or if the dispatcher is missing when needed.
     */
    private async onTunerCommand(commandType: string, content: string): Promise<void> {
        this.log.info(`NNIManager received command from dispatcher: ${commandType}, ${content}`);
        switch (commandType) {
            case INITIALIZED: {
                // Tuner is initialized and the search space is set; request tuner to generate hyper parameters
                if (this.trialDataForTuner.length > 0) {
                    if (this.dispatcher === undefined) {
                        throw new Error('Dispatcher error: tuner has not been setup');
                    }
                    this.dispatcher.sendCommand(IMPORT_DATA, this.trialDataForTuner);
                }
                this.requestTrialJobs(this.experimentProfile.params.trialConcurrency);
                break;
            }
            case NEW_TRIAL_JOB: {
                if (this.status.status === 'TUNER_NO_MORE_TRIAL') {
                    this.log.warning('It is not supposed to receive more trials after NO_MORE_TRIAL is set');
                    this.setStatus('RUNNING');
                }
                const form: TrialJobApplicationForm = {
                    sequenceId: this.experimentProfile.nextSequenceId++,
                    hyperParameters: {
                        value: content,
                        index: 0
                    }
                };
                this.waitingTrials.push(form);
                break;
            }
            case SEND_TRIAL_JOB_PARAMETER: {
                const tunerCommand: any = JSON.parse(content);
                assert(tunerCommand.parameter_index >= 0);
                assert(tunerCommand.trial_job_id !== undefined);

                const trialJobForm: TrialJobApplicationForm = {
                    sequenceId: -1,  // FIXME: multi-phase tuner should use sequence ID instead of trial job ID
                    hyperParameters: {
                        value: content,
                        index: tunerCommand.parameter_index
                    }
                };
                this.log.info(`updateTrialJob: job id: ${tunerCommand.trial_job_id}, form: ${JSON.stringify(trialJobForm)}`);
                await this.trainingService.updateTrialJob(tunerCommand.trial_job_id, trialJobForm);
                // Only record the event when `parameters` is not null.
                // NOTE(review): an earlier comment said the tuner sets `parameters` to an empty string
                // when it can generate no more hyper parameters; an empty string is NOT skipped by this
                // check. Confirm which sentinel the tuner actually sends.
                if (tunerCommand['parameters'] !== null) {
                    await this.dataStore.storeTrialJobEvent(
                        'ADD_HYPERPARAMETER', tunerCommand.trial_job_id, content, undefined);
                }
                break;
            }
            case NO_MORE_TRIAL_JOBS: {
                if (!['ERROR', 'STOPPING', 'STOPPED'].includes(this.status.status)) {
                    this.setStatus('TUNER_NO_MORE_TRIAL');
                }
                break;
            }
            case KILL_TRIAL_JOB: {
                // Parse the payload once and reuse it for logging and cancellation.
                const trialJobId: string = JSON.parse(content);
                this.log.info(`cancelTrialJob: ${trialJobId}`);
                await this.trainingService.cancelTrialJob(trialJobId, true);
                break;
            }
            default:
                throw new Error(`Error: unsupported command type from tuner: ${commandType}`);
        }
    }

816
817
818
819
820
821
822
823
824
825
    /**
     * Reports an unrecoverable error.
     *
     * It is first recorded through `logError`, which also moves the manager to ERROR and
     * sets the end time, and then printed to the console.
     *
     * @param err - the error to report.
     */
    private criticalError(err: Error): void {
        const fatal: Error = err;
        this.logError(fatal);
        console.error(fatal);
    }

    /**
     * Records an error:
     * - logs its stack trace (when available);
     * - appends its message to `status.errors`;
     * - stamps the experiment end time;
     * - switches the status to ERROR.
     *
     * @param err - the error to record.
     */
    private logError(err: Error): void {
        const { stack, message } = err;
        if (stack !== undefined) {
            this.log.error(stack);
        }
        this.status.errors.push(message);
        this.setEndtime();
        this.setStatus('ERROR');
    }

    /**
     * Updates the manager status and mirrors it into the experiment manager.
     * When the new status equals the current one, nothing is logged or written.
     *
     * @param status - the new experiment status.
     */
    private setStatus(status: ExperimentStatus): void {
        const previous = this.status.status;
        if (previous === status) {
            return;
        }
        this.log.info(`Change NNIManager status from: ${previous} to: ${status}`);
        this.status.status = status;
        this.experimentManager.setExperimentInfo(this.experimentProfile.id, 'status', status);
    }

838
839
840
841
842
    /**
     * Stamps the current time (`Date.now()`, milliseconds) as the experiment end time,
     * both on the profile and in the experiment manager.
     */
    private setEndtime(): void {
        const endTime = Date.now();
        this.experimentProfile.endTime = endTime;
        this.experimentManager.setExperimentInfo(this.experimentProfile.id, 'endTime', endTime);
    }

843
844
845
846
847
    /**
     * Builds a blank experiment profile for the current experiment id.
     *
     * Revision, execDuration and nextSequenceId start at 0. The log directory is the
     * experiment root directory. All params are empty strings or zeros.
     */
    private createEmptyExperimentProfile(): ExperimentProfile {
        const emptyProfile: ExperimentProfile = {
            id: getExperimentId(),
            revision: 0,
            execDuration: 0,
            logDir: getExperimentRootDir(),
            nextSequenceId: 0,
            params: {
                authorName: '',
                experimentName: '',
                trialConcurrency: 0,
                maxExecDuration: 0,        // unit: second
                maxTrialNum: 0,            // counts every submitted trial job
                trainingServicePlatform: '',
                searchSpace: ''
            }
        };

        return emptyProfile;
    }
861

QuanluZhang's avatar
QuanluZhang committed
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
    /**
     * Creates the experiment checkpoint directory (via `mkDirP`). Its path is then set as
     * `checkpointDir` on each of the advisor, tuner and assessor configs that is present.
     *
     * @returns the checkpoint directory path.
     */
    private async createCheckpointDir(): Promise<string> {
        // TODO: test
        const chkpDir: string = getCheckpointDir();
        await mkDirP(chkpDir);

        const params = this.experimentProfile.params;
        if (params.advisor) {
            params.advisor.checkpointDir = chkpDir;
        }
        if (params.tuner) {
            params.tuner.checkpointDir = chkpDir;
        }
        if (params.assessor) {
            params.assessor.checkpointDir = chkpDir;
        }

        // An async function already wraps its return value in a Promise.
        return chkpDir;
    }
Deshui Yu's avatar
Deshui Yu committed
880
881
882
}

export { NNIManager };