nnimanager.ts 36.2 KB
Newer Older
liuzhe-lz's avatar
liuzhe-lz committed
1
2
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT license.
Deshui Yu's avatar
Deshui Yu committed
3
4
5
6

'use strict';

import * as assert from 'assert';
chicm-ms's avatar
chicm-ms committed
7
import { ChildProcess, StdioOptions } from 'child_process';
Deshui Yu's avatar
Deshui Yu committed
8
9
10
import { Deferred } from 'ts-deferred';
import * as component from '../common/component';
import { DataStore, MetricDataRecord, MetricType, TrialJobInfo } from '../common/datastore';
11
import { NNIError } from '../common/errors';
12
import { getExperimentId, getDispatcherPipe } from '../common/experimentStartupInfo';
Deshui Yu's avatar
Deshui Yu committed
13
14
import { getLogger, Logger } from '../common/log';
import {
15
    ExperimentProfile, Manager, ExperimentStatus,
16
    NNIManagerStatus, ProfileUpdateType, TrialJobStatistics
Deshui Yu's avatar
Deshui Yu committed
17
} from '../common/manager';
18
import { ExperimentConfig, toSeconds, toCudaVisibleDevices } from '../common/experimentConfig';
19
import { ExperimentManager } from '../common/experimentManager';
J-shang's avatar
J-shang committed
20
import { TensorboardManager } from '../common/tensorboardManager';
Deshui Yu's avatar
Deshui Yu committed
21
import {
22
    TrainingService, TrialJobApplicationForm, TrialJobDetail, TrialJobMetric, TrialJobStatus, LogType
Deshui Yu's avatar
Deshui Yu committed
23
} from '../common/trainingService';
24
import { delay, getCheckpointDir, getExperimentRootDir, getLogDir, getMsgDispatcherCommand, mkDirP, getTunerProc, getLogLevel, isAlive, killPid } from '../common/utils';
Deshui Yu's avatar
Deshui Yu committed
25
import {
chicm-ms's avatar
chicm-ms committed
26
    INITIALIZE, INITIALIZED, KILL_TRIAL_JOB, NEW_TRIAL_JOB, NO_MORE_TRIAL_JOBS, PING,
27
    REPORT_METRIC_DATA, REQUEST_TRIAL_JOBS, SEND_TRIAL_JOB_PARAMETER, TERMINATE, TRIAL_END, UPDATE_SEARCH_SPACE, IMPORT_DATA
Deshui Yu's avatar
Deshui Yu committed
28
} from './commands';
29
import { createDispatcherInterface, createDispatcherPipeInterface, IpcInterface } from './ipcInterface';
30
import { NNIRestServer } from '../rest_server/nniRestServer';
Deshui Yu's avatar
Deshui Yu committed
31
32

/**
 * NNIManager: the implementation of the Manager interface.
 */
class NNIManager implements Manager {
36
    // Created by initTrainingService() when an experiment is started or resumed
    // (a readonly resume never creates it).
    private trainingService!: TrainingService;
    // Channel to the dispatcher: either a pipe opened in the constructor or the
    // child process spawned by setupTuner().
    private dispatcher: IpcInterface | undefined;
    private experimentManager: ExperimentManager;
    // Trials submitted so far; resumeExperiment() restores it from the data store.
    private currSubmittedTrialNum: number;
    // Pending trialConcurrency change not yet applied: >0 increase, <0 decrease.
    private trialConcurrencyChange: number;

    private log: Logger;
    private dataStore: DataStore;
    private experimentProfile!: ExperimentProfile;
    // PID of the dispatcher process spawned by setupTuner(); 0 when none was spawned.
    private dispatcherPid: number;
    private status: NNIManagerStatus;
    // Application forms queued for submission (addCustomizedTrialJob() pushes here).
    private waitingTrials: TrialJobApplicationForm[];
    // Trial jobs currently being tracked, keyed by trial job id.
    private trialJobs: Map<string, TrialJobDetail>;
    // JSON string of finished + imported trial data assembled by resumeExperiment().
    private trialDataForTuner: string;
    // Set by resumeExperiment(readonly); mutating operations reject while it is true.
    private readonly: boolean;
    // Config remembered by initTrainingService(); setupTuner() reads it.
    private config!: ExperimentConfig;

    // Forwards metrics to onTrialJobMetrics(); stopExperimentTopHalf() detaches it
    // from the training service.
    private trialJobMetricListener: (metric: TrialJobMetric) => void;
53

Deshui Yu's avatar
Deshui Yu committed
54
55
    constructor() {
        // Plain initial state.
        this.currSubmittedTrialNum = 0;
        this.trialConcurrencyChange = 0;
        this.dispatcherPid = 0;
        this.waitingTrials = [];
        this.trialJobs = new Map<string, TrialJobDetail>();
        this.trialDataForTuner = '';
        this.readonly = false;
        this.status = {
            status: 'INITIALIZED',
            errors: []
        };

        // Collaborators, resolved in the same order as before.
        this.experimentManager = component.get(ExperimentManager);
        this.log = getLogger();
        this.dataStore = component.get(DataStore);

        // Metric handling is asynchronous; any failure is escalated as a critical error.
        this.trialJobMetricListener = (metric: TrialJobMetric): void => {
            this.onTrialJobMetrics(metric).catch((err: Error) => {
                this.criticalError(NNIError.FromError(err, 'Job metrics error: '));
            });
        };

        // With a configured dispatcher pipe the dispatcher is connected right away;
        // otherwise setupTuner() spawns one later (it skips spawning when one exists).
        const dispatcherPipe = getDispatcherPipe();
        if (dispatcherPipe !== null) {
            this.dispatcher = createDispatcherPipeInterface(dispatcherPipe);
        }
    }

