nnimanager.ts 36.3 KB
Newer Older
liuzhe-lz's avatar
liuzhe-lz committed
1
2
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT license.
Deshui Yu's avatar
Deshui Yu committed
3
4
5
6

'use strict';

import * as assert from 'assert';
chicm-ms's avatar
chicm-ms committed
7
import { ChildProcess, StdioOptions } from 'child_process';
Deshui Yu's avatar
Deshui Yu committed
8
9
10
import { Deferred } from 'ts-deferred';
import * as component from '../common/component';
import { DataStore, MetricDataRecord, MetricType, TrialJobInfo } from '../common/datastore';
11
import { NNIError } from '../common/errors';
12
import { getExperimentId, getDispatcherPipe } from '../common/experimentStartupInfo';
Deshui Yu's avatar
Deshui Yu committed
13
14
import { getLogger, Logger } from '../common/log';
import {
15
    ExperimentProfile, Manager, ExperimentStatus,
16
    NNIManagerStatus, ProfileUpdateType, TrialJobStatistics
Deshui Yu's avatar
Deshui Yu committed
17
} from '../common/manager';
18
import { ExperimentConfig, toSeconds, toCudaVisibleDevices } from '../common/experimentConfig';
19
import { ExperimentManager } from '../common/experimentManager';
J-shang's avatar
J-shang committed
20
import { TensorboardManager } from '../common/tensorboardManager';
Deshui Yu's avatar
Deshui Yu committed
21
import {
22
    TrainingService, TrialJobApplicationForm, TrialJobDetail, TrialJobMetric, TrialJobStatus, LogType
Deshui Yu's avatar
Deshui Yu committed
23
} from '../common/trainingService';
24
import { delay, getCheckpointDir, getExperimentRootDir, getLogDir, getMsgDispatcherCommand, mkDirP, getTunerProc, getLogLevel, isAlive, killPid } from '../common/utils';
Deshui Yu's avatar
Deshui Yu committed
25
import {
chicm-ms's avatar
chicm-ms committed
26
    INITIALIZE, INITIALIZED, KILL_TRIAL_JOB, NEW_TRIAL_JOB, NO_MORE_TRIAL_JOBS, PING,
27
    REPORT_METRIC_DATA, REQUEST_TRIAL_JOBS, SEND_TRIAL_JOB_PARAMETER, TERMINATE, TRIAL_END, UPDATE_SEARCH_SPACE, IMPORT_DATA
Deshui Yu's avatar
Deshui Yu committed
28
} from './commands';
29
import { createDispatcherInterface, createDispatcherPipeInterface, IpcInterface } from './ipcInterface';
30
import { NNIRestServer } from '../rest_server/nniRestServer';
Deshui Yu's avatar
Deshui Yu committed
31
32

/**
chicm-ms's avatar
chicm-ms committed
33
 * NNIManager which implements Manager interface
Deshui Yu's avatar
Deshui Yu committed
34
35
 */
class NNIManager implements Manager {
36
    private trainingService!: TrainingService;
37
    private dispatcher: IpcInterface | undefined;
38
    private experimentManager: ExperimentManager;
39
    private currSubmittedTrialNum: number;  // need to be recovered
QuanluZhang's avatar
QuanluZhang committed
40
    private trialConcurrencyChange: number; // >0: increase, <0: decrease
Deshui Yu's avatar
Deshui Yu committed
41
42
    private log: Logger;
    private dataStore: DataStore;
43
    private experimentProfile!: ExperimentProfile;
44
    private dispatcherPid: number;
45
    private status: NNIManagerStatus;
46
    private waitingTrials: TrialJobApplicationForm[];
QuanluZhang's avatar
QuanluZhang committed
47
    private trialJobs: Map<string, TrialJobDetail>;
48
    private trialDataForTuner: string;
SparkSnail's avatar
SparkSnail committed
49
    private readonly: boolean;
50
    private config!: ExperimentConfig;
51

52
    private trialJobMetricListener: (metric: TrialJobMetric) => void;
53

Deshui Yu's avatar
Deshui Yu committed
54
55
    /**
     * Build a manager in its idle state: zeroed counters, empty trial
     * collections, status 'INITIALIZED', and collaborators resolved through
     * the component container. When a dispatcher pipe is configured the
     * dispatcher interface is attached immediately.
     */
    constructor() {
        // Bookkeeping counters.
        this.currSubmittedTrialNum = 0;
        this.trialConcurrencyChange = 0;
        this.experimentManager = component.get(ExperimentManager);
        this.dispatcherPid = 0;

        // Trial collections and flags.
        this.waitingTrials = [];
        this.trialJobs = new Map<string, TrialJobDetail>();
        this.trialDataForTuner = '';
        this.readonly = false;

        this.log = getLogger();
        this.dataStore = component.get(DataStore);
        this.status = { status: 'INITIALIZED', errors: [] };

        // Metric callback: forward to onTrialJobMetrics and escalate any
        // failure through criticalError.
        this.trialJobMetricListener = (metric: TrialJobMetric): void => {
            this.onTrialJobMetrics(metric).catch((cause: Error) => {
                this.criticalError(NNIError.FromError(cause, 'Job metrics error: '));
            });
        };

        // A non-null pipe means the dispatcher is reached through that pipe.
        const dispatcherPipe = getDispatcherPipe();
        if (dispatcherPipe !== null) {
            this.dispatcher = createDispatcherPipeInterface(dispatcherPipe);
        }
    }

