localTrainingService.ts 21.4 KB
Newer Older
liuzhe-lz's avatar
liuzhe-lz committed
1
2
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT license.
Deshui Yu's avatar
Deshui Yu committed
3
4
5
6
7
8
9

'use strict';
import * as cp from 'child_process';
import { EventEmitter } from 'events';
import * as fs from 'fs';
import * as path from 'path';
import * as ts from 'tail-stream';
10
import * as tkill from 'tree-kill';
11
import { NNIError, NNIErrorNames } from '../../common/errors';
12
import { getExperimentId } from '../../common/experimentStartupInfo';
13
import { getLogger, Logger } from '../../common/log';
14
import { powershellString } from '../../common/shellUtils';
Deshui Yu's avatar
Deshui Yu committed
15
import {
16
    HyperParameters, TrainingService, TrialJobApplicationForm,
Yuge Zhang's avatar
Yuge Zhang committed
17
    TrialJobDetail, TrialJobMetric, TrialJobStatus
Deshui Yu's avatar
Deshui Yu committed
18
} from '../../common/trainingService';
19
20
21
import {
    delay, generateParamFileName, getExperimentRootDir, getJobCancelStatus, getNewLine, isAlive, uniqueString
} from '../../common/utils';
22
import { ExperimentConfig, LocalConfig, flattenConfig } from '../../common/experimentConfig';
23
import { execMkdir, execNewFile, getScriptName, runScript, setEnvironmentVariable } from '../common/util';
24
import { GPUScheduler } from './gpuScheduler';
Deshui Yu's avatar
Deshui Yu committed
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53

/**
 * Decode the first command contained in a binary buffer.
 *
 * Layout read by this function: bytes 0-1 are the command type, bytes 2-7 are
 * the content length written as a decimal number, and the content follows.
 *
 * @param data binary incoming data
 * @returns a tuple of (success, commandType, content, remain)
 *          success: true if the buffer contains at least one complete command; otherwise false
 *          remain: remaining data after the first command (the untouched input when success is false)
 */
function decodeCommand(data: Buffer): [boolean, string, string, Buffer] {
    const headerLength = 8;
    if (data.length < headerLength) {
        return [false, '', '', data];
    }
    const commandType: string = data.slice(0, 2).toString();
    const contentLength: number = parseInt(data.slice(2, headerLength).toString(), 10);
    // A non-numeric or negative length field cannot describe a command. Without this guard
    // NaN slips through the size comparison below and the function reports success while
    // consuming nothing, which makes a caller that loops "while data remains" spin forever.
    if (!Number.isFinite(contentLength) || contentLength < 0) {
        return [false, '', '', data];
    }
    const commandEnd: number = headerLength + contentLength;
    if (data.length < commandEnd) {
        return [false, '', '', data];
    }
    const content: string = data.slice(headerLength, commandEnd).toString();
    const remain: Buffer = data.slice(commandEnd);

    return [true, commandType, content, remain];
}

/**
 * Trial job record used by the local training service.
 *
 * Besides the fields set by the constructor, `startTime`, `endTime`, `tags` and
 * `pid` are left unset at construction time.
 */
class LocalTrialJobDetail implements TrialJobDetail {
    public id: string;
    public status: TrialJobStatus;
    public submitTime: number;
    public startTime?: number;
    public endTime?: number;
    public tags?: string[];
    public url?: string;
    public workingDirectory: string;
    public form: TrialJobApplicationForm;
    public pid?: number;
    public gpuIndices?: number[];

    /**
     * @param id trial job id
     * @param status initial job status
     * @param submitTime submission timestamp
     * @param workingDirectory directory of this trial; also used to build `url`
     * @param form the application form the job was submitted with
     */
    constructor(
        id: string, status: TrialJobStatus, submitTime: number,
        workingDirectory: string, form: TrialJobApplicationForm) {
        this.id = id;
        this.status = status;
        this.submitTime = submitTime;
        this.workingDirectory = workingDirectory;
        this.form = form;
        this.url = `file://localhost:${workingDirectory}`;
        // No GPU is assigned until the job is actually started.
        this.gpuIndices = [];
    }
}