    /**
     * Copy one field of `experimentProfile.params` into the running profile and persist it.
     *
     * Declared `async` so that every failure surfaces as a rejected promise:
     * - readonly mode,
     * - an unrecognized update type,
     * - a tuner that is not set up yet (thrown by updateSearchSpace()).
     * Before, some failures were rejections and others were synchronous throws that
     * `.catch()` on the returned promise never saw.
     *
     * @param experimentProfile profile whose `params` carries the new value
     * @param updateType        which field to update
     */
    public async updateExperimentProfile(experimentProfile: ExperimentProfile, updateType: ProfileUpdateType): Promise<void> {
        if (this.readonly) {
            throw new Error('Error: can not update experiment profile in readonly mode!');
        }
        switch (updateType) {
            case 'TRIAL_CONCURRENCY':
                this.updateTrialConcurrency(experimentProfile.params.trialConcurrency);
                break;
            case 'MAX_EXEC_DURATION':
                this.experimentProfile.params.maxExperimentDuration = experimentProfile.params.maxExperimentDuration;
                break;
            case 'SEARCH_SPACE':
                // Forwards the new search space to the dispatcher; throws if no tuner is set up.
                this.updateSearchSpace(experimentProfile.params.searchSpace);
                break;
            case 'MAX_TRIAL_NUM':
                this.experimentProfile.params.maxTrialNumber = experimentProfile.params.maxTrialNumber;
                break;
            default:
                throw new Error('Error: unrecognized updateType');
        }

        return this.storeExperimentProfile();
    }

106
    /**
     * Send externally produced trial data to the tuner and record an IMPORT_DATA event.
     *
     * Rejects in readonly mode, or when no dispatcher is available.
     *
     * @param data payload forwarded verbatim to the dispatcher and the data store
     */
    public importData(data: string): Promise<void> {
        if (this.readonly) {
            return Promise.reject(new Error('Error: can not import data in readonly mode!'));
        }
        const dispatcher: IpcInterface | undefined = this.dispatcher;
        if (dispatcher === undefined) {
            return Promise.reject(new Error('tuner has not been setup'));
        }

        dispatcher.sendCommand(IMPORT_DATA, data);
        // No trial id is associated with imported data.
        return this.dataStore.storeTrialJobEvent('IMPORT_DATA', '', data);
    }

120
121
122
123
    /**
     * Return the imported-data payloads kept by the data store.
     * resumeExperiment() JSON-parses each element; presumably each is one importData() payload.
     */
    public getImportedData(): Promise<string[]> {
        const storedImports: Promise<string[]> = this.dataStore.getImportedData();
        return storedImports;
    }

124
125
126
127
    /**
     * Return the trial hyper-parameter configurations serialized by the data store.
     * resumeExperiment() passes the result to JSON.parse.
     */
    public async exportData(): Promise<string> {
        const serialized: string = await this.dataStore.exportTrialHpConfigs();
        return serialized;
    }

128
    /**
     * Queue a trial with user-supplied hyper-parameters and record an ADD_CUSTOMIZED event.
     *
     * Declared `async` so that all failures arrive as a rejected promise instead of a
     * synchronous throw. This covers malformed `hyperParams` JSON as well as a failing
     * data-store write. The data-store write is awaited, so it can no longer become an
     * unhandled rejection.
     *
     * @param hyperParams JSON string of the trial's parameters
     * @returns the sequence id assigned to the queued trial
     */
    public async addCustomizedTrialJob(hyperParams: string): Promise<number> {
        if (this.readonly) {
            throw new Error('Error: can not add customized trial job in readonly mode!');
        }
        if (this.currSubmittedTrialNum >= this.maxTrialNum) {
            throw new Error('reach maxTrialNum');
        }

        // TODO: NNI manager should not peek tuner's internal protocol, let's refactor this later
        // Parsed before a sequence id is consumed, so invalid JSON leaves nextSequenceId untouched.
        const packedParameter = {
            parameter_id: null, // eslint-disable-line @typescript-eslint/camelcase
            parameter_source: 'customized', // eslint-disable-line @typescript-eslint/camelcase
            parameters: JSON.parse(hyperParams)
        };

        const form: TrialJobApplicationForm = {
            sequenceId: this.experimentProfile.nextSequenceId++,
            hyperParameters: {
                value: JSON.stringify(packedParameter),
                index: 0
            }
        };
        this.waitingTrials.push(form);

        // trial id has not been generated yet, thus use '' instead
        await this.dataStore.storeTrialJobEvent('ADD_CUSTOMIZED', '', hyperParams);

        return form.sequenceId;
    }