    /**
     * Apply one kind of update to the in-memory experiment profile and persist it.
     *
     * Every failure is reported through the returned promise (readonly mode,
     * unrecognized update type, tuner not set up), so callers that use
     * `.then().catch()` observe all errors in one place.
     *
     * @param experimentProfile profile carrying the new value in `params`
     * @param updateType which field of `params` to apply
     * @returns promise that settles once the profile has been stored
     */
    public updateExperimentProfile(experimentProfile: ExperimentProfile, updateType: ProfileUpdateType): Promise<void> {
        if (this.readonly) {
            return Promise.reject(new Error('Error: can not update experiment profile in readonly mode!'));
        }

        try {
            switch (updateType) {
                case 'TRIAL_CONCURRENCY':
                    this.updateTrialConcurrency(experimentProfile.params.trialConcurrency);
                    break;
                case 'MAX_EXEC_DURATION':
                    this.experimentProfile.params.maxExperimentDuration = experimentProfile.params.maxExperimentDuration;
                    break;
                case 'SEARCH_SPACE':
                    // Throws when the tuner has not been set up.
                    this.updateSearchSpace(experimentProfile.params.searchSpace);
                    break;
                case 'MAX_TRIAL_NUM':
                    this.experimentProfile.params.maxTrialNumber = experimentProfile.params.maxTrialNumber;
                    break;
                default:
                    throw new Error('Error: unrecognized updateType');
            }
        } catch (err) {
            // Convert synchronous failures into rejections to match the readonly branch.
            return Promise.reject(err);
        }

        return this.storeExperimentProfile();
    }

106
    /**
     * Forward imported trial data to the tuner and record it in the data store.
     *
     * @param data payload sent with the IMPORT_DATA command and stored as-is
     * @returns promise of the data store write; rejected in readonly mode or
     *          when no dispatcher is attached
     */
    public importData(data: string): Promise<void> {
        if (this.readonly) {
            return Promise.reject(new Error('Error: can not import data in readonly mode!'));
        }

        const tuner = this.dispatcher;
        if (tuner === undefined) {
            return Promise.reject(new Error('tuner has not been setup'));
        }

        tuner.sendCommand(IMPORT_DATA, data);
        return this.dataStore.storeTrialJobEvent('IMPORT_DATA', '', data);
    }

120
121
122
123
    /**
     * Return the imported data entries as reported by the data store.
     */
    public getImportedData(): Promise<string[]> {
        return this.dataStore.getImportedData();
    }

124
125
126
127
    /**
     * Return the string produced by the data store's exportTrialHpConfigs().
     */
    public async exportData(): Promise<string> {
        return this.dataStore.exportTrialHpConfigs();
    }

128
    /**
     * Queue a trial with user-supplied hyper-parameters.
     *
     * @param hyperParams JSON text of the parameters for the new trial
     * @returns promise of the sequence id assigned to the queued trial;
     *          rejected in readonly mode, when the trial budget is exhausted,
     *          or when `hyperParams` is not valid JSON
     */
    public addCustomizedTrialJob(hyperParams: string): Promise<number> {
        if (this.readonly) {
            return Promise.reject(new Error('Error: can not add customized trial job in readonly mode!'));
        }
        if (this.currSubmittedTrialNum >= this.maxTrialNum) {
            return Promise.reject(new Error('reach maxTrialNum'));
        }

        // Parse before touching any state, so malformed input yields a rejected
        // promise (not a synchronous throw) and does not consume a sequence id.
        let parameters: any; // eslint-disable-line @typescript-eslint/no-explicit-any
        try {
            parameters = JSON.parse(hyperParams);
        } catch (err) {
            return Promise.reject(err);
        }

        // TODO: NNI manager should not peek tuner's internal protocol, let's refactor this later
        const packedParameter = {
            parameter_id: null, // eslint-disable-line @typescript-eslint/camelcase
            parameter_source: 'customized', // eslint-disable-line @typescript-eslint/camelcase
            parameters: parameters
        };

        const form: TrialJobApplicationForm = {
            sequenceId: this.experimentProfile.nextSequenceId++,
            hyperParameters: {
                value: JSON.stringify(packedParameter),
                index: 0
            }
        };
        this.waitingTrials.push(form);

        // trial id has not been generated yet, thus use '' instead.
        // The write is fire-and-forget: the trial is already queued, so a storage
        // failure is logged instead of surfacing as an unhandled rejection.
        this.dataStore.storeTrialJobEvent('ADD_CUSTOMIZED', '', hyperParams).catch((err: Error) => {
            this.log.error(`Failed to record ADD_CUSTOMIZED event: ${err}`);
        });

        return Promise.resolve(form.sequenceId);
    }

