nnimanager.ts 19.5 KB
Newer Older
Deshui Yu's avatar
Deshui Yu committed
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
/**
 * Copyright (c) Microsoft Corporation
 * All rights reserved.
 *
 * MIT License
 *
 * Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated
 * documentation files (the "Software"), to deal in the Software without restriction, including without limitation
 * the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and
 * to permit persons to whom the Software is furnished to do so, subject to the following conditions:
 * The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.
 *
 * THE SOFTWARE IS PROVIDED *AS IS*, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING
 * BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
 * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
 * DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
 */

'use strict';

import * as assert from 'assert';
import * as cpp from 'child-process-promise';
import { ChildProcess, spawn } from 'child_process';
import { Deferred } from 'ts-deferred';
import * as component from '../common/component';
import { DataStore, MetricDataRecord, MetricType, TrialJobInfo } from '../common/datastore';
import { getExperimentId } from '../common/experimentStartupInfo';
import { getLogger, Logger } from '../common/log';
import {
    ExperimentParams, ExperimentProfile, Manager,
    ProfileUpdateType, TrialJobStatistics
} from '../common/manager';
import {
    TrainingService, TrialJobApplicationForm, TrialJobDetail, TrialJobMetric, TrialJobStatus
} from '../common/trainingService';
37
import { delay , getLogDir, getMsgDispatcherCommand} from '../common/utils';
Deshui Yu's avatar
Deshui Yu committed
38
39
40
41
import {
    ADD_CUSTOMIZED_TRIAL_JOB, KILL_TRIAL_JOB, NEW_TRIAL_JOB, NO_MORE_TRIAL_JOBS, REPORT_METRIC_DATA,
    REQUEST_TRIAL_JOBS, TERMINATE, TRIAL_END, UPDATE_SEARCH_SPACE
} from './commands';
42
import { createDispatcherInterface, IpcInterface } from './ipcInterface';
Deshui Yu's avatar
Deshui Yu committed
43
44
45
46
47
48
49
import { TrialJobMaintainerEvent, TrialJobs } from './trialJobs';

/**
 * NNIManager
 */
class NNIManager implements Manager {
    private trainingService: TrainingService;
50
    private dispatcher: IpcInterface | undefined;
Deshui Yu's avatar
Deshui Yu committed
51
52
53
54
55
56
57
    private trialJobsMaintainer: TrialJobs | undefined;
    private currSubmittedTrialNum: number; // need to be recovered
    private trialConcurrencyReduction: number;
    private customizedTrials: string[]; // need to be recovered
    private log: Logger;
    private dataStore: DataStore;
    private experimentProfile: ExperimentProfile;
58
    private dispatcherPid: number;
Deshui Yu's avatar
Deshui Yu committed
59
60
61
62
63
64
65
66

    constructor() {
        this.currSubmittedTrialNum = 0;
        this.trialConcurrencyReduction = 0;
        this.customizedTrials = [];
        const experimentId: string = getExperimentId();
        this.trainingService = component.get(TrainingService);
        assert(this.trainingService);
67
        this.dispatcherPid = 0;
Deshui Yu's avatar
Deshui Yu committed
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82

        this.log = getLogger();
        this.dataStore = component.get(DataStore);
        this.experimentProfile = {
            id: experimentId,
            revision: 0,
            execDuration: 0,
            params: {
                authorName: '',
                experimentName: '',
                trialConcurrency: 0,
                maxExecDuration: 0, // unit: second
                maxTrialNum: 0, // maxTrialNum includes all the submitted trial jobs
                searchSpace: '',
                tuner: {
83
84
85
                    className: '',
                    classArgs: {},
                    checkpointDir: ''
Deshui Yu's avatar
Deshui Yu committed
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
                }
            }
        };
    }

    public updateExperimentProfile(experimentProfile: ExperimentProfile, updateType: ProfileUpdateType): Promise<void> {
        // TO DO: remove this line, and let rest server do data type validation
        experimentProfile.startTime = new Date(<string><any>experimentProfile.startTime);
        switch (updateType) {
            case 'TRIAL_CONCURRENCY':
                this.updateTrialConcurrency(experimentProfile.params.trialConcurrency);
                break;
            case 'MAX_EXEC_DURATION':
                this.updateMaxExecDuration(experimentProfile.params.maxExecDuration);
                break;
            case 'SEARCH_SPACE':
                this.updateSearchSpace(experimentProfile.params.searchSpace);
                break;
            default:
                throw new Error('Error: unrecognized updateType');
        }

        return this.storeExperimentProfile();
    }

