nnimanager.ts 35.6 KB
Newer Older
Deshui Yu's avatar
Deshui Yu committed
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
/**
 * Copyright (c) Microsoft Corporation
 * All rights reserved.
 *
 * MIT License
 *
 * Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated
 * documentation files (the "Software"), to deal in the Software without restriction, including without limitation
 * the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and
 * to permit persons to whom the Software is furnished to do so, subject to the following conditions:
 * The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.
 *
 * THE SOFTWARE IS PROVIDED *AS IS*, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING
 * BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
 * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
 * DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
 */

'use strict';

import * as assert from 'assert';
import * as cpp from 'child-process-promise';
goooxu's avatar
goooxu committed
24
import { ChildProcess, spawn, StdioOptions } from 'child_process';
Deshui Yu's avatar
Deshui Yu committed
25
26
27
import { Deferred } from 'ts-deferred';
import * as component from '../common/component';
import { DataStore, MetricDataRecord, MetricType, TrialJobInfo } from '../common/datastore';
28
import { NNIError } from '../common/errors';
29
import { getExperimentId } from '../common/experimentStartupInfo';
Deshui Yu's avatar
Deshui Yu committed
30
31
import { getLogger, Logger } from '../common/log';
import {
chicm-ms's avatar
chicm-ms committed
32
    ExperimentParams, ExperimentProfile, Manager, ExperimentStatus,
33
    NNIManagerStatus, ProfileUpdateType, TrialJobStatistics
Deshui Yu's avatar
Deshui Yu committed
34
35
36
37
} from '../common/manager';
import {
    TrainingService, TrialJobApplicationForm, TrialJobDetail, TrialJobMetric, TrialJobStatus
} from '../common/trainingService';
38
import { delay, getCheckpointDir, getExperimentRootDir, getLogDir, getMsgDispatcherCommand, mkDirP, getTunerProc, getLogLevel, isAlive, killPid } from '../common/utils';
Deshui Yu's avatar
Deshui Yu committed
39
import {
chicm-ms's avatar
chicm-ms committed
40
    ADD_CUSTOMIZED_TRIAL_JOB, INITIALIZE, INITIALIZED, KILL_TRIAL_JOB, NEW_TRIAL_JOB, NO_MORE_TRIAL_JOBS, PING,
41
    REPORT_METRIC_DATA, REQUEST_TRIAL_JOBS, SEND_TRIAL_JOB_PARAMETER, TERMINATE, TRIAL_END, UPDATE_SEARCH_SPACE, IMPORT_DATA
Deshui Yu's avatar
Deshui Yu committed
42
} from './commands';
43
import { createDispatcherInterface, IpcInterface } from './ipcInterface';
Deshui Yu's avatar
Deshui Yu committed
44
45

/**
chicm-ms's avatar
chicm-ms committed
46
 * NNIManager which implements Manager interface
Deshui Yu's avatar
Deshui Yu committed
47
48
49
 */
class NNIManager implements Manager {
    // Backend that launches, lists and cancels trial jobs.
    private trainingService: TrainingService;
    // IPC channel to the tuner/assessor dispatcher process; undefined until setupTuner() runs.
    private dispatcher: IpcInterface | undefined;
    // Number of trials submitted so far; recovered from the data store when resuming.
    private currSubmittedTrialNum: number;  // need to be recovered
    // Pending change of trial concurrency not yet applied: >0: increase, <0: decrease
    private trialConcurrencyChange: number;
    private log: Logger;
    private dataStore: DataStore;
    private experimentProfile: ExperimentProfile;
    // PID of the spawned dispatcher process; 0 until the tuner has been set up.
    private dispatcherPid: number;
    private status: NNIManagerStatus;
    // Trial application forms queued for submission (e.g. customized trials added by the user).
    private waitingTrials: TrialJobApplicationForm[];
    // Trials currently tracked, keyed by trial job id; entries are removed once a trial reaches a terminal status.
    private trialJobs: Map<string, TrialJobDetail>;
    // JSON array of finished and imported trial data, assembled when an experiment is resumed.
    private trialDataForTuner: string;
    // When true (read-only resume), the update/import/add/cancel/cluster-metadata APIs reject.
    private readonly: boolean;

    // Metric callback handed to the training service (removed again in experimentDoneCleanUp).
    private trialJobMetricListener: (metric: TrialJobMetric) => void;
64

Deshui Yu's avatar
Deshui Yu committed
65
66
    /**
     * Resolve the TrainingService and DataStore components, initialize bookkeeping state,
     * and build the metric listener used for the training service.
     */
    constructor() {
        this.currSubmittedTrialNum = 0;
        this.trialConcurrencyChange = 0;
        this.trainingService = component.get(TrainingService);
        assert(this.trainingService);
        this.dispatcherPid = 0;
        this.waitingTrials = [];
        this.trialJobs = new Map<string, TrialJobDetail>();
        this.trialDataForTuner = '';
        this.readonly = false;

        this.log = getLogger();
        this.dataStore = component.get(DataStore);
        this.experimentProfile = this.createEmptyExperimentProfile();
        this.status = {
            status: 'INITIALIZED',
            errors: []
        };
        // onTrialJobMetrics is async; route any failure to criticalError instead of
        // leaving an unhandled rejection behind.
        this.trialJobMetricListener = (metric: TrialJobMetric) => {
            this.onTrialJobMetrics(metric).catch((err: Error) => {
                this.criticalError(NNIError.FromError(err, 'Job metrics error: '));
            });
        };
    }

