nnimanager.ts 37.4 KB
Newer Older
liuzhe-lz's avatar
liuzhe-lz committed
1
2
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT license.
Deshui Yu's avatar
Deshui Yu committed
3
4
5
6

'use strict';

import * as assert from 'assert';
chicm-ms's avatar
chicm-ms committed
7
import { ChildProcess, StdioOptions } from 'child_process';
Deshui Yu's avatar
Deshui Yu committed
8
9
10
import { Deferred } from 'ts-deferred';
import * as component from '../common/component';
import { DataStore, MetricDataRecord, MetricType, TrialJobInfo } from '../common/datastore';
11
import { NNIError } from '../common/errors';
12
import { getExperimentId, getDispatcherPipe } from '../common/experimentStartupInfo';
Deshui Yu's avatar
Deshui Yu committed
13
14
import { getLogger, Logger } from '../common/log';
import {
chicm-ms's avatar
chicm-ms committed
15
    ExperimentParams, ExperimentProfile, Manager, ExperimentStatus,
16
    NNIManagerStatus, ProfileUpdateType, TrialJobStatistics
Deshui Yu's avatar
Deshui Yu committed
17
} from '../common/manager';
18
import { ExperimentManager } from '../common/experimentManager';
Deshui Yu's avatar
Deshui Yu committed
19
import {
20
    TrainingService, TrialJobApplicationForm, TrialJobDetail, TrialJobMetric, TrialJobStatus, LogType
Deshui Yu's avatar
Deshui Yu committed
21
} from '../common/trainingService';
22
import { delay, getCheckpointDir, getExperimentRootDir, getLogDir, getMsgDispatcherCommand, mkDirP, getTunerProc, getLogLevel, isAlive, killPid } from '../common/utils';
Deshui Yu's avatar
Deshui Yu committed
23
import {
chicm-ms's avatar
chicm-ms committed
24
    INITIALIZE, INITIALIZED, KILL_TRIAL_JOB, NEW_TRIAL_JOB, NO_MORE_TRIAL_JOBS, PING,
25
    REPORT_METRIC_DATA, REQUEST_TRIAL_JOBS, SEND_TRIAL_JOB_PARAMETER, TERMINATE, TRIAL_END, UPDATE_SEARCH_SPACE, IMPORT_DATA
Deshui Yu's avatar
Deshui Yu committed
26
} from './commands';
27
import { createDispatcherInterface, createDispatcherPipeInterface, IpcInterface } from './ipcInterface';
28
import { NNIRestServer } from '../rest_server/nniRestServer';
Deshui Yu's avatar
Deshui Yu committed
29
30

/**
 * NNIManager, the implementation of the Manager interface: it runs the experiment
 * lifecycle (start / resume / stop), owns the tuner/assessor dispatcher process,
 * and tracks trials on the configured training service.
 */
class NNIManager implements Manager {
    // Collaborators resolved from the component container.
    private trainingService: TrainingService;
    // IPC channel to the tuner/assessor dispatcher; undefined until a pipe is configured or setupTuner() runs.
    private dispatcher: IpcInterface | undefined;
    private experimentManager: ExperimentManager;
    private currSubmittedTrialNum: number;  // need to be recovered
    private trialConcurrencyChange: number; // >0: increase, <0: decrease
    private log: Logger;
    private dataStore: DataStore;
    private experimentProfile: ExperimentProfile;
    // PID of the spawned dispatcher process; 0 when none was spawned by this manager.
    private dispatcherPid: number;
    private status: NNIManagerStatus;
    // Application forms waiting to be submitted (addCustomizedTrialJob pushes here).
    private waitingTrials: TrialJobApplicationForm[];
    // Trials currently tracked by requestTrialJobsStatus(), keyed by trial job id.
    private trialJobs: Map<string, TrialJobDetail>;
    // Serialized finished + imported trial data, rebuilt by resumeExperiment().
    private trialDataForTuner: string;
    // Set by resumeExperiment(readonly); mutating APIs reject while it is true.
    private readonly: boolean;

    private trialJobMetricListener: (metric: TrialJobMetric) => void;

    constructor() {
        // Plain bookkeeping starts empty; resumeExperiment() restores part of it.
        this.currSubmittedTrialNum = 0;
        this.trialConcurrencyChange = 0;
        this.dispatcherPid = 0;
        this.waitingTrials = [];
        this.trialJobs = new Map<string, TrialJobDetail>();
        this.trialDataForTuner = '';
        this.readonly = false;

        // A training service is mandatory for the manager to do anything useful.
        this.trainingService = component.get(TrainingService);
        this.experimentManager = component.get(ExperimentManager);
        assert(this.trainingService);
        this.log = getLogger();
        this.dataStore = component.get(DataStore);

        this.experimentProfile = this.createEmptyExperimentProfile();
        this.status = { status: 'INITIALIZED', errors: [] };

        // Metric handling runs asynchronously; any failure in it is treated as critical.
        this.trialJobMetricListener = (metric: TrialJobMetric): void => {
            this.onTrialJobMetrics(metric).catch((err: Error) => {
                this.criticalError(NNIError.FromError(err, 'Job metrics error: '));
            });
        };

        // With a configured dispatcher pipe we connect through it; setupTuner() then skips spawning.
        const dispatcherPipe = getDispatcherPipe();
        if (dispatcherPipe !== null) {
            this.dispatcher = createDispatcherPipeInterface(dispatcherPipe);
        }
    }