    public addCustomizedTrialJob(hyperParams: string): Promise<void> {
        if (this.currSubmittedTrialNum >= this.experimentProfile.params.maxTrialNum) {
            return Promise.reject(
                new Error('reach maxTrialNum')
            );
        }
        this.customizedTrials.push(hyperParams);

        // trial id has not been generated yet, thus use '' instead
        return this.dataStore.storeTrialJobEvent('ADD_CUSTOMIZED', '', hyperParams);
    }

    public async cancelTrialJobByUser(trialJobId: string): Promise<void> {
        await this.trainingService.cancelTrialJob(trialJobId);
        await this.dataStore.storeTrialJobEvent('USER_TO_CANCEL', trialJobId, '');
    }

    public async startExperiment(expParams: ExperimentParams): Promise<string> {
        this.log.debug(`Starting experiment: ${this.experimentProfile.id}`);
        this.experimentProfile.params = expParams;
        await this.storeExperimentProfile();
        this.log.debug('Setup tuner...');
133
134
135

        const dispatcherCommand: string = getMsgDispatcherCommand(expParams.tuner, expParams.assessor);
        console.log(`dispatcher command: ${dispatcherCommand}`);
Deshui Yu's avatar
Deshui Yu committed
136
        this.setupTuner(
137
138
139
            //expParams.tuner.tunerCommand,
            dispatcherCommand,
            undefined,
Deshui Yu's avatar
Deshui Yu committed
140
            'start',
141
            expParams.tuner.checkpointDir);
Deshui Yu's avatar
Deshui Yu committed
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156

        this.experimentProfile.startTime = new Date();
        await this.storeExperimentProfile();
        this.run().catch(err => {
            this.log.error(err.stack);
        });
        return this.experimentProfile.id;
    }

    public async resumeExperiment(): Promise<void> {
        //Fetch back the experiment profile
        const experimentId: string = getExperimentId();
        this.experimentProfile = await this.dataStore.getExperimentProfile(experimentId);
        const expParams: ExperimentParams = this.experimentProfile.params;

157
158
        const dispatcherCommand: string = getMsgDispatcherCommand(expParams.tuner, expParams.assessor);
        console.log(`dispatcher command: ${dispatcherCommand}`);
Deshui Yu's avatar
Deshui Yu committed
159
        this.setupTuner(
160
161
            dispatcherCommand,
            undefined,
Deshui Yu's avatar
Deshui Yu committed
162
            'resume',
163
            expParams.tuner.checkpointDir);
Deshui Yu's avatar
Deshui Yu committed
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189

        const allTrialJobs: TrialJobInfo[] = await this.dataStore.listTrialJobs();

        // Resume currSubmittedTrialNum
        this.currSubmittedTrialNum = allTrialJobs.length;

        // Check the final status for WAITING and RUNNING jobs
        await Promise.all(allTrialJobs
            .filter((job: TrialJobInfo) => job.status === 'WAITING' || job.status === 'RUNNING')
            .map((job: TrialJobInfo) => this.dataStore.storeTrialJobEvent('FAILED', job.id)));

        // TO DO: update database record for resume event
        this.run().catch(console.error);
    }

    public getTrialJob(trialJobId: string): Promise<TrialJobDetail> {
        return Promise.resolve(
            this.trainingService.getTrialJob(trialJobId)
        );
    }

    public async setClusterMetadata(key: string, value: string): Promise<void> {
        let timeoutId: NodeJS.Timer;
        // TO DO: move timeout value to constants file
        const delay1: Promise<{}> = new Promise((resolve: Function, reject: Function): void => {
            timeoutId = setTimeout(
190
                () => { reject(new Error('TrainingService setClusterMetadata timeout. Please check your config file.')); },
Deshui Yu's avatar
Deshui Yu committed
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
                10000);
        });
        await Promise.race([delay1, this.trainingService.setClusterMetadata(key, value)]).finally(() => {
            clearTimeout(timeoutId);
        });
    }

