remoteMachineTrainingService.ts 27.4 KB
Newer Older
liuzhe-lz's avatar
liuzhe-lz committed
1
2
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT license.
Deshui Yu's avatar
Deshui Yu committed
3

4
import assert from 'assert';
Deshui Yu's avatar
Deshui Yu committed
5
import { EventEmitter } from 'events';
6
7
import fs from 'fs';
import path from 'path';
8
import { ShellExecutor } from 'training_service/remote_machine/shellExecutor';
Deshui Yu's avatar
Deshui Yu committed
9
import { Deferred } from 'ts-deferred';
10
11
12
13
14
import * as component from 'common/component';
import { NNIError, NNIErrorNames, MethodNotImplementedError } from 'common/errors';
import { getExperimentId } from 'common/experimentStartupInfo';
import { getLogger, Logger } from 'common/log';
import { ObservableTimer } from 'common/observableTimer';
Deshui Yu's avatar
Deshui Yu committed
15
import {
16
    HyperParameters, TrainingService, TrialJobApplicationForm,
Yuge Zhang's avatar
Yuge Zhang committed
17
    TrialJobDetail, TrialJobMetric
18
} from 'common/trainingService';
19
import {
20
21
    delay, generateParamFileName, getExperimentRootDir, getIPV4Address, getJobCancelStatus,
    getVersion, uniqueString
22
} from 'common/utils';
liuzhe-lz's avatar
liuzhe-lz committed
23
import { RemoteConfig, RemoteMachineConfig } from 'common/experimentConfig';
24
import { CONTAINER_INSTALL_NNI_SHELL_FORMAT } from '../common/containerJobData';
25
import { GPUSummary, ScheduleResultType } from '../common/gpuData';
26
import { execMkdir, validateCodeDir } from '../common/util';
Deshui Yu's avatar
Deshui Yu committed
27
28
import { GPUScheduler } from './gpuScheduler';
import {
29
    ExecutorManager, RemoteMachineScheduleInfo, RemoteMachineScheduleResult, RemoteMachineTrialJobDetail
Deshui Yu's avatar
Deshui Yu committed
30
} from './remoteMachineData';
SparkSnail's avatar
SparkSnail committed
31
import { RemoteMachineJobRestServer } from './remoteMachineJobRestServer';
Deshui Yu's avatar
Deshui Yu committed
32
33
34
35

/**
 * Training Service implementation for Remote Machine (Linux)
 */
SparkSnail's avatar
SparkSnail committed
36
@component.Singleton
Deshui Yu's avatar
Deshui Yu committed
37
class RemoteMachineTrainingService implements TrainingService {
38
    /** Executor id used for each machine's initial connection (GPU collector, code copy, cleanup). */
    private readonly initExecutorId = "initConnection";

    /** Machine config -> executor manager for that machine. */
    private readonly machineExecutorManagerMap: Map<RemoteMachineConfig, ExecutorManager>;
    /** Machine config -> pending copy of the trial code directory to that machine. */
    private readonly machineCopyExpCodeDirPromiseMap: Map<RemoteMachineConfig, Promise<void>>;
    /** Trial id -> executor manager of the machine the trial was scheduled on. */
    private readonly trialExecutorManagerMap: Map<string, ExecutorManager>;
    /** Trial id -> trial detail, for every submitted trial. */
    private readonly trialJobsMap: Map<string, RemoteMachineTrialJobDetail>;
    private readonly expRootDir: string;
    /** Created in run() once all machine connections have been established. */
    private gpuScheduler?: GPUScheduler;
    /** Ids of submitted trials not launched yet, consumed in FIFO order by run(). */
    private readonly jobQueue: string[];
    private readonly timer: ObservableTimer;
    /** Set by cleanUp() (or a REST server error) to make run() and the GPU pollers stop. */
    private stopping: boolean = false;
    private readonly metricsEmitter: EventEmitter;
    private readonly log: Logger;
    private remoteRestServerPort?: number;
    private versionCheck: boolean = true;
    private logCollection: string = 'none';
    /** One connect-and-initialize promise per machine; cleared by run() once they all resolve. */
    private sshConnectionPromises: Promise<void>[];
    private config: RemoteConfig;
Deshui Yu's avatar
Deshui Yu committed
55

liuzhe-lz's avatar
liuzhe-lz committed
56
    /**
     * Build the service for one remote-machine experiment.
     *
     * Validates the local trial code directory and immediately starts connecting to every
     * machine in `config.machineList`; the resulting promises are awaited later in run().
     *
     * @param config remote training service configuration
     * @throws Error if `config.trialCodeDirectory` is not a directory
     */
    constructor(config: RemoteConfig) {
        this.jobQueue = [];
        this.trialJobsMap = new Map<string, RemoteMachineTrialJobDetail>();
        this.trialExecutorManagerMap = new Map<string, ExecutorManager>();
        this.machineExecutorManagerMap = new Map<RemoteMachineConfig, ExecutorManager>();
        this.machineCopyExpCodeDirPromiseMap = new Map<RemoteMachineConfig, Promise<void>>();
        this.metricsEmitter = new EventEmitter();
        this.sshConnectionPromises = [];

        this.expRootDir = getExperimentRootDir();
        this.timer = component.get(ObservableTimer);
        this.log = getLogger('RemoteMachineTrainingService');
        this.log.info('Construct remote machine training service.');

        this.config = config;
        const codeDir: string = config.trialCodeDirectory;
        if (!fs.lstatSync(codeDir).isDirectory()) {
            throw new Error(`codeDir ${codeDir} is not a directory`);
        }
        validateCodeDir(codeDir);

        // Start connecting to / initializing every machine now; run() waits for all of them.
        const connecting: Promise<void>[] = [];
        for (const machine of config.machineList) {
            connecting.push(this.initRemoteMachineOnConnected(machine));
        }
        this.sshConnectionPromises = connecting;
    }

