nnimanager.ts 36.3 KB
Newer Older
liuzhe-lz's avatar
liuzhe-lz committed
1
2
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT license.
Deshui Yu's avatar
Deshui Yu committed
3
4
5
6

'use strict';

import * as assert from 'assert';
chicm-ms's avatar
chicm-ms committed
7
import { ChildProcess, StdioOptions } from 'child_process';
Deshui Yu's avatar
Deshui Yu committed
8
9
10
import { Deferred } from 'ts-deferred';
import * as component from '../common/component';
import { DataStore, MetricDataRecord, MetricType, TrialJobInfo } from '../common/datastore';
11
import { NNIError } from '../common/errors';
12
import { getExperimentId, getDispatcherPipe } from '../common/experimentStartupInfo';
Deshui Yu's avatar
Deshui Yu committed
13
14
import { getLogger, Logger } from '../common/log';
import {
15
    ExperimentProfile, Manager, ExperimentStatus,
16
    NNIManagerStatus, ProfileUpdateType, TrialJobStatistics
Deshui Yu's avatar
Deshui Yu committed
17
} from '../common/manager';
18
import { ExperimentConfig, toSeconds, toCudaVisibleDevices } from '../common/experimentConfig';
19
import { ExperimentManager } from '../common/experimentManager';
J-shang's avatar
J-shang committed
20
import { TensorboardManager } from '../common/tensorboardManager';
Deshui Yu's avatar
Deshui Yu committed
21
import {
22
    TrainingService, TrialJobApplicationForm, TrialJobDetail, TrialJobMetric, TrialJobStatus, LogType
Deshui Yu's avatar
Deshui Yu committed
23
} from '../common/trainingService';
24
import { delay, getCheckpointDir, getExperimentRootDir, getLogDir, getMsgDispatcherCommand, mkDirP, getTunerProc, getLogLevel, isAlive, killPid } from '../common/utils';
Deshui Yu's avatar
Deshui Yu committed
25
import {
chicm-ms's avatar
chicm-ms committed
26
    INITIALIZE, INITIALIZED, KILL_TRIAL_JOB, NEW_TRIAL_JOB, NO_MORE_TRIAL_JOBS, PING,
27
    REPORT_METRIC_DATA, REQUEST_TRIAL_JOBS, SEND_TRIAL_JOB_PARAMETER, TERMINATE, TRIAL_END, UPDATE_SEARCH_SPACE, IMPORT_DATA
Deshui Yu's avatar
Deshui Yu committed
28
} from './commands';
29
import { createDispatcherInterface, createDispatcherPipeInterface, IpcInterface } from './ipcInterface';
30
import { NNIRestServer } from '../rest_server/nniRestServer';
Deshui Yu's avatar
Deshui Yu committed
31
32

/**
 * NNIManager: the implementation of the Manager interface. It drives an experiment by
 * coordinating the training service, the tuner/assessor dispatcher and the data store.
 */
class NNIManager implements Manager {
36
    private trainingService!: TrainingService;     // assigned by initTrainingService() in start/resumeExperiment()
    private dispatcher: IpcInterface | undefined;   // channel to the tuner/assessor dispatcher; undefined until set up
    private experimentManager: ExperimentManager;
    private currSubmittedTrialNum: number;  // need to be recovered
    private trialConcurrencyChange: number; // >0: increase, <0: decrease
    private log: Logger;
    private dataStore: DataStore;
    private experimentProfile!: ExperimentProfile;
    private dispatcherPid: number;                  // pid of the spawned dispatcher; 0 when none was spawned
    private status: NNIManagerStatus;
    private waitingTrials: TrialJobApplicationForm[];  // application forms not yet submitted (e.g. from addCustomizedTrialJob)
    private trialJobs: Map<string, TrialJobDetail>;    // tracked trials, keyed by trial job id
    private trialDataForTuner: string;              // serialized finished + imported trial data, rebuilt on resume
    private readonly: boolean;                      // true when the experiment was resumed in view-only mode
    private config!: ExperimentConfig;              // set by initTrainingService()

    private trialJobMetricListener: (metric: TrialJobMetric) => void;

    constructor() {
        // Counters and bookkeeping.
        this.currSubmittedTrialNum = 0;
        this.trialConcurrencyChange = 0;
        this.experimentManager = component.get(ExperimentManager);
        this.dispatcherPid = 0;
        this.waitingTrials = [];
        this.trialJobs = new Map<string, TrialJobDetail>();
        this.trialDataForTuner = '';
        this.readonly = false;

        // Shared services and initial status.
        this.log = getLogger();
        this.dataStore = component.get(DataStore);
        this.status = { status: 'INITIALIZED', errors: [] };

        // Metric callback handed to the training service; a failure while handling a
        // metric is escalated through criticalError().
        this.trialJobMetricListener = (metric: TrialJobMetric): void => {
            const escalate = (err: Error): void => {
                this.criticalError(NNIError.FromError(err, 'Job metrics error: '));
            };
            this.onTrialJobMetrics(metric).catch(escalate);
        };

        // With a dispatcher pipe configured, connect through it now; setupTuner() then
        // sees an existing dispatcher and does not spawn a new process.
        const dispatcherPipe = getDispatcherPipe();
        if (dispatcherPipe !== null) {
            this.dispatcher = createDispatcherPipeInterface(dispatcherPipe);
        }
    }

