remoteMachineTrainingService.ts 27.5 KB
Newer Older
liuzhe-lz's avatar
liuzhe-lz committed
1
2
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT license.
Deshui Yu's avatar
Deshui Yu committed
3

4
import assert from 'assert';
Deshui Yu's avatar
Deshui Yu committed
5
import { EventEmitter } from 'events';
6
7
import fs from 'fs';
import path from 'path';
8
import { ShellExecutor } from 'training_service/remote_machine/shellExecutor';
Deshui Yu's avatar
Deshui Yu committed
9
import { Deferred } from 'ts-deferred';
10
11
12
13
14
import * as component from 'common/component';
import { NNIError, NNIErrorNames, MethodNotImplementedError } from 'common/errors';
import { getExperimentId } from 'common/experimentStartupInfo';
import { getLogger, Logger } from 'common/log';
import { ObservableTimer } from 'common/observableTimer';
Deshui Yu's avatar
Deshui Yu committed
15
import {
16
    HyperParameters, TrainingService, TrialJobApplicationForm,
Yuge Zhang's avatar
Yuge Zhang committed
17
    TrialJobDetail, TrialJobMetric
18
} from 'common/trainingService';
19
import {
20
21
    delay, generateParamFileName, getExperimentRootDir, getIPV4Address, getJobCancelStatus,
    getVersion, uniqueString
22
23
} from 'common/utils';
import { ExperimentConfig, RemoteConfig, RemoteMachineConfig, flattenConfig } from 'common/experimentConfig';
24
import { CONTAINER_INSTALL_NNI_SHELL_FORMAT } from '../common/containerJobData';
25
import { GPUSummary, ScheduleResultType } from '../common/gpuData';
26
import { execMkdir, validateCodeDir } from '../common/util';
Deshui Yu's avatar
Deshui Yu committed
27
28
import { GPUScheduler } from './gpuScheduler';
import {
29
    ExecutorManager, RemoteMachineScheduleInfo, RemoteMachineScheduleResult, RemoteMachineTrialJobDetail
Deshui Yu's avatar
Deshui Yu committed
30
} from './remoteMachineData';
SparkSnail's avatar
SparkSnail committed
31
import { RemoteMachineJobRestServer } from './remoteMachineJobRestServer';
Deshui Yu's avatar
Deshui Yu committed
32

33
34
/**
 * Type of `this.config` in the training service: the experiment config merged with
 * the remote training service fields, as built by `flattenConfig(config, 'remote')`.
 */
interface FlattenRemoteConfig extends ExperimentConfig, RemoteConfig { }

Deshui Yu's avatar
Deshui Yu committed
35
36
37
/**
 * Training Service implementation for Remote Machine (Linux)
 */
SparkSnail's avatar
SparkSnail committed
38
@component.Singleton
Deshui Yu's avatar
Deshui Yu committed
39
class RemoteMachineTrainingService implements TrainingService {
40
    private readonly initExecutorId = "initConnection";
41
42
    private readonly machineExecutorManagerMap: Map<RemoteMachineConfig, ExecutorManager>; //machine excutor map
    private readonly machineCopyExpCodeDirPromiseMap: Map<RemoteMachineConfig, Promise<void>>;
43
    private readonly trialExecutorManagerMap: Map<string, ExecutorManager>; //trial excutor map
44
45
    private readonly trialJobsMap: Map<string, RemoteMachineTrialJobDetail>;
    private readonly expRootDir: string;
46
    private gpuScheduler?: GPUScheduler;
47
48
    private readonly jobQueue: string[];
    private readonly timer: ObservableTimer;
Deshui Yu's avatar
Deshui Yu committed
49
    private stopping: boolean = false;
50
51
    private readonly metricsEmitter: EventEmitter;
    private readonly log: Logger;
SparkSnail's avatar
SparkSnail committed
52
    private remoteRestServerPort?: number;
53
    private versionCheck: boolean = true;
54
    private logCollection: string = 'none';
55
    private sshConnectionPromises: any[];
56
    private config: FlattenRemoteConfig;
Deshui Yu's avatar
Deshui Yu committed
57

58
    /**
     * Create the remote machine training service.
     *
     * Flattens the remote section of the experiment config, validates the trial code
     * directory, and immediately starts connecting to every machine in `machineList`.
     * `run()` later waits for those connections before scheduling any trial.
     *
     * @param config experiment config containing the remote training service section
     * @throws Error if `trialCodeDirectory` is not a directory, or if `validateCodeDir` rejects it
     */
    constructor(config: ExperimentConfig) {
        // Bookkeeping containers; all start empty.
        this.jobQueue = [];
        this.trialJobsMap = new Map<string, RemoteMachineTrialJobDetail>();
        this.trialExecutorManagerMap = new Map<string, ExecutorManager>();
        this.machineExecutorManagerMap = new Map<RemoteMachineConfig, ExecutorManager>();
        this.machineCopyExpCodeDirPromiseMap = new Map<RemoteMachineConfig, Promise<void>>();
        this.metricsEmitter = new EventEmitter();

        // Shared services.
        this.expRootDir = getExperimentRootDir();
        this.timer = component.get(ObservableTimer);
        this.log = getLogger('RemoteMachineTrainingService');
        this.log.info('Construct remote machine training service.');

        this.config = flattenConfig(config, 'remote');
        const codeDir: string = this.config.trialCodeDirectory;
        if (!fs.lstatSync(codeDir).isDirectory()) {
            throw new Error(`codeDir ${codeDir} is not a directory`);
        }
        validateCodeDir(codeDir);

        // Start connecting right away; run() awaits these promises.
        this.sshConnectionPromises = this.config.machineList.map(
            (machineConfig: RemoteMachineConfig) => this.initRemoteMachineOnConnected(machineConfig)
        );
    }

