// Copyright (c) Microsoft Corporation.
// Licensed under the MIT license.
Deshui Yu's avatar
Deshui Yu committed
3

4
import cp from 'child_process';
Deshui Yu's avatar
Deshui Yu committed
5
import { EventEmitter } from 'events';
6
7
8
9
10
11
12
import fs from 'fs';
import path from 'path';
import ts from 'tail-stream';
import tkill from 'tree-kill';
import { NNIError, NNIErrorNames } from 'common/errors';
import { getExperimentId } from 'common/experimentStartupInfo';
import { getLogger, Logger } from 'common/log';
13
import { powershellString, shellString, createScriptFile } from 'common/shellUtils';
Deshui Yu's avatar
Deshui Yu committed
14
import {
15
    HyperParameters, TrainingService, TrialJobApplicationForm,
Yuge Zhang's avatar
Yuge Zhang committed
16
    TrialJobDetail, TrialJobMetric, TrialJobStatus
17
} from 'common/trainingService';
18
19
import {
    delay, generateParamFileName, getExperimentRootDir, getJobCancelStatus, getNewLine, isAlive, uniqueString
20
} from 'common/utils';
liuzhe-lz's avatar
liuzhe-lz committed
21
import { LocalConfig } from 'common/experimentConfig';
22
import { execMkdir, execNewFile, getScriptName, runScript, setEnvironmentVariable } from '../common/util';
23
import { GPUScheduler } from './gpuScheduler';
Deshui Yu's avatar
Deshui Yu committed
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52

/**
 * Decode the first command contained in a binary buffer.
 *
 * Layout as read by this function: bytes 0-1 hold the command type, bytes 2-7 hold the
 * content length (in bytes) written as a decimal string, and the content follows the
 * 8-byte header.
 *
 * @param data binary incoming data
 * @returns a tuple of (success, commandType, content, remain)
 *          success: true if the buffer contains at least one complete command; otherwise false
 *          commandType: the 2-byte command type ('' when success is false)
 *          content: the decoded command content ('' when success is false)
 *          remain: remaining data after the first command (the whole input when success is false)
 */
function decodeCommand(data: Buffer): [boolean, string, string, Buffer] {
    const headerLength = 8;
    if (data.length < headerLength) {
        return [false, '', '', data];
    }
    const commandType: string = data.subarray(0, 2).toString();
    const contentLength: number = parseInt(data.subarray(2, headerLength).toString(), 10);
    // A non-numeric or negative length cannot be decoded. Reporting success here would hand
    // the unchanged buffer back as `remain`, and a caller that loops until the buffer is
    // drained would never terminate.
    if (Number.isNaN(contentLength) || contentLength < 0) {
        return [false, '', '', data];
    }
    const commandEnd: number = headerLength + contentLength;
    if (data.length < commandEnd) {
        return [false, '', '', data];
    }
    const content: string = data.subarray(headerLength, commandEnd).toString();
    const remain: Buffer = data.subarray(commandEnd);

    return [true, commandType, content, remain];
}

/**
 * Record describing one trial job handled by the local training service.
 */
class LocalTrialJobDetail implements TrialJobDetail {
    /** Trial job id. */
    public id: string;
    /** Current status of the trial job. */
    public status: TrialJobStatus;
    /** Submission time supplied by the creator of this record. */
    public submitTime: number;
    /** Time the trial was started; unset until the trial is launched. */
    public startTime?: number;
    /** Time the trial ended; unset until the trial is found to have finished. */
    public endTime?: number;
    public tags?: string[];
    /** `file://localhost:` URL derived from the working directory. */
    public url?: string;
    /** Directory holding the trial's files. */
    public workingDirectory: string;
    /** Application form the trial was submitted with. */
    public form: TrialJobApplicationForm;
    /** Process id of the launched trial; unset until the trial is launched. */
    public pid?: number;
    /** GPU indices assigned to the trial; starts out empty. */
    public gpuIndices?: number[];