    /**
     * Apply one field of `experimentProfile` (selected by `updateType`) to the running
     * experiment, then persist the profile.
     *
     * Every failure is reported through the returned promise rather than thrown
     * synchronously. This covers readonly mode, an unrecognized update type, and a
     * SEARCH_SPACE update before the tuner is set up. As a result, callers that chain
     * `.catch()` on the result see all errors.
     */
    public updateExperimentProfile(experimentProfile: ExperimentProfile, updateType: ProfileUpdateType): Promise<void> {
        if (this.readonly) {
            return Promise.reject(new Error('Error: can not update experiment profile in readonly mode!'));
        }
        try {
            switch (updateType) {
                case 'TRIAL_CONCURRENCY':
                    this.updateTrialConcurrency(experimentProfile.params.trialConcurrency);
                    break;
                case 'MAX_EXEC_DURATION':
                    this.experimentProfile.params.maxExperimentDuration = experimentProfile.params.maxExperimentDuration;
                    break;
                case 'SEARCH_SPACE':
                    // throws when the dispatcher has not been set up
                    this.updateSearchSpace(experimentProfile.params.searchSpace);
                    break;
                case 'MAX_TRIAL_NUM':
                    this.experimentProfile.params.maxTrialNumber = experimentProfile.params.maxTrialNumber;
                    break;
                default:
                    throw new Error('Error: unrecognized updateType');
            }
        } catch (err) {
            // Surface synchronous failures as a rejection, consistent with the readonly check above.
            return Promise.reject(err);
        }

        return this.storeExperimentProfile();
    }

106
    /**
     * Send externally supplied trial data to the tuner and record it in the data store
     * as an IMPORT_DATA event. Rejects in readonly mode or when no dispatcher exists.
     */
    public importData(data: string): Promise<void> {
        if (this.readonly) {
            return Promise.reject(new Error('Error: can not import data in readonly mode!'));
        }
        const dispatcher = this.dispatcher;
        if (dispatcher === undefined) {
            return Promise.reject(new Error('tuner has not been setup'));
        }
        dispatcher.sendCommand(IMPORT_DATA, data);
        return this.dataStore.storeTrialJobEvent('IMPORT_DATA', '', data);
    }

120
121
122
123
    /** Imported-data records kept by the data store (importData() stores its payloads there). */
    public getImportedData(): Promise<string[]> {
        const store: DataStore = this.dataStore;
        return store.getImportedData();
    }

124
125
126
127
    /** Serialized trial hyper-parameter configurations, as exported by the data store. */
    public async exportData(): Promise<string> {
        const exported: string = await this.dataStore.exportTrialHpConfigs();
        return exported;
    }

128
    /**
     * Queue a user-specified trial whose hyper-parameters are given as a JSON string.
     *
     * @param hyperParams JSON-encoded hyper-parameters for the trial.
     * @returns the sequence id assigned to the queued trial.
     *
     * The promise is rejected, never thrown synchronously, in three cases: readonly mode,
     * maxTrialNum reached, or `hyperParams` is not valid JSON.
     */
    public addCustomizedTrialJob(hyperParams: string): Promise<number> {
        if (this.readonly) {
            return Promise.reject(new Error('Error: can not add customized trial job in readonly mode!'));
        }
        if (this.currSubmittedTrialNum >= this.maxTrialNum) {
            return Promise.reject(new Error('reach maxTrialNum'));
        }

        // Parse before allocating a sequence id, so invalid input consumes nothing.
        let parameters: unknown;
        try {
            parameters = JSON.parse(hyperParams);
        } catch (err) {
            return Promise.reject(err);
        }

        // TODO: NNI manager should not peek tuner's internal protocol, let's refactor this later
        const packedParameter = {
            parameter_id: null, // eslint-disable-line @typescript-eslint/camelcase
            parameter_source: 'customized', // eslint-disable-line @typescript-eslint/camelcase
            parameters: parameters
        };

        const form: TrialJobApplicationForm = {
            sequenceId: this.experimentProfile.nextSequenceId++,
            hyperParameters: {
                value: JSON.stringify(packedParameter),
                index: 0
            }
        };
        this.waitingTrials.push(form);

        // trial id has not been generated yet, thus use '' instead.
        // The event is recorded without blocking the caller; log a failure instead of
        // leaving an unhandled promise rejection.
        this.dataStore.storeTrialJobEvent('ADD_CUSTOMIZED', '', hyperParams).catch((err: Error) => {
            this.log.error(`Failed to store ADD_CUSTOMIZED event: ${err}`);
        });

        return Promise.resolve(form.sequenceId);
    }