77
/**
 * Single shape carrying the fields of both `ExperimentConfig` and `LocalConfig`;
 * used as the result type of `flattenConfig(config, 'local')`.
 */
interface FlattenLocalConfig extends ExperimentConfig, LocalConfig { }
Deshui Yu's avatar
Deshui Yu committed
78
79

/**
chicm-ms's avatar
chicm-ms committed
80
 * Local machine training service
Deshui Yu's avatar
Deshui Yu committed
81
82
 */
class LocalTrainingService implements TrainingService {
83
    private readonly config: FlattenLocalConfig;
84
85
86
    private readonly eventEmitter: EventEmitter;
    private readonly jobMap: Map<string, LocalTrialJobDetail>;
    private readonly jobQueue: string[];
Deshui Yu's avatar
Deshui Yu committed
87
88
89
    private initialized: boolean;
    private stopping: boolean;
    private rootDir!: string;
chicm-ms's avatar
chicm-ms committed
90
    private readonly experimentId!: string;
91
    private gpuScheduler!: GPUScheduler;
92
93
94
    private readonly occupiedGpuIndexNumMap: Map<number, number>;
    private readonly log: Logger;
    private readonly jobStreamMap: Map<string, ts.Stream>;
Deshui Yu's avatar
Deshui Yu committed
95

96
97
    /**
     * Build the service from an experiment configuration.
     *
     * A GPU scheduler is only created when `trialGpuNumber` is set and positive.
     *
     * @param config experiment configuration; flattened with its `local` section
     * @throws Error when `gpuIndices` is specified but empty, or when the experiment
     *         root directory does not exist
     */
    constructor(config: ExperimentConfig) {
        this.config = flattenConfig<FlattenLocalConfig>(config, 'local');
        this.eventEmitter = new EventEmitter();
        this.jobMap = new Map<string, LocalTrialJobDetail>();
        this.jobQueue = [];
        this.stopping = false;
        this.log = getLogger('LocalTrainingService');
        this.experimentId = getExperimentId();
        this.jobStreamMap = new Map<string, ts.Stream>();
        this.log.info('Construct local machine training service.');
        this.occupiedGpuIndexNumMap = new Map<number, number>();

        if (this.config.trialGpuNumber !== undefined && this.config.trialGpuNumber > 0) {
            this.gpuScheduler = new GPUScheduler();
        }

        // Comparing with `=== []` tests reference identity and is always false,
        // so the emptiness check has to look at the length instead.
        if (Array.isArray(this.config.gpuIndices) && this.config.gpuIndices.length === 0) {
            throw new Error('gpuIndices cannot be empty when specified.');
        }

        this.rootDir = getExperimentRootDir();
        if (!fs.existsSync(this.rootDir)) {
            throw new Error('root dir not created');
        }
        this.initialized = true;
    }

    /**
     * Run the service: starts the job loop and, when a GPU scheduler exists,
     * the scheduler as well. Resolves once all of them have finished.
     */
    public async run(): Promise<void> {
        this.log.info('Run local machine training service.');
        const longRunningTasks: Promise<void>[] = [this.runJobLoop()];
        if (this.gpuScheduler !== undefined) {
            longRunningTasks.push(this.gpuScheduler.run());
        }
        await Promise.all(longRunningTasks);
        this.log.info('Local machine training service exit.');
    }

    /**
     * List every known trial job.
     *
     * Each job is fetched through `getTrialJob`, one after another, so the
     * returned details carry the status refreshed by that method.
     *
     * @returns details of all jobs in `jobMap`, in map iteration order
     */
    public async listTrialJobs(): Promise<TrialJobDetail[]> {
        const jobs: TrialJobDetail[] = [];
        for (const key of this.jobMap.keys()) {
            const trialJob: TrialJobDetail = await this.getTrialJob(key);
            jobs.push(trialJob);
        }

        return jobs;
    }

