localTrainingService.ts 21.2 KB
Newer Older
liuzhe-lz's avatar
liuzhe-lz committed
1
2
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT license.
Deshui Yu's avatar
Deshui Yu committed
3
4
5
6
7
8
9

'use strict';
import * as cp from 'child_process';
import { EventEmitter } from 'events';
import * as fs from 'fs';
import * as path from 'path';
import * as ts from 'tail-stream';
10
import * as tkill from 'tree-kill';
11
import { NNIError, NNIErrorNames } from '../../common/errors';
12
import { getExperimentId } from '../../common/experimentStartupInfo';
13
import { getLogger, Logger } from '../../common/log';
Deshui Yu's avatar
Deshui Yu committed
14
import {
15
    HyperParameters, TrainingService, TrialJobApplicationForm,
16
    TrialJobDetail, TrialJobMetric, TrialJobStatus, LogType
Deshui Yu's avatar
Deshui Yu committed
17
} from '../../common/trainingService';
18
19
20
import {
    delay, generateParamFileName, getExperimentRootDir, getJobCancelStatus, getNewLine, isAlive, uniqueString
} from '../../common/utils';
21
import { ExperimentConfig, LocalConfig, flattenConfig } from '../../common/experimentConfig';
22
import { execMkdir, execNewFile, getScriptName, runScript, setEnvironmentVariable } from '../common/util';
23
import { GPUScheduler } from './gpuScheduler';
Deshui Yu's avatar
Deshui Yu committed
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52

/**
 * Decode a command
 * @param Buffer binary incoming data
 * @returns a tuple of (success, commandType, content, remain)
 *          success: true if the buffer contains at least one complete command; otherwise false
 *          remain: remaining data after the first command
 */
function decodeCommand(data: Buffer): [boolean, string, string, Buffer] {
    if (data.length < 8) {
        return [false, '', '', data];
    }
    const commandType: string = data.slice(0, 2).toString();
    const contentLength: number = parseInt(data.slice(2, 8).toString(), 10);
    if (data.length < contentLength + 8) {
        return [false, '', '', data];
    }
    const content: string = data.slice(8, contentLength + 8).toString();
    const remain: Buffer = data.slice(contentLength + 8);

    return [true, commandType, content, remain];
}

/**
 * LocalTrialJobDetail
 */
class LocalTrialJobDetail implements TrialJobDetail {
    public id: string;
    public status: TrialJobStatus;
53
54
55
    public submitTime: number;
    public startTime?: number;
    public endTime?: number;
Deshui Yu's avatar
Deshui Yu committed
56
57
58
    public tags?: string[];
    public url?: string;
    public workingDirectory: string;
59
    public form: TrialJobApplicationForm;
Deshui Yu's avatar
Deshui Yu committed
60
    public pid?: number;
61
    public gpuIndices?: number[];
Deshui Yu's avatar
Deshui Yu committed
62

63
64
    constructor(
        id: string, status: TrialJobStatus, submitTime: number,
65
        workingDirectory: string, form: TrialJobApplicationForm) {
Deshui Yu's avatar
Deshui Yu committed
66
67
68
69
70
71
        this.id = id;
        this.status = status;
        this.submitTime = submitTime;
        this.workingDirectory = workingDirectory;
        this.form = form;
        this.url = `file://localhost:${workingDirectory}`;
72
73
74
75
        this.gpuIndices = [];
    }
}

76
interface FlattenLocalConfig extends ExperimentConfig, LocalConfig { }
Deshui Yu's avatar
Deshui Yu committed
77
78

/**
chicm-ms's avatar
chicm-ms committed
79
 * Local machine training service
Deshui Yu's avatar
Deshui Yu committed
80
81
 */