    public getClusterMetadata(key: string): Promise<string> {
        return Promise.resolve(
            this.trainingService.getClusterMetadata(key)
        );
    }

    public async getTrialJobStatistics(): Promise<TrialJobStatistics[]> {
        return this.dataStore.getTrialJobStatistics();
    }

    public stopExperiment(): Promise<void> {
        if (this.trialJobsMaintainer !== undefined) {
            this.trialJobsMaintainer.setStopLoop();

            return Promise.resolve();
        } else {
            return Promise.reject(new Error('Error: undefined trialJobsMaintainer'));
        }
    }

    public async getMetricData(trialJobId: string, metricType: MetricType): Promise<MetricDataRecord[]> {
        return this.dataStore.getMetricData(trialJobId, metricType);
    }

    public getExperimentProfile(): Promise<ExperimentProfile> {
        // TO DO: using Promise.resolve()
        const deferred: Deferred<ExperimentProfile> = new Deferred<ExperimentProfile>();
        deferred.resolve(this.experimentProfile);

        return deferred.promise;
    }

    public async listTrialJobs(status?: TrialJobStatus): Promise<TrialJobInfo[]> {
        return this.dataStore.listTrialJobs(status);
    }

234
235
    private setupTuner(command: string, cwd: string | undefined, mode: 'start' | 'resume', dataDirectory: string): void {
        if (this.dispatcher !== undefined) {
Deshui Yu's avatar
Deshui Yu committed
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
            return;
        }
        const stdio: (string | NodeJS.WriteStream)[] = ['ignore', process.stdout, process.stderr, 'pipe', 'pipe'];
        let newCwd: string;
        if (cwd === undefined || cwd === '') {
            newCwd = getLogDir();
        } else {
            newCwd = cwd;
        }
        // TO DO: add CUDA_VISIBLE_DEVICES
        const tunerProc: ChildProcess = spawn(command, [], {
            stdio,
            cwd: newCwd,
            env: {
                NNI_MODE: mode,
                NNI_CHECKPOINT_DIRECTORY: dataDirectory,
                NNI_LOG_DIRECTORY: getLogDir()
            },
            shell: true
        });
256
257
        this.dispatcherPid = tunerProc.pid;
        this.dispatcher = createDispatcherInterface(tunerProc);
Deshui Yu's avatar
Deshui Yu committed
258
259
260
261
262
263
264

        return;
    }

    private updateTrialConcurrency(trialConcurrency: number): void {
        // TO DO: this method can only be called after startExperiment/resumeExperiment
        if (trialConcurrency > this.experimentProfile.params.trialConcurrency) {
265
            if (this.dispatcher === undefined) {
Deshui Yu's avatar
Deshui Yu committed
266
267
                throw new Error('Error: tuner has to be initialized');
            }
268
            this.dispatcher.sendCommand(
Deshui Yu's avatar
Deshui Yu committed
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
                REQUEST_TRIAL_JOBS,
                String(trialConcurrency - this.experimentProfile.params.trialConcurrency)
            );
        } else {
            // we assume trialConcurrency >= 0, which is checked by restserver
            this.trialConcurrencyReduction += (this.experimentProfile.params.trialConcurrency - trialConcurrency);
        }
        this.experimentProfile.params.trialConcurrency = trialConcurrency;

        return;
    }

    private updateMaxExecDuration(duration: number): void {
        if (this.trialJobsMaintainer !== undefined) {
            this.trialJobsMaintainer.updateMaxExecDuration(duration);
        }
        this.experimentProfile.params.maxExecDuration = duration;

        return;
    }

    private updateSearchSpace(searchSpace: string): void {
291
        if (this.dispatcher === undefined) {
Deshui Yu's avatar
Deshui Yu committed
292
293
            throw new Error('Error: tuner has not been setup');
        }
294
        this.dispatcher.sendCommand(UPDATE_SEARCH_SPACE, searchSpace);
Deshui Yu's avatar
Deshui Yu committed
295
296
297
298
299
300
        this.experimentProfile.params.searchSpace = searchSpace;

        return;
    }