    /**
     * Cancel a trial on behalf of the user, then record a USER_TO_CANCEL event.
     *
     * @param trialJobId id of the trial to cancel
     * @returns promise that rejects in readonly mode or when either step fails
     */
    public async cancelTrialJobByUser(trialJobId: string): Promise<void> {
        if (this.readonly) {
            throw new Error('Error: can not cancel trial job in readonly mode!');
        }

        this.log.info(`User cancelTrialJob: ${trialJobId}`);

        // Ask the training service first; the event is stored only after that succeeds.
        await this.trainingService.cancelTrialJob(trialJobId);
        await this.dataStore.storeTrialJobEvent('USER_TO_CANCEL', trialJobId, '');
    }

167
168
169
170
171
172
173
174
175
176
177
178
    /**
     * Start a new experiment: create and persist its profile, bring up the
     * training service and the tuner, switch to RUNNING and launch the main
     * loop in the background.
     *
     * @param config experiment configuration, stored as the profile's params
     * @returns the id of the started experiment
     */
    public async startExperiment(config: ExperimentConfig): Promise<string> {
        this.experimentProfile = {
            params: config,
            id: getExperimentId(),
            execDuration: 0,
            logDir: getExperimentRootDir(),
            startTime: Date.now(),
            endTime: undefined,
            nextSequenceId: 0,
            revision: 0
        };
        this.log.info(`Starting experiment: ${this.experimentProfile.id}`);
        await this.storeExperimentProfile();

        // Training service.
        this.log.info('Setup training service...');
        this.trainingService = await this.initTrainingService(config);

        // Tuner / dispatcher.
        this.log.info('Setup tuner...');
        const tunerCommand: string = getMsgDispatcherCommand(config);
        this.log.debug(`dispatcher command: ${tunerCommand}`);
        const tunerCheckpointDir: string = await this.createCheckpointDir();
        this.setupTuner(tunerCommand, undefined, 'start', tunerCheckpointDir);

        this.setStatus('RUNNING');
        await this.storeExperimentProfile();

        // The main loop is not awaited; its failure is routed to criticalError.
        this.run().catch((cause: Error) => {
            this.criticalError(cause);
        });

        return this.experimentProfile.id;
    }

SparkSnail's avatar
SparkSnail committed
200
    /**
     * Resume the experiment identified by getExperimentId().
     *
     * In readonly mode only the stored profile is loaded. Otherwise the
     * training service and tuner are set up again, trials left WAITING or
     * RUNNING are recorded as FAILED, previous trial data is collected for
     * the tuner, and the main loop is relaunched.
     *
     * @param readonly when true, load the profile and return without starting anything
     */
    public async resumeExperiment(readonly: boolean): Promise<void> {
        // Fetch back the experiment profile
        const experimentId: string = getExperimentId();
        this.log.info(`Resuming experiment: ${experimentId}`);
        this.experimentProfile = await this.dataStore.getExperimentProfile(experimentId);

        this.readonly = readonly;
        if (readonly) {
            return;
        }

        this.log.info('Setup training service...');
        const config: ExperimentConfig = this.experimentProfile.params;
        this.trainingService = await this.initTrainingService(config);

        this.log.info('Setup tuner...');
        const tunerCommand: string = getMsgDispatcherCommand(config);
        this.log.debug(`dispatcher command: ${tunerCommand}`);
        const tunerCheckpointDir: string = await this.createCheckpointDir();
        this.setupTuner(tunerCommand, undefined, 'resume', tunerCheckpointDir);

        const storedJobs: TrialJobInfo[] = await this.dataStore.listTrialJobs();

        // Resume currSubmittedTrialNum
        this.currSubmittedTrialNum = storedJobs.length;

        // Trials that were still WAITING or RUNNING are marked as FAILED.
        const failureWrites: Promise<void>[] = [];
        for (const job of storedJobs) {
            if (job.status === 'WAITING' || job.status === 'RUNNING') {
                failureWrites.push(this.dataStore.storeTrialJobEvent('FAILED', job.trialJobId));
            }
        }
        await Promise.all(failureWrites);

        // Collect generated trials and imported trials (do not deduplicate)
        const finishedTrialData: string = await this.exportData();
        const importedData: string[] = await this.dataStore.getImportedData();
        const trialData: Record<string, any>[] = importedData.reduce(
            (collected: Record<string, any>[], chunk: string) => collected.concat(<Record<string, any>[]>JSON.parse(chunk)),
            <Record<string, any>[]>JSON.parse(finishedTrialData)
        );
        this.trialDataForTuner = JSON.stringify(trialData);

        // A recorded end time is dropped when both budgets still have room.
        const hasTimeLeft: boolean = this.experimentProfile.execDuration < this.maxDuration;
        const hasTrialsLeft: boolean = this.currSubmittedTrialNum < this.maxTrialNum;
        if (hasTimeLeft && hasTrialsLeft && this.experimentProfile.endTime) {
            delete this.experimentProfile.endTime;
        }
        this.setStatus('RUNNING');

        // TO DO: update database record for resume event
        this.run().catch((cause: Error) => {
            this.criticalError(cause);
        });
    }

253
254
    /**
     * Look up one trial job in the data store.
     *
     * @param trialJobId id of the trial to fetch
     */
    public getTrialJob(trialJobId: string): Promise<TrialJobInfo> {
        const lookup: Promise<TrialJobInfo> = this.dataStore.getTrialJob(trialJobId);
        return lookup;
    }

liuzhe-lz's avatar
liuzhe-lz committed
257
258
259
260
261
    /**
     * Pass a cluster metadata key/value pair to the training service.
     *
     * Polls once per second until the training service has been created,
     * then waits for the training service call to settle, so a failure is
     * reported to the caller through the returned promise.
     *
     * @param key metadata key
     * @param value metadata value
     */
    public async setClusterMetadata(key: string, value: string): Promise<void> {
        while (this.trainingService === undefined) {
            await delay(1000);
        }
        // Awaiting is harmless if the call is synchronous and propagates errors if it is not.
        await this.trainingService.setClusterMetadata(key, value);
    }

liuzhe-lz's avatar
liuzhe-lz committed
264
265
    /**
     * Read a cluster metadata value from the training service.
     *
     * @param key metadata key
     */
    public getClusterMetadata(key: string): Promise<string> {
        const metadata: Promise<string> = this.trainingService.getClusterMetadata(key);
        return metadata;
    }

    /**
     * Return the trial job statistics computed by the data store.
     */
    public async getTrialJobStatistics(): Promise<TrialJobStatistics[]> {
        return this.dataStore.getTrialJobStatistics();
    }

272
    /**
     * Stop the experiment by running both shutdown phases back to back:
     * first the top half, then the bottom half.
     */
    public async stopExperiment(): Promise<void> {
        // Phase 1.
        await this.stopExperimentTopHalf();
        // Phase 2 starts only after phase 1 has completed.
        await this.stopExperimentBottomHalf();
    }

    /**
     * First shutdown phase: mark the experiment as STOPPING, stop listening
     * for trial metrics and shut down the dispatcher process.
     *
     * If no dispatcher is attached, an error is logged and nothing else is done.
     */
    public async stopExperimentTopHalf(): Promise<void> {
        this.setStatus('STOPPING');
        this.log.info('Stopping experiment, cleaning up ...');

        if (this.dispatcher === undefined) {
            this.log.error('Tuner has not been setup');
            return;
        }

        this.trainingService.removeTrialJobMetricListener(this.trialJobMetricListener);

        if (this.dispatcherPid > 0) {
            this.dispatcher.sendCommand(TERMINATE);

            // gracefully terminate tuner and assessor here, wait at most 30 seconds.
            let secondsLeft: number = 30;
            while (secondsLeft > 0 && await isAlive(this.dispatcherPid)) {
                await delay(1000);
                secondsLeft -= 1;
            }

            // killPid is called whether or not the process exited in time.
            await killPid(this.dispatcherPid);
        }

        this.dispatcher = undefined;
    }