    /**
     * @param id trial job id
     * @param status initial status
     * @param submitTime submission time
     * @param workingDirectory directory holding the trial's files
     * @param form application form of the trial
     */
    constructor(
        id: string,
        status: TrialJobStatus,
        submitTime: number,
        workingDirectory: string,
        form: TrialJobApplicationForm
    ) {
        this.id = id;
        this.status = status;
        this.submitTime = submitTime;
        this.workingDirectory = workingDirectory;
        this.form = form;
        this.url = 'file://localhost:' + workingDirectory;
        this.gpuIndices = [];
    }
}

Deshui Yu's avatar
Deshui Yu committed
76
/**
 * Local machine training service
 */
class LocalTrainingService implements TrainingService {
liuzhe-lz's avatar
liuzhe-lz committed
80
    /** Configuration this service was constructed with. */
    private readonly config: LocalConfig;
    /** Emits 'metric' events for the registered metric listeners. */
    private readonly eventEmitter: EventEmitter;
    /** All known trial jobs, keyed by trial job id. */
    private readonly jobMap: Map<string, LocalTrialJobDetail>;
    /** Ids of submitted trial jobs, consumed from the front by the job loop. */
    private readonly jobQueue: string[];
    /** Set to true at the end of the constructor. */
    private initialized: boolean;
    /** Set by cleanUp(); the job loop exits once it is true. */
    private stopping: boolean;
    /** Experiment root directory; trial directories live under `<rootDir>/trials`. */
    private rootDir!: string;
    /** Id of the current experiment. */
    private readonly experimentId!: string;
    /** Only created when the configured trialGpuNumber is greater than zero. */
    private gpuScheduler!: GPUScheduler;
    /** GPU index -> count of trials currently occupying that GPU. */
    private readonly occupiedGpuIndexNumMap: Map<number, number>;
    private readonly log: Logger;
    /** Trial job id -> tail stream reading that trial's metrics file. */
    private readonly jobStreamMap: Map<string, ts.Stream>;
Deshui Yu's avatar
Deshui Yu committed
92

liuzhe-lz's avatar
liuzhe-lz committed
93
94
    /**
     * Create the local training service.
     *
     * A GPU scheduler is created only when `config.trialGpuNumber` is greater than zero.
     *
     * @param config local training service configuration
     * @throws Error if `config.gpuIndices` is specified as an empty list
     * @throws Error if the experiment root directory does not exist
     */
    constructor(config: LocalConfig) {
        this.config = config;
        this.eventEmitter = new EventEmitter();
        this.jobMap = new Map<string, LocalTrialJobDetail>();
        this.jobQueue = [];
        this.stopping = false;
        this.log = getLogger('LocalTrainingService');
        this.experimentId = getExperimentId();
        this.jobStreamMap = new Map<string, ts.Stream>();
        this.log.info('Construct local machine training service.');
        this.occupiedGpuIndexNumMap = new Map<number, number>();

        // Arrays compare by reference, so `gpuIndices === []` can never be true;
        // emptiness has to be checked through the length. Validate before the
        // scheduler is created so an invalid config allocates nothing.
        if (Array.isArray(this.config.gpuIndices) && this.config.gpuIndices.length === 0) {
            throw new Error('gpuIndices cannot be empty when specified.');
        }

        if (this.config.trialGpuNumber !== undefined && this.config.trialGpuNumber > 0) {
            this.gpuScheduler = new GPUScheduler();
        }

        this.rootDir = getExperimentRootDir();
        if (!fs.existsSync(this.rootDir)) {
            throw new Error('root dir not created');
        }
        this.initialized = true;
    }

    /**
     * Start the long-running tasks of the service and wait for all of them to finish.
     * The job loop is always started; the GPU scheduler is started too when one exists.
     */
    public async run(): Promise<void> {
        this.log.info('Run local machine training service.');
        const pendingTasks: Promise<void>[] = [];
        pendingTasks.push(this.runJobLoop());
        if (this.gpuScheduler !== undefined) {
            pendingTasks.push(this.gpuScheduler.run());
        }
        await Promise.all(pendingTasks);
        this.log.info('Local machine training service exit.');
    }

