nnimanager.ts 31.8 KB
Newer Older
Deshui Yu's avatar
Deshui Yu committed
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
/**
 * Copyright (c) Microsoft Corporation
 * All rights reserved.
 *
 * MIT License
 *
 * Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated
 * documentation files (the "Software"), to deal in the Software without restriction, including without limitation
 * the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and
 * to permit persons to whom the Software is furnished to do so, subject to the following conditions:
 * The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.
 *
 * THE SOFTWARE IS PROVIDED *AS IS*, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING
 * BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
 * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
 * DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
 */

'use strict';

import * as assert from 'assert';
import * as cpp from 'child-process-promise';
goooxu's avatar
goooxu committed
24
import { ChildProcess, spawn, StdioOptions } from 'child_process';
Deshui Yu's avatar
Deshui Yu committed
25
26
27
import { Deferred } from 'ts-deferred';
import * as component from '../common/component';
import { DataStore, MetricDataRecord, MetricType, TrialJobInfo } from '../common/datastore';
28
import { NNIError } from '../common/errors';
29
import { getExperimentId, setInitTrialSequenceId } from '../common/experimentStartupInfo';
Deshui Yu's avatar
Deshui Yu committed
30
31
import { getLogger, Logger } from '../common/log';
import {
chicm-ms's avatar
chicm-ms committed
32
    ExperimentParams, ExperimentProfile, Manager, ExperimentStatus,
33
    NNIManagerStatus, ProfileUpdateType, TrialJobStatistics
Deshui Yu's avatar
Deshui Yu committed
34
35
36
37
} from '../common/manager';
import {
    TrainingService, TrialJobApplicationForm, TrialJobDetail, TrialJobMetric, TrialJobStatus
} from '../common/trainingService';
38
import { delay, getCheckpointDir, getExperimentRootDir, getLogDir, getMsgDispatcherCommand, mkDirP, getLogLevel } from '../common/utils';
Deshui Yu's avatar
Deshui Yu committed
39
import {
chicm-ms's avatar
chicm-ms committed
40
    ADD_CUSTOMIZED_TRIAL_JOB, INITIALIZE, INITIALIZED, KILL_TRIAL_JOB, NEW_TRIAL_JOB, NO_MORE_TRIAL_JOBS, PING,
chicm-ms's avatar
chicm-ms committed
41
    REPORT_METRIC_DATA, REQUEST_TRIAL_JOBS, SEND_TRIAL_JOB_PARAMETER, TERMINATE, TRIAL_END, UPDATE_SEARCH_SPACE
Deshui Yu's avatar
Deshui Yu committed
42
} from './commands';
43
import { createDispatcherInterface, IpcInterface } from './ipcInterface';
Deshui Yu's avatar
Deshui Yu committed
44
45

/**
chicm-ms's avatar
chicm-ms committed
46
 * NNIManager which implements Manager interface
Deshui Yu's avatar
Deshui Yu committed
47
48
49
 */
class NNIManager implements Manager {
    private trainingService: TrainingService;
50
    private dispatcher: IpcInterface | undefined;
51
    private currSubmittedTrialNum: number;  // need to be recovered
QuanluZhang's avatar
QuanluZhang committed
52
    private trialConcurrencyChange: number; // >0: increase, <0: decrease
Deshui Yu's avatar
Deshui Yu committed
53
54
55
56
    private customizedTrials: string[]; // need to be recovered
    private log: Logger;
    private dataStore: DataStore;
    private experimentProfile: ExperimentProfile;
57
    private dispatcherPid: number;
58
    private status: NNIManagerStatus;
QuanluZhang's avatar
QuanluZhang committed
59
60
    private waitingTrials: string[];
    private trialJobs: Map<string, TrialJobDetail>;
Deshui Yu's avatar
Deshui Yu committed
61
62
63

    /**
     * Resets the in-memory bookkeeping, resolves the training service and data
     * store from the component container, and starts in the 'INITIALIZED' status.
     */
    constructor() {
        this.currSubmittedTrialNum = 0;
        this.trialConcurrencyChange = 0;
        this.customizedTrials = [];
        this.trainingService = component.get(TrainingService);
        assert(this.trainingService);
        this.dispatcherPid = 0;
        this.waitingTrials = [];
        this.trialJobs = new Map<string, TrialJobDetail>();

        this.log = getLogger();
        this.dataStore = component.get(DataStore);
        this.experimentProfile = this.createEmptyExperimentProfile();
        this.status = {
            status: 'INITIALIZED',
            errors: []
        };
    }

    /**
     * Applies one field of `experimentProfile.params` to the live experiment and
     * persists the updated profile.
     *
     * @param experimentProfile profile carrying the new value
     * @param updateType which parameter to take from `experimentProfile.params`
     * @returns the promise returned by storing the profile
     * @throws Error synchronously when `updateType` is not recognized
     */
    public updateExperimentProfile(experimentProfile: ExperimentProfile, updateType: ProfileUpdateType): Promise<void> {
        switch (updateType) {
            case 'TRIAL_CONCURRENCY':
                this.updateTrialConcurrency(experimentProfile.params.trialConcurrency);
                break;
            case 'MAX_EXEC_DURATION':
                this.updateMaxExecDuration(experimentProfile.params.maxExecDuration);
                break;
            case 'SEARCH_SPACE':
                this.updateSearchSpace(experimentProfile.params.searchSpace);
                break;
            case 'MAX_TRIAL_NUM':
                this.updateMaxTrialNum(experimentProfile.params.maxTrialNum);
                break;
            default:
                throw new Error('Error: unrecognized updateType');
        }

        return this.storeExperimentProfile();
    }

