localTrainingService.ts 21.3 KB
Newer Older
liuzhe-lz's avatar
liuzhe-lz committed
1
2
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT license.
Deshui Yu's avatar
Deshui Yu committed
3
4
5
6
7
8
9

'use strict';
import * as cp from 'child_process';
import { EventEmitter } from 'events';
import * as fs from 'fs';
import * as path from 'path';
import * as ts from 'tail-stream';
10
import * as tkill from 'tree-kill';
11
import { NNIError, NNIErrorNames } from '../../common/errors';
12
import { getExperimentId } from '../../common/experimentStartupInfo';
13
import { getLogger, Logger } from '../../common/log';
Deshui Yu's avatar
Deshui Yu committed
14
import {
15
    HyperParameters, TrainingService, TrialJobApplicationForm,
Yuge Zhang's avatar
Yuge Zhang committed
16
    TrialJobDetail, TrialJobMetric, TrialJobStatus
Deshui Yu's avatar
Deshui Yu committed
17
} from '../../common/trainingService';
18
19
20
import {
    delay, generateParamFileName, getExperimentRootDir, getJobCancelStatus, getNewLine, isAlive, uniqueString
} from '../../common/utils';
21
import { ExperimentConfig, LocalConfig, flattenConfig } from '../../common/experimentConfig';
22
import { execMkdir, execNewFile, getScriptName, runScript, setEnvironmentVariable } from '../common/util';
23
import { GPUScheduler } from './gpuScheduler';
Deshui Yu's avatar
Deshui Yu committed
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52

/**
 * Decode the first command found at the head of a binary buffer.
 *
 * Layout as read by this function: 2 bytes of command type, 6 bytes holding the
 * content length as a base-10 number, then that many bytes of content.
 *
 * @param data binary incoming data
 * @returns a tuple of (success, commandType, content, remain)
 *          success: true if the buffer contains at least one complete command; false when
 *                   the command is incomplete or its length field is not a non-negative number
 *          remain: remaining data after the first command (the untouched input on failure)
 */
function decodeCommand(data: Buffer): [boolean, string, string, Buffer] {
    const headerLength = 8;
    if (data.length < headerLength) {
        return [false, '', '', data];
    }
    const commandType: string = data.slice(0, 2).toString();
    const contentLength: number = parseInt(data.slice(2, headerLength).toString(), 10);
    // A malformed length (NaN or negative) must not be reported as success: the slices below
    // would then consume nothing, and a caller that loops until the buffer is drained would never finish.
    if (Number.isNaN(contentLength) || contentLength < 0) {
        return [false, '', '', data];
    }
    const commandEnd: number = headerLength + contentLength;
    if (data.length < commandEnd) {
        return [false, '', '', data];
    }
    const content: string = data.slice(headerLength, commandEnd).toString();
    const remain: Buffer = data.slice(commandEnd);

    return [true, commandType, content, remain];
}

/**
 * Detail record of one trial job handled by the local training service.
 *
 * The five constructor arguments become public fields of the same name; `url` is derived
 * from the working directory and `gpuIndices` starts out empty.
 */
class LocalTrialJobDetail implements TrialJobDetail {
    public startTime?: number;
    public endTime?: number;
    public tags?: string[];
    /** `file://localhost:` URL built from the working directory */
    public url?: string;
    public pid?: number;
    public gpuIndices?: number[];

    constructor(
        public id: string,
        public status: TrialJobStatus,
        public submitTime: number,
        public workingDirectory: string,
        public form: TrialJobApplicationForm) {
        this.url = `file://localhost:${workingDirectory}`;
        this.gpuIndices = [];
    }
}

76
/** Single flat shape that carries both the ExperimentConfig fields and the LocalConfig fields. */
interface FlattenLocalConfig extends ExperimentConfig, LocalConfig { }
Deshui Yu's avatar
Deshui Yu committed
77
78

/**
chicm-ms's avatar
chicm-ms committed
79
 * Local machine training service
Deshui Yu's avatar
Deshui Yu committed
80
81
 */