    /**
     * List every known trial job.
     * Each job is fetched through getTrialJob(), one after another, so statuses are refreshed.
     * @returns details of all trial jobs in the job map
     */
    public async listTrialJobs(): Promise<TrialJobDetail[]> {
        const details: TrialJobDetail[] = [];
        for (const [jobId] of this.jobMap) {
            details.push(await this.getTrialJob(jobId));
        }

        return details;
    }

    /**
     * Look up a trial job, refreshing its status if it was last seen running.
     *
     * When a RUNNING job's process is no longer alive the job is marked FAILED, then the
     * `.nni/state` file in its working directory is read: a leading "<code> <timestamp>"
     * pair upgrades the status to SUCCEEDED for code 0 and replaces the end time with the
     * recorded timestamp. Failure to read the state file is ignored.
     *
     * @param trialJobId trial job id
     * @returns the (possibly updated) trial job detail
     * @throws NNIError (NOT_FOUND) if the job id is unknown
     */
    public async getTrialJob(trialJobId: string): Promise<TrialJobDetail> {
        const job: LocalTrialJobDetail | undefined = this.jobMap.get(trialJobId);
        if (job === undefined) {
            throw new NNIError(NNIErrorNames.NOT_FOUND, 'Trial job not found');
        }
        if (job.status !== 'RUNNING') {
            return job;
        }

        const processAlive: boolean = await isAlive(job.pid);
        if (processAlive) {
            return job;
        }

        // Assume failure first; the state file may overrule this below.
        job.endTime = Date.now();
        this.setTrialJobStatus(job, 'FAILED');
        try {
            const stateText: string = await fs.promises.readFile(path.join(job.workingDirectory, '.nni', 'state'), 'utf8');
            const fields: RegExpMatchArray | null = stateText.trim().match(/^(\d+)\s+(\d+)/);
            if (fields !== null) {
                const [, exitCode, finishedAt] = fields;
                if (parseInt(exitCode, 10) === 0) {
                    this.setTrialJobStatus(job, 'SUCCEEDED');
                }
                job.endTime = parseInt(finishedAt, 10);
            }
        } catch (error) {
            //ignore
        }
        this.log.debug(`trialJob status update: ${trialJobId}, ${job.status}`);

        return job;
    }

Yuge Zhang's avatar
Yuge Zhang committed
171
172
173
174
    /**
     * Read one of the allowed files from a trial's directory.
     *
     * File names without a dot, or ending in a txt/log extension, are read as UTF-8 text;
     * anything else is returned as a raw Buffer.
     *
     * @param trialJobId trial job id
     * @param fileName name of the file; must be one of the allowed names
     * @returns the file content
     * @throws Error if the file name is not allowed or the file does not exist
     */
    public async getTrialFile(trialJobId: string, fileName: string): Promise<string | Buffer> {
        // check filename here for security
        const allowedNames: string[] = ['trial.log', 'stderr', 'model.onnx', 'stdout'];
        if (!allowedNames.includes(fileName)) {
            throw new Error(`File unaccessible: ${fileName}`);
        }

        const isTextFile: boolean = !fileName.includes('.') || fileName.match(/.*\.(txt|log)/g) !== null;
        const encoding: string | null = isTextFile ? 'utf8' : null;

        const logPath = path.join(this.rootDir, 'trials', trialJobId, fileName);
        if (!fs.existsSync(logPath)) {
            throw new Error(`File not found: ${logPath}`);
        }
        return fs.promises.readFile(logPath, {encoding: encoding as any});
    }

Deshui Yu's avatar
Deshui Yu committed
187
188
189
190
191
192
193
194
    /**
     * Register a listener for 'metric' events.
     * @param listener callback receiving each trial job metric
     */
    public addTrialJobMetricListener(listener: (metric: TrialJobMetric) => void): void {
        this.eventEmitter.addListener('metric', listener);
    }