    /**
     * Apply one parameter of `experimentProfile` (selected by `updateType`) to the running
     * experiment and persist the resulting profile.
     *
     * Declared `async` so that every failure - readonly mode, an unrecognized update type, or
     * the tuner not being set up for SEARCH_SPACE - surfaces as a rejected promise instead of a
     * synchronous throw that would bypass callers' `.catch()` handlers.
     *
     * @param experimentProfile profile whose `params` carry the new value
     * @param updateType which parameter to update
     */
    public async updateExperimentProfile(experimentProfile: ExperimentProfile, updateType: ProfileUpdateType): Promise<void> {
        if (this.readonly) {
            throw new Error('Error: can not update experiment profile in readonly mode!');
        }
        switch (updateType) {
            case 'TRIAL_CONCURRENCY':
                this.updateTrialConcurrency(experimentProfile.params.trialConcurrency);
                break;
            case 'MAX_EXEC_DURATION':
                this.updateMaxExecDuration(experimentProfile.params.maxExecDuration);
                break;
            case 'SEARCH_SPACE':
                // updateSearchSpace throws when the tuner is not set up; `async` turns that into a rejection.
                this.updateSearchSpace(experimentProfile.params.searchSpace);
                break;
            case 'MAX_TRIAL_NUM':
                this.updateMaxTrialNum(experimentProfile.params.maxTrialNum);
                break;
            default:
                throw new Error('Error: unrecognized updateType');
        }

        await this.storeExperimentProfile();
    }

106
    /**
     * Forward externally produced trial data to the tuner and record it as an IMPORT_DATA event.
     * Rejects in readonly mode or when no dispatcher is available.
     */
    public importData(data: string): Promise<void> {
        if (this.readonly) {
            return Promise.reject(new Error('Error: can not import data in readonly mode!'));
        }
        const dispatcher: IpcInterface | undefined = this.dispatcher;
        if (dispatcher === undefined) {
            return Promise.reject(new Error('tuner has not been setup'));
        }

        dispatcher.sendCommand(IMPORT_DATA, data);
        return this.dataStore.storeTrialJobEvent('IMPORT_DATA', '', data);
    }

120
121
122
123
    /** Fetch imported data from the data store (presumably the payloads recorded by importData()). */
    public getImportedData(): Promise<string[]> {
        const store: DataStore = this.dataStore;
        return store.getImportedData();
    }

124
125
126
127
    /** Serialized (JSON) hyper-parameter configurations of stored trials, as produced by the data store. */
    public async exportData(): Promise<string> {
        const exported: string = await this.dataStore.exportTrialHpConfigs();
        return exported;
    }

128
    /**
     * Queue a user-supplied hyper-parameter set as a new trial.
     *
     * Declared `async` so that every failure (readonly mode, trial budget exhausted, invalid
     * JSON in `hyperParams`, or a failed event write) is reported as a rejected promise. The
     * ADD_CUSTOMIZED event write is awaited rather than left as a floating promise, so a
     * storage error reaches the caller instead of becoming an unhandled rejection.
     *
     * @param hyperParams JSON text of the hyper-parameters
     * @returns the sequence id assigned to the queued trial
     */
    public async addCustomizedTrialJob(hyperParams: string): Promise<number> {
        if (this.readonly) {
            throw new Error('Error: can not add customized trial job in readonly mode!');
        }
        if (this.currSubmittedTrialNum >= this.experimentProfile.params.maxTrialNum) {
            throw new Error('reach maxTrialNum');
        }

        // TODO: NNI manager should not peek tuner's internal protocol, let's refactor this later
        const packedParameter = {
            parameter_id: null, // eslint-disable-line @typescript-eslint/camelcase
            parameter_source: 'customized', // eslint-disable-line @typescript-eslint/camelcase
            // Invalid JSON throws here, before any state (sequence id, queue) is touched.
            parameters: JSON.parse(hyperParams)
        };

        const form: TrialJobApplicationForm = {
            sequenceId: this.experimentProfile.nextSequenceId++,
            hyperParameters: {
                value: JSON.stringify(packedParameter),
                index: 0
            }
        };
        this.waitingTrials.push(form);

        // trial id has not been generated yet, thus use '' instead
        await this.dataStore.storeTrialJobEvent('ADD_CUSTOMIZED', '', hyperParams);

        return form.sequenceId;
    }

    /**
     * Cancel a trial on behalf of the user, then record a USER_TO_CANCEL event.
     * Rejects in readonly mode.
     */
    public async cancelTrialJobByUser(trialJobId: string): Promise<void> {
        if (this.readonly) {
            throw new Error('Error: can not cancel trial job in readonly mode!');
        }
        this.log.info(`User cancelTrialJob: ${trialJobId}`);

        const service: TrainingService = this.trainingService;
        await service.cancelTrialJob(trialJobId);
        await this.dataStore.storeTrialJobEvent('USER_TO_CANCEL', trialJobId, '');
    }