class LocalTrainingService implements TrainingService {
82
    private readonly config: FlattenLocalConfig;
83
84
85
    private readonly eventEmitter: EventEmitter;
    private readonly jobMap: Map<string, LocalTrialJobDetail>;
    private readonly jobQueue: string[];
Deshui Yu's avatar
Deshui Yu committed
86
87
88
    private initialized: boolean;
    private stopping: boolean;
    private rootDir!: string;
chicm-ms's avatar
chicm-ms committed
89
    private readonly experimentId!: string;
90
    private gpuScheduler!: GPUScheduler;
91
92
93
    private readonly occupiedGpuIndexNumMap: Map<number, number>;
    private readonly log: Logger;
    private readonly jobStreamMap: Map<string, ts.Stream>;
Deshui Yu's avatar
Deshui Yu committed
94

95
96
    constructor(config: ExperimentConfig) {
        this.config = flattenConfig<FlattenLocalConfig>(config, 'local');
Deshui Yu's avatar
Deshui Yu committed
97
98
99
100
        this.eventEmitter = new EventEmitter();
        this.jobMap = new Map<string, LocalTrialJobDetail>();
        this.jobQueue = [];
        this.stopping = false;
liuzhe-lz's avatar
liuzhe-lz committed
101
        this.log = getLogger('LocalTrainingService');
suiguoxin's avatar
suiguoxin committed
102
        this.experimentId = getExperimentId();
103
        this.jobStreamMap = new Map<string, ts.Stream>();
chicm-ms's avatar
chicm-ms committed
104
        this.log.info('Construct local machine training service.');
105
        this.occupiedGpuIndexNumMap = new Map<number, number>();
106
107
108
109
110
111
112
113
114
115
116
117
118
119

        if (this.config.trialGpuNumber !== undefined && this.config.trialGpuNumber > 0) {
            this.gpuScheduler = new GPUScheduler();
        }

        if (this.config.gpuIndices === []) {
            throw new Error('gpuIndices cannot be empty when specified.');
        }

        this.rootDir = getExperimentRootDir();
        if (!fs.existsSync(this.rootDir)) {
            throw new Error('root dir not created');
        }
        this.initialized = true;
Deshui Yu's avatar
Deshui Yu committed
120
121
122
    }

    public async run(): Promise<void> {
chicm-ms's avatar
chicm-ms committed
123
        this.log.info('Run local machine training service.');
124
125
126
        const longRunningTasks: Promise<void>[] = [this.runJobLoop()];
        if (this.gpuScheduler !== undefined) {
            longRunningTasks.push(this.gpuScheduler.run());
Deshui Yu's avatar
Deshui Yu committed
127
        }
128
        await Promise.all(longRunningTasks);
chicm-ms's avatar
chicm-ms committed
129
        this.log.info('Local machine training service exit.');
Deshui Yu's avatar
Deshui Yu committed
130
131
132
133
134
135
    }

    public async listTrialJobs(): Promise<TrialJobDetail[]> {
        const jobs: TrialJobDetail[] = [];
        for (const key of this.jobMap.keys()) {
            const trialJob: TrialJobDetail = await this.getTrialJob(key);
136
            jobs.push(trialJob);
Deshui Yu's avatar
Deshui Yu committed
137
138
139
140
141
142
143
144
145
146
147
        }

        return jobs;
    }