    private async experimentDoneCleanUp(): Promise<void> {
301
        if (this.dispatcher === undefined) {
Deshui Yu's avatar
Deshui Yu committed
302
303
            throw new Error('Error: tuner has not been setup');
        }
304
        this.dispatcher.sendCommand(TERMINATE);
Deshui Yu's avatar
Deshui Yu committed
305
306
307
        let tunerAlive: boolean = true;
        // gracefully terminate tuner and assessor here, wait at most 30 seconds.
        for (let i: number = 0; i < 30; i++) {
308
            if (!tunerAlive) { break; }
Deshui Yu's avatar
Deshui Yu committed
309
            try {
310
                await cpp.exec(`kill -0 ${this.dispatcherPid}`);
Deshui Yu's avatar
Deshui Yu committed
311
312
313
314
            } catch (error) { tunerAlive = false; }
            await delay(1000);
        }
        try {
315
            await cpp.exec(`kill ${this.dispatcherPid}`);
Deshui Yu's avatar
Deshui Yu committed
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
        } catch (error) {
            // this.tunerPid does not exist, do nothing here
        }
        const trialJobList: TrialJobDetail[] = await this.trainingService.listTrialJobs();
        // TO DO: to promise all
        for (const trialJob of trialJobList) {
            if (trialJob.status === 'RUNNING' ||
                trialJob.status === 'WAITING') {
                try {
                    await this.trainingService.cancelTrialJob(trialJob.id);
                } catch (error) {
                    // pid does not exist, do nothing here
                }
            }
        }
        await this.trainingService.cleanUp();
        this.experimentProfile.endTime = new Date();
        await this.storeExperimentProfile();
    }

    private async periodicallyUpdateExecDuration(): Promise<void> {
        const startTime: Date = new Date();
        const execDuration: number = this.experimentProfile.execDuration;
        for (; ;) {
            await delay(1000 * 60 * 10); // 10 minutes
            this.experimentProfile.execDuration = execDuration + (Date.now() - startTime.getTime()) / 1000;
            await this.storeExperimentProfile();
        }
    }