    /**
     * Start a new experiment: persist its parameters, forward per-experiment settings to the
     * training service, launch the dispatcher in 'start' mode and kick off the main loop.
     *
     * The cluster-metadata updates stay fire-and-forget (they do not delay startup), but a
     * rejection from the training service is now logged instead of becoming an unhandled
     * promise rejection.
     *
     * @param expParams experiment parameters supplied by the caller
     * @returns the experiment id
     */
    public async startExperiment(expParams: ExperimentParams): Promise<string> {
        this.log.info(`Starting experiment: ${this.experimentProfile.id}`);
        this.experimentProfile.params = expParams;
        await this.storeExperimentProfile();
        this.log.debug('Setup tuner...');

        // Non-blocking metadata push whose failures are reported rather than silently dropped.
        const setMetadataBestEffort = (key: string, value: string): void => {
            this.trainingService.setClusterMetadata(key, value).catch((err: Error) => {
                this.log.error(`Failed to set cluster metadata '${key}': ${err}`);
            });
        };

        // Set up multiphase config
        if (expParams.multiPhase && this.trainingService.isMultiPhaseJobSupported) {
            setMetadataBestEffort('multiPhase', expParams.multiPhase.toString());
        }

        // Set up versionCheck config
        if (expParams.versionCheck !== undefined) {
            setMetadataBestEffort('version_check', expParams.versionCheck.toString());
        }

        // Set up logCollection config
        if (expParams.logCollection !== undefined) {
            setMetadataBestEffort('log_collection', expParams.logCollection.toString());
        }

        const dispatcherCommand: string = getMsgDispatcherCommand(expParams);
        this.log.debug(`dispatcher command: ${dispatcherCommand}`);
        const checkpointDir: string = await this.createCheckpointDir();
        this.setupTuner(dispatcherCommand, undefined, 'start', checkpointDir);

        this.experimentProfile.startTime = Date.now();
        this.setStatus('RUNNING');
        await this.storeExperimentProfile();

        // The main loop runs in the background; any failure there is critical.
        this.run().catch((err: Error) => {
            this.criticalError(err);
        });

        return this.experimentProfile.id;
    }

SparkSnail's avatar
SparkSnail committed
205
    /**
     * Resume a previously stored experiment.
     *
     * In readonly mode only the stored profile is loaded. Otherwise the per-experiment
     * training-service settings are re-applied (including `log_collection`, which
     * startExperiment sets but resume previously skipped), the dispatcher is relaunched in
     * 'resume' mode, stale trial states are reconciled, and the main loop is restarted.
     * Cluster-metadata updates are fire-and-forget, with failures logged rather than left
     * as unhandled promise rejections.
     *
     * @param readonly when true, load the profile for viewing only and reject later mutations
     */
    public async resumeExperiment(readonly: boolean): Promise<void> {
        this.log.info(`Resuming experiment: ${this.experimentProfile.id}`);
        //Fetch back the experiment profile
        const experimentId: string = getExperimentId();
        this.experimentProfile = await this.dataStore.getExperimentProfile(experimentId);
        this.readonly = readonly;
        if (readonly) {
            return;
        }
        const expParams: ExperimentParams = this.experimentProfile.params;

        // Non-blocking metadata push whose failures are reported rather than silently dropped.
        const setMetadataBestEffort = (key: string, value: string): void => {
            this.trainingService.setClusterMetadata(key, value).catch((err: Error) => {
                this.log.error(`Failed to set cluster metadata '${key}': ${err}`);
            });
        };

        // Set up multiphase config
        if (expParams.multiPhase && this.trainingService.isMultiPhaseJobSupported) {
            setMetadataBestEffort('multiPhase', expParams.multiPhase.toString());
        }

        // Set up versionCheck config
        if (expParams.versionCheck !== undefined) {
            setMetadataBestEffort('version_check', expParams.versionCheck.toString());
        }

        // Set up logCollection config, mirroring what startExperiment does
        if (expParams.logCollection !== undefined) {
            setMetadataBestEffort('log_collection', expParams.logCollection.toString());
        }

        const dispatcherCommand: string = getMsgDispatcherCommand(expParams);
        this.log.debug(`dispatcher command: ${dispatcherCommand}`);
        const checkpointDir: string = await this.createCheckpointDir();
        this.setupTuner(dispatcherCommand, undefined, 'resume', checkpointDir);

        const allTrialJobs: TrialJobInfo[] = await this.dataStore.listTrialJobs();

        // Resume currSubmittedTrialNum
        this.currSubmittedTrialNum = allTrialJobs.length;

        // Trials still recorded as WAITING or RUNNING get a FAILED event.
        await Promise.all(allTrialJobs
            .filter((job: TrialJobInfo) => job.status === 'WAITING' || job.status === 'RUNNING')
            .map((job: TrialJobInfo) => this.dataStore.storeTrialJobEvent('FAILED', job.trialJobId)));

        // Collect generated trials and imported trials
        const finishedTrialData: string = await this.exportData();
        const importedData: string[] = await this.dataStore.getImportedData();
        let trialData: Record<string, any>[] = JSON.parse(finishedTrialData);
        for (const oneImportedData of importedData) {
            // do not deduplicate
            trialData = trialData.concat(<Record<string, any>[]>JSON.parse(oneImportedData));
        }
        this.trialDataForTuner = JSON.stringify(trialData);

        // Clear endTime when both the duration and the trial budget still have room left.
        if (this.experimentProfile.execDuration < this.experimentProfile.params.maxExecDuration &&
            this.currSubmittedTrialNum < this.experimentProfile.params.maxTrialNum &&
            this.experimentProfile.endTime) {
            delete this.experimentProfile.endTime;
        }
        this.setStatus('RUNNING');

        // TO DO: update database record for resume event
        this.run().catch((err: Error) => {
            this.criticalError(err);
        });
    }

268
269
    /** Look up the stored record of a single trial by its id. */
    public getTrialJob(trialJobId: string): Promise<TrialJobInfo> {
        const store: DataStore = this.dataStore;
        return store.getTrialJob(trialJobId);
    }

    /**
     * Forward a cluster-metadata key/value pair to the training service.
     * Rejects in readonly mode, or when the training service does not settle within 30 seconds;
     * the timer is cleared either way.
     */
    public async setClusterMetadata(key: string, value: string): Promise<void> {
        if (this.readonly) {
            throw new Error('Error: can not set cluster metadata in readonly mode!');
        }
        this.log.info(`NNIManager setClusterMetadata, key: ${key}, value: ${value}`);

        let timeoutId: NodeJS.Timer;
        // TO DO: move timeout value to constants file
        const timeoutGuard: Promise<never> = new Promise<never>((_resolve, reject): void => {
            timeoutId = setTimeout(() => {
                reject(new Error('TrainingService setClusterMetadata timeout. Please check your config file.'));
            }, 30000);
        });
        const work = this.trainingService.setClusterMetadata(key, value);

        await Promise.race([timeoutGuard, work]).finally(() => {
            clearTimeout(timeoutId);
        });
    }

    /** Read a cluster-metadata value from the training service. */
    public getClusterMetadata(key: string): Promise<string> {
        const pendingValue = this.trainingService.getClusterMetadata(key);
        return Promise.resolve(pendingValue);
    }

    /** Return the trial job statistics computed by the data store. */
    public async getTrialJobStatistics(): Promise<TrialJobStatistics[]> {
        const statistics: TrialJobStatistics[] = await this.dataStore.getTrialJobStatistics();
        return statistics;
    }

299
    /**
     * Stop the experiment in two phases: shut down the dispatcher first, then cancel trials
     * and release resources (the second phase ends by exiting the process).
     */
    public async stopExperiment(): Promise<void> {
        await this.stopExperimentTopHalf();
        return this.stopExperimentBottomHalf();
    }

    /**
     * First shutdown phase: mark the experiment STOPPING, detach the metric listener and
     * terminate the dispatcher (send TERMINATE, poll up to 30 times at 1-second intervals,
     * then killPid). Logs an error and returns early if no dispatcher was ever set up.
     */
    public async stopExperimentTopHalf(): Promise<void> {
        this.setStatus('STOPPING');
        this.log.info('Stopping experiment, cleaning up ...');

        const dispatcher: IpcInterface | undefined = this.dispatcher;
        if (dispatcher === undefined) {
            this.log.error('Tuner has not been setup');
            return;
        }

        this.trainingService.removeTrialJobMetricListener(this.trialJobMetricListener);
        if (this.dispatcherPid > 0) {
            dispatcher.sendCommand(TERMINATE);
            // gracefully terminate tuner and assessor here, wait at most 30 seconds.
            let waitedSeconds: number = 0;
            while (waitedSeconds < 30 && await isAlive(this.dispatcherPid)) {
                await delay(1000);
                waitedSeconds++;
            }
            await killPid(this.dispatcherPid);
        }
        this.dispatcher = undefined;
    }