    /**
     * Queues user-supplied hyper-parameters as a customized trial and records
     * an 'ADD_CUSTOMIZED' event in the data store.
     *
     * @param hyperParams serialized hyper-parameters of the customized trial
     * @returns a promise rejected with 'reach maxTrialNum' once the number of
     *          submitted trials has reached the configured maximum
     */
    public addCustomizedTrialJob(hyperParams: string): Promise<void> {
        const trialLimit: number = this.experimentProfile.params.maxTrialNum;
        if (this.currSubmittedTrialNum < trialLimit) {
            this.customizedTrials.push(hyperParams);

            // No trial id exists yet at this point, so an empty id is recorded.
            return this.dataStore.storeTrialJobEvent('ADD_CUSTOMIZED', '', hyperParams);
        }

        return Promise.reject(new Error('reach maxTrialNum'));
    }

    /**
     * Cancels a trial job on behalf of the user: asks the training service to
     * cancel it, then records a 'USER_TO_CANCEL' event in the data store.
     *
     * @param trialJobId id of the trial job to cancel
     */
    public async cancelTrialJobByUser(trialJobId: string): Promise<void> {
        this.log.info(`User cancelTrialJob: ${trialJobId}`);
        await this.trainingService.cancelTrialJob(trialJobId);
        await this.dataStore.storeTrialJobEvent('USER_TO_CANCEL', trialJobId, '');
    }

    /**
     * Starts a new experiment: stores the parameters, forwards the multi-phase
     * and version-check settings to the training service, spawns the dispatcher
     * in 'start' mode and kicks off the main loop in the background.
     *
     * @param expParams parameters of the experiment to start
     * @returns the id of the experiment profile
     */
    public async startExperiment(expParams: ExperimentParams): Promise<string> {
        this.log.info(`Starting experiment: ${this.experimentProfile.id}`);
        this.experimentProfile.params = expParams;
        await this.storeExperimentProfile();
        this.log.debug('Setup tuner...');

        // Set up multiphase config
        if (expParams.multiPhase && this.trainingService.isMultiPhaseJobSupported) {
            this.trainingService.setClusterMetadata('multiPhase', expParams.multiPhase.toString());
        }
        // Set up versionCheck config
        if (expParams.versionCheck !== undefined) {
            this.trainingService.setClusterMetadata('version_check', expParams.versionCheck.toString());
        }

        const dispatcherCommand: string = getMsgDispatcherCommand(expParams.tuner, expParams.assessor, expParams.advisor,
            expParams.multiPhase, expParams.multiThread);
        this.log.debug(`dispatcher command: ${dispatcherCommand}`);
        const checkpointDir: string = await this.createCheckpointDir();
        this.setupTuner(
            dispatcherCommand,
            undefined,
            'start',
            checkpointDir);

        this.experimentProfile.startTime = Date.now();
        this.setStatus('RUNNING');
        await this.storeExperimentProfile();
        // The main loop is not awaited; a failure is routed to criticalError.
        this.run().catch((err: Error) => {
            this.criticalError(err);
        });

        return this.experimentProfile.id;
    }

    /**
     * Resumes a stored experiment: reloads its profile from the data store,
     * re-applies the training-service settings, spawns the dispatcher in
     * 'resume' mode, marks trials left WAITING or RUNNING as FAILED, and kicks
     * off the main loop in the background.
     */
    public async resumeExperiment(): Promise<void> {
        this.log.info(`Resuming experiment: ${this.experimentProfile.id}`);
        // Fetch back the experiment profile
        const experimentId: string = getExperimentId();
        this.experimentProfile = await this.dataStore.getExperimentProfile(experimentId);
        const expParams: ExperimentParams = this.experimentProfile.params;

        setInitTrialSequenceId(this.experimentProfile.maxSequenceId + 1);

        // Set up multiphase config
        if (expParams.multiPhase && this.trainingService.isMultiPhaseJobSupported) {
            this.trainingService.setClusterMetadata('multiPhase', expParams.multiPhase.toString());
        }

        // Set up versionCheck config.
        // The key is 'version_check', the same key startExperiment sends.
        if (expParams.versionCheck !== undefined) {
            this.trainingService.setClusterMetadata('version_check', expParams.versionCheck.toString());
        }

        const dispatcherCommand: string = getMsgDispatcherCommand(expParams.tuner, expParams.assessor, expParams.advisor,
            expParams.multiPhase, expParams.multiThread);
        this.log.debug(`dispatcher command: ${dispatcherCommand}`);
        const checkpointDir: string = await this.createCheckpointDir();
        this.setupTuner(
            dispatcherCommand,
            undefined,
            'resume',
            checkpointDir);

        const allTrialJobs: TrialJobInfo[] = await this.dataStore.listTrialJobs();

        // Resume currSubmittedTrialNum
        this.currSubmittedTrialNum = allTrialJobs.length;

        // Check the final status for WAITING and RUNNING jobs
        await Promise.all(allTrialJobs
            .filter((job: TrialJobInfo) => job.status === 'WAITING' || job.status === 'RUNNING')
            .map((job: TrialJobInfo) => this.dataStore.storeTrialJobEvent('FAILED', job.id)));

        // Drop a stored end time when neither the duration nor the trial budget is used up.
        if (this.experimentProfile.execDuration < this.experimentProfile.params.maxExecDuration &&
            this.currSubmittedTrialNum < this.experimentProfile.params.maxTrialNum &&
            this.experimentProfile.endTime) {
            delete this.experimentProfile.endTime;
        }
        this.setStatus('RUNNING');

        // TO DO: update database record for resume event
        this.run().catch((err: Error) => {
            this.criticalError(err);
        });
    }