    /**
     * Apply one field of an updated experiment profile and persist the resulting profile.
     *
     * Declared async so that every failure (read-only mode, unrecognized update type,
     * tuner not set up for a search-space update) surfaces as a rejected promise instead
     * of a mix of rejections and synchronous throws.
     *
     * @param experimentProfile profile carrying the new value in its params
     * @param updateType which parameter to apply
     */
    public async updateExperimentProfile(experimentProfile: ExperimentProfile, updateType: ProfileUpdateType): Promise<void> {
        if (this.readonly) {
            throw new Error('Error: can not update experiment profile in readonly mode!');
        }
        switch (updateType) {
            case 'TRIAL_CONCURRENCY':
                this.updateTrialConcurrency(experimentProfile.params.trialConcurrency);
                break;
            case 'MAX_EXEC_DURATION':
                this.updateMaxExecDuration(experimentProfile.params.maxExecDuration);
                break;
            case 'SEARCH_SPACE':
                this.updateSearchSpace(experimentProfile.params.searchSpace);
                break;
            case 'MAX_TRIAL_NUM':
                this.updateMaxTrialNum(experimentProfile.params.maxTrialNum);
                break;
            default:
                throw new Error('Error: unrecognized updateType');
        }

        await this.storeExperimentProfile();
    }

114
    /**
     * Send externally supplied trial data to the tuner and record an IMPORT_DATA event.
     *
     * Declared async so that read-only mode, a missing tuner, and dispatcher errors all
     * surface as rejections.
     *
     * @param data serialized trial data forwarded verbatim to the dispatcher
     */
    public async importData(data: string): Promise<void> {
        if (this.readonly) {
            throw new Error('Error: can not import data in readonly mode!');
        }
        if (this.dispatcher === undefined) {
            throw new Error('tuner has not been setup');
        }
        this.dispatcher.sendCommand(IMPORT_DATA, data);

        await this.dataStore.storeTrialJobEvent('IMPORT_DATA', '', data);
    }

128
129
130
131
    /**
     * Return the trial hyper-parameter configurations as exported by the data store.
     */
    public async exportData(): Promise<string> {
        const exported: string = await this.dataStore.exportTrialHpConfigs();

        return exported;
    }

132
    /**
     * Queue a user-supplied hyper-parameter set as a new trial.
     *
     * Declared async so that malformed JSON and data-store failures surface as rejections.
     * Previously the store promise was dropped, so its failures went unhandled.
     *
     * @param hyperParams JSON string with the trial's parameters
     * @returns the sequence id assigned to the queued trial
     */
    public async addCustomizedTrialJob(hyperParams: string): Promise<number> {
        if (this.readonly) {
            throw new Error('Error: can not add customized trial job in readonly mode!');
        }
        if (this.currSubmittedTrialNum >= this.experimentProfile.params.maxTrialNum) {
            throw new Error('reach maxTrialNum');
        }

        // TODO: NNI manager should not peek tuner's internal protocol, let's refactor this later
        // Parsed before a sequence id is reserved, so malformed input does not consume one.
        const packedParameter = {
            parameter_id: null,
            parameter_source: 'customized',
            parameters: JSON.parse(hyperParams)
        };

        const form: TrialJobApplicationForm = {
            sequenceId: this.experimentProfile.nextSequenceId++,
            hyperParameters: {
                value: JSON.stringify(packedParameter),
                index: 0
            }
        };
        this.waitingTrials.push(form);

        // trial id has not been generated yet, thus use '' instead
        await this.dataStore.storeTrialJobEvent('ADD_CUSTOMIZED', '', hyperParams);

        return form.sequenceId;
    }

    /**
     * Cancel a trial at the user's request and record a USER_TO_CANCEL event.
     *
     * @param trialJobId id of the trial to cancel
     */
    public async cancelTrialJobByUser(trialJobId: string): Promise<void> {
        if (this.readonly) {
            throw new Error('Error: can not cancel trial job in readonly mode!');
        }
        this.log.info(`User cancelTrialJob: ${trialJobId}`);
        await this.trainingService.cancelTrialJob(trialJobId);
        await this.dataStore.storeTrialJobEvent('USER_TO_CANCEL', trialJobId, '');
    }