    /**
     * Cancel a trial on the user's request, then record a USER_TO_CANCEL event.
     * Rejects in readonly mode.
     *
     * @param trialJobId id of the trial to cancel
     */
    public async cancelTrialJobByUser(trialJobId: string): Promise<void> {
        if (this.readonly) {
            throw new Error('Error: can not cancel trial job in readonly mode!');
        }
        this.log.info(`User cancelTrialJob: ${trialJobId}`);

        // The event is stored only after the training service accepted the cancellation.
        await this.trainingService.cancelTrialJob(trialJobId);
        await this.dataStore.storeTrialJobEvent('USER_TO_CANCEL', trialJobId, '');
    }

167
168
169
170
171
172
173
174
175
176
177
178
    /**
     * Start a new experiment. The steps, in order:
     * 1. create and persist the profile;
     * 2. create the training service;
     * 3. launch the tuner in 'start' mode;
     * 4. switch to RUNNING, persist again, and kick off the main loop in the background.
     *
     * @param config experiment configuration
     * @returns the experiment id
     */
    public async startExperiment(config: ExperimentConfig): Promise<string> {
        const freshProfile: ExperimentProfile = {
            params: config,
            id: getExperimentId(),
            execDuration: 0,
            logDir: getExperimentRootDir(),
            startTime: Date.now(),
            endTime: undefined,
            nextSequenceId: 0,
            revision: 0
        };
        this.experimentProfile = freshProfile;

        this.log.info(`Starting experiment: ${this.experimentProfile.id}`);
        await this.storeExperimentProfile();

        this.log.info('Setup training service...');
        this.trainingService = await this.initTrainingService(config);

        this.log.info('Setup tuner...');
        const command: string = getMsgDispatcherCommand(config);
        this.log.debug(`dispatcher command: ${command}`);
        const checkpointDir: string = await this.createCheckpointDir();
        this.setupTuner(command, undefined, 'start', checkpointDir);

        this.setStatus('RUNNING');
        await this.storeExperimentProfile();

        // Not awaited: the main loop runs for the experiment's lifetime; failures are critical.
        this.run().catch((err: Error) => {
            this.criticalError(err);
        });

        return this.experimentProfile.id;
    }

SparkSnail's avatar
SparkSnail committed
200
    /**
     * Resume a previously stored experiment.
     *
     * In readonly mode only the profile is loaded. Otherwise the method:
     * - recreates the training service and launches the tuner in 'resume' mode;
     * - marks trials left WAITING/RUNNING as FAILED;
     * - gathers finished and imported trial data for the tuner;
     * - restarts the main loop in the background.
     *
     * @param readonly when true, mutating operations are rejected afterwards
     */
    public async resumeExperiment(readonly: boolean): Promise<void> {
        // Fetch back the experiment profile.
        const experimentId: string = getExperimentId();
        this.log.info(`Resuming experiment: ${experimentId}`);
        this.experimentProfile = await this.dataStore.getExperimentProfile(experimentId);

        this.readonly = readonly;
        if (readonly) {
            return;
        }

        this.log.info('Setup training service...');
        const config: ExperimentConfig = this.experimentProfile.params;
        this.trainingService = await this.initTrainingService(config);

        this.log.info('Setup tuner...');
        const command: string = getMsgDispatcherCommand(config);
        this.log.debug(`dispatcher command: ${command}`);
        const checkpointDir: string = await this.createCheckpointDir();
        this.setupTuner(command, undefined, 'resume', checkpointDir);

        const storedJobs: TrialJobInfo[] = await this.dataStore.listTrialJobs();
        // Every stored trial counts as already submitted.
        this.currSubmittedTrialNum = storedJobs.length;

        // Trials still WAITING or RUNNING from the previous run are recorded as FAILED.
        const unfinishedJobs: TrialJobInfo[] = storedJobs.filter(
            (job: TrialJobInfo) => job.status === 'WAITING' || job.status === 'RUNNING');
        await Promise.all(unfinishedJobs.map(
            (job: TrialJobInfo) => this.dataStore.storeTrialJobEvent('FAILED', job.trialJobId)));

        // Generated and imported trials are concatenated for the tuner, without deduplication.
        const finishedTrialData: string = await this.exportData();
        const importedData: string[] = await this.dataStore.getImportedData();
        const combined: Record<string, any>[] = importedData.reduce(
            (acc: Record<string, any>[], chunk: string) => acc.concat(<Record<string, any>[]>JSON.parse(chunk)),
            <Record<string, any>[]>JSON.parse(finishedTrialData));
        this.trialDataForTuner = JSON.stringify(combined);

        // Reopen the experiment when budget remains but an end time was recorded.
        const budgetRemains: boolean = this.experimentProfile.execDuration < this.maxDuration &&
            this.currSubmittedTrialNum < this.maxTrialNum;
        if (budgetRemains && this.experimentProfile.endTime) {
            delete this.experimentProfile.endTime;
        }
        this.setStatus('RUNNING');

        // TO DO: update database record for resume event
        this.run().catch((err: Error) => {
            this.criticalError(err);
        });
    }

253
254
    /** Look up one trial job record in the data store by id. */
    public getTrialJob(trialJobId: string): Promise<TrialJobInfo> {
        const record: Promise<TrialJobInfo> = this.dataStore.getTrialJob(trialJobId);
        return record;
    }

257
258
    /** Removed API kept for interface compatibility; the returned promise always rejects. */
    public async setClusterMetadata(_key: string, _value: string): Promise<void> {
        const reason: string = 'Calling removed API setClusterMetadata';
        throw new Error(reason);
    }

261
262
    /**
     * Removed API kept for interface compatibility.
     *
     * Declared `async`, like setClusterMetadata(), so the failure is delivered as a
     * rejected promise instead of a synchronous throw that bypasses `.catch()`.
     */
    public async getClusterMetadata(_key: string): Promise<string> {
        throw new Error('Calling removed API getClusterMetadata');
    }

    /** Trial-job statistics as computed by the data store. */
    public async getTrialJobStatistics(): Promise<TrialJobStatistics[]> {
        const stats: Promise<TrialJobStatistics[]> = this.dataStore.getTrialJobStatistics();
        return stats;
    }

269
    /**
     * Stop the experiment: run the top half (tuner shutdown), then the bottom half
     * (trial cleanup and process exit). The bottom half is skipped if the top half rejects.
     */
    public async stopExperiment(): Promise<void> {
        return this.stopExperimentTopHalf().then(() => this.stopExperimentBottomHalf());
    }

    /**
     * First phase of shutdown.
     *
     * Switches to STOPPING, detaches the metric listener and asks the dispatcher to
     * terminate. It then polls the dispatcher process once per second, up to 30 times,
     * before killing it, and finally drops the dispatcher reference.
     * Logs an error and returns early when no dispatcher was set up.
     */
    public async stopExperimentTopHalf(): Promise<void> {
        this.setStatus('STOPPING');
        this.log.info('Stopping experiment, cleaning up ...');

        const dispatcher: IpcInterface | undefined = this.dispatcher;
        if (dispatcher === undefined) {
            this.log.error('Tuner has not been setup');
            return;
        }

        this.trainingService.removeTrialJobMetricListener(this.trialJobMetricListener);
        if (this.dispatcherPid > 0) {
            dispatcher.sendCommand(TERMINATE);
            // Gracefully terminate tuner and assessor: at most 30 one-second waits, then kill.
            let waited: number = 0;
            while (waited < 30 && await isAlive(this.dispatcherPid)) {
                await delay(1000);
                waited++;
            }
            await killPid(this.dispatcherPid);
        }
        this.dispatcher = undefined;
    }