    /**
     * Second shutdown phase: cancel every RUNNING/WAITING trial one at a time, clean up the
     * training service, persist the final profile, mark STOPPED, shut down the remaining
     * services and exit the process (exit code 1 if that shutdown threw, else 0).
     */
    public async stopExperimentBottomHalf(): Promise<void> {
        const trialJobList: TrialJobDetail[] = await this.trainingService.listTrialJobs();

        // DON'T try to make it in parallel, the training service may not handle it well.
        // If there is performance concern, consider to support batch cancellation on training service.
        for (const trialJob of trialJobList) {
            if (trialJob.status !== 'RUNNING' && trialJob.status !== 'WAITING') {
                continue;
            }
            try {
                this.log.info(`cancelTrialJob: ${trialJob.id}`);
                await this.trainingService.cancelTrialJob(trialJob.id);
            } catch (error) {
                this.log.debug(`ignorable error on canceling trial ${trialJob.id}. ${error}`);
            }
        }

        await this.trainingService.cleanUp();
        if (this.experimentProfile.endTime === undefined) {
            this.setEndtime();
        }
        await this.storeExperimentProfile();
        this.setStatus('STOPPED');
        this.log.info('Experiment stopped.');

        let exitCode: number = 0;
        try {
            this.experimentManager.stop();
            this.dataStore.close();
            await component.get<NNIRestServer>(NNIRestServer).stop();
        } catch (err) {
            exitCode = 1;
            this.log.error(`${err.stack}`);
        } finally {
            // The logger is closed last, right before the process exits.
            this.log.close();
            process.exit(exitCode);
        }
    }

367
    /** Fetch metric records; both optional filters are passed through to the data store unchanged. */
    public async getMetricData(trialJobId?: string, metricType?: MetricType): Promise<MetricDataRecord[]> {
        const records: MetricDataRecord[] = await this.dataStore.getMetricData(trialJobId, metricType);
        return records;
    }

371
372
373
374
375
376
    /**
     * Return every metric record that belongs to a trial whose sequence id lies in the
     * inclusive range [minSeqId, maxSeqId]. Trials without a sequence id are ignored.
     */
    public async getMetricDataByRange(minSeqId: number, maxSeqId: number): Promise<MetricDataRecord[]> {
        const idsInRange = new Set<string>();
        for (const trial of await this.dataStore.listTrialJobs()) {
            // FIXME: can this be undefined?
            const seq = trial.sequenceId;
            if (seq !== undefined && seq >= minSeqId && seq <= maxSeqId) {
                idsInRange.add(trial.trialJobId);
            }
        }

        const allMetrics: MetricDataRecord[] = await this.dataStore.getMetricData();
        return allMetrics.filter((metric: MetricDataRecord) => idsInRange.has(metric.trialJobId));
    }

    /**
     * Return all non-PERIODICAL (final) metric records, followed by, for each trial, the
     * PERIODICAL record with the highest sequence (on ties, the one seen last).
     */
    public async getLatestMetricData(): Promise<MetricDataRecord[]> {
        // FIXME: this can take a long time
        const allMetrics: MetricDataRecord[] = await this.dataStore.getMetricData();
        const finalMetrics: MetricDataRecord[] = [];
        const newestPeriodical = new Map<string, MetricDataRecord>();

        for (const metric of allMetrics) {
            if (metric.type === 'PERIODICAL') {
                const seen: MetricDataRecord | undefined = newestPeriodical.get(metric.trialJobId);
                if (seen === undefined || metric.sequence >= seen.sequence) {
                    newestPeriodical.set(metric.trialJobId, metric);
                }
                continue;
            }
            finalMetrics.push(metric);
        }

        // FIXME: unit test
        return finalMetrics.concat(Array.from(newestPeriodical.values()));
    }

402
403
404
405
    /** Read a trial's log of the requested type from the training service. */
    public async getTrialLog(trialJobId: string, logType: LogType): Promise<string> {
        const content: string = await this.trainingService.getTrialLog(trialJobId, logType);
        return content;
    }

Deshui Yu's avatar
Deshui Yu committed
406
407
408
409
410
411
412
413
    /** Resolve with the in-memory experiment profile (the live object, not a copy). */
    public getExperimentProfile(): Promise<ExperimentProfile> {
        // TO DO: using Promise.resolve()
        const pending = new Deferred<ExperimentProfile>();
        pending.resolve(this.experimentProfile);
        return pending.promise;
    }

414
415
416
417
    /** Current manager status (the live object held by this manager). */
    public getStatus(): NNIManagerStatus {
        const current: NNIManagerStatus = this.status;
        return current;
    }

Deshui Yu's avatar
Deshui Yu committed
418
419
420
421
    /** List stored trial records; the optional status filter is passed through to the data store. */
    public async listTrialJobs(status?: TrialJobStatus): Promise<TrialJobInfo[]> {
        const jobs: TrialJobInfo[] = await this.dataStore.listTrialJobs(status);
        return jobs;
    }

422
423
    /**
     * Spawn the dispatcher (tuner/assessor) process, unless a dispatcher already exists.
     *
     * @param command command line that launches the dispatcher
     * @param cwd working directory; undefined or '' falls back to the log directory
     * @param mode exported to the child as NNI_MODE
     * @param dataDirectory exported to the child as NNI_CHECKPOINT_DIRECTORY
     */
    private setupTuner(command: string, cwd: string | undefined, mode: 'start' | 'resume', dataDirectory: string): void {
        if (this.dispatcher !== undefined) {
            // Already connected (e.g. via a dispatcher pipe); nothing to spawn.
            return;
        }
        // fds 3 and 4 are pipes - presumably the IPC channel wrapped by createDispatcherInterface.
        const stdio: StdioOptions = ['ignore', process.stdout, process.stderr, 'pipe', 'pipe'];
        const workingDir: string = (cwd === undefined || cwd === '') ? getLogDir() : cwd;

        const tunerConfig = this.experimentProfile.params.tuner;
        const includeIntermediateResults: boolean | undefined =
            tunerConfig === undefined ? false : tunerConfig.includeIntermediateResults;

        // NNI-specific variables are layered on top of the manager's own environment.
        const nniEnv = {
            SDK_PROCESS: 'dispatcher',
            NNI_MODE: mode,
            NNI_CHECKPOINT_DIRECTORY: dataDirectory,
            NNI_LOG_DIRECTORY: getLogDir(),
            NNI_LOG_LEVEL: getLogLevel(),
            NNI_INCLUDE_INTERMEDIATE_RESULTS: includeIntermediateResults,
            CUDA_VISIBLE_DEVICES: this.getGpuEnvvarValue()
        };
        const childEnv = Object.assign({}, process.env, nniEnv);

        const tunerProc: ChildProcess = getTunerProc(command, stdio, workingDir, childEnv);
        this.dispatcherPid = tunerProc.pid;
        this.dispatcher = createDispatcherInterface(tunerProc);
    }

456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
    /**
     * Value for the dispatcher's CUDA_VISIBLE_DEVICES: the advisor's gpuIndices when an advisor
     * is configured (even if unset), otherwise the tuner's; '' when no value is available.
     */
    private getGpuEnvvarValue(): string {
        const { advisor, tuner } = this.experimentProfile.params;
        const owner = advisor !== undefined ? advisor : tuner;
        const cudaDevices = owner === undefined ? undefined : owner.gpuIndices;
        return cudaDevices === undefined ? '' : cudaDevices;
    }

Deshui Yu's avatar
Deshui Yu committed
472
    /**
     * Record a new trial concurrency. The difference from the current value is accumulated
     * into trialConcurrencyChange (consumed by the trial-management loop) before the
     * parameter itself is overwritten.
     */
    private updateTrialConcurrency(trialConcurrency: number): void {
        // we assume trialConcurrency >= 0, which is checked by restserver
        const delta: number = trialConcurrency - this.experimentProfile.params.trialConcurrency;
        this.trialConcurrencyChange += delta;
        this.experimentProfile.params.trialConcurrency = trialConcurrency;
    }