    /**
     * Start a new experiment. This:
     * - persists its parameters;
     * - applies experiment-level settings to the training service;
     * - spawns the tuner dispatcher;
     * - launches the main run loop in the background.
     *
     * @param expParams experiment parameters
     * @returns the experiment id
     */
    public async startExperiment(expParams: ExperimentParams): Promise<string> {
        this.log.info(`Starting experiment: ${this.experimentProfile.id}`);
        this.experimentProfile.params = expParams;
        await this.storeExperimentProfile();
        this.log.debug('Setup tuner...');

        // Each cluster-metadata update is awaited for two reasons: a failure propagates to the
        // caller instead of becoming an unhandled rejection, and the setting is in place before
        // the tuner starts. Awaiting is harmless if the training service returns a plain value.

        // Set up multiphase config
        if (expParams.multiPhase && this.trainingService.isMultiPhaseJobSupported) {
            await this.trainingService.setClusterMetadata('multiPhase', expParams.multiPhase.toString());
        }

        // Set up versionCheck config
        if (expParams.versionCheck !== undefined) {
            await this.trainingService.setClusterMetadata('version_check', expParams.versionCheck.toString());
        }

        // Set up logCollection config
        if (expParams.logCollection !== undefined) {
            await this.trainingService.setClusterMetadata('log_collection', expParams.logCollection.toString());
        }

        const dispatcherCommand: string = getMsgDispatcherCommand(expParams.tuner, expParams.assessor, expParams.advisor,
            expParams.multiPhase, expParams.multiThread);
        this.log.debug(`dispatcher command: ${dispatcherCommand}`);
        const checkpointDir: string = await this.createCheckpointDir();
        this.setupTuner(dispatcherCommand, undefined, 'start', checkpointDir);

        this.experimentProfile.startTime = Date.now();
        this.setStatus('RUNNING');
        await this.storeExperimentProfile();
        // The run loop is not awaited; any error it raises is reported through criticalError.
        this.run().catch((err: Error) => {
            this.criticalError(err);
        });

        return this.experimentProfile.id;
    }

SparkSnail's avatar
SparkSnail committed
210
    /**
     * Resume an experiment from the data store.
     *
     * In read-only mode only the stored profile is loaded. Otherwise this:
     * - restarts the tuner dispatcher in 'resume' mode;
     * - records trials left WAITING/RUNNING as FAILED;
     * - gathers finished and imported trial data;
     * - restarts the run loop.
     *
     * @param readonly when true, only load the profile and reject later mutating calls
     */
    public async resumeExperiment(readonly: boolean): Promise<void> {
        this.log.info(`Resuming experiment: ${this.experimentProfile.id}`);
        //Fetch back the experiment profile
        const experimentId: string = getExperimentId();
        this.experimentProfile = await this.dataStore.getExperimentProfile(experimentId);
        this.readonly = readonly;
        if (readonly) {
            return;
        }
        const expParams: ExperimentParams = this.experimentProfile.params;

        // Cluster-metadata updates are awaited so failures propagate instead of going unhandled.

        // Set up multiphase config
        if (expParams.multiPhase && this.trainingService.isMultiPhaseJobSupported) {
            await this.trainingService.setClusterMetadata('multiPhase', expParams.multiPhase.toString());
        }

        // Set up versionCheck config
        if (expParams.versionCheck !== undefined) {
            await this.trainingService.setClusterMetadata('version_check', expParams.versionCheck.toString());
        }
        // NOTE(review): startExperiment also applies 'log_collection', but it is not re-applied
        // on resume — confirm whether that is intended.

        const dispatcherCommand: string = getMsgDispatcherCommand(expParams.tuner, expParams.assessor, expParams.advisor,
            expParams.multiPhase, expParams.multiThread);
        this.log.debug(`dispatcher command: ${dispatcherCommand}`);
        const checkpointDir: string = await this.createCheckpointDir();
        this.setupTuner(dispatcherCommand, undefined, 'resume', checkpointDir);

        const allTrialJobs: TrialJobInfo[] = await this.dataStore.listTrialJobs();

        // Resume currSubmittedTrialNum
        this.currSubmittedTrialNum = allTrialJobs.length;

        // Trials the previous run left WAITING or RUNNING are recorded as FAILED.
        await Promise.all(allTrialJobs
            .filter((job: TrialJobInfo) => job.status === 'WAITING' || job.status === 'RUNNING')
            .map((job: TrialJobInfo) => this.dataStore.storeTrialJobEvent('FAILED', job.id)));

        // Collect generated trials and imported trials
        const finishedTrialData: string = await this.exportData();
        const importedData: string[] = await this.dataStore.getImportedData();
        let trialData: unknown[] = JSON.parse(finishedTrialData);
        for (const oneImportedData of importedData) {
            // do not deduplicate
            trialData = trialData.concat(<unknown[]>JSON.parse(oneImportedData));
        }
        this.trialDataForTuner = JSON.stringify(trialData);

        // Clear the recorded end time if both the duration and the trial budget are not yet exhausted.
        if (this.experimentProfile.execDuration < this.experimentProfile.params.maxExecDuration &&
            this.currSubmittedTrialNum < this.experimentProfile.params.maxTrialNum &&
            this.experimentProfile.endTime) {
            delete this.experimentProfile.endTime;
        }
        this.setStatus('RUNNING');

        // TO DO: update database record for resume event
        this.run().catch((err: Error) => {
            this.criticalError(err);
        });
    }

274
275
    /**
     * Look up a single trial's record in the data store.
     *
     * @param trialJobId id of the trial
     */
    public getTrialJob(trialJobId: string): Promise<TrialJobInfo> {
        return this.dataStore.getTrialJob(trialJobId);
    }

    /**
     * Forward a cluster-metadata key/value pair to the training service.
     *
     * Rejects if the training service has not finished within 10 seconds. The timer is
     * always cancelled once the call settles, including when the training service throws
     * synchronously. Previously that case left the timer armed, producing a later
     * unhandled rejection.
     *
     * @param key metadata key
     * @param value metadata value
     */
    public async setClusterMetadata(key: string, value: string): Promise<void> {
        if (this.readonly) {
            throw new Error('Error: can not set cluster metadata in readonly mode!');
        }
        this.log.info(`NNIManager setClusterMetadata, key: ${key}, value: ${value}`);
        // The Promise executor runs synchronously, so cancelTimeout is replaced before it can be called.
        let cancelTimeout: () => void = (): void => undefined;
        // TO DO: move timeout value to constants file
        const timeout: Promise<never> = new Promise<never>((_resolve: unknown, reject: (reason: Error) => void): void => {
            const handle = setTimeout(
                () => { reject(new Error('TrainingService setClusterMetadata timeout. Please check your config file.')); },
                10000);
            cancelTimeout = (): void => { clearTimeout(handle); };
        });
        try {
            await Promise.race([timeout, this.trainingService.setClusterMetadata(key, value)]);
        } finally {
            cancelTimeout();
        }
    }

    /**
     * Read a cluster-metadata value from the training service, wrapped in a promise.
     *
     * @param key metadata key
     */
    public getClusterMetadata(key: string): Promise<string> {
        const metadataValue = this.trainingService.getClusterMetadata(key);

        return Promise.resolve(metadataValue);
    }