    /**
     * Second phase of shutdown. In order, it:
     * 1. cancels unfinished trials and cleans up the training service;
     * 2. persists the final profile and switches to STOPPED;
     * 3. stops the remaining components and exits the process.
     *
     * Training-service failures are logged and do not affect the exit code.
     * Failures while persisting the profile or stopping components are logged and make
     * the process exit with code 1. Every path reaches `process.exit`, so a failing
     * profile write cannot leave the manager half-stopped.
     */
    public async stopExperimentBottomHalf(): Promise<void> {
        // `${err.stack}` alone prints "undefined" for non-Error throwables; fall back to their value.
        const describeError = (err: unknown): string =>
            (err instanceof Error && err.stack !== undefined) ? err.stack : String(err);

        try {
            const trialJobList: TrialJobDetail[] = await this.trainingService.listTrialJobs();

            // DON'T try to make it in parallel, the training service may not handle it well.
            // If there is performance concern, consider to support batch cancellation on training service.
            for (const trialJob of trialJobList) {
                if (trialJob.status === 'RUNNING' ||
                    trialJob.status === 'WAITING') {
                    try {
                        this.log.info(`cancelTrialJob: ${trialJob.id}`);
                        await this.trainingService.cancelTrialJob(trialJob.id);
                    } catch (error) {
                        this.log.debug(`ignorable error on canceling trial ${trialJob.id}. ${error}`);
                    }
                }
            }
            await this.trainingService.cleanUp();
        } catch (err) {
            this.log.error(describeError(err));
        }

        let hasError: boolean = false;
        try {
            if (this.experimentProfile.endTime === undefined) {
                this.setEndtime();
            }
            await this.storeExperimentProfile();
        } catch (err) {
            // Keep going so the components below are still stopped and the process exits.
            hasError = true;
            this.log.error(describeError(err));
        }
        this.setStatus('STOPPED');
        this.log.info('Experiment stopped.');

        try {
            await this.experimentManager.stop();
            await component.get<TensorboardManager>(TensorboardManager).stop();
            await this.dataStore.close();
            await component.get<NNIRestServer>(NNIRestServer).stop();
        } catch (err) {
            hasError = true;
            this.log.error(describeError(err));
        } finally {
            this.log.close();
            process.exit(hasError ? 1 : 0);
        }
    }

341
    /**
     * Metric records from the data store. The optional trial id and metric type are
     * passed through as filters.
     */
    public async getMetricData(trialJobId?: string, metricType?: MetricType): Promise<MetricDataRecord[]> {
        const records: Promise<MetricDataRecord[]> = this.dataStore.getMetricData(trialJobId, metricType);
        return records;
    }

345
346
347
348
349
350
    /**
     * Metric records of trials whose sequence id lies in [minSeqId, maxSeqId], inclusive.
     * Trials without a sequence id are excluded.
     */
    public async getMetricDataByRange(minSeqId: number, maxSeqId: number): Promise<MetricDataRecord[]> {
        const trialJobs = await this.dataStore.listTrialJobs();
        const idsInRange = new Set<string>();
        for (const trial of trialJobs) {
            // FIXME: can this be undefined?
            const seq = trial.sequenceId;
            if (seq !== undefined && seq >= minSeqId && seq <= maxSeqId) {
                idsInRange.add(trial.trialJobId);
            }
        }

        const allMetrics = await this.dataStore.getMetricData();
        return allMetrics.filter(metric => idsInRange.has(metric.trialJobId));
    }

    /**
     * All non-PERIODICAL metrics, followed by the PERIODICAL metric with the highest
     * `sequence` per trial. On ties, the later record in data-store order wins.
     */
    public async getLatestMetricData(): Promise<MetricDataRecord[]> {
        // FIXME: this can take a long time
        const allMetrics: MetricDataRecord[] = await this.dataStore.getMetricData();
        const nonPeriodical: MetricDataRecord[] = allMetrics.filter(
            (record: MetricDataRecord) => record.type !== 'PERIODICAL');

        const newestPeriodical = new Map<string, MetricDataRecord>();
        for (const record of allMetrics) {
            if (record.type !== 'PERIODICAL') {
                continue;
            }
            const seen: MetricDataRecord | undefined = newestPeriodical.get(record.trialJobId);
            if (seen === undefined || seen.sequence <= record.sequence) {
                newestPeriodical.set(record.trialJobId, record);
            }
        }
        return nonPeriodical.concat(Array.from(newestPeriodical.values()));
        // FIXME: unit test
    }

376
377
378
379
    /** Fetch a trial's log of the given type from the training service. */
    public async getTrialLog(trialJobId: string, logType: LogType): Promise<string> {
        const content: Promise<string> = this.trainingService.getTrialLog(trialJobId, logType);
        return content;
    }

Deshui Yu's avatar
Deshui Yu committed
380
381
382
383
384
385
386
387
    /** Resolve with the in-memory experiment profile (the live object, not a copy). */
    public getExperimentProfile(): Promise<ExperimentProfile> {
        // TO DO: return Promise.resolve(...) directly instead of going through a Deferred
        const profileReady: Deferred<ExperimentProfile> = new Deferred<ExperimentProfile>();
        profileReady.resolve(this.experimentProfile);
        return profileReady.promise;
    }

388
389
390
391
    /** Current manager status (the live object, not a copy). */
    public getStatus(): NNIManagerStatus {
        const current: NNIManagerStatus = this.status;
        return current;
    }

Deshui Yu's avatar
Deshui Yu committed
392
393
394
395
    /** Trial job records from the data store, optionally filtered by status. */
    public async listTrialJobs(status?: TrialJobStatus): Promise<TrialJobInfo[]> {
        const jobs: Promise<TrialJobInfo[]> = this.dataStore.listTrialJobs(status);
        return jobs;
    }

396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
    /** Maximum experiment duration in seconds; Infinity when no limit is configured. */
    private get maxDuration(): number {
        const limit = this.experimentProfile.params.maxExperimentDuration;
        if (limit === undefined) {
            return Infinity;
        }
        return toSeconds(limit);
    }

    /** Maximum number of trials; Infinity when no limit is configured. */
    private get maxTrialNum(): number {
        const limit = this.experimentProfile.params.maxTrialNumber;
        if (limit === undefined) {
            return Infinity;
        }
        return limit;
    }