    /**
     * Second shutdown phase: cancel remaining trials, clean up the training
     * service, persist the final profile, stop the supporting services and
     * exit the process.
     *
     * The process exits with code 1 if stopping the supporting services
     * failed, otherwise 0. Errors from the training service are logged only.
     */
    public async stopExperimentBottomHalf(): Promise<void> {
        try {
            const jobs: TrialJobDetail[] = await this.trainingService.listTrialJobs();

            // DON'T try to make it in parallel, the training service may not handle it well.
            // If there is performance concern, consider to support batch cancellation on training service.
            for (const job of jobs) {
                if (job.status !== 'RUNNING' && job.status !== 'WAITING') {
                    continue;
                }
                try {
                    this.log.info(`cancelTrialJob: ${job.id}`);
                    await this.trainingService.cancelTrialJob(job.id);
                } catch (error) {
                    this.log.debug(`ignorable error on canceling trial ${job.id}. ${error}`);
                }
            }

            await this.trainingService.cleanUp();
        } catch (err) {
            this.log.error(`${err.stack}`);
        }

        // Stamp the end time only if none has been recorded yet.
        if (this.experimentProfile.endTime === undefined) {
            this.setEndtime();
        }
        await this.storeExperimentProfile();
        this.setStatus('STOPPED');
        this.log.info('Experiment stopped.');

        let exitCode: number = 0;
        try {
            await this.experimentManager.stop();
            await component.get<TensorboardManager>(TensorboardManager).stop();
            await this.dataStore.close();
            await component.get<NNIRestServer>(NNIRestServer).stop();
        } catch (err) {
            exitCode = 1;
            this.log.error(`${err.stack}`);
        } finally {
            this.log.close();
            process.exit(exitCode);
        }
    }

344
    /**
     * Query metric records from the data store.
     *
     * @param trialJobId optional trial id passed through to the data store
     * @param metricType optional metric type passed through to the data store
     */
    public async getMetricData(trialJobId?: string, metricType?: MetricType): Promise<MetricDataRecord[]> {
        const query: Promise<MetricDataRecord[]> = this.dataStore.getMetricData(trialJobId, metricType);
        return query;
    }

348
349
350
351
352
353
    /**
     * Return the metrics of every trial whose sequence id lies in
     * [minSeqId, maxSeqId] (both ends inclusive).
     *
     * @param minSeqId lowest sequence id to include
     * @param maxSeqId highest sequence id to include
     */
    public async getMetricDataByRange(minSeqId: number, maxSeqId: number): Promise<MetricDataRecord[]> {
        // FIXME: can this be undefined?
        const inRange = (trial: TrialJobInfo): boolean =>
            trial.sequenceId !== undefined && minSeqId <= trial.sequenceId && trial.sequenceId <= maxSeqId;

        const storedTrials = await this.dataStore.listTrialJobs();
        const wantedIds = new Set(storedTrials.filter(inRange).map(trial => trial.trialJobId));

        // All metrics are loaded and then narrowed down to the selected trials.
        const everyMetric = await this.dataStore.getMetricData();
        return everyMetric.filter(record => wantedIds.has(record.trialJobId));
    }

    /**
     * Return every non-PERIODICAL metric plus, for each trial, only the
     * PERIODICAL metric with the highest sequence (the later record wins a tie).
     * Non-PERIODICAL records come first in the result.
     */
    public async getLatestMetricData(): Promise<MetricDataRecord[]> {
        // FIXME: this can take a long time
        const records: MetricDataRecord[] = await this.dataStore.getMetricData();

        const nonPeriodical: MetricDataRecord[] = records.filter(record => record.type !== 'PERIODICAL');

        const newestPerTrial: Map<string, MetricDataRecord> = new Map<string, MetricDataRecord>();
        for (const record of records) {
            if (record.type !== 'PERIODICAL') {
                continue;
            }
            const current: MetricDataRecord | undefined = newestPerTrial.get(record.trialJobId);
            if (current === undefined || current.sequence <= record.sequence) {
                newestPerTrial.set(record.trialJobId, record);
            }
        }

        return nonPeriodical.concat(Array.from(newestPerTrial.values()));
        // FIXME: unit test
    }

379
380
381
382
    /**
     * Fetch a trial's log from the training service.
     *
     * @param trialJobId id of the trial
     * @param logType which log to fetch, passed through to the training service
     */
    public async getTrialLog(trialJobId: string, logType: LogType): Promise<string> {
        return this.trainingService.getTrialLog(trialJobId, logType);
    }

Deshui Yu's avatar
Deshui Yu committed
383
384
385
386
387
388
389
390
    /**
     * Return the in-memory experiment profile wrapped in an already-resolved promise.
     * The profile object itself is handed out, not a copy.
     */
    public getExperimentProfile(): Promise<ExperimentProfile> {
        // TO DO: using Promise.resolve()
        const deferred: Deferred<ExperimentProfile> = new Deferred<ExperimentProfile>();
        deferred.resolve(this.experimentProfile);

        return deferred.promise;
    }

391
392
393
394
    /**
     * Return the manager's current status object (the live object, not a copy).
     */
    public getStatus(): NNIManagerStatus {
        return this.status;
    }

Deshui Yu's avatar
Deshui Yu committed
395
396
397
398
    /**
     * List trial jobs from the data store.
     *
     * @param status optional status passed through to the data store query
     */
    public async listTrialJobs(status?: TrialJobStatus): Promise<TrialJobInfo[]> {
        return this.dataStore.listTrialJobs(status);
    }

399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
    /**
     * Maximum experiment duration converted with toSeconds(); Infinity when
     * the profile does not define maxExperimentDuration.
     */
    private get maxDuration(): number {
        const configured = this.experimentProfile.params.maxExperimentDuration;
        if (configured === undefined) {
            return Infinity;
        }
        return toSeconds(configured);
    }

    /**
     * Maximum number of trials; Infinity when the profile does not define
     * maxTrialNumber.
     */
    private get maxTrialNum(): number {
        const configured = this.experimentProfile.params.maxTrialNumber;
        if (configured === undefined) {
            return Infinity;
        }
        return configured;
    }