class LocalTrainingService implements TrainingService {
82
    private readonly config: FlattenLocalConfig;
83
84
85
    private readonly eventEmitter: EventEmitter;
    private readonly jobMap: Map<string, LocalTrialJobDetail>;
    private readonly jobQueue: string[];
Deshui Yu's avatar
Deshui Yu committed
86
87
88
    private initialized: boolean;
    private stopping: boolean;
    private rootDir!: string;
chicm-ms's avatar
chicm-ms committed
89
    private readonly experimentId!: string;
90
    private gpuScheduler!: GPUScheduler;
91
92
93
    private readonly occupiedGpuIndexNumMap: Map<number, number>;
    private readonly log: Logger;
    private readonly jobStreamMap: Map<string, ts.Stream>;
Deshui Yu's avatar
Deshui Yu committed
94

95
96
    /**
     * Build the local training service from the experiment configuration.
     *
     * @param config experiment configuration; flattened together with its 'local' settings
     * @throws Error when `gpuIndices` is specified but empty, or when the experiment root
     *         directory does not exist
     */
    constructor(config: ExperimentConfig) {
        this.config = flattenConfig<FlattenLocalConfig>(config, 'local');
        this.eventEmitter = new EventEmitter();
        this.jobMap = new Map<string, LocalTrialJobDetail>();
        this.jobQueue = [];
        this.stopping = false;
        this.log = getLogger('LocalTrainingService');
        this.experimentId = getExperimentId();
        this.jobStreamMap = new Map<string, ts.Stream>();
        this.log.info('Construct local machine training service.');
        this.occupiedGpuIndexNumMap = new Map<number, number>();

        // Check emptiness by length: `=== []` compares against a brand-new array and is never true.
        // Validate before creating the GPU scheduler so a rejected config does not construct one.
        const specifiedGpuIndices = this.config.gpuIndices;
        if (Array.isArray(specifiedGpuIndices) && specifiedGpuIndices.length === 0) {
            throw new Error('gpuIndices cannot be empty when specified.');
        }

        // The GPU scheduler is only created when trials request at least one GPU
        if (this.config.trialGpuNumber !== undefined && this.config.trialGpuNumber > 0) {
            this.gpuScheduler = new GPUScheduler();
        }

        this.rootDir = getExperimentRootDir();
        if (!fs.existsSync(this.rootDir)) {
            throw new Error('root dir not created');
        }
        this.initialized = true;
    }

    /**
     * Run the service: starts the job loop and, when a GPU scheduler exists, the scheduler
     * as well, then waits for all of them to finish.
     */
    public async run(): Promise<void> {
        this.log.info('Run local machine training service.');
        const jobLoop: Promise<void> = this.runJobLoop();
        const tasks: Promise<void>[] = this.gpuScheduler === undefined
            ? [jobLoop]
            : [jobLoop, this.gpuScheduler.run()];
        await Promise.all(tasks);
        this.log.info('Local machine training service exit.');
    }

    /**
     * List every known trial job.
     * @returns the detail of each job in the job map, obtained through getTrialJob
     */
    public async listTrialJobs(): Promise<TrialJobDetail[]> {
        const details: TrialJobDetail[] = [];
        // Jobs are queried one at a time, in map order
        for (const [trialJobId] of this.jobMap) {
            details.push(await this.getTrialJob(trialJobId));
        }

        return details;
    }