    /**
     * Remember `config` and construct the training service for its platform.
     * An array of training services selects the 'hybrid' platform. Each implementation
     * is loaded through a dynamic import, so only the chosen module is loaded.
     *
     * @throws Error when the platform is not supported
     */
    private async initTrainingService(config: ExperimentConfig): Promise<TrainingService> {
        this.config = config;
        const platform = Array.isArray(config.trainingService) ? 'hybrid' : config.trainingService.platform;

        // Platforms handled by the reusable-environment router.
        const routedPlatforms: string[] = ['remote', 'pai', 'aml', 'hybrid'];
        if (routedPlatforms.includes(platform)) {
            const { RouterTrainingService } = await import('../training_service/reusable/routerTrainingService');
            return new RouterTrainingService(config);
        }

        switch (platform) {
            case 'local': {
                const { LocalTrainingService } = await import('../training_service/local/localTrainingService');
                return new LocalTrainingService(config);
            }
            case 'kubeflow': {
                const { KubeflowTrainingService } = await import('../training_service/kubernetes/kubeflow/kubeflowTrainingService');
                return new KubeflowTrainingService();
            }
            case 'frameworkcontroller': {
                const { FrameworkControllerTrainingService } = await import('../training_service/kubernetes/frameworkcontroller/frameworkcontrollerTrainingService');
                return new FrameworkControllerTrainingService();
            }
            case 'adl': {
                const { AdlTrainingService } = await import('../training_service/kubernetes/adl/adlTrainingService');
                return new AdlTrainingService();
            }
            default:
                throw new Error(`Unsupported training service platform "${platform}"`);
        }
    }

430
431
    /**
     * Spawn the dispatcher (tuner/assessor) process and connect to it.
     * Does nothing when a dispatcher already exists, e.g. one connected through a pipe.
     *
     * @param command       shell command that starts the dispatcher
     * @param cwd           working directory; undefined or '' means the log directory
     * @param mode          exported to the child as NNI_MODE
     * @param dataDirectory exported to the child as NNI_CHECKPOINT_DIRECTORY
     */
    private setupTuner(command: string, cwd: string | undefined, mode: 'start' | 'resume', dataDirectory: string): void {
        if (this.dispatcher !== undefined) {
            return;
        }

        // stdin ignored, stdout/stderr shared with this process, fds 3 and 4 are pipes
        // (presumably the IPC channel used by createDispatcherInterface).
        const stdio: StdioOptions = ['ignore', process.stdout, process.stderr, 'pipe', 'pipe'];
        const workDir: string = (cwd === undefined || cwd === '') ? getLogDir() : cwd;

        const deprecatedOptions = this.config.deprecated;
        const includeIntermediateResults: boolean =
            Boolean(deprecatedOptions && deprecatedOptions.includeIntermediateResults);

        // Variables layered on top of this process's environment for the child.
        // CUDA_VISIBLE_DEVICES is derived from the tuner's GPU indices.
        const nniEnv = {
            SDK_PROCESS: 'dispatcher',
            NNI_MODE: mode,
            NNI_CHECKPOINT_DIRECTORY: dataDirectory,
            NNI_LOG_DIRECTORY: getLogDir(),
            NNI_LOG_LEVEL: getLogLevel(),
            NNI_INCLUDE_INTERMEDIATE_RESULTS: includeIntermediateResults,
            CUDA_VISIBLE_DEVICES: toCudaVisibleDevices(this.experimentProfile.params.tunerGpuIndices)
        };
        const childEnv = Object.assign({}, process.env, nniEnv);

        const tunerProc: ChildProcess = getTunerProc(command, stdio, workDir, childEnv);
        this.dispatcherPid = tunerProc.pid;
        this.dispatcher = createDispatcherInterface(tunerProc);
    }

    /**
     * Record a new trial concurrency. The difference from the current value is added
     * to `trialConcurrencyChange` for manageTrials() to apply.
     * The rest server is expected to have checked trialConcurrency >= 0.
     */
    private updateTrialConcurrency(trialConcurrency: number): void {
        const delta: number = trialConcurrency - this.experimentProfile.params.trialConcurrency;
        this.trialConcurrencyChange += delta;
        this.experimentProfile.params.trialConcurrency = trialConcurrency;
    }

    /**
     * Send the new search space to the dispatcher, then store it in the profile params.
     *
     * @throws Error when the tuner has not been set up (the profile is left unchanged)
     */
    private updateSearchSpace(searchSpace: string): void {
        const dispatcher: IpcInterface | undefined = this.dispatcher;
        if (dispatcher === undefined) {
            throw new Error('Error: tuner has not been setup');
        }
        dispatcher.sendCommand(UPDATE_SEARCH_SPACE, searchSpace);
        this.experimentProfile.params.searchSpace = searchSpace;
    }

    /**
     * Every second while the experiment is not ERROR/STOPPING/STOPPED, add one second to
     * `execDuration` if the status is a running-like one. The profile is persisted when
     * the tick counter is a multiple of 10 and the duration was incremented on that tick.
     */
    private async periodicallyUpdateExecDuration(): Promise<void> {
        const terminalStates: string[] = ['ERROR', 'STOPPING', 'STOPPED'];
        const countedStates: string[] = ['RUNNING', 'NO_MORE_TRIAL', 'TUNER_NO_MORE_TRIAL'];
        for (let tick: number = 1; !terminalStates.includes(this.status.status); tick++) {
            await delay(1000);
            if (!countedStates.includes(this.status.status)) {
                continue;
            }
            this.experimentProfile.execDuration += 1;
            if (tick % 10 === 0) {
                await this.storeExperimentProfile();
            }
        }
    }

chicm-ms's avatar
chicm-ms committed
493
494
495
496
497
498
    /**
     * Send PING to the dispatcher every 5 seconds until the experiment enters
     * ERROR, STOPPING or STOPPED.
     *
     * @throws Error when the tuner has not been set up
     */
    private async pingDispatcher(): Promise<void> {
        if (this.dispatcher === undefined) {
            throw new Error('Error: tuner has not been setup');
        }
        const pingIntervalMs: number = 5 * 1000;
        const stopStates: string[] = ['ERROR', 'STOPPING', 'STOPPED'];
        while (!stopStates.includes(this.status.status)) {
            this.dispatcher.sendCommand(PING);
            await delay(pingIntervalMs);
        }
    }

QuanluZhang's avatar
QuanluZhang committed
503
504
    /**
     * Poll the training service for every tracked trial. For each trial:
     * - a status change is recorded in the data store;
     * - the latest message is copied onto the tracked entry;
     * - a trial that reached a terminal status stops being tracked and the dispatcher
     *   is notified with TRIAL_END.
     *
     * @returns number of trials that finished during this poll
     * @throws Error when the tuner has not been set up
     */
    private async requestTrialJobsStatus(): Promise<number> {
        if (this.dispatcher === undefined) {
            throw new Error('Error: tuner has not been setup');
        }

        let finishedCount: number = 0;
        // Snapshot the ids: finished trials are deleted from the map inside the loop.
        const trackedIds: string[] = Array.from(this.trialJobs.keys());
        for (const trialJobId of trackedIds) {
            const latest: TrialJobDetail = await this.trainingService.getTrialJob(trialJobId);
            const previous: TrialJobDetail | undefined = this.trialJobs.get(trialJobId);
            if (previous !== undefined && previous.status !== latest.status) {
                this.log.info(`Trial job ${latest.id} status changed from ${previous.status} to ${latest.status}`);
                this.trialJobs.set(trialJobId, Object.assign({}, latest));
                await this.dataStore.storeTrialJobEvent(latest.status, latest.id, undefined, latest);
            }

            const tracked: TrialJobDetail | undefined = this.trialJobs.get(trialJobId);
            if (tracked !== undefined) {
                tracked.message = latest.message;
            }

            switch (latest.status) {
                case 'SUCCEEDED':
                case 'USER_CANCELED':
                case 'EARLY_STOPPED':
                case 'FAILED':
                case 'SYS_CANCELED':
                    // Terminal statuses are all handled the same way. Failed trials are not
                    // retried in the current version.
                    // TO DO: push failed jobs to a queue for retry
                    this.trialJobs.delete(trialJobId);
                    finishedCount++;
                    this.dispatcher.sendCommand(TRIAL_END, JSON.stringify({
                        trial_job_id: latest.id, // eslint-disable-line @typescript-eslint/camelcase
                        event: latest.status,
                        hyper_params: latest.form.hyperParameters.value // eslint-disable-line @typescript-eslint/camelcase
                    }));
                    break;
                case 'WAITING':
                case 'RUNNING':
                case 'UNKNOWN':
                    // Still in progress: keep tracking.
                    break;
                default:
                    // TO DO: add warning in log
                    break;
            }
        }

        return finishedCount;
    }