    /** Cancel a trial at the user's request and record a USER_TO_CANCEL event for it. */
    public async cancelTrialJobByUser(trialJobId: string): Promise<void> {
        if (this.readonly) {
            // inside an async function a throw becomes a rejection of the returned promise
            throw new Error('Error: can not cancel trial job in readonly mode!');
        }
        this.log.info(`User cancelTrialJob: ${trialJobId}`);
        await this.trainingService.cancelTrialJob(trialJobId);
        await this.dataStore.storeTrialJobEvent('USER_TO_CANCEL', trialJobId, '');
    }

167
168
169
170
171
172
173
174
175
176
177
178
    /**
     * Start a new experiment described by `config`.
     *
     * Steps, in order:
     * 1. Persist an initial profile.
     * 2. Create the training service.
     * 3. Launch the tuner in 'start' mode.
     * 4. Mark the experiment RUNNING and persist again.
     * 5. Start run() without awaiting it; its errors go to criticalError().
     *
     * @returns the experiment id.
     */
    public async startExperiment(config: ExperimentConfig): Promise<string> {
        const initialProfile: ExperimentProfile = {
            params: config,
            id: getExperimentId(),
            execDuration: 0,
            logDir: getExperimentRootDir(),
            startTime: Date.now(),
            endTime: undefined,
            nextSequenceId: 0,
            revision: 0
        };
        this.experimentProfile = initialProfile;
        this.log.info(`Starting experiment: ${this.experimentProfile.id}`);
        await this.storeExperimentProfile();

        this.log.info('Setup training service...');
        this.trainingService = await this.initTrainingService(config);

        this.log.info('Setup tuner...');
        const command: string = getMsgDispatcherCommand(config);
        this.log.debug(`dispatcher command: ${command}`);
        const checkpointDirectory: string = await this.createCheckpointDir();
        this.setupTuner(command, undefined, 'start', checkpointDirectory);

        this.setStatus('RUNNING');
        await this.storeExperimentProfile();

        const onRunFailure = (err: Error): void => {
            this.criticalError(err);
        };
        this.run().catch(onRunFailure);

        return this.experimentProfile.id;
    }

SparkSnail's avatar
SparkSnail committed
200
    /**
     * Resume the experiment whose id comes from the startup info.
     *
     * In readonly mode only the stored profile is loaded and the status becomes VIEWED.
     * Otherwise, in order:
     * - Recreate the training service and the tuner ('resume' mode).
     * - Record trials that were still WAITING/RUNNING as FAILED.
     * - Merge finished and imported trial data into trialDataForTuner.
     * - Clear endTime if limits still allow more work, set RUNNING, and start run()
     *   without awaiting it.
     */
    public async resumeExperiment(readonly: boolean): Promise<void> {
        // Fetch back the experiment profile
        const experimentId: string = getExperimentId();
        this.log.info(`Resuming experiment: ${experimentId}`);
        this.experimentProfile = await this.dataStore.getExperimentProfile(experimentId);

        this.readonly = readonly;
        if (readonly) {
            this.setStatus('VIEWED');
            return;
        }

        this.log.info('Setup training service...');
        const config: ExperimentConfig = this.experimentProfile.params;
        this.trainingService = await this.initTrainingService(config);

        this.log.info('Setup tuner...');
        const command: string = getMsgDispatcherCommand(config);
        this.log.debug(`dispatcher command: ${command}`);
        const checkpointDirectory: string = await this.createCheckpointDir();
        this.setupTuner(command, undefined, 'resume', checkpointDirectory);

        const storedJobs: TrialJobInfo[] = await this.dataStore.listTrialJobs();

        // Every previously submitted trial counts towards the submitted-trial total.
        this.currSubmittedTrialNum = storedJobs.length;

        // Trials left WAITING or RUNNING are recorded with a final FAILED event.
        const unfinishedJobs: TrialJobInfo[] = storedJobs.filter(
            (job: TrialJobInfo) => job.status === 'WAITING' || job.status === 'RUNNING');
        await Promise.all(unfinishedJobs.map(
            (job: TrialJobInfo) => this.dataStore.storeTrialJobEvent('FAILED', job.trialJobId)));

        // Collect generated trials and imported trials.
        const finishedTrialData: string = await this.exportData();
        const importedData: string[] = await this.dataStore.getImportedData();
        const mergedTrialData: Record<string, any>[] = importedData.reduce(
            // imported data is appended as-is; it is not deduplicated
            (merged: Record<string, any>[], oneImportedData: string) =>
                merged.concat(<Record<string, any>[]>JSON.parse(oneImportedData)),
            <Record<string, any>[]>JSON.parse(finishedTrialData));
        this.trialDataForTuner = JSON.stringify(mergedTrialData);

        // Re-open a finished experiment only while neither limit has been reached.
        const withinLimits: boolean = this.experimentProfile.execDuration < this.maxDuration &&
            this.currSubmittedTrialNum < this.maxTrialNum;
        if (withinLimits && this.experimentProfile.endTime) {
            delete this.experimentProfile.endTime;
        }
        this.setStatus('RUNNING');

        // TO DO: update database record for resume event
        this.run().catch((err: Error) => {
            this.criticalError(err);
        });
    }

254
255
    /** Fetch one trial's record from the data store. */
    public getTrialJob(trialJobId: string): Promise<TrialJobInfo> {
        const store: DataStore = this.dataStore;
        return store.getTrialJob(trialJobId);
    }

liuzhe-lz's avatar
liuzhe-lz committed
258
259
260
261
262
    /**
     * Forward a cluster-metadata key/value pair to the training service.
     *
     * The training service is created asynchronously by start/resumeExperiment(), so this
     * polls once per second until it exists. The training service's result is then awaited,
     * so the returned promise settles only after the metadata has been applied. An
     * asynchronous failure therefore reaches the caller instead of becoming an unhandled
     * rejection.
     */
    public async setClusterMetadata(key: string, value: string): Promise<void> {
        while (this.trainingService === undefined) {
            await delay(1000);
        }
        await this.trainingService.setClusterMetadata(key, value);
    }

liuzhe-lz's avatar
liuzhe-lz committed
265
266
    /** Read a cluster-metadata value from the training service. */
    public getClusterMetadata(key: string): Promise<string> {
        const service: TrainingService = this.trainingService;
        return service.getClusterMetadata(key);
    }

    /** Trial job statistics as reported by the data store. */
    public async getTrialJobStatistics(): Promise<TrialJobStatistics[]> {
        const statistics: TrialJobStatistics[] = await this.dataStore.getTrialJobStatistics();
        return statistics;
    }

273
    /**
     * Stop the experiment in two phases: shut down the dispatcher (top half), then
     * cancel trials, persist state and shut the process down (bottom half).
     */
    public async stopExperiment(): Promise<void> {
        await this.stopExperimentTopHalf();
        return this.stopExperimentBottomHalf();
    }

    /**
     * First phase of stopping: mark the experiment STOPPING, detach the metric listener
     * and terminate the dispatcher.
     *
     * A spawned dispatcher (dispatcherPid > 0) is sent TERMINATE and given up to 30 seconds
     * to exit on its own, then killed. If no dispatcher was ever set up, this only logs
     * an error.
     */
    public async stopExperimentTopHalf(): Promise<void> {
        this.setStatus('STOPPING');
        this.log.info('Stopping experiment, cleaning up ...');

        if (this.dispatcher === undefined) {
            this.log.error('Tuner has not been setup');
            return;
        }

        // The dispatcher may have been created from a pipe in the constructor while the
        // training service was never initialized (e.g. readonly/view mode), so guard the
        // dereference instead of failing with a TypeError.
        if (this.trainingService !== undefined) {
            this.trainingService.removeTrialJobMetricListener(this.trialJobMetricListener);
        }

        if (this.dispatcherPid > 0) {
            this.dispatcher.sendCommand(TERMINATE);
            // gracefully terminate tuner and assessor here, wait at most 30 seconds.
            for (let i: number = 0; i < 30; i++) {
                if (!await isAlive(this.dispatcherPid)) {
                    break;
                }
                await delay(1000);
            }
            await killPid(this.dispatcherPid);
        }
        this.dispatcher = undefined;
    }