    /**
     * Get the detail of a trial job, refreshing its status if it was running.
     *
     * A job recorded as RUNNING whose process is no longer alive is marked FAILED, then
     * upgraded to SUCCEEDED when the `.nni/state` file in its working directory reports
     * exit code 0. The timestamp stored in that file becomes the job's end time.
     *
     * @param trialJobId trial job id
     * @returns the (possibly updated) trial job detail
     * @throws NNIError (NOT_FOUND) when the id is unknown
     */
    public async getTrialJob(trialJobId: string): Promise<TrialJobDetail> {
        const trialJob: LocalTrialJobDetail | undefined = this.jobMap.get(trialJobId);
        if (trialJob === undefined) {
            throw new NNIError(NNIErrorNames.NOT_FOUND, 'Trial job not found');
        }
        // Nothing to refresh unless the job is recorded as running but its process is gone
        if (trialJob.status !== 'RUNNING' || await isAlive(trialJob.pid)) {
            return trialJob;
        }

        trialJob.endTime = Date.now();
        this.setTrialJobStatus(trialJob, 'FAILED');
        try {
            const statePath: string = path.join(trialJob.workingDirectory, '.nni', 'state');
            const state: string = await fs.promises.readFile(statePath, 'utf8');
            // Expected content: "<exit code> <timestamp>"
            const match: RegExpMatchArray | null = /^(\d+)\s+(\d+)/.exec(state.trim());
            if (match !== null) {
                const [, code, timestamp] = match;
                if (parseInt(code, 10) === 0) {
                    this.setTrialJobStatus(trialJob, 'SUCCEEDED');
                }
                trialJob.endTime = parseInt(timestamp, 10);
            }
        } catch (error) {
            // ignore: without a readable state file the job stays FAILED
        }
        this.log.debug(`trialJob status update: ${trialJobId}, ${trialJob.status}`);

        return trialJob;
    }

Yuge Zhang's avatar
Yuge Zhang committed
173
174
175
176
    /**
     * Read one of the whitelisted output files of a trial.
     *
     * @param trialJobId trial job id; must resolve to a location inside the `trials` directory
     * @param fileName one of 'trial.log', 'stderr', 'model.onnx', 'stdout'
     * @returns the file content: a utf8 string for names without an extension or containing
     *          `.txt` / `.log`, a Buffer otherwise
     * @throws Error when the file name is not whitelisted, when the trial id would escape the
     *         trials directory, or when the file does not exist
     */
    public async getTrialFile(trialJobId: string, fileName: string): Promise<string | Buffer> {
        // check filename here for security
        const accessibleFiles: string[] = ['trial.log', 'stderr', 'model.onnx', 'stdout'];
        if (!accessibleFiles.includes(fileName)) {
            throw new Error(`File unaccessible: ${fileName}`);
        }

        // The trial id is also part of the path, so make sure it cannot climb out of the
        // trials directory (e.g. through '..' segments).
        const trialsDir: string = path.join(this.rootDir, 'trials');
        const logPath: string = path.join(trialsDir, trialJobId, fileName);
        const relativePath: string = path.relative(trialsDir, logPath);
        if (relativePath === '..' || relativePath.startsWith(`..${path.sep}`) || path.isAbsolute(relativePath)) {
            throw new Error(`File unaccessible: invalid trial job id ${trialJobId}`);
        }

        if (!fs.existsSync(logPath)) {
            throw new Error(`File not found: ${logPath}`);
        }

        const isText: boolean = !fileName.includes('.') || /\.(txt|log)/.test(fileName);
        if (isText) {
            return fs.promises.readFile(logPath, { encoding: 'utf8' });
        }
        return fs.promises.readFile(logPath);
    }

Deshui Yu's avatar
Deshui Yu committed
189
190
191
192
193
194
195
196
    /**
     * Register a listener for the 'metric' events emitted by this service.
     * @param listener callback receiving each trial job metric
     */
    public addTrialJobMetricListener(listener: (metric: TrialJobMetric) => void): void {
        this.eventEmitter.addListener('metric', listener);
    }