    /**
     * Remember the config and create the training service matching its platform.
     * The implementing module is loaded on demand with a dynamic import.
     *
     * An array-valued `trainingService` is treated as platform 'hybrid'.
     *
     * @param config experiment configuration
     * @returns the newly constructed training service
     * @throws Error when the platform is not one of the supported names
     */
    private async initTrainingService(config: ExperimentConfig): Promise<TrainingService> {
        this.config = config;
        const platform = Array.isArray(config.trainingService) ? 'hybrid' : config.trainingService.platform;

        // Platforms served by the router training service.
        const routedPlatforms: string[] = ['remote', 'pai', 'aml', 'hybrid'];
        if (routedPlatforms.includes(platform)) {
            const router = await import('../training_service/reusable/routerTrainingService');
            return new router.RouterTrainingService(config);
        }

        if (platform === 'local') {
            const local = await import('../training_service/local/localTrainingService');
            return new local.LocalTrainingService(config);
        }

        if (platform === 'kubeflow') {
            const kubeflow = await import('../training_service/kubernetes/kubeflow/kubeflowTrainingService');
            return new kubeflow.KubeflowTrainingService();
        }

        if (platform === 'frameworkcontroller') {
            const frameworkController = await import('../training_service/kubernetes/frameworkcontroller/frameworkcontrollerTrainingService');
            return new frameworkController.FrameworkControllerTrainingService();
        }

        if (platform === 'adl') {
            const adl = await import('../training_service/kubernetes/adl/adlTrainingService');
            return new adl.AdlTrainingService();
        }

        throw new Error(`Unsupported training service platform "${platform}"`);
    }

433
434
    /**
     * Launch the dispatcher process through getTunerProc() and attach an
     * interface to it. Does nothing when a dispatcher is already attached.
     *
     * @param command command line handed to getTunerProc
     * @param cwd working directory; the log directory is used when undefined or empty
     * @param mode exported to the child as NNI_MODE
     * @param dataDirectory exported to the child as NNI_CHECKPOINT_DIRECTORY
     */
    private setupTuner(command: string, cwd: string | undefined, mode: 'start' | 'resume', dataDirectory: string): void {
        if (this.dispatcher !== undefined) {
            return;
        }

        // stdin ignored, stdout/stderr shared with this process, plus two extra pipes.
        const stdio: StdioOptions = ['ignore', process.stdout, process.stderr, 'pipe', 'pipe'];
        const workingDir: string = (cwd === undefined || cwd === '') ? getLogDir() : cwd;

        // TO DO: add CUDA_VISIBLE_DEVICES
        const includeIntermediateResults = Boolean(this.config.deprecated && this.config.deprecated.includeIntermediateResults);

        // Variables layered on top of the current process environment.
        const dispatcherEnv = {
            SDK_PROCESS: 'dispatcher',
            NNI_MODE: mode,
            NNI_CHECKPOINT_DIRECTORY: dataDirectory,
            NNI_LOG_DIRECTORY: getLogDir(),
            NNI_LOG_LEVEL: getLogLevel(),
            NNI_INCLUDE_INTERMEDIATE_RESULTS: includeIntermediateResults,
            CUDA_VISIBLE_DEVICES: toCudaVisibleDevices(this.experimentProfile.params.tunerGpuIndices)
        };
        const childEnv = Object.assign({}, process.env, dispatcherEnv);

        const tunerProc: ChildProcess = getTunerProc(command, stdio, workingDir, childEnv);
        this.dispatcherPid = tunerProc.pid;
        this.dispatcher = createDispatcherInterface(tunerProc);
    }

    /**
     * Record a new trial concurrency: the difference from the current value
     * is accumulated in trialConcurrencyChange, then the profile is updated.
     *
     * @param trialConcurrency new concurrency value
     */
    private updateTrialConcurrency(trialConcurrency: number): void {
        // we assume trialConcurrency >= 0, which is checked by restserver
        const params = this.experimentProfile.params;
        const delta: number = trialConcurrency - params.trialConcurrency;
        this.trialConcurrencyChange += delta;
        params.trialConcurrency = trialConcurrency;
    }

    /**
     * Send a new search space to the dispatcher and store it in the profile.
     *
     * @param searchSpace search space sent with UPDATE_SEARCH_SPACE
     * @throws Error when no dispatcher is attached
     */
    private updateSearchSpace(searchSpace: string): void {
        const tuner = this.dispatcher;
        if (tuner === undefined) {
            throw new Error('Error: tuner has not been setup');
        }

        // The profile is updated only after the command has been sent.
        tuner.sendCommand(UPDATE_SEARCH_SPACE, searchSpace);
        this.experimentProfile.params.searchSpace = searchSpace;
    }

    /**
     * Once per second, until the status becomes ERROR, STOPPING or STOPPED,
     * add one second to execDuration while the experiment is in a running
     * state. The profile is persisted on every tenth tick that falls in a
     * running state.
     */
    private async periodicallyUpdateExecDuration(): Promise<void> {
        for (let tick: number = 1; !['ERROR', 'STOPPING', 'STOPPED'].includes(this.status.status); tick += 1) {
            await delay(1000 * 1); // 1 seconds

            // Ticks are counted in every state, but time accrues only in these.
            if (!['RUNNING', 'NO_MORE_TRIAL', 'TUNER_NO_MORE_TRIAL'].includes(this.status.status)) {
                continue;
            }

            this.experimentProfile.execDuration += 1;
            if (tick % 10 === 0) {
                await this.storeExperimentProfile();
            }
        }
    }

chicm-ms's avatar
chicm-ms committed
496
497
498
499
500
501
    /**
     * Send a PING command to the dispatcher every five seconds until the
     * status becomes ERROR, STOPPING or STOPPED.
     *
     * @throws Error when no dispatcher is attached at the time of the call
     */
    private async pingDispatcher(): Promise<void> {
        if (this.dispatcher === undefined) {
            throw new Error('Error: tuner has not been setup');
        }

        const halted = (): boolean => ['ERROR', 'STOPPING', 'STOPPED'].includes(this.status.status);
        while (!halted()) {
            this.dispatcher.sendCommand(PING);
            await delay(1000 * 5);
        }
    }

QuanluZhang's avatar
QuanluZhang committed
506
507
    /**
     * Poll the training service for every tracked trial, record status
     * changes, and report finished trials to the dispatcher.
     *
     * A trial that reached SUCCEEDED, USER_CANCELED, EARLY_STOPPED, FAILED or
     * SYS_CANCELED is removed from the tracked set and a TRIAL_END command
     * is sent for it.
     *
     * @returns how many trials were found finished during this poll
     * @throws Error when no dispatcher is attached
     */
    private async requestTrialJobsStatus(): Promise<number> {
        let finishedCount: number = 0;
        if (this.dispatcher === undefined) {
            throw new Error('Error: tuner has not been setup');
        }

        // Iterate over a snapshot of the keys because entries are deleted inside the loop.
        for (const trialJobId of Array.from(this.trialJobs.keys())) {
            const latest: TrialJobDetail = await this.trainingService.getTrialJob(trialJobId);
            const previous: TrialJobDetail | undefined = this.trialJobs.get(trialJobId);

            if (previous !== undefined && previous.status !== latest.status) {
                this.log.info(`Trial job ${latest.id} status changed from ${previous.status} to ${latest.status}`);
                this.trialJobs.set(trialJobId, Object.assign({}, latest));
                await this.dataStore.storeTrialJobEvent(latest.status, latest.id, undefined, latest);
            }

            // Keep the tracked entry's message current even when the status is unchanged.
            const tracked: TrialJobDetail | undefined = this.trialJobs.get(trialJobId);
            if (tracked !== undefined) {
                tracked.message = latest.message;
            }

            switch (latest.status) {
                case 'SUCCEEDED':
                case 'USER_CANCELED':
                case 'EARLY_STOPPED':
                case 'FAILED':
                case 'SYS_CANCELED': {
                    // In the current version, we do not retry
                    // TO DO: push this job to queue for retry
                    this.trialJobs.delete(trialJobId);
                    finishedCount++;
                    const hyperParams: string | undefined = latest.form.hyperParameters.value;
                    this.dispatcher.sendCommand(TRIAL_END, JSON.stringify({
                        trial_job_id: latest.id, // eslint-disable-line @typescript-eslint/camelcase
                        event: latest.status,
                        hyper_params: hyperParams // eslint-disable-line @typescript-eslint/camelcase
                    }));
                    break;
                }
                case 'WAITING':
                case 'RUNNING':
                case 'UNKNOWN':
                    // Do nothing
                    break;
                default:
                // TO DO: add warning in log
            }
        }

        return finishedCount;
    }