    /**
     * Looks up a trial job through the training service.
     *
     * @param trialJobId id of the trial job to fetch
     * @returns a promise for whatever the training service reports for that id
     */
    public getTrialJob(trialJobId: string): Promise<TrialJobDetail> {
        const lookup = this.trainingService.getTrialJob(trialJobId);

        return Promise.resolve(lookup);
    }

    /**
     * Forwards a cluster metadata entry to the training service, failing if
     * the training service has not settled within 10 seconds.
     *
     * @param key metadata key
     * @param value metadata value
     * @throws Error when the 10 second timeout fires first, or whatever the
     *         training service rejects with
     */
    public async setClusterMetadata(key: string, value: string): Promise<void> {
        this.log.info(`NNIManager setClusterMetadata, key: ${key}, value: ${value}`);
        let timeoutId: NodeJS.Timer;
        // TO DO: move timeout value to constants file
        // This promise never resolves; it can only reject when the timer fires.
        const delay1: Promise<never> = new Promise<never>((_resolve: (value: never) => void, reject: (reason: Error) => void): void => {
            timeoutId = setTimeout(
                () => { reject(new Error('TrainingService setClusterMetadata timeout. Please check your config file.')); },
                10000);
        });
        await Promise.race([delay1, this.trainingService.setClusterMetadata(key, value)]).finally(() => {
            // Always cancel the timer so it cannot fire after the race has settled.
            clearTimeout(timeoutId);
        });
    }

    /**
     * Reads a cluster metadata entry from the training service.
     *
     * @param key metadata key to look up
     * @returns a promise for the value the training service reports
     */
    public getClusterMetadata(key: string): Promise<string> {
        const lookup = this.trainingService.getClusterMetadata(key);

        return Promise.resolve(lookup);
    }

    /**
     * Returns the trial job statistics reported by the data store.
     */
    public async getTrialJobStatistics(): Promise<TrialJobStatistics[]> {
        const statistics: TrialJobStatistics[] = await this.dataStore.getTrialJobStatistics();

        return statistics;
    }

237
    /**
     * Stops the experiment: switches the status to 'STOPPING' and runs the
     * end-of-experiment clean-up.
     */
    public async stopExperiment(): Promise<void> {
        this.setStatus('STOPPING');
        this.log.info('Stopping experiment, cleaning up ...');
        await this.experimentDoneCleanUp();
        this.log.info('Experiment stopped.');
    }

244
    /**
     * Fetches metric records from the data store.
     *
     * @param trialJobId optional trial job id, passed through to the data store
     * @param metricType optional metric type, passed through to the data store
     */
    public async getMetricData(trialJobId?: string, metricType?: MetricType): Promise<MetricDataRecord[]> {
        return this.dataStore.getMetricData(trialJobId, metricType);
    }

    /**
     * Returns the in-memory experiment profile wrapped in an already-resolved promise.
     */
    public getExperimentProfile(): Promise<ExperimentProfile> {
        const currentProfile: ExperimentProfile = this.experimentProfile;

        return Promise.resolve(currentProfile);
    }

256
257
258
259
    /**
     * Returns the manager's current status object (the same instance, not a copy).
     */
    public getStatus(): NNIManagerStatus {
        const { status } = this;

        return status;
    }

Deshui Yu's avatar
Deshui Yu committed
260
261
262
263
    /**
     * Lists trial jobs recorded in the data store.
     *
     * @param status optional status, passed through to the data store
     */
    public async listTrialJobs(status?: TrialJobStatus): Promise<TrialJobInfo[]> {
        const jobs: TrialJobInfo[] = await this.dataStore.listTrialJobs(status);

        return jobs;
    }

264
265
    /**
     * Spawns the dispatcher process and wraps it in an IPC interface.
     * Does nothing when a dispatcher has already been set up.
     *
     * @param command shell command line that launches the dispatcher
     * @param cwd working directory; the log directory is used when undefined or empty
     * @param mode exported to the child as NNI_MODE
     * @param dataDirectory exported to the child as NNI_CHECKPOINT_DIRECTORY
     */
    private setupTuner(command: string, cwd: string | undefined, mode: 'start' | 'resume', dataDirectory: string): void {
        if (this.dispatcher !== undefined) {
            return;
        }
        // stdin ignored, stdout/stderr shared with this process, plus two extra pipes (fds 3 and 4).
        const stdio: StdioOptions = ['ignore', process.stdout, process.stderr, 'pipe', 'pipe'];
        let newCwd: string;
        if (cwd === undefined || cwd === '') {
            newCwd = getLogDir();
        } else {
            newCwd = cwd;
        }
        // TO DO: add CUDA_VISIBLE_DEVICES
        const nniEnv = {
            NNI_MODE: mode,
            NNI_CHECKPOINT_DIRECTORY: dataDirectory,
            NNI_LOG_DIRECTORY: getLogDir(),
            NNI_LOG_LEVEL: getLogLevel()
        };
        // NNI_* entries override same-named variables inherited from this process.
        const newEnv = Object.assign({}, process.env, nniEnv);
        const tunerProc: ChildProcess = spawn(command, [], {
            stdio,
            cwd: newCwd,
            env: newEnv,
            shell: true
        });
        this.dispatcherPid = tunerProc.pid;
        this.dispatcher = createDispatcherInterface(tunerProc);

        return;
    }