    /**
     * Main loop of the training service.
     *
     * Starts the REST server used by trials, waits for the machine connections started in the
     * constructor, creates the GPU scheduler, starts copying the trial code directory to every
     * machine, and then every 3 seconds tries to launch queued trials until `stopping` is set.
     *
     * @throws Error with the REST server's error message if the server reports one
     */
    public async run(): Promise<void> {
        const restServer = new RemoteMachineJobRestServer(this);
        await restServer.start();
        restServer.setEnableVersionCheck = this.versionCheck;
        this.log.info('Run remote machine training service.');

        if (this.sshConnectionPromises.length > 0) {
            await Promise.all(this.sshConnectionPromises);
            this.log.info('ssh connection initialized!');
            // set sshConnectionPromises to [] to avoid log information duplicated
            this.sshConnectionPromises = [];

            // initialize gpuScheduler
            this.gpuScheduler = new GPUScheduler(this.machineExecutorManagerMap);

            // Start copying codeDir to every remote machine; prepareTrialJob() awaits the copy
            // for the machine a trial is scheduled on before launching it.
            for (const [machineConfig, executorManager] of this.machineExecutorManagerMap.entries()) {
                const executor: ShellExecutor = await executorManager.getExecutor(this.initExecutorId);
                if (executor !== undefined) {
                    this.machineCopyExpCodeDirPromiseMap.set(
                        machineConfig,
                        executor.copyDirectoryToRemote(this.config.trialCodeDirectory, executor.getRemoteCodePath(getExperimentId()))
                    );
                }
            }
        }

        while (!this.stopping) {
            while (this.jobQueue.length > 0) {
                this.updateGpuReservation();
                const trialJobId: string = this.jobQueue[0];
                const prepareResult: boolean = await this.prepareTrialJob(trialJobId);
                if (!prepareResult) {
                    // No GPU resource is available right now; try again in the next iteration.
                    break;
                }
                // Remove the prepared trial by id instead of shift(): cancelTrialJob() may have
                // spliced this trial out of the queue while prepareTrialJob() was awaiting, in
                // which case the head is now a different trial that has not been launched yet.
                const index: number = this.jobQueue.indexOf(trialJobId);
                if (index >= 0) {
                    this.jobQueue.splice(index, 1);
                }
            }
            if (restServer.getErrorMessage !== undefined) {
                this.stopping = true;
                throw new Error(restServer.getErrorMessage);
            }
            await delay(3000);
        }
        this.log.info('RemoteMachineTrainingService run loop exited.');
    }
128

SparkSnail's avatar
SparkSnail committed
129
    /**
     * Bind a trial to the executor manager of the machine it was scheduled on.
     *
     * @param trial remote machine trial job detail; `trial.rmMeta` must already be set
     * @throws Error if `trial.rmMeta` is unset or no executor manager exists for its machine
     */
    public allocateExecutorManagerForTrial(trial: RemoteMachineTrialJobDetail): void {
        const meta = trial.rmMeta;
        if (meta === undefined) {
            throw new Error(`rmMeta not set in trial ${trial.id}`);
        }
        const manager: ExecutorManager | undefined = this.machineExecutorManagerMap.get(meta.config);
        if (manager === undefined) {
            throw new Error(`executorManager not initialized`);
        }
        this.trialExecutorManagerMap.set(trial.id, manager);
    }
143

SparkSnail's avatar
SparkSnail committed
144
145
    /**
     * Release the executor held by a finished (or cancelled) trial.
     *
     * The trial -> executor manager mapping is deliberately kept, because NNI manager may still
     * send requests about this trial afterwards.
     *
     * @param trial remote machine trial job detail
     * @throws Error if `trial.rmMeta` is unset or no executor manager was assigned to the trial
     */
    public releaseTrialResource(trial: RemoteMachineTrialJobDetail): void {
        const trialId = trial.id;
        if (trial.rmMeta === undefined) {
            throw new Error(`rmMeta not set in trial ${trialId}`);
        }
        const manager = this.trialExecutorManagerMap.get(trialId);
        if (manager === undefined) {
            throw new Error(`ExecutorManager is not assigned for trial ${trialId}`);
        }
        manager.releaseExecutor(trialId);
    }
Deshui Yu's avatar
Deshui Yu committed
159
160
161
162