    public async getTrialJob(trialJobId: string): Promise<TrialJobDetail> {
        const trialJob: LocalTrialJobDetail | undefined = this.jobMap.get(trialJobId);
        if (trialJob === undefined) {
            throw new NNIError(NNIErrorNames.NOT_FOUND, 'Trial job not found');
        }
        if (trialJob.status === 'RUNNING') {
148
            const alive: boolean = await isAlive(trialJob.pid);
Deshui Yu's avatar
Deshui Yu committed
149
            if (!alive) {
150
                trialJob.endTime = Date.now();
Deshui Yu's avatar
Deshui Yu committed
151
152
153
                this.setTrialJobStatus(trialJob, 'FAILED');
                try {
                    const state: string = await fs.promises.readFile(path.join(trialJob.workingDirectory, '.nni', 'state'), 'utf8');
154
155
                    const match: RegExpMatchArray | null = state.trim()
                        .match(/^(\d+)\s+(\d+)/);
Deshui Yu's avatar
Deshui Yu committed
156
157
158
159
160
                    if (match !== null) {
                        const { 1: code, 2: timestamp } = match;
                        if (parseInt(code, 10) === 0) {
                            this.setTrialJobStatus(trialJob, 'SUCCEEDED');
                        }
161
                        trialJob.endTime = parseInt(timestamp, 10);
Deshui Yu's avatar
Deshui Yu committed
162
163
164
165
                    }
                } catch (error) {
                    //ignore
                }
166
                this.log.debug(`trialJob status update: ${trialJobId}, ${trialJob.status}`);
Deshui Yu's avatar
Deshui Yu committed
167
168
169
170
171
172
            }
        }

        return trialJob;
    }

173
174
175
176
    public async getTrialLog(trialJobId: string, logType: LogType): Promise<string> {
        let logPath: string;
        if (logType === 'TRIAL_LOG') {
            logPath = path.join(this.rootDir, 'trials', trialJobId, 'trial.log');
177
178
        } else if (logType === 'TRIAL_STDOUT'){
            logPath = path.join(this.rootDir, 'trials', trialJobId, 'stdout');
179
180
181
182
183
184
185
186
        } else if (logType === 'TRIAL_ERROR') {
            logPath = path.join(this.rootDir, 'trials', trialJobId, 'stderr');
        } else {
            throw new Error('unexpected log type');
        }
        return fs.promises.readFile(logPath, 'utf8');
    }

Deshui Yu's avatar
Deshui Yu committed
187
188
189
190
191
192
193
194
    public addTrialJobMetricListener(listener: (metric: TrialJobMetric) => void): void {
        this.eventEmitter.on('metric', listener);
    }