    /**
     * Get a trial job, refreshing its status if it is recorded as running.
     *
     * When a job marked 'RUNNING' has no live process any more, it is first marked
     * 'FAILED' with the current time as end time. The trial's `.nni/state` file is
     * then read; if it starts with "<exit code> <timestamp>", the end time is taken
     * from the timestamp and an exit code of 0 turns the status into 'SUCCEEDED'.
     *
     * @param trialJobId trial job id
     * @returns the (possibly updated) job detail
     * @throws NNIError (NOT_FOUND) when the id is unknown
     */
    public async getTrialJob(trialJobId: string): Promise<TrialJobDetail> {
        const trialJob: LocalTrialJobDetail | undefined = this.jobMap.get(trialJobId);
        if (trialJob === undefined) {
            throw new NNIError(NNIErrorNames.NOT_FOUND, 'Trial job not found');
        }
        if (trialJob.status === 'RUNNING') {
            const alive: boolean = await isAlive(trialJob.pid);
            if (!alive) {
                // Assume failure up front; the state file below may upgrade this to success.
                trialJob.endTime = Date.now();
                this.setTrialJobStatus(trialJob, 'FAILED');
                try {
                    const state: string = await fs.promises.readFile(path.join(trialJob.workingDirectory, '.nni', 'state'), 'utf8');
                    const match: RegExpMatchArray | null = state.trim()
                        .match(/^(\d+)\s+(\d+)/);
                    if (match !== null) {
                        const { 1: code, 2: timestamp } = match;
                        if (parseInt(code, 10) === 0) {
                            this.setTrialJobStatus(trialJob, 'SUCCEEDED');
                        }
                        trialJob.endTime = parseInt(timestamp, 10);
                    }
                } catch (error) {
                    // Best effort: a missing or unreadable state file leaves the job as FAILED.
                }
                this.log.debug(`trialJob status update: ${trialJobId}, ${trialJob.status}`);
            }
        }

        return trialJob;
    }

Yuge Zhang's avatar
Yuge Zhang committed
174
175
176
177
    /**
     * Read one of the whitelisted output files of a trial.
     *
     * Files without an extension, and `.txt` / `.log` files, are returned as UTF-8
     * strings; anything else is returned as a raw Buffer.
     *
     * @param trialJobId trial job id; used as a directory name under `<rootDir>/trials`
     * @param fileName one of 'trial.log', 'stderr', 'model.onnx', 'stdout'
     * @returns file content as string or Buffer
     * @throws Error when the file name is not whitelisted, the job id would resolve
     *         outside the trials directory, or the file does not exist
     */
    public async getTrialFile(trialJobId: string, fileName: string): Promise<string | Buffer> {
        // check filename here for security
        if (!['trial.log', 'stderr', 'model.onnx', 'stdout'].includes(fileName)) {
            throw new Error(`File unaccessible: ${fileName}`);
        }
        const isText: boolean = !fileName.includes('.') || /.*\.(txt|log)/.test(fileName);
        const encoding: 'utf8' | null = isText ? 'utf8' : null;

        const trialsDir: string = path.join(this.rootDir, 'trials');
        const logPath: string = path.join(trialsDir, trialJobId, fileName);
        // The job id is caller-supplied too: refuse ids such as '../..' that would
        // make the joined path leave the trials directory.
        if (!logPath.startsWith(trialsDir + path.sep)) {
            throw new Error(`Invalid trial job id: ${trialJobId}`);
        }
        if (!fs.existsSync(logPath)) {
            throw new Error(`File not found: ${logPath}`);
        }
        return fs.promises.readFile(logPath, { encoding });
    }

Deshui Yu's avatar
Deshui Yu committed
190
191
192
193
194
195
196
197
    /**
     * Subscribe to trial metrics.
     * @param listener callback invoked for every 'metric' event
     */
    public addTrialJobMetricListener(listener: (metric: TrialJobMetric) => void): void {
        const emitter: EventEmitter = this.eventEmitter;
        emitter.addListener('metric', listener);
    }