    /**
     * Records a new trial concurrency and accumulates the difference from the
     * previous value into `trialConcurrencyChange` (>0: increase, <0: decrease).
     *
     * @param trialConcurrency the new concurrency
     */
    private updateTrialConcurrency(trialConcurrency: number): void {
        // we assume trialConcurrency >= 0, which is checked by restserver
        this.trialConcurrencyChange += (trialConcurrency - this.experimentProfile.params.trialConcurrency);
        this.experimentProfile.params.trialConcurrency = trialConcurrency;

        return;
    }

    /**
     * Overwrites the maximum execution duration in the in-memory profile.
     *
     * @param duration the new maximum execution duration
     */
    private updateMaxExecDuration(duration: number): void {
        const { params } = this.experimentProfile;
        params.maxExecDuration = duration;
    }

    /**
     * Sends a new search space to the dispatcher and records it in the
     * in-memory profile.
     *
     * @param searchSpace the new search space
     * @throws Error when the dispatcher has not been set up
     */
    private updateSearchSpace(searchSpace: string): void {
        if (this.dispatcher === undefined) {
            throw new Error('Error: tuner has not been setup');
        }
        this.dispatcher.sendCommand(UPDATE_SEARCH_SPACE, searchSpace);
        this.experimentProfile.params.searchSpace = searchSpace;

        return;
    }

QuanluZhang's avatar
QuanluZhang committed
319
320
321
322
323
324
    /**
     * Overwrites the maximum trial number in the in-memory profile.
     *
     * @param maxTrialNum the new maximum number of trials
     */
    private updateMaxTrialNum(maxTrialNum: number): void {
        const { params } = this.experimentProfile;
        params.maxTrialNum = maxTrialNum;
    }

Deshui Yu's avatar
Deshui Yu committed
325
    /**
     * Shuts the experiment down: asks the dispatcher to terminate, waits for
     * its process to exit, kills it, cancels every RUNNING or WAITING trial
     * job, cleans up the training service, stores the end time and sets the
     * status to 'STOPPED'.
     *
     * @throws Error when the dispatcher has not been set up
     */
    private async experimentDoneCleanUp(): Promise<void> {
        if (this.dispatcher === undefined) {
            throw new Error('Error: tuner has not been setup');
        }
        this.dispatcher.sendCommand(TERMINATE);
        // gracefully terminate tuner and assessor here, wait at most 30 seconds.
        for (let i: number = 0; i < 30; i++) {
            try {
                // `kill -0` sends no signal; it only fails when the pid cannot be signalled.
                await cpp.exec(`kill -0 ${this.dispatcherPid}`);
            } catch (error) {
                // The dispatcher process is gone: stop waiting right away.
                break;
            }
            await delay(1000);
        }
        try {
            await cpp.exec(`kill ${this.dispatcherPid}`);
        } catch (error) {
            // this.dispatcherPid does not exist, do nothing here
        }
        const trialJobList: TrialJobDetail[] = await this.trainingService.listTrialJobs();
        // TO DO: to promise all
        for (const trialJob of trialJobList) {
            if (trialJob.status === 'RUNNING' ||
                trialJob.status === 'WAITING') {
                try {
                    this.log.info(`cancelTrialJob: ${trialJob.id}`);
                    await this.trainingService.cancelTrialJob(trialJob.id);
                } catch (error) {
                    // Best effort: one failed cancel must not abort the clean-up.
                    this.log.debug(`cancelTrialJob failed for ${trialJob.id}: ${error}`);
                }
            }
        }
        await this.trainingService.cleanUp();
        this.experimentProfile.endTime = Date.now();
        await this.storeExperimentProfile();
        this.setStatus('STOPPED');
    }