    /**
     * Main loop of the remote machine training service.
     *
     * Starts the trial REST server and waits for the SSH connections opened by the
     * constructor. It then creates the GPU scheduler and starts copying the trial
     * code directory to every machine. After that it tries to schedule queued trials
     * every 3 seconds until the stopping flag is set.
     *
     * @throws Error if the REST server reports an error message
     */
    public async run(): Promise<void> {
        const restServer = new RemoteMachineJobRestServer(this);
        await restServer.start();
        restServer.setEnableVersionCheck = this.versionCheck;
        this.log.info('Run remote machine training service.');
        if (this.sshConnectionPromises.length > 0) {
            await Promise.all(this.sshConnectionPromises);
            this.log.info('ssh connection initialized!');
            // set sshConnectionPromises to [] to avoid log information duplicated
            this.sshConnectionPromises = [];

            // initialize gpuScheduler
            this.gpuScheduler = new GPUScheduler(this.machineExecutorManagerMap);
            // Copy codeDir to every remote machine. The copies are not awaited here;
            // prepareTrialJob() awaits the copy of the machine a trial is placed on.
            for (const [machineConfig, executorManager] of this.machineExecutorManagerMap.entries()) {
                const executor: ShellExecutor = await executorManager.getExecutor(this.initExecutorId);
                if (executor !== undefined) {
                    this.machineCopyExpCodeDirPromiseMap.set(
                        machineConfig,
                        executor.copyDirectoryToRemote(this.config.trialCodeDirectory, executor.getRemoteCodePath(getExperimentId()))
                    );
                }
            }
        }

        while (!this.stopping) {
            // Do not launch new trials once cleanUp() has started tearing connections down.
            while (this.jobQueue.length > 0 && !this.stopping) {
                this.updateGpuReservation();
                const trialJobId: string = this.jobQueue[0];
                const prepareResult: boolean = await this.prepareTrialJob(trialJobId);
                if (!prepareResult) {
                    // No GPU resource is available right now;
                    // wait to schedule the job in the next time iteration.
                    break;
                }
                // Remove this trial by id rather than with shift(). cancelTrialJob() may
                // have spliced it out while prepareTrialJob() was awaiting, and a blind
                // shift() would then drop a different trial that is still waiting.
                const queueIndex: number = this.jobQueue.indexOf(trialJobId);
                if (queueIndex >= 0) {
                    this.jobQueue.splice(queueIndex, 1);
                }
            }
            if (restServer.getErrorMessage !== undefined) {
                this.stopping = true;
                throw new Error(restServer.getErrorMessage);
            }
            await delay(3000);
        }
        this.log.info('RemoteMachineTrainingService run loop exited.');
    }
130

SparkSnail's avatar
SparkSnail committed
131
    /**
132
     * give trial an executor
133
     * @param trial remote machine trial job detail
SparkSnail's avatar
SparkSnail committed
134
     */
135
    public allocateExecutorManagerForTrial(trial: RemoteMachineTrialJobDetail): void {
136
        if (trial.rmMeta === undefined) {
SparkSnail's avatar
SparkSnail committed
137
138
            throw new Error(`rmMeta not set in trial ${trial.id}`);
        }
139
        const executorManager: ExecutorManager | undefined = this.machineExecutorManagerMap.get(trial.rmMeta.config);
140
141
        if (executorManager === undefined) {
            throw new Error(`executorManager not initialized`);
SparkSnail's avatar
SparkSnail committed
142
        }
143
        this.trialExecutorManagerMap.set(trial.id, executorManager);
SparkSnail's avatar
SparkSnail committed
144
    }
145

SparkSnail's avatar
SparkSnail committed
146
147
    /**
     * If a trial is finished, release the connection resource
148
     * @param trial remote machine trial job detail
SparkSnail's avatar
SparkSnail committed
149
     */
150
    public releaseTrialResource(trial: RemoteMachineTrialJobDetail): void {
151
        if (trial.rmMeta === undefined) {
SparkSnail's avatar
SparkSnail committed
152
153
            throw new Error(`rmMeta not set in trial ${trial.id}`);
        }
154
        const executorManager = this.trialExecutorManagerMap.get(trial.id);
155
        if (executorManager === undefined) {
156
            throw new Error(`ExecutorManager is not assigned for trial ${trial.id}`);
SparkSnail's avatar
SparkSnail committed
157
        }
158
159
        // Note, it still keep reference in trialExecutorManagerMap, as there may be following requests from nni manager.
        executorManager.releaseExecutor(trial.id);
SparkSnail's avatar
SparkSnail committed
160
    }
Deshui Yu's avatar
Deshui Yu committed
161
162
163
164