    /**
     * Second phase of stopping. Steps, in order:
     * 1. Cancel unfinished trials and clean up the training service.
     * 2. Finalize and persist the profile; mark the experiment STOPPED.
     * 3. Stop the experiment manager, tensorboard manager, data store and REST server.
     * 4. Exit the process with code 1 if step 3 failed, 0 otherwise.
     *
     * Failures in step 1 are logged and do not prevent the remaining steps. In particular,
     * cleanUp() is still attempted when listing trials fails.
     */
    public async stopExperimentBottomHalf(): Promise<void> {
        // Anything can be thrown; only Error instances are guaranteed to carry a stack.
        const describeError = (err: unknown): string =>
            (err instanceof Error && err.stack !== undefined) ? err.stack : String(err);

        if (this.trainingService === undefined) {
            // e.g. readonly/view mode: there are no trials to cancel and nothing to clean up.
            this.log.info('Training service was not set up, skip canceling trials and cleaning up.');
        } else {
            try {
                const trialJobList: TrialJobDetail[] = await this.trainingService.listTrialJobs();

                // DON'T try to make it in parallel, the training service may not handle it well.
                // If there is performance concern, consider to support batch cancellation on training service.
                for (const trialJob of trialJobList) {
                    if (trialJob.status === 'RUNNING' ||
                        trialJob.status === 'WAITING') {
                        try {
                            this.log.info(`cancelTrialJob: ${trialJob.id}`);
                            await this.trainingService.cancelTrialJob(trialJob.id);
                        } catch (error) {
                            this.log.debug(`ignorable error on canceling trial ${trialJob.id}. ${error}`);
                        }
                    }
                }
            } catch (err) {
                this.log.error(describeError(err));
            }

            // Clean up even if listing/canceling trials failed above.
            try {
                await this.trainingService.cleanUp();
            } catch (err) {
                this.log.error(describeError(err));
            }
        }

        if (this.experimentProfile.endTime === undefined) {
            this.setEndtime();
        }
        await this.storeExperimentProfile();
        this.setStatus('STOPPED');
        this.log.info('Experiment stopped.');

        let hasError: boolean = false;
        try {
            await this.experimentManager.stop();
            await component.get<TensorboardManager>(TensorboardManager).stop();
            await this.dataStore.close();
            await component.get<NNIRestServer>(NNIRestServer).stop();
        } catch (err) {
            hasError = true;
            this.log.error(describeError(err));
        } finally {
            this.log.close();
            process.exit(hasError ? 1 : 0);
        }
    }

345
    /** Metric records from the data store; the optional trialJobId/metricType are passed through. */
    public async getMetricData(trialJobId?: string, metricType?: MetricType): Promise<MetricDataRecord[]> {
        const records: MetricDataRecord[] = await this.dataStore.getMetricData(trialJobId, metricType);
        return records;
    }

349
350
351
352
353
354
    /**
     * Metrics of every trial whose sequence id is in the inclusive range [minSeqId, maxSeqId].
     * Trials without a sequence id are skipped.
     */
    public async getMetricDataByRange(minSeqId: number, maxSeqId: number): Promise<MetricDataRecord[]> {
        // FIXME: can sequenceId be undefined?
        const inRange = (seq: number | undefined): boolean =>
            seq !== undefined && seq >= minSeqId && seq <= maxSeqId;

        const selectedTrialIds: Set<string> = new Set<string>();
        const trials: TrialJobInfo[] = await this.dataStore.listTrialJobs();
        for (const trial of trials) {
            if (inRange(trial.sequenceId)) {
                selectedTrialIds.add(trial.trialJobId);
            }
        }

        const allMetrics: MetricDataRecord[] = await this.dataStore.getMetricData();
        return allMetrics.filter((metric: MetricDataRecord) => selectedTrialIds.has(metric.trialJobId));
    }

    /**
     * Every non-PERIODICAL metric, in store order, followed by one PERIODICAL metric per
     * trial: the one with the highest `sequence` (on ties, the later record wins).
     *
     * FIXME: unit test
     */
    public async getLatestMetricData(): Promise<MetricDataRecord[]> {
        // FIXME: this can take a long time
        const allMetrics: MetricDataRecord[] = await this.dataStore.getMetricData();

        const finalMetrics: MetricDataRecord[] = allMetrics.filter(
            (record: MetricDataRecord) => record.type !== 'PERIODICAL');

        const latestPerTrial: Map<string, MetricDataRecord> = new Map<string, MetricDataRecord>();
        for (const record of allMetrics) {
            if (record.type !== 'PERIODICAL') {
                continue;
            }
            const previous: MetricDataRecord | undefined = latestPerTrial.get(record.trialJobId);
            if (previous === undefined || previous.sequence <= record.sequence) {
                latestPerTrial.set(record.trialJobId, record);
            }
        }

        return finalMetrics.concat(Array.from(latestPerTrial.values()));
    }

380
381
382
383
    /** A trial's log of the requested type, fetched from the training service. */
    public async getTrialLog(trialJobId: string, logType: LogType): Promise<string> {
        const content: string = await this.trainingService.getTrialLog(trialJobId, logType);
        return content;
    }

Deshui Yu's avatar
Deshui Yu committed
384
385
386
387
388
389
390
391
    /**
     * The current in-memory experiment profile (the live object, not a copy),
     * wrapped in an already-resolved promise.
     */
    public getExperimentProfile(): Promise<ExperimentProfile> {
        return Promise.resolve(this.experimentProfile);
    }

392
393
394
395
    /** The manager's current status object (returned by reference, not copied). */
    public getStatus(): NNIManagerStatus {
        const current: NNIManagerStatus = this.status;
        return current;
    }

Deshui Yu's avatar
Deshui Yu committed
396
397
398
399
    /** Trial records from the data store; `status` (optional) is passed through as a filter. */
    public async listTrialJobs(status?: TrialJobStatus): Promise<TrialJobInfo[]> {
        const jobs: TrialJobInfo[] = await this.dataStore.listTrialJobs(status);
        return jobs;
    }

400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
    /** maxExperimentDuration converted via toSeconds(); Infinity when it is not configured. */
    private get maxDuration(): number {
        const limit = this.experimentProfile.params.maxExperimentDuration;
        if (limit === undefined) {
            return Infinity;
        }
        return toSeconds(limit);
    }

    /** maxTrialNumber from the config; Infinity when it is not configured. */
    private get maxTrialNum(): number {
        const limit = this.experimentProfile.params.maxTrialNumber;
        if (limit !== undefined) {
            return limit;
        }
        return Infinity;
    }