    /**
     * Ticks once per second until the status becomes 'STOPPING' or 'STOPPED'.
     * While the status is 'RUNNING', each tick adds 1 to `execDuration`, and
     * the profile is stored on every tenth tick.
     */
    private async periodicallyUpdateExecDuration(): Promise<void> {
        let count: number = 1;
        while (this.status.status !== 'STOPPING' && this.status.status !== 'STOPPED') {
            await delay(1000 * 1); // 1 seconds
            if (this.status.status === 'RUNNING') {
                this.experimentProfile.execDuration += 1;
                if (count % 10 === 0) {
                    await this.storeExperimentProfile();
                }
            }
            // count advances on every tick, including ticks where the status is not RUNNING.
            count += 1;
        }
    }

chicm-ms's avatar
chicm-ms committed
377
378
379
380
381
382
    /**
     * Sends a PING command to the dispatcher every 5 seconds until the status
     * becomes 'ERROR', 'STOPPING' or 'STOPPED'.
     *
     * @throws Error when the dispatcher has not been set up
     */
    private async pingDispatcher(): Promise<void> {
        if (this.dispatcher === undefined) {
            throw new Error('Error: tuner has not been setup');
        }
        while (!['ERROR', 'STOPPING', 'STOPPED'].includes(this.status.status)) {
            this.dispatcher.sendCommand(PING);
            await delay(1000 * 5);
        }
    }

QuanluZhang's avatar
QuanluZhang committed
387
388
    /**
     * Polls the training service for every tracked trial job. Status changes
     * are logged and stored in the data store. A job that has reached a final
     * status is removed from `trialJobs` and reported to the dispatcher with a
     * TRIAL_END command.
     *
     * @returns the number of jobs found in a final status during this pass
     * @throws Error when the dispatcher has not been set up, or when a finished
     *         job's form is not of type 'TRIAL'
     */
    private async requestTrialJobsStatus(): Promise<number> {
        let finishedTrialJobNum: number = 0;
        if (this.dispatcher === undefined) {
            throw new Error('Error: tuner has not been setup');
        }
        // Iterate over a snapshot of the keys because entries are deleted inside the loop.
        for (const trialJobId of Array.from(this.trialJobs.keys())) {
            const trialJobDetail: TrialJobDetail = await this.trainingService.getTrialJob(trialJobId);
            const oldTrialJobDetail: TrialJobDetail | undefined = this.trialJobs.get(trialJobId);
            if (oldTrialJobDetail !== undefined && oldTrialJobDetail.status !== trialJobDetail.status) {
                this.log.info(`Trial job ${trialJobDetail.id} status changed from ${oldTrialJobDetail.status} to ${trialJobDetail.status}`);
                this.trialJobs.set(trialJobId, Object.assign({}, trialJobDetail));
                await this.dataStore.storeTrialJobEvent(trialJobDetail.status, trialJobDetail.id, undefined, trialJobDetail);
            }
            switch (trialJobDetail.status) {
                case 'SUCCEEDED':
                case 'USER_CANCELED':
                case 'EARLY_STOPPED':
                case 'FAILED':
                case 'SYS_CANCELED': {
                    // In the current version, failed or system-cancelled jobs are not retried.
                    // TO DO: push this job to queue for retry
                    this.trialJobs.delete(trialJobId);
                    finishedTrialJobNum++;
                    if (trialJobDetail.form.jobType !== 'TRIAL') {
                        throw new Error('Error: jobType error, not TRIAL');
                    }
                    const hyperParams = (<TrialJobApplicationForm>trialJobDetail.form).hyperParameters.value;
                    this.dispatcher.sendCommand(TRIAL_END, JSON.stringify({
                        trial_job_id: trialJobDetail.id,
                        event: trialJobDetail.status,
                        hyper_params: hyperParams
                    }));
                    break;
                }
                case 'WAITING':
                case 'RUNNING':
                case 'UNKNOWN':
                    // Do nothing
                    break;
                default:
                // TO DO: add warning in log
            }
        }

        return finishedTrialJobNum;
    }