    public removeTrialJobMetricListener(listener: (metric: TrialJobMetric) => void): void {
        this.eventEmitter.off('metric', listener);
    }

195
196
197
198
199
200
201
202
203
204
205
206
    public submitTrialJob(form: TrialJobApplicationForm): Promise<TrialJobDetail> {
        const trialJobId: string = uniqueString(5);
        const trialJobDetail: LocalTrialJobDetail = new LocalTrialJobDetail(
            trialJobId,
            'WAITING',
            Date.now(),
            path.join(this.rootDir, 'trials', trialJobId),
            form
        );
        this.jobQueue.push(trialJobId);
        this.jobMap.set(trialJobId, trialJobDetail);

liuzhe-lz's avatar
liuzhe-lz committed
207
        this.log.debug('submitTrialJob: return:',  trialJobDetail);
208
209

        return Promise.resolve(trialJobDetail);
Deshui Yu's avatar
Deshui Yu committed
210
211
    }

212
213
214
215
216
    /**
     * Update trial job for multi-phase
     * @param trialJobId trial job id
     * @param form job application form
     */
217
    public async updateTrialJob(trialJobId: string, form: TrialJobApplicationForm): Promise<TrialJobDetail> {
chicm-ms's avatar
chicm-ms committed
218
219
220
221
        const trialJobDetail: undefined | TrialJobDetail = this.jobMap.get(trialJobId);
        if (trialJobDetail === undefined) {
            throw new Error(`updateTrialJob failed: ${trialJobId} not found`);
        }
222
        await this.writeParameterFile(trialJobDetail.workingDirectory, form.hyperParameters);
chicm-ms's avatar
chicm-ms committed
223
224

        return trialJobDetail;
225
226
    }

QuanluZhang's avatar
QuanluZhang committed
227
    public async cancelTrialJob(trialJobId: string, isEarlyStopped: boolean = false): Promise<void> {
Deshui Yu's avatar
Deshui Yu committed
228
229
230
231
        const trialJob: LocalTrialJobDetail | undefined = this.jobMap.get(trialJobId);
        if (trialJob === undefined) {
            throw new NNIError(NNIErrorNames.NOT_FOUND, 'Trial job not found');
        }
232
        if (trialJob.pid === undefined) {
233
            this.setTrialJobStatus(trialJob, 'USER_CANCELED');
234

SparkSnail's avatar
SparkSnail committed
235
            return Promise.resolve();
236
        }
237
238
239
240
241
242
243
244
245
246
247
248
249
250
        tkill(trialJob.pid, 'SIGTERM');
        const startTime = Date.now();
        while(await isAlive(trialJob.pid)) {    
            if (Date.now() - startTime > 4999) {
                tkill(trialJob.pid, 'SIGKILL', (err) => {
                    if (err) {
                        this.log.error(`kill trial job error: ${err}`);
                    }
                });
                break;
            }
            await delay(500);
        }

QuanluZhang's avatar
QuanluZhang committed
251
        this.setTrialJobStatus(trialJob, getJobCancelStatus(isEarlyStopped));
252

SparkSnail's avatar
SparkSnail committed
253
        return Promise.resolve();
Deshui Yu's avatar
Deshui Yu committed
254
255
    }

256
257
    public async setClusterMetadata(_key: string, _value: string): Promise<void> { return; }
    public async getClusterMetadata(_key: string): Promise<string> { return ''; }
Deshui Yu's avatar
Deshui Yu committed
258

259
    public async cleanUp(): Promise<void> {
chicm-ms's avatar
chicm-ms committed
260
        this.log.info('Stopping local machine training service...');
Deshui Yu's avatar
Deshui Yu committed
261
        this.stopping = true;
262
        for (const stream of this.jobStreamMap.values()) {
263
264
            stream.end(0);
            stream.emit('end');
265
        }
266
267
268
269
        if (this.gpuScheduler !== undefined) {
            await this.gpuScheduler.stop();
        }

Deshui Yu's avatar
Deshui Yu committed
270
271
272
        return Promise.resolve();
    }

273
    private onTrialJobStatusChanged(trialJob: LocalTrialJobDetail, oldStatus: TrialJobStatus): void {
274
        //if job is not running, destory job stream
275
276
277
        if (['SUCCEEDED', 'FAILED', 'USER_CANCELED', 'SYS_CANCELED', 'EARLY_STOPPED'].includes(trialJob.status)) {
            if (this.jobStreamMap.has(trialJob.id)) {
                const stream: ts.Stream | undefined = this.jobStreamMap.get(trialJob.id);
278
                if (stream === undefined) {
279
280
                    throw new Error(`Could not find stream in trial ${trialJob.id}`);
                }
281
                //Refer https://github.com/Juul/tail-stream/issues/20
Yan Ni's avatar
Yan Ni committed
282
283
284
285
286
                setTimeout(() => {
                    stream.end(0);
                    stream.emit('end');
                    this.jobStreamMap.delete(trialJob.id);
                }, 5000);
287
288
            }
        }
289
290
291
        if (trialJob.gpuIndices !== undefined && trialJob.gpuIndices.length > 0 && this.gpuScheduler !== undefined) {
            if (oldStatus === 'RUNNING' && trialJob.status !== 'RUNNING') {
                for (const index of trialJob.gpuIndices) {
292
293
                    const num: number | undefined = this.occupiedGpuIndexNumMap.get(index);
                    if (num === undefined) {
294
                        throw new Error(`gpu resource schedule error`);
295
                    } else if (num === 1) {
296
297
                        this.occupiedGpuIndexNumMap.delete(index);
                    } else {
298
                        this.occupiedGpuIndexNumMap.set(index, num - 1);
299
                    }
300
301
302
                }
            }
        }
Deshui Yu's avatar
Deshui Yu committed
303
304
    }

305
306
    private getEnvironmentVariables(
        trialJobDetail: TrialJobDetail,
SparkSnail's avatar
SparkSnail committed
307
308
        resource: { gpuIndices: number[] },
        gpuNum: number | undefined): { key: string; value: string }[] {
309
310
311
312
313
314
315
            const envVariables: { key: string; value: string }[] = [
                { key: 'NNI_PLATFORM', value: 'local' },
                { key: 'NNI_EXP_ID', value: this.experimentId },
                { key: 'NNI_SYS_DIR', value: trialJobDetail.workingDirectory },
                { key: 'NNI_TRIAL_JOB_ID', value: trialJobDetail.id },
                { key: 'NNI_OUTPUT_DIR', value: trialJobDetail.workingDirectory },
                { key: 'NNI_TRIAL_SEQ_ID', value: trialJobDetail.form.sequenceId.toString() },
316
                { key: 'NNI_CODE_DIR', value: this.config.trialCodeDirectory}
317
318
319
320
321
322
323
            ];
            if (gpuNum !== undefined) {
                envVariables.push({
                    key: 'CUDA_VISIBLE_DEVICES',
                    value: this.gpuScheduler === undefined ? '-1' : resource.gpuIndices.join(',')
                });
            }
324
325
326
327
328
329
330
331
332
333
334
335
336
337

        return envVariables;
    }