    /**
     * List submitted trial jobs.
     *
     * Every job goes through getTrialJob(), so RUNNING/UNKNOWN jobs have their status
     * refreshed from their remote machine. Jobs are refreshed one at a time (sequential awaits).
     *
     * @returns details of every trial submitted to this service
     */
    public async listTrialJobs(): Promise<TrialJobDetail[]> {
        const jobs: TrialJobDetail[] = [];
        for (const trialJobId of this.trialJobsMap.keys()) {
            jobs.push(await this.getTrialJob(trialJobId));
        }
        return jobs;
    }

    /**
     * Get trial job detail information.
     *
     * A RUNNING or UNKNOWN job has its status refreshed from its remote machine first;
     * jobs in any other state are returned as stored.
     *
     * @param trialJobId ID of trial job
     * @throws NNIError (NOT_FOUND) if no trial with this id was submitted
     * @throws Error if a RUNNING/UNKNOWN job has no rmMeta
     */
    public async getTrialJob(trialJobId: string): Promise<TrialJobDetail> {
        const trialJob: RemoteMachineTrialJobDetail | undefined = this.trialJobsMap.get(trialJobId);
        if (trialJob === undefined) {
            throw new NNIError(NNIErrorNames.NOT_FOUND, `trial job id ${trialJobId} not found`);
        }
        //TO DO: add another job status, and design new job status change logic
        const needsRefresh: boolean = trialJob.status === 'RUNNING' || trialJob.status === 'UNKNOWN';
        if (!needsRefresh) {
            return trialJob;
        }
        if (trialJob.rmMeta === undefined) {
            throw new Error(`rmMeta not set for submitted job ${trialJobId}`);
        }
        // Executor on the machine where the job is running
        const executor = await this.getExecutor(trialJob.id);
        return this.updateTrialJobStatus(trialJob, executor);
    }

198
199
200
201
202
    /**
     * Trial file retrieval; not implemented for remote machines.
     *
     * @param _trialJobId ID of trial job (unused)
     * @param _fileName name of the requested trial file (unused)
     * @throws MethodNotImplementedError always (surfaced as a rejected promise)
     */
    public async getTrialFile(_trialJobId: string, _fileName: string): Promise<string | Buffer> {
        throw new MethodNotImplementedError();
    }

Deshui Yu's avatar
Deshui Yu committed
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
    /**
     * Register a callback on this service's 'metric' event.
     * @param listener callback invoked with each emitted trial metric
     */
    public addTrialJobMetricListener(listener: (metric: TrialJobMetric) => void): void {
        this.metricsEmitter.addListener('metric', listener);
    }

    /**
     * Unregister a callback previously added with addTrialJobMetricListener().
     * @param listener the same callback instance that was registered
     */
    public removeTrialJobMetricListener(listener: (metric: TrialJobMetric) => void): void {
        this.metricsEmitter.removeListener('metric', listener);
    }