    /**
     * Return per-status trial statistics as computed by the data store.
     */
    public async getTrialJobStatistics(): Promise<TrialJobStatistics[]> {
        const statistics: TrialJobStatistics[] = await this.dataStore.getTrialJobStatistics();

        return statistics;
    }

305
    /**
     * Stop the experiment: mark it STOPPING, run the cleanup sequence
     * (which ends with status STOPPED), and log completion.
     */
    public async stopExperiment(): Promise<void> {
        this.setStatus('STOPPING');
        this.log.info('Stopping experiment, cleaning up ...');
        await this.experimentDoneCleanUp();
        this.log.info('Experiment stopped.');
    }

312
    /**
     * Fetch metric records from the data store, optionally filtered by trial and metric type.
     *
     * @param trialJobId restrict to this trial when given
     * @param metricType restrict to this metric type when given
     */
    public async getMetricData(trialJobId?: string, metricType?: MetricType): Promise<MetricDataRecord[]> {
        return this.dataStore.getMetricData(trialJobId, metricType);
    }

316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
    /**
     * Return every metric record belonging to a trial whose sequence id lies in the
     * inclusive range [minSeqId, maxSeqId]. Trials without a sequence id are skipped.
     */
    public async getMetricDataByRange(minSeqId: number, maxSeqId: number): Promise<MetricDataRecord[]> {
        // FIXME: can this be undefined?
        const inRange = (seq: number | undefined): boolean =>
            seq !== undefined && minSeqId <= seq && seq <= maxSeqId;

        const selectedIds = new Set<string>();
        for (const trial of await this.dataStore.listTrialJobs()) {
            if (inRange(trial.sequenceId)) {
                selectedIds.add(trial.id);
            }
        }

        const everyMetric: MetricDataRecord[] = await this.dataStore.getMetricData();

        return everyMetric.filter((record: MetricDataRecord) => selectedIds.has(record.trialJobId));
    }

    /**
     * Return all non-PERIODICAL metric records, followed by the newest PERIODICAL record
     * of each trial. "Newest" means the highest sequence; on a tie, the record seen
     * later wins.
     */
    public async getLatestMetricData(): Promise<MetricDataRecord[]> {
        // FIXME: this can take a long time
        const records: MetricDataRecord[] = await this.dataStore.getMetricData();
        const finalRecords: MetricDataRecord[] = [];
        const newestPeriodical = new Map<string, MetricDataRecord>();
        records.forEach((record: MetricDataRecord) => {
            if (record.type === 'PERIODICAL') {
                const seen: MetricDataRecord | undefined = newestPeriodical.get(record.trialJobId);
                if (seen === undefined || seen.sequence <= record.sequence) {
                    newestPeriodical.set(record.trialJobId, record);
                }

                return;
            }
            finalRecords.push(record);
        });

        // FIXME: unit test
        return finalRecords.concat(Array.from(newestPeriodical.values()));
    }

Deshui Yu's avatar
Deshui Yu committed
347
348
349
350
351
352
353
354
    /**
     * Return the in-memory experiment profile (not re-read from the data store).
     */
    public getExperimentProfile(): Promise<ExperimentProfile> {
        // An already-resolved promise; no Deferred needed.
        return Promise.resolve(this.experimentProfile);
    }

355
356
357
358
    /**
     * Current manager status object (returned by reference, not copied).
     */
    public getStatus(): NNIManagerStatus {
        const current: NNIManagerStatus = this.status;

        return current;
    }

Deshui Yu's avatar
Deshui Yu committed
359
360
361
362
    /**
     * List trial records from the data store, optionally restricted to one status.
     *
     * @param status only return trials in this status when given
     */
    public async listTrialJobs(status?: TrialJobStatus): Promise<TrialJobInfo[]> {
        const jobs: TrialJobInfo[] = await this.dataStore.listTrialJobs(status);

        return jobs;
    }

363
364
    /**
     * Spawn the tuner/assessor dispatcher process and open the IPC interface to it.
     * Does nothing if a dispatcher already exists.
     *
     * @param command command line that launches the dispatcher
     * @param cwd working directory; the log directory is used when undefined or empty
     * @param mode exported to the child as NNI_MODE
     * @param dataDirectory exported to the child as NNI_CHECKPOINT_DIRECTORY
     */
    private setupTuner(command: string, cwd: string | undefined, mode: 'start' | 'resume', dataDirectory: string): void {
        if (this.dispatcher !== undefined) {
            return;
        }
        // stdin ignored, stdout/stderr shared with this process, fds 3 and 4 piped
        // (presumably the channel used by createDispatcherInterface — confirm in ipcInterface).
        const stdio: StdioOptions = ['ignore', process.stdout, process.stderr, 'pipe', 'pipe'];
        const newCwd: string = (cwd === undefined || cwd === '') ? getLogDir() : cwd;

        const tunerConfig = this.experimentProfile.params.tuner;
        const includeIntermediateResultsEnv: boolean | undefined =
            tunerConfig !== undefined ? tunerConfig.includeIntermediateResults : false;

        const nniEnv = {
            NNI_MODE: mode,
            NNI_CHECKPOINT_DIRECTORY: dataDirectory,
            NNI_LOG_DIRECTORY: getLogDir(),
            NNI_LOG_LEVEL: getLogLevel(),
            NNI_INCLUDE_INTERMEDIATE_RESULTS: includeIntermediateResultsEnv,
            CUDA_VISIBLE_DEVICES: this.getGpuEnvvarValue()
        };
        // NNI-specific variables override anything inherited from this process's environment.
        const newEnv = Object.assign({}, process.env, nniEnv);
        const tunerProc: ChildProcess = getTunerProc(command, stdio, newCwd, newEnv);
        this.dispatcherPid = tunerProc.pid;
        this.dispatcher = createDispatcherInterface(tunerProc);
    }

396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
    /**
     * Value for the dispatcher's CUDA_VISIBLE_DEVICES. This is the advisor's gpuIndices when an
     * advisor is configured, otherwise the tuner's gpuIndices. An empty string is returned
     * when neither is set.
     */
    private getGpuEnvvarValue(): string {
        const { advisor, tuner } = this.experimentProfile.params;
        const owner = advisor !== undefined ? advisor : tuner;
        const gpuIndices: string | undefined = owner === undefined ? undefined : owner.gpuIndices;

        return gpuIndices === undefined ? '' : gpuIndices;
    }

Deshui Yu's avatar
Deshui Yu committed
412
    /**
     * Set a new trial concurrency. The delta from the current value is accumulated in
     * trialConcurrencyChange; the trial-management loop consumes it when deciding how
     * many trials to request.
     *
     * @param trialConcurrency new concurrency value
     */
    private updateTrialConcurrency(trialConcurrency: number): void {
        // we assume trialConcurrency >= 0, which is checked by restserver
        this.trialConcurrencyChange += (trialConcurrency - this.experimentProfile.params.trialConcurrency);
        this.experimentProfile.params.trialConcurrency = trialConcurrency;
    }