    /**
     * Main scheduling loop, repeated every 5 seconds until the status becomes
     * 'STOPPING' or 'STOPPED'. Each pass:
     *  1. polls trial job statuses and counts the newly finished jobs;
     *  2. requests that many trials (adjusted by pending concurrency changes),
     *     serving queued customized trials first;
     *  3. checks the duration and trial-number limits and updates the status;
     *  4. while under the limits, submits waiting trials up to the concurrency.
     *
     * @throws Error when the dispatcher has not been set up
     */
    private async manageTrials(): Promise<void> {
        if (this.dispatcher === undefined) {
            throw new Error('Error: tuner has not been setup');
        }
        let allFinishedTrialJobNum: number = this.currSubmittedTrialNum;
        let waitSubmittedToFinish: number;
        while (this.status.status !== 'STOPPING' && this.status.status !== 'STOPPED') {
            const finishedTrialJobNum: number = await this.requestTrialJobsStatus();
            allFinishedTrialJobNum += finishedTrialJobNum;

            // requestTrialNum is the number of trials that will be requested from tuner.
            // If trialConcurrency does not change, requestTrialNum equals finishedTrialJobNum.
            // If trialConcurrency changes, for example, trialConcurrency increases by 2 (trialConcurrencyChange=2), then
            // requestTrialNum equals 2 + finishedTrialJobNum and trialConcurrencyChange becomes 0.
            // If trialConcurrency changes, for example, trialConcurrency decreases by 4 (trialConcurrencyChange=-4) and
            // finishedTrialJobNum is 2, then requestTrialNum becomes -2. No trial will be requested from tuner,
            // and trialConcurrencyChange becomes -2.
            const requestTrialNum: number = this.trialConcurrencyChange + finishedTrialJobNum;
            if (requestTrialNum >= 0) {
                this.trialConcurrencyChange = 0;
            } else {
                this.trialConcurrencyChange = requestTrialNum;
            }

            // Customized trials queued by the user are sent before asking the tuner for more.
            // The bound is at most customizedTrials.length, so every shift() below finds an element.
            const requestCustomTrialNum: number = Math.min(requestTrialNum, this.customizedTrials.length);
            for (let i: number = 0; i < requestCustomTrialNum; i++) {
                const hyperParams: string | undefined = this.customizedTrials.shift();
                this.dispatcher.sendCommand(ADD_CUSTOMIZED_TRIAL_JOB, hyperParams);
            }

            if (requestTrialNum - requestCustomTrialNum > 0) {
                this.requestTrialJobs(requestTrialNum - requestCustomTrialNum);
            }

            // check maxtrialnum and maxduration here
            // NO_MORE_TRIAL is more like a subset of RUNNING, because during RUNNING tuner
            // might tell nnimanager that there are no more trials. In NO_MORE_TRIAL state, the experiment is viewed
            // as still running. DONE could be transferred from RUNNING or NO_MORE_TRIAL.
            assert(this.status.status === 'RUNNING' ||
                this.status.status === 'DONE' ||
                this.status.status === 'NO_MORE_TRIAL' ||
                this.status.status === 'TUNER_NO_MORE_TRIAL');
            if (this.experimentProfile.execDuration > this.experimentProfile.params.maxExecDuration ||
                this.currSubmittedTrialNum >= this.experimentProfile.params.maxTrialNum) {
                if (this.status.status !== 'DONE') {
                    this.setStatus('NO_MORE_TRIAL');
                    waitSubmittedToFinish = this.currSubmittedTrialNum;

                    assert(allFinishedTrialJobNum <= waitSubmittedToFinish);
                    if (allFinishedTrialJobNum >= waitSubmittedToFinish) {
                        this.setStatus('DONE');
                        this.experimentProfile.endTime = Date.now();
                        await this.storeExperimentProfile();
                        // write this log for travis CI
                        this.log.info('Experiment done.');
                    }
                }
            } else {
                // Back under the limits: an experiment that was DONE resumes.
                if (this.status.status === 'DONE') {
                    delete this.experimentProfile.endTime;
                    await this.storeExperimentProfile();
                }
                if (this.status.status !== 'TUNER_NO_MORE_TRIAL') {
                    this.setStatus('RUNNING');
                }
                for (let i: number = this.trialJobs.size; i < this.experimentProfile.params.trialConcurrency; i++) {
                    if (this.waitingTrials.length === 0 ||
                        this.currSubmittedTrialNum >= this.experimentProfile.params.maxTrialNum) {
                        break;
                    }
                    const hyperParams: string | undefined = this.waitingTrials.shift();
                    if (hyperParams === undefined) {
                        throw new Error(`Error: invalid hyper-parameters for job submission: ${hyperParams}`);
                    }
                    this.currSubmittedTrialNum++;
                    const trialJobAppForm: TrialJobApplicationForm = {
                        jobType: 'TRIAL',
                        hyperParameters: {
                            value: hyperParams,
                            index: 0
                        }
                    };
                    this.log.info(`submitTrialJob: form: ${JSON.stringify(trialJobAppForm)}`);
                    const trialJobDetail: TrialJobDetail = await this.trainingService.submitTrialJob(trialJobAppForm);
                    await this.storeMaxSequenceId(trialJobDetail.sequenceId);
                    // A shallow copy is tracked, and that same copy is what gets stored below.
                    this.trialJobs.set(trialJobDetail.id, Object.assign({}, trialJobDetail));
                    const trialJobDetailSnapshot: TrialJobDetail | undefined = this.trialJobs.get(trialJobDetail.id);
                    if (trialJobDetailSnapshot !== undefined) {
                        await this.dataStore.storeTrialJobEvent(
                            trialJobDetailSnapshot.status, trialJobDetailSnapshot.id, hyperParams, trialJobDetailSnapshot);
                    } else {
                        assert(false, `undefined trialJobDetail in trialJobs: ${trialJobDetail.id}`);
                    }
                }
            }
            await delay(1000 * 5); // 5 seconds
        }
    }

Deshui Yu's avatar
Deshui Yu committed
550
551
552
553
554
555
    /**
     * Increment the revision of the in-memory experiment profile and pass the
     * profile to the data store.
     *
     * @returns the promise produced by the data store's storeExperimentProfile call
     */
    private storeExperimentProfile(): Promise<void> {
        const profile = this.experimentProfile;
        profile.revision = profile.revision + 1;

        return this.dataStore.storeExperimentProfile(profile);
    }

556
    /**
     * Register event listeners, send the tuner its initialization command and
     * then await the long-running tasks together.
     *
     * A rejection from the dispatcher ping, the training service or trial
     * management is re-thrown as an NNIError labelled with its origin; a rejection
     * from periodicallyUpdateExecDuration propagates unchanged.
     */
    private async run(): Promise<void> {
        assert(this.dispatcher !== undefined);

        this.addEventListeners();
        this.sendInitTunerCommands();

        // Builds a rejection handler that wraps the failure in an NNIError named `label`.
        const rethrowAs = (label: string): ((err: Error) => never) =>
            (err: Error): never => {
                throw new NNIError(label, `${label}: ${err.message}`, err);
            };

        await Promise.all([
            this.periodicallyUpdateExecDuration(),
            this.pingDispatcher().catch(rethrowAs('Dispatcher error')),
            this.trainingService.run().catch(rethrowAs('Training service error')),
            this.manageTrials().catch(rethrowAs('Job management error'))
        ]);
    }

QuanluZhang's avatar
QuanluZhang committed
576
    /**
     * Subscribe to trial metrics from the training service and to commands from
     * the dispatcher.
     *
     * Both handlers run asynchronously; a rejection from either one is wrapped in
     * an NNIError and handed to criticalError instead of being thrown.
     *
     * @throws Error when the dispatcher has not been set up
     */
    private addEventListeners(): void {
        this.log.info('Add event listeners');
        // TO DO: cannot run this method more than once in one NNIManager instance
        if (this.dispatcher === undefined) {
            throw new Error('Error: tuner or job maintainer have not been setup');
        }

        // Builds a rejection handler that reports the failure as a critical NNIError named `label`.
        const escalate = (label: string): ((err: Error) => void) =>
            (err: Error): void => {
                this.criticalError(new NNIError(label, `${label}: ${err.message}`, err));
            };

        this.trainingService.addTrialJobMetricListener((metric: TrialJobMetric): void => {
            this.onTrialJobMetrics(metric).catch(escalate('Job metrics error'));
        });

        this.dispatcher.onCommand((commandType: string, content: string): void => {
            this.onTunerCommand(commandType, content).catch(escalate('Tuner command event error'));
        });
    }