    private storeExperimentProfile(): Promise<void> {
        this.experimentProfile.revision += 1;

        return this.dataStore.storeExperimentProfile(this.experimentProfile);
    }

352
    // tslint:disable-next-line:max-func-body-length
Deshui Yu's avatar
Deshui Yu committed
353
354
    private runInternal(): Promise<void> {
        // TO DO: cannot run this method more than once in one NNIManager instance
355
        if (this.dispatcher === undefined) {
Deshui Yu's avatar
Deshui Yu committed
356
357
358
359
            throw new Error('Error: tuner has not been setup');
        }
        this.trainingService.addTrialJobMetricListener(async (metric: TrialJobMetric) => {
            await this.dataStore.storeMetricData(metric.id, metric.data);
360
            if (this.dispatcher === undefined) {
Deshui Yu's avatar
Deshui Yu committed
361
362
                throw new Error('Error: tuner has not been setup');
            }
363
            this.dispatcher.sendCommand(REPORT_METRIC_DATA, metric.data);
Deshui Yu's avatar
Deshui Yu committed
364
365
366
367
368
369
370
371
372
373
374
375
        });

        this.trialJobsMaintainer = new TrialJobs(
            this.trainingService,
            this.experimentProfile.execDuration,
            this.experimentProfile.params.maxExecDuration);
        this.trialJobsMaintainer.on(async (event: TrialJobMaintainerEvent, trialJobDetail: TrialJobDetail) => {
            if (trialJobDetail !== undefined) {
                this.log.debug(`Job event: ${event}, id: ${trialJobDetail.id}`);
            } else {
                this.log.debug(`Job event: ${event}`);
            }
376
            if (this.dispatcher === undefined) {
Deshui Yu's avatar
Deshui Yu committed
377
378
379
380
381
382
383
384
385
386
387
388
389
                throw new Error('Error: tuner has not been setup');
            }
            switch (event) {
                case 'SUCCEEDED':
                case 'FAILED':
                case 'USER_CANCELED':
                case 'SYS_CANCELED':
                    if (this.trialConcurrencyReduction > 0) {
                        this.trialConcurrencyReduction--;
                    } else {
                        if (this.currSubmittedTrialNum < this.experimentProfile.params.maxTrialNum) {
                            if (this.customizedTrials.length > 0) {
                                const hyperParams: string | undefined = this.customizedTrials.shift();
390
                                this.dispatcher.sendCommand(ADD_CUSTOMIZED_TRIAL_JOB, hyperParams);
Deshui Yu's avatar
Deshui Yu committed
391
                            } else {
392
                                this.dispatcher.sendCommand(REQUEST_TRIAL_JOBS, '1');
Deshui Yu's avatar
Deshui Yu committed
393
394
395
                            }
                        }
                    }
396
                    this.dispatcher.sendCommand(TRIAL_END, JSON.stringify({trial_job_id: trialJobDetail.id, event: event}));
Deshui Yu's avatar
Deshui Yu committed
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
                    await this.dataStore.storeTrialJobEvent(event, trialJobDetail.id, undefined, trialJobDetail.url);
                    break;
                case 'RUNNING':
                    await this.dataStore.storeTrialJobEvent(event, trialJobDetail.id, undefined, trialJobDetail.url);
                    break;
                case 'EXPERIMENT_DONE':
                    this.log.info('Experiment done, cleaning up...');
                    await this.experimentDoneCleanUp();
                    this.log.info('Experiment done.');
                    break;
                default:
                    throw new Error('Error: unrecognized event from trialJobsMaintainer');
            }
        });

        // TO DO: we should send INITIALIZE command to tuner if user's tuner needs to run init method in tuner
413
414
        this.log.debug(`Send tuner command: update search space: ${this.experimentProfile.params.searchSpace}`);
        this.dispatcher.sendCommand(UPDATE_SEARCH_SPACE, this.experimentProfile.params.searchSpace);
Deshui Yu's avatar
Deshui Yu committed
415
416
417
418
        if (this.trialConcurrencyReduction !== 0) {
            return Promise.reject(new Error('Error: cannot modify trialConcurrency before startExperiment'));
        }
        this.log.debug(`Send tuner command: ${this.experimentProfile.params.trialConcurrency}`)
419
420
        this.dispatcher.sendCommand(REQUEST_TRIAL_JOBS, String(this.experimentProfile.params.trialConcurrency));
        this.dispatcher.onCommand(async (commandType: string, content: string) => {
Deshui Yu's avatar
Deshui Yu committed
421
422
423
424
425
426
427
428
429
430
431
432
433
434
            this.log.info(`Command from tuner: ${commandType}, ${content}`);
            if (this.trialJobsMaintainer === undefined) {
                throw new Error('Error: trialJobsMaintainer not initialized');
            }
            switch (commandType) {
                case NEW_TRIAL_JOB:
                    if (this.currSubmittedTrialNum < this.experimentProfile.params.maxTrialNum) {
                        this.currSubmittedTrialNum++;
                        const trialJobAppForm: TrialJobApplicationForm = {
                            jobType: 'TRIAL',
                            hyperParameters: content
                        };
                        const trialJobDetail: TrialJobDetail = await this.trainingService.submitTrialJob(trialJobAppForm);
                        this.trialJobsMaintainer.setTrialJob(trialJobDetail.id, Object.assign({}, trialJobDetail));
435
                        assert(trialJobDetail.status === 'WAITING');
Deshui Yu's avatar
Deshui Yu committed
436
437
438
439
440
441
442
443
444
                        await this.dataStore.storeTrialJobEvent(trialJobDetail.status, trialJobDetail.id, content, trialJobDetail.url);
                        if (this.currSubmittedTrialNum === this.experimentProfile.params.maxTrialNum) {
                            this.trialJobsMaintainer.setNoMoreTrials();
                        }
                    }
                    break;
                case NO_MORE_TRIAL_JOBS:
                    this.trialJobsMaintainer.setNoMoreTrials();
                    break;
445
446
447
                case KILL_TRIAL_JOB:
                    await this.trainingService.cancelTrialJob(JSON.parse(content));
                    break;
Deshui Yu's avatar
Deshui Yu committed
448
                default:
449
                    throw new Error(`Error: unsupported command type: [${commandType}]`);
Deshui Yu's avatar
Deshui Yu committed
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
            }
        });

        return this.trialJobsMaintainer.run();
    }

    private async run(): Promise<void> {
        await Promise.all([
            this.periodicallyUpdateExecDuration(),
            this.trainingService.run(),
            this.runInternal()]);
    }
}

export { NNIManager };