    /**
     * List submitted trial jobs
     */
165
    public async listTrialJobs(): Promise<TrialJobDetail[]> {
Deshui Yu's avatar
Deshui Yu committed
166
167
168
        const jobs: TrialJobDetail[] = [];
        const deferred: Deferred<TrialJobDetail[]> = new Deferred<TrialJobDetail[]>();

169
        for (const [key,] of this.trialJobsMap) {
170
            jobs.push(await this.getTrialJob(key));
171
        }
Deshui Yu's avatar
Deshui Yu committed
172
173
174
175
176
177
178
179
180
181
182
        deferred.resolve(jobs);

        return deferred.promise;
    }

    /**
     * Get trial job detail information.
     *
     * Trials that are RUNNING or UNKNOWN have their status refreshed from their
     * remote machine first; any other trial is returned as stored.
     *
     * @param trialJobId ID of trial job
     * @throws NNIError NOT_FOUND if the id is unknown
     * @throws Error if a RUNNING/UNKNOWN trial has no `rmMeta`
     */
    public async getTrialJob(trialJobId: string): Promise<TrialJobDetail> {
        const trialJob = this.trialJobsMap.get(trialJobId);
        if (trialJob === undefined) {
            throw new NNIError(NNIErrorNames.NOT_FOUND, `trial job id ${trialJobId} not found`);
        }

        //TO DO: add another job status, and design new job status change logic
        const needsRefresh: boolean = trialJob.status === 'RUNNING' || trialJob.status === 'UNKNOWN';
        if (!needsRefresh) {
            return trialJob;
        }

        if (trialJob.rmMeta === undefined) {
            throw new Error(`rmMeta not set for submitted job ${trialJobId}`);
        }
        // Executor of the machine the trial is running on.
        const runningOn: ShellExecutor = await this.getExecutor(trialJob.id);
        return this.updateTrialJobStatus(trialJob, runningOn);
    }

200
201
202
203
204
    /**
     * Get trial job log
     * @param _trialJobId ID of trial job
     * @param _logType 'TRIAL_LOG' | 'TRIAL_STDERR'
     */
Yuge Zhang's avatar
Yuge Zhang committed
205
    public async getTrialFile(_trialJobId: string, _fileName: string): Promise<string | Buffer> {
206
207
208
        throw new MethodNotImplementedError();
    }

Deshui Yu's avatar
Deshui Yu committed
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
    /**
     * Add job metrics listener
     * @param listener callback listener
     */
    public addTrialJobMetricListener(listener: (metric: TrialJobMetric) => void): void {
        this.metricsEmitter.on('metric', listener);
    }

    /**
     * Unregister a callback previously added with `addTrialJobMetricListener`.
     * @param listener callback listener
     */
    public removeTrialJobMetricListener(listener: (metric: TrialJobMetric) => void): void {
        const eventName = 'metric';
        this.metricsEmitter.removeListener(eventName, listener);
    }