    /**
     * Submit a trial job.
     *
     * The job is recorded as WAITING and appended to the job queue; run() launches it once
     * the GPU scheduler can place it on a machine.
     *
     * @param form trial job description form
     * @returns the newly created trial job detail
     */
    public async submitTrialJob(form: TrialJobApplicationForm): Promise<TrialJobDetail> {
        // Generate a random trial job id; regenerate on a collision with an existing trial,
        // which would otherwise silently overwrite that trial's entry in trialJobsMap.
        let trialJobId: string = uniqueString(5);
        while (this.trialJobsMap.has(trialJobId)) {
            trialJobId = uniqueString(5);
        }

        const trialJobDetail: RemoteMachineTrialJobDetail = new RemoteMachineTrialJobDetail(
            trialJobId,
            'WAITING',
            Date.now(),
            "unset",
            form
        );
        this.jobQueue.push(trialJobId);
        this.trialJobsMap.set(trialJobId, trialJobDetail);

        return trialJobDetail;
    }

244
245
246
247
248
    /**
     * Update a trial job with new hyper-parameters (multi-phase).
     *
     * This method only rewrites the trial's parameter file via writeParameterFile().
     *
     * @param trialJobId trial job id
     * @param form job application form carrying the new hyper-parameters
     * @returns the stored trial job detail
     * @throws Error if the trial is unknown
     */
    public async updateTrialJob(trialJobId: string, form: TrialJobApplicationForm): Promise<TrialJobDetail> {
        const detail: TrialJobDetail | undefined = this.trialJobsMap.get(trialJobId);
        if (detail === undefined) {
            throw new Error(`updateTrialJob failed: ${trialJobId} not found`);
        }
        await this.writeParameterFile(trialJobId, form.hyperParameters);
        return detail;
    }
258

Deshui Yu's avatar
Deshui Yu committed
259
260
261
262
    /**
     * Cancel a trial job.
     *
     * The job is first removed from the pending queue. A job that was never scheduled is simply
     * marked cancelled. For a scheduled job, an UNKNOWN status is turned into USER_CANCELED
     * directly; otherwise the job's remote processes (per its pid file) are killed and its
     * executor is released. Kill failures are only logged.
     *
     * @param trialJobId ID of trial job
     * @param isEarlyStopped whether this cancellation is an early stop; stored on the job and
     *        later used by updateTrialJobStatus() to choose the final status
     * @throws Error if the trial is unknown
     */
    public async cancelTrialJob(trialJobId: string, isEarlyStopped: boolean = false): Promise<void> {
        const trialJob: RemoteMachineTrialJobDetail | undefined = this.trialJobsMap.get(trialJobId);
        if (trialJob === undefined) {
            throw new Error(`trial job id ${trialJobId} not found`);
        }

        // Drop the job from the pending queue, if it is still there.
        const queuePosition: number = this.jobQueue.indexOf(trialJobId);
        if (queuePosition !== -1) {
            this.jobQueue.splice(queuePosition, 1);
        }

        if (trialJob.rmMeta === undefined) {
            // Never scheduled: just mark it cancelled.
            assert(isEarlyStopped === false, 'isEarlyStopped is not supposed to be true here.');
            trialJob.status = getJobCancelStatus(isEarlyStopped);
            return;
        }

        // Scheduled: look up the executor on the machine running the job.
        const executor = await this.getExecutor(trialJob.id);

        if (trialJob.status === 'UNKNOWN') {
            trialJob.status = 'USER_CANCELED';
            this.releaseTrialResource(trialJob);
            return;
        }

        const jobpidPath: string = this.getJobPidPath(executor, trialJob.id);
        try {
            // Remember why the job is stopped; updateTrialJobStatus() reads this flag.
            trialJob.isEarlyStopped = isEarlyStopped;
            await executor.killChildProcesses(jobpidPath);
            this.releaseTrialResource(trialJob);
        } catch (error) {
            // Not handle the error since pkill failed will not impact trial job's current status
            this.log.error(`remoteTrainingService.cancelTrialJob: ${error}`);
        }
    }

303
304
    /** No-op: cluster metadata is ignored by this training service. */
    public async setClusterMetadata(_key: string, _value: string): Promise<void> {
        return;
    }
    /** Cluster metadata is not tracked here, so every key resolves to an empty string. */
    public async getClusterMetadata(_key: string): Promise<string> {
        return '';
    }
305

SparkSnail's avatar
SparkSnail committed
306
    /**
     * Stop the service: flag `stopping` so run() leaves its loop, then tear down the
     * remote connections via cleanupConnections().
     */
    public async cleanUp(): Promise<void> {
        this.log.info('Stopping remote machine training service...');
        this.stopping = true;
        return this.cleanupConnections();
    }