    /**
     * Unsubscribe from trial metrics.
     * @param listener callback previously registered for 'metric' events
     */
    public removeTrialJobMetricListener(listener: (metric: TrialJobMetric) => void): void {
        const emitter: EventEmitter = this.eventEmitter;
        emitter.removeListener('metric', listener);
    }

198
199
200
201
202
203
204
205
206
207
208
209
    /**
     * Register a new trial job and put it in the waiting queue.
     *
     * The job gets a freshly generated id, status 'WAITING' and a working
     * directory of `<rootDir>/trials/<id>`. It is not started here.
     *
     * @param form job application form
     * @returns the newly created job detail
     */
    public submitTrialJob(form: TrialJobApplicationForm): Promise<TrialJobDetail> {
        const trialJobId: string = uniqueString(5);
        const trialJobDetail: LocalTrialJobDetail = new LocalTrialJobDetail(
            trialJobId,
            'WAITING',
            Date.now(),
            path.join(this.rootDir, 'trials', trialJobId),
            form
        );
        this.jobQueue.push(trialJobId);
        this.jobMap.set(trialJobId, trialJobDetail);

        this.log.debug('submitTrialJob: return:',  trialJobDetail);

        return Promise.resolve(trialJobDetail);
    }

215
216
217
218
219
    /**
     * Update trial job for multi-phase
     *
     * Writes the hyper-parameters of `form` as a parameter file into the job's
     * working directory; the stored job detail itself is not modified.
     *
     * @param trialJobId trial job id
     * @param form job application form
     * @returns the job detail
     * @throws Error when the id is unknown
     */
    public async updateTrialJob(trialJobId: string, form: TrialJobApplicationForm): Promise<TrialJobDetail> {
        const trialJobDetail: undefined | TrialJobDetail = this.jobMap.get(trialJobId);
        if (trialJobDetail === undefined) {
            throw new Error(`updateTrialJob failed: ${trialJobId} not found`);
        }
        await this.writeParameterFile(trialJobDetail.workingDirectory, form.hyperParameters);

        return trialJobDetail;
    }

QuanluZhang's avatar
QuanluZhang committed
230
    /**
     * Cancel a trial job.
     *
     * A job without a recorded pid is simply marked 'USER_CANCELED'. Otherwise its
     * process tree receives SIGTERM, the status is set from
     * `getJobCancelStatus(isEarlyStopped)`, and the method polls every 500 ms until
     * the process is gone. If it is still alive after about 5 seconds, SIGKILL is
     * sent and polling stops.
     *
     * @param trialJobId trial job id
     * @param isEarlyStopped whether the cancellation is an early stop
     * @throws NNIError (NOT_FOUND) when the id is unknown
     */
    public async cancelTrialJob(trialJobId: string, isEarlyStopped: boolean = false): Promise<void> {
        const trialJob: LocalTrialJobDetail | undefined = this.jobMap.get(trialJobId);
        if (trialJob === undefined) {
            throw new NNIError(NNIErrorNames.NOT_FOUND, 'Trial job not found');
        }
        if (trialJob.pid === undefined) {
            this.setTrialJobStatus(trialJob, 'USER_CANCELED');

            return;
        }
        tkill(trialJob.pid, 'SIGTERM');
        this.setTrialJobStatus(trialJob, getJobCancelStatus(isEarlyStopped));

        const startTime = Date.now();
        while (await isAlive(trialJob.pid)) {
            if (Date.now() - startTime > 4999) {
                // Graceful termination timed out: force-kill and stop waiting.
                tkill(trialJob.pid, 'SIGKILL', (err) => {
                    if (err) {
                        this.log.error(`kill trial job error: ${err}`);
                    }
                });
                break;
            }
            await delay(500);
        }
    }

259
260
    /**
     * No-op in this service: both arguments are ignored.
     */
    public async setClusterMetadata(_key: string, _value: string): Promise<void> {
        return;
    }
    /**
     * Always resolves to an empty string; the key is ignored.
     */
    public async getClusterMetadata(_key: string): Promise<string> {
        const noMetadata = '';
        return noMetadata;
    }
Deshui Yu's avatar
Deshui Yu committed
261

262
    /**
     * Stop the service: raises the `stopping` flag read by the job loop, ends every
     * open metrics stream and stops the GPU scheduler if one exists.
     */
    public async cleanUp(): Promise<void> {
        this.log.info('Stopping local machine training service...');
        this.stopping = true;
        for (const stream of this.jobStreamMap.values()) {
            stream.end(0);
            stream.emit('end');
        }
        if (this.gpuScheduler !== undefined) {
            await this.gpuScheduler.stop();
        }
    }

276
    /**
     * React to a status change of a trial job.
     *
     * - When the new status is a final one, the job's metrics stream (if any) is
     *   ended and forgotten after a 5 second delay.
     * - When a job holding GPUs leaves 'RUNNING', the per-GPU occupancy counters
     *   are decremented (and removed once they reach zero).
     *
     * @param trialJob the job, already carrying its new status
     * @param oldStatus the status the job had before the change
     * @throws Error when a GPU held by the job has no occupancy counter
     */
    private onTrialJobStatusChanged(trialJob: LocalTrialJobDetail, oldStatus: TrialJobStatus): void {
        // if job is not running, destroy job stream
        if (['SUCCEEDED', 'FAILED', 'USER_CANCELED', 'SYS_CANCELED', 'EARLY_STOPPED'].includes(trialJob.status)) {
            const stream: ts.Stream | undefined = this.jobStreamMap.get(trialJob.id);
            if (stream !== undefined) {
                //Refer https://github.com/Juul/tail-stream/issues/20
                setTimeout(() => {
                    stream.end(0);
                    stream.emit('end');
                    this.jobStreamMap.delete(trialJob.id);
                }, 5000);
            }
        }

        const heldGpus: number[] | undefined = trialJob.gpuIndices;
        if (heldGpus === undefined || heldGpus.length === 0 || this.gpuScheduler === undefined) {
            return;
        }
        if (oldStatus === 'RUNNING' && trialJob.status !== 'RUNNING') {
            for (const index of heldGpus) {
                const num: number | undefined = this.occupiedGpuIndexNumMap.get(index);
                if (num === undefined) {
                    throw new Error(`gpu resource schedule error`);
                } else if (num === 1) {
                    this.occupiedGpuIndexNumMap.delete(index);
                } else {
                    this.occupiedGpuIndexNumMap.set(index, num - 1);
                }
            }
        }
    }

308
309
    /**
     * Build the environment variables exported to a trial's run script.
     *
     * `CUDA_VISIBLE_DEVICES` is only added when `gpuNum` is defined: it is '-1'
     * when there is no GPU scheduler, otherwise the comma-joined assigned indices.
     *
     * @param trialJobDetail the job to run
     * @param resource resource assigned to the job
     * @param gpuNum configured GPU number per trial, if any
     * @returns key/value pairs, in the order they should be exported
     */
    private getEnvironmentVariables(
        trialJobDetail: TrialJobDetail,
        resource: { gpuIndices: number[] },
        gpuNum: number | undefined): { key: string; value: string }[] {
        const envVariables: { key: string; value: string }[] = [
            { key: 'NNI_PLATFORM', value: 'local' },
            { key: 'NNI_EXP_ID', value: this.experimentId },
            { key: 'NNI_SYS_DIR', value: trialJobDetail.workingDirectory },
            { key: 'NNI_TRIAL_JOB_ID', value: trialJobDetail.id },
            { key: 'NNI_OUTPUT_DIR', value: trialJobDetail.workingDirectory },
            { key: 'NNI_TRIAL_SEQ_ID', value: trialJobDetail.form.sequenceId.toString() },
            { key: 'NNI_CODE_DIR', value: this.config.trialCodeDirectory}
        ];
        if (gpuNum !== undefined) {
            envVariables.push({
                key: 'CUDA_VISIBLE_DEVICES',
                value: this.gpuScheduler === undefined ? '-1' : resource.gpuIndices.join(',')
            });
        }

        return envVariables;
    }