    /**
     * Send the INITIALIZE command, carrying the experiment's search space, to the dispatcher.
     *
     * @throws Error when the dispatcher has not been set up
     */
    private sendInitTunerCommands(): void {
        if (this.dispatcher === undefined) {
            throw new Error('Dispatcher error: tuner has not been setup');
        }

        const searchSpace = this.experimentProfile.params.searchSpace;
        this.log.debug(`Send tuner command: INITIALIZE: ${searchSpace}`);
        // The tuner must receive the search space before it can generate any hyper parameters
        this.dispatcher.sendCommand(INITIALIZE, searchSpace);
    }

    /**
     * Handle a metric reported by a trial job: persist it in the data store, then
     * forward its data to the dispatcher as REPORT_METRIC_DATA.
     *
     * @param metric metric record; its `id` and `data` fields are stored and `data` is forwarded
     * @throws Error when the dispatcher has not been set up (checked after the metric is stored)
     */
    private async onTrialJobMetrics(metric: TrialJobMetric): Promise<void> {
        // Serialize explicitly: interpolating the object itself would log "[object Object]".
        this.log.debug(`NNIManager received trial job metrics: ${JSON.stringify(metric)}`);
        await this.dataStore.storeMetricData(metric.id, metric.data);
        if (this.dispatcher === undefined) {
            throw new Error('Error: tuner has not been setup');
        }
        this.dispatcher.sendCommand(REPORT_METRIC_DATA, metric.data);
    }

chicm-ms's avatar
chicm-ms committed
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
    /**
     * Ask the dispatcher to generate hyper parameters for `jobNum` trial jobs.
     *
     * @param jobNum number of trial jobs to request; values below 1 are ignored
     * @throws Error when the dispatcher has not been set up
     */
    private requestTrialJobs(jobNum: number): void {
        if (jobNum < 1) {
            return;
        }
        if (this.dispatcher === undefined) {
            throw new Error('Dispatcher error: tuner has not been setup');
        }

        if (!this.experimentProfile.params.multiThread) {
            // Single request carrying the whole count.
            this.dispatcher.sendCommand(REQUEST_TRIAL_JOBS, String(jobNum));

            return;
        }

        // Multi-thread mode: one request per job, so hyper parameters are generated in a
        // non-blocking way. Within a single REQUEST_TRIAL_JOBS request they are generated
        // one by one, sequentially.
        let sent: number = 0;
        while (sent < jobNum) {
            this.dispatcher.sendCommand(REQUEST_TRIAL_JOBS, '1');
            sent++;
        }
    }

632
    /**
     * Dispatch a command received from the dispatcher.
     *
     * @param commandType command identifier; one of INITIALIZED, NEW_TRIAL_JOB,
     *                    SEND_TRIAL_JOB_PARAMETER, NO_MORE_TRIAL_JOBS or KILL_TRIAL_JOB
     * @param content command payload; parsed as JSON for SEND_TRIAL_JOB_PARAMETER and KILL_TRIAL_JOB
     * @throws Error when the command type is not one of the supported commands
     */
    private async onTunerCommand(commandType: string, content: string): Promise<void> {
        this.log.info(`NNIManager received command from dispatcher: ${commandType}, ${content}`);
        switch (commandType) {
            case INITIALIZED: {
                // Tuner is initialized and the search space is set: request hyper parameters
                this.requestTrialJobs(this.experimentProfile.params.trialConcurrency);
                break;
            }
            case NEW_TRIAL_JOB: {
                if (this.status.status === 'TUNER_NO_MORE_TRIAL') {
                    this.log.warning('It is not supposed to receive more trials after NO_MORE_TRIAL is set');
                    this.setStatus('RUNNING');
                }
                // Queue the payload in waitingTrials
                this.waitingTrials.push(content);
                break;
            }
            case SEND_TRIAL_JOB_PARAMETER: {
                const parsed: any = JSON.parse(content);
                assert(parsed.parameter_index >= 0);
                assert(parsed.trial_job_id !== undefined);

                const targetJobId: any = parsed.trial_job_id;
                const updateForm: TrialJobApplicationForm = {
                    jobType: 'TRIAL',
                    hyperParameters: {
                        value: content,
                        index: parsed.parameter_index
                    }
                };
                this.log.info(`updateTrialJob: job id: ${targetJobId}, form: ${JSON.stringify(updateForm)}`);
                await this.trainingService.updateTrialJob(targetJobId, updateForm);
                await this.dataStore.storeTrialJobEvent('ADD_HYPERPARAMETER', targetJobId, content, undefined);
                break;
            }
            case NO_MORE_TRIAL_JOBS: {
                this.setStatus('TUNER_NO_MORE_TRIAL');
                break;
            }
            case KILL_TRIAL_JOB: {
                // The payload is JSON; the parsed value is passed to cancelTrialJob
                const jobToCancel: any = JSON.parse(content);
                this.log.info(`cancelTrialJob: ${jobToCancel}`);
                await this.trainingService.cancelTrialJob(jobToCancel, true);
                break;
            }
            default:
                throw new Error('Error: unsupported command type from tuner');
        }
    }

675
676
677
678
679
680
681
682
683
684
    /**
     * Report a critical error: record it through logError and also print it
     * with console.error.
     *
     * @param err the error to report
     */
    private criticalError(err: Error): void {
        this.logError(err);
        console.error(err);
    }