    /** Overwrite the maximum execution duration in the in-memory profile. */
    private updateMaxExecDuration(duration: number): void {
        const params: ExperimentParams = this.experimentProfile.params;
        params.maxExecDuration = duration;
    }

    /**
     * Send the new search space to the dispatcher, then store it in the profile.
     * @throws Error when the tuner has not been set up (the profile is left unchanged).
     */
    private updateSearchSpace(searchSpace: string): void {
        const dispatcher: IpcInterface | undefined = this.dispatcher;
        if (dispatcher === undefined) {
            throw new Error('Error: tuner has not been setup');
        }
        dispatcher.sendCommand(UPDATE_SEARCH_SPACE, searchSpace);
        this.experimentProfile.params.searchSpace = searchSpace;
    }

QuanluZhang's avatar
QuanluZhang committed
496
497
498
499
500
501
    /** Overwrite the maximum number of trials in the in-memory profile. */
    private updateMaxTrialNum(maxTrialNum: number): void {
        const params: ExperimentParams = this.experimentProfile.params;
        params.maxTrialNum = maxTrialNum;
    }

Deshui Yu's avatar
Deshui Yu committed
502
    /**
     * Background loop: every second, while the status is RUNNING / NO_MORE_TRIAL /
     * TUNER_NO_MORE_TRIAL, add one second to execDuration, persisting the profile on every
     * 10th iteration. Ends once the status becomes ERROR, STOPPING or STOPPED.
     */
    private async periodicallyUpdateExecDuration(): Promise<void> {
        const stopStates: string[] = ['ERROR', 'STOPPING', 'STOPPED'];
        const countingStates: string[] = ['RUNNING', 'NO_MORE_TRIAL', 'TUNER_NO_MORE_TRIAL'];
        for (let tick: number = 1; !stopStates.includes(this.status.status); tick++) {
            await delay(1000); // 1 second
            if (countingStates.includes(this.status.status)) {
                this.experimentProfile.execDuration += 1;
                if (tick % 10 === 0) {
                    await this.storeExperimentProfile();
                }
            }
        }
    }

chicm-ms's avatar
chicm-ms committed
516
517
518
519
520
521
    /**
     * Background loop: send PING to the dispatcher every 5 seconds until the status becomes
     * ERROR, STOPPING or STOPPED.
     * @throws Error when the tuner has not been set up.
     */
    private async pingDispatcher(): Promise<void> {
        if (this.dispatcher === undefined) {
            throw new Error('Error: tuner has not been setup');
        }
        const stopStates: string[] = ['ERROR', 'STOPPING', 'STOPPED'];
        while (!stopStates.includes(this.status.status)) {
            this.dispatcher.sendCommand(PING);
            await delay(5000);
        }
    }

QuanluZhang's avatar
QuanluZhang committed
526
527
    /**
     * Poll the training service for every tracked trial, persist status transitions, and
     * report each trial that reached a terminal status to the dispatcher with TRIAL_END.
     *
     * @returns the number of trials that finished (and stopped being tracked) in this poll
     * @throws Error when the tuner has not been set up
     */
    private async requestTrialJobsStatus(): Promise<number> {
        if (this.dispatcher === undefined) {
            throw new Error('Error: tuner has not been setup');
        }
        // Terminal statuses. FAILED / SYS_CANCELED are not retried in the current version.
        // TO DO: push such jobs to a queue for retry
        const finalStatuses: TrialJobStatus[] = ['SUCCEEDED', 'USER_CANCELED', 'EARLY_STOPPED', 'FAILED', 'SYS_CANCELED'];
        let finished: number = 0;

        // Iterate over a snapshot of the ids: finished trials are removed from the map below.
        for (const trialJobId of Array.from(this.trialJobs.keys())) {
            const latest: TrialJobDetail = await this.trainingService.getTrialJob(trialJobId);
            const previous: TrialJobDetail | undefined = this.trialJobs.get(trialJobId);
            if (previous !== undefined && previous.status !== latest.status) {
                this.log.info(`Trial job ${latest.id} status changed from ${previous.status} to ${latest.status}`);
                this.trialJobs.set(trialJobId, Object.assign({}, latest));
                await this.dataStore.storeTrialJobEvent(latest.status, latest.id, undefined, latest);
            }
            const tracked: TrialJobDetail | undefined = this.trialJobs.get(trialJobId);
            if (tracked !== undefined) {
                tracked.message = latest.message;
            }

            if (!finalStatuses.includes(latest.status)) {
                // WAITING, RUNNING, UNKNOWN and anything else: keep tracking.
                continue;
            }
            this.trialJobs.delete(trialJobId);
            finished++;
            const hyperParams: string | undefined = latest.form.hyperParameters.value;
            this.dispatcher.sendCommand(TRIAL_END, JSON.stringify({
                trial_job_id: latest.id, // eslint-disable-line @typescript-eslint/camelcase
                event: latest.status,
                hyper_params: hyperParams // eslint-disable-line @typescript-eslint/camelcase
            }));
        }

        return finished;
    }