    /**
     * Main scheduling loop; runs every 5 seconds until the status becomes
     * ERROR, STOPPING or STOPPED. Each iteration:
     *  1. polls trial statuses and counts trials that finished since the last poll;
     *  2. asks the tuner for replacement trials, adjusted by any pending
     *     trialConcurrency change;
     *  3. switches between RUNNING / NO_MORE_TRIAL / DONE based on maxDuration
     *     and maxTrialNum;
     *  4. submits queued trials to the training service while concurrency allows.
     *
     * @throws Error if the dispatcher (tuner) has not been set up.
     */
    private async manageTrials(): Promise<void> {
        if (this.dispatcher === undefined) {
            throw new Error('Error: tuner has not been setup');
        }
        // Counting starts from currSubmittedTrialNum: every trial submitted before this
        // loop began is treated as already finished.
        let allFinishedTrialJobNum: number = this.currSubmittedTrialNum;
        while (!['ERROR', 'STOPPING', 'STOPPED'].includes(this.status.status)) {
            const finishedTrialJobNum: number = await this.requestTrialJobsStatus();
            allFinishedTrialJobNum += finishedTrialJobNum;

            // requestTrialNum is the number of trials that will be requested from tuner.
            // If trialConcurrency does not change, requestTrialNum equals finishedTrialJobNum.
            // If trialConcurrency changes, for example, trialConcurrency increases by 2 (trialConcurrencyChange=2), then
            // requestTrialNum equals 2 + finishedTrialJobNum and trialConcurrencyChange becomes 0.
            // If trialConcurrency changes, for example, trialConcurrency decreases by 4 (trialConcurrencyChange=-4) and
            // finishedTrialJobNum is 2, then requestTrialNum becomes -2. No trial will be requested from tuner,
            // and trialConcurrencyChange becomes -2.
            const requestTrialNum: number = this.trialConcurrencyChange + finishedTrialJobNum;
            if (requestTrialNum >= 0) {
                this.trialConcurrencyChange = 0;
            } else {
                this.trialConcurrencyChange = requestTrialNum;
            }

            this.requestTrialJobs(requestTrialNum);

            // check maxtrialnum and maxduration here
            // NO_MORE_TRIAL is more like a subset of RUNNING, because during RUNNING tuner
            // might tell nnimanager that this is no more trials. In NO_MORE_TRIAL state, the experiment is viewed
            // as still running. DONE could be transfered from RUNNING or NO_MORE_TRIAL.
            assert(this.status.status === 'RUNNING' ||
                this.status.status === 'DONE' ||
                this.status.status === 'NO_MORE_TRIAL' ||
                this.status.status === 'TUNER_NO_MORE_TRIAL', `Actual status: ${this.status.status}`);
            if (this.experimentProfile.execDuration > this.maxDuration ||
                this.currSubmittedTrialNum >= this.maxTrialNum) {
                if (this.status.status !== 'DONE') {
                    this.setStatus('NO_MORE_TRIAL');
                    const waitSubmittedToFinish: number = this.currSubmittedTrialNum;

                    assert(allFinishedTrialJobNum <= waitSubmittedToFinish);
                    // Every submitted trial has finished: the experiment is complete.
                    if (allFinishedTrialJobNum >= waitSubmittedToFinish) {
                        this.setStatus('DONE');
                        this.setEndtime();
                        await this.storeExperimentProfile();
                        // write this log for travis CI
                        this.log.info('Experiment done.');
                    }
                }
            } else {
                // Limits are no longer reached (they may have been raised), so a finished
                // experiment is re-opened: clear its end time.
                if (this.status.status === 'DONE') {
                    delete this.experimentProfile.endTime;
                    await this.storeExperimentProfile();
                }
                if (this.status.status !== 'TUNER_NO_MORE_TRIAL') {
                    this.setStatus('RUNNING');
                }
                for (let i: number = this.trialJobs.size; i < this.experimentProfile.params.trialConcurrency; i++) {
                    if (this.waitingTrials.length === 0 ||
                        this.currSubmittedTrialNum >= this.maxTrialNum) {
                        break;
                    }
                    const form = this.waitingTrials.shift() as TrialJobApplicationForm;
                    this.currSubmittedTrialNum++;
                    this.log.info(`submitTrialJob: form: ${JSON.stringify(form)}`);
                    const trialJobDetail: TrialJobDetail = await this.trainingService.submitTrialJob(form);
                    // Track a shallow copy rather than the object returned by the training service.
                    const trialJobDetailSnapshot: TrialJobDetail = Object.assign({}, trialJobDetail);
                    await this.storeExperimentProfile();
                    this.trialJobs.set(trialJobDetail.id, trialJobDetailSnapshot);
                    await this.dataStore.storeTrialJobEvent(
                        trialJobDetailSnapshot.status, trialJobDetailSnapshot.id, form.hyperParameters.value, trialJobDetailSnapshot);
                }
            }
            await delay(1000 * 5); // 5 seconds
        }
    }

Deshui Yu's avatar
Deshui Yu committed
642
643
644
645
646
647
    /**
     * Increments the profile revision, then persists the whole experiment
     * profile through the data store.
     */
    private storeExperimentProfile(): Promise<void> {
        const profile = this.experimentProfile;
        profile.revision++;
        return this.dataStore.storeExperimentProfile(profile);
    }

648
    /**
     * Starts the experiment. First it registers event listeners and sends the
     * INITIALIZE command to the tuner. Then it runs the exec-duration updater, the
     * dispatcher ping, the training service and the trial-management loop
     * concurrently. Resolves once all four resolve. Rejects on the first failure,
     * wrapping errors from the last three in an NNIError with a source prefix.
     */
    private async run(): Promise<void> {
        assert(this.dispatcher !== undefined);

        this.addEventListeners();
        this.sendInitTunerCommands();

        // Re-throw a task's failure as an NNIError tagged with where it came from.
        const tagFailure = <T>(task: Promise<T>, prefix: string): Promise<T> =>
            task.catch((err: Error) => {
                throw NNIError.FromError(err, prefix);
            });

        await Promise.all([
            this.periodicallyUpdateExecDuration(),
            tagFailure(this.pingDispatcher(), 'Dispatcher error: '),
            tagFailure(this.trainingService.run(), 'Training service error: '),
            tagFailure(this.manageTrials(), 'Job management error: ')
        ]);
    }

QuanluZhang's avatar
QuanluZhang committed
668
    /**
     * Hooks the manager into its collaborators. It subscribes to trial metrics from
     * the training service and to command/error events from the dispatcher. A
     * failure while handling a tuner command, or any dispatcher stream error, is
     * reported through criticalError.
     *
     * TO DO: cannot run this method more than once in one NNIManager instance
     *
     * @throws Error if the dispatcher has not been set up.
     */
    private addEventListeners(): void {
        this.log.info('Add event listeners');
        const dispatcher = this.dispatcher;
        if (dispatcher === undefined) {
            throw new Error('Error: tuner or job maintainer have not been setup');
        }
        this.trainingService.addTrialJobMetricListener(this.trialJobMetricListener);

        dispatcher.onCommand((commandType: string, content: string) => {
            this.onTunerCommand(commandType, content).catch((err: Error) => {
                this.criticalError(NNIError.FromError(err, 'Tuner command event error: '));
            });
        });
        dispatcher.onError((error: Error) => {
            this.log.error(`Dispatcher error: ${error.message}`);
            this.criticalError(new Error('Dispatcher stream error, tuner may have crashed.'));
        });
    }