    /**
     * Submit a trial job.
     *
     * The trial is only recorded and queued here, with status WAITING. `run()`
     * schedules and launches it later.
     *
     * @param form trial job description form
     * @returns the newly created trial job detail
     */
    public async submitTrialJob(form: TrialJobApplicationForm): Promise<TrialJobDetail> {
        // Generate trial job id(random)
        const trialJobId: string = uniqueString(5);

        const trialJobDetail: RemoteMachineTrialJobDetail = new RemoteMachineTrialJobDetail(
            trialJobId,
            'WAITING',
            Date.now(),
            // NOTE(review): presumably the working-directory placeholder;
            // prepareTrialJob() assigns the real workingDirectory.
            "unset",
            form
        );
        this.jobQueue.push(trialJobId);
        this.trialJobsMap.set(trialJobId, trialJobDetail);

        // Already inside an async function: no Promise.resolve wrapping needed.
        return trialJobDetail;
    }

246
247
248
249
250
    /**
     * Update trial job for multi-phase
     * @param trialJobId trial job id
     * @param form job application form
     */
251
    public async updateTrialJob(trialJobId: string, form: TrialJobApplicationForm): Promise<TrialJobDetail> {
chicm-ms's avatar
chicm-ms committed
252
253
254
255
        const trialJobDetail: undefined | TrialJobDetail = this.trialJobsMap.get(trialJobId);
        if (trialJobDetail === undefined) {
            throw new Error(`updateTrialJob failed: ${trialJobId} not found`);
        }
chicm-ms's avatar
chicm-ms committed
256
        await this.writeParameterFile(trialJobId, form.hyperParameters);
chicm-ms's avatar
chicm-ms committed
257
258

        return trialJobDetail;
259
    }
260

Deshui Yu's avatar
Deshui Yu committed
261
262
263
264
    /**
     * Cancel trial job
     * @param trialJobId ID of trial job
     */
QuanluZhang's avatar
QuanluZhang committed
265
    public async cancelTrialJob(trialJobId: string, isEarlyStopped: boolean = false): Promise<void> {
Deshui Yu's avatar
Deshui Yu committed
266
        const trialJob: RemoteMachineTrialJobDetail | undefined = this.trialJobsMap.get(trialJobId);
267
        if (trialJob === undefined) {
Deshui Yu's avatar
Deshui Yu committed
268
269
270
271
            throw new Error(`trial job id ${trialJobId} not found`);
        }

        // Remove the job with trialJobId from job queue
chicm-ms's avatar
chicm-ms committed
272
        const index: number = this.jobQueue.indexOf(trialJobId);
273
        if (index >= 0) {
Deshui Yu's avatar
Deshui Yu committed
274
275
276
            this.jobQueue.splice(index, 1);
        }

277
        // Get executor where the job is running
Deshui Yu's avatar
Deshui Yu committed
278
279
        if (trialJob.rmMeta !== undefined) {
            // If the trial job is already scheduled, check its status and kill the trial process in remote machine
280
            const executor = await this.getExecutor(trialJob.id);
Deshui Yu's avatar
Deshui Yu committed
281

chicm-ms's avatar
chicm-ms committed
282
283
            if (trialJob.status === 'UNKNOWN') {
                trialJob.status = 'USER_CANCELED';
284
                this.releaseTrialResource(trialJob);
chicm-ms's avatar
chicm-ms committed
285
286
287
                return
            }

288
            const jobpidPath: string = this.getJobPidPath(executor, trialJob.id);
Deshui Yu's avatar
Deshui Yu committed
289
            try {
290
291
                // Mark the toEarlyStop tag here
                trialJob.isEarlyStopped = isEarlyStopped;
292
                await executor.killChildProcesses(jobpidPath);
293
                this.releaseTrialResource(trialJob);
Deshui Yu's avatar
Deshui Yu committed
294
295
            } catch (error) {
                // Not handle the error since pkill failed will not impact trial job's current status
296
                this.log.error(`remoteTrainingService.cancelTrialJob: ${error}`);
Deshui Yu's avatar
Deshui Yu committed
297
298
299
            }
        } else {
            // Job is not scheduled yet, set status to 'USER_CANCELLED' directly
QuanluZhang's avatar
QuanluZhang committed
300
301
            assert(isEarlyStopped === false, 'isEarlyStopped is not supposed to be true here.');
            trialJob.status = getJobCancelStatus(isEarlyStopped);
Deshui Yu's avatar
Deshui Yu committed
302
303
304
        }
    }

305
306
    public async setClusterMetadata(_key: string, _value: string): Promise<void> { return; }
    /** No cluster metadata is kept, so every key reads back as an empty string. */
    public async getClusterMetadata(_key: string): Promise<string> {
        return "";
    }
307

SparkSnail's avatar
SparkSnail committed
308
    /**
309
     * cleanup() has a time out of 10s to clean remote connections
SparkSnail's avatar
SparkSnail committed
310
311
     */
    public async cleanUp(): Promise<void> {
chicm-ms's avatar
chicm-ms committed
312
        this.log.info('Stopping remote machine training service...');
Deshui Yu's avatar
Deshui Yu committed
313
        this.stopping = true;
314
315
316
317
318
319
320
321
322
        await this.cleanupConnections();
    }