    /**
     * Unregister a listener from the 'metric' events emitted by this service.
     * @param listener the callback to remove
     */
    public removeTrialJobMetricListener(listener: (metric: TrialJobMetric) => void): void {
        this.eventEmitter.removeListener('metric', listener);
    }

197
198
199
200
201
202
203
204
205
206
207
208
    /**
     * Submit a trial job: creates its detail record in WAITING state, appends it to the job
     * queue and registers it in the job map. The job is not started here.
     *
     * @param form job application form
     * @returns the newly created trial job detail
     */
    public submitTrialJob(form: TrialJobApplicationForm): Promise<TrialJobDetail> {
        const id: string = uniqueString(5);
        const submittedAt: number = Date.now();
        const workingDirectory: string = path.join(this.rootDir, 'trials', id);
        const detail: LocalTrialJobDetail = new LocalTrialJobDetail(id, 'WAITING', submittedAt, workingDirectory, form);

        this.jobQueue.push(id);
        this.jobMap.set(id, detail);
        this.log.debug('submitTrialJob: return:',  detail);

        return Promise.resolve(detail);
    }

214
215
216
217
218
    /**
     * Update trial job for multi-phase: writes the form's hyper-parameters into the
     * working directory of an existing job.
     *
     * @param trialJobId trial job id
     * @param form job application form
     * @returns the detail of the updated job
     * @throws Error when the job id is unknown
     */
    public async updateTrialJob(trialJobId: string, form: TrialJobApplicationForm): Promise<TrialJobDetail> {
        const existing: TrialJobDetail | undefined = this.jobMap.get(trialJobId);
        if (existing === undefined) {
            throw new Error(`updateTrialJob failed: ${trialJobId} not found`);
        }
        const { workingDirectory } = existing;
        await this.writeParameterFile(workingDirectory, form.hyperParameters);

        return existing;
    }

QuanluZhang's avatar
QuanluZhang committed
229
    /**
     * Cancel a trial job.
     *
     * A job without a pid is simply marked USER_CANCELED. Otherwise SIGTERM is sent to the
     * job's process tree and liveness is polled every 500 ms; once more than 4999 ms have
     * passed with the process still alive, SIGKILL is sent. The final status comes from
     * getJobCancelStatus(isEarlyStopped).
     *
     * @param trialJobId trial job id
     * @param isEarlyStopped whether the cancellation is an early stop
     * @throws NNIError (NOT_FOUND) when the id is unknown
     */
    public async cancelTrialJob(trialJobId: string, isEarlyStopped: boolean = false): Promise<void> {
        const trialJob: LocalTrialJobDetail | undefined = this.jobMap.get(trialJobId);
        if (trialJob === undefined) {
            throw new NNIError(NNIErrorNames.NOT_FOUND, 'Trial job not found');
        }

        const pid: number | undefined = trialJob.pid;
        if (pid === undefined) {
            this.setTrialJobStatus(trialJob, 'USER_CANCELED');
            return;
        }

        tkill(pid, 'SIGTERM');
        const deadline: number = Date.now() + 4999;
        for (;;) {
            const alive: boolean = await isAlive(pid);
            if (!alive) {
                break;
            }
            if (Date.now() > deadline) {
                // Graceful termination took too long: force kill and stop waiting
                tkill(pid, 'SIGKILL', (err) => {
                    if (err) {
                        this.log.error(`kill trial job error: ${err}`);
                    }
                });
                break;
            }
            await delay(500);
        }

        this.setTrialJobStatus(trialJob, getJobCancelStatus(isEarlyStopped));
    }

258
259
    /**
     * No-op in this service: both arguments are ignored.
     */
    public async setClusterMetadata(_key: string, _value: string): Promise<void> {
        // intentionally empty
    }
    /**
     * Always resolves to an empty string; the key is ignored.
     */
    public async getClusterMetadata(_key: string): Promise<string> {
        const noMetadata = '';
        return noMetadata;
    }
Deshui Yu's avatar
Deshui Yu committed
260

261
    /**
     * Stop the service: raises the stopping flag, ends every registered metrics stream and
     * stops the GPU scheduler when there is one.
     */
    public async cleanUp(): Promise<void> {
        this.log.info('Stopping local machine training service...');
        this.stopping = true;

        this.jobStreamMap.forEach((stream: ts.Stream) => {
            stream.end(0);
            stream.emit('end');
        });

        if (this.gpuScheduler === undefined) {
            return;
        }
        await this.gpuScheduler.stop();
    }

275
    /**
     * React to a status change of a trial job.
     *
     * When the job reached a final status, its metrics stream (if any) is ended and
     * unregistered after 5 seconds. When the job just left RUNNING and holds GPUs, the
     * occupation count of each of its GPU indices is decremented.
     *
     * @param trialJob the job, already carrying its new status
     * @param oldStatus the status the job had before the change
     * @throws Error when a GPU held by the job has no occupation count
     */
    private onTrialJobStatusChanged(trialJob: LocalTrialJobDetail, oldStatus: TrialJobStatus): void {
        // If the job is not running any more, destroy its job stream
        const finalStatuses: string[] = ['SUCCEEDED', 'FAILED', 'USER_CANCELED', 'SYS_CANCELED', 'EARLY_STOPPED'];
        if (finalStatuses.includes(trialJob.status)) {
            const stream: ts.Stream | undefined = this.jobStreamMap.get(trialJob.id);
            if (stream !== undefined) {
                // Refer https://github.com/Juul/tail-stream/issues/20
                setTimeout(() => {
                    stream.end(0);
                    stream.emit('end');
                    this.jobStreamMap.delete(trialJob.id);
                }, 5000);
            }
        }

        const heldGpuIndices: number[] | undefined = trialJob.gpuIndices;
        const holdsGpu: boolean = heldGpuIndices !== undefined && heldGpuIndices.length > 0;
        const leftRunning: boolean = oldStatus === 'RUNNING' && trialJob.status !== 'RUNNING';
        if (heldGpuIndices === undefined || !holdsGpu || this.gpuScheduler === undefined || !leftRunning) {
            return;
        }

        for (const gpuIndex of heldGpuIndices) {
            const occupied: number | undefined = this.occupiedGpuIndexNumMap.get(gpuIndex);
            if (occupied === undefined) {
                throw new Error(`gpu resource schedule error`);
            }
            if (occupied === 1) {
                this.occupiedGpuIndexNumMap.delete(gpuIndex);
            } else {
                this.occupiedGpuIndexNumMap.set(gpuIndex, occupied - 1);
            }
        }
    }

307
308
    /**
     * Build the environment variables exported to a trial.
     *
     * @param trialJobDetail the trial the variables are for
     * @param resource resource assigned to the trial
     * @param gpuNum GPU number from the configuration; when defined, CUDA_VISIBLE_DEVICES is
     *               added ('-1' without a GPU scheduler, otherwise the assigned indices)
     * @returns the list of key/value pairs
     */
    private getEnvironmentVariables(
        trialJobDetail: TrialJobDetail,
        resource: { gpuIndices: number[] },
        gpuNum: number | undefined): { key: string; value: string }[] {
        const { id, workingDirectory, form } = trialJobDetail;
        const envVariables: { key: string; value: string }[] = [
            { key: 'NNI_PLATFORM', value: 'local' },
            { key: 'NNI_EXP_ID', value: this.experimentId },
            { key: 'NNI_SYS_DIR', value: workingDirectory },
            { key: 'NNI_TRIAL_JOB_ID', value: id },
            { key: 'NNI_OUTPUT_DIR', value: workingDirectory },
            { key: 'NNI_TRIAL_SEQ_ID', value: form.sequenceId.toString() },
            { key: 'NNI_CODE_DIR', value: this.config.trialCodeDirectory}
        ];
        if (gpuNum === undefined) {
            return envVariables;
        }

        const visibleDevices: string = this.gpuScheduler === undefined ? '-1' : resource.gpuIndices.join(',');
        envVariables.push({ key: 'CUDA_VISIBLE_DEVICES', value: visibleDevices });

        return envVariables;
    }