    /**
     * Sends INITIALIZE, carrying the JSON-serialized search space, to the tuner.
     * The tuner must receive the search space before it can generate any
     * hyper-parameters.
     *
     * @throws Error if the dispatcher has not been set up.
     */
    private sendInitTunerCommands(): void {
        if (this.dispatcher === undefined) {
            throw new Error('Dispatcher error: tuner has not been setup');
        }
        // Serialize once and reuse. Interpolating the search-space object directly
        // into the log message would print "[object Object]".
        const searchSpace: string = JSON.stringify(this.experimentProfile.params.searchSpace);
        this.log.debug(`Send tuner command: INITIALIZE: ${searchSpace}`);
        // Tuner need to be initialized with search space before generating any hyper parameters
        this.dispatcher.sendCommand(INITIALIZE, searchSpace);
    }

    /**
     * Handles a metric reported by a trial.
     *
     * If the trial id is tracked in `trialJobs`, the metric is persisted and then
     * forwarded to the tuner as REPORT_METRIC_DATA. Otherwise a warning is logged
     * and the metric is dropped.
     *
     * @param metric metric payload received from the training service.
     * @throws Error if the dispatcher has not been set up. This check happens
     *         after the metric has already been stored.
     */
    private async onTrialJobMetrics(metric: TrialJobMetric): Promise<void> {
        this.log.debug(`NNIManager received trial job metrics: ${JSON.stringify(metric)}`);
        if (!this.trialJobs.has(metric.id)) {
            // Stringify so the warning shows the payload rather than "[object Object]".
            this.log.warning(`NNIManager received non-existent trial job metrics: ${JSON.stringify(metric)}`);
            return;
        }
        await this.dataStore.storeMetricData(metric.id, metric.data);
        if (this.dispatcher === undefined) {
            throw new Error('Error: tuner has not been setup');
        }
        this.dispatcher.sendCommand(REPORT_METRIC_DATA, metric.data);
    }

chicm-ms's avatar
chicm-ms committed
709
710
711
712
713
714
715
    /**
     * Asks the tuner for `jobNum` new trials. Counts below 1 are ignored.
     *
     * Without the deprecated `multiThread` option, one REQUEST_TRIAL_JOBS command
     * carries the whole count, and the tuner generates the hyper-parameters
     * sequentially. With it, one single-trial request is sent per job so generation
     * does not block on a single request.
     *
     * @throws Error if the dispatcher has not been set up (and jobNum >= 1).
     */
    private requestTrialJobs(jobNum: number): void {
        if (jobNum < 1) {
            return;
        }
        const dispatcher = this.dispatcher;
        if (dispatcher === undefined) {
            throw new Error('Dispatcher error: tuner has not been setup');
        }
        const multiThread = this.config.deprecated && this.config.deprecated.multiThread;
        if (!multiThread) {
            dispatcher.sendCommand(REQUEST_TRIAL_JOBS, String(jobNum));
            return;
        }
        // Send multiple requests to ensure multiple hyper parameters are generated in non-blocking way.
        let remaining: number = jobNum;
        while (remaining > 0) {
            dispatcher.sendCommand(REQUEST_TRIAL_JOBS, '1');
            remaining--;
        }
    }

728
    /**
     * Handles one command received from the tuner through the dispatcher.
     *
     * - INITIALIZED: sends any pending trial data (IMPORT_DATA), then requests
     *   `trialConcurrency` trials.
     * - NEW_TRIAL_JOB: queues a trial form whose hyper-parameters are `content`.
     * - SEND_TRIAL_JOB_PARAMETER: pushes an additional parameter set to an existing
     *   trial and records it in the data store.
     * - NO_MORE_TRIAL_JOBS: marks TUNER_NO_MORE_TRIAL unless the experiment is
     *   already in ERROR / STOPPING / STOPPED.
     * - KILL_TRIAL_JOB: cancels the trial whose JSON-encoded id is `content`.
     *
     * @param commandType command identifier sent by the dispatcher.
     * @param content command payload (a JSON string for most commands).
     * @throws Error for an unsupported command type, or when the dispatcher is
     *         needed but has not been set up.
     */
    private async onTunerCommand(commandType: string, content: string): Promise<void> {
        this.log.info(`NNIManager received command from dispatcher: ${commandType}, ${content}`);
        switch (commandType) {
            case INITIALIZED: {
                // Tuner is initialized, search space is set, request tuner to generate hyper parameters
                if (this.trialDataForTuner.length > 0) {
                    if (this.dispatcher === undefined) {
                        throw new Error('Dispatcher error: tuner has not been setup');
                    }
                    this.dispatcher.sendCommand(IMPORT_DATA, this.trialDataForTuner);
                }
                this.requestTrialJobs(this.experimentProfile.params.trialConcurrency);
                break;
            }
            case NEW_TRIAL_JOB: {
                if (this.status.status === 'TUNER_NO_MORE_TRIAL') {
                    this.log.warning('It is not supposed to receive more trials after NO_MORE_TRIAL is set');
                    this.setStatus('RUNNING');
                }
                const form: TrialJobApplicationForm = {
                    sequenceId: this.experimentProfile.nextSequenceId++,
                    hyperParameters: {
                        value: content,
                        index: 0
                    }
                };
                this.waitingTrials.push(form);
                break;
            }
            case SEND_TRIAL_JOB_PARAMETER: {
                const tunerCommand: any = JSON.parse(content);
                assert(tunerCommand.parameter_index >= 0);
                assert(tunerCommand.trial_job_id !== undefined);

                const trialJobForm: TrialJobApplicationForm = {
                    sequenceId: -1,  // FIXME: multi-phase tuner should use sequence ID instead of trial job ID
                    hyperParameters: {
                        value: content,
                        index: tunerCommand.parameter_index
                    }
                };
                this.log.info(`updateTrialJob: job id: ${tunerCommand.trial_job_id}, form: ${JSON.stringify(trialJobForm)}`);
                await this.trainingService.updateTrialJob(tunerCommand.trial_job_id, trialJobForm);
                // Record the extra hyper-parameters only when `parameters` is not null.
                // NOTE(review): a null `parameters` presumably means the tuner could not
                // generate more hyper parameters. An earlier comment said "empty string",
                // but only null is checked here; confirm against the dispatcher.
                if (tunerCommand['parameters'] !== null) {
                    await this.dataStore.storeTrialJobEvent(
                        'ADD_HYPERPARAMETER', tunerCommand.trial_job_id, content, undefined);
                }
                break;
            }
            case NO_MORE_TRIAL_JOBS: {
                if (!['ERROR', 'STOPPING', 'STOPPED'].includes(this.status.status)) {
                    this.setStatus('TUNER_NO_MORE_TRIAL');
                }
                break;
            }
            case KILL_TRIAL_JOB: {
                // Parse once and reuse the id for both the log line and the cancel call.
                const trialJobId = JSON.parse(content);
                this.log.info(`cancelTrialJob: ${trialJobId}`);
                await this.trainingService.cancelTrialJob(trialJobId, true);
                break;
            }
            default:
                throw new Error('Error: unsupported command type from tuner');
        }
    }

794
795
796
797
798
799
800
801
802
803
    /**
     * Reports a fatal error. It is recorded through logError, which logs it and
     * moves the experiment to ERROR, and is also written to stderr with
     * console.error.
     */
    private criticalError(err: Error): void {
        const fatal: Error = err;
        this.logError(fatal);
        console.error(fatal);
    }