    /**
     * Look up the executor of the machine a trial was assigned to.
     *
     * @param trialId trial job id
     * @returns the executor obtained from the trial's executor manager
     * @throws Error if no executor manager has been assigned to the trial
     */
    private async getExecutor(trialId: string): Promise<ShellExecutor> {
        const manager: ExecutorManager | undefined = this.trialExecutorManagerMap.get(trialId);
        if (manager !== undefined) {
            return manager.getExecutor(trialId);
        }
        throw new Error(`ExecutorManager is not assigned for trial ${trialId}`);
    }
324

325
326
327
328
    /**
     * remove gpu reversion when job is not running
     */
    private updateGpuReservation(): void {
329
330
331
332
333
        if (this.gpuScheduler) {
            for (const [key, value] of this.trialJobsMap) {
                if (!['WAITING', 'RUNNING'].includes(value.status)) {
                    this.gpuScheduler.removeGpuReservation(key, this.trialJobsMap);
                }
334
335
336
337
            }
        }
    }

SparkSnail's avatar
SparkSnail committed
338
339
340
341
    /**
     * stop gpu_metric_collector process in remote machine and remove unused scripts
     */
    private async cleanupConnections(): Promise<void> {
342
        try {
343
344
            for (const executorManager of this.machineExecutorManagerMap.values()) {
                const executor = await executorManager.getExecutor(this.initExecutorId);
345
                if (executor !== undefined) {
346
347
                    this.log.info(`killing gpu metric collector on ${executor.name}`);
                    const gpuJobPidPath: string = executor.joinPath(executor.getRemoteScriptsPath(getExperimentId()), 'pid');
348
                    await executor.killChildProcesses(gpuJobPidPath, true);
SparkSnail's avatar
SparkSnail committed
349
                }
350
                executorManager.releaseAllExecutor();
SparkSnail's avatar
SparkSnail committed
351
            }
352
        } catch (error) {
SparkSnail's avatar
SparkSnail committed
353
            //ignore error, this function is called to cleanup remote connections when experiment is stopping
354
            this.log.error(`Cleanup connection exception, error is ${error}`);
SparkSnail's avatar
SparkSnail committed
355
        }
356
357
    }

358
359
360
    private async initRemoteMachineOnConnected(machineConfig: RemoteMachineConfig): Promise<void> {
        const executorManager: ExecutorManager = new ExecutorManager(machineConfig);
        this.log.info(`connecting to ${machineConfig.user}@${machineConfig.host}:${machineConfig.port}`);
361
362
        const executor: ShellExecutor = await executorManager.getExecutor(this.initExecutorId);
        this.log.debug(`reached ${executor.name}`);
363
        this.machineExecutorManagerMap.set(machineConfig, executorManager);
364
365
        this.log.debug(`initializing ${executor.name}`);

366
        // Create root working directory after executor is ready
367
368
        const nniRootDir: string = executor.joinPath(executor.getTempPath(), 'nni');
        await executor.createFolder(executor.getRemoteExperimentRootDir(getExperimentId()));
369

370
        // the directory to store temp scripts in remote machine
371
372
373
        const remoteGpuScriptCollectorDir: string = executor.getRemoteScriptsPath(getExperimentId());

        // clean up previous result.
374
        await executor.createFolder(remoteGpuScriptCollectorDir, true);
Junwei Sun's avatar
Junwei Sun committed
375
        await executor.allowPermission(true, nniRootDir);
376

Deshui Yu's avatar
Deshui Yu committed
377
        //Begin to execute gpu_metrics_collection scripts
378
        const script = executor.generateGpuStatsScript(getExperimentId());
379
        executor.executeScript(script, false, true);
380
381
382
        // the timer is trigger in 1 second, it causes multiple runs on server.
        // So reduce it's freqeunce, only allow one of it run.
        const collectingCount: boolean[] = [];
383

384
        const disposable: Rx.IDisposable = this.timer.subscribe(
385
            async () => {
386
387
388
389
                if (collectingCount.length == 0) {
                    collectingCount.push(true);
                    const cmdresult = await executor.readLastLines(executor.joinPath(remoteGpuScriptCollectorDir, 'gpu_metrics'));
                    if (cmdresult !== "") {
390
391
392
                        executorManager.rmMeta.gpuSummary = <GPUSummary>JSON.parse(cmdresult);
                        if (executorManager.rmMeta.gpuSummary.gpuCount === 0) {
                            this.log.warning(`No GPU found on remote machine ${machineConfig.host}`);
393
394
                            this.timer.unsubscribe(disposable);
                        }
395
                    }
396
                    if (this.stopping) {
397
                        this.timer.unsubscribe(disposable);
398
                        this.log.debug(`Stopped GPU collector on ${machineConfig.host}, since experiment is exiting.`);
399
                    }
400
                    collectingCount.pop();
Deshui Yu's avatar
Deshui Yu committed
401
402
403
404
405
406
                }
            }
        );
    }