    /**
     * Remember `config` and create the training service for its platform.
     * An array of training services selects the 'hybrid' platform. Platform modules
     * are imported lazily so only the selected one is loaded.
     *
     * @throws Error when the platform is not supported.
     */
    private async initTrainingService(config: ExperimentConfig): Promise<TrainingService> {
        this.config = config;
        const platform: string = Array.isArray(config.trainingService) ? 'hybrid' : config.trainingService.platform;

        switch (platform) {
            case 'remote':
            case 'pai':
            case 'aml':
            case 'hybrid': {
                const { RouterTrainingService } = await import('../training_service/reusable/routerTrainingService');
                return new RouterTrainingService(config);
            }
            case 'local': {
                const { LocalTrainingService } = await import('../training_service/local/localTrainingService');
                return new LocalTrainingService(config);
            }
            case 'kubeflow': {
                const { KubeflowTrainingService } = await import('../training_service/kubernetes/kubeflow/kubeflowTrainingService');
                return new KubeflowTrainingService();
            }
            case 'frameworkcontroller': {
                const { FrameworkControllerTrainingService } = await import('../training_service/kubernetes/frameworkcontroller/frameworkcontrollerTrainingService');
                return new FrameworkControllerTrainingService();
            }
            case 'adl': {
                const { AdlTrainingService } = await import('../training_service/kubernetes/adl/adlTrainingService');
                return new AdlTrainingService();
            }
            default:
                throw new Error(`Unsupported training service platform "${platform}"`);
        }
    }

434
435
    /**
     * Spawn the tuner/assessor dispatcher process and connect to it. Nothing is spawned
     * if a dispatcher already exists (e.g. one created from a pipe in the constructor).
     *
     * @param command        command line that launches the dispatcher (via getTunerProc)
     * @param cwd            working directory; the log directory is used when undefined or empty
     * @param mode           exported to the dispatcher as NNI_MODE
     * @param dataDirectory  exported to the dispatcher as NNI_CHECKPOINT_DIRECTORY
     */
    private setupTuner(command: string, cwd: string | undefined, mode: 'start' | 'resume', dataDirectory: string): void {
        if (this.dispatcher !== undefined) {
            return;
        }
        // stdin ignored, stdout/stderr shared with this process; fds 3 and 4 are pipes,
        // presumably the IPC channel used by createDispatcherInterface().
        const stdio: StdioOptions = ['ignore', process.stdout, process.stderr, 'pipe', 'pipe'];
        const newCwd: string = (cwd === undefined || cwd === '') ? getLogDir() : cwd;
        const includeIntermediateResultsEnv = !!(this.config.deprecated && this.config.deprecated.includeIntermediateResults);

        const nniEnv = {
            SDK_PROCESS: 'dispatcher',
            NNI_MODE: mode,
            NNI_CHECKPOINT_DIRECTORY: dataDirectory,
            NNI_LOG_DIRECTORY: getLogDir(),
            NNI_LOG_LEVEL: getLogLevel(),
            NNI_INCLUDE_INTERMEDIATE_RESULTS: includeIntermediateResultsEnv,
            CUDA_VISIBLE_DEVICES: toCudaVisibleDevices(this.experimentProfile.params.tunerGpuIndices)
        };
        const newEnv = Object.assign({}, process.env, nniEnv);
        const tunerProc: ChildProcess = getTunerProc(command, stdio, newCwd, newEnv);

        // ChildProcess.pid is undefined when the process failed to spawn; keep the
        // "no spawned dispatcher" value 0 in that case, so stopExperimentTopHalf()
        // never tries to signal a non-existent process.
        this.dispatcherPid = tunerProc.pid !== undefined ? tunerProc.pid : 0;
        this.dispatcher = createDispatcherInterface(tunerProc);
    }

    /**
     * Set a new trial concurrency. The difference from the current value accumulates in
     * trialConcurrencyChange, which manageTrials() folds into its next trial request.
     */
    private updateTrialConcurrency(trialConcurrency: number): void {
        // we assume trialConcurrency >= 0, which is checked by restserver
        const delta: number = trialConcurrency - this.experimentProfile.params.trialConcurrency;
        this.trialConcurrencyChange += delta;
        this.experimentProfile.params.trialConcurrency = trialConcurrency;
    }

    /**
     * Send a new search space to the tuner, then record it in the profile.
     *
     * @throws Error when the tuner has not been set up (the profile is left unchanged).
     */
    private updateSearchSpace(searchSpace: string): void {
        const dispatcher = this.dispatcher;
        if (dispatcher === undefined) {
            throw new Error('Error: tuner has not been setup');
        }
        dispatcher.sendCommand(UPDATE_SEARCH_SPACE, searchSpace);
        this.experimentProfile.params.searchSpace = searchSpace;
    }

    /**
     * Once per second, add one second to execDuration while the experiment is RUNNING,
     * NO_MORE_TRIAL or TUNER_NO_MORE_TRIAL. On every tenth tick that counted, the profile
     * is persisted. The loop ends once the status is ERROR, STOPPING or STOPPED.
     */
    private async periodicallyUpdateExecDuration(): Promise<void> {
        const finishedStates: string[] = ['ERROR', 'STOPPING', 'STOPPED'];
        const countingStates: string[] = ['RUNNING', 'NO_MORE_TRIAL', 'TUNER_NO_MORE_TRIAL'];
        for (let tick: number = 1; !finishedStates.includes(this.status.status); tick++) {
            await delay(1000 * 1); // 1 seconds
            if (!countingStates.includes(this.status.status)) {
                continue;
            }
            this.experimentProfile.execDuration += 1;
            if (tick % 10 === 0) {
                await this.storeExperimentProfile();
            }
        }
    }

chicm-ms's avatar
chicm-ms committed
497
498
499
500
501
502
    /**
     * Send PING to the dispatcher every 5 seconds until the status becomes ERROR,
     * STOPPING or STOPPED.
     *
     * @throws Error when the tuner has not been set up.
     */
    private async pingDispatcher(): Promise<void> {
        if (this.dispatcher === undefined) {
            throw new Error('Error: tuner has not been setup');
        }
        const stopStates: string[] = ['ERROR', 'STOPPING', 'STOPPED'];
        const shouldStop = (): boolean => stopStates.includes(this.status.status);
        while (!shouldStop()) {
            this.dispatcher.sendCommand(PING);
            await delay(1000 * 5);
        }
    }

QuanluZhang's avatar
QuanluZhang committed
507
508
    /**
     * Poll the training service for the status of every trial in `trialJobs`.
     *
     * For each status change: log it, store a copy of the new detail, and record a
     * trial event. For each trial in a terminal status: remove it from `trialJobs` and
     * send TRIAL_END to the tuner. Failed/canceled trials are not retried.
     *
     * @returns the number of trials that reached a terminal status in this poll.
     * @throws Error when the tuner has not been set up.
     */
    private async requestTrialJobsStatus(): Promise<number> {
        let finishedTrialJobNum: number = 0;
        if (this.dispatcher === undefined) {
            throw new Error('Error: tuner has not been setup');
        }
        // Snapshot the keys: entries are deleted from the map inside the loop.
        for (const trialJobId of Array.from(this.trialJobs.keys())) {
            const trialJobDetail: TrialJobDetail = await this.trainingService.getTrialJob(trialJobId);
            const oldTrialJobDetail: TrialJobDetail | undefined = this.trialJobs.get(trialJobId);
            if (oldTrialJobDetail !== undefined && oldTrialJobDetail.status !== trialJobDetail.status) {
                this.log.info(`Trial job ${trialJobDetail.id} status changed from ${oldTrialJobDetail.status} to ${trialJobDetail.status}`);
                this.trialJobs.set(trialJobId, Object.assign({}, trialJobDetail));
                await this.dataStore.storeTrialJobEvent(trialJobDetail.status, trialJobDetail.id, undefined, trialJobDetail);
            }
            const newTrialJobDetail: TrialJobDetail | undefined = this.trialJobs.get(trialJobId);
            if (newTrialJobDetail !== undefined) {
                newTrialJobDetail.message = trialJobDetail.message;
            }
            switch (trialJobDetail.status) {
                case 'SUCCEEDED':
                case 'USER_CANCELED':
                case 'EARLY_STOPPED':
                case 'FAILED':
                case 'SYS_CANCELED':
                    // In the current version, we do not retry FAILED/SYS_CANCELED trials
                    // TO DO: push such jobs to queue for retry
                    this.trialJobs.delete(trialJobId);
                    finishedTrialJobNum++;
                    this.dispatcher.sendCommand(TRIAL_END, JSON.stringify({
                        trial_job_id: trialJobDetail.id, // eslint-disable-line @typescript-eslint/camelcase
                        event: trialJobDetail.status,
                        hyper_params: trialJobDetail.form.hyperParameters.value // eslint-disable-line @typescript-eslint/camelcase
                    }));
                    break;
                case 'WAITING':
                case 'RUNNING':
                case 'UNKNOWN':
                    // Still in progress: nothing to do until the next poll.
                    break;
                default:
                    this.log.warning(`Trial job ${trialJobDetail.id} has unexpected status ${trialJobDetail.status}`);
            }
        }

        return finishedTrialJobNum;
    }