    /**
     * Unregister a listener for 'metric' events.
     * @param listener the callback to remove
     */
    public removeTrialJobMetricListener(listener: (metric: TrialJobMetric) => void): void {
        this.eventEmitter.removeListener('metric', listener);
    }

195
    /**
     * Queue a new trial job in WAITING state.
     * The id comes from the form when present; otherwise a 5-character unique string is generated.
     * @param form job application form
     * @returns the newly created trial job detail
     */
    public submitTrialJob(form: TrialJobApplicationForm): Promise<TrialJobDetail> {
        let trialJobId: string;
        if (form.id === undefined) {
            trialJobId = uniqueString(5);
        } else {
            trialJobId = form.id;
        }

        const submittedAt: number = Date.now();
        const workingDirectory: string = path.join(this.rootDir, 'trials', trialJobId);
        const trialJobDetail: LocalTrialJobDetail =
            new LocalTrialJobDetail(trialJobId, 'WAITING', submittedAt, workingDirectory, form);

        this.jobQueue.push(trialJobId);
        this.jobMap.set(trialJobId, trialJobDetail);

        this.log.debug('submitTrialJob: return:',  trialJobDetail);

        return Promise.resolve(trialJobDetail);
    }

212
213
214
215
216
    /**
     * Update trial job for multi-phase: writes the form's hyper-parameters into the
     * trial's working directory.
     * @param trialJobId trial job id
     * @param form job application form
     * @returns the trial job detail
     * @throws Error if the trial job id is unknown
     */
    public async updateTrialJob(trialJobId: string, form: TrialJobApplicationForm): Promise<TrialJobDetail> {
        const existingJob: undefined | TrialJobDetail = this.jobMap.get(trialJobId);
        if (existingJob === undefined) {
            throw new Error(`updateTrialJob failed: ${trialJobId} not found`);
        }
        const { workingDirectory } = existingJob;
        await this.writeParameterFile(workingDirectory, form.hyperParameters);

        return existingJob;
    }

QuanluZhang's avatar
QuanluZhang committed
227
    /**
     * Cancel a trial job.
     *
     * A job without a pid is marked USER_CANCELED immediately. Otherwise SIGTERM is sent to
     * the process tree, the status is set from getJobCancelStatus(), and the process is
     * polled every 500 ms; if it is still alive after more than 4999 ms, SIGKILL is sent.
     *
     * @param trialJobId trial job id
     * @param isEarlyStopped passed to getJobCancelStatus() to choose the resulting status
     * @throws NNIError (NOT_FOUND) if the job id is unknown
     */
    public async cancelTrialJob(trialJobId: string, isEarlyStopped: boolean = false): Promise<void> {
        const job: LocalTrialJobDetail | undefined = this.jobMap.get(trialJobId);
        if (job === undefined) {
            throw new NNIError(NNIErrorNames.NOT_FOUND, 'Trial job not found');
        }
        if (job.pid === undefined) {
            this.setTrialJobStatus(job, 'USER_CANCELED');
            return;
        }

        tkill(job.pid, 'SIGTERM');
        this.setTrialJobStatus(job, getJobCancelStatus(isEarlyStopped));

        const termSentAt = Date.now();
        let stillAlive: boolean = await isAlive(job.pid);
        while (stillAlive) {
            const waited = Date.now() - termSentAt;
            if (waited > 4999) {
                // Grace period is over: force-kill and stop waiting.
                tkill(job.pid, 'SIGKILL', (err) => {
                    if (err) {
                        this.log.error(`kill trial job error: ${err}`);
                    }
                });
                break;
            }
            await delay(500);
            stillAlive = await isAlive(job.pid);
        }
    }

256
257
    /**
     * No-op: both arguments are ignored.
     */
    public async setClusterMetadata(_key: string, _value: string): Promise<void> {
        return;
    }
    /**
     * Always resolves to an empty string; the key is ignored.
     */
    public async getClusterMetadata(_key: string): Promise<string> {
        const noMetadata = '';
        return noMetadata;
    }
Deshui Yu's avatar
Deshui Yu committed
258

259
    /**
     * Stop the service: raise the stopping flag, end every open metrics stream and
     * stop the GPU scheduler when one exists.
     */
    public async cleanUp(): Promise<void> {
        this.log.info('Stopping local machine training service...');
        this.stopping = true;
        this.jobStreamMap.forEach((stream: ts.Stream) => {
            stream.end(0);
            stream.emit('end');
        });
        if (this.gpuScheduler !== undefined) {
            await this.gpuScheduler.stop();
        }
    }

273
    /**
     * React to a status change of a trial job.
     *
     * For a finished status, the job's metrics stream (if any) is scheduled to be ended
     * and removed after 5 seconds. When the job leaves RUNNING and holds GPUs, the
     * per-GPU occupancy counters are decremented.
     *
     * @param trialJob the job whose status has already been updated
     * @param oldStatus the status before the change
     * @throws Error if a stream entry exists but holds no stream, or if a GPU held by
     *         the job has no occupancy counter
     */
    private onTrialJobStatusChanged(trialJob: LocalTrialJobDetail, oldStatus: TrialJobStatus): void {
        // If the job is no longer running, destroy its job stream.
        const finishedStatuses: string[] = ['SUCCEEDED', 'FAILED', 'USER_CANCELED', 'SYS_CANCELED', 'EARLY_STOPPED'];
        if (finishedStatuses.includes(trialJob.status) && this.jobStreamMap.has(trialJob.id)) {
            const stream: ts.Stream | undefined = this.jobStreamMap.get(trialJob.id);
            if (stream === undefined) {
                throw new Error(`Could not find stream in trial ${trialJob.id}`);
            }
            //Refer https://github.com/Juul/tail-stream/issues/20
            const closeStream = (): void => {
                stream.end(0);
                stream.emit('end');
                this.jobStreamMap.delete(trialJob.id);
            };
            setTimeout(closeStream, 5000);
        }

        const holdsGpus: boolean = trialJob.gpuIndices !== undefined && trialJob.gpuIndices.length > 0;
        if (!holdsGpus || this.gpuScheduler === undefined) {
            return;
        }
        const leftRunning: boolean = oldStatus === 'RUNNING' && trialJob.status !== 'RUNNING';
        if (!leftRunning) {
            return;
        }
        for (const gpuIndex of trialJob.gpuIndices!) {
            const occupants: number | undefined = this.occupiedGpuIndexNumMap.get(gpuIndex);
            if (occupants === undefined) {
                throw new Error(`gpu resource schedule error`);
            }
            if (occupants === 1) {
                this.occupiedGpuIndexNumMap.delete(gpuIndex);
            } else {
                this.occupiedGpuIndexNumMap.set(gpuIndex, occupants - 1);
            }
        }
    }

305
306
    /**
     * Build the environment variables exported to a trial's run script.
     *
     * CUDA_VISIBLE_DEVICES is added only when `gpuNum` is defined: '-1' when there is no
     * GPU scheduler, otherwise the comma-joined GPU indices of `resource`.
     *
     * @param trialJobDetail the trial being launched
     * @param resource GPU indices assigned to the trial
     * @param gpuNum configured GPU number per trial, if any
     * @returns key/value pairs in the order they should be exported
     */
    private getEnvironmentVariables(
        trialJobDetail: TrialJobDetail,
        resource: { gpuIndices: number[] },
        gpuNum: number | undefined): { key: string; value: string }[] {
        const { id, workingDirectory, form } = trialJobDetail;
        const envVariables: { key: string; value: string }[] = [
            { key: 'NNI_PLATFORM', value: 'local' },
            { key: 'NNI_EXP_ID', value: this.experimentId },
            { key: 'NNI_SYS_DIR', value: workingDirectory },
            { key: 'NNI_TRIAL_JOB_ID', value: id },
            { key: 'NNI_OUTPUT_DIR', value: workingDirectory },
            { key: 'NNI_TRIAL_SEQ_ID', value: form.sequenceId.toString() },
            { key: 'NNI_CODE_DIR', value: this.config.trialCodeDirectory}
        ];
        if (gpuNum === undefined) {
            return envVariables;
        }

        let visibleDevices: string;
        if (this.gpuScheduler === undefined) {
            visibleDevices = '-1';
        } else {
            visibleDevices = resource.gpuIndices.join(',');
        }
        envVariables.push({ key: 'CUDA_VISIBLE_DEVICES', value: visibleDevices });

        return envVariables;
    }