    /**
     * Records an error against the experiment, in this order:
     *  1. logs the stack trace, if the error has one;
     *  2. appends the message to `status.errors`;
     *  3. stamps the end time;
     *  4. switches the status to ERROR.
     */
    private logError(err: Error): void {
        const { stack, message } = err;
        if (stack !== undefined) {
            this.log.error(stack);
        }
        this.status.errors.push(message);
        this.setEndtime();
        this.setStatus('ERROR');
    }

    /**
     * Changes the manager status, logs the transition and mirrors the new status
     * to the experiment manager. Setting the current status again is a no-op.
     */
    private setStatus(status: ExperimentStatus): void {
        const previous = this.status.status;
        if (previous === status) {
            return;
        }
        this.log.info(`Change NNIManager status from: ${previous} to: ${status}`);
        this.status.status = status;
        this.experimentManager.setExperimentInfo(this.experimentProfile.id, 'status', status);
    }

816
817
818
819
820
    /**
     * Stamps the profile's end time with the current time (`Date.now()`) and
     * mirrors the same value to the experiment manager.
     */
    private setEndtime(): void {
        const now: number = Date.now();
        this.experimentProfile.endTime = now;
        this.experimentManager.setExperimentInfo(this.experimentProfile.id, 'endTime', now);
    }

QuanluZhang's avatar
QuanluZhang committed
821
822
823
824
    /**
     * Creates the checkpoint directory returned by getCheckpointDir() (via mkDirP)
     * and resolves to its path.
     */
    private async createCheckpointDir(): Promise<string> {
        // TODO: test
        const checkpointDir: string = getCheckpointDir();
        await mkDirP(checkpointDir);
        return checkpointDir;
    }
J-shang's avatar
J-shang committed
827
828
829
830
831
832
833
834

    /**
     * Returns the local path of a trial's output, as reported by the training service.
     *
     * @param trialJobId id of the trial job.
     */
    public async getTrialOutputLocalPath(trialJobId: string): Promise<string> {
        const { trainingService } = this;
        return trainingService.getTrialOutputLocalPath(trialJobId);
    }

    /**
     * Delegates fetching a trial's output (limited to `subpath`) to the training service.
     *
     * @param trialJobId id of the trial job.
     * @param subpath sub-path of the trial output to fetch.
     */
    public async fetchTrialOutput(trialJobId: string, subpath: string): Promise<void> {
        const { trainingService } = this;
        return trainingService.fetchTrialOutput(trialJobId, subpath);
    }
Deshui Yu's avatar
Deshui Yu committed
835
836
837
}

export { NNIManager };