    /**
     * Main scheduling loop. Runs until the experiment status becomes ERROR, STOPPING or STOPPED,
     * pausing 5 seconds between iterations. Each iteration:
     *  1. polls trial job statuses and counts newly finished trials;
     *  2. requests replacement hyper-parameters from the tuner, adjusted by any pending
     *     trial-concurrency change;
     *  3. updates the experiment status against maxExecDuration / maxTrialNum;
     *  4. submits queued trials to the training service up to trialConcurrency.
     *
     * @throws Error if the dispatcher (tuner) has not been set up.
     */
    private async manageTrials(): Promise<void> {
        if (this.dispatcher === undefined) {
            throw new Error('Error: tuner has not been setup');
        }
        let allFinishedTrialJobNum: number = this.currSubmittedTrialNum;
        while (!['ERROR', 'STOPPING', 'STOPPED'].includes(this.status.status)) {
            const finishedTrialJobNum: number = await this.requestTrialJobsStatus();
            allFinishedTrialJobNum += finishedTrialJobNum;

            // requestTrialNum is the number of trials that will be requested from tuner.
            // If trialConcurrency does not change, requestTrialNum equals finishedTrialJobNum.
            // If trialConcurrency changes, for example, trialConcurrency increases by 2 (trialConcurrencyChange=2), then
            // requestTrialNum equals 2 + finishedTrialJobNum and trialConcurrencyChange becomes 0.
            // If trialConcurrency changes, for example, trialConcurrency decreases by 4 (trialConcurrencyChange=-4) and
            // finishedTrialJobNum is 2, then requestTrialNum becomes -2. No trial will be requested from tuner,
            // and trialConcurrencyChange becomes -2.
            const requestTrialNum: number = this.trialConcurrencyChange + finishedTrialJobNum;
            if (requestTrialNum >= 0) {
                this.trialConcurrencyChange = 0;
            } else {
                this.trialConcurrencyChange = requestTrialNum;
            }

            this.requestTrialJobs(requestTrialNum);

            // check maxtrialnum and maxduration here
            // NO_MORE_TRIAL is more like a subset of RUNNING, because during RUNNING tuner
            // might tell nnimanager that this is no more trials. In NO_MORE_TRIAL state, the experiment is viewed
            // as still running. DONE could be transfered from RUNNING or NO_MORE_TRIAL.
            assert(this.status.status === 'RUNNING' ||
                this.status.status === 'DONE' ||
                this.status.status === 'NO_MORE_TRIAL' ||
                this.status.status === 'TUNER_NO_MORE_TRIAL', `Actual status: ${this.status.status}`);
            if (this.experimentProfile.execDuration > this.experimentProfile.params.maxExecDuration ||
                this.currSubmittedTrialNum >= this.experimentProfile.params.maxTrialNum) {
                if (this.status.status !== 'DONE') {
                    this.setStatus('NO_MORE_TRIAL');
                    // Only finish the experiment once every submitted trial has finished.
                    const waitSubmittedToFinish: number = this.currSubmittedTrialNum;

                    assert(allFinishedTrialJobNum <= waitSubmittedToFinish);
                    if (allFinishedTrialJobNum >= waitSubmittedToFinish) {
                        this.setStatus('DONE');
                        this.setEndtime();
                        await this.storeExperimentProfile();
                        // write this log for travis CI
                        this.log.info('Experiment done.');
                    }
                }
            } else {
                // Limits are no longer exceeded (e.g. they were raised), so a DONE experiment resumes.
                if (this.status.status === 'DONE') {
                    delete this.experimentProfile.endTime;
                    await this.storeExperimentProfile();
                }
                if (this.status.status !== 'TUNER_NO_MORE_TRIAL') {
                    this.setStatus('RUNNING');
                }
                for (let i: number = this.trialJobs.size; i < this.experimentProfile.params.trialConcurrency; i++) {
                    if (this.waitingTrials.length === 0 ||
                        this.currSubmittedTrialNum >= this.experimentProfile.params.maxTrialNum) {
                        break;
                    }
                    // Non-empty queue checked above, so shift() yields a form.
                    const form = this.waitingTrials.shift() as TrialJobApplicationForm;
                    this.currSubmittedTrialNum++;
                    this.log.info(`submitTrialJob: form: ${JSON.stringify(form)}`);
                    const trialJobDetail: TrialJobDetail = await this.trainingService.submitTrialJob(form);
                    // Track a shallow copy rather than the object returned by the training service.
                    const snapshot: TrialJobDetail = Object.assign({}, trialJobDetail);
                    await this.storeExperimentProfile();
                    this.trialJobs.set(trialJobDetail.id, snapshot);
                    // The snapshot was just stored in trialJobs, so it can be used directly
                    // instead of being looked up again.
                    await this.dataStore.storeTrialJobEvent(
                        snapshot.status, snapshot.id, form.hyperParameters.value, snapshot);
                }
            }
            await delay(1000 * 5); // 5 seconds
        }
    }

Deshui Yu's avatar
Deshui Yu committed
665
666
667
668
669
670
    /**
     * Increments the experiment profile's revision counter and persists the profile
     * through the data store.
     *
     * @returns the data store's completion promise.
     */
    private storeExperimentProfile(): Promise<void> {
        const profile = this.experimentProfile;
        profile.revision++;
        return this.dataStore.storeExperimentProfile(profile);
    }

671
    /**
     * Starts the experiment.
     *
     * It registers event listeners and sends INITIALIZE to the tuner. It then runs four tasks
     * concurrently: the exec-duration updater, the dispatcher pinger, the training service and
     * the trial-management loop. A failure in any of the last three is rethrown as an NNIError
     * whose message is prefixed with the failing component's name.
     */
    private async run(): Promise<void> {
        assert(this.dispatcher !== undefined);

        this.addEventListeners();

        this.sendInitTunerCommands();

        // Builds a rejection handler that rethrows the error wrapped with the given prefix.
        const rethrowAs = (prefix: string) => (err: Error): never => {
            throw NNIError.FromError(err, prefix);
        };

        const tasks = [
            this.periodicallyUpdateExecDuration(),
            this.pingDispatcher().catch(rethrowAs('Dispatcher error: ')),
            this.trainingService.run().catch(rethrowAs('Training service error: ')),
            this.manageTrials().catch(rethrowAs('Job management error: '))
        ];
        await Promise.all(tasks);
    }

QuanluZhang's avatar
QuanluZhang committed
691
    /**
     * Wires NNIManager into its event sources:
     *  - trial metrics coming from the training service;
     *  - commands coming from the dispatcher, with handler failures reported via criticalError;
     *  - dispatcher stream errors, which are logged and reported via criticalError.
     *
     * @throws Error if the dispatcher has not been set up.
     */
    private addEventListeners(): void {
        this.log.info('Add event listeners');
        // TO DO: cannot run this method more than once in one NNIManager instance
        if (this.dispatcher === undefined) {
            throw new Error('Error: tuner or job maintainer have not been setup');
        }
        this.trainingService.addTrialJobMetricListener(this.trialJobMetricListener);

        const handleCommand = (commandType: string, content: string): void => {
            const handled = this.onTunerCommand(commandType, content);
            handled.catch((err: Error) => {
                this.criticalError(NNIError.FromError(err, 'Tuner command event error: '));
            });
        };
        this.dispatcher.onCommand(handleCommand);

        const handleError = (error: Error): void => {
            this.log.error(`Dispatcher error: ${error.message}`);
            this.criticalError(new Error('Dispatcher stream error, tuner may have crashed.'));
        };
        this.dispatcher.onError(handleError);
    }