    /**
     * Record an error: write it to the log, append its message to the status
     * error list and switch the manager status to 'ERROR'.
     *
     * @param err the error to record
     */
    private logError(err: Error): void {
        // Prefer the stack trace, but fall back to the message so that an error
        // without a stack is still written to the log instead of being dropped.
        this.log.error(err.stack !== undefined ? err.stack : err.message);
        this.status.errors.push(err.message);
        this.setStatus('ERROR');
    }

    /**
     * Update the manager status, logging the transition.
     * Does nothing when the requested status equals the current one.
     *
     * @param status the status to switch to
     */
    private setStatus(status: ExperimentStatus): void {
        const previous = this.status.status;
        if (previous === status) {
            return;
        }
        this.log.info(`Change NNIManager status from: ${previous} to: ${status}`);
        this.status.status = status;
    }

    /**
     * Build a blank experiment profile: id and log directory come from
     * getExperimentId() and getExperimentRootDir(), every counter is zero and
     * every string parameter is empty.
     *
     * @returns a newly created profile object
     */
    private createEmptyExperimentProfile(): ExperimentProfile {
        const blankParams: ExperimentParams = {
            authorName: '',
            experimentName: '',
            trialConcurrency: 0,
            maxExecDuration: 0, // unit: second
            maxTrialNum: 0, // maxTrialNum includes all the submitted trial jobs
            trainingServicePlatform: '',
            searchSpace: ''
        };

        return {
            id: getExperimentId(),
            revision: 0,
            execDuration: 0,
            logDir: getExperimentRootDir(),
            maxSequenceId: 0,
            params: blankParams
        };
    }
713

QuanluZhang's avatar
QuanluZhang committed
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
    /**
     * Create the checkpoint directory and record its path on whichever of the
     * advisor, tuner and assessor configurations are present in the experiment
     * parameters.
     *
     * @returns the checkpoint directory path
     */
    private async createCheckpointDir(): Promise<string> {
        // TODO: test
        const chkpDir: string = getCheckpointDir();
        // create checkpoint directory
        await mkDirP(chkpDir);
        // assign this directory to exp profile's checkpointDir
        if (this.experimentProfile.params.advisor) {
            this.experimentProfile.params.advisor.checkpointDir = chkpDir;
        }
        if (this.experimentProfile.params.tuner) {
            this.experimentProfile.params.tuner.checkpointDir = chkpDir;
        }
        if (this.experimentProfile.params.assessor) {
            this.experimentProfile.params.assessor.checkpointDir = chkpDir;
        }

        // An async function already wraps its return value in a promise,
        // so no explicit Promise.resolve is needed.
        return chkpDir;
    }
goooxu's avatar
goooxu committed
732

733
734
735
736
737
738
    /**
     * Raise the profile's maxSequenceId to `sequenceId` and persist the profile,
     * but only when `sequenceId` is greater than the currently recorded maximum.
     *
     * @param sequenceId sequence id to compare against the recorded maximum
     */
    private async storeMaxSequenceId(sequenceId: number): Promise<void> {
        const profile = this.experimentProfile;
        const isNewMaximum: boolean = sequenceId > profile.maxSequenceId;
        if (!isNewMaximum) {
            return;
        }
        profile.maxSequenceId = sequenceId;
        await this.storeExperimentProfile();
    }
Deshui Yu's avatar
Deshui Yu committed
739
740
741
}

export { NNIManager };