    /**
     * Trial scheduling loop.
     *
     * Repeats every 5 seconds until the manager status becomes `ERROR`, `STOPPING` or `STOPPED`:
     *  1. polls trial job status and accumulates the number of finished trials;
     *  2. asks the tuner for new trials, compensating for pending trialConcurrency changes;
     *  3. updates the experiment status against the max-duration / max-trial-number limits;
     *  4. submits waiting trials to the training service while concurrency slots are free.
     *
     * @throws Error if the dispatcher has not been set up. An assertion error is raised when
     *         the manager status is not one of the states expected inside the loop.
     */
    private async manageTrials(): Promise<void> {
        if (this.dispatcher === undefined) {
            throw new Error('Error: tuner has not been setup');
        }
        // Running total of finished trials, seeded with the number of trials submitted so far.
        let allFinishedTrialJobNum: number = this.currSubmittedTrialNum;
        while (!['ERROR', 'STOPPING', 'STOPPED'].includes(this.status.status)) {
            const finishedTrialJobNum: number = await this.requestTrialJobsStatus();
            allFinishedTrialJobNum += finishedTrialJobNum;

            // requestTrialNum is the number of trials that will be requested from tuner.
            // If trialConcurrency does not change, requestTrialNum equals finishedTrialJobNum.
            // If trialConcurrency changes, for example, trialConcurrency increases by 2 (trialConcurrencyChange=2), then
            // requestTrialNum equals 2 + finishedTrialJobNum and trialConcurrencyChange becomes 0.
            // If trialConcurrency changes, for example, trialConcurrency decreases by 4 (trialConcurrencyChange=-4) and
            // finishedTrialJobNum is 2, then requestTrialNum becomes -2. No trial will be requested from tuner,
            // and trialConcurrencyChange becomes -2.
            const requestTrialNum: number = this.trialConcurrencyChange + finishedTrialJobNum;
            this.trialConcurrencyChange = requestTrialNum >= 0 ? 0 : requestTrialNum;

            this.requestTrialJobs(requestTrialNum);

            // check maxtrialnum and maxduration here
            // NO_MORE_TRIAL is more like a subset of RUNNING, because during RUNNING tuner
            // might tell nnimanager that there are no more trials. In NO_MORE_TRIAL state, the experiment is viewed
            // as still running. DONE could be transferred from RUNNING or NO_MORE_TRIAL.
            assert(this.status.status === 'RUNNING' ||
                this.status.status === 'DONE' ||
                this.status.status === 'NO_MORE_TRIAL' ||
                this.status.status === 'TUNER_NO_MORE_TRIAL', `Actual status: ${this.status.status}`);

            if (this.experimentProfile.execDuration > this.maxDuration ||
                this.currSubmittedTrialNum >= this.maxTrialNum) {
                if (this.status.status !== 'DONE') {
                    this.setStatus('NO_MORE_TRIAL');
                    const waitSubmittedToFinish: number = this.currSubmittedTrialNum;

                    // Together with the assertion, the check below means "every submitted trial has finished".
                    assert(allFinishedTrialJobNum <= waitSubmittedToFinish);
                    if (allFinishedTrialJobNum >= waitSubmittedToFinish) {
                        this.setStatus('DONE');
                        this.setEndtime();
                        await this.storeExperimentProfile();
                        // write this log for travis CI
                        this.log.info('Experiment done.');
                    }
                }
            } else {
                // Limits are no longer exceeded: a previously DONE experiment is resumed,
                // so its recorded end time is cleared and persisted.
                if (this.status.status === 'DONE') {
                    delete this.experimentProfile.endTime;
                    await this.storeExperimentProfile();
                }
                if (this.status.status !== 'TUNER_NO_MORE_TRIAL') {
                    this.setStatus('RUNNING');
                }
                // Fill free concurrency slots with waiting trials.
                for (let i: number = this.trialJobs.size; i < this.experimentProfile.params.trialConcurrency; i++) {
                    if (this.waitingTrials.length === 0 ||
                        this.currSubmittedTrialNum >= this.maxTrialNum) {
                        break;
                    }
                    // Non-empty queue was checked just above, so shift() yields a form.
                    const form = this.waitingTrials.shift() as TrialJobApplicationForm;
                    this.currSubmittedTrialNum++;
                    this.log.info(`submitTrialJob: form: ${JSON.stringify(form)}`);
                    const trialJobDetail: TrialJobDetail = await this.trainingService.submitTrialJob(form);
                    // Keep a shallow copy so the stored record is detached from the object
                    // returned by the training service.
                    const trialJobDetailSnapshot: TrialJobDetail = Object.assign({}, trialJobDetail);
                    await this.storeExperimentProfile();
                    this.trialJobs.set(trialJobDetail.id, trialJobDetailSnapshot);
                    await this.dataStore.storeTrialJobEvent(
                        trialJobDetailSnapshot.status, trialJobDetailSnapshot.id, form.hyperParameters.value, trialJobDetailSnapshot);
                }
            }
            await delay(1000 * 5); // 5 seconds
        }
    }

Deshui Yu's avatar
Deshui Yu committed
645
646
647
648
649
650
    /**
     * Bumps the experiment profile revision and writes the profile to the data store.
     *
     * @returns the promise returned by the data store's `storeExperimentProfile`.
     */
    private storeExperimentProfile(): Promise<void> {
        const profile = this.experimentProfile;
        profile.revision = profile.revision + 1;

        return this.dataStore.storeExperimentProfile(profile);
    }

651
    /**
     * Wires up event listeners, sends the tuner initialization command, then runs the
     * long-lived loops concurrently and waits for all of them.
     *
     * A failure of the dispatcher ping, the training service, or trial management is
     * re-thrown as an `NNIError` carrying a prefix that names the failing component.
     */
    private async run(): Promise<void> {
        assert(this.dispatcher !== undefined);

        this.addEventListeners();
        this.sendInitTunerCommands();

        // Builds a rejection handler that re-throws the error wrapped with the given prefix.
        const rethrowAs = (prefix: string) => (err: Error): never => {
            throw NNIError.FromError(err, prefix);
        };

        await Promise.all([
            this.periodicallyUpdateExecDuration(),
            this.pingDispatcher().catch(rethrowAs('Dispatcher error: ')),
            this.trainingService.run().catch(rethrowAs('Training service error: ')),
            this.manageTrials().catch(rethrowAs('Job management error: '))
        ]);
    }

QuanluZhang's avatar
QuanluZhang committed
671
    /**
     * Registers the trial metric listener on the training service and the command / error
     * handlers on the dispatcher.
     *
     * @throws Error if the dispatcher has not been set up.
     */
    private addEventListeners(): void {
        this.log.info('Add event listeners');
        // TO DO: cannot run this method more than once in one NNIManager instance
        const dispatcher = this.dispatcher;
        if (dispatcher === undefined) {
            throw new Error('Error: tuner or job maintainer have not been setup');
        }
        this.trainingService.addTrialJobMetricListener(this.trialJobMetricListener);

        // Failures while handling a tuner command are reported through criticalError
        // rather than left as unhandled rejections.
        const handleCommand = (commandType: string, content: string): void => {
            this.onTunerCommand(commandType, content).catch((err: Error) => {
                this.criticalError(NNIError.FromError(err, 'Tuner command event error: '));
            });
        };
        const handleDispatcherError = (error: Error): void => {
            this.log.error(`Dispatcher error: ${error.message}`);
            this.criticalError(new Error('Dispatcher stream error, tuner may have crashed.'));
        };

        dispatcher.onCommand(handleCommand);
        dispatcher.onError(handleDispatcherError);
    }