    /**
     * Record on the job which GPU indices were assigned to it.
     * @param trialJobDetail job to annotate
     * @param resource resource assigned to the job
     */
    private setExtraProperties(trialJobDetail: LocalTrialJobDetail, resource: { gpuIndices: number[] }): void {
        const assigned: number[] = resource.gpuIndices;
        trialJobDetail.gpuIndices = assigned;
    }

    /**
     * Try to pick the GPUs for the next trial.
     *
     * Without a GPU scheduler this always succeeds with no GPU. Otherwise the
     * candidates are the scheduler's available GPUs that host fewer than
     * `maxTrialNumberPerGpu` trials, restricted to `config.gpuIndices` when that
     * is set; the first `trialGpuNumber` of them are selected.
     *
     * @returns [true, resource] on success, or [false, resource with no GPU]
     *          when there are not enough candidates
     */
    private tryGetAvailableResource(): [boolean, { gpuIndices: number[]}] {
        const resource: { gpuIndices: number[] } = { gpuIndices: [] };
        if (this.gpuScheduler === undefined) {
            return [true, resource];
        }

        const availableGpuIndices: number[] = this.gpuScheduler.getAvailableGPUIndices(this.config.useActiveGpu, this.occupiedGpuIndexNumMap);
        let selectedGPUIndices: number[] = availableGpuIndices.filter((index: number) => {
            const num: number | undefined = this.occupiedGpuIndexNumMap.get(index);
            return num === undefined || num < this.config.maxTrialNumberPerGpu;
        });

        if (this.config.gpuIndices !== undefined) {
            this.checkSpecifiedGpuIndices();
            selectedGPUIndices = selectedGPUIndices.filter((index: number) => this.config.gpuIndices!.includes(index));
        }

        if (selectedGPUIndices.length < this.config.trialGpuNumber!) {
            return [false, resource];
        }

        // Keep only as many GPUs as one trial needs.
        selectedGPUIndices.splice(this.config.trialGpuNumber!);
        resource.gpuIndices = selectedGPUIndices;

        return [true, resource];
    }

365
    /**
     * Validate `config.gpuIndices` against the GPU count reported by the scheduler.
     * Nothing is checked while either of them is undefined.
     *
     * @throws Error when a configured index is not below the system GPU count
     */
    private checkSpecifiedGpuIndices(): void {
        const gpuCount: number | undefined = this.gpuScheduler.getSystemGpuCount();
        if (this.config.gpuIndices === undefined || gpuCount === undefined) {
            return;
        }
        for (const index of this.config.gpuIndices) {
            if (index >= gpuCount) {
                throw new Error(`Specified GPU index not found: ${index}`);
            }
        }
    }

376
377
378
    /**
     * Increment the occupancy counter of every GPU in `resource`.
     * Does nothing when there is no GPU scheduler.
     *
     * @param resource resource about to be handed to a trial
     */
    private occupyResource(resource: {gpuIndices: number[]}): void {
        if (this.gpuScheduler === undefined) {
            return;
        }
        for (const index of resource.gpuIndices) {
            const num: number | undefined = this.occupiedGpuIndexNumMap.get(index);
            this.occupiedGpuIndexNumMap.set(index, num === undefined ? 1 : num + 1);
        }
    }

389
390
391
392
    /**
     * Main scheduling loop; runs until `stopping` is set.
     *
     * Jobs are taken from the head of the queue in order. A 'WAITING' job is
     * started once resources are available; when they are not, the job stays at
     * the head and the loop retries after 5 seconds. Queue entries that are
     * unknown or no longer 'WAITING' are dropped without being started.
     */
    private async runJobLoop(): Promise<void> {
        while (!this.stopping) {
            while (!this.stopping && this.jobQueue.length !== 0) {
                const trialJobId: string = this.jobQueue[0];
                const trialJobDetail: LocalTrialJobDetail | undefined = this.jobMap.get(trialJobId);
                if (trialJobDetail !== undefined && trialJobDetail.status === 'WAITING') {
                    const [success, resource] = this.tryGetAvailableResource();
                    if (!success) {
                        // Leave the job queued and wait for resources to free up.
                        break;
                    }

                    this.occupyResource(resource);
                    await this.runTrialJob(trialJobId, resource);
                }
                this.jobQueue.shift();
            }
            await delay(5000);
        }
    }