    /**
     * Overwrite the maximum execution duration in the in-memory profile.
     *
     * @param duration new maximum duration
     */
    private updateMaxExecDuration(duration: number): void {
        const params: ExperimentParams = this.experimentProfile.params;
        params.maxExecDuration = duration;
    }

    /**
     * Send a new search space to the tuner, then record it in the in-memory profile.
     *
     * @param searchSpace serialized search space
     * @throws Error if the tuner dispatcher has not been set up
     */
    private updateSearchSpace(searchSpace: string): void {
        if (this.dispatcher === undefined) {
            throw new Error('Error: tuner has not been setup');
        }
        this.dispatcher.sendCommand(UPDATE_SEARCH_SPACE, searchSpace);
        this.experimentProfile.params.searchSpace = searchSpace;
    }

QuanluZhang's avatar
QuanluZhang committed
436
437
438
439
440
441
    /**
     * Overwrite the maximum number of trials in the in-memory profile.
     *
     * @param maxTrialNum new trial budget
     */
    private updateMaxTrialNum(maxTrialNum: number): void {
        const params: ExperimentParams = this.experimentProfile.params;
        params.maxTrialNum = maxTrialNum;
    }

Deshui Yu's avatar
Deshui Yu committed
442
    /**
     * Shut the experiment down. In order, this:
     * - detaches the metric listener;
     * - asks the dispatcher to terminate, waits up to ~30 seconds, then kills its PID;
     * - cancels trials still WAITING or RUNNING;
     * - cleans up the training service;
     * - persists the end time and sets status STOPPED.
     *
     * @throws Error if the tuner dispatcher has not been set up
     */
    private async experimentDoneCleanUp(): Promise<void> {
        if (this.dispatcher === undefined) {
            throw new Error('Error: tuner has not been setup');
        }
        this.trainingService.removeTrialJobMetricListener(this.trialJobMetricListener);
        this.dispatcher.sendCommand(TERMINATE);
        // gracefully terminate tuner and assessor here, wait at most 30 seconds.
        // Stop waiting as soon as the dispatcher is observed dead (no trailing sleep).
        for (let i: number = 0; i < 30; i++) {
            const tunerAlive: boolean = await isAlive(this.dispatcherPid);
            if (!tunerAlive) {
                break;
            }
            await delay(1000);
        }
        // Force-kill in case the dispatcher did not exit on TERMINATE.
        await killPid(this.dispatcherPid);

        const trialJobList: TrialJobDetail[] = await this.trainingService.listTrialJobs();
        // TO DO: to promise all
        for (const trialJob of trialJobList) {
            if (trialJob.status === 'RUNNING' || trialJob.status === 'WAITING') {
                try {
                    this.log.info(`cancelTrialJob: ${trialJob.id}`);
                    await this.trainingService.cancelTrialJob(trialJob.id);
                } catch (error) {
                    // Best effort (e.g. the trial's process may already be gone): log and continue.
                    this.log.debug(`cancelTrialJob ${trialJob.id} failed during cleanup: ${error}`);
                }
            }
        }
        await this.trainingService.cleanUp();
        this.experimentProfile.endTime = Date.now();
        await this.storeExperimentProfile();
        this.setStatus('STOPPED');
    }

    /**
     * Once per second, add one second to execDuration while the status is RUNNING. On every
     * 10th iteration that finds the experiment RUNNING, the profile is also persisted.
     * Returns once the status becomes ERROR, STOPPING or STOPPED.
     */
    private async periodicallyUpdateExecDuration(): Promise<void> {
        let count: number = 1;
        while (!['ERROR', 'STOPPING', 'STOPPED'].includes(this.status.status)) {
            await delay(1000 * 1); // 1 seconds
            if (this.status.status === 'RUNNING') {
                this.experimentProfile.execDuration += 1;
                if (count % 10 === 0) {
                    await this.storeExperimentProfile();
                }
            }
            count += 1;
        }
    }

chicm-ms's avatar
chicm-ms committed
489
490
491
492
493
494
    /**
     * Send PING to the dispatcher every 5 seconds until the status becomes ERROR,
     * STOPPING or STOPPED.
     *
     * @throws Error if the tuner dispatcher has not been set up
     */
    private async pingDispatcher(): Promise<void> {
        if (this.dispatcher === undefined) {
            throw new Error('Error: tuner has not been setup');
        }
        while (!['ERROR', 'STOPPING', 'STOPPED'].includes(this.status.status)) {
            this.dispatcher.sendCommand(PING);
            await delay(1000 * 5);
        }
    }

QuanluZhang's avatar
QuanluZhang committed
499
500
    /**
     * Poll the training service for every tracked trial. Status changes are persisted.
     * Each trial that reached a terminal status is untracked, and TRIAL_END is sent to
     * the dispatcher for it.
     *
     * @returns number of trials found finished during this poll
     * @throws Error if the tuner dispatcher has not been set up
     */
    private async requestTrialJobsStatus(): Promise<number> {
        if (this.dispatcher === undefined) {
            throw new Error('Error: tuner has not been setup');
        }
        const dispatcher: IpcInterface = this.dispatcher;
        let finishedTrialJobNum: number = 0;

        // Iterate over a snapshot of the keys because finished trials are deleted from the map below.
        for (const trialJobId of Array.from(this.trialJobs.keys())) {
            const trialJobDetail: TrialJobDetail = await this.trainingService.getTrialJob(trialJobId);
            const oldTrialJobDetail: TrialJobDetail | undefined = this.trialJobs.get(trialJobId);
            if (oldTrialJobDetail !== undefined && oldTrialJobDetail.status !== trialJobDetail.status) {
                this.log.info(`Trial job ${trialJobDetail.id} status changed from ${oldTrialJobDetail.status} to ${trialJobDetail.status}`);
                this.trialJobs.set(trialJobId, Object.assign({}, trialJobDetail));
                await this.dataStore.storeTrialJobEvent(trialJobDetail.status, trialJobDetail.id, undefined, trialJobDetail);
            }
            switch (trialJobDetail.status) {
                case 'SUCCEEDED':
                case 'USER_CANCELED':
                case 'EARLY_STOPPED':
                // FAILED / SYS_CANCELED: in the current version, we do not retry
                // TO DO: push this job to queue for retry
                case 'FAILED':
                case 'SYS_CANCELED':
                    this.trialJobs.delete(trialJobId);
                    finishedTrialJobNum++;
                    dispatcher.sendCommand(TRIAL_END, JSON.stringify({
                        trial_job_id: trialJobDetail.id,
                        event: trialJobDetail.status,
                        hyper_params: trialJobDetail.form.hyperParameters.value
                    }));
                    break;
                case 'WAITING':
                case 'RUNNING':
                case 'UNKNOWN':
                    // Do nothing
                    break;
                default:
                // TO DO: add warning in log
            }
        }

        return finishedTrialJobNum;
    }