    /**
     * Sends the INITIALIZE command, carrying the JSON-serialized search space, to the dispatcher.
     *
     * @throws Error if the dispatcher has not been set up.
     */
    private sendInitTunerCommands(): void {
        if (this.dispatcher === undefined) {
            throw new Error('Dispatcher error: tuner has not been setup');
        }
        // Serialize once and log exactly what is sent; interpolating the raw search space
        // would print "[object Object]" when it is an object.
        const searchSpaceJson: string = JSON.stringify(this.experimentProfile.params.searchSpace);
        this.log.debug(`Send tuner command: INITIALIZE: ${searchSpaceJson}`);
        // Tuner need to be initialized with search space before generating any hyper parameters
        this.dispatcher.sendCommand(INITIALIZE, searchSpaceJson);
    }

    /**
     * Handles a metric reported for a trial job.
     *
     * Metrics of known trial jobs are stored in the data store and then forwarded to the
     * dispatcher as REPORT_METRIC_DATA. Metrics for unknown trial jobs are only logged.
     *
     * @param metric the reported metric; `metric.id` is looked up in `trialJobs`.
     * @throws Error if the metric belongs to a known trial but the dispatcher has not been set up.
     */
    private async onTrialJobMetrics(metric: TrialJobMetric): Promise<void> {
        this.log.debug(`NNIManager received trial job metrics: ${JSON.stringify(metric)}`);
        if (!this.trialJobs.has(metric.id)) {
            // Serialize explicitly: interpolating the object itself would log "[object Object]".
            this.log.warning(`NNIManager received non-existent trial job metrics: ${JSON.stringify(metric)}`);
            return;
        }

        await this.dataStore.storeMetricData(metric.id, metric.data);
        if (this.dispatcher === undefined) {
            throw new Error('Error: tuner has not been setup');
        }
        this.dispatcher.sendCommand(REPORT_METRIC_DATA, metric.data);
    }

chicm-ms's avatar
chicm-ms committed
712
713
714
715
716
717
718
    /**
     * Asks the dispatcher to generate `jobNum` new trials. Does nothing when `jobNum` is below 1.
     *
     * @param jobNum number of trials to request.
     * @throws Error if the dispatcher has not been set up.
     */
    private requestTrialJobs(jobNum: number): void {
        if (jobNum < 1) {
            return;
        }
        const dispatcher = this.dispatcher;
        if (dispatcher === undefined) {
            throw new Error('Dispatcher error: tuner has not been setup');
        }

        const multiThread = this.config.deprecated && this.config.deprecated.multiThread;
        if (!multiThread) {
            dispatcher.sendCommand(REQUEST_TRIAL_JOBS, String(jobNum));
            return;
        }

        // Send multiple requests to ensure multiple hyper parameters are generated in non-blocking way.
        // For a single REQUEST_TRIAL_JOBS request, hyper parameters are generated one by one
        // sequentially.
        let sent: number = 0;
        while (sent < jobNum) {
            dispatcher.sendCommand(REQUEST_TRIAL_JOBS, '1');
            sent++;
        }
    }

731
    /**
     * Dispatches a command received from the tuner/dispatcher.
     *
     * @param commandType one of INITIALIZED, NEW_TRIAL_JOB, SEND_TRIAL_JOB_PARAMETER,
     *                    NO_MORE_TRIAL_JOBS or KILL_TRIAL_JOB.
     * @param content command payload; parsed as JSON for SEND_TRIAL_JOB_PARAMETER and KILL_TRIAL_JOB.
     * @throws Error for an unsupported command type, or when a command needs the dispatcher
     *         and it has not been set up.
     */
    private async onTunerCommand(commandType: string, content: string): Promise<void> {
        this.log.info(`NNIManager received command from dispatcher: ${commandType}, ${content}`);
        switch (commandType) {
            case INITIALIZED: {
                // Tuner is initialized, search space is set, request tuner to generate hyper parameters
                const hasDataToImport: boolean = this.trialDataForTuner.length > 0;
                if (hasDataToImport) {
                    if (this.dispatcher === undefined) {
                        throw new Error('Dispatcher error: tuner has not been setup');
                    }
                    this.dispatcher.sendCommand(IMPORT_DATA, this.trialDataForTuner);
                }
                this.requestTrialJobs(this.experimentProfile.params.trialConcurrency);
                break;
            }
            case NEW_TRIAL_JOB: {
                if (this.status.status === 'TUNER_NO_MORE_TRIAL') {
                    this.log.warning('It is not supposed to receive more trials after NO_MORE_TRIAL is set');
                    this.setStatus('RUNNING');
                }
                // Queue the new trial; manageTrials submits it when a concurrency slot is free.
                const newTrialForm: TrialJobApplicationForm = {
                    sequenceId: this.experimentProfile.nextSequenceId++,
                    hyperParameters: {
                        value: content,
                        index: 0
                    }
                };
                this.waitingTrials.push(newTrialForm);
                break;
            }
            case SEND_TRIAL_JOB_PARAMETER: {
                const parsedCommand: any = JSON.parse(content);
                assert(parsedCommand.parameter_index >= 0);
                assert(parsedCommand.trial_job_id !== undefined);
                const targetJobId = parsedCommand.trial_job_id;

                const updateForm: TrialJobApplicationForm = {
                    sequenceId: -1,  // FIXME: multi-phase tuner should use sequence ID instead of trial job ID
                    hyperParameters: {
                        value: content,
                        index: parsedCommand.parameter_index
                    }
                };
                this.log.info(`updateTrialJob: job id: ${targetJobId}, form: ${JSON.stringify(updateForm)}`);
                await this.trainingService.updateTrialJob(targetJobId, updateForm);
                if (parsedCommand.parameters !== null) {
                    // parameters field is set as empty string if no more hyper parameter can be generated by tuner.
                    await this.dataStore.storeTrialJobEvent(
                        'ADD_HYPERPARAMETER', targetJobId, content, undefined);
                }
                break;
            }
            case NO_MORE_TRIAL_JOBS: {
                // Do not overwrite a terminal or stopping state.
                const isEnding: boolean = ['ERROR', 'STOPPING', 'STOPPED'].includes(this.status.status);
                if (!isEnding) {
                    this.setStatus('TUNER_NO_MORE_TRIAL');
                }
                break;
            }
            case KILL_TRIAL_JOB: {
                // The payload is parsed once and reused for both the log line and the cancel call.
                const jobIdToCancel = JSON.parse(content);
                this.log.info(`cancelTrialJob: ${jobIdToCancel}`);
                await this.trainingService.cancelTrialJob(jobIdToCancel, true);
                break;
            }
            default:
                throw new Error('Error: unsupported command type from tuner');
        }
    }

797
798
799
800
801
802
803
804
805
806
    /**
     * Reports an unrecoverable error: records it through `logError` and echoes it to stderr.
     *
     * @param err the error to report.
     */
    private criticalError(err: Error): void {
        // Record through the manager's own error path first, then mirror to the console.
        this.logError(err);
        console.error(err);
    }