    /**
     * Record on the trial job the GPU indices of the resource assigned to it.
     */
    private setExtraProperties(trialJobDetail: LocalTrialJobDetail, resource: { gpuIndices: number[] }): void {
        const { gpuIndices } = resource;
        trialJobDetail.gpuIndices = gpuIndices;
    }

    /**
     * Try to pick the GPUs for the next trial.
     *
     * Without a GPU scheduler the request always succeeds with no GPU. Otherwise the
     * available indices are narrowed to those below the per-GPU trial limit and, when
     * `gpuIndices` is configured, to the configured ones; the first `trialGpuNumber`
     * of them are selected.
     *
     * @returns a tuple of (success, resource); on failure the resource holds no GPU index
     */
    private tryGetAvailableResource(): [boolean, { gpuIndices: number[]}] {
        if (this.gpuScheduler === undefined) {
            return [true, { gpuIndices: [] }];
        }

        const availableGpuIndices: number[] = this.gpuScheduler.getAvailableGPUIndices(this.config.useActiveGpu, this.occupiedGpuIndexNumMap);
        let candidates: number[] = availableGpuIndices.filter((index: number) => {
            const occupied: number | undefined = this.occupiedGpuIndexNumMap.get(index);
            return occupied === undefined || occupied < this.config.maxTrialNumberPerGpu;
        });

        if (this.config.gpuIndices !== undefined) {
            this.checkSpecifiedGpuIndices();
            candidates = candidates.filter((index: number) => this.config.gpuIndices!.includes(index));
        }

        const required: number = this.config.trialGpuNumber!;
        if (candidates.length < required) {
            return [false, { gpuIndices: [] }];
        }

        return [true, { gpuIndices: candidates.slice(0, required) }];
    }

364
    /**
     * Validate the configured `gpuIndices` against the GPU count reported by the scheduler.
     * Nothing is checked while either the configured indices or the GPU count is undefined.
     *
     * @throws Error when a configured index is negative or not smaller than the GPU count
     */
    private checkSpecifiedGpuIndices(): void {
        const gpuCount: number | undefined = this.gpuScheduler.getSystemGpuCount();
        if (this.config.gpuIndices === undefined || gpuCount === undefined) {
            return;
        }
        for (const index of this.config.gpuIndices) {
            // A negative index can never match a device either, so reject it as well
            if (index < 0 || index >= gpuCount) {
                throw new Error(`Specified GPU index not found: ${index}`);
            }
        }
    }

375
376
377
    /**
     * Increment the occupation count of every GPU index in the resource.
     * Does nothing when there is no GPU scheduler.
     */
    private occupyResource(resource: {gpuIndices: number[]}): void {
        if (this.gpuScheduler === undefined) {
            return;
        }
        resource.gpuIndices.forEach((gpuIndex: number) => {
            const occupied: number | undefined = this.occupiedGpuIndexNumMap.get(gpuIndex);
            this.occupiedGpuIndexNumMap.set(gpuIndex, occupied === undefined ? 1 : occupied + 1);
        });
    }

388
389
390
391
    /**
     * Scheduling loop, running until the stopping flag is raised.
     *
     * Each round works through the job queue from its head: a WAITING job is started once
     * resources could be obtained for it, any other entry is just dropped from the queue.
     * When resources are not available the head stays queued. Rounds are 5 seconds apart.
     */
    private async runJobLoop(): Promise<void> {
        while (!this.stopping) {
            while (!this.stopping && this.jobQueue.length > 0) {
                const [headId] = this.jobQueue;
                const head: LocalTrialJobDetail | undefined = this.jobMap.get(headId);
                const isWaiting: boolean = head !== undefined && head.status === 'WAITING';
                if (isWaiting) {
                    const [granted, resource] = this.tryGetAvailableResource();
                    if (!granted) {
                        // Leave the job at the head of the queue and retry on the next round
                        break;
                    }
                    this.occupyResource(resource);
                    await this.runTrialJob(headId, resource);
                }
                this.jobQueue.shift();
            }
            await delay(5000);
        }
    }