    /**
     * Sends INITIALIZE to the tuner with the experiment's search space. The tuner must receive
     * the search space before it can generate any hyper-parameters.
     *
     * @throws Error if the dispatcher has not been set up.
     */
    private sendInitTunerCommands(): void {
        const { dispatcher } = this;
        if (dispatcher === undefined) {
            throw new Error('Dispatcher error: tuner has not been setup');
        }
        const searchSpace = this.experimentProfile.params.searchSpace;
        this.log.debug(`Send tuner command: INITIALIZE: ${searchSpace}`);
        dispatcher.sendCommand(INITIALIZE, searchSpace);
    }

    /**
     * Handles a metric reported by the training service.
     *
     * If the metric's trial is tracked in `trialJobs`, the metric is stored in the data store
     * and then forwarded to the tuner as REPORT_METRIC_DATA. Metrics for trials that are not
     * tracked are logged as a warning and otherwise ignored.
     *
     * @param metric the reported metric (trial id plus serialized data).
     * @throws Error if the dispatcher has not been set up. This is checked after the metric
     *     has been stored.
     */
    private async onTrialJobMetrics(metric: TrialJobMetric): Promise<void> {
        this.log.debug(`NNIManager received trial job metrics: ${JSON.stringify(metric)}`);
        if (!this.trialJobs.has(metric.id)) {
            // Serialize explicitly: interpolating the object itself would log "[object Object]".
            this.log.warning(`NNIManager received non-existent trial job metrics: ${JSON.stringify(metric)}`);
            return;
        }
        await this.dataStore.storeMetricData(metric.id, metric.data);
        if (this.dispatcher === undefined) {
            throw new Error('Error: tuner has not been setup');
        }
        this.dispatcher.sendCommand(REPORT_METRIC_DATA, metric.data);
    }

chicm-ms's avatar
chicm-ms committed
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
    /**
     * Asks the tuner to generate `jobNum` new trials. Counts below 1 are ignored.
     *
     * In multiThread mode, `jobNum` separate single-trial requests are sent, so hyper-parameters
     * are generated in a non-blocking way. Otherwise a single request carries the whole count,
     * and the tuner generates those trials one by one.
     *
     * @throws Error if the dispatcher has not been set up.
     */
    private requestTrialJobs(jobNum: number): void {
        if (jobNum < 1) {
            return;
        }
        const dispatcher = this.dispatcher;
        if (dispatcher === undefined) {
            throw new Error('Dispatcher error: tuner has not been setup');
        }
        if (!this.experimentProfile.params.multiThread) {
            dispatcher.sendCommand(REQUEST_TRIAL_JOBS, String(jobNum));
            return;
        }
        let sent: number = 0;
        while (sent < jobNum) {
            dispatcher.sendCommand(REQUEST_TRIAL_JOBS, '1');
            sent++;
        }
    }

751
    /**
     * Handles one command received from the dispatcher (tuner/assessor).
     *
     *  - INITIALIZED: sends any buffered trial data as IMPORT_DATA, then requests
     *    `trialConcurrency` trials.
     *  - NEW_TRIAL_JOB: queues a trial form for `content`, assigning the next sequence ID.
     *  - SEND_TRIAL_JOB_PARAMETER: forwards an additional parameter set to an existing trial,
     *    and records it as an ADD_HYPERPARAMETER event.
     *  - NO_MORE_TRIAL_JOBS: sets TUNER_NO_MORE_TRIAL, unless the experiment is in ERROR,
     *    STOPPING or STOPPED.
     *  - KILL_TRIAL_JOB: cancels the trial whose ID is JSON-encoded in `content`.
     *
     * @param commandType one of the command constants imported from './commands'.
     * @param content the command payload.
     * @throws Error for an unsupported command type, or when the dispatcher is missing.
     */
    private async onTunerCommand(commandType: string, content: string): Promise<void> {
        this.log.info(`NNIManager received command from dispatcher: ${commandType}, ${content}`);
        switch (commandType) {
            case INITIALIZED: {
                // Tuner is intialized, search space is set, request tuner to generate hyper parameters
                if (this.trialDataForTuner.length > 0) {
                    if (this.dispatcher === undefined) {
                        throw new Error('Dispatcher error: tuner has not been setup');
                    }
                    this.dispatcher.sendCommand(IMPORT_DATA, this.trialDataForTuner);
                }
                this.requestTrialJobs(this.experimentProfile.params.trialConcurrency);
                break;
            }
            case NEW_TRIAL_JOB: {
                if (this.status.status === 'TUNER_NO_MORE_TRIAL') {
                    this.log.warning('It is not supposed to receive more trials after NO_MORE_TRIAL is set');
                    this.setStatus('RUNNING');
                }
                const form: TrialJobApplicationForm = {
                    sequenceId: this.experimentProfile.nextSequenceId++,
                    hyperParameters: {
                        value: content,
                        index: 0
                    }
                };
                this.waitingTrials.push(form);
                break;
            }
            case SEND_TRIAL_JOB_PARAMETER: {
                const tunerCommand: any = JSON.parse(content);
                assert(tunerCommand.parameter_index >= 0);
                assert(tunerCommand.trial_job_id !== undefined);

                const trialJobForm: TrialJobApplicationForm = {
                    sequenceId: -1,  // FIXME: multi-phase tuner should use sequence ID instead of trial job ID
                    hyperParameters: {
                        value: content,
                        index: tunerCommand.parameter_index
                    }
                };
                this.log.info(`updateTrialJob: job id: ${tunerCommand.trial_job_id}, form: ${JSON.stringify(trialJobForm)}`);
                await this.trainingService.updateTrialJob(tunerCommand.trial_job_id, trialJobForm);
                // NOTE(review): the original comment says the tuner sends an empty string when no
                // more parameters can be generated, but this check only excludes null, so an empty
                // string is still recorded. Confirm which sentinel the tuner actually sends.
                if (tunerCommand['parameters'] !== null) {
                    await this.dataStore.storeTrialJobEvent(
                        'ADD_HYPERPARAMETER', tunerCommand.trial_job_id, content, undefined);
                }
                break;
            }
            case NO_MORE_TRIAL_JOBS: {
                if (!['ERROR', 'STOPPING', 'STOPPED'].includes(this.status.status)) {
                    this.setStatus('TUNER_NO_MORE_TRIAL');
                }
                break;
            }
            case KILL_TRIAL_JOB: {
                // Parse the trial ID once and reuse it for both logging and cancellation.
                const trialJobId: string = JSON.parse(content);
                this.log.info(`cancelTrialJob: ${trialJobId}`);
                await this.trainingService.cancelTrialJob(trialJobId, true);
                break;
            }
            default:
                // Include the offending command so the reported error is actionable.
                throw new Error(`Error: unsupported command type from tuner: ${commandType}`);
        }
    }

817
818
819
820
821
822
823
824
825
826
    /**
     * Reports an unrecoverable error. It is recorded on the experiment via logError, which
     * also moves the status to ERROR, and it is also written to the console.
     */
    private criticalError(err: Error): void {
        this.logError(err);
        // Echo to stderr in addition to the NNI log.
        console.error(err);
    }