    private setExtraProperties(trialJobDetail: LocalTrialJobDetail, resource: { gpuIndices: number[] }): void {
        trialJobDetail.gpuIndices = resource.gpuIndices;
    }

    private tryGetAvailableResource(): [boolean, { gpuIndices: number[]}] {
        const resource: { gpuIndices: number[] } = { gpuIndices: [] };
        if (this.gpuScheduler === undefined) {
            return [true, resource];
        }

338
        let selectedGPUIndices: number[] = [];
339
        const availableGpuIndices: number[] = this.gpuScheduler.getAvailableGPUIndices(this.config.useActiveGpu, this.occupiedGpuIndexNumMap);
340
341
        for (const index of availableGpuIndices) {
            const num: number | undefined = this.occupiedGpuIndexNumMap.get(index);
342
            if (num === undefined || num < this.config.maxTrialNumberPerGpu) {
343
344
345
                selectedGPUIndices.push(index);
            }
        }
346

347
        if (this.config.gpuIndices !== undefined) {
348
            this.checkSpecifiedGpuIndices();
349
            selectedGPUIndices = selectedGPUIndices.filter((index: number) => this.config.gpuIndices!.includes(index));
350
351
        }

352
        if (selectedGPUIndices.length < this.config.trialGpuNumber!) {
353
354
355
            return [false, resource];
        }

356
        selectedGPUIndices.splice(this.config.trialGpuNumber!);
357
358
359
        Object.assign(resource, { gpuIndices: selectedGPUIndices });

        return [true, resource];
Deshui Yu's avatar
Deshui Yu committed
360
361
    }

362
    private checkSpecifiedGpuIndices(): void {
363
        const gpuCount: number | undefined = this.gpuScheduler.getSystemGpuCount();
364
365
        if (this.config.gpuIndices !== undefined && gpuCount !== undefined) {
            for (const index of this.config.gpuIndices) {
366
367
368
369
370
                if (index >= gpuCount) {
                    throw new Error(`Specified GPU index not found: ${index}`);
                }
            }
        }
Deshui Yu's avatar
Deshui Yu committed
371
372
    }

373
374
375
    private occupyResource(resource: {gpuIndices: number[]}): void {
        if (this.gpuScheduler !== undefined) {
            for (const index of resource.gpuIndices) {
376
377
378
                const num: number | undefined = this.occupiedGpuIndexNumMap.get(index);
                if (num === undefined) {
                    this.occupiedGpuIndexNumMap.set(index, 1);
379
                } else {
380
                    this.occupiedGpuIndexNumMap.set(index, num + 1);
381
                }
382
383
            }
        }
Deshui Yu's avatar
Deshui Yu committed
384
385
    }

386
387
388
389
    private async runJobLoop(): Promise<void> {
        while (!this.stopping) {
            while (!this.stopping && this.jobQueue.length !== 0) {
                const trialJobId: string = this.jobQueue[0];
390
391
                const trialJobDetail: LocalTrialJobDetail | undefined = this.jobMap.get(trialJobId);
                if (trialJobDetail !== undefined && trialJobDetail.status === 'WAITING') {
392
393
394
395
                    const [success, resource] = this.tryGetAvailableResource();
                    if (!success) {
                        break;
                    }
396

397
398
399
400
401
402
403
                    this.occupyResource(resource);
                    await this.runTrialJob(trialJobId, resource);
                }
                this.jobQueue.shift();
            }
            await delay(5000);
        }
Deshui Yu's avatar
Deshui Yu committed
404
405
406
407
408
409
410
411
412
413
    }