    /**
     * Main trial-management loop: every 5 seconds it polls trial statuses, asks the tuner for new
     * hyper-parameters, enforces maxExecDuration / maxTrialNum, and submits queued trials to the
     * training service. Runs until the experiment status is ERROR, STOPPING or STOPPED.
     *
     * @throws Error if the dispatcher (tuner) has not been set up.
     */
    private async manageTrials(): Promise<void> {
        if (this.dispatcher === undefined) {
            throw new Error('Error: tuner has not been setup');
        }

        // The loop keeps polling until the experiment enters one of these statuses.
        const stopStatuses: string[] = ['ERROR', 'STOPPING', 'STOPPED'];
        // Cumulative number of finished trials. It is seeded with the number of trials already submitted,
        // i.e. those count as finished (presumably trials from a resumed experiment — confirm).
        let finishedTotal: number = this.currSubmittedTrialNum;

        while (!stopStatuses.includes(this.status.status)) {
            const justFinished: number = await this.requestTrialJobsStatus();
            finishedTotal += justFinished;

            // Every finished trial frees one slot, so that many new trials are requested from the tuner,
            // corrected by any pending trialConcurrency change:
            //   * concurrency raised by 2, 3 trials finished  -> request 5, pending change becomes 0;
            //   * concurrency lowered by 4, 2 trials finished -> -2: nothing is requested (requestTrialJobs
            //     ignores values below 1) and -2 stays pending for the next round.
            const toRequest: number = this.trialConcurrencyChange + justFinished;
            this.trialConcurrencyChange = toRequest >= 0 ? 0 : toRequest;
            this.requestTrialJobs(toRequest);

            // NO_MORE_TRIAL is treated like a variant of RUNNING: the experiment is still live while the
            // already-submitted trials finish. DONE can be reached from RUNNING or from NO_MORE_TRIAL.
            assert(this.status.status === 'RUNNING' ||
                this.status.status === 'DONE' ||
                this.status.status === 'NO_MORE_TRIAL' ||
                this.status.status === 'TUNER_NO_MORE_TRIAL');

            const budgetExhausted: boolean =
                this.experimentProfile.execDuration > this.experimentProfile.params.maxExecDuration ||
                this.currSubmittedTrialNum >= this.experimentProfile.params.maxTrialNum;

            if (!budgetExhausted) {
                // A DONE experiment whose limits are no longer exhausted is reopened: drop its endTime.
                if (this.status.status === 'DONE') {
                    delete this.experimentProfile.endTime;
                    await this.storeExperimentProfile();
                }
                if (this.status.status !== 'TUNER_NO_MORE_TRIAL') {
                    this.setStatus('RUNNING');
                }

                // Fill free concurrency slots from the queue of tuner-generated trials. The limits are
                // re-read on every iteration, so a change made while one of the awaits is pending is honoured.
                for (let slot: number = this.trialJobs.size;
                    slot < this.experimentProfile.params.trialConcurrency;
                    slot++) {
                    if (this.currSubmittedTrialNum >= this.experimentProfile.params.maxTrialNum) {
                        break;
                    }
                    const form: TrialJobApplicationForm | undefined = this.waitingTrials.shift();
                    if (form === undefined) {
                        break; // no queued trial left
                    }

                    this.currSubmittedTrialNum++;
                    this.log.info(`submitTrialJob: form: ${JSON.stringify(form)}`);
                    const submitted: TrialJobDetail = await this.trainingService.submitTrialJob(form);
                    await this.storeExperimentProfile();

                    // Track a shallow copy of the detail returned by the training service and persist its event.
                    const snapshot: TrialJobDetail = Object.assign({}, submitted);
                    this.trialJobs.set(submitted.id, snapshot);
                    await this.dataStore.storeTrialJobEvent(
                        snapshot.status, snapshot.id, form.hyperParameters.value, snapshot);
                }
            } else if (this.status.status !== 'DONE') {
                // Budget exhausted: stop submitting, and mark the experiment DONE once every submitted
                // trial has been counted as finished.
                this.setStatus('NO_MORE_TRIAL');
                const submittedTotal: number = this.currSubmittedTrialNum;
                assert(finishedTotal <= submittedTotal);
                if (finishedTotal >= submittedTotal) {
                    this.setStatus('DONE');
                    this.experimentProfile.endTime = Date.now();
                    await this.storeExperimentProfile();
                    // write this log for travis CI
                    this.log.info('Experiment done.');
                }
            }

            await delay(1000 * 5); // poll every 5 seconds
        }
    }

Deshui Yu's avatar
Deshui Yu committed
633
634
635
636
637
638
    /**
     * Persists the current experiment profile through the data store.
     * Every call bumps `revision` by one before the profile is handed over.
     */
    private storeExperimentProfile(): Promise<void> {
        const profile: ExperimentProfile = this.experimentProfile;
        profile.revision++;

        return this.dataStore.storeExperimentProfile(profile);
    }

639
    /**
     * Starts the experiment: wires up event listeners, sends INITIALIZE to the tuner, then runs the
     * exec-duration updater, dispatcher ping, training service and trial manager concurrently.
     * A failure in any of the last three is re-thrown as an NNIError tagged with its origin.
     */
    private async run(): Promise<void> {
        assert(this.dispatcher !== undefined);

        this.addEventListeners();
        this.sendInitTunerCommands();

        // Builds a rejection handler that re-throws the error wrapped with the given prefix.
        const rethrowAs = (prefix: string) => (err: Error): never => {
            throw NNIError.FromError(err, prefix);
        };

        await Promise.all([
            this.periodicallyUpdateExecDuration(),
            this.pingDispatcher().catch(rethrowAs('Dispatcher error: ')),
            this.trainingService.run().catch(rethrowAs('Training service error: ')),
            this.manageTrials().catch(rethrowAs('Job management error: '))
        ]);
    }

QuanluZhang's avatar
QuanluZhang committed
659
    /**
     * Registers the trial-metric listener on the training service and the command handler on the
     * dispatcher. Errors raised while handling a tuner command are reported through criticalError.
     *
     * @throws Error if the dispatcher has not been set up.
     */
    private addEventListeners(): void {
        this.log.info('Add event listeners');
        // TO DO: cannot run this method more than once in one NNIManager instance
        const dispatcher = this.dispatcher;
        if (dispatcher === undefined) {
            throw new Error('Error: tuner or job maintainer have not been setup');
        }

        this.trainingService.addTrialJobMetricListener(this.trialJobMetricListener);

        const reportTunerFailure = (err: Error): void => {
            this.criticalError(NNIError.FromError(err, 'Tuner command event error: '));
        };
        dispatcher.onCommand((commandType: string, content: string) => {
            this.onTunerCommand(commandType, content).catch(reportTunerFailure);
        });
    }