    /**
     * Records `err` against the experiment and ends it. In order, this:
     *  1. logs the stack trace, if one is present;
     *  2. appends the message to `status.errors`;
     *  3. stamps the end time;
     *  4. switches the status to ERROR.
     */
    private logError(err: Error): void {
        const trace = err.stack;
        if (trace !== undefined) {
            this.log.error(trace);
        }
        const { errors } = this.status;
        errors.push(err.message);
        this.setEndtime();
        this.setStatus('ERROR');
    }

    /**
     * Moves NNIManager to `status`. The change is logged and propagated to the experiment
     * manager; setting the current status again is a no-op.
     */
    private setStatus(status: ExperimentStatus): void {
        const previous = this.status.status;
        if (status === previous) {
            return;
        }
        this.log.info(`Change NNIManager status from: ${previous} to: ${status}`);
        this.status.status = status;
        this.experimentManager.setExperimentInfo(this.experimentProfile.id, 'status', status);
    }

839
840
841
842
843
    /**
     * Stamps the current time (ms since epoch, from Date.now()) as the experiment end time,
     * both on the profile and in the experiment manager.
     */
    private setEndtime(): void {
        const endTime = Date.now();
        this.experimentProfile.endTime = endTime;
        this.experimentManager.setExperimentInfo(this.experimentProfile.id, 'endTime', endTime);
    }

844
845
846
847
848
    /**
     * Builds a blank experiment profile. It uses the current experiment ID and root directory,
     * with zeroed counters and empty or zero parameters.
     */
    private createEmptyExperimentProfile(): ExperimentProfile {
        const params: ExperimentParams = {
            authorName: '',
            experimentName: '',
            trialConcurrency: 0,
            maxExecDuration: 0, // unit: second
            maxTrialNum: 0, // maxTrialNum includes all the submitted trial jobs
            trainingServicePlatform: '',
            searchSpace: ''
        };
        const profile: ExperimentProfile = {
            id: getExperimentId(),
            revision: 0,
            execDuration: 0,
            logDir: getExperimentRootDir(),
            nextSequenceId: 0,
            params
        };

        return profile;
    }
862

QuanluZhang's avatar
QuanluZhang committed
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
    /**
     * Creates the experiment checkpoint directory (via mkDirP). It then records the path as
     * `checkpointDir` on each configured advisor, tuner and assessor.
     *
     * @returns the checkpoint directory path.
     */
    private async createCheckpointDir(): Promise<string> {
        // TODO: test
        const chkpDir: string = getCheckpointDir();
        // create checkpoint directory
        await mkDirP(chkpDir);
        // assign this directory to exp profile's checkpointDir
        const { advisor, tuner, assessor } = this.experimentProfile.params;
        if (advisor) {
            advisor.checkpointDir = chkpDir;
        }
        if (tuner) {
            tuner.checkpointDir = chkpDir;
        }
        if (assessor) {
            assessor.checkpointDir = chkpDir;
        }

        // An async function already wraps its return value in a Promise; no Promise.resolve needed.
        return chkpDir;
    }
Deshui Yu's avatar
Deshui Yu committed
881
882
883
}

export { NNIManager };