    /**
     * Try to schedule and launch one queued trial.
     *
     * @param trialJobId id of the trial at the head of the job queue
     * @returns true if the trial needs no further scheduling (it was launched, or
     *          it is no longer WAITING); false if no GPU is available right now and
     *          the trial should be retried later
     * @throws NNIError RESOURCE_NOT_AVAILABLE if no machine can satisfy the requested
     *         GPU number; Error if the scheduler is missing or returns an unexpected result
     */
    private async prepareTrialJob(trialJobId: string): Promise<boolean> {
        if (this.gpuScheduler === undefined) {
            throw new Error('gpuScheduler is not initialized');
        }

        const trialJobDetail: RemoteMachineTrialJobDetail | undefined = this.trialJobsMap.get(trialJobId);
        if (trialJobDetail === undefined) {
            throw new NNIError(NNIErrorNames.INVALID_JOB_DETAIL, `Invalid job detail information for trial job ${trialJobId}`);
        }

        // If job is not WAITING, don't prepare it; report it as done immediately.
        if (trialJobDetail.status !== 'WAITING') {
            return true;
        }

        // get an executor from scheduler
        const rmScheduleResult: RemoteMachineScheduleResult = this.gpuScheduler.scheduleMachine(this.config.trialGpuNumber, trialJobDetail);
        if (rmScheduleResult.resultType === ScheduleResultType.REQUIRE_EXCEED_TOTAL) {
            const errorMessage: string = `Required GPU number ${this.config.trialGpuNumber} is too large, no machine can meet`;
            this.log.error(errorMessage);
            // Plain throw: rejecting a separate Deferred here would leave an unhandled rejection.
            throw new NNIError(NNIErrorNames.RESOURCE_NOT_AVAILABLE, errorMessage);
        }
        if (rmScheduleResult.resultType === ScheduleResultType.TMP_NO_AVAILABLE_GPU) {
            this.log.info(`Right now no available GPU can be allocated for trial ${trialJobId}, will try to schedule later`);
            return false;
        }
        if (rmScheduleResult.resultType !== ScheduleResultType.SUCCEED || rmScheduleResult.scheduleInfo === undefined) {
            throw new Error(`Invalid schedule result type: ${rmScheduleResult.resultType}`);
        }

        const rmScheduleInfo: RemoteMachineScheduleInfo = rmScheduleResult.scheduleInfo;
        trialJobDetail.rmMeta = rmScheduleInfo.rmMeta;
        // Make sure the trial code directory has finished copying to this machine.
        const copyExpCodeDirPromise = this.machineCopyExpCodeDirPromiseMap.get(rmScheduleInfo.rmMeta.config);
        if (copyExpCodeDirPromise !== undefined) {
            await copyExpCodeDirPromise;
        }

        this.allocateExecutorManagerForTrial(trialJobDetail);
        const executor = await this.getExecutor(trialJobDetail.id);
        trialJobDetail.workingDirectory = executor.joinPath(executor.getRemoteExperimentRootDir(getExperimentId()), 'trials', trialJobDetail.id);

        await this.launchTrialOnScheduledMachine(trialJobId, trialJobDetail.form, rmScheduleInfo);

        trialJobDetail.status = 'RUNNING';
        trialJobDetail.url = `file://${rmScheduleInfo.rmMeta.config.host}:${trialJobDetail.workingDirectory}`;
        trialJobDetail.startTime = Date.now();
        this.trialJobsMap.set(trialJobId, trialJobDetail);
        return true;
    }

463
    private async launchTrialOnScheduledMachine(trialJobId: string, form: TrialJobApplicationForm,
464
        rmScheduleInfo: RemoteMachineScheduleInfo): Promise<void> {
chicm-ms's avatar
chicm-ms committed
465
        const cudaVisibleDevice: string = rmScheduleInfo.cudaVisibleDevice;
466
        const executor = await this.getExecutor(trialJobId);
467
468
469
470
471
        const trialJobDetail: RemoteMachineTrialJobDetail | undefined = this.trialJobsMap.get(trialJobId);
        if (trialJobDetail === undefined) {
            throw new Error(`Can not get trial job detail for job: ${trialJobId}`);
        }

Deshui Yu's avatar
Deshui Yu committed
472
473
        const trialLocalTempFolder: string = path.join(this.expRootDir, 'trials-local', trialJobId);

474
        await executor.createFolder(executor.joinPath(trialJobDetail.workingDirectory, '.nni'));
Deshui Yu's avatar
Deshui Yu committed
475
476
477

        // RemoteMachineRunShellFormat is the run shell format string,
        // See definition in remoteMachineData.ts
SparkSnail's avatar
SparkSnail committed
478

479
        let cudaVisible: string;
chicm-ms's avatar
chicm-ms committed
480
481
        // Set CUDA_VISIBLE_DEVICES environment variable based on cudaVisibleDevice
        // If no valid cudaVisibleDevice is defined, set CUDA_VISIBLE_DEVICES to empty string to hide GPU device
SparkSnail's avatar
SparkSnail committed
482
        // If gpuNum is undefined, will not set CUDA_VISIBLE_DEVICES in script
483
        if (this.config.trialGpuNumber === undefined) {
484
            cudaVisible = ""
SparkSnail's avatar
SparkSnail committed
485
        } else {
chicm-ms's avatar
chicm-ms committed
486
            if (typeof cudaVisibleDevice === 'string' && cudaVisibleDevice.length > 0) {
487
                cudaVisible = `CUDA_VISIBLE_DEVICES=${cudaVisibleDevice}`;
SparkSnail's avatar
SparkSnail committed
488
            } else {
489
                cudaVisible = `CUDA_VISIBLE_DEVICES=" "`;
SparkSnail's avatar
SparkSnail committed
490
            }
SparkSnail's avatar
SparkSnail committed
491
        }
liuzhe-lz's avatar
liuzhe-lz committed
492
        const nniManagerIp: string = this.config.nniManagerIp ? this.config.nniManagerIp : await getIPV4Address();
493
        if (this.remoteRestServerPort === undefined) {
SparkSnail's avatar
SparkSnail committed
494
495
496
            const restServer: RemoteMachineJobRestServer = component.get(RemoteMachineJobRestServer);
            this.remoteRestServerPort = restServer.clusterRestServerPort;
        }
497
        const version: string = this.versionCheck ? await getVersion() : '';
498
499
        const runScriptTrialContent: string = executor.generateStartScript(
            trialJobDetail.workingDirectory,
Deshui Yu's avatar
Deshui Yu committed
500
            trialJobId,
SparkSnail's avatar
SparkSnail committed
501
            getExperimentId(),
502
            trialJobDetail.form.sequenceId.toString(),
503
504
            false,  // multi-phase
            this.config.trialCommand,
SparkSnail's avatar
SparkSnail committed
505
506
            nniManagerIp,
            this.remoteRestServerPort,
507
            version,
508
509
            this.logCollection,
            cudaVisible);
Deshui Yu's avatar
Deshui Yu committed
510
511

        //create tmp trial working folder locally.
512
        await execMkdir(path.join(trialLocalTempFolder, '.nni'));
513
514
515

        // Write install_nni.sh, it's not used in Windows platform.
        await fs.promises.writeFile(path.join(trialLocalTempFolder, executor.getScriptName("install_nni")), CONTAINER_INSTALL_NNI_SHELL_FORMAT, { encoding: 'utf8' });
516
        // Write file content ( run.sh and parameter.cfg ) to local tmp files
517
        await fs.promises.writeFile(path.join(trialLocalTempFolder, executor.getScriptName("run")), runScriptTrialContent, { encoding: 'utf8' });
chicm-ms's avatar
chicm-ms committed
518
        await this.writeParameterFile(trialJobId, form.hyperParameters);
Deshui Yu's avatar
Deshui Yu committed
519
        // Copy files in codeDir to remote working directory
520
        await executor.copyDirectoryToRemote(trialLocalTempFolder, trialJobDetail.workingDirectory);
Deshui Yu's avatar
Deshui Yu committed
521
        // Execute command in remote machine
522
        executor.executeScript(executor.joinPath(trialJobDetail.workingDirectory, executor.getScriptName("run")), true, true);
Deshui Yu's avatar
Deshui Yu committed
523
524
    }

525
    /**
     * Refreshes a trial's status by probing its process on the remote machine.
     *
     * When the process whose pid is stored in the trial's `.nni/jobpid` file is no longer
     * alive, the trial's `.nni/code` file is read. It is expected to contain
     * "<exit code> <end timestamp>". A zero exit code marks the trial SUCCEEDED. A non-zero
     * code marks it FAILED, unless `isEarlyStopped` was set (i.e. NNI stopped it), in which
     * case the status comes from `getJobCancelStatus`. Once a final status is recorded, the
     * trial's resources are released.
     *
     * Errors raised while probing are logged at debug level. A NOT_FOUND `NNIError` leaves
     * the status unchanged; any other error sets the status to 'UNKNOWN'.
     *
     * @param trialJob - trial whose `status`/`endTime` are updated in place.
     * @param executor - shell executor for the machine the trial runs on.
     * @returns the same `trialJob` object after the update.
     */
    private async updateTrialJobStatus(trialJob: RemoteMachineTrialJobDetail, executor: ShellExecutor): Promise<TrialJobDetail> {
        const jobpidPath: string = this.getJobPidPath(executor, trialJob.id);
        const trialReturnCodeFilePath: string = executor.joinPath(executor.getRemoteExperimentRootDir(getExperimentId()), 'trials', trialJob.id, '.nni', 'code');
        /* eslint-disable require-atomic-updates */
        try {
            const isAlive = await executor.isProcessAlive(jobpidPath);
            // Only a finished process has a return-code file worth reading.
            if (!isAlive) {
                const trialReturnCode: string = await executor.getRemoteFileContent(trialReturnCodeFilePath);
                this.log.debug(`trailjob ${trialJob.id} return code: ${trialReturnCode}`);
                // "<exit code> <timestamp>"; the sign is captured so negative codes parse as-is.
                const match: RegExpMatchArray | null = trialReturnCode.trim()
                    .match(/^(-?\d+)\s+(\d+)$/);
                if (match !== null) {
                    const { 1: code, 2: timestamp } = match;
                    if (parseInt(code, 10) === 0) {
                        trialJob.status = 'SUCCEEDED';
                    } else if (trialJob.isEarlyStopped === undefined) {
                        // Not stopped by NNI, so a non-zero exit code means the trial failed.
                        trialJob.status = 'FAILED';
                    } else {
                        trialJob.status = getJobCancelStatus(trialJob.isEarlyStopped);
                    }
                    trialJob.endTime = parseInt(timestamp, 10);
                    this.releaseTrialResource(trialJob);
                }
                this.log.debug(`trailJob status update: ${trialJob.id}, ${trialJob.status}`);
            }
        } catch (error) {
            // The catch variable is untyped at runtime; narrow before reading `.message`.
            const message: string = error instanceof Error ? error.message : String(error);
            this.log.debug(`(Ignorable mostly)Update job status exception, error is ${message}`);
            // A missing file is tolerated (status kept); anything else leaves the state unknown.
            if (!(error instanceof NNIError && error.name === NNIErrorNames.NOT_FOUND)) {
                trialJob.status = 'UNKNOWN';
            }
        }
        /* eslint-enable require-atomic-updates */

        return trialJob;
    }

chicm-ms's avatar
chicm-ms committed
570
    /**
     * Read-only accessor for this service's `metricsEmitter`.
     * NOTE(review): presumably the emitter on which trial metrics are published — confirm
     * against the `emit` calls elsewhere in the class.
     */
    public get MetricsEmitter(): EventEmitter {
        const emitter: EventEmitter = this.metricsEmitter;
        return emitter;
    }

574
    /**
     * Builds the remote path `<workingDirectory>/.nni/jobpid` for a known trial.
     *
     * @param executor - executor whose path conventions are used to join the segments.
     * @param jobId - id of a trial registered in `trialJobsMap`.
     * @returns the joined path of the trial's pid file.
     * @throws NNIError with name INVALID_JOB_DETAIL when `jobId` is not in `trialJobsMap`.
     */
    private getJobPidPath(executor: ShellExecutor, jobId: string): string {
        const detail: RemoteMachineTrialJobDetail | undefined = this.trialJobsMap.get(jobId);
        if (detail !== undefined) {
            return executor.joinPath(detail.workingDirectory, '.nni', 'jobpid');
        }
        throw new NNIError(NNIErrorNames.INVALID_JOB_DETAIL, `Invalid job detail information for trial job ${jobId}`);
    }
chicm-ms's avatar
chicm-ms committed
582

chicm-ms's avatar
chicm-ms committed
583
    /**
     * Writes a trial's hyper-parameters to a file and uploads it to the remote machine.
     *
     * The file is first written under `<expRootDir>/trials-local/<trialJobId>/`. It is then
     * copied into `trials/<trialJobId>/` under the remote experiment root directory, keeping
     * the name returned by `generateParamFileName`.
     *
     * @param trialJobId - trial the parameters belong to (also selects the executor).
     * @param hyperParameters - parameters whose `value` is written verbatim as UTF-8 text.
     */
    private async writeParameterFile(trialJobId: string, hyperParameters: HyperParameters): Promise<void> {
        const executor = await this.getExecutor(trialJobId);

        // Resolve every path before touching the filesystem.
        const remoteTrialDir: string = executor.joinPath(executor.getRemoteExperimentRootDir(getExperimentId()), 'trials', trialJobId);
        const paramFileName: string = generateParamFileName(hyperParameters);
        const stagedPath: string = path.join(this.expRootDir, 'trials-local', trialJobId, paramFileName);

        // Stage locally, then push the same file to the remote trial directory.
        await fs.promises.writeFile(stagedPath, hyperParameters.value, { encoding: 'utf8' });
        await executor.copyFileToRemote(stagedPath, executor.joinPath(remoteTrialDir, paramFileName));
    }
J-shang's avatar
J-shang committed
595
596
597
598
599
600
601
602

    /**
     * Not supported by this training service.
     *
     * @throws MethodNotImplementedError on every call (thrown synchronously).
     */
    public getTrialOutputLocalPath(_trialJobId: string): Promise<string> {
        const unsupported = new MethodNotImplementedError();
        throw unsupported;
    }

    /**
     * Not supported by this training service.
     *
     * @throws MethodNotImplementedError on every call (thrown synchronously).
     */
    public fetchTrialOutput(_trialJobId: string, _subpath: string): Promise<void> {
        const unsupported = new MethodNotImplementedError();
        throw unsupported;
    }
Deshui Yu's avatar
Deshui Yu committed
603
604
605
}

// Expose the remote-machine training service class to other modules as a named export.
export { RemoteMachineTrainingService };