    /**
     * Sends INITIALIZE with the experiment's search space. The tuner must receive the search space
     * before it is asked to generate any hyper-parameters.
     *
     * @throws Error if the dispatcher has not been set up.
     */
    private sendInitTunerCommands(): void {
        const dispatcher = this.dispatcher;
        if (dispatcher === undefined) {
            throw new Error('Dispatcher error: tuner has not been setup');
        }

        const searchSpace = this.experimentProfile.params.searchSpace;
        this.log.debug(`Send tuner command: INITIALIZE: ${searchSpace}`);
        dispatcher.sendCommand(INITIALIZE, searchSpace);
    }

    /**
     * Handles a metric reported by a trial: stores it in the data store, then forwards the raw metric
     * data to the tuner as REPORT_METRIC_DATA.
     *
     * @param metric metric event from the training service (`id` identifies the trial, `data` is the payload).
     * @throws Error if the dispatcher has not been set up (the metric has already been stored by then).
     */
    private async onTrialJobMetrics(metric: TrialJobMetric): Promise<void> {
        // Serialize explicitly: interpolating the object itself would only log "[object Object]".
        this.log.debug(`NNIManager received trial job metrics: ${JSON.stringify(metric)}`);
        await this.dataStore.storeMetricData(metric.id, metric.data);
        if (this.dispatcher === undefined) {
            throw new Error('Error: tuner has not been setup');
        }
        this.dispatcher.sendCommand(REPORT_METRIC_DATA, metric.data);
    }

chicm-ms's avatar
chicm-ms committed
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
    /**
     * Asks the tuner for `jobNum` new trials. Values below 1 are ignored.
     *
     * With `multiThread` enabled, one REQUEST_TRIAL_JOBS command per trial is sent so the tuner can
     * generate them without blocking. A single request with a count makes the tuner generate them
     * one after another.
     *
     * @throws Error if jobNum >= 1 and the dispatcher has not been set up.
     */
    private requestTrialJobs(jobNum: number): void {
        if (jobNum < 1) {
            return;
        }
        const dispatcher = this.dispatcher;
        if (dispatcher === undefined) {
            throw new Error('Dispatcher error: tuner has not been setup');
        }

        if (!this.experimentProfile.params.multiThread) {
            dispatcher.sendCommand(REQUEST_TRIAL_JOBS, String(jobNum));
            return;
        }

        for (let sent: number = 0; sent < jobNum; sent++) {
            dispatcher.sendCommand(REQUEST_TRIAL_JOBS, '1');
        }
    }

711
    /**
     * Dispatches a command received from the tuner/advisor process.
     *
     * - INITIALIZED: replays imported trial data (if any) and requests `trialConcurrency` trials.
     * - NEW_TRIAL_JOB: queues a new trial form built from the received hyper-parameters.
     * - SEND_TRIAL_JOB_PARAMETER: pushes an extra parameter set to a running trial.
     * - NO_MORE_TRIAL_JOBS: marks the tuner as exhausted unless the experiment is already stopping/failed.
     * - KILL_TRIAL_JOB: cancels the given trial job.
     *
     * @param commandType command code sent by the dispatcher.
     * @param content command payload (JSON text for SEND_TRIAL_JOB_PARAMETER and KILL_TRIAL_JOB).
     * @throws Error for an unsupported command type or when the dispatcher has not been set up.
     */
    private async onTunerCommand(commandType: string, content: string): Promise<void> {
        this.log.info(`NNIManager received command from dispatcher: ${commandType}, ${content}`);
        // Each case that declares locals gets its own braces so declarations do not leak into the
        // other cases (they would otherwise share the switch's single scope).
        switch (commandType) {
            case INITIALIZED: {
                // Tuner is initialized and the search space is set: hand over any imported trial data,
                // then request hyper-parameters for the initial trials.
                if (this.trialDataForTuner.length > 0) {
                    if (this.dispatcher === undefined) {
                        throw new Error('Dispatcher error: tuner has not been setup');
                    }
                    this.dispatcher.sendCommand(IMPORT_DATA, this.trialDataForTuner);
                }
                this.requestTrialJobs(this.experimentProfile.params.trialConcurrency);
                break;
            }
            case NEW_TRIAL_JOB: {
                if (this.status.status === 'TUNER_NO_MORE_TRIAL') {
                    this.log.warning('It is not supposed to receive more trials after NO_MORE_TRIAL is set');
                    this.setStatus('RUNNING');
                }
                const form: TrialJobApplicationForm = {
                    sequenceId: this.experimentProfile.nextSequenceId++,
                    hyperParameters: {
                        value: content,
                        index: 0
                    }
                };
                this.waitingTrials.push(form);
                break;
            }
            case SEND_TRIAL_JOB_PARAMETER: {
                // Payload fields used below; they are not validated beyond the asserts.
                const tunerCommand: { parameter_index: number; trial_job_id: string; parameters?: unknown } =
                    JSON.parse(content);
                assert(tunerCommand.parameter_index >= 0);
                assert(tunerCommand.trial_job_id !== undefined);

                const trialJobForm: TrialJobApplicationForm = {
                    sequenceId: -1,  // FIXME: multi-phase tuner should use sequence ID instead of trial job ID
                    hyperParameters: {
                        value: content,
                        index: tunerCommand.parameter_index
                    }
                };
                this.log.info(`updateTrialJob: job id: ${tunerCommand.trial_job_id}, form: ${JSON.stringify(trialJobForm)}`);
                await this.trainingService.updateTrialJob(tunerCommand.trial_job_id, trialJobForm);
                // Only record the event when the tuner actually sent parameters.
                // NOTE(review): an earlier comment said this field is an empty string when the tuner has
                // nothing more to give, but the check is against null — confirm what the dispatcher sends.
                if (tunerCommand.parameters !== null) {
                    await this.dataStore.storeTrialJobEvent(
                        'ADD_HYPERPARAMETER', tunerCommand.trial_job_id, content, undefined);
                }
                break;
            }
            case NO_MORE_TRIAL_JOBS:
                if (!['ERROR', 'STOPPING', 'STOPPED'].includes(this.status.status)) {
                    this.setStatus('TUNER_NO_MORE_TRIAL');
                }
                break;
            case KILL_TRIAL_JOB: {
                // Parse the payload once and reuse it for both the log line and the cancel call.
                const trialJobId: string = JSON.parse(content);
                this.log.info(`cancelTrialJob: ${trialJobId}`);
                await this.trainingService.cancelTrialJob(trialJobId, true);
                break;
            }
            default:
                throw new Error(`Error: unsupported command type from tuner: ${commandType}`);
        }
    }

772
773
774
775
776
777
778
779
780
781
    /**
     * Reports an unrecoverable error: it is first recorded through logError, then echoed to stderr.
     *
     * @param err the error to report.
     */
    private criticalError(err: Error): void {
        this.logError(err);
        console.error(err);
    }