    /**
     * Record the GPU indices assigned to a trial on its detail object.
     * @param trialJobDetail the trial to update
     * @param resource resource whose GPU indices are stored (by reference)
     */
    private setExtraProperties(trialJobDetail: LocalTrialJobDetail, resource: { gpuIndices: number[] }): void {
        const { gpuIndices } = resource;
        trialJobDetail.gpuIndices = gpuIndices;
    }

    /**
     * Try to pick the GPUs for the next trial.
     *
     * Without a GPU scheduler this always succeeds with no GPUs. Otherwise the GPUs
     * reported available by the scheduler are narrowed to those below the per-GPU trial
     * limit and, when `gpuIndices` is configured, to the configured indices; the first
     * `trialGpuNumber` of them are selected.
     *
     * @returns [true, resource] on success, or [false, resource with no GPUs] when too
     *          few GPUs are currently usable
     */
    private tryGetAvailableResource(): [boolean, { gpuIndices: number[]}] {
        const resource: { gpuIndices: number[] } = { gpuIndices: [] };
        if (this.gpuScheduler === undefined) {
            return [true, resource];
        }

        const availableGpuIndices: number[] = this.gpuScheduler.getAvailableGPUIndices(this.config.useActiveGpu, this.occupiedGpuIndexNumMap);
        let usable: number[] = availableGpuIndices.filter((index: number) => {
            const occupants: number | undefined = this.occupiedGpuIndexNumMap.get(index);
            return occupants === undefined || occupants < this.config.maxTrialNumberPerGpu;
        });

        if (this.config.gpuIndices !== undefined) {
            this.checkSpecifiedGpuIndices();
            usable = usable.filter((index: number) => this.config.gpuIndices!.includes(index));
        }

        if (usable.length < this.config.trialGpuNumber!) {
            return [false, resource];
        }

        // Drop everything past the number of GPUs a trial needs.
        usable.splice(this.config.trialGpuNumber!);
        resource.gpuIndices = usable;

        return [true, resource];
    }

362
    /**
     * Verify that every configured GPU index is below the system GPU count.
     * Nothing is checked when either the configured indices or the GPU count is undefined.
     * @throws Error naming the first configured index that is not below the GPU count
     */
    private checkSpecifiedGpuIndices(): void {
        const gpuCount: number | undefined = this.gpuScheduler.getSystemGpuCount();
        const specified = this.config.gpuIndices;
        if (specified === undefined || gpuCount === undefined) {
            return;
        }
        const missing: number | undefined = specified.find((index: number) => index >= gpuCount);
        if (missing !== undefined) {
            throw new Error(`Specified GPU index not found: ${missing}`);
        }
    }

373
374
375
    /**
     * Increment the occupancy counter of each GPU in `resource`.
     * Does nothing when there is no GPU scheduler.
     * @param resource GPU indices being handed to a trial
     */
    private occupyResource(resource: {gpuIndices: number[]}): void {
        if (this.gpuScheduler === undefined) {
            return;
        }
        for (const gpuIndex of resource.gpuIndices) {
            const occupants: number | undefined = this.occupiedGpuIndexNumMap.get(gpuIndex);
            const updated: number = occupants === undefined ? 1 : occupants + 1;
            this.occupiedGpuIndexNumMap.set(gpuIndex, updated);
        }
    }

386
387
388
389
    /**
     * Main scheduling loop; runs until the stopping flag is raised.
     *
     * Repeatedly looks at the job at the front of the queue. A WAITING job is launched
     * when resources can be acquired; when they cannot, the job stays at the front and
     * the loop retries after 5 seconds. Jobs that are unknown or not WAITING are dropped
     * from the queue without being launched.
     */
    private async runJobLoop(): Promise<void> {
        while (!this.stopping) {
            while (!this.stopping && this.jobQueue.length !== 0) {
                const headId: string = this.jobQueue[0];
                const headJob: LocalTrialJobDetail | undefined = this.jobMap.get(headId);
                const launchable: boolean = headJob !== undefined && headJob.status === 'WAITING';
                if (launchable) {
                    const [granted, resource] = this.tryGetAvailableResource();
                    if (!granted) {
                        // Keep the job queued and wait for resources to free up.
                        break;
                    }
                    this.occupyResource(resource);
                    await this.runTrialJob(headId, resource);
                }
                this.jobQueue.shift();
            }
            await delay(5000);
        }
    }