    /**
     * Main trial-scheduling loop. Runs until the experiment status becomes ERROR, STOPPING or STOPPED.
     *
     * On every iteration (every 5 seconds) it:
     *  1. polls trial statuses via requestTrialJobsStatus() and accumulates the number of finished trials;
     *  2. requests new hyper-parameter sets from the tuner, one per finished trial, adjusted by any
     *     pending trialConcurrency change;
     *  3. switches to NO_MORE_TRIAL (and to DONE once every submitted trial has finished) when
     *     maxDuration or maxTrialNum is reached, or back to RUNNING when the limits no longer apply;
     *  4. otherwise submits queued forms from `waitingTrials` until trialConcurrency slots are filled.
     *
     * @throws Error if the dispatcher (tuner) has not been set up.
     */
    private async manageTrials(): Promise<void> {
        if (this.dispatcher === undefined) {
            throw new Error('Error: tuner has not been setup');
        }
        // The finished-trial counter starts at the number of trials already submitted when this loop
        // begins (presumably trials from a resumed experiment — confirm against the resume path).
        let allFinishedTrialJobNum: number = this.currSubmittedTrialNum;
        while (!['ERROR', 'STOPPING', 'STOPPED'].includes(this.status.status)) {
            const finishedTrialJobNum: number = await this.requestTrialJobsStatus();
            allFinishedTrialJobNum += finishedTrialJobNum;

            // requestTrialNum is the number of trials that will be requested from the tuner.
            // If trialConcurrency does not change, requestTrialNum equals finishedTrialJobNum.
            // If trialConcurrency increases by 2 (trialConcurrencyChange=2), requestTrialNum equals
            // 2 + finishedTrialJobNum and trialConcurrencyChange becomes 0.
            // If trialConcurrency decreases by 4 (trialConcurrencyChange=-4) and finishedTrialJobNum is 2,
            // requestTrialNum becomes -2: no trial is requested and trialConcurrencyChange becomes -2.
            const requestTrialNum: number = this.trialConcurrencyChange + finishedTrialJobNum;
            if (requestTrialNum >= 0) {
                this.trialConcurrencyChange = 0;
            } else {
                this.trialConcurrencyChange = requestTrialNum;
            }

            this.requestTrialJobs(requestTrialNum);

            // Check maxTrialNum and maxDuration here.
            // NO_MORE_TRIAL is more like a subset of RUNNING, because during RUNNING the tuner
            // might tell nnimanager that there are no more trials. In NO_MORE_TRIAL state the experiment
            // is viewed as still running. DONE can be transferred from RUNNING or NO_MORE_TRIAL.
            assert(this.status.status === 'RUNNING' ||
                this.status.status === 'DONE' ||
                this.status.status === 'NO_MORE_TRIAL' ||
                this.status.status === 'TUNER_NO_MORE_TRIAL', `Actual status: ${this.status.status}`);
            if (this.experimentProfile.execDuration > this.maxDuration ||
                this.currSubmittedTrialNum >= this.maxTrialNum) {
                if (this.status.status !== 'DONE') {
                    this.setStatus('NO_MORE_TRIAL');
                    const waitSubmittedToFinish: number = this.currSubmittedTrialNum;

                    assert(allFinishedTrialJobNum <= waitSubmittedToFinish);
                    if (allFinishedTrialJobNum >= waitSubmittedToFinish) {
                        this.setStatus('DONE');
                        this.setEndtime();
                        await this.storeExperimentProfile();
                        // write this log for travis CI
                        this.log.info('Experiment done.');
                    }
                }
            } else {
                // Limits were raised after the experiment finished: it is running again.
                if (this.status.status === 'DONE') {
                    delete this.experimentProfile.endTime;
                    await this.storeExperimentProfile();
                }
                if (this.status.status !== 'TUNER_NO_MORE_TRIAL') {
                    this.setStatus('RUNNING');
                }
                for (let i: number = this.trialJobs.size; i < this.experimentProfile.params.trialConcurrency; i++) {
                    if (this.waitingTrials.length === 0 ||
                        this.currSubmittedTrialNum >= this.maxTrialNum) {
                        break;
                    }
                    // Non-empty queue was checked above, so shift() cannot return undefined here.
                    const form = this.waitingTrials.shift() as TrialJobApplicationForm;
                    this.currSubmittedTrialNum++;
                    this.log.info(`submitTrialJob: form: ${JSON.stringify(form)}`);
                    const trialJobDetail: TrialJobDetail = await this.trainingService.submitTrialJob(form);
                    // Track a shallow copy of the detail returned by the training service.
                    const snapshot: TrialJobDetail = Object.assign({}, trialJobDetail);
                    await this.storeExperimentProfile();
                    this.trialJobs.set(trialJobDetail.id, snapshot);
                    await this.dataStore.storeTrialJobEvent(
                        snapshot.status, snapshot.id, form.hyperParameters.value, snapshot);
                }
            }
            await delay(1000 * 5); // 5 seconds
        }
    }

Deshui Yu's avatar
Deshui Yu committed
646
647
648
649
650
651
    /**
     * Increments the experiment profile's revision counter and persists the profile.
     *
     * @returns the promise returned by the data store's save operation.
     */
    private storeExperimentProfile(): Promise<void> {
        const profile = this.experimentProfile;
        profile.revision++;

        return this.dataStore.storeExperimentProfile(profile);
    }

652
    /**
     * Starts the experiment: registers event listeners, sends INITIALIZE to the tuner, then runs
     * the long-lived background tasks concurrently (exec-duration updater, dispatcher ping,
     * training service, trial management).
     *
     * Resolves when all tasks resolve; rejects with the first failure, re-wrapped by
     * NNIError.FromError with a prefix naming the failing component.
     */
    private async run(): Promise<void> {
        assert(this.dispatcher !== undefined);

        this.addEventListeners();

        this.sendInitTunerCommands();

        // Re-throw a task's rejection tagged with the component it came from.
        const tagFailure = <T>(task: Promise<T>, prefix: string): Promise<T> =>
            task.catch((err: Error) => {
                throw NNIError.FromError(err, prefix);
            });

        await Promise.all([
            this.periodicallyUpdateExecDuration(),
            tagFailure(this.pingDispatcher(), 'Dispatcher error: '),
            tagFailure(this.trainingService.run(), 'Training service error: '),
            tagFailure(this.manageTrials(), 'Job management error: ')
        ]);
    }

QuanluZhang's avatar
QuanluZhang committed
672
    /**
     * Wires up event handling:
     *  - trial metrics from the training service go to `trialJobMetricListener`;
     *  - dispatcher commands go to onTunerCommand(), and any failure is reported via criticalError();
     *  - dispatcher stream errors are logged and reported via criticalError().
     *
     * @throws Error if the dispatcher has not been set up.
     */
    private addEventListeners(): void {
        this.log.info('Add event listeners');
        // TO DO: cannot run this method more than once in one NNIManager instance
        const dispatcher = this.dispatcher;
        if (dispatcher === undefined) {
            throw new Error('Error: tuner or job maintainer have not been setup');
        }
        this.trainingService.addTrialJobMetricListener(this.trialJobMetricListener);

        const handleCommand = (commandType: string, content: string): void => {
            this.onTunerCommand(commandType, content).catch((err: Error) => {
                this.criticalError(NNIError.FromError(err, 'Tuner command event error: '));
            });
        };
        const handleStreamError = (error: Error): void => {
            this.log.error(`Dispatcher error: ${error.message}`);
            this.criticalError(new Error('Dispatcher stream error, tuner may have crashed.'));
        };

        dispatcher.onCommand(handleCommand);
        dispatcher.onError(handleStreamError);
    }