    /**
     * Records an error: writes it to the log, appends its message to `status.errors`, and switches
     * the experiment status to ERROR.
     *
     * @param err the error to record.
     */
    private logError(err: Error): void {
        // Prefer the stack trace, but fall back to the message so an error without a stack is still
        // written to the log instead of only appearing in status.errors.
        this.log.error(err.stack !== undefined ? err.stack : err.message);
        this.status.errors.push(err.message);
        this.setStatus('ERROR');
    }

    /**
     * Moves the experiment to `status` and logs the transition.
     * Setting the status it already has is a no-op, so nothing is logged in that case.
     *
     * @param status the new experiment status.
     */
    private setStatus(status: ExperimentStatus): void {
        if (status === this.status.status) {
            return;
        }
        this.log.info(`Change NNIManager status from: ${this.status.status} to: ${status}`);
        this.status.status = status;
    }

    /**
     * Builds a blank experiment profile. It uses the current experiment id and root directory, sets
     * every counter and limit to zero, and sets every string parameter to ''.
     */
    private createEmptyExperimentProfile(): ExperimentProfile {
        const experimentId = getExperimentId();
        const logDir = getExperimentRootDir();

        const params: ExperimentParams = {
            authorName: '',
            experimentName: '',
            trialConcurrency: 0,
            maxExecDuration: 0, // unit: second
            maxTrialNum: 0, // maxTrialNum includes all the submitted trial jobs
            trainingServicePlatform: '',
            searchSpace: ''
        };

        return {
            id: experimentId,
            revision: 0,
            execDuration: 0,
            logDir,
            nextSequenceId: 0,
            params
        };
    }
810

QuanluZhang's avatar
QuanluZhang committed
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
    /**
     * Creates the checkpoint directory returned by getCheckpointDir(). It then records that directory
     * as `checkpointDir` on each advisor / tuner / assessor config present in the experiment parameters.
     *
     * @returns the checkpoint directory path.
     */
    private async createCheckpointDir(): Promise<string> {
        // TODO: test
        const chkpDir: string = getCheckpointDir();
        await mkDirP(chkpDir);

        const params = this.experimentProfile.params;
        if (params.advisor) {
            params.advisor.checkpointDir = chkpDir;
        }
        if (params.tuner) {
            params.tuner.checkpointDir = chkpDir;
        }
        if (params.assessor) {
            params.assessor.checkpointDir = chkpDir;
        }

        // An async function already wraps its return value in a promise; no Promise.resolve needed.
        return chkpDir;
    }
Deshui Yu's avatar
Deshui Yu committed
829
830
831
}

export { NNIManager };