    /**
     * Get the executor for a trial from the executor manager assigned to it.
     *
     * @param trialId trial id
     * @throws Error if no executor manager has been assigned to the trial
     */
    private async getExecutor(trialId: string): Promise<ShellExecutor> {
        const manager = this.trialExecutorManagerMap.get(trialId);
        if (manager !== undefined) {
            return manager.getExecutor(trialId);
        }
        throw new Error(`ExecutorManager is not assigned for trial ${trialId}`);
    }
322

323
324
325
326
    /**
     * Remove GPU reservations held by trials that are neither WAITING nor RUNNING.
     * Does nothing until the GPU scheduler has been created by run().
     */
    private updateGpuReservation(): void {
        const scheduler = this.gpuScheduler;
        if (scheduler === undefined) {
            return;
        }
        for (const [trialId, detail] of this.trialJobsMap) {
            const active: boolean = detail.status === 'WAITING' || detail.status === 'RUNNING';
            if (!active) {
                scheduler.removeGpuReservation(trialId, this.trialJobsMap);
            }
        }
    }

SparkSnail's avatar
SparkSnail committed
336
337
338
339
    /**
     * Best-effort teardown of every remote machine.
     *
     * For each machine: kill the GPU metric collector using the pid file under the
     * experiment's remote scripts directory, then release all of the machine's executors.
     * Machines are handled independently, so a failure on one does not skip the others,
     * and errors are logged rather than thrown because this runs while the experiment stops.
     */
    private async cleanupConnections(): Promise<void> {
        for (const executorManager of this.machineExecutorManagerMap.values()) {
            try {
                const executor = await executorManager.getExecutor(this.initExecutorId);
                if (executor !== undefined) {
                    this.log.info(`killing gpu metric collector on ${executor.name}`);
                    const gpuJobPidPath: string = executor.joinPath(executor.getRemoteScriptsPath(getExperimentId()), 'pid');
                    await executor.killChildProcesses(gpuJobPidPath, true);
                }
            } catch (error) {
                //ignore error, this function is called to cleanup remote connections when experiment is stopping
                this.log.error(`Cleanup connection exception, error is ${error}`);
            }
            try {
                // Release executors even when killing the collector failed.
                executorManager.releaseAllExecutor();
            } catch (error) {
                this.log.error(`Cleanup connection exception, error is ${error}`);
            }
        }
    }

356
357
358
    /**
     * Connect to one remote machine and prepare it for trials.
     *
     * Registers the machine's executor manager, creates the remote experiment root directory,
     * recreates the remote scripts directory, calls allowPermission on `<temp>/nni`, starts the
     * GPU stats script, and subscribes to the service timer to poll the collector's
     * `gpu_metrics` output into `executorManager.rmMeta.gpuSummary`. Polling stops when no GPU
     * is found or when the service is stopping.
     *
     * @param machineConfig configuration of the machine to connect to
     */
    private async initRemoteMachineOnConnected(machineConfig: RemoteMachineConfig): Promise<void> {
        const executorManager: ExecutorManager = new ExecutorManager(machineConfig);
        this.log.info(`connecting to ${machineConfig.user}@${machineConfig.host}:${machineConfig.port}`);
        const executor: ShellExecutor = await executorManager.getExecutor(this.initExecutorId);
        this.log.debug(`reached ${executor.name}`);
        this.machineExecutorManagerMap.set(machineConfig, executorManager);
        this.log.debug(`initializing ${executor.name}`);

        // Create root working directory after executor is ready
        const nniRootDir: string = executor.joinPath(executor.getTempPath(), 'nni');
        await executor.createFolder(executor.getRemoteExperimentRootDir(getExperimentId()));

        // the directory to store temp scripts in remote machine
        const remoteGpuScriptCollectorDir: string = executor.getRemoteScriptsPath(getExperimentId());

        // clean up previous result.
        await executor.createFolder(remoteGpuScriptCollectorDir, true);
        await executor.allowPermission(true, nniRootDir);

        //Begin to execute gpu_metrics_collection scripts
        // NOTE(review): not awaited, presumably because the collector keeps running in the background — confirm.
        const script = executor.generateGpuStatsScript(getExperimentId());
        executor.executeScript(script, false, true);

        // The timer ticks about once per second; allow only one poll of this machine in flight
        // at a time so slow remote reads do not overlap.
        let collecting = false;
        const disposable: Rx.IDisposable = this.timer.subscribe(
            async () => {
                if (collecting) {
                    return;
                }
                collecting = true;
                try {
                    const cmdresult = await executor.readLastLines(executor.joinPath(remoteGpuScriptCollectorDir, 'gpu_metrics'));
                    if (cmdresult !== "") {
                        executorManager.rmMeta.gpuSummary = <GPUSummary>JSON.parse(cmdresult);
                        if (executorManager.rmMeta.gpuSummary.gpuCount === 0) {
                            this.log.warning(`No GPU found on remote machine ${machineConfig.host}`);
                            this.timer.unsubscribe(disposable);
                        }
                    }
                } catch (error) {
                    // A failed read or a partially written metrics line must not wedge the poller
                    // (or escape as an unhandled rejection): log it and retry on a later tick.
                    this.log.warning(`Failed to collect GPU metrics on ${machineConfig.host}: ${error}`);
                }
                if (this.stopping) {
                    this.timer.unsubscribe(disposable);
                    this.log.debug(`Stopped GPU collector on ${machineConfig.host}, since experiment is exiting.`);
                }
                collecting = false;
            }
        );
    }