    /**
     * Sends INITIALIZE with the experiment's JSON-serialized search space to the dispatcher.
     * The tuner needs the search space before it can generate any hyper-parameters.
     *
     * @throws Error if the dispatcher has not been set up.
     */
    private sendInitTunerCommands(): void {
        if (this.dispatcher === undefined) {
            throw new Error('Dispatcher error: tuner has not been setup');
        }
        // Serialize once and reuse: interpolating the search-space object directly into the log
        // message would print "[object Object]" instead of its content.
        const searchSpace: string = JSON.stringify(this.experimentProfile.params.searchSpace);
        this.log.debug(`Send tuner command: INITIALIZE: ${searchSpace}`);
        // Tuner need to be initialized with search space before generating any hyper parameters
        this.dispatcher.sendCommand(INITIALIZE, searchSpace);
    }

    /**
     * Handles a metric reported by the training service.
     *
     * Metrics of trials currently tracked in `trialJobs` are stored in the data store and then
     * forwarded to the tuner as REPORT_METRIC_DATA. Metrics of untracked trials are only logged
     * as a warning.
     *
     * @param metric  trial id plus the raw metric payload.
     * @throws Error if the dispatcher has not been set up (checked after the metric was stored).
     */
    private async onTrialJobMetrics(metric: TrialJobMetric): Promise<void> {
        this.log.debug(`NNIManager received trial job metrics: ${JSON.stringify(metric)}`);
        if (!this.trialJobs.has(metric.id)) {
            // Stringify explicitly: interpolating the object itself would log "[object Object]".
            this.log.warning(`NNIManager received non-existent trial job metrics: ${JSON.stringify(metric)}`);
            return;
        }
        await this.dataStore.storeMetricData(metric.id, metric.data);
        if (this.dispatcher === undefined) {
            throw new Error('Error: tuner has not been setup');
        }
        this.dispatcher.sendCommand(REPORT_METRIC_DATA, metric.data);
    }

chicm-ms's avatar
chicm-ms committed
713
714
715
716
717
718
719
    /**
     * Asks the tuner for `jobNum` new hyper-parameter sets via REQUEST_TRIAL_JOBS.
     *
     * Does nothing when `jobNum` < 1. With the deprecated `multiThread` option enabled, sends one
     * single-trial request per job; otherwise sends one request carrying the whole count.
     *
     * @param jobNum  number of trials to request.
     * @throws Error if the dispatcher has not been set up (only checked when `jobNum` >= 1).
     */
    private requestTrialJobs(jobNum: number): void {
        if (jobNum < 1) {
            return;
        }
        const dispatcher = this.dispatcher;
        if (dispatcher === undefined) {
            throw new Error('Dispatcher error: tuner has not been setup');
        }

        const multiThread = this.config.deprecated && this.config.deprecated.multiThread;
        if (!multiThread) {
            dispatcher.sendCommand(REQUEST_TRIAL_JOBS, String(jobNum));
            return;
        }

        // A single REQUEST_TRIAL_JOBS makes the tuner generate its hyper-parameters one by one,
        // sequentially; one request per trial lets them be generated in a non-blocking way.
        let remaining: number = jobNum;
        while (remaining > 0) {
            dispatcher.sendCommand(REQUEST_TRIAL_JOBS, '1');
            remaining--;
        }
    }

732
    /**
     * Handles one command received from the dispatcher.
     *
     * @param commandType  command code (INITIALIZED, NEW_TRIAL_JOB, SEND_TRIAL_JOB_PARAMETER,
     *                     NO_MORE_TRIAL_JOBS or KILL_TRIAL_JOB).
     * @param content      raw payload; parsed as JSON for SEND_TRIAL_JOB_PARAMETER and KILL_TRIAL_JOB,
     *                     used verbatim as the hyper-parameter value for NEW_TRIAL_JOB.
     * @throws Error for any other command type, or if the dispatcher is missing when trial data
     *         must be imported.
     */
    private async onTunerCommand(commandType: string, content: string): Promise<void> {
        this.log.info(`NNIManager received command from dispatcher: ${commandType}, ${content}`);
        switch (commandType) {
            case INITIALIZED: {
                // Tuner is initialized and the search space is set: forward any pending trial data
                // (IMPORT_DATA), then request enough hyper-parameters to fill trialConcurrency.
                if (this.trialDataForTuner.length > 0) {
                    if (this.dispatcher === undefined) {
                        throw new Error('Dispatcher error: tuner has not been setup');
                    }
                    this.dispatcher.sendCommand(IMPORT_DATA, this.trialDataForTuner);
                }
                this.requestTrialJobs(this.experimentProfile.params.trialConcurrency);
                break;
            }
            case NEW_TRIAL_JOB: {
                if (this.status.status === 'TUNER_NO_MORE_TRIAL') {
                    this.log.warning('It is not supposed to receive more trials after NO_MORE_TRIAL is set');
                    this.setStatus('RUNNING');
                }
                // Queue the form; manageTrials() submits it when a concurrency slot is free.
                const form: TrialJobApplicationForm = {
                    sequenceId: this.experimentProfile.nextSequenceId++,
                    hyperParameters: {
                        value: content,
                        index: 0
                    }
                };
                this.waitingTrials.push(form);
                break;
            }
            case SEND_TRIAL_JOB_PARAMETER: {
                const tunerCommand: any = JSON.parse(content);
                assert(tunerCommand.parameter_index >= 0);
                assert(tunerCommand.trial_job_id !== undefined);

                const trialJobForm: TrialJobApplicationForm = {
                    sequenceId: -1,  // FIXME: multi-phase tuner should use sequence ID instead of trial job ID
                    hyperParameters: {
                        value: content,
                        index: tunerCommand.parameter_index
                    }
                };
                this.log.info(`updateTrialJob: job id: ${tunerCommand.trial_job_id}, form: ${JSON.stringify(trialJobForm)}`);
                await this.trainingService.updateTrialJob(tunerCommand.trial_job_id, trialJobForm);
                // A null `parameters` field means the tuner produced no new hyper-parameters; only real
                // parameter sets are recorded. NOTE(review): an empty-string `parameters` would still be
                // recorded here — confirm the dispatcher never sends one.
                if (tunerCommand.parameters !== null) {
                    await this.dataStore.storeTrialJobEvent(
                        'ADD_HYPERPARAMETER', tunerCommand.trial_job_id, content, undefined);
                }
                break;
            }
            case NO_MORE_TRIAL_JOBS: {
                // Ignored once the experiment is failing or being stopped.
                if (!['ERROR', 'STOPPING', 'STOPPED'].includes(this.status.status)) {
                    this.setStatus('TUNER_NO_MORE_TRIAL');
                }
                break;
            }
            case KILL_TRIAL_JOB: {
                // Payload is the JSON-encoded trial job id; parse it once for both log and cancel.
                const trialJobId: string = JSON.parse(content);
                this.log.info(`cancelTrialJob: ${trialJobId}`);
                await this.trainingService.cancelTrialJob(trialJobId, true);
                break;
            }
            default:
                throw new Error(`Error: unsupported command type from tuner: ${commandType}`);
        }
    }

798
799
800
801
802
803
804
805
806
807
    /**
     * Reports a fatal error. It is first recorded through logError(), which also sets the end
     * time and switches the status to ERROR, and then echoed to the console with console.error.
     */
    private criticalError(error: Error): void {
        this.logError(error);
        console.error(error);
    }