    private setTrialJobStatus(trialJob: LocalTrialJobDetail, newStatus: TrialJobStatus): void {
        if (trialJob.status !== newStatus) {
            const oldStatus: TrialJobStatus = trialJob.status;
            trialJob.status = newStatus;
            this.onTrialJobStatusChanged(trialJob, oldStatus);
        }
    }

414
    private getScript(workingDirectory: string): string[] {
415
416
        const script: string[] = [];
        if (process.platform === 'win32') {
417
            script.push(`$PSDefaultParameterValues = @{'Out-File:Encoding' = 'utf8'}`);
SparkSnail's avatar
SparkSnail committed
418
            script.push(`cd $env:NNI_CODE_DIR`);
419
            script.push(
420
                `cmd.exe /c ${this.config.trialCommand} 1>${path.join(workingDirectory, 'stdout')} 2>${path.join(workingDirectory, 'stderr')}`,
421
                `$NOW_DATE = [int64](([datetime]::UtcNow)-(get-date "1/1/1970")).TotalSeconds`,
422
                `$NOW_DATE = "$NOW_DATE" + (Get-Date -Format fff).ToString()`,
SparkSnail's avatar
SparkSnail committed
423
                `Write $LASTEXITCODE " " $NOW_DATE  | Out-File "${path.join(workingDirectory, '.nni', 'state')}" -NoNewline -encoding utf8`);
424
        } else {
SparkSnail's avatar
SparkSnail committed
425
            script.push(`cd $NNI_CODE_DIR`);
426
            script.push(`eval ${this.config.trialCommand} 1>${path.join(workingDirectory, 'stdout')} 2>${path.join(workingDirectory, 'stderr')}`);
427
428
429
            if (process.platform === 'darwin') {
                // https://superuser.com/questions/599072/how-to-get-bash-execution-time-in-milliseconds-under-mac-os-x
                // Considering the worst case, write 999 to avoid negative duration
SparkSnail's avatar
SparkSnail committed
430
                script.push(`echo $? \`date +%s999\` >'${path.join(workingDirectory, '.nni', 'state')}'`);
431
            } else {
SparkSnail's avatar
SparkSnail committed
432
                script.push(`echo $? \`date +%s%3N\` >'${path.join(workingDirectory, '.nni', 'state')}'`);
433
            }
434
        }
435

436
437
438
        return script;
    }

439
    private async runTrialJob(trialJobId: string, resource: {gpuIndices: number[]}): Promise<void> {
Deshui Yu's avatar
Deshui Yu committed
440
        const trialJobDetail: LocalTrialJobDetail = <LocalTrialJobDetail>this.jobMap.get(trialJobId);
441
        const variables: { key: string; value: string }[] = this.getEnvironmentVariables(trialJobDetail, resource, this.config.trialGpuNumber);
Deshui Yu's avatar
Deshui Yu committed
442

443
444
445
        const runScriptContent: string[] = [];
        if (process.platform !== 'win32') {
            runScriptContent.push('#!/bin/bash');
446
447
        } else {
            runScriptContent.push(`$env:PATH="${process.env.path}"`)
448
        }
Deshui Yu's avatar
Deshui Yu committed
449
        for (const variable of variables) {
450
            runScriptContent.push(setEnvironmentVariable(variable));
Deshui Yu's avatar
Deshui Yu committed
451
        }
452
        const scripts: string[] = this.getScript(trialJobDetail.workingDirectory);
453
454
        scripts.forEach((script: string) => {
            runScriptContent.push(script);
455
456
457
458
459
        });
        await execMkdir(trialJobDetail.workingDirectory);
        await execMkdir(path.join(trialJobDetail.workingDirectory, '.nni'));
        await execNewFile(path.join(trialJobDetail.workingDirectory, '.nni', 'metrics'));
        const scriptName: string = getScriptName('run');
460
461
        await fs.promises.writeFile(path.join(trialJobDetail.workingDirectory, scriptName),
                                    runScriptContent.join(getNewLine()), { encoding: 'utf8', mode: 0o777 });
462
        await this.writeParameterFile(trialJobDetail.workingDirectory, trialJobDetail.form.hyperParameters);
463
        const trialJobProcess: cp.ChildProcess = runScript(path.join(trialJobDetail.workingDirectory, scriptName));
Deshui Yu's avatar
Deshui Yu committed
464
        this.setTrialJobStatus(trialJobDetail, 'RUNNING');
465
466
        trialJobDetail.startTime = Date.now(); // eslint-disable-line require-atomic-updates
        trialJobDetail.pid = trialJobProcess.pid; // eslint-disable-line require-atomic-updates
Deshui Yu's avatar
Deshui Yu committed
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
        this.setExtraProperties(trialJobDetail, resource);

        let buffer: Buffer = Buffer.alloc(0);
        const stream: ts.Stream = ts.createReadStream(path.join(trialJobDetail.workingDirectory, '.nni', 'metrics'));
        stream.on('data', (data: Buffer) => {
            buffer = Buffer.concat([buffer, data]);
            while (buffer.length > 0) {
                const [success, , content, remain] = decodeCommand(buffer);
                if (!success) {
                    break;
                }
                this.eventEmitter.emit('metric', {
                    id: trialJobDetail.id,
                    data: content
                });
                this.log.debug(`Sending metrics, job id: ${trialJobDetail.id}, metrics: ${content}`);
                buffer = remain;
            }
        });
486
        this.jobStreamMap.set(trialJobDetail.id, stream);
Deshui Yu's avatar
Deshui Yu committed
487
488
    }

chicm-ms's avatar
chicm-ms committed
489
    private async writeParameterFile(directory: string, hyperParameters: HyperParameters): Promise<void> {
490
        const filepath: string = path.join(directory, generateParamFileName(hyperParameters));
chicm-ms's avatar
chicm-ms committed
491
492
        await fs.promises.writeFile(filepath, hyperParameters.value, { encoding: 'utf8' });
    }
J-shang's avatar
J-shang committed
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508

    public async getTrialOutputLocalPath(trialJobId: string): Promise<string> {
        return Promise.resolve(path.join(this.rootDir, 'trials', trialJobId));
    }

    public async fetchTrialOutput(trialJobId: string, subpath: string): Promise<void> {
        let trialLocalPath = await this.getTrialOutputLocalPath(trialJobId);
        if (subpath !== undefined) {
            trialLocalPath = path.join(trialLocalPath, subpath);
        }
        if (fs.existsSync(trialLocalPath)) {
            return Promise.resolve();
        } else {
            return Promise.reject(new Error('Trial local path not exist.'));
        }
    }
Deshui Yu's avatar
Deshui Yu committed
509
510
511
}

export { LocalTrainingService };