    /**
     * Try to schedule and launch one queued trial.
     *
     * @param trialJobId id of the trial to prepare (the head of the job queue in run())
     * @returns true if the trial was launched or does not need launching (status is not
     *          WAITING); false if no GPU is available right now and it should be retried later
     * @throws Error if the GPU scheduler has not been created yet or the schedule result is invalid
     * @throws NNIError (INVALID_JOB_DETAIL) if the trial is unknown
     * @throws NNIError (RESOURCE_NOT_AVAILABLE) if no machine can satisfy `trialGpuNumber`
     */
    private async prepareTrialJob(trialJobId: string): Promise<boolean> {
        if (this.gpuScheduler === undefined) {
            throw new Error('gpuScheduler is not initialized');
        }

        const trialJobDetail: RemoteMachineTrialJobDetail | undefined = this.trialJobsMap.get(trialJobId);
        if (trialJobDetail === undefined) {
            throw new NNIError(NNIErrorNames.INVALID_JOB_DETAIL, `Invalid job detail information for trial job ${trialJobId}`);
        }

        // If job is not WAITING, don't prepare it and report success immediately.
        if (trialJobDetail.status !== 'WAITING') {
            return true;
        }

        // get an executor from scheduler
        const rmScheduleResult: RemoteMachineScheduleResult = this.gpuScheduler.scheduleMachine(this.config.trialGpuNumber, trialJobDetail);
        if (rmScheduleResult.resultType === ScheduleResultType.REQUIRE_EXCEED_TOTAL) {
            const errorMessage: string = `Required GPU number ${this.config.trialGpuNumber} is too large, no machine can meet`;
            this.log.error(errorMessage);
            throw new NNIError(NNIErrorNames.RESOURCE_NOT_AVAILABLE, errorMessage);
        }
        if (rmScheduleResult.resultType === ScheduleResultType.TMP_NO_AVAILABLE_GPU) {
            this.log.info(`Right now no available GPU can be allocated for trial ${trialJobId}, will try to schedule later`);
            return false;
        }
        const rmScheduleInfo: RemoteMachineScheduleInfo | undefined = rmScheduleResult.scheduleInfo;
        if (rmScheduleResult.resultType !== ScheduleResultType.SUCCEED || rmScheduleInfo === undefined) {
            throw new Error(`Invalid schedule result type: ${rmScheduleResult.resultType}`);
        }

        trialJobDetail.rmMeta = rmScheduleInfo.rmMeta;
        // Make sure the trial code dir copy to this machine (started in run()) has finished.
        const copyExpCodeDirPromise = this.machineCopyExpCodeDirPromiseMap.get(rmScheduleInfo.rmMeta.config);
        if (copyExpCodeDirPromise !== undefined) {
            await copyExpCodeDirPromise;
        }

        this.allocateExecutorManagerForTrial(trialJobDetail);
        const executor = await this.getExecutor(trialJobDetail.id);

        trialJobDetail.workingDirectory = executor.joinPath(executor.getRemoteExperimentRootDir(getExperimentId()), 'trials', trialJobDetail.id);

        await this.launchTrialOnScheduledMachine(
            trialJobId, trialJobDetail.form, rmScheduleInfo);

        trialJobDetail.status = 'RUNNING';
        trialJobDetail.url = `file://${rmScheduleInfo.rmMeta.config.host}:${trialJobDetail.workingDirectory}`;
        trialJobDetail.startTime = Date.now();

        this.trialJobsMap.set(trialJobId, trialJobDetail);
        return true;
    }

461
    /**
     * Stage a scheduled trial's launch files and start it on its remote machine.
     *
     * Creates `<workingDirectory>/.nni` remotely, renders the start script, writes the
     * install_nni and run scripts into the local trial temp dir (`<expRootDir>/trials/<id>`),
     * writes the parameter file, copies the local dir to the remote working directory, and
     * executes the run script there (the final executeScript call is not awaited).
     *
     * @param trialJobId id of the trial to launch
     * @param form trial job application form; its hyper-parameters go to the parameter file
     * @param rmScheduleInfo scheduling decision, including the CUDA devices assigned to the trial
     * @throws Error if the trial is unknown
     */
    private async launchTrialOnScheduledMachine(trialJobId: string, form: TrialJobApplicationForm,
        rmScheduleInfo: RemoteMachineScheduleInfo): Promise<void> {
        const assignedDevices: string = rmScheduleInfo.cudaVisibleDevice;
        const executor = await this.getExecutor(trialJobId);
        const trialJobDetail: RemoteMachineTrialJobDetail | undefined = this.trialJobsMap.get(trialJobId);
        if (trialJobDetail === undefined) {
            throw new Error(`Can not get trial job detail for job: ${trialJobId}`);
        }

        const localTrialDir: string = path.join(this.expRootDir, 'trials', trialJobId);

        await executor.createFolder(executor.joinPath(trialJobDetail.workingDirectory, '.nni'));

        // CUDA_VISIBLE_DEVICES:
        //   - trialGpuNumber undefined: empty string, so the script does not set the variable;
        //   - otherwise expose the assigned devices, or hide all GPUs with " " if none were assigned.
        let cudaVisible: string = "";
        if (this.config.trialGpuNumber !== undefined) {
            const hasDevices: boolean = typeof assignedDevices === 'string' && assignedDevices.length > 0;
            cudaVisible = hasDevices ? `CUDA_VISIBLE_DEVICES=${assignedDevices}` : `CUDA_VISIBLE_DEVICES=" "`;
        }

        const nniManagerIp: string = this.config.nniManagerIp ? this.config.nniManagerIp : await getIPV4Address();
        if (this.remoteRestServerPort === undefined) {
            this.remoteRestServerPort = component.get(RemoteMachineJobRestServer).clusterRestServerPort;
        }
        const version: string = this.versionCheck ? await getVersion() : '';

        const runScriptTrialContent: string = executor.generateStartScript(
            trialJobDetail.workingDirectory,
            trialJobId,
            getExperimentId(),
            trialJobDetail.form.sequenceId.toString(),
            false,  // multi-phase
            this.config.trialCommand,
            nniManagerIp,
            this.remoteRestServerPort,
            version,
            this.logCollection,
            cudaVisible);

        // Stage everything in a local temp dir first, then ship the whole dir to the remote side.
        await execMkdir(path.join(localTrialDir, '.nni'));

        // install_nni (not used on the Windows platform) followed by the run script.
        const localScripts: Array<[string, string]> = [
            ["install_nni", CONTAINER_INSTALL_NNI_SHELL_FORMAT],
            ["run", runScriptTrialContent],
        ];
        for (const [scriptBaseName, scriptContent] of localScripts) {
            await fs.promises.writeFile(path.join(localTrialDir, executor.getScriptName(scriptBaseName)), scriptContent, { encoding: 'utf8' });
        }
        await this.writeParameterFile(trialJobId, form.hyperParameters);

        await executor.copyDirectoryToRemote(localTrialDir, trialJobDetail.workingDirectory);

        // Start the trial on the remote machine.
        executor.executeScript(executor.joinPath(trialJobDetail.workingDirectory, executor.getScriptName("run")), true, true);
    }

523
    /**
     * Refresh a scheduled trial's status from its remote machine.
     *
     * If the process recorded in the trial's pid file is no longer alive, the file
     * `<remote exp root>/trials/<id>/.nni/code` is parsed as "<code> <timestamp>":
     * - code 0 means SUCCEEDED;
     * - a non-zero code means FAILED, or getJobCancelStatus(isEarlyStopped) if cancelTrialJob()
     *   recorded an isEarlyStopped flag.
     *
     * The end time is then taken from the file and the trial's executor is released.
     * An error while checking marks the trial UNKNOWN, unless it is a NOT_FOUND NNIError.
     *
     * @param trialJob trial to refresh; mutated in place
     * @param executor executor on the trial's machine
     * @returns the same `trialJob` object
     */
    private async updateTrialJobStatus(trialJob: RemoteMachineTrialJobDetail, executor: ShellExecutor): Promise<TrialJobDetail> {
        const jobpidPath: string = this.getJobPidPath(executor, trialJob.id);
        const trialReturnCodeFilePath: string = executor.joinPath(executor.getRemoteExperimentRootDir(getExperimentId()), 'trials', trialJob.id, '.nni', 'code');
        /* eslint-disable require-atomic-updates */
        try {
            const isAlive = await executor.isProcessAlive(jobpidPath);
            // if the process of jobpid is not alive any more
            if (!isAlive) {
                const trialReturnCode: string = await executor.getRemoteFileContent(trialReturnCodeFilePath);
                this.log.debug(`trailjob ${trialJob.id} return code: ${trialReturnCode}`);
                // A leading '-' is not captured; only zero vs. non-zero matters below.
                const match: RegExpMatchArray | null = trialReturnCode.trim()
                    .match(/^-?(\d+)\s+(\d+)$/);
                if (match !== null) {
                    const { 1: code, 2: timestamp } = match;
                    // Update trial job's status based on result code
                    if (parseInt(code, 10) === 0) {
                        trialJob.status = 'SUCCEEDED';
                    } else if (trialJob.isEarlyStopped === undefined) {
                        // isEarlyStopped is never set, so it was not cancelled by NNI: a non-zero exit is a failure.
                        trialJob.status = 'FAILED';
                    } else {
                        trialJob.status = getJobCancelStatus(trialJob.isEarlyStopped);
                    }
                    trialJob.endTime = parseInt(timestamp, 10);
                    this.releaseTrialResource(trialJob);
                }
                this.log.debug(`trailJob status update: ${trialJob.id}, ${trialJob.status}`);
            }
        } catch (error) {
            const message: string = error instanceof Error ? error.message : String(error);
            this.log.debug(`(Ignorable mostly)Update job status exception, error is ${message}`);
            if (!(error instanceof NNIError && error.name === NNIErrorNames.NOT_FOUND)) {
                trialJob.status = 'UNKNOWN';
            }
        }
        /* eslint-enable require-atomic-updates */
        return trialJob;
    }

chicm-ms's avatar
chicm-ms committed
568
    /**
     * Expose this service's metrics event emitter (the `metricsEmitter` field) to callers.
     */
    public get MetricsEmitter(): EventEmitter {
        const emitter: EventEmitter = this.metricsEmitter;
        return emitter;
    }

572
    /**
     * Build the remote path of the file holding a trial's process id.
     *
     * @param executor executor whose path conventions are used to join the path.
     * @param jobId id of a trial registered in `trialJobsMap`.
     * @returns `<trial working directory>/.nni/jobpid`, joined by the executor.
     * @throws NNIError (INVALID_JOB_DETAIL) if `jobId` is not in `trialJobsMap`.
     */
    private getJobPidPath(executor: ShellExecutor, jobId: string): string {
        const detail: RemoteMachineTrialJobDetail | undefined = this.trialJobsMap.get(jobId);
        if (detail !== undefined) {
            return executor.joinPath(detail.workingDirectory, '.nni', 'jobpid');
        }

        throw new NNIError(NNIErrorNames.INVALID_JOB_DETAIL, `Invalid job detail information for trial job ${jobId}`);
    }
chicm-ms's avatar
chicm-ms committed
580

chicm-ms's avatar
chicm-ms committed
581
    /**
     * Write a trial's hyper-parameter file and upload it to the trial's remote folder.
     *
     * The file is written UTF-8 encoded under the local folder `<expRootDir>/trials/<trialJobId>`.
     * Its name comes from `generateParamFileName`. It is then copied to
     * `<remote experiment root>/trials/<trialJobId>` on the machine running the trial.
     *
     * @param trialJobId id of the trial whose parameters are written.
     * @param hyperParameters parameters to persist; `value` is written as the file content.
     */
    private async writeParameterFile(trialJobId: string, hyperParameters: HyperParameters): Promise<void> {
        const executor = await this.getExecutor(trialJobId);

        const remoteTrialDir: string = executor.joinPath(
            executor.getRemoteExperimentRootDir(getExperimentId()), 'trials', trialJobId
        );
        const localTrialDir: string = path.join(this.expRootDir, 'trials', trialJobId);

        const paramFileName: string = generateParamFileName(hyperParameters);
        const localParamFile: string = path.join(localTrialDir, paramFileName);

        // Stage the file locally, then push just this one file to the remote machine.
        await fs.promises.writeFile(localParamFile, hyperParameters.value, { encoding: 'utf8' });
        const remoteParamFile: string = executor.joinPath(remoteTrialDir, paramFileName);
        await executor.copyFileToRemote(localParamFile, remoteParamFile);
    }
J-shang's avatar
J-shang committed
593
594
595
596
597
598
599
600

    /**
     * Not supported by this training service.
     *
     * @throws MethodNotImplementedError always. The throw is synchronous (the method is not
     *         `async`), so no Promise is returned.
     */
    public getTrialOutputLocalPath(_trialJobId: string): Promise<string> {
        const notImplemented = new MethodNotImplementedError();
        throw notImplemented;
    }

    /**
     * Not supported by this training service.
     *
     * @throws MethodNotImplementedError always. The throw is synchronous (the method is not
     *         `async`), so no Promise is returned.
     */
    public fetchTrialOutput(_trialJobId: string, _subpath: string): Promise<void> {
        const notImplemented = new MethodNotImplementedError();
        throw notImplemented;
    }
Deshui Yu's avatar
Deshui Yu committed
601
602
603
}

export { RemoteMachineTrainingService };