    /**
     * Logs the error stack (when present), appends the error message to the status error
     * list, stamps the experiment end time and switches the manager status to `ERROR`.
     *
     * @param err the error to record.
     */
    private logError(err: Error): void {
        const stackTrace: string | undefined = err.stack;
        if (stackTrace !== undefined) {
            this.log.error(stackTrace);
        }

        this.status.errors.push(err.message);
        this.setEndtime();
        this.setStatus('ERROR');
    }

    /**
     * Moves the manager to `status`, logging the transition and passing the new status to
     * the experiment manager. A no-op when the status is unchanged.
     *
     * @param status the status to switch to.
     */
    private setStatus(status: ExperimentStatus): void {
        const previous = this.status.status;
        if (status === previous) {
            return;
        }

        this.log.info(`Change NNIManager status from: ${previous} to: ${status}`);
        this.status.status = status;
        this.experimentManager.setExperimentInfo(this.experimentProfile.id, 'status', status);
    }

819
820
821
822
823
    /**
     * Stamps the experiment profile with the current time as its end time and passes the
     * same value to the experiment manager.
     */
    private setEndtime(): void {
        const profile = this.experimentProfile;
        profile.endTime = Date.now();
        this.experimentManager.setExperimentInfo(profile.id, 'endTime', profile.endTime);
    }

QuanluZhang's avatar
QuanluZhang committed
824
825
826
827
    /**
     * Creates the checkpoint directory via `mkDirP`.
     *
     * @returns the checkpoint directory path obtained from `getCheckpointDir()`.
     */
    private async createCheckpointDir(): Promise<string> {
        // TODO: test
        const checkpointPath: string = getCheckpointDir();
        await mkDirP(checkpointPath);

        return checkpointPath;
    }
J-shang's avatar
J-shang committed
830
831
832
833
834
835
836
837

    /**
     * Delegates to the training service to obtain the local output path of a trial.
     *
     * @param trialJobId ID of the trial job.
     * @returns the path reported by the training service.
     */
    public async getTrialOutputLocalPath(trialJobId: string): Promise<string> {
        const outputPath: string = await this.trainingService.getTrialOutputLocalPath(trialJobId);
        return outputPath;
    }

    /**
     * Delegates to the training service to fetch output of a trial.
     *
     * @param trialJobId ID of the trial job.
     * @param subpath sub-path forwarded unchanged to the training service.
     */
    public async fetchTrialOutput(trialJobId: string, subpath: string): Promise<void> {
        await this.trainingService.fetchTrialOutput(trialJobId, subpath);
    }
Deshui Yu's avatar
Deshui Yu committed
838
839
840
}

export { NNIManager };