    /**
     * Change a job's status and fire `onTrialJobStatusChanged`.
     * Setting the status a job already has is a no-op.
     *
     * @param trialJob job to update
     * @param newStatus status to assign
     */
    private setTrialJobStatus(trialJob: LocalTrialJobDetail, newStatus: TrialJobStatus): void {
        const previous: TrialJobStatus = trialJob.status;
        if (previous === newStatus) {
            return;
        }
        trialJob.status = newStatus;
        this.onTrialJobStatusChanged(trialJob, previous);
    }

417
    /**
     * Build the platform-specific lines that run the trial command.
     *
     * The generated script changes to `NNI_CODE_DIR`, runs `config.trialCommand`
     * with stdout/stderr redirected to files in the working directory, then writes
     * "<exit code> <millisecond timestamp>" to `.nni/state`. PowerShell is emitted
     * on win32, shell otherwise.
     *
     * @param workingDirectory working directory of the trial
     * @returns script lines, without the environment-variable prologue
     */
    private getScript(workingDirectory: string): string[] {
        const stdoutPath: string = path.join(workingDirectory, 'stdout');
        const stderrPath: string = path.join(workingDirectory, 'stderr');
        const statePath: string = path.join(workingDirectory, '.nni', 'state');

        const script: string[] = [];
        if (process.platform === 'win32') {
            script.push(`$PSDefaultParameterValues = @{'Out-File:Encoding' = 'utf8'}`);
            script.push(`cd $env:NNI_CODE_DIR`);
            script.push(
                `cmd.exe /c ${this.config.trialCommand} 1>${stdoutPath} 2>${stderrPath}`,
                `$NOW_DATE = [int64](([datetime]::UtcNow)-(get-date "1/1/1970")).TotalSeconds`,
                `$NOW_DATE = "$NOW_DATE" + (Get-Date -Format fff).ToString()`,
                `Write $LASTEXITCODE " " $NOW_DATE  | Out-File "${statePath}" -NoNewline -encoding utf8`);
        } else {
            script.push(`cd $NNI_CODE_DIR`);
            script.push(`eval ${this.config.trialCommand} 1>${stdoutPath} 2>${stderrPath}`);
            if (process.platform === 'darwin') {
                // https://superuser.com/questions/599072/how-to-get-bash-execution-time-in-milliseconds-under-mac-os-x
                // Considering the worst case, write 999 to avoid negative duration
                script.push(`echo $? \`date +%s999\` >'${statePath}'`);
            } else {
                script.push(`echo $? \`date +%s%3N\` >'${statePath}'`);
            }
        }

        return script;
    }

442
    /**
     * Start a queued trial job.
     *
     * Steps: assemble the run script (interpreter line or PATH setup, environment
     * variables, command lines), create the working directory with its `.nni`
     * folder and an empty metrics file, write the script and the parameter file,
     * launch the script, mark the job 'RUNNING', and tail `.nni/metrics`, emitting
     * a 'metric' event for each complete command decoded from it.
     *
     * @param trialJobId id of a job present in `jobMap`
     * @param resource resource assigned to the job
     */
    private async runTrialJob(trialJobId: string, resource: {gpuIndices: number[]}): Promise<void> {
        const trialJobDetail: LocalTrialJobDetail = <LocalTrialJobDetail>this.jobMap.get(trialJobId);
        const variables: { key: string; value: string }[] = this.getEnvironmentVariables(trialJobDetail, resource, this.config.trialGpuNumber);

        const runScriptContent: string[] = [];
        if (process.platform !== 'win32') {
            runScriptContent.push('#!/bin/bash');
        } else {
            runScriptContent.push(`$env:PATH=${powershellString(process.env.path!)}`)
        }
        for (const variable of variables) {
            runScriptContent.push(setEnvironmentVariable(variable));
        }
        runScriptContent.push(...this.getScript(trialJobDetail.workingDirectory));

        await execMkdir(trialJobDetail.workingDirectory);
        await execMkdir(path.join(trialJobDetail.workingDirectory, '.nni'));
        await execNewFile(path.join(trialJobDetail.workingDirectory, '.nni', 'metrics'));
        const scriptName: string = getScriptName('run');
        await fs.promises.writeFile(path.join(trialJobDetail.workingDirectory, scriptName),
                                    runScriptContent.join(getNewLine()), { encoding: 'utf8', mode: 0o777 });
        await this.writeParameterFile(trialJobDetail.workingDirectory, trialJobDetail.form.hyperParameters);
        const trialJobProcess: cp.ChildProcess = runScript(path.join(trialJobDetail.workingDirectory, scriptName));
        this.setTrialJobStatus(trialJobDetail, 'RUNNING');
        trialJobDetail.startTime = Date.now(); // eslint-disable-line require-atomic-updates
        trialJobDetail.pid = trialJobProcess.pid; // eslint-disable-line require-atomic-updates
        this.setExtraProperties(trialJobDetail, resource);

        // Data may arrive in arbitrary chunks, so keep undecoded bytes between events.
        let buffer: Buffer = Buffer.alloc(0);
        const stream: ts.Stream = ts.createReadStream(path.join(trialJobDetail.workingDirectory, '.nni', 'metrics'));
        stream.on('data', (data: Buffer) => {
            buffer = Buffer.concat([buffer, data]);
            while (buffer.length > 0) {
                const [success, , content, remain] = decodeCommand(buffer);
                if (!success) {
                    break;
                }
                this.eventEmitter.emit('metric', {
                    id: trialJobDetail.id,
                    data: content
                });
                this.log.debug(`Sending metrics, job id: ${trialJobDetail.id}, metrics: ${content}`);
                buffer = remain;
            }
        });
        this.jobStreamMap.set(trialJobDetail.id, stream);
    }

chicm-ms's avatar
chicm-ms committed
492
    /**
     * Write a set of hyper-parameters to its parameter file.
     *
     * @param directory directory to write into
     * @param hyperParameters parameters to store; the file name is derived from
     *        them by `generateParamFileName` and the content is their `value`
     */
    private async writeParameterFile(directory: string, hyperParameters: HyperParameters): Promise<void> {
        const filepath: string = path.join(directory, generateParamFileName(hyperParameters));
        await fs.promises.writeFile(filepath, hyperParameters.value, { encoding: 'utf8' });
    }
J-shang's avatar
J-shang committed
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511

    /**
     * Local directory of a trial: `<rootDir>/trials/<trialJobId>`.
     * @param trialJobId trial job id
     * @returns the directory path (its existence is not checked)
     */
    public async getTrialOutputLocalPath(trialJobId: string): Promise<string> {
        const trialDir: string = path.join(this.rootDir, 'trials', trialJobId);
        return trialDir;
    }

    /**
     * Check that a trial's output is present locally.
     *
     * @param trialJobId trial job id
     * @param subpath path below the trial directory; ignored when undefined
     * @throws Error (as a rejected promise) when the resulting path does not exist
     */
    public async fetchTrialOutput(trialJobId: string, subpath: string): Promise<void> {
        const trialDir: string = await this.getTrialOutputLocalPath(trialJobId);
        const target: string = subpath === undefined ? trialDir : path.join(trialDir, subpath);
        if (!fs.existsSync(target)) {
            throw new Error('Trial local path not exist.');
        }
    }
Deshui Yu's avatar
Deshui Yu committed
512
513
514
}

export { LocalTrainingService };