    /**
     * Set the status of a trial job and trigger onTrialJobStatusChanged.
     * Setting the status the job already has is a no-op.
     */
    private setTrialJobStatus(trialJob: LocalTrialJobDetail, newStatus: TrialJobStatus): void {
        const previousStatus: TrialJobStatus = trialJob.status;
        if (previousStatus === newStatus) {
            return;
        }
        trialJob.status = newStatus;
        this.onTrialJobStatusChanged(trialJob, previousStatus);
    }

416
    /**
     * Build the platform-specific script lines that run the trial command.
     *
     * The lines change into the code directory, run the trial command with stdout/stderr
     * redirected into the working directory, and finally write the exit code followed by a
     * timestamp into `.nni/state`. PowerShell syntax is used on win32, shell syntax elsewhere.
     *
     * @param workingDirectory working directory of the trial
     * @returns the script lines, in execution order
     */
    private getScript(workingDirectory: string): string[] {
        const stdoutPath: string = path.join(workingDirectory, 'stdout');
        const stderrPath: string = path.join(workingDirectory, 'stderr');
        const statePath: string = path.join(workingDirectory, '.nni', 'state');

        if (process.platform === 'win32') {
            return [
                `$PSDefaultParameterValues = @{'Out-File:Encoding' = 'utf8'}`,
                `cd $env:NNI_CODE_DIR`,
                `cmd.exe /c ${this.config.trialCommand} 1>${stdoutPath} 2>${stderrPath}`,
                `$NOW_DATE = [int64](([datetime]::UtcNow)-(get-date "1/1/1970")).TotalSeconds`,
                `$NOW_DATE = "$NOW_DATE" + (Get-Date -Format fff).ToString()`,
                `Write $LASTEXITCODE " " $NOW_DATE  | Out-File "${statePath}" -NoNewline -encoding utf8`
            ];
        }

        // https://superuser.com/questions/599072/how-to-get-bash-execution-time-in-milliseconds-under-mac-os-x
        // On darwin, considering the worst case, write 999 to avoid negative duration
        const dateFormat: string = process.platform === 'darwin' ? '+%s999' : '+%s%3N';

        return [
            `cd $NNI_CODE_DIR`,
            `eval ${this.config.trialCommand} 1>${stdoutPath} 2>${stderrPath}`,
            `echo $? \`date ${dateFormat}\` >'${statePath}'`
        ];
    }

441
    /**
     * Start a trial job.
     *
     * Writes the run script and the parameter file into the trial's working directory,
     * launches the script, marks the job RUNNING and records start time, pid and assigned
     * GPUs. The `.nni/metrics` file is then tailed: every complete command decoded from it
     * is emitted as a 'metric' event, and the stream is registered in the stream map.
     *
     * @param trialJobId id of a job present in the job map
     * @param resource resource assigned to the job
     */
    private async runTrialJob(trialJobId: string, resource: {gpuIndices: number[]}): Promise<void> {
        const trialJobDetail: LocalTrialJobDetail = this.jobMap.get(trialJobId) as LocalTrialJobDetail;
        const workDir: string = trialJobDetail.workingDirectory;
        const variables: { key: string; value: string }[] = this.getEnvironmentVariables(trialJobDetail, resource, this.config.trialGpuNumber);

        // First line: shebang on POSIX, PATH propagation on Windows
        const firstLine: string = process.platform !== 'win32' ? '#!/bin/bash' : `$env:PATH="${process.env.path}"`;
        const runScriptContent: string[] = [
            firstLine,
            ...variables.map((variable: { key: string; value: string }) => setEnvironmentVariable(variable)),
            ...this.getScript(workDir)
        ];

        const nniDir: string = path.join(workDir, '.nni');
        const metricsPath: string = path.join(nniDir, 'metrics');
        await execMkdir(workDir);
        await execMkdir(nniDir);
        await execNewFile(metricsPath);

        const scriptPath: string = path.join(workDir, getScriptName('run'));
        await fs.promises.writeFile(scriptPath, runScriptContent.join(getNewLine()), { encoding: 'utf8', mode: 0o777 });
        await this.writeParameterFile(workDir, trialJobDetail.form.hyperParameters);

        const trialJobProcess: cp.ChildProcess = runScript(scriptPath);
        this.setTrialJobStatus(trialJobDetail, 'RUNNING');
        trialJobDetail.startTime = Date.now(); // eslint-disable-line require-atomic-updates
        trialJobDetail.pid = trialJobProcess.pid; // eslint-disable-line require-atomic-updates
        this.setExtraProperties(trialJobDetail, resource);

        // Bytes received from the metrics file that do not form a complete command yet
        let pending: Buffer = Buffer.alloc(0);
        const stream: ts.Stream = ts.createReadStream(metricsPath);
        stream.on('data', (chunk: Buffer) => {
            pending = Buffer.concat([pending, chunk]);
            for (;;) {
                if (pending.length === 0) {
                    break;
                }
                const [success, , content, remain] = decodeCommand(pending);
                if (!success) {
                    break;
                }
                this.eventEmitter.emit('metric', {
                    id: trialJobDetail.id,
                    data: content
                });
                this.log.debug(`Sending metrics, job id: ${trialJobDetail.id}, metrics: ${content}`);
                pending = remain;
            }
        });
        this.jobStreamMap.set(trialJobDetail.id, stream);
    }

chicm-ms's avatar
chicm-ms committed
491
    /**
     * Write the value of the hyper-parameters into a file of the given directory; the file
     * name comes from generateParamFileName.
     *
     * @param directory directory receiving the parameter file
     * @param hyperParameters hyper-parameters whose `value` is written as utf8
     */
    private async writeParameterFile(directory: string, hyperParameters: HyperParameters): Promise<void> {
        const paramFileName: string = generateParamFileName(hyperParameters);
        const target: string = path.join(directory, paramFileName);
        await fs.promises.writeFile(target, hyperParameters.value, { encoding: 'utf8' });
    }
J-shang's avatar
J-shang committed
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510

    /**
     * Get the local output directory of a trial.
     * @param trialJobId trial job id
     * @returns `<rootDir>/trials/<trialJobId>`
     */
    public async getTrialOutputLocalPath(trialJobId: string): Promise<string> {
        const outputDir: string = path.join(this.rootDir, 'trials', trialJobId);
        return outputDir;
    }

    /**
     * Check that the output of a trial is present locally.
     *
     * @param trialJobId trial job id
     * @param subpath path below the trial output directory; when undefined the output
     *                directory itself is checked
     * @throws Error (as a rejection) when the path does not exist
     */
    public async fetchTrialOutput(trialJobId: string, subpath: string): Promise<void> {
        const outputDir: string = await this.getTrialOutputLocalPath(trialJobId);
        const target: string = subpath === undefined ? outputDir : path.join(outputDir, subpath);
        if (!fs.existsSync(target)) {
            throw new Error('Trial local path not exist.');
        }
    }
Deshui Yu's avatar
Deshui Yu committed
511
512
513
}

export { LocalTrainingService };