    /**
     * Records an error and marks the experiment as failed. In order, it:
     *  - writes the stack trace (when present) to the log;
     *  - appends the message to `status.errors`;
     *  - stamps the end time;
     *  - sets the status to ERROR.
     */
    private logError(err: Error): void {
        const { stack, message } = err;
        if (stack !== undefined) {
            this.log.error(stack);
        }
        this.status.errors.push(message);
        this.setEndtime();
        this.setStatus('ERROR');
    }

    /**
     * Moves the manager to `status`. It logs the transition and mirrors the new status to the
     * experiment manager. This is a no-op when the status is unchanged.
     */
    private setStatus(status: ExperimentStatus): void {
        const previous = this.status.status;
        if (previous === status) {
            return;
        }
        this.log.info(`Change NNIManager status from: ${previous} to: ${status}`);
        this.status.status = status;
        this.experimentManager.setExperimentInfo(this.experimentProfile.id, 'status', this.status.status);
    }

820
821
822
823
824
    /**
     * Stamps the current time (ms since epoch) as the experiment's end time, both on the
     * profile and in the experiment manager.
     */
    private setEndtime(): void {
        const endTime: number = Date.now();
        this.experimentProfile.endTime = endTime;
        this.experimentManager.setExperimentInfo(this.experimentProfile.id, 'endTime', endTime);
    }

QuanluZhang's avatar
QuanluZhang committed
825
826
827
828
    /**
     * Creates the experiment checkpoint directory with mkDirP and returns its path.
     */
    private async createCheckpointDir(): Promise<string> {
        // TODO: test
        const checkpointDir: string = getCheckpointDir();
        await mkDirP(checkpointDir);

        return checkpointDir;
    }
J-shang's avatar
J-shang committed
831
832
833
834
835
836
837
838

    /**
     * Returns the local output path of a trial, as reported by the training service.
     *
     * @param trialJobId  id of the trial.
     */
    public async getTrialOutputLocalPath(trialJobId: string): Promise<string> {
        const pending: Promise<string> = this.trainingService.getTrialOutputLocalPath(trialJobId);
        return pending;
    }

    /**
     * Delegates fetching of a trial's output (under `subpath`) to the training service.
     *
     * @param trialJobId  id of the trial.
     * @param subpath     sub-path of the trial output to fetch.
     */
    public async fetchTrialOutput(trialJobId: string, subpath: string): Promise<void> {
        const pending: Promise<void> = this.trainingService.fetchTrialOutput(trialJobId, subpath);
        return pending;
    }
Deshui Yu's avatar
Deshui Yu committed
839
840
841
}

// Module export: the NNIManager class defined in this file.
export { NNIManager };