    /**
     * Change a trial's status and run the status-change hook.
     * Nothing happens when the status is already `newStatus`.
     * @param trialJob the job to update
     * @param newStatus the status to set
     */
    private setTrialJobStatus(trialJob: LocalTrialJobDetail, newStatus: TrialJobStatus): void {
        const previous: TrialJobStatus = trialJob.status;
        if (previous === newStatus) {
            return;
        }
        trialJob.status = newStatus;
        this.onTrialJobStatusChanged(trialJob, previous);
    }

414
    /**
     * Build the platform-specific script lines that run the trial command.
     *
     * The script changes into the code directory, runs the trial command with stdout and
     * stderr redirected to files in `workingDirectory`, then writes
     * "<exit code> <timestamp>" to `.nni/state`. PowerShell syntax is produced on win32
     * and shell syntax elsewhere.
     *
     * The redirection targets are quoted the same way as the state file path, so a
     * working directory containing whitespace does not break the redirection.
     *
     * @param workingDirectory the trial's working directory
     * @returns script lines, without the interpreter header or environment setup
     */
    private getScript(workingDirectory: string): string[] {
        const script: string[] = [];
        const escapedCommand = shellString(this.config.trialCommand);
        const stdoutPath: string = path.join(workingDirectory, 'stdout');
        const stderrPath: string = path.join(workingDirectory, 'stderr');
        const statePath: string = path.join(workingDirectory, '.nni', 'state');
        if (process.platform === 'win32') {
            script.push(`$PSDefaultParameterValues = @{'Out-File:Encoding' = 'utf8'}`);
            script.push(`cd $env:NNI_CODE_DIR`);
            script.push(
                `cmd.exe /c ${escapedCommand} 1>"${stdoutPath}" 2>"${stderrPath}"`,
                `$NOW_DATE = [int64](([datetime]::UtcNow)-(get-date "1/1/1970")).TotalSeconds`,
                `$NOW_DATE = "$NOW_DATE" + (Get-Date -Format fff).ToString()`,
                `Write $LASTEXITCODE " " $NOW_DATE  | Out-File "${statePath}" -NoNewline -encoding utf8`);
        } else {
            script.push(`cd $NNI_CODE_DIR`);
            script.push(`eval ${escapedCommand} 1>'${stdoutPath}' 2>'${stderrPath}'`);
            if (process.platform === 'darwin') {
                // https://superuser.com/questions/599072/how-to-get-bash-execution-time-in-milliseconds-under-mac-os-x
                // Considering the worst case, write 999 to avoid negative duration
                script.push(`echo $? \`date +%s999\` >'${statePath}'`);
            } else {
                script.push(`echo $? \`date +%s%3N\` >'${statePath}'`);
            }
        }

        return script;
    }

440
    /**
     * Launch a trial job.
     *
     * Writes the run script and parameter file into the trial's working directory,
     * starts the script, marks the job RUNNING with its start time, pid and GPU indices,
     * and tails `.nni/metrics`, emitting a 'metric' event for every complete command read.
     *
     * @param trialJobId id of a job present in the job map
     * @param resource GPU indices assigned to the trial
     */
    private async runTrialJob(trialJobId: string, resource: {gpuIndices: number[]}): Promise<void> {
        const job: LocalTrialJobDetail = <LocalTrialJobDetail>this.jobMap.get(trialJobId);
        const workDir: string = job.workingDirectory;
        const envVars: { key: string; value: string }[] = this.getEnvironmentVariables(job, resource, this.config.trialGpuNumber);

        // First line: interpreter header on POSIX, PATH propagation on Windows.
        const prologue: string = process.platform !== 'win32'
            ? '#!/bin/bash'
            : `$env:PATH=${powershellString(process.env['path']!)}`;
        const runScriptContent: string[] = [
            prologue,
            ...envVars.map((variable) => setEnvironmentVariable(variable)),
            ...this.getScript(workDir)
        ];

        const nniDir: string = path.join(workDir, '.nni');
        const metricsPath: string = path.join(nniDir, 'metrics');
        await execMkdir(workDir);
        await execMkdir(nniDir);
        await execNewFile(metricsPath);

        const scriptPath: string = path.join(workDir, getScriptName('run'));
        await createScriptFile(scriptPath, runScriptContent.join(getNewLine()));
        await this.writeParameterFile(workDir, job.form.hyperParameters);

        const child: cp.ChildProcess = runScript(scriptPath);
        this.setTrialJobStatus(job, 'RUNNING');
        job.startTime = Date.now(); // eslint-disable-line require-atomic-updates
        job.pid = child.pid; // eslint-disable-line require-atomic-updates
        this.setExtraProperties(job, resource);

        // Bytes received so far that do not yet form a complete command.
        let pending: Buffer = Buffer.alloc(0);
        const stream: ts.Stream = ts.createReadStream(metricsPath);
        stream.on('data', (chunk: Buffer) => {
            pending = Buffer.concat([pending, chunk]);
            while (pending.length > 0) {
                const [complete, , content, rest] = decodeCommand(pending);
                if (!complete) {
                    break;
                }
                this.eventEmitter.emit('metric', {
                    id: job.id,
                    data: content
                });
                this.log.debug(`Sending metrics, job id: ${job.id}, metrics: ${content}`);
                pending = rest;
            }
        });
        this.jobStreamMap.set(job.id, stream);
    }

chicm-ms's avatar
chicm-ms committed
490
    /**
     * Write the hyper-parameter value as UTF-8 to the parameter file inside `directory`.
     * The file name is produced by generateParamFileName().
     * @param directory directory to write into
     * @param hyperParameters hyper-parameters whose `value` is written
     */
    private async writeParameterFile(directory: string, hyperParameters: HyperParameters): Promise<void> {
        const fileName: string = generateParamFileName(hyperParameters);
        const target: string = path.join(directory, fileName);
        await fs.promises.writeFile(target, hyperParameters.value, { encoding: 'utf8' });
    }
J-shang's avatar
J-shang committed
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509

    /**
     * Resolve the local directory of a trial: `<rootDir>/trials/<trialJobId>`.
     * @param trialJobId trial job id
     * @returns the trial's directory path
     */
    public async getTrialOutputLocalPath(trialJobId: string): Promise<string> {
        const trialDir: string = path.join(this.rootDir, 'trials', trialJobId);
        return trialDir;
    }

    /**
     * Check that a trial's output path exists locally; nothing is copied.
     * @param trialJobId trial job id
     * @param subpath path below the trial directory; the trial directory itself is checked when undefined
     * @throws Error if the resulting path does not exist
     */
    public async fetchTrialOutput(trialJobId: string, subpath: string): Promise<void> {
        const trialRoot: string = await this.getTrialOutputLocalPath(trialJobId);
        const target: string = subpath !== undefined ? path.join(trialRoot, subpath) : trialRoot;
        if (!fs.existsSync(target)) {
            throw new Error('Trial local path not exist.');
        }
    }
Deshui Yu's avatar
Deshui Yu committed
510
511
512
}